Skip to content

Commit f892aaa

Browse files
authored
Merge pull request #370 from vikinghawk/4.x.x-stable
Multi-threaded topology recovery
2 parents a44f168 + b29ebe6 commit f892aaa

File tree

10 files changed

+309
-122
lines changed

10 files changed

+309
-122
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public class ConnectionFactory implements Cloneable {
129129

130130
private boolean automaticRecovery = true;
131131
private boolean topologyRecovery = true;
132-
132+
private ExecutorService topologyRecoveryExecutor;
133+
133134
// long is used to make sure the users can use both ints
134135
// and longs safely. It is unlikely that anybody'd need
135136
// to use recovery intervals > Integer.MAX_VALUE in practice.
@@ -341,7 +342,7 @@ public void setUri(String uriString)
341342
setUri(new URI(uriString));
342343
}
343344

344-
private String uriDecode(String s) {
345+
private static String uriDecode(String s) {
345346
try {
346347
// URLDecode decodes '+' to a space, as for
347348
// form encoding. So protect plus signs.
@@ -525,7 +526,6 @@ public void setSocketFactory(SocketFactory factory) {
525526
*
526527
* @see #setSocketConfigurator(SocketConfigurator)
527528
*/
528-
@SuppressWarnings("unused")
529529
public SocketConfigurator getSocketConfigurator() {
530530
return socketConf;
531531
}
@@ -703,7 +703,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
703703
* @return true if topology recovery is enabled, false otherwise
704704
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
705705
*/
706-
@SuppressWarnings("unused")
707706
public boolean isTopologyRecoveryEnabled() {
708707
return topologyRecovery;
709708
}
@@ -716,6 +715,24 @@ public boolean isTopologyRecoveryEnabled() {
716715
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
717716
this.topologyRecovery = topologyRecovery;
718717
}
718+
719+
/**
720+
* Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
721+
* @return thread pool executor
722+
*/
723+
public ExecutorService getTopologyRecoveryExecutor() {
724+
return topologyRecoveryExecutor;
725+
}
726+
727+
/**
728+
* Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
729+
* It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete.
730+
* Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe.
731+
* @param topologyRecoveryExecutor thread pool executor
732+
*/
733+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
734+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
735+
}
719736

720737
public void setMetricsCollector(MetricsCollector metricsCollector) {
721738
this.metricsCollector = metricsCollector;
@@ -1015,6 +1032,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10151032
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10161033
result.setRecoveryDelayHandler(recoveryDelayHandler);
10171034
result.setTopologyRecovery(topologyRecovery);
1035+
result.setTopologyRecoveryExecutor(topologyRecoveryExecutor);
10181036
result.setExceptionHandler(exceptionHandler);
10191037
result.setThreadFactory(threadFactory);
10201038
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/ExceptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void handleConsumerException(Channel channel,
108108
* during topology (exchanges, queues, bindings, consumers) recovery
109109
* that it can't otherwise deal with.
110110
* @param conn the Connection that caught the exception
111-
* @param ch the Channel that caught the exception
111+
* @param ch the Channel that caught the exception. May be null.
112112
* @param exception the exception caught in the driver thread
113113
*/
114114

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44+
private ExecutorService topologyRecoveryExecutor;
4445
private int channelRpcTimeout;
4546
private boolean channelShouldCheckRpcResponseType;
4647
private ErrorOnWriteListener errorOnWriteListener;
@@ -114,10 +115,18 @@ public RecoveryDelayHandler getRecoveryDelayHandler() {
114115
public boolean isTopologyRecoveryEnabled() {
115116
return topologyRecovery;
116117
}
118+
119+
/**
120+
* Get the topology recovery executor. If null, the main connection thread should be used.
121+
* @return executor. May be null.
122+
*/
123+
public ExecutorService getTopologyRecoveryExecutor() {
124+
return topologyRecoveryExecutor;
125+
}
117126

118127
public ThreadFactory getThreadFactory() {
119-
return threadFactory;
120-
}
128+
return threadFactory;
129+
}
121130

122131
public int getChannelRpcTimeout() {
123132
return channelRpcTimeout;
@@ -174,6 +183,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand
174183
public void setTopologyRecovery(boolean topologyRecovery) {
175184
this.topologyRecovery = topologyRecovery;
176185
}
186+
187+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
188+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
189+
}
177190

178191
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
179192
this.exceptionHandler = exceptionHandler;

0 commit comments

Comments
 (0)