生产者发送消息到rabbitmq 没有收到confirm

来源:3-24 可靠性消息最终演示

qq_ToutAn諾尘_0

2020-08-16 14:31:54

这有可能是什么原因呢。我看了一下rabbitmq 控制台。 rabbitmq 已经收到了消息,但是我的RabbitTemplateContainer 中的confirm 缺没有回调 。

@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback{

    private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();

    private Splitter splitter = Splitter.on("#");

    private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;

    @Autowired
    private ConnectionFactory connectionFactory;
    @Autowired
    private MessageStoreService messageStoreService;


    public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message);
        String topic = message.getTopic();
        RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
        if(rabbitTemplate != null) {
            return rabbitTemplate;
        }
        log.info("#RabbitTemplateContainer.getTemplate# topic: {} is not exists, create one", topic);

        RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
        newTemplate.setExchange(topic);
        newTemplate.setRoutingKey(message.getRoutingKey());
        newTemplate.setRetryTemplate(new RetryTemplate());

        // 添加序列化反序列化和converter对象
        Serializer serializer = serializerFactory.create();
        GenericMessageConverter gmc = new GenericMessageConverter(serializer);
        RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
        newTemplate.setMessageConverter(rmc);

        String messageType = message.getMessageType();
        if(!MessageType.RAPID.equals(messageType)) {
            newTemplate.setConfirmCallback(this);
        }

        rabbitMap.putIfAbsent(topic, newTemplate);

        return rabbitMap.get(topic);
    }

    /**
     *     无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //     具体的消息应答
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if(ack) {
            //     如果当前消息类型为reliant 我们就去数据库查找并进行更新
            if(MessageType.RELIANT.endsWith(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
        } else {
            // 队列满了,磁盘满,或者broker
            if(MessageType.RELIANT.endsWith(messageType)) {
                this.messageStoreService.failure(messageId,cause);
            }
            log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);

        }
    }
}

已经看了下日志 。send message XXX  成功或者失败的分支完全没有走。有可能是什么原因呢
 

写回答

1回答

qq_ToutAn諾尘_0

提问者

2020-08-16

没开启 confirm 配置 。解决了。

0

Java架构师-技术专家

千万级电商项目从0到100全过程,覆盖Java程序员不同成长阶段的核心问题与解决方案

2672 学习 · 5839 问题

查看课程