文章目录
- *发布者确认*
- *Publisher confirms(using the java client)*
- *概览*
- *在信道上启用发布者确认*
- *策略一:单独发布消息*
- *发布者确认不是异步的吗?*
- *策略二:批量发布消息*
- *策略三:异步处理发布者确认*
- *如何跟踪未完成的确认*
- *重新发布无应答的消息?*
- *总结*
- *整合代码*
发布者确认
Publisher confirms(using the java client)
发布者确认是rabbitMQ实现可靠发布的扩展。当在信道上启用发布者确认时,客户端发布的消息将由代理来异步确认,这意味着它们将由服务器端处理。
概览
在本章中,我们将使用发布者确认来确保发布的消息已经安全到达代理。并且将介绍几种使用发布者确认的策略,确认并解释它们的优缺点。
在信道上启用发布者确认
发布者确认是AMQP 0.9.1协议的rabbitMQ扩展,所以默认情况下不启用。可以在信道上通过方法
confirmSelect
开启:
Channel channel = connection.createChannel();
channel.confirmSelect();
此方法必须在希望使用发布者确认的每个信道上调用,并且确认应该只启用一次,而不是对每个发布的消息都启用。
策略一:单独发布消息
让我们从使用确认进行发布的最简单方法开始,即发布消息并同步等待其确认:
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.waitForConfirmsOrDie(5_000);
}
在上面的例子中,我们像往常一样发布消息,并使用
Channel#waitForConfirmsOrDie(long)
方法等待消息确认。消息一经确认,该方法即返回。如果消息在超时时间内未被确认,或者代理由于某种原因无法处理它(nack-ed),则该方法将抛出异常。异常的处理通常包括记录错误消息和(或)重试发送消息。
需要注意的是,不同的客户端库有不同的方法来同步处理发布者确认,所以一定要仔细阅读正在使用的客户端文档。
这种技术非常简单,但也有一个主要缺点:它降低了发布的速度,因为消息的确认会阻塞所有后续消息的发布。这种方法不会提供每秒超过数百条已发布消息的吞吐量。然而,对于某些应用来说,这已经足够好了。
发布者确认不是异步的吗?
我们在开始时提到,代理以异步方式确认已发布的消息,但在第一个示例中,代码会同步等待,直到消息被确认为止。不过,客户端实际上是异步接收确认,并相应地解除对
waitForConfirmsOrDie
调用的阻塞。可以将waitForConfirmsOrDie
看作是一个同步助手,它在底层依赖于异步通知。
策略二:批量发布消息
为了改进前面的示例,我们可以发布一批消息并等待整个批消息被确认。下面的示例使用100个批处理:
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}
}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);
}
与等待单个消息的确认相比,等待批消息的确认可以大大提高吞吐量(远程的rabbitMQ节点最多20-30次)。一个缺点是,在出现故障时,不能确切地知道出错的地方,因此可能必须在内存中保存整个批处理,以记录有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻塞了消息的发布。
策略三:异步处理发布者确认
代理异步地确认已发布的消息,只需在客户端注册一个回调,就可以得到这些确认消息的通知:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {// code when message is confirmed
}, (deliveryTag, multiple) -> {// code when message is nack-ed
});
有两个回调:一个用于已确认的消息,一个用于可能被代理认为丢失的消息(nack-ed)。每个回调都有两个参数:
deliveryTag(long)
:标识已确认或无应答(nack-ed)消息的序列号。我们将很快看到如何将其与发布的消息关联起来。multiple(boolean)
:这是一个布尔值。如果为false
,则只确认一条消息是否被确认或无应答;如果为true
,则确认所有序列号较低的或相等的消息。序列号可以在发布之前通过调用
Channel#getNextPublishSeqNo()
方法获得:
int deliveryTag = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
将消息与序列号关联的一种简单方法是使用映射。假设希望发布字符串,因为它们很容易转换为用于发布的字节数组。下面是一个代码示例,它使用映射将发布序列号与消息的字符串主体关联起来:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
发布代码现在使用映射跟踪出去的消息。当确认到达时,我们需要清理该映射并且做一些额外的事情,例如,当消息无应答时记录一个警告信息。
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);confirmed.clear();} else {outstandingConfirms.remove(deliveryTag);}
};channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {String body = outstandingConfirms.get(deliveryTag);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, deliveryTag, multiple);cleanOutstandingConfirms.handle(deliveryTag, multiple);
});
// ... publishing code
上面的示例包含一个回调,在确认到达时将清理映射。注意这个回调函数同时处理单个和多个确认。这个回调在确认到达时被调用(作为
Channel#addConfirmListener
的第一个参数)。而无应答消息的回调函数将检索消息体并发出警告。然后,它重新使用之前的回调来清理未执行确认的映射(无论消息是被确认还是无应答,它们在映射中的对应条目都必须被删除)。
如何跟踪未完成的确认
我们的示例使用
ConcurrentNavigableMap
来跟踪未完成的确认。这种数据结构很方便,有几个原因。它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地清除给定序列id之前的条目(以处理多个确认/无应答)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发哈希映射和一个变量来跟踪发布序列的下界,但这些方法通常更复杂。
总之,异步处理发布者确认通常需要以下步骤:
- 提供一种将发布序列号与消息关联起来的方法。
- 在信道上注册确认监听器,以便在发布者确认/无应答到达时收到通知,以执行适当的操作,例如记录或重新发布无应答的消息。在此步骤中,序列号与消息关联的机制也可能需要一些适当的清理。
- 在发布消息之前跟踪发布序列号。
重新发布无应答的消息?
从相应的回调重新发布无应答的消息很有吸引力,但这应该避免,因为确认回调是在信道不应该执行操作的I/O线程中调度的。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像
ConcurrentLinkedQueue
这样的类是在确认回调和发布线程之间传输消息的很好的候选者。
总结
在某些应用程序中,确保已发布的消息被发送到代理是至关重要的。发布者确认是rabbitMQ的一个特性,可以帮助满足这一需求。发布者确认在本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,这通常归结到应用程序和整个系统的约束。典型技术包括:
- 单独发布消息,同步等待确认:很简单,但吞吐量非常有限。
- 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但很难判断什么时候出错。
- 异步处理:最佳的性能和资源的使用,在出现错误时良好的控制,但可能需要正确地实现。
整合代码
public class PublisherConfirms {static final int MESSAGE_COUNT = 50_000;static Connection createConnection() throws Exception {ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.1.254");cf.setUsername("admin");cf.setPassword("admin123");return cf.newConnection();}public static void main(String[] args) throws Exception {publishMessagesIndividually();publishMessagesInBatch();handlePublishConfirmsAsynchronously();}static void publishMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());ch.waitForConfirmsOrDie(5_000);}long end = System.nanoTime();System.out.format("Published %,d messages individually in %,d ms%n",MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static void publishMessagesInBatch() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}long end = System.nanoTime();System.out.format("Published %,d messages in batch in %,d ms%n",MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static void handlePublishConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);confirmed.clear();} else {outstandingConfirms.remove(sequenceNumber);}};ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, sequenceNumber, multiple);cleanOutstandingConfirms.handle(sequenceNumber, multiple);});long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);outstandingConfirms.put(ch.getNextPublishSeqNo(), body);ch.basicPublish("", queue, null, body.getBytes());}if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {throw new IllegalStateException("All messages could not be confirmed in 60 seconds");}long end = System.nanoTime();System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {int waited = 0;while (!condition.getAsBoolean() && waited < timeout.toMillis()) {Thread.sleep(100L);waited = +100;}return condition.getAsBoolean();}
}
如果客户端和服务器位于同一台机器上,那么计算机上的输出应该类似。单独发布消息的性能不如预期,但与批量发布相比,异步处理的结果有点令人失望。
发布者确认非常依赖于网络,所以我们最好尝试使用远程节点,这更现实,因为在生产中客户端和服务器通常不在同一台机器上。Java可以很容易地更改为使用非本地节点(修改ConnectionFactory
中的相应信息即可)。