• 注册
当前位置:1313e > 默认分类 >正文

7.发布者确认

文章目录

      • *发布者确认*
        • *Publisher confirms(using the java client)*
        • *概览*
        • *在信道上启用发布者确认*
        • *策略一:单独发布消息*
          • *发布者确认不是异步的吗?*
        • *策略二:批量发布消息*
        • *策略三:异步处理发布者确认*
          • *如何跟踪未完成的确认*
          • *重新发布无应答的消息?*
        • *总结*
        • *整合代码*

发布者确认

Publisher confirms(using the java client)

发布者确认是rabbitMQ实现可靠发布的扩展。当在信道上启用发布者确认时,客户端发布的消息将由代理来异步确认,这意味着它们将由服务器端处理。

概览

在本章中,我们将使用发布者确认来确保发布的消息已经安全到达代理。并且将介绍几种使用发布者确认的策略,确认并解释它们的优缺点。

在信道上启用发布者确认

发布者确认是AMQP 0.9.1协议的rabbitMQ扩展,所以默认情况下不启用。可以在信道上通过方法confirmSelect开启:

Channel channel = connection.createChannel();
channel.confirmSelect();

此方法必须在希望使用发布者确认的每个信道上调用,并且确认应该只启用一次,而不是对每个发布的消息都启用。

策略一:单独发布消息

让我们从使用确认进行发布的最简单方法开始,即发布消息并同步等待其确认:

while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.waitForConfirmsOrDie(5_000);
}

在上面的例子中,我们像往常一样发布消息,并使用Channel#waitForConfirmsOrDie(long)方法等待消息确认。消息一经确认,该方法即返回。如果消息在超时时间内未被确认,或者代理由于某种原因无法处理它(nack-ed),则该方法将抛出异常。异常的处理通常包括记录错误消息和(或)重试发送消息。
需要注意的是,不同的客户端库有不同的方法来同步处理发布者确认,所以一定要仔细阅读正在使用的客户端文档。
这种技术非常简单,但也有一个主要缺点:它降低了发布的速度,因为消息的确认会阻塞所有后续消息的发布。这种方法不会提供每秒超过数百条已发布消息的吞吐量。然而,对于某些应用来说,这已经足够好了。

发布者确认不是异步的吗?

我们在开始时提到,代理以异步方式确认已发布的消息,但在第一个示例中,代码会同步等待,直到消息被确认为止。不过,客户端实际上是异步接收确认,并相应地解除对waitForConfirmsOrDie调用的阻塞。可以将waitForConfirmsOrDie看作是一个同步助手,它在底层依赖于异步通知。

策略二:批量发布消息

为了改进前面的示例,我们可以发布一批消息并等待整个批消息被确认。下面的示例使用100个批处理:

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}
}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);
}

与等待单个消息的确认相比,等待批消息的确认可以大大提高吞吐量(远程的rabbitMQ节点最多20-30次)。一个缺点是,在出现故障时,不能确切地知道出错的地方,因此可能必须在内存中保存整个批处理,以记录有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻塞了消息的发布。

策略三:异步处理发布者确认

代理异步地确认已发布的消息,只需在客户端注册一个回调,就可以得到这些确认消息的通知:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {// code when message is confirmed
}, (deliveryTag, multiple) -> {// code when message is nack-ed
});

有两个回调:一个用于已确认的消息,一个用于可能被代理认为丢失的消息(nack-ed)。每个回调都有两个参数:

  • deliveryTag(long):标识已确认或无应答(nack-ed)消息的序列号。我们将很快看到如何将其与发布的消息关联起来。
  • multiple(boolean):这是一个布尔值。如果为false,则只确认一条消息是否被确认或无应答;如果为true,则确认所有序列号较低的或相等的消息。

序列号可以在发布之前通过调用Channel#getNextPublishSeqNo()方法获得:

int deliveryTag = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

将消息与序列号关联的一种简单方法是使用映射。假设希望发布字符串,因为它们很容易转换为用于发布的字节数组。下面是一个代码示例,它使用映射将发布序列号与消息的字符串主体关联起来:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

