本教程详细阐述了如何在 apache flink 中使用 `kafkasource` 读取带键(keyed)的 kafka 记录。通过实现自定义的 `kafkarecorddeserializationschema`,用户可以灵活地访问 kafka `consumerrecord` 中的键、值、时间戳及其他元数据,从而构建更丰富的数据处理逻辑,克服了默认 `valueonly` 模式的局限性。
当从 Apache Kafka 消费数据时,生产者通常会为记录同时指定键(Key)和值(Value),尤其是在需要进行日志压缩、状态管理或基于键的路由等场景中。Apache Flink 的 KafkaSource 是一个强大的连接器,用于与 Kafka 进行集成。然而,默认的反序列化策略,例如 KafkaRecordDeserializationSchema.valueOnly(),仅提取记录的值,使得键和其他重要的元数据无法直接访问。为了在 Flink 中充分利用带键的 Kafka 记录,需要采用一种自定义的反序列化方法。
在 Flink 中读取带键的 Kafka 记录的核心在于实现一个自定义的 KafkaRecordDeserializationSchema。这个接口提供了一个 deserialize 方法,它接收一个 ConsumerRecord
以下步骤将指导您如何创建一个自定义的反序列化器,以从带键的 Kafka 记录中提取键、值和时间戳,并在 Flink DataStream 中进行处理。
首先,我们需要一个 Java 类来封装从 Kafka 记录中提取的键、值和时间戳。这个类通常被称为 POJO (Plain Old Java Object),并应遵循 Flink 的 POJO 规则(例如,所有字段都必须是 public 或有 getter/setter 方法,并且必须有一个无参构造函数)。
import java.io.Serializable;
public class KeyedKafkaRecord implements Serializable {
private String key;
private String value;
private long timestamp;
// 可根据需要添加其他元数据,例如 topic, partition, offset
public KeyedKafkaRecord() {} // Flink POJO 要求无参构造函数
public KeyedKafkaRecord(String key, String value, long timestamp) {
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
public String getKey() { return key; }
public void setKey(String key) { this.key = key; }
public String getValue() { return value; }
public void setValue(String value) { this.value = value; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return "KeyedKafkaRecord{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
", timestamp=" + timestamp +
'}';
}
}接下来,创建一个实现了 KafkaRecordDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.IOException; public class CustomKeyedKafkaDeserializationSchema implements KafkaRecordDeserializationSchema{ private transient StringDeserializer keyDeserializer; private transient StringDeserializer valueDeserializer; @Override public void open(KafkaRecordDeserializationSchema.InitializationContext context) throws Exception { // 在反序列化器初始化时创建 Kafka Deserializer 实例 keyDeserializer = new StringDeserializer(); valueDeserializer = new StringDeserializer(); // 如果需要配置,可以在这里进行,例如 keyDeserializer.configure(configs, true); } @Override public void deserialize(ConsumerRecord record, Collector out) throws IOException { // 使用 Kafka StringDeserializer 反序列化键和值 String key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key()); String value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value()); long timestamp = record.timestamp(); // 获取记录的时间戳 // 将反序列化后的数据封装到自定义的 DTO 中 out.collect(new KeyedKafkaRecord(key, value, timestamp)); } @Override public TypeInformation getProducedTypeInfo() { // 返回反序列化器产生的数据类型信息 return TypeInformation.of(KeyedKafkaRecord.class); } }
注意: open 方法用于初始化反序列化器实例,确保它们在运行时可用。getProducedTypeInfo() 方法必须返回您自定义的 KeyedKafkaRecord 类的 TypeInformation。
最后,将自定义的 CustomKeyedKafkaDeserializationSchema 应用到 KafkaSource 的构建器中。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public class FlinkKeyedKafkaConsumerJob {
public static void main(String[] args) throws Exception {
// 获取 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 集群地址
String topic = "test3"; // 替换为您的 Kafka 主题
String groupId = "my-flink-consumer-group"; // 消费者组 ID
// 构建 KafkaSource
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest()) // 从最早的偏移量开始消费
.setDeserializer(new CustomKeyedKafkaDeserializationSchema()) // 使用自定义的反序列化器
.build();
// 从 Kafka Source 创建数据流
DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");
// 对接收到的数据进行处理并打印
stream.map(record -> "Received Key: " + record.getKey() +
", Value: " + record.getValue() +
", Timestamp: " + record.getTimestamp())
.print();
// 执行 Flink 作业
env.execute("Flink Keyed Kafka Consumer Job");
}
} 通过实现 KafkaRecordDeserializationSchema 接口,Apache Flink 能够灵活地处理带键的 Kafka 记录,并提取出包括键、值、时间戳在内的所有重要元数据。这种方法为构建更复杂、更精细的 Flink 流处理应用提供了坚实的基础,特别是在需要基于键进行状态管理、数据去重或关联的场景中,它使得 Flink 能够充分利用 Kafka 消息的完整语义信息。