@KafkaListener(topics = {"${kafka.topic.topicB}"}, groupId = "groupB")
public void consumeTopicB(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
/*
* 执行消费逻辑处理的代码,
*
*/
acknowledgment.acknowledge();// 消费成功后手动提交offset
logger.info("消费者B消费topicB:{} partition:{}的消息 -> {}", consumerRecord.topic(), consumerRecord.partition(),message);
}
}
比如在上面的消费逻辑处理过程中,失败了。那么此条消费要怎么处理呢?我是设置手动提交offset的。
第一种方案: 如果失败了以后,把失败的数据存入到数据库中,然后在提交offset。然后后续在定时的从数据库中把失败的数据再次发送到对应的topic下,等待下次的消费。
但是这样的话有个问题,比如某条消息一直失败,不可能无限重复上面的操作吧? 所以我想的是在消息模型中添加一个失败重试次数属性:
public class KafkaMsg implements Serializable {
private static final long serialVersionUID = -1532915942422600087L;
private String msgId;
private String content;
private Integer retryTime; // 重试次数记录
public String getMsgId() {
return msgId;
}
public String getContent() {
return content;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public void setContent(String content) {
this.content = content;
}
public Integer getRetryTime() {
return retryTime;
}
public Integer setRetryTime(Integer time) {
this.retryTime = time;
}
@Override
public String toString() {
return "KafkaMsg{" +
"msgId='" + msgId + '\'' +
", content='" + content + '\'' +
'}';
}
}
然后消费失败后,先记录一下重试次数再把它存入数据库,然后定时再次发送到topic时,先判断它的重试次数是否达到上限,没有就再次写入topic等待再次被消费
其实不光是Kafka还有rabbitmq消费端消费失败后,重试也可以使用这样的方式处理。
第二种方案:
消费失败后把消息转发到另一个主题中,然后对于失败的消息你想怎么处理都可以,入库,写文件,程序处理都随你便,相当于 rabbitmq 的死信队列