Skip to content

Commit 4055b62

Browse files
committed
Merge branch '4.x.x-stable' into 5.2.x-stable
2 parents b71405c + 3499d2f commit 4055b62

File tree

10 files changed

+329
-122
lines changed

10 files changed

+329
-122
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public class ConnectionFactory implements Cloneable {
134134

135135
private boolean automaticRecovery = true;
136136
private boolean topologyRecovery = true;
137-
137+
private ExecutorService topologyRecoveryExecutor;
138+
138139
// long is used to make sure the users can use both ints
139140
// and longs safely. It is unlikely that anybody'd need
140141
// to use recovery intervals > Integer.MAX_VALUE in practice.
@@ -346,7 +347,7 @@ public void setUri(String uriString)
346347
setUri(new URI(uriString));
347348
}
348349

349-
private String uriDecode(String s) {
350+
private static String uriDecode(String s) {
350351
try {
351352
// URLDecode decodes '+' to a space, as for
352353
// form encoding. So protect plus signs.
@@ -531,7 +532,6 @@ public void setSocketFactory(SocketFactory factory) {
531532
*
532533
* @see #setSocketConfigurator(SocketConfigurator)
533534
*/
534-
@SuppressWarnings("unused")
535535
public SocketConfigurator getSocketConfigurator() {
536536
return socketConf;
537537
}
@@ -727,7 +727,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
727727
* @return true if topology recovery is enabled, false otherwise
728728
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
729729
*/
730-
@SuppressWarnings("unused")
731730
public boolean isTopologyRecoveryEnabled() {
732731
return topologyRecovery;
733732
}
@@ -740,6 +739,27 @@ public boolean isTopologyRecoveryEnabled() {
740739
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
741740
this.topologyRecovery = topologyRecovery;
742741
}
742+
743+
/**
744+
* Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
745+
* @return thread pool executor
746+
* @since 4.7.0
747+
*/
748+
public ExecutorService getTopologyRecoveryExecutor() {
749+
return topologyRecoveryExecutor;
750+
}
751+
752+
/**
753+
* Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
754+
* It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete.
755+
* It's developer's responsibility to shut down the executor when it is no longer needed.
756+
* Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe.
757+
* @param topologyRecoveryExecutor thread pool executor
758+
* @since 4.7.0
759+
*/
760+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
761+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
762+
}
743763

744764
public void setMetricsCollector(MetricsCollector metricsCollector) {
745765
this.metricsCollector = metricsCollector;
@@ -1039,6 +1059,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10391059
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10401060
result.setRecoveryDelayHandler(recoveryDelayHandler);
10411061
result.setTopologyRecovery(topologyRecovery);
1062+
result.setTopologyRecoveryExecutor(topologyRecoveryExecutor);
10421063
result.setExceptionHandler(exceptionHandler);
10431064
result.setThreadFactory(threadFactory);
10441065
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/ExceptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void handleConsumerException(Channel channel,
9898
* during topology (exchanges, queues, bindings, consumers) recovery
9999
* that it can't otherwise deal with.
100100
* @param conn the Connection that caught the exception
101-
* @param ch the Channel that caught the exception
101+
* @param ch the Channel that caught the exception. May be null.
102102
* @param exception the exception caught in the driver thread
103103
*/
104104

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44+
private ExecutorService topologyRecoveryExecutor;
4445
private int channelRpcTimeout;
4546
private boolean channelShouldCheckRpcResponseType;
4647
private ErrorOnWriteListener errorOnWriteListener;
@@ -114,10 +115,18 @@ public RecoveryDelayHandler getRecoveryDelayHandler() {
114115
public boolean isTopologyRecoveryEnabled() {
115116
return topologyRecovery;
116117
}
118+
119+
/**
120+
* Get the topology recovery executor. If null, the main connection thread should be used.
121+
* @return executor. May be null.
122+
*/
123+
public ExecutorService getTopologyRecoveryExecutor() {
124+
return topologyRecoveryExecutor;
125+
}
117126

118127
public ThreadFactory getThreadFactory() {
119-
return threadFactory;
120-
}
128+
return threadFactory;
129+
}
121130

122131
public int getChannelRpcTimeout() {
123132
return channelRpcTimeout;
@@ -174,6 +183,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand
174183
public void setTopologyRecovery(boolean topologyRecovery) {
175184
this.topologyRecovery = topologyRecovery;
176185
}
186+
187+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
188+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
189+
}
177190

178191
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
179192
this.exceptionHandler = exceptionHandler;

0 commit comments

Comments
 (0)