高并发下生产端confirm消息丢失

来源:2-12 Rabbitmq高级特性-生产端特性讲解_确认机制和返回机制

这是你写的代码吗

2021-05-22 10:57:14

confirm消息丢失情况如生产端代码所示。在这种情况下,为了保证confirm机制的可靠性,是不是要做生产端消息投递限流,同时用消息补充机制做兜底?

生产端相关代码:

final AtomicInteger integer = new AtomicInteger(0);
// 开启确认监听
channel.confirmSelect();
// 注册异步监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("生产端确认" + integer.incrementAndGet());
//System.err.println("------- error ---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("生产端确认" + integer.incrementAndGet());
//System.err.println("------- ok ---------");
}
});

int loopN = 100;
for (int i = 0; i < loopN; i++) {
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
// sleep后能收到100条确认消息;
// 如果不作sleep处理,不管loopN值为何值,只能收到大概1/3的确认消息,
//TimeUnit.MILLISECONDS.sleep(10);
}

TimeUnit.SECONDS.sleep(3);

消费端相关代码:

final AtomicInteger integer = new AtomicInteger(0);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String msg = new String(body);
System.out.println("消费端收到消息:" + integer.incrementAndGet());
//System.out.println("收到消息:" + msg);
//channel.basicAck(envelope.getDeliveryTag(), false);
}
});

相关截图:

http://img.mukewang.com/climg/60a870ba096041d800000000.jpg

写回答

1回答

阿神

2021-05-26

不需要做限流,如果出现confirm失败 就需要做一个兜底了,因为网络抖动可能导致回来的ack没有收到

0

Java架构师-技术专家

千万级电商项目从0到100全过程,覆盖Java程序员不同成长阶段的核心问题与解决方案

2672 学习 · 5839 问题

查看课程