Skip to content

Commit e3a6039

Browse files
authored
Merge pull request #309 from vikinghawk/4.3.x-stable
Add backoff strategy for recovery retry interval Fixes #308
2 parents 815ae9c + d4c93ab commit e3a6039

File tree

5 files changed

+181
-8
lines changed

5 files changed

+181
-8
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public class ConnectionFactory implements Cloneable {
7777

7878
/** The default continuation timeout for RPC calls in channels: 10 minutes */
7979
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);
80+
81+
/** The default network recovery interval: 5000 millis */
82+
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
8083

8184
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
8285

@@ -111,7 +114,8 @@ public class ConnectionFactory implements Cloneable {
111114
// long is used to make sure the users can use both ints
112115
// and longs safely. It is unlikely that anybody'd need
113116
// to use recovery intervals > Integer.MAX_VALUE in practice.
114-
private long networkRecoveryInterval = 5000;
117+
private long networkRecoveryInterval = DEFAULT_NETWORK_RECOVERY_INTERVAL;
118+
private RecoveryDelayHandler recoveryDelayHandler;
115119

116120
private MetricsCollector metricsCollector;
117121

@@ -960,6 +964,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
960964
result.setShutdownTimeout(shutdownTimeout);
961965
result.setSaslConfig(saslConfig);
962966
result.setNetworkRecoveryInterval(networkRecoveryInterval);
967+
result.setRecoveryDelayHandler(recoveryDelayHandler);
963968
result.setTopologyRecovery(topologyRecovery);
964969
result.setExceptionHandler(exceptionHandler);
965970
result.setThreadFactory(threadFactory);
@@ -1077,6 +1082,22 @@ public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
10771082
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
10781083
this.networkRecoveryInterval = networkRecoveryInterval;
10791084
}
1085+
1086+
/**
1087+
* Returns automatic connection recovery delay handler.
1088+
* @return recovery delay handler. May be null if not set.
1089+
*/
1090+
public RecoveryDelayHandler getRecoveryDelayHandler() {
1091+
return recoveryDelayHandler;
1092+
}
1093+
1094+
/**
1095+
* Sets the automatic connection recovery delay handler.
1096+
* @param recoveryDelayHandler the recovery delay handler
1097+
*/
1098+
public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
1099+
this.recoveryDelayHandler = recoveryDelayHandler;
1100+
}
10801101

