Skip to content

Commit 18c1022

Browse files
author
Simon MacMullen
committed
default to stable
2 parents 2303391 + f6820ed commit 18c1022

33 files changed

+1421
-237
lines changed

build.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@
444444
</jar>
445445
</target>
446446

447-
<target name="dist" depends="jar, test-jar">
447+
<target name="dist" depends="jar, test-jar" description="Build all library JARs and documentation">
448448
<mkdir dir="${dist.out}"/>
449449
<copy todir="${dist.out}">
450450
<!-- ant doesn't seem to provide any form of usable abstraction over sets of file names -->
@@ -464,7 +464,7 @@
464464
</copy>
465465
</target>
466466

467-
<target name="clean">
467+
<target name="clean" description="Cleans build artifacts">
468468
<delete dir="build"/>
469469
</target>
470470

src/com/rabbitmq/client/Channel.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,28 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
343343
boolean internal,
344344
Map<String, Object> arguments) throws IOException;
345345

346+
/**
347+
* Like {@link Channel#exchangeDeclare(String, String, boolean, boolean, java.util.Map)} but
348+
* sets nowait parameter to true and returns nothing (as there will be no response from
349+
* the server).
350+
*
351+
* @param exchange the name of the exchange
352+
* @param type the exchange type
353+
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
354+
* @param autoDelete true if the server should delete the exchange when it is no longer in use
355+
* @param internal true if the exchange is internal, i.e. can't be directly
356+
* published to by a client.
357+
* @param arguments other properties (construction arguments) for the exchange
358+
* @return a declaration-confirm method to indicate the exchange was successfully declared
359+
* @throws java.io.IOException if an error is encountered
360+
*/
361+
void exchangeDeclareNoWait(String exchange,
362+
String type,
363+
boolean durable,
364+
boolean autoDelete,
365+
boolean internal,
366+
Map<String, Object> arguments) throws IOException;
367+
346368
/**
347369
* Declare an exchange passively; that is, check if the named exchange exists.
348370
* @param name check the existence of an exchange named this
@@ -361,6 +383,18 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
361383
*/
362384
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
363385

386+
/**
387+
* Like {@link Channel#exchangeDelete(String, boolean)} but sets nowait parameter to true
388+
* and returns void (as there will be no response from the server).
389+
* @see com.rabbitmq.client.AMQP.Exchange.Delete
390+
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
391+
* @param exchange the name of the exchange
392+
* @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
393+
* @throws java.io.IOException if an error is encountered
394+
*/
395+
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
396+
397+
364398
/**
365399
* Delete an exchange, without regard for whether it is in use or not
366400
* @see com.rabbitmq.client.AMQP.Exchange.Delete
@@ -396,6 +430,17 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
396430
*/
397431
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
398432

433+
/**
434+
* Like {@link Channel#exchangeBind(String, String, String, java.util.Map)} but sets nowait parameter
435+
* to true and returns void (as there will be no response from the server).
436+
* @param destination the name of the exchange to which messages flow across the binding
437+
* @param source the name of the exchange from which messages flow across the binding
438+
* @param routingKey the routine key to use for the binding
439+
* @param arguments other properties (binding parameters)
440+
* @throws java.io.IOException if an error is encountered
441+
*/
442+
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
443+
399444
/**
400445
* Unbind an exchange from an exchange, with no extra arguments.
401446
* @see com.rabbitmq.client.AMQP.Exchange.Bind
@@ -421,6 +466,17 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
421466
*/
422467
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
423468

469+
/**
470+
* Same as {@link Channel#exchangeUnbind(String, String, String, java.util.Map)} but sets no-wait parameter to true
471+
* and returns nothing (as there will be no response from the server).
472+
* @param destination the name of the exchange to which messages flow across the binding
473+
* @param source the name of the exchange from which messages flow across the binding
474+
* @param routingKey the routine key to use for the binding
475+
* @param arguments other properties (binding parameters)
476+
* @throws java.io.IOException if an error is encountered
477+
*/
478+
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
479+
424480
/**
425481
* Actively declare a server-named exclusive, autodelete, non-durable queue.
426482
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
@@ -446,6 +502,19 @@ Exchange.DeclareOk exchangeDeclare(String exchange,
446502
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
447503
Map<String, Object> arguments) throws IOException;
448504

505+
/**
506+
* Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
507+
* flag to true and returns no result (as there will be no response from the server).
508+
* @param queue the name of the queue
509+
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
510+
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
511+
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
512+
* @param arguments other properties (construction arguments) for the queue
513+
* @throws java.io.IOException if an error is encountered
514+
*/
515+
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
516+
Map<String, Object> arguments) throws IOException;
517+
449518
/**
450519
* Declare a queue passively; i.e., check if it exists. In AMQP
451520
* 0-9-1, all arguments aside from nowait are ignored; and sending
@@ -482,6 +551,18 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
482551
*/
483552
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
484553

