21
21
import com .rabbitmq .client .Connection ;
22
22
import com .rabbitmq .client .QueueingConsumer ;
23
23
24
+ /**
25
+ * Java Application to create multiple connections with multiple channels,
26
+ * sending and receiving messages on each channel on distinct threads.
27
+ * All messages are sent to and consumed from a common, non-durable, shared, auto-delete queue.
28
+ * <p/>
29
+ * The first thread (thread number 0) outputs statistics for each message it receives.
30
+ */
24
31
public class ManyConnections {
25
- public static double rate ;
26
- public static int connectionCount ;
27
- public static int channelPerConnectionCount ;
28
- public static int heartbeatInterval ;
29
-
30
- public static int totalCount () {
31
- return connectionCount * channelPerConnectionCount ;
32
- }
32
+ private static final String QUEUE_NAME = "ManyConnections" ;
33
+ private static double rate ;
34
+ private static int connectionCount ;
35
+ private static int channelPerConnectionCount ;
36
+ private static int heartbeatInterval ;
33
37
38
+ /**
39
+ * @param args command-line parameters:
40
+ * <p>
41
+ * Four mandatory and one optional positional parameters:
42
+ * </p>
43
+ * <ul>
44
+ * <li><i>AMQP-uri</i> - the AMQP uri to connect to the broker.
45
+ * (See {@link ConnectionFactory#setUri(String) setUri()}.)
46
+ * </li>
47
+ * <li><i>connection-count</i> - the number of connections to create.
48
+ * </li>
49
+ * <li><i>channel-count</i> - the number of channels to create on <i>each</i> connection.
50
+ * </li>
51
+ * <li><i>heartbeat-interval</i> - the heartbeat interval, in seconds, for each channel.
52
+ * Zero means no heartbeats.
53
+ * </li>
54
+ * <li><i>rate</i> - the message rate, in floating point messages per second
55
+ * (<code>0.0 < rate < 50.0</code> is realistic). Default <code>1.0</code>.
56
+ * </li>
57
+ * </ul>
58
+ * <p>
59
+ * There are <i>connection-count</i> x <i>channel-count</i> threads created, each one sending messages at approximately the given rate.
60
+ * </p>
61
+ */
34
62
public static void main (String [] args ) {
35
63
try {
36
64
if (args .length < 4 ) {
37
65
System .err
38
- .println ("Usage: ManyConnections uri connCount chanPerConnCount heartbeatInterval [rate]" );
66
+ .println ("Usage: ManyConnections uri connCount chanPerConnCount heartbeatInterval [rate]" );
39
67
System .exit (2 );
40
68
}
41
69
42
70
String uri = args [0 ];
43
71
connectionCount = Integer .parseInt (args [1 ]);
44
72
channelPerConnectionCount = Integer .parseInt (args [2 ]);
73
+ final int totalCount = connectionCount * channelPerConnectionCount ;
45
74
heartbeatInterval = Integer .parseInt (args [3 ]);
46
75
rate = (args .length > 4 ) ? Double .parseDouble (args [4 ]) : 1.0 ;
76
+ final int delayBetweenMessages = (int ) (1000 / rate );
47
77
48
78
ConnectionFactory factory = new ConnectionFactory ();
49
79
factory .setRequestedHeartbeat (heartbeatInterval );
80
+ factory .setUri (uri );
81
+
50
82
for (int i = 0 ; i < connectionCount ; i ++) {
51
83
System .out .println ("Starting connection " + i );
52
- factory .setUri (uri );
53
84
final Connection conn = factory .newConnection ();
54
85
55
86
for (int j = 0 ; j < channelPerConnectionCount ; j ++) {
@@ -58,14 +89,13 @@ public static void main(String[] args) {
58
89
final int threadNumber = i * channelPerConnectionCount + j ;
59
90
System .out .println ("Starting " + threadNumber + " " + ch
60
91
+ " thread..." );
61
- new Thread (new Runnable () {
62
- public void run () {
63
- runChannel (threadNumber , conn , ch );
64
- }
65
- }).start ();
92
+ new Thread
93
+ ( new ChannelRunnable (threadNumber , ch , delayBetweenMessages , totalCount )
94
+ , "ManyConnections thread " + threadNumber
95
+ ).start ();
66
96
}
67
97
}
68
- System .out .println ("Started " + totalCount ()
98
+ System .out .println ("Started " + totalCount
69
99
+ " channels and threads." );
70
100
} catch (Exception e ) {
71
101
System .err .println ("Main thread caught exception: " + e );
@@ -74,33 +104,49 @@ public void run() {
74
104
}
75
105
}
76
106
77
- public static void runChannel (int threadNumber , Connection conn , Channel ch ) {
107
+ private final static class ChannelRunnable implements Runnable {
108
+
109
+ private final int threadNumber ;
110
+ private final Channel ch ;
111
+ private final int delayBetweenMessages ;
112
+ private final int totalCount ;
113
+
114
+ public ChannelRunnable (int threadNumber , Channel ch , int delayBetweenMessages , int totalCount ) {
115
+ this .threadNumber = threadNumber ;
116
+ this .ch = ch ;
117
+ this .delayBetweenMessages = delayBetweenMessages ;
118
+ this .totalCount = totalCount ;
119
+ }
120
+
121
+ public void run () {
122
+ runChannel (threadNumber , ch , delayBetweenMessages , totalCount );
123
+ }
124
+
125
+ }
126
+
127
+ private final static void runChannel (int threadNumber , final Channel ch , int delayLen , int totalCount ) {
78
128
try {
79
- int delayLen = (int ) (1000 / rate );
80
129
long startTime = System .currentTimeMillis ();
81
130
82
131
int msgCount = 0 ;
83
- String queueName = "ManyConnections" ;
84
- ch .queueDeclare (queueName , false , false , false , null );
132
+ ch .queueDeclare (QUEUE_NAME , false , false , true , null );
85
133
86
134
QueueingConsumer consumer = new QueueingConsumer (ch );
87
- ch .basicConsume (queueName , true , consumer );
135
+ ch .basicConsume (QUEUE_NAME , true , consumer );
88
136
while (true ) {
89
137
String toSend = threadNumber + "/" + msgCount ++;
90
- ch .basicPublish ("" , queueName , null , toSend .getBytes ());
138
+ ch .basicPublish ("" , QUEUE_NAME , null , toSend .getBytes ());
91
139
Thread .sleep (delayLen );
92
140
93
141
QueueingConsumer .Delivery delivery = consumer .nextDelivery ();
94
142
if (threadNumber == 0 ) {
95
143
long now = System .currentTimeMillis ();
96
144
double delta = (now - startTime ) / 1000.0 ;
97
145
double actualRate = msgCount / delta ;
98
- double totalRate = totalCount () * actualRate ;
99
- System .out .println (threadNumber + " got message: "
100
- + new String (delivery .getBody ()) + "; " + msgCount
101
- + " messages in " + delta + " seconds ("
102
- + actualRate + " Hz * " + totalCount ()
103
- + " channels -> " + totalRate + " Hz)" );
146
+ double totalRate = totalCount * actualRate ;
147
+ System .out .println (String .format ("thread %3d got message: %20s; "
148
+ + "%4d messages in %10.3f seconds (%8.2f Hz x %4d channels -> %8.2f Hz)"
149
+ , threadNumber , "\" " + new String (delivery .getBody ()) + "\" " , msgCount , delta , actualRate , totalCount , totalRate ));
104
150
}
105
151
}
106
152
} catch (Exception e ) {
0 commit comments