17370845950

解决 Docker Compose 中 Kafka 消息发送失败问题

本文旨在解决在使用 Docker Compose 部署 Kafka 集群时,应用程序无法向 Kafka 主题发送消息的问题。我们将分析常见的配置错误,并提供修改建议,确保生产者能够正确连接到 Kafka Broker,从而成功发送消息。通过调整 Kafka 的监听器配置以及生产者端的 Broker 地址,可以有效解决此类连接问题。

问题分析

在使用 Docker Compose 部署 Kafka 集群时,生产者无法发送消息到 Kafka 主题,并出现类似 Topic general-events not present in metadata after 60000 ms 的错误,通常是由于以下原因导致:

  1. Kafka Broker 地址配置错误: 生产者配置的 Broker 地址与 Kafka 实际监听的地址不匹配。
  2. 网络配置问题: 容器之间的网络连接存在问题,导致生产者无法访问 Kafka Broker。
  3. Kafka 监听器配置不正确: Kafka 的监听器配置可能不允许从容器外部或特定的网络访问。

解决方案

针对上述问题,可以采取以下步骤进行排查和解决:

1. 检查 Kafka 监听器配置

Kafka 的 KAFKA_ADVERTISED_LISTENERS 环境变量至关重要,它决定了 Kafka Broker 如何向客户端公布自己的地址。确保该配置正确反映了客户端访问 Kafka 的方式。

在 docker-compose.yml 文件中,检查 Kafka 服务的 environment 部分:

  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    expose:
      - '29092'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_MIN_INSYNC_REPLICAS: '1'
  • PLAINTEXT://kafka:29092:允许容器内部的其他服务通过 kafka 这个 hostname 和 29092 端口访问 Kafka Broker。
  • PLAINTEXT_HOST://localhost:9092:允许宿主机通过 localhost 和 9092 端口访问 Kafka Broker。

注意事项:

  • 如果你的生产者运行在 Docker 容器之外,确保 PLAINTEXT_HOST 配置正确,并且宿主机能够访问 Kafka Broker。
  • 如果你的生产者运行在 Docker 容器内部,但与 Kafka Broker 不在同一个 Docker 网络中,你需要配置一个可以从生产者容器访问的地址。

2. 修改生产者 Broker 地址

生产者需要使用正确的 Broker 地址才能成功连接到 Kafka 集群。如果 Kafka Broker 的地址发生变化,或者生产者使用了错误的地址,就会导致连接失败。

在生产者代码中,检查 bootstrap.servers 配置项。确保它指向正确的 Kafka Broker 地址。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());
    private static final String TOPIC_NAME = "general-events";
    private KafkaProducer kafkaProducer = null;
    private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER";

    public Producer() {
        LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());
        Properties kafkaProps = new Properties();

        // 使用环境变量或默认值配置 Kafka Broker 地址
        String defaultClusterValue = "kafka:29092"; // 修改为 kafka:29092,容器内部访问
        String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);
        LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);

        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");

        this.kafkaProducer = new KafkaProducer<>(kafkaProps);

    }
}

注意事项:

  • 如果生产者运行在 Docker 容器内部,应该使用 Kafka 容器的 hostname (kafka) 和内部端口 (29092)。
  • 如果生产者运行在 Docker 容器外部,应该使用 localhost 和映射到宿主机的端口 (9092)。

3. 检查容器网络

确保生产者容器和 Kafka 容器在同一个 Docker 网络中。可以使用 docker network inspect 命令来检查容器的网络配置。

如果生产者容器和 Kafka 容器不在同一个网络中,可以使用 docker network connect ainer_name> 命令将生产者容器连接到 Kafka 容器所在的网络。

4. 验证 Topic 创建

虽然 init-kafka 服务创建了 topic,但仍建议再次确认 topic 是否成功创建。可以在 Kafka 容器内部执行以下命令:

docker exec -it kafka bash
kafka-topics --bootstrap-server kafka:29092 --list

确认列表中包含 general-events topic。

总结

解决 Docker Compose 中 Kafka 消息发送失败的问题,关键在于确保 Kafka 的监听器配置正确,生产者使用正确的 Broker 地址,以及容器之间的网络连接正常。通过仔细检查这些配置,可以有效地解决此类连接问题,确保应用程序能够成功地向 Kafka 主题发送消息。