Skip to content

Commit b248a7a

Browse files
authored
Merge pull request #360 from slayful/rabbitmq-java-354-ack-nack-unrouted
#354 | Metrics for ack, nack & unrouted publishes
2 parents 152f4b7 + b3d2733 commit b248a7a

File tree

8 files changed

+289
-2
lines changed

8 files changed

+289
-2
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ public interface MetricsCollector {
3838

3939
void basicPublishFailure(Channel channel, Throwable cause);
4040

41+
void basicPublishAck(Channel channel, long deliveryTag, boolean multiple);
42+
43+
void basicPublishNack(Channel channel, long deliveryTag, boolean multiple);
44+
45+
void basicPublishUnrouted(Channel channel);
46+
4147
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
4248

4349
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,21 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
7575

7676
}
7777

78+
@Override
79+
public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
80+
81+
}
82+
83+
@Override
84+
public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
85+
86+
}
87+
88+
@Override
89+
public void basicPublishUnrouted(Channel channel) {
90+
91+
}
92+
7893
@Override
7994
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
8095

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,41 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
111111
}
112112
}
113113

114+
@Override
115+
public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
116+
if (multiple) {
117+
// this is a naive approach, as it does not handle multiple nacks
118+
return;
119+
}
120+
try {
121+
markMessagePublishAcknowledged();
122+
} catch(Exception e) {
123+
LOGGER.info("Error while computing metrics in basicPublishAck: " + e.getMessage());
124+
}
125+
}
126+
127+
@Override
128+
public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
129+
if (multiple) {
130+
// this is a naive approach, as it does not handle multiple nacks
131+
return;
132+
}
133+
try {
134+
markMessagePublishNotAcknowledged();
135+
} catch(Exception e) {
136+
LOGGER.info("Error while computing metrics in basicPublishNack: " + e.getMessage());
137+
}
138+
}
139+
140+
@Override
141+
public void basicPublishUnrouted(Channel channel) {
142+
try {
143+
markPublishedMessageNotRouted();
144+
} catch(Exception e) {
145+
LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage());
146+
}
147+
}
148+
114149
@Override
115150
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
116151
try {
@@ -360,6 +395,17 @@ private ChannelState(Channel channel) {
360395
*/
361396
protected abstract void markRejectedMessage();
362397

398+
/**
399+
* Marks the event of a message publishing acknowledgement.
400+
*/
401+
protected abstract void markMessagePublishAcknowledged();
363402

364-
403+
/**
404+
* Marks the event of a message publishing not being acknowledged.
405+
*/
406+
protected abstract void markMessagePublishNotAcknowledged();
407+
/**
408+
* Marks the event of a published message not being routed.
409+
*/
410+
protected abstract void markPublishedMessageNotRouted();
365411
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
481481
}
482482
} catch (Throwable ex) {
483483
getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
484+
} finally {
485+
metricsCollector.basicPublishUnrouted(this);
484486
}
485487
}
486488

@@ -491,6 +493,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B
491493
}
492494
} catch (Throwable ex) {
493495
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
496+
} finally {
497+
metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
494498
}
495499
}
496500

@@ -501,6 +505,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B
501505
}
502506
} catch (Throwable ex) {
503507
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
508+
} finally {
509+
metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
504510
}
505511
}
506512

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
5151

5252
private final Counter failedToPublishMessages;
5353

54+
private final Counter ackedPublishedMessages;
55+
56+
private final Counter nackedPublishedMessages;
57+
58+
private final Counter unroutedPublishedMessages;
59+
5460
private final Counter consumedMessages;
5561

5662
private final Counter acknowledgedMessages;
@@ -81,6 +87,9 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
8187
this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES);
8288
this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES);
8389
this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES);
90+
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
91+
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
92+
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
8493
}
8594

8695
@Override
@@ -128,6 +137,21 @@ protected void markRejectedMessage() {
128137
rejectedMessages.increment();
129138
}
130139

