在许多业务场景中,我们需要将数据库中的数据实时或准实时地同步到kafka。这通常伴随着几个核心要求:
传统的做法是采用同步发送机制,即发送一条消息后等待其确认,再发送下一条。这种方法虽然能严格保证顺序和不丢失,但在性能上往往表现不佳。
为了实现消息不丢失和严格顺序,一种直观的实现方式是利用Kafka生产者的同步发送特性。Kafka提供了acks=all和min.insync.replicas等配置来保证消息的可靠性,而通过同步等待每条消息的发送结果,可以确保消息的顺序。
以下是一个典型的同步发送代码示例:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class SynchronousKafkaSender {
private final KafkaTemplate kafkaTemplate; // 假设已注入
public SynchronousKafkaSender(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 以同步阻塞方式发送消息,保证顺序和不丢失。
*
* @param topicName Kafka主题名称
* @param data 待发送的数据列表
* @return 成功发送并确认的消息ID列表
*/
public List sendMessagesSynchronously(String topicName, List data) {
List successIds = new ArrayList<>();
for (MyDataObject value : data) {
// 发送消息并获取ListenableFuture
ListenableFuture> listenableFuture =
kafkaTemplate.send(topicName, value.getSiebelId(), value);
try {
// 阻塞等待发送结果,设置超时时间
listenableFuture.get(3, TimeUnit.SECONDS);
successIds.add(value.getId());
} catch (Exception e) {
// 如果发送失败或超时,记录警告并中断当前批次发送
System.err.println("消息发送失败或超时,ID: " + value.getId() + ", 错误: " + e.getMessage());
// 这里可以添加更详细的错误处理逻辑,如将失败消息记录到死信队列
break; // 中断,未发送的消息将在下一次调度中处理
}
}
return successIds;
}
}
// 假设MyDataObject是一个包含getId()和getSiebelId()方法的自定义类
class MyDataObject {
private String id;
private String siebelId;
private Object payload; // 实际数据
public MyDataObject(String id, String siebelId, Object payload) {
this.id = id;
this.siebelId = siebelId;
this.payload = payload;
}
public String getId() { return id; }
public String getSiebelId() { return siebelId; }
public Object getPayload() { return payload; }
} 工作原理与局限性:
为了解决同步发送的性能问题,可以采用异步发送结合回调机制。Kafka生产者本身就是异步的,send()方法会立即返回一个ListenableFuture,而不会阻塞。我们可以通过向ListenableFuture添加回调来处理发送结果。
以下是优化后的异步发送代码示例:
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; // 线程安全的列表 public class AsynchronousKafkaSender { private final KafkaTemplate
kafkaTemplate; // 假设已注入 public AsynchronousKafkaSender(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } /** * 以异步回调方式发送消息,优先保证性能和不丢失,但可能牺牲局部故障时的严格顺序。 * * @param topicName Kafka主题名称 * @param data 待发送的数据列表 * @return 成功发送并确认的消息ID列表 */ public List sendMessagesAsynchronously(String topicName, List data) { // 使用线程安全的列表来收集成功发送的消息ID List successIds = Collections.synchronizedList(new ArrayList<>()); data.forEach(value -> { kafkaTemplate.send(topicName, value.getSiebelId(), value) .addCallback(new ListenableFutureCallback >() { @Override public void onSuccess(SendResult result) { successIds.add(value.getId()); // 可以在这里记录成功日志 } @Override public void onFailure(Throwable exception) { System.err.println("消息发送失败,ID: " + value.getId() + ", 错误: " + exception.getMessage()); // 可以在这里实现更复杂的失败处理,如重试、记录到死信队列 } }); }); // 刷新KafkaTemplate,确保所有缓冲中的消息都被发送出去 kafkaTemplate.flush(); return successIds; } }
工作原理与权衡:
异步回调方案之所以能带来巨大的性能提升,主要得益于以下几点:
关于autoflush=true的注意事项: 如果将kafkaTemplate配置为autoflush=true,并移除代码中的kafkaTemplate.flush(),可能会发现性能反而下降。这可能是因为autoflush的实现机制通常是基于时间或缓冲区大小的自动触发,它可能不会像手动flush()那样在遍历完所有消息后立即强制清空并等待所有回调。手动flush()提供了一个明确的同步点,确保当前批次的所有异步发送操作都在返回successIds之前完成,这对于需要立即知道当前批次发送结果的场景(如数据库删除)至关重要。
从数据库到Kafka的消息传输,在保证不丢失和高性能的同时,对消息顺序的严格要求是核心挑战。同步阻塞式发送(listenableFuture.get())虽然能严格保证顺序和不丢失,但性能极差。异步回调式发送结合kafkaTemplate.flush()则提供了一个高性能的解决方案,它在保证消息最终不丢失的前提下,显著提升了吞吐量。这种方案在局部故障时可能导致消息的即时顺序发生偏差,但对于许多能够容忍“最终一致性”和“近似顺序”的业务场景来说,这是一个非常实用且有效的折衷方案。在选择具体方案时,应根据业务对消息顺序的严格程度和对性能的需求进行权衡。同时,完善的错误处理和数据库删除策略是确保系统健壮性的关键。