554+
/**
555+
* Like {@link Channel#queueDelete(String, boolean, boolean)} but sets nowait parameter
556+
* to true and returns nothing (as there will be no response from the server).
557+
* @see com.rabbitmq.client.AMQP.Queue.Delete
558+
* @see com.rabbitmq.client.AMQP.Queue.DeleteOk
559+
* @param queue the name of the queue
560+
* @param ifUnused true if the queue should be deleted only if not in use
561+
* @param ifEmpty true if the queue should be deleted only if empty
562+
* @throws java.io.IOException if an error is encountered
563+
*/
564+
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
565+
485566
/**
486567
* Bind a queue to an exchange, with no extra arguments.
487568
* @see com.rabbitmq.client.AMQP.Queue.Bind
@@ -507,6 +588,18 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
507588
*/
508589
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
509590

591+
/**
592+
* Same as {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
593+
* parameter to true and returns void (as there will be no response
594+
* from the server).
595+
* @param queue the name of the queue
596+
* @param exchange the name of the exchange
597+
* @param routingKey the routine key to use for the binding
598+
* @param arguments other properties (binding parameters)
599+
* @throws java.io.IOException if an error is encountered
600+
*/
601+
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
602+
510603
/**
511604
* Unbinds a queue from an exchange, with no extra arguments.
512605
* @see com.rabbitmq.client.AMQP.Queue.Unbind

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,12 @@ public class ConnectionFactory implements Cloneable {
7575
/** The default connection timeout;
7676
* zero means wait indefinitely */
7777
public static final int DEFAULT_CONNECTION_TIMEOUT = 0;
78+
/** The default shutdown timeout;
79+
* zero means wait indefinitely */
80+
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
7881

7982
/** The default SSL protocol */
80-
private static final String DEFAULT_SSL_PROTOCOL = "SSLv3";
83+
private static final String DEFAULT_SSL_PROTOCOL = "TLSv1";
8184