140+
@Override
141+
protected void markMessagePublishAcknowledged() {
142+
ackedPublishedMessages.increment();
143+
}
144+
145+
@Override
146+
protected void markMessagePublishNotAcknowledged() {
147+
nackedPublishedMessages.increment();
148+
}
149+
150+
@Override
151+
protected void markPublishedMessageNotRouted() {
152+
unroutedPublishedMessages.increment();
153+
}
154+
131155
public AtomicLong getConnections() {
132156
return connections;
133157
}
@@ -144,6 +168,18 @@ public Counter getFailedToPublishMessages() {
144168
return failedToPublishMessages;
145169
}
146170

171+
public Counter getAckedPublishedMessages() {
172+
return ackedPublishedMessages;
173+
}
174+
175+
public Counter getNackedPublishedMessages() {
176+
return nackedPublishedMessages;
177+
}
178+
179+
public Counter getUnroutedPublishedMessages() {
180+
return unroutedPublishedMessages;
181+
}
182+
147183
public Counter getConsumedMessages() {
148184
return consumedMessages;
149185
}
@@ -198,6 +234,24 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
198234
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
199235
return registry.counter(prefix + ".failed_to_publish", tags);
200236
}
237+
},
238+
ACKED_PUBLISHED_MESSAGES {
239+
@Override
240+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
241+
return registry.counter(prefix + ".acknowledged_published", tags);
242+
}
243+
},
244+
NACKED_PUBLISHED_MESSAGES {
245+
@Override
246+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
247+
return registry.counter(prefix + ".not_acknowledged_published", tags);
248+
}
249+
},
250+
UNROUTED_PUBLISHED_MESSAGES {
251+
@Override
252+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
253+
return registry.counter(prefix + ".unrouted_published", tags);
254+
}
201255
};
202256

