Skip to content

Commit 4c036d8

Browse files
author
Simon MacMullen
committed
Merge bug26402
2 parents de60c33 + e2e3d99 commit 4c036d8

File tree

7 files changed

+52
-8
lines changed

7 files changed

+52
-8
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public class ConnectionFactory implements Cloneable {
7575
/** The default connection timeout;
7676
* zero means wait indefinitely */
7777
public static final int DEFAULT_CONNECTION_TIMEOUT = 0;
78+
/** The default shutdown timeout;
79+
* zero means wait indefinitely */
80+
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
7881

7982
/** The default SSL protocol */
8083
private static final String DEFAULT_SSL_PROTOCOL = "SSLv3";
@@ -88,6 +91,7 @@ public class ConnectionFactory implements Cloneable {
8891
private int requestedFrameMax = DEFAULT_FRAME_MAX;
8992
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
9093
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
94+
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
9195
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
9296
private SocketFactory factory = SocketFactory.getDefault();
9397
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
@@ -333,6 +337,26 @@ public int getConnectionTimeout() {
333337
return this.connectionTimeout;
334338
}
335339

340+
/**
341+
* Set the shutdown timeout. This is the amount of time that Consumer implementations have to
342+
* continue working through deliveries (and other Consumer callbacks) <b>after</b> the connection
343+
* has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout
344+
* then any remaining queued deliveries (and other Consumer callbacks, <b>including</b>
345+
* the Consumer's handleShutdownSignal() invocation) will be lost.
346+
* @param shutdownTimeout shutdown timeout in milliseconds; zero for infinite; default 10000
347+
*/
348+
public void setShutdownTimeout(int shutdownTimeout) {
349+
this.shutdownTimeout = shutdownTimeout;
350+
}
351+
352+
/**
353+
* Retrieve the shutdown timeout.
354+
* @return the shutdown timeout, in milliseconds; zero for infinite
355+
*/
356+
public int getShutdownTimeout() {
357+
return shutdownTimeout;
358+
}
359+
336360
/**
337361
* Set the requested heartbeat.
338362
* @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
@@ -602,7 +626,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
602626

603627
public ConnectionParams params(ExecutorService executor) {
604628
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
605-
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
629+
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
606630
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
607631
}
608632

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public static final Map<String, Object> defaultClientProperties() {
139139
private final int requestedHeartbeat;
140140
private final int requestedChannelMax;
141141
private final int requestedFrameMax;
142+
private final int shutdownTimeout;
142143
private final String username;
143144
private final String password;
144145
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
@@ -217,6 +218,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
217218
this.requestedFrameMax = params.getRequestedFrameMax();
218219
this.requestedChannelMax = params.getRequestedChannelMax();
219220
this.requestedHeartbeat = params.getRequestedHeartbeat();
221+
this.shutdownTimeout = params.getShutdownTimeout();
220222
this.saslConfig = params.getSaslConfig();
221223
this.executor = params.getExecutor();
222224
this.threadFactory = params.getThreadFactory();
@@ -229,7 +231,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
229231
}
230232

231233
private void initializeConsumerWorkService() {
232-
this._workService = new ConsumerWorkService(executor, threadFactory);
234+
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
233235
}
234236

235237
private void initializeHeartbeatSender() {

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
3434
*/
3535
public class ChannelManager {
36-
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;
37-
3836
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
3937
private final Object monitor = new Object();
4038
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
@@ -108,7 +106,13 @@ private void scheduleShutdownProcessing() {
108106
Runnable target = new Runnable() {
109107
public void run() {
110108
for (CountDownLatch latch : sdSet) {
111-
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { /*ignored*/ }
109+
try {
110+
int shutdownTimeout = ssWorkService.getShutdownTimeout();
111+
if (shutdownTimeout == 0) latch.await();
112+
else latch.await(shutdownTimeout, TimeUnit.MILLISECONDS);
113+
} catch (Throwable e) {
114+
/*ignored*/
115+
}
112116
}
113117
ssWorkService.shutdown();
114118
}

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class ConnectionParams {
1616
private final int requestedFrameMax;
1717
private final int requestedChannelMax;
1818
private final int requestedHeartbeat;
19+
private final int shutdownTimeout;
1920
private final SaslConfig saslConfig;
2021
private final long networkRecoveryInterval;
2122
private final boolean topologyRecovery;
@@ -41,7 +42,7 @@ public class ConnectionParams {
4142
public ConnectionParams(String username, String password, ExecutorService executor,
4243
String virtualHost, Map<String, Object> clientProperties,
4344
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
44-
SaslConfig saslConfig, long networkRecoveryInterval,
45+
int shutdownTimeout, SaslConfig saslConfig, long networkRecoveryInterval,
4546
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
4647
this.username = username;
4748
this.password = password;
@@ -51,6 +52,7 @@ public ConnectionParams(String username, String password, ExecutorService execut
5152
this.requestedFrameMax = requestedFrameMax;
5253
this.requestedChannelMax = requestedChannelMax;
5354
this.requestedHeartbeat = requestedHeartbeat;
55+
this.shutdownTimeout = shutdownTimeout;
5456
this.saslConfig = saslConfig;
5557
this.networkRecoveryInterval = networkRecoveryInterval;
5658
this.topologyRecovery = topologyRecovery;
@@ -90,6 +92,10 @@ public int getRequestedHeartbeat() {
9092
return requestedHeartbeat;
9193
}
9294

95+
public int getShutdownTimeout() {
96+
return shutdownTimeout;
97+
}
98+
9399
public SaslConfig getSaslConfig() {
94100
return saslConfig;
95101
}

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,18 @@ final public class ConsumerWorkService {
2929
private final ExecutorService executor;
3030
private final boolean privateExecutor;
3131
private final WorkPool<Channel, Runnable> workPool;
32+
private final int shutdownTimeout;
3233

33-
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory) {
34+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
3435
this.privateExecutor = (executor == null);
3536
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
3637
: executor;
3738
this.workPool = new WorkPool<Channel, Runnable>();
39+
this.shutdownTimeout = shutdownTimeout;
40+
}
41+
42+
public int getShutdownTimeout() {
43+
return shutdownTimeout;
3844
}
3945

4046
/**

test/src/com/rabbitmq/client/test/server/ChannelLimitNegotiation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public void testOpeningTooManyChannels() throws Exception {
8080

8181
// Construct a channel directly
8282
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
83-
new ConsumerWorkService(Executors.newSingleThreadExecutor(), Executors.defaultThreadFactory()));
83+
new ConsumerWorkService(Executors.newSingleThreadExecutor(),
84+
Executors.defaultThreadFactory(), ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT));
8485
conn.addShutdownListener(new ShutdownListener() {
8586
public void shutdownCompleted(ShutdownSignalException cause) {
8687
// make sure channel.open continuation is released

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public static void main(String[] args) {
8282
confirm != -1);
8383

8484
ConnectionFactory factory = new ConnectionFactory();
85+
factory.setShutdownTimeout(0); // So we still shut down even with slow consumers
8586
factory.setUri(uri);
8687
factory.setRequestedFrameMax(frameMax);
8788
factory.setRequestedHeartbeat(heartbeat);

0 commit comments

Comments
 (0)