8285
private String username = DEFAULT_USER;
8386
private String password = DEFAULT_PASS;
@@ -88,6 +91,7 @@ public class ConnectionFactory implements Cloneable {
8891
private int requestedFrameMax = DEFAULT_FRAME_MAX;
8992
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
9093
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
94+
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
9195
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
9296
private SocketFactory factory = SocketFactory.getDefault();
9397
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
@@ -99,7 +103,10 @@ public class ConnectionFactory implements Cloneable {
99103
private boolean automaticRecovery = false;
100104
private boolean topologyRecovery = true;
101105

102-
private int networkRecoveryInterval = 5000;
106+
// long is used to make sure the users can use both ints
107+
// and longs safely. It is unlikely that anybody'd need
108+
// to use recovery intervals > Integer.MAX_VALUE in practice.
109+
private long networkRecoveryInterval = 5000;
103110

104111
/** @return number of consumer threads in default {@link ExecutorService} */
105112
@Deprecated
@@ -330,6 +337,26 @@ public int getConnectionTimeout() {
330337
return this.connectionTimeout;
331338
}
332339

340+
/**
341+
* Set the shutdown timeout. This is the amount of time that Consumer implementations have to
342+
* continue working through deliveries (and other Consumer callbacks) <b>after</b> the connection
343+
* has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout
344+
* then any remaining queued deliveries (and other Consumer callbacks, <b>including</b>
345+
* the Consumer's handleShutdownSignal() invocation) will be lost.
346+
* @param shutdownTimeout shutdown timeout in milliseconds; zero for infinite; default 10000
347+
*/
348+
public void setShutdownTimeout(int shutdownTimeout) {
349+
this.shutdownTimeout = shutdownTimeout;
350+
}
351+
352+
/**
353+
* Retrieve the shutdown timeout.
354+
* @return the shutdown timeout, in milliseconds; zero for infinite
355+
*/
356+
public int getShutdownTimeout() {
357+
return shutdownTimeout;
358+
}
359+
333360
/**
334361
* Set the requested heartbeat.
335362
* @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
@@ -493,7 +520,7 @@ public void useSslProtocol(String protocol)
493520

494521
/**
495522
* Convenience method for setting up an SSL socket factory.
496-
* Pass in the SSL protocol to use, e.g. "TLS" or "SSLv3".
523+
* Pass in the SSL protocol to use, e.g. "TLSv1" or "TLSv1.2".
497524
*
498525
* @param protocol SSL protocol to use.
499526
*/
@@ -599,7 +626,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
599626

600627
public ConnectionParams params(ExecutorService executor) {
601628
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
602-
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
629+
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
603630
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
604631
}
605632

@@ -638,7 +665,7 @@ public Connection newConnection(ExecutorService executor) throws IOException {
638665
* Returns automatic connection recovery interval in milliseconds.
639666
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
640667
*/
641-
public int getNetworkRecoveryInterval() {
668+
public long getNetworkRecoveryInterval() {
642669
return networkRecoveryInterval;
643670
}
644671

@@ -649,4 +676,12 @@ public int getNetworkRecoveryInterval() {
649676
public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
650677
this.networkRecoveryInterval = networkRecoveryInterval;
651678
}
679+
680+
/**
681+
* Sets connection recovery interval. Default is 5000.
682+
* @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
683+
*/
684+
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
685+
this.networkRecoveryInterval = networkRecoveryInterval;
686+
}
652687
}

src/com/rabbitmq/client/RpcClient.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@
3636
import com.rabbitmq.utility.BlockingCell;
3737

3838
/**
39-
* Convenience class which manages a temporary reply queue for simple RPC-style communication.
39+
* Convenience class which manages simple RPC-style communication.
4040
* The class is agnostic about the format of RPC arguments / return values.
4141
* It simply provides a mechanism for sending a message to an exchange with a given routing key,
42-
* and waiting for a response on a reply queue.
42+
* and waiting for a response.
4343
*/
4444
public class RpcClient {
4545
/** Channel we are communicating on */
@@ -58,8 +58,6 @@ public class RpcClient {
5858
/** Contains the most recently-used request correlation ID */
5959
private int _correlationId;
6060

61-
/** The name of our private reply queue */
62-
private String _replyQueue;
6361
/** Consumer attached to our reply queue */
6462
private DefaultConsumer _consumer;
6563

@@ -73,7 +71,6 @@ public class RpcClient {
7371
* @param routingKey the routing key
7472
* @param timeout milliseconds before timing out on wait for response
7573
* @throws IOException if an error is encountered
76-
* @see #setupReplyQueue
7774
*/
7875
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
7976
_channel = channel;
@@ -83,7 +80,6 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
8380
_timeout = timeout;
8481
_correlationId = 0;
8582

86-
_replyQueue = setupReplyQueue();
8783
_consumer = setupConsumer();
8884
}
8985

@@ -98,7 +94,6 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
9894
* @param exchange the exchange to connect to
9995
* @param routingKey the routing key
10096
* @throws IOException if an error is encountered
101-
* @see #setupReplyQueue
10297
*/
10398
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
10499
this(channel, exchange, routingKey, NO_TIMEOUT);
@@ -125,16 +120,6 @@ public void close() throws IOException {
125120
}
126121
}
127122

128-
/**
129-
* Creates a server-named exclusive autodelete queue to use for
130-
* receiving replies to RPC requests.
131-
* @throws IOException if an error is encountered
132-
* @return the name of the reply queue
133-
*/
134-
protected String setupReplyQueue() throws IOException {
135-
return _channel.queueDeclare("", false, true, true, null).getQueue();
136-
}
137-
138123
/**
139124
* Registers a consumer on the reply queue.
140125
* @throws IOException if an error is encountered
@@ -167,7 +152,7 @@ public void handleDelivery(String consumerTag,
167152
}
168153
}
169154
};
170-
_channel.basicConsume(_replyQueue, true, consumer);
155+
_channel.basicConsume("amq.rabbitmq.reply-to", true, consumer);
171156
return consumer;
172157
}
173158

@@ -186,7 +171,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
186171
_correlationId++;
187172
String replyId = "" + _correlationId;
188173
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
189-
.correlationId(replyId).replyTo(_replyQueue).build();
174+
.correlationId(replyId).replyTo("amq.rabbitmq.reply-to").build();
190175
_continuationMap.put(replyId, k);
191176
}
192177
publish(props, message);
@@ -332,14 +317,6 @@ public int getCorrelationId() {
332317
return _correlationId;
333318
}
334319

335-
/**
336-
* Retrieve the reply queue.
337-
* @return the name of the client's reply queue
338-
*/
339-
public String getReplyQueue() {
340-
return _replyQueue;
341-
}
342-
343320
/**
344321
* Retrieve the consumer.
345322
* @return an interface to the client's consumer object

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public static final Map<String, Object> defaultClientProperties() {
139139
private final int requestedHeartbeat;
140140
private final int requestedChannelMax;
141141
private final int requestedFrameMax;
142+
private final int shutdownTimeout;
142143
private final String username;
143144
private final String password;
144145
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
@@ -217,6 +218,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
217218
this.requestedFrameMax = params.getRequestedFrameMax();
218219
this.requestedChannelMax = params.getRequestedChannelMax();
219220
this.requestedHeartbeat = params.getRequestedHeartbeat();
221+
this.shutdownTimeout = params.getShutdownTimeout();
220222
this.saslConfig = params.getSaslConfig();
221223
this.executor = params.getExecutor();
222224
this.threadFactory = params.getThreadFactory();
@@ -229,7 +231,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
229231
}
230232

231233
private void initializeConsumerWorkService() {
232-
this._workService = new ConsumerWorkService(executor, threadFactory);
234+
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
233235
}
234236

235237
private void initializeHeartbeatSender() {

0 commit comments

Comments
 (0)