203257
/**

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
4444
private final Meter failedToPublishMessages;
45+
private final Meter publishAcknowledgedMessages;
46+
private final Meter publishNacknowledgedMessages;
47+
private final Meter publishUnroutedMessages;
4548

4649

4750
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
@@ -50,6 +53,9 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5053
this.channels = registry.counter(metricsPrefix+".channels");
5154
this.publishedMessages = registry.meter(metricsPrefix+".published");
5255
this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish");
56+
this.publishAcknowledgedMessages = registry.meter(metricsPrefix+".publish_ack");
57+
this.publishNacknowledgedMessages = registry.meter(metricsPrefix+".publish_nack");
58+
this.publishUnroutedMessages = registry.meter(metricsPrefix+".publish_unrouted");
5359
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
5460
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
5561
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
@@ -108,8 +114,21 @@ protected void markRejectedMessage() {
108114
rejectedMessages.mark();
109115
}
110116

117+
@Override
118+
protected void markMessagePublishAcknowledged() {
119+
publishAcknowledgedMessages.mark();
120+
}
121+
122+
@Override
123+
protected void markMessagePublishNotAcknowledged() {
124+
publishNacknowledgedMessages.mark();
125+
}
126+
127+
@Override
128+
protected void markPublishedMessageNotRouted() {
129+
publishUnroutedMessages.mark();
130+
}
111131

112-
113132
public MetricRegistry getMetricRegistry() {
114133
return registry;
115134
}
@@ -141,4 +160,17 @@ public Meter getRejectedMessages() {
141160
public Meter getFailedToPublishMessages() {
142161
return failedToPublishMessages;
143162
}
163+
164+
public Meter getPublishAcknowledgedMessages() {
165+
return publishAcknowledgedMessages;
166+
}
167+
168+
public Meter getPublishNotAcknowledgedMessages() {
169+
return publishNacknowledgedMessages;
170+
}
171+
172+
public Meter getPublishUnroutedMessages() {
173+
return publishUnroutedMessages;
174+
}
175+
144176
}

src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,62 @@ public void basicGetAndAck() {
149149
assertThat(publishedMessages(metrics), is(2L));
150150
}
151151

152+
@Test public void publishingAcknowledgements() {
153+
long anyDeliveryTag = 123L;
154+
AbstractMetricsCollector metrics = factory.create();
155+
Channel channel = mock(Channel.class);
156+
// begins with no messages acknowledged
157+
assertThat(publishAck(metrics), is(0L));
158+
// first acknowledgement gets tracked
159+
metrics.basicPublishAck(channel, anyDeliveryTag, false);
160+
assertThat(publishAck(metrics), is(1L));
161+
// second acknowledgement gets tracked
162+
metrics.basicPublishAck(channel, anyDeliveryTag, false);
163+
assertThat(publishAck(metrics), is(2L));
164+
// multiple deliveries aren't tracked
165+
metrics.basicPublishAck(channel, anyDeliveryTag, true);
166+
assertThat(publishAck(metrics), is(2L));
167+
// cleaning stale state doesn't affect the metric
168+
metrics.cleanStaleState();
169+
assertThat(publishAck(metrics), is(2L));
170+
}
171+
172+
@Test public void publishingNotAcknowledgements() {
173+
long anyDeliveryTag = 123L;
174+
AbstractMetricsCollector metrics = factory.create();
175+
Channel channel = mock(Channel.class);
176+
// begins with no messages not-acknowledged
177+
assertThat(publishNack(metrics), is(0L));
178+
// first not-acknowledgement gets tracked
179+
metrics.basicPublishNack(channel, anyDeliveryTag, false);
180+
assertThat(publishNack(metrics), is(1L));
181+
// second not-acknowledgement gets tracked
182+
metrics.basicPublishNack(channel, anyDeliveryTag, false);
183+
assertThat(publishNack(metrics), is(2L));
184+
// multiple deliveries aren't tracked
185+
metrics.basicPublishNack(channel, anyDeliveryTag, true);
186+
assertThat(publishNack(metrics), is(2L));
187+
// cleaning stale state doesn't affect the metric
188+
metrics.cleanStaleState();
189+
assertThat(publishNack(metrics), is(2L));
190+
}
191+
192+
@Test public void publishingUnrouted() {
193+
AbstractMetricsCollector metrics = factory.create();
194+
Channel channel = mock(Channel.class);
195+
// begins with no messages not-acknowledged
196+
assertThat(publishUnrouted(metrics), is(0L));
197+
// first unrouted gets tracked
198+
metrics.basicPublishUnrouted(channel);
199+
assertThat(publishUnrouted(metrics), is(1L));
200+
// second unrouted gets tracked
201+
metrics.basicPublishUnrouted(channel);
202+
assertThat(publishUnrouted(metrics), is(2L));
203+
// cleaning stale state doesn't affect the metric
204+
metrics.cleanStaleState();
205+
assertThat(publishUnrouted(metrics), is(2L));
206+
}
207+
152208
@Test public void cleanStaleState() {
153209
AbstractMetricsCollector metrics = factory.create();
154210
Connection openConnection = mock(Connection.class);
@@ -189,6 +245,31 @@ public void basicGetAndAck() {
189245
assertThat(channels(metrics), is(1L));
190246
}
191247

248+
249+
long publishAck(MetricsCollector metrics) {
250+
if (metrics instanceof StandardMetricsCollector) {
251+
return ((StandardMetricsCollector) metrics).getPublishAcknowledgedMessages().getCount();
252+
} else {
253+
return (long) ((MicrometerMetricsCollector) metrics).getAckedPublishedMessages().count();
254+
}
255+
}
256+
257+
long publishNack(MetricsCollector metrics) {
258+
if (metrics instanceof StandardMetricsCollector) {
259+
return ((StandardMetricsCollector) metrics).getPublishNotAcknowledgedMessages().getCount();
260+
} else {
261+
return (long) ((MicrometerMetricsCollector) metrics).getNackedPublishedMessages().count();
262+
}
263+
}
264+
265+
long publishUnrouted(MetricsCollector metrics) {
266+
if (metrics instanceof StandardMetricsCollector) {
267+
return ((StandardMetricsCollector) metrics).getPublishUnroutedMessages().getCount();
268+
} else {
269+
return (long) ((MicrometerMetricsCollector) metrics).getUnroutedPublishedMessages().count();
270+
}
271+
}
272+
192273
long publishedMessages(MetricsCollector metrics) {
193274
if (metrics instanceof StandardMetricsCollector) {
194275
return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount();

0 commit comments

Comments
 (0)