在 Kafka 消息发送场景中,开发者经常面临选择:是为所有消息类型共享一个通用的 KafkaTemplate
使用共享的 KafkaTemplate
相反,使用专用的 KafkaTemplate
flush() 方法的必要性
在深入讨论性能问题之前,我们需要评估 flush() 方法的必要性。通常情况下,并不需要显式地调用 flush() 方法。 KafkaProducer 具有 linger.ms 配置项,用于控制消息的批量发送。当达到 linger.ms 时间或消息达到一定数量时,Producer 会自动发送消息。除非 linger.ms 设置得非常大,并且需要立即发送消息,否则无需手动调用 flush()。
基于 Future 的同步等待
如果需要确保消息成功发送,更好的做法是利用 kafkaTemplate.send() 方法返回的 Future 对象。可以通过调用 Future.get() 方法来同步等待消息发送完成,并获取 SendResult 对象。
以下是使用 Future 的示例代码:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
public class SenderService {
private final KafkaTemplate kafkaTemplate;
public SenderService(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic
, String message) {
ListenableFuture> future = kafkaTemplate.send(topic, message);
try {
SendResult result = future.get();
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
} catch (InterruptedException | ExecutionException e) {
System.out.println("Unable to send message=[" + message + "] due to : " + e.getMessage());
}
}
} 在这个示例中,future.get() 方法会阻塞当前线程,直到消息发送完成或发生异常。通过捕获 InterruptedException 和 ExecutionException 异常,可以处理发送失败的情况。
性能考量与建议
总结
选择共享的 KafkaTemplate