高并发下生产端confirm消息丢失
来源:2-12 Rabbitmq高级特性-生产端特性讲解_确认机制和返回机制
这是你写的代码吗
2021-05-22 10:57:14
confirm消息丢失情况如生产端代码所示。在这种情况下,为了保证confirm机制的可靠性,是不是要做生产端消息投递限流,同时用消息补充机制做兜底?
生产端相关代码:
final AtomicInteger integer = new AtomicInteger(0);
// 开启确认监听
channel.confirmSelect();
// 注册异步监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("生产端确认" + integer.incrementAndGet());
//System.err.println("------- error ---------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("生产端确认" + integer.incrementAndGet());
//System.err.println("------- ok ---------");
}
});
int loopN = 100;
for (int i = 0; i < loopN; i++) {
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
// sleep后能收到100条确认消息;
// 如果不作sleep处理,不管loopN值为何值,只能收到大概1/3的确认消息,
//TimeUnit.MILLISECONDS.sleep(10);
}
TimeUnit.SECONDS.sleep(3);
消费端相关代码:
final AtomicInteger integer = new AtomicInteger(0);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String msg = new String(body);
System.out.println("消费端收到消息:" + integer.incrementAndGet());
//System.out.println("收到消息:" + msg);
//channel.basicAck(envelope.getDeliveryTag(), false);
}
});
相关截图:
1回答
阿神
2021-05-26
不需要做限流,如果出现confirm失败 就需要做一个兜底了,因为网络抖动可能导致回来的ack没有收到
相似问题