发布代码现在使用映射跟踪出去的消息。当确认到达时,我们需要清理该映射并且做一些额外的事情,例如,当消息无应答时记录一个警告信息。

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);confirmed.clear();} else {outstandingConfirms.remove(deliveryTag);}
};channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {String body = outstandingConfirms.get(deliveryTag);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, deliveryTag, multiple);cleanOutstandingConfirms.handle(deliveryTag, multiple);
});
// ... publishing code

上面的示例包含一个回调,在确认到达时将清理映射。注意这个回调函数同时处理单个和多个确认。这个回调在确认到达时被调用(作为Channel#addConfirmListener的第一个参数)。而无应答消息的回调函数将检索消息体并发出警告。然后,它重新使用之前的回调来清理未执行确认的映射(无论消息是被确认还是无应答,它们在映射中的对应条目都必须被删除)。

如何跟踪未完成的确认

我们的示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构很方便,有几个原因。它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地清除给定序列id之前的条目(以处理多个确认/无应答)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发哈希映射和一个变量来跟踪发布序列的下界,但这些方法通常更复杂。

总之,异步处理发布者确认通常需要以下步骤:

  • 提供一种将发布序列号与消息关联起来的方法。
  • 在信道上注册确认监听器,以便在发布者确认/无应答到达时收到通知,以执行适当的操作,例如记录或重新发布无应答的消息。在此步骤中,序列号与消息关联的机制也可能需要一些适当的清理。
  • 在发布消息之前跟踪发布序列号。
重新发布无应答的消息?

从相应的回调重新发布无应答的消息很有吸引力,但这应该避免,因为确认回调是在信道不应该执行操作的I/O线程中调度的。更好的解决方案是将消息排队到内存队列中,该队列由发布线程轮询。像ConcurrentLinkedQueue这样的类是在确认回调和发布线程之间传输消息的很好的候选者。

总结

在某些应用程序中,确保已发布的消息被发送到代理是至关重要的。发布者确认是rabbitMQ的一个特性,可以帮助满足这一需求。发布者确认在本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,这通常归结到应用程序和整个系统的约束。典型技术包括:

  • 单独发布消息,同步等待确认:很简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但很难判断什么时候出错。
  • 异步处理:最佳的性能和资源的使用,在出现错误时良好的控制,但可能需要正确地实现。

整合代码

public class PublisherConfirms {static final int MESSAGE_COUNT = 50_000;static Connection createConnection() throws Exception {ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.1.254");cf.setUsername("admin");cf.setPassword("admin123");return cf.newConnection();}public static void main(String[] args) throws Exception {publishMessagesIndividually();publishMessagesInBatch();handlePublishConfirmsAsynchronously();}static void publishMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());ch.waitForConfirmsOrDie(5_000);}long end = System.nanoTime();System.out.format("Published %,d messages individually in %,d ms%n",MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static void publishMessagesInBatch() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}long end = System.nanoTime();System.out.format("Published %,d messages in batch in %,d ms%n",MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static void handlePublishConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();String queue = UUID.randomUUID().toString();ch.queueDeclare(queue, false, false, true, null);ch.confirmSelect();ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);confirmed.clear();} else {outstandingConfirms.remove(sequenceNumber);}};ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, sequenceNumber, multiple);cleanOutstandingConfirms.handle(sequenceNumber, multiple);});long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);outstandingConfirms.put(ch.getNextPublishSeqNo(), body);ch.basicPublish("", queue, null, body.getBytes());}if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {throw new IllegalStateException("All messages could not be confirmed in 60 seconds");}long end = System.nanoTime();System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());}}static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {int waited = 0;while (!condition.getAsBoolean() && waited < timeout.toMillis()) {Thread.sleep(100L);waited = +100;}return condition.getAsBoolean();}
}

如果客户端和服务器位于同一台机器上,那么计算机上的输出应该类似。单独发布消息的性能不如预期,但与批量发布相比,异步处理的结果有点令人失望。
发布者确认非常依赖于网络,所以我们最好尝试使用远程节点,这更现实,因为在生产中客户端和服务器通常不在同一台机器上。Java可以很容易地更改为使用非本地节点(修改ConnectionFactory中的相应信息即可)。

本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 162202241@qq.com 举报,一经查实,本站将立刻删除。

最新评论

欢迎您发表评论:

请登录之后再进行评论

登录