10811102
/**
10821103
* Sets the parameters when using NIO.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.Arrays;
4+
import java.util.Collections;
5+
import java.util.List;
6+
7+
/**
8+
* A RecoveryDelayHandler is used to tell automatic recovery how long to sleep between reconnect attempts.
9+
*
10+
* @since 4.3.0
11+
*/
12+
public interface RecoveryDelayHandler {
13+
14+
/**
15+
* Get the time to sleep (in milliseconds) before attempting to reconnect and recover again.
16+
* This method will be called with recoveryAttempts=0 before the first recovery attempt and then again after each failed recovery.
17+
*
18+
* @param recoveryAttempts
19+
* The number of recovery attempts so far.
20+
* @return the delay in milliseconds
21+
*/
22+
long getDelay(final int recoveryAttempts);
23+
24+
/**
25+
* Basic implementation of {@link RecoveryDelayHandler} that returns the {@link ConnectionFactory#getNetworkRecoveryInterval() network recovery interval} each time.
26+
*/
27+
public static class DefaultRecoveryDelayHandler implements RecoveryDelayHandler {
28+
29+
private final long networkRecoveryInterval;
30+
31+
/**
32+
* Default Constructor
33+
* @param networkRecoveryInterval
34+
* recovery delay time in millis
35+
*/
36+
public DefaultRecoveryDelayHandler(final long networkRecoveryInterval) {
37+
this.networkRecoveryInterval = networkRecoveryInterval;
38+
}
39+
40+
@Override
41+
public long getDelay(int recoveryAttempts) {
42+
return networkRecoveryInterval;
43+
}
44+
}
45+
46+
/**
47+
* Backoff implementation of {@link RecoveryDelayHandler} that uses the Fibonacci sequence (by default) to increase the recovery delay time after each failed attempt.
48+
* You can optionally use your own backoff sequence.
49+
*/
50+
public static class ExponentialBackoffDelayHandler implements RecoveryDelayHandler {
51+
52+
private final List<Long> sequence;
53+
54+
/**
55+
* Default Constructor. Uses the fibonacci sequence: {0, 1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000}.
56+
*/
57+
public ExponentialBackoffDelayHandler() {
58+
sequence = Arrays.asList(0L, 1000L, 1000L, 2000L, 3000L, 5000L, 8000L, 13000L, 21000L);
59+
}
60+
61+
/**
62+
* Constructor for passing your own backoff sequence
63+
*
64+
* @param sequence
65+
* List of recovery delay values in milliseconds.
66+
* @throws IllegalArgumentException if the sequence is null or empty
67+
*/
68+
public ExponentialBackoffDelayHandler(final List<Long> sequence) {
69+
if (sequence == null || sequence.isEmpty())
70+
throw new IllegalArgumentException();
71+
this.sequence = Collections.unmodifiableList(sequence);
72+
}
73+
74+
@Override
75+
public long getDelay(int recoveryAttempts) {
76+
return sequence.get(recoveryAttempts >= sequence.size() ? sequence.size() - 1 : recoveryAttempts);
77+
}
78+
}
79+
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.rabbitmq.client.impl;
1717

1818
import com.rabbitmq.client.ExceptionHandler;
19+
import com.rabbitmq.client.RecoveryDelayHandler;
20+
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
1921
import com.rabbitmq.client.SaslConfig;
2022

2123
import java.util.Map;
@@ -38,6 +40,7 @@ public class ConnectionParams {
3840
private int shutdownTimeout;
3941
private SaslConfig saslConfig;
4042
private long networkRecoveryInterval;
43+
private RecoveryDelayHandler recoveryDelayHandler;
4144
private boolean topologyRecovery;
4245
private int channelRpcTimeout;
4346
private boolean channelShouldCheckRpcResponseType;
@@ -102,6 +105,14 @@ public ExceptionHandler getExceptionHandler() {
102105
public long getNetworkRecoveryInterval() {
103106
return networkRecoveryInterval;
104107
}
108+
109+
/**
110+
* Get the recovery delay handler.
111+
* @return recovery delay handler or if none was set a {@link DefaultRecoveryDelayHandler} will be returned with a delay of {@link #getNetworkRecoveryInterval()}.
112+
*/
113+
public RecoveryDelayHandler getRecoveryDelayHandler() {
114+
return recoveryDelayHandler == null ? new DefaultRecoveryDelayHandler(networkRecoveryInterval) : recoveryDelayHandler;
115+
}
105116

106117
public boolean isTopologyRecoveryEnabled() {
107118
return topologyRecovery;
@@ -162,6 +173,10 @@ public void setSaslConfig(SaslConfig saslConfig) {
162173
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
163174
this.networkRecoveryInterval = networkRecoveryInterval;
164175
}
176+
177+
public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
178+
this.recoveryDelayHandler = recoveryDelayHandler;
179+
}
165180

166181
public void setTopologyRecovery(boolean topologyRecovery) {
167182
this.topologyRecovery = topologyRecovery;

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
487487
}
488488

489489
synchronized private void beginAutomaticRecovery() throws InterruptedException {
490-
Thread.sleep(this.params.getNetworkRecoveryInterval());
490+
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0));
491491

492492
this.notifyRecoveryListenersStarted();
493493

@@ -525,9 +525,10 @@ private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) {
525525
// Returns new connection if the connection was recovered,
526526
// null if application initiated shutdown while attempting recovery.
527527
private RecoveryAwareAMQConnection recoverConnection() throws InterruptedException {
528-
while (!manuallyClosed)
529-
{
528+
int attempts = 0;
529+
while (!manuallyClosed) {
530530
try {
531+
attempts++;
531532
RecoveryAwareAMQConnection newConn = this.cf.newConnection();
532533
synchronized(recoveryLock) {
533534
if (!manuallyClosed) {
@@ -541,8 +542,7 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
541542
newConn.abort();
542543
return null;
543544
} catch (Exception e) {
544-
// TODO: exponential back-off
545-
Thread.sleep(this.params.getNetworkRecoveryInterval());
545+
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
546546
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
547547
}
548548
}
@@ -561,13 +561,13 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
561561
}
562562

563563
private void notifyRecoveryListenersComplete() {
564-
for (RecoveryListener f : this.recoveryListeners) {
564+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
565565
f.handleRecovery(this);
566566
}
567567
}
568568

569569
private void notifyRecoveryListenersStarted() {
570-
for (RecoveryListener f : this.recoveryListeners) {
570+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
571571
f.handleRecoveryStarted(this);
572572
}
573573
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.rabbitmq.client.test;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.Arrays;
6+
import java.util.Collections;
7+
8+
import com.rabbitmq.client.RecoveryDelayHandler;
9+
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
10+
import com.rabbitmq.client.RecoveryDelayHandler.ExponentialBackoffDelayHandler;
11+
12+
import org.junit.Test;
13+
14+
public class RecoveryDelayHandlerTest {
15+
16+
@Test
17+
public void testDefaultRecoveryDelayHandler() {
18+
final RecoveryDelayHandler handler = new DefaultRecoveryDelayHandler(5000);
19+
assertEquals(5000L, handler.getDelay(0));
20+
assertEquals(5000L, handler.getDelay(1));
21+
assertEquals(5000L, handler.getDelay(Integer.MAX_VALUE));
22+
}
23+
24+
@Test
25+
public void testExponentialBackoffDelayHandler_default() {
26+
final RecoveryDelayHandler handler = new ExponentialBackoffDelayHandler();
27+
assertEquals(0, handler.getDelay(0));
28+
assertEquals(1000L, handler.getDelay(1));
29+
assertEquals(1000L, handler.getDelay(2));
30+
assertEquals(2000L, handler.getDelay(3));
31+
assertEquals(3000L, handler.getDelay(4));
32+
assertEquals(5000L, handler.getDelay(5));
33+
assertEquals(8000L, handler.getDelay(6));
34+
assertEquals(13000L, handler.getDelay(7));
35+
assertEquals(21000L, handler.getDelay(8));
36+
assertEquals(21000L, handler.getDelay(9));
37+
assertEquals(21000L, handler.getDelay(Integer.MAX_VALUE));
38+
}
39+
40+
@Test
41+
public void testExponentialBackoffDelayHandler_sequence() {
42+
final RecoveryDelayHandler handler = new ExponentialBackoffDelayHandler(Arrays.asList(1L, 2L));
43+
assertEquals(1, handler.getDelay(0));
44+
assertEquals(2, handler.getDelay(1));
45+
assertEquals(2, handler.getDelay(2));
46+
assertEquals(2, handler.getDelay(Integer.MAX_VALUE));
47+
}
48+
49+
@Test(expected=IllegalArgumentException.class)
50+
public void testExponentialBackoffDelayHandler_sequence_null() {
51+
new ExponentialBackoffDelayHandler(null);
52+
}
53+
54+
@Test(expected=IllegalArgumentException.class)
55+
public void testExponentialBackoffDelayHandler_sequence_empty() {
56+
new ExponentialBackoffDelayHandler(Collections.<Long>emptyList());
57+
}
58+
}

0 commit comments

Comments
 (0)