Skip to content

Commit 7f4ae26

Browse files
committed
Add TrafficListener interface
[#161109942] Fixes #411 (cherry picked from commit 0223b61) (cherry picked from commit f613c67) Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java
1 parent 64cd84b commit 7f4ae26

File tree

7 files changed

+232
-0
lines changed

7 files changed

+232
-0
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,15 @@ public void postProcess(ConnectionContext context) {
203203
}
204204
};
205205

206+
/*
207+
* Traffic listener notified of inbound and outbound {@link Command}s.
208+
* <p>
209+
* Useful for debugging purposes. Default is no-op.
210+
*
211+
* @since 4.9.0
212+
*/
213+
private TrafficListener trafficListener = TrafficListener.NO_OP;
214+
206215
/** @return the default host to use for connections */
207216
public String getHost() {
208217
return host;
@@ -1203,6 +1212,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
12031212
result.setErrorOnWriteListener(errorOnWriteListener);
12041213
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
12051214
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
1215+
result.setTrafficListener(trafficListener);
12061216
return result;
12071217
}
12081218

@@ -1539,6 +1549,7 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
15391549

15401550
/**
15411551
* Set filter to include/exclude entities from topology recovery.
1552+
*
15421553
* @since 4.8.0
15431554
*/
15441555
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
@@ -1548,6 +1559,7 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil
15481559
/**
15491560
* Set retry handler for topology recovery.
15501561
* Default is no retry.
1562+
*
15511563
* @param topologyRecoveryRetryHandler
15521564
* @since 4.8.0
15531565
*/
@@ -1565,4 +1577,19 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
15651577
public void setConnectionPostProcessor(ConnectionPostProcessor connectionPostProcessor) {
15661578
this.connectionPostProcessor = connectionPostProcessor;
15671579
}
1580+
1581+
/**
1582+
* Traffic listener notified of inbound and outbound {@link Command}s.
1583+
* <p>
1584+
* Useful for debugging purposes, e.g. logging all sent and received messages.
1585+
* Default is no-op.
1586+
*
1587+
* @param trafficListener
1588+
* @see TrafficListener
1589+
* @see com.rabbitmq.client.impl.LogTrafficListener
1590+
* @since 4.9.0
1591+
*/
1592+
public void setTrafficListener(TrafficListener trafficListener) {
1593+
this.trafficListener = trafficListener;
1594+
}
15681595
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Contract to log outbound and inbound {@link Command}s.
5+
*
6+
* @see ConnectionFactory#setTrafficListener(TrafficListener)
7+
* @since 5.5.0
8+
*/
9+
public interface TrafficListener {
10+
11+
/**
12+
* No-op {@link TrafficListener}.
13+
*/
14+
TrafficListener NO_OP = new TrafficListener() {
15+
16+
@Override
17+
public void write(Command outboundCommand) {
18+
19+
}
20+
21+
@Override
22+
public void read(Command inboundCommand) {
23+
24+
}
25+
};
26+
27+
/**
28+
* Notified for each outbound {@link Command}.
29+
*
30+
* @param outboundCommand
31+
*/
32+
void write(Command outboundCommand);
33+
34+
/**
35+
* Notified for each inbound {@link Command}.
36+
*
37+
* @param inboundCommand
38+
*/
39+
void read(Command inboundCommand);
40+
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
7373

7474
private final boolean _checkRpcResponseType;
7575

76+
private final TrafficListener _trafficListener;
77+
7678
/**
7779
* Construct a channel on the given connection, with the given channel number.
7880
* @param connection the underlying connection for this channel
@@ -86,6 +88,7 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
8688
}
8789
this._rpcTimeout = connection.getChannelRpcTimeout();
8890
this._checkRpcResponseType = connection.willCheckRpcResponseType();
91+
this._trafficListener = connection.getTrafficListener();
8992
}
9093

9194
/**
@@ -159,6 +162,7 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
159162
// asynchronous commands (deliveries/returns/other events),
160163
// and false for commands that should be passed on to some
161164
// waiting RPC continuation.
165+
this._trafficListener.read(command);
162166
if (!processAsync(command)) {
163167
// The filter decided not to handle/consume the command,
164168
// so it must be a response to an earlier RPC.
@@ -393,6 +397,7 @@ public void quiescingTransmit(AMQCommand c) throws IOException {
393397
ensureIsOpen();
394398
}
395399
}
400+
this._trafficListener.write(c);
396401
c.transmit(this);
397402
}
398403
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public static Map<String, Object> defaultClientProperties() {
141141
protected final MetricsCollector metricsCollector;
142142
private final int channelRpcTimeout;
143143
private final boolean channelShouldCheckRpcResponseType;
144+
private final TrafficListener trafficListener;
144145

145146
/* State modified after start - all volatile */
146147

@@ -237,6 +238,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
237238
this.channelRpcTimeout = params.getChannelRpcTimeout();
238239
this.channelShouldCheckRpcResponseType = params.channelShouldCheckRpcResponseType();
239240

241+
this.trafficListener = params.getTrafficListener() == null ? TrafficListener.NO_OP : params.getTrafficListener();
240242
this._channel0 = new AMQChannel(this, 0) {
241243
@Override public boolean processAsync(Command c) throws IOException {
242244
return getConnection().processControlCommand(c);
@@ -1113,4 +1115,8 @@ public int getChannelRpcTimeout() {
11131115
public boolean willCheckRpcResponseType() {
11141116
return channelShouldCheckRpcResponseType;
11151117
}
1118+
1119+
TrafficListener getTrafficListener() {
1120+
return trafficListener;
1121+
}
11161122
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
23+
import com.rabbitmq.client.TrafficListener;
2324
import com.rabbitmq.client.impl.recovery.RetryHandler;
2425
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2526

@@ -55,6 +56,8 @@ public class ConnectionParams {
5556
private ExceptionHandler exceptionHandler;
5657
private ThreadFactory threadFactory;
5758

59+
private TrafficListener trafficListener;
60+
5861
public ConnectionParams() {}
5962

6063
public CredentialsProvider getCredentialsProvider() {
@@ -256,4 +259,12 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
256259
public RetryHandler getTopologyRecoveryRetryHandler() {
257260
return topologyRecoveryRetryHandler;
258261
}
262+
263+
public void setTrafficListener(TrafficListener trafficListener) {
264+
this.trafficListener = trafficListener;
265+
}
266+
267+
public TrafficListener getTrafficListener() {
268+
return trafficListener;
269+
}
259270
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.Command;
4+
import com.rabbitmq.client.TrafficListener;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
/**
9+
* {@link TrafficListener} that logs {@link Command} at <code>TRACE</code> level.
10+
* <p>
11+
* This implementation checks whether the <code>TRACE</code> log level
12+
* is enabled before logging anything. This {@link TrafficListener}
13+
* should only be activated for debugging purposes, not in a production
14+
* environment.
15+
*
16+
* @see TrafficListener
17+
* @see com.rabbitmq.client.ConnectionFactory#setTrafficListener(TrafficListener)
18+
* @since 5.5.0
19+
*/
20+
public class LogTrafficListener implements TrafficListener {
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(LogTrafficListener.class);
23+
24+
@Override
25+
public void write(Command outboundCommand) {
26+
if (shouldLog(outboundCommand)) {
27+
LOGGER.trace("Outbound command: {}", outboundCommand);
28+
}
29+
}
30+
31+
@Override
32+
public void read(Command inboundCommand) {
33+
if (shouldLog(inboundCommand)) {
34+
LOGGER.trace("Inbound command: {}", inboundCommand);
35+
}
36+
}
37+
38+
protected boolean shouldLog(Command command) {
39+
return LOGGER.isTraceEnabled();
40+
}
41+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.Command;
21+
import com.rabbitmq.client.Connection;
22+
import com.rabbitmq.client.ConnectionFactory;
23+
import com.rabbitmq.client.TrafficListener;
24+
import org.junit.Test;
25+
import org.junit.runner.RunWith;
26+
import org.junit.runners.Parameterized;
27+
28+
import java.util.List;
29+
import java.util.UUID;
30+
import java.util.concurrent.CopyOnWriteArrayList;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.function.Consumer;
34+
35+
import static org.junit.Assert.assertEquals;
36+
import static org.junit.Assert.assertTrue;
37+
38+
/**
39+
*
40+
*/
41+
@RunWith(Parameterized.class)
42+
public class TrafficListenerTest {
43+
44+
@Parameterized.Parameter
45+
public Consumer<ConnectionFactory> configurator;
46+
47+
@Parameterized.Parameters
48+
public static Object[] data() {
49+
return new Object[] { automaticRecoveryEnabled(), automaticRecoveryDisabled() };
50+
}
51+
52+
static Consumer<ConnectionFactory> automaticRecoveryEnabled() {
53+
return cf -> cf.setAutomaticRecoveryEnabled(true);
54+
}
55+
56+
static Consumer<ConnectionFactory> automaticRecoveryDisabled() {
57+
return cf -> cf.setAutomaticRecoveryEnabled(false);
58+
}
59+
60+
@Test
61+
public void trafficListenerIsCalled() throws Exception {
62+
ConnectionFactory cf = TestUtils.connectionFactory();
63+
TestTrafficListener testTrafficListener = new TestTrafficListener();
64+
cf.setTrafficListener(testTrafficListener);
65+
configurator.accept(cf);
66+
try (Connection c = cf.newConnection()) {
67+
Channel ch = c.createChannel();
68+
String queue = ch.queueDeclare().getQueue();
69+
CountDownLatch latch = new CountDownLatch(1);
70+
ch.basicConsume(queue, true,
71+
(consumerTag, message) -> latch.countDown(), consumerTag -> {
72+
});
73+
String messageContent = UUID.randomUUID().toString();
74+
ch.basicPublish("", queue, null, messageContent.getBytes());
75+
assertTrue(latch.await(5, TimeUnit.SECONDS));
76+
assertEquals(1, testTrafficListener.outboundContent.size());
77+
assertEquals(messageContent, testTrafficListener.outboundContent.get(0));
78+
assertEquals(1, testTrafficListener.inboundContent.size());
79+
assertEquals(messageContent, testTrafficListener.inboundContent.get(0));
80+
}
81+
}
82+
83+
private static class TestTrafficListener implements TrafficListener {
84+
85+
final List<String> outboundContent = new CopyOnWriteArrayList<>();
86+
final List<String> inboundContent = new CopyOnWriteArrayList<>();
87+
88+
@Override
89+
public void write(Command outboundCommand) {
90+
if (outboundCommand.getMethod() instanceof AMQP.Basic.Publish) {
91+
outboundContent.add(new String(outboundCommand.getContentBody()));
92+
}
93+
}
94+
95+
@Override
96+
public void read(Command inboundCommand) {
97+
if (inboundCommand.getMethod() instanceof AMQP.Basic.Deliver) {
98+
inboundContent.add(new String(inboundCommand.getContentBody()));
99+
}
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)