生产者发送消息到rabbitmq 没有收到confirm
来源:3-24 可靠性消息最终演示
qq_ToutAn諾尘_0
2020-08-16 14:31:54
这有可能是什么原因呢。我看了一下rabbitmq 控制台。 rabbitmq 已经收到了消息,但是我的RabbitTemplateContainer 中的confirm 缺没有回调 。
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback{
private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
private Splitter splitter = Splitter.on("#");
private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private MessageStoreService messageStoreService;
public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
Preconditions.checkNotNull(message);
String topic = message.getTopic();
RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
if(rabbitTemplate != null) {
return rabbitTemplate;
}
log.info("#RabbitTemplateContainer.getTemplate# topic: {} is not exists, create one", topic);
RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
newTemplate.setExchange(topic);
newTemplate.setRoutingKey(message.getRoutingKey());
newTemplate.setRetryTemplate(new RetryTemplate());
// 添加序列化反序列化和converter对象
Serializer serializer = serializerFactory.create();
GenericMessageConverter gmc = new GenericMessageConverter(serializer);
RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
newTemplate.setMessageConverter(rmc);
String messageType = message.getMessageType();
if(!MessageType.RAPID.equals(messageType)) {
newTemplate.setConfirmCallback(this);
}
rabbitMap.putIfAbsent(topic, newTemplate);
return rabbitMap.get(topic);
}
/**
* 无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 具体的消息应答
List<String> strings = splitter.splitToList(correlationData.getId());
String messageId = strings.get(0);
long sendTime = Long.parseLong(strings.get(1));
String messageType = strings.get(2);
if(ack) {
// 如果当前消息类型为reliant 我们就去数据库查找并进行更新
if(MessageType.RELIANT.endsWith(messageType)) {
this.messageStoreService.succuess(messageId);
}
log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
} else {
// 队列满了,磁盘满,或者broker
if(MessageType.RELIANT.endsWith(messageType)) {
this.messageStoreService.failure(messageId,cause);
}
log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
}
}
}已经看了下日志 。send message XXX 成功或者失败的分支完全没有走。有可能是什么原因呢
1回答
qq_ToutAn諾尘_0
提问者
2020-08-16
没开启 confirm 配置 。解决了。
相似问题