Skip to content

Commit 54cd4f0

Browse files
committed
Add test for ack/nack/reject metrics with offset
References #339
1 parent d4b4e95 commit 54cd4f0

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,9 @@ private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag,
231231
}
232232
}
233233
} else {
234-
channelState.unackedMessageDeliveryTags.remove(deliveryTag);
235-
action.run();
234+
if (channelState.unackedMessageDeliveryTags.remove(deliveryTag)) {
235+
action.run();
236+
}
236237
}
237238
} finally {
238239
channelState.lock.unlock();

src/test/java/com/rabbitmq/client/test/functional/Metrics.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Random;
3333
import java.util.concurrent.*;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435

3536
import static org.awaitility.Awaitility.*;
3637
import static org.hamcrest.Matchers.*;
@@ -426,6 +427,57 @@ private void errorInChannel(ConnectionFactory connectionFactory) throws IOExcept
426427

427428
}
428429

430+
@Test public void checkAcksWithAutomaticRecovery() throws Exception {
431+
ConnectionFactory connectionFactory = createConnectionFactory();
432+
connectionFactory.setNetworkRecoveryInterval(2000);
433+
connectionFactory.setAutomaticRecoveryEnabled(true);
434+
StandardMetricsCollector metrics = new StandardMetricsCollector();
435+
connectionFactory.setMetricsCollector(metrics);
436+
437+
Connection connection = null;
438+
try {
439+
connection = connectionFactory.newConnection();
440+
441+
final Channel channel1 = connection.createChannel();
442+
final AtomicInteger ackedMessages = new AtomicInteger(0);
443+
444+
channel1.basicConsume(QUEUE, false, new DefaultConsumer(channel1) {
445+
@Override
446+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
447+
channel1.basicAck(envelope.getDeliveryTag(), false);
448+
ackedMessages.incrementAndGet();
449+
}
450+
});
451+
452+
Channel channel2 = connection.createChannel();
453+
channel2.confirmSelect();
454+
int nbMessages = 10;
455+
for (int i = 0; i < nbMessages; i++) {
456+
sendMessage(channel2);
457+
}
458+
channel2.waitForConfirms(1000);
459+
460+
closeAndWaitForRecovery((AutorecoveringConnection) connection);
461+
462+
for (int i = 0; i < nbMessages; i++) {
463+
sendMessage(channel2);
464+
}
465+
466+
waitAtMost(timeout()).until(new Callable<Integer>() {
467+
@Override
468+
public Integer call() {
469+
return ackedMessages.get();
470+
}
471+
}, equalTo(nbMessages * 2));
472+
473+
assertThat(metrics.getConsumedMessages().getCount(), is((long) (nbMessages * 2)));
474+
assertThat(metrics.getAcknowledgedMessages().getCount(), is((long) (nbMessages * 2)));
475+
476+
} finally {
477+
safeClose(connection);
478+
}
479+
}
480+
429481
private ConnectionFactory createConnectionFactory() {
430482
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
431483
return connectionFactory;

0 commit comments

Comments
 (0)