Skip to content

Commit 3c6badf

Browse files
Merge pull request #314 from rabbitmq/rabbitmq-java-client-313
Add support for Micrometer
2 parents 7559d50 + b9ef759 commit 3c6badf

File tree

4 files changed

+287
-19
lines changed

4 files changed

+287
-19
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
<slf4j.version>1.7.21</slf4j.version>
5858
<metrics.version>3.1.2</metrics.version>
59+
<micrometer.version>1.0.0-rc.2</micrometer.version>
5960
<logback.version>1.1.7</logback.version>
6061
<commons-cli.version>1.1</commons-cli.version>
6162
<junit.version>4.12</junit.version>
@@ -633,6 +634,12 @@
633634
<version>${metrics.version}</version>
634635
<optional>true</optional>
635636
</dependency>
637+
<dependency>
638+
<groupId>io.micrometer</groupId>
639+
<artifactId>micrometer-core</artifactId>
640+
<version>${micrometer.version}</version>
641+
<optional>true</optional>
642+
</dependency>
636643
<dependency>
637644
<groupId>commons-cli</groupId>
638645
<artifactId>commons-cli</artifactId>
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.MetricsCollector;
6+
import io.micrometer.core.instrument.Counter;
7+
import io.micrometer.core.instrument.MeterRegistry;
8+
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES;
13+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS;
14+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS;
15+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES;
16+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES;
17+
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES;
18+
19+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
20+
//
21+
// This software, the RabbitMQ Java client library, is triple-licensed under the
22+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
23+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
24+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
25+
// please see LICENSE-APACHE2.
26+
//
27+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
28+
// either express or implied. See the LICENSE file for specific language governing
29+
// rights and limitations of this software.
30+
//
31+
// If you have any questions regarding licensing, please contact us at
32+
// info@rabbitmq.com.
33+
34+
/**
35+
* Micrometer implementation of {@link MetricsCollector}.
36+
* Note transactions are not supported (see {@link MetricsCollector}.
37+
* Micrometer provides out-of-the-box support for report backends like JMX,
38+
* Graphite, Ganglia, Atlas, Datadog, etc. See Micrometer documentation for
39+
* more details.
40+
*
41+
* Note Micrometer requires Java 8 or more, so does this {@link MetricsCollector}.
42+
*
43+
* @see MetricsCollector
44+
* @since 4.3.0
45+
*/
46+
public class MicrometerMetricsCollector extends AbstractMetricsCollector {
47+
48+
private final AtomicLong connections;
49+
50+
private final AtomicLong channels;
51+
52+
private final Counter publishedMessages;
53+
54+
private final Counter consumedMessages;
55+
56+
private final Counter acknowledgedMessages;
57+
58+
private final Counter rejectedMessages;
59+
60+
public MicrometerMetricsCollector(MeterRegistry registry) {
61+
this(registry, "rabbitmq");
62+
}
63+
64+
public MicrometerMetricsCollector(final MeterRegistry registry, final String prefix) {
65+
this(new MetricsCreator() {
66+
@Override
67+
public Object create(Metrics metric) {
68+
return metric.create(registry, prefix);
69+
}
70+
});
71+
}
72+
73+
public MicrometerMetricsCollector(MetricsCreator creator) {
74+
this.connections = (AtomicLong) creator.create(CONNECTIONS);
75+
this.channels = (AtomicLong) creator.create(CHANNELS);
76+
this.publishedMessages = (Counter) creator.create(PUBLISHED_MESSAGES);
77+
this.consumedMessages = (Counter) creator.create(CONSUMED_MESSAGES);
78+
this.acknowledgedMessages = (Counter) creator.create(ACKNOWLEDGED_MESSAGES);
79+
this.rejectedMessages = (Counter) creator.create(REJECTED_MESSAGES);
80+
}
81+
82+
@Override
83+
protected void incrementConnectionCount(Connection connection) {
84+
connections.incrementAndGet();
85+
}
86+
87+
@Override
88+
protected void decrementConnectionCount(Connection connection) {
89+
connections.decrementAndGet();
90+
}
91+
92+
@Override
93+
protected void incrementChannelCount(Channel channel) {
94+
channels.incrementAndGet();
95+
}
96+
97+
@Override
98+
protected void decrementChannelCount(Channel channel) {
99+
channels.decrementAndGet();
100+
}
101+
102+
@Override
103+
protected void markPublishedMessage() {
104+
publishedMessages.increment();
105+
}
106+
107+
@Override
108+
protected void markConsumedMessage() {
109+
consumedMessages.increment();
110+
}
111+
112+
@Override
113+
protected void markAcknowledgedMessage() {
114+
acknowledgedMessages.increment();
115+
}
116+
117+
@Override
118+
protected void markRejectedMessage() {
119+
rejectedMessages.increment();
120+
}
121+
122+
public AtomicLong getConnections() {
123+
return connections;
124+
}
125+
126+
public AtomicLong getChannels() {
127+
return channels;
128+
}
129+
130+
public Counter getPublishedMessages() {
131+
return publishedMessages;
132+
}
133+
134+
public Counter getConsumedMessages() {
135+
return consumedMessages;
136+
}
137+
138+
public Counter getAcknowledgedMessages() {
139+
return acknowledgedMessages;
140+
}
141+
142+
public Counter getRejectedMessages() {
143+
return rejectedMessages;
144+
}
145+
146+
public enum Metrics {
147+
CONNECTIONS {
148+
@Override
149+
Object create(MeterRegistry registry, String prefix) {
150+
return registry.gauge(prefix + ".connections", new AtomicLong(0));
151+
}
152+
},
153+
CHANNELS {
154+
@Override
155+
Object create(MeterRegistry registry, String prefix) {
156+
return registry.gauge(prefix + ".channels", new AtomicLong(0));
157+
}
158+
},
159+
PUBLISHED_MESSAGES {
160+
@Override
161+
Object create(MeterRegistry registry, String prefix) {
162+
return registry.counter(prefix + ".published");
163+
}
164+
},
165+
CONSUMED_MESSAGES {
166+
@Override
167+
Object create(MeterRegistry registry, String prefix) {
168+
return registry.counter(prefix + ".consumed");
169+
}
170+
},
171+
ACKNOWLEDGED_MESSAGES {
172+
@Override
173+
Object create(MeterRegistry registry, String prefix) {
174+
return registry.counter(prefix + ".acknowledged");
175+
}
176+
},
177+
REJECTED_MESSAGES {
178+
@Override
179+
Object create(MeterRegistry registry, String prefix) {
180+
return registry.counter(prefix + ".rejected");
181+
}
182+
};
183+
184+
abstract Object create(MeterRegistry registry, String prefix);
185+
}
186+
187+
public interface MetricsCreator {
188+
189+
Object create(Metrics metric);
190+
191+
}
192+
193+
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
JSONReadWriteTest.class,
4444
SharedThreadPoolTest.class,
4545
DnsRecordIpAddressResolverTests.class,
46-
StandardMetricsCollectorTest.class,
46+
MetricsCollectorTest.class,
4747
DnsSrvRecordAddressResolverTest.class,
4848
JavaNioTest.class,
4949
ConnectionFactoryTest.class,

src/test/java/com/rabbitmq/client/test/StandardMetricsCollectorTest.java renamed to src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717

1818
import com.rabbitmq.client.Channel;
1919
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.MetricsCollector;
21+
import com.rabbitmq.client.impl.AbstractMetricsCollector;
22+
import com.rabbitmq.client.impl.MicrometerMetricsCollector;
2023
import com.rabbitmq.client.impl.StandardMetricsCollector;
24+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
2125
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
import org.junit.runners.Parameterized;
2228

2329
import static org.hamcrest.Matchers.*;
2430
import static org.junit.Assert.*;
@@ -27,11 +33,23 @@
2733
/**
2834
*
2935
*/
30-
public class StandardMetricsCollectorTest {
36+
@RunWith(Parameterized.class)
37+
public class MetricsCollectorTest {
38+
39+
@Parameterized.Parameters
40+
public static Object[] data() {
41+
// need to resort to a factory, as this method is called only once
42+
// if creating the collector instance, it's reused across the test methods
43+
// and this doesn't work (it cannot be reset)
44+
return new Object[] { new StandardMetricsCollectorFactory(), new MicrometerMetricsCollectorFactory() };
45+
}
46+
47+
@Parameterized.Parameter
48+
public MetricsCollectorFactory factory;
3149

3250
@Test
3351
public void basicGetAndAck() {
34-
StandardMetricsCollector metrics = new StandardMetricsCollector();
52+
AbstractMetricsCollector metrics = factory.create();
3553
Connection connection = mock(Connection.class);
3654
when(connection.getId()).thenReturn("connection-1");
3755
Channel channel = mock(Channel.class);
@@ -49,20 +67,20 @@ public void basicGetAndAck() {
4967
metrics.consumedMessage(channel, 6, false);
5068

5169
metrics.basicAck(channel, 6, false);
52-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
70+
assertThat(acknowledgedMessages(metrics), is(1L));
5371

5472
metrics.basicAck(channel, 3, true);
55-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
73+
assertThat(acknowledgedMessages(metrics), is(1L+2L));
5674

5775
metrics.basicAck(channel, 6, true);
58-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
76+
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
5977

6078
metrics.basicAck(channel, 10, true);
61-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
79+
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
6280
}
6381

6482
@Test public void basicConsumeAndAck() {
65-
StandardMetricsCollector metrics = new StandardMetricsCollector();
83+
AbstractMetricsCollector metrics = factory.create();
6684
Connection connection = mock(Connection.class);
6785
when(connection.getId()).thenReturn("connection-1");
6886
Channel channel = mock(Channel.class);
@@ -78,8 +96,8 @@ public void basicGetAndAck() {
7896
metrics.basicConsume(channel, consumerTagWithManualAck, false);
7997

8098
metrics.consumedMessage(channel, 1, consumerTagWithAutoAck);
81-
assertThat(metrics.getConsumedMessages().getCount(), is(1L));
82-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(0L));
99+
assertThat(consumedMessages(metrics), is(1L));
100+
assertThat(acknowledgedMessages(metrics), is(0L));
83101

84102
metrics.consumedMessage(channel, 2, consumerTagWithManualAck);
85103
metrics.consumedMessage(channel, 3, consumerTagWithManualAck);
@@ -88,21 +106,21 @@ public void basicGetAndAck() {
88106
metrics.consumedMessage(channel, 6, consumerTagWithManualAck);
89107

90108
metrics.basicAck(channel, 6, false);
91-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
109+
assertThat(acknowledgedMessages(metrics), is(1L));
92110

93111
metrics.basicAck(channel, 3, true);
94-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
112+
assertThat(acknowledgedMessages(metrics), is(1L+2L));
95113

96114
metrics.basicAck(channel, 6, true);
97-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
115+
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
98116

99117
metrics.basicAck(channel, 10, true);
100-
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
118+
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
101119

102120
}
103121

104122
@Test public void cleanStaleState() {
105-
StandardMetricsCollector metrics = new StandardMetricsCollector();
123+
AbstractMetricsCollector metrics = factory.create();
106124
Connection openConnection = mock(Connection.class);
107125
when(openConnection.getId()).thenReturn("connection-1");
108126
when(openConnection.isOpen()).thenReturn(true);
@@ -132,13 +150,63 @@ public void basicGetAndAck() {
132150
metrics.newChannel(closedChannel);
133151
metrics.newChannel(openChannelInClosedConnection);
134152

135-
assertThat(metrics.getConnections().getCount(), is(2L));
136-
assertThat(metrics.getChannels().getCount(), is(2L+1L));
153+
assertThat(connections(metrics), is(2L));
154+
assertThat(channels(metrics), is(2L+1L));
137155

138156
metrics.cleanStaleState();
139157

140-
assertThat(metrics.getConnections().getCount(), is(1L));
141-
assertThat(metrics.getChannels().getCount(), is(1L));
158+
assertThat(connections(metrics), is(1L));
159+
assertThat(channels(metrics), is(1L));
160+
}
161+
162+
long consumedMessages(MetricsCollector metrics) {
163+
if (metrics instanceof StandardMetricsCollector) {
164+
return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();
165+
} else {
166+
return (long) ((MicrometerMetricsCollector) metrics).getConsumedMessages().count();
167+
}
168+
}
169+
170+
long acknowledgedMessages(MetricsCollector metrics) {
171+
if (metrics instanceof StandardMetricsCollector) {
172+
return ((StandardMetricsCollector) metrics).getAcknowledgedMessages().getCount();
173+
} else {
174+
return (long) ((MicrometerMetricsCollector) metrics).getAcknowledgedMessages().count();
175+
}
176+
}
177+
178+
long connections(MetricsCollector metrics) {
179+
if (metrics instanceof StandardMetricsCollector) {
180+
return ((StandardMetricsCollector) metrics).getConnections().getCount();
181+
} else {
182+
return ((MicrometerMetricsCollector) metrics).getConnections().get();
183+
}
184+
}
185+
186+
long channels(MetricsCollector metrics) {
187+
if (metrics instanceof StandardMetricsCollector) {
188+
return ((StandardMetricsCollector) metrics).getChannels().getCount();
189+
} else {
190+
return ((MicrometerMetricsCollector) metrics).getChannels().get();
191+
}
192+
}
193+
194+
interface MetricsCollectorFactory {
195+
AbstractMetricsCollector create();
196+
}
197+
198+
static class StandardMetricsCollectorFactory implements MetricsCollectorFactory {
199+
@Override
200+
public AbstractMetricsCollector create() {
201+
return new StandardMetricsCollector();
202+
}
203+
}
204+
205+
static class MicrometerMetricsCollectorFactory implements MetricsCollectorFactory {
206+
@Override
207+
public AbstractMetricsCollector create() {
208+
return new MicrometerMetricsCollector(new SimpleMeterRegistry());
209+
}
142210
}
143211

144212
}

0 commit comments

Comments
 (0)