Skip to content

Commit 9e02adb

Browse files
committed
Add optional retry logic to topology recovery
There's no topology recovery retry by default. The default implementation is composable: not all have the recoverable entities have to retry and the retry operations don't have to be only the corresponding entity recovery, but also other operations, like recovering the corresponding channel. Fixes #387 (cherry picked from commit 34e33ea) Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/ConnectionParams.java src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java src/test/java/com/rabbitmq/client/test/TestUtils.java src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java
1 parent 77b9747 commit 9e02adb

18 files changed

+1412
-154
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.client.impl.nio.NioParams;
3030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
3131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
32+
import com.rabbitmq.client.impl.recovery.RetryHandler;
3233
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3334

3435
import java.io.IOException;
@@ -180,6 +181,13 @@ public class ConnectionFactory implements Cloneable {
180181
*/
181182
private TopologyRecoveryFilter topologyRecoveryFilter;
182183

184+
/**
185+
* Retry handler for topology recovery.
186+
* Default is no retry.
187+
* @since 4.8.0
188+
*/
189+
private RetryHandler topologyRecoveryRetryHandler;
190+
183191
/** @return the default host to use for connections */
184192
public String getHost() {
185193
return host;
@@ -1055,6 +1063,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10551063
result.setWorkPoolTimeout(workPoolTimeout);
10561064
result.setErrorOnWriteListener(errorOnWriteListener);
10571065
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
1066+
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
10581067
return result;
10591068
}
10601069

@@ -1396,4 +1405,14 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
13961405
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
13971406
this.topologyRecoveryFilter = topologyRecoveryFilter;
13981407
}
1408+
1409+
/**
1410+
* Set retry handler for topology recovery.
1411+
* Default is no retry.
1412+
* @param topologyRecoveryRetryHandler
1413+
* @since 4.8.0
1414+
*/
1415+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
1416+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
1417+
}
13991418
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.rabbitmq.client.RecoveryDelayHandler;
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
22+
import com.rabbitmq.client.ShutdownSignalException;
23+
import com.rabbitmq.client.impl.recovery.RetryHandler;
2224
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2325

2426
import java.util.Map;
@@ -48,6 +50,7 @@ public class ConnectionParams {
4850
private ErrorOnWriteListener errorOnWriteListener;
4951
private int workPoolTimeout = -1;
5052
private TopologyRecoveryFilter topologyRecoveryFilter;
53+
private RetryHandler topologyRecoveryRetryHandler;
5154

5255
private ExceptionHandler exceptionHandler;
5356
private ThreadFactory threadFactory;
@@ -245,4 +248,12 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil
245248
public TopologyRecoveryFilter getTopologyRecoveryFilter() {
246249
return topologyRecoveryFilter;
247250
}
251+
252+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
253+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
254+
}
255+
256+
public RetryHandler getTopologyRecoveryRetryHandler() {
257+
return topologyRecoveryRetryHandler;
258+
}
248259
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
9090
// be created after application code has initiated shutdown.
9191
private final Object recoveryLock = new Object();
9292

93+
private final RetryHandler retryHandler;
94+
9395
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
9496
this(params, f, new ListAddressResolver(addrs));
9597
}
@@ -109,6 +111,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
109111

110112
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
111113
letAllPassFilter() : params.getTopologyRecoveryFilter();
114+
115+
this.retryHandler = params.getTopologyRecoveryRetryHandler();
112116
}
113117

114118
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -633,6 +637,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
633637
}
634638
}
635639

640+
void recoverChannel(AutorecoveringChannel channel) throws IOException {
641+
channel.automaticallyRecover(this, this.delegate);
642+
}
643+
636644
private void notifyRecoveryListenersComplete() {
637645
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
638646
f.handleRecovery(this);
@@ -654,16 +662,16 @@ private void recoverTopology(final ExecutorService executor) {
654662
if (executor == null) {
655663
// recover entities in serial on the main connection thread
656664
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
657-
recoverExchange(exchange);
665+
recoverExchange(exchange, true);
658666
}
659667
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
660-
recoverQueue(entry.getKey(), entry.getValue());
668+
recoverQueue(entry.getKey(), entry.getValue(), true);
661669
}
662670
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
663-
recoverBinding(b);
671+
recoverBinding(b, true);
664672
}
665673
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
666-
recoverConsumer(entry.getKey(), entry.getValue());
674+
recoverConsumer(entry.getKey(), entry.getValue(), true);
667675
}
668676
} else {
669677
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
@@ -683,11 +691,22 @@ private void recoverTopology(final ExecutorService executor) {
683691
}
684692
}
685693

686-
private void recoverExchange(final RecordedExchange x) {
694+
private void recoverExchange(RecordedExchange x, boolean retry) {
687695
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
688696
try {
689697
if (topologyRecoveryFilter.filterExchange(x)) {
690-
x.recover();
698+
if (retry) {
699+
final RecordedExchange entity = x;
700+
x = (RecordedExchange) wrapRetryIfNecessary(x, new Callable<Void>() {
701+
@Override
702+
public Void call() throws Exception {
703+
entity.recover();
704+
return null;
705+
}
706+
}).getRecordedEntity();
707+
} else {
708+
x.recover();
709+
}
691710
LOGGER.debug("{} has recovered", x);
692711
}
693712
} catch (Exception cause) {
@@ -698,12 +717,23 @@ private void recoverExchange(final RecordedExchange x) {
698717
}
699718
}
700719

701-
private void recoverQueue(final String oldName, final RecordedQueue q) {
702720

721+
void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
703722
try {
704723
if (topologyRecoveryFilter.filterQueue(q)) {
705724
LOGGER.debug("Recovering {}", q);
706-
q.recover();
725+
if (retry) {
726+
final RecordedQueue entity = q;
727+
q = (RecordedQueue) wrapRetryIfNecessary(q, new Callable<Void>() {
728+
@Override
729+
public Void call() throws Exception {
730+
entity.recover();
731+
return null;
732+
}
733+
}).getRecordedEntity();
734+
} else {
735+
q.recover();
736+
}
707737
String newName = q.getName();
708738
if (!oldName.equals(newName)) {
709739
// make sure server-named queues are re-added with
@@ -734,10 +764,21 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
734764
}
735765
}
736766

737-
private void recoverBinding(final RecordedBinding b) {
767+
private void recoverBinding(RecordedBinding b, boolean retry) {
738768
try {
739769
if (this.topologyRecoveryFilter.filterBinding(b)) {
740-
b.recover();
770+
if (retry) {
771+
final RecordedBinding entity = b;
772+
b = (RecordedBinding) wrapRetryIfNecessary(b, new Callable<Void>() {
773+
@Override
774+
public Void call() throws Exception {
775+
entity.recover();
776+
return null;
777+
}
778+
}).getRecordedEntity();
779+
} else {
780+
b.recover();
781+
}
741782
LOGGER.debug("{} has recovered", b);
742783
}
743784
} catch (Exception cause) {
@@ -748,11 +789,25 @@ private void recoverBinding(final RecordedBinding b) {
748789
}
749790
}
750791

751-
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
792+
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
752793
try {
753794
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
754795
LOGGER.debug("Recovering {}", consumer);
755-
String newTag = consumer.recover();
796+
String newTag = null;
797+
if (retry) {
798+
final RecordedConsumer entity = consumer;
799+
RetryResult retryResult = wrapRetryIfNecessary(consumer, new Callable<String>() {
800+
@Override
801+
public String call() throws Exception {
802+
return entity.recover();
803+
}
804+
});
805+
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
806+
newTag = (String) retryResult.getResult();
807+
} else {
808+
newTag = consumer.recover();
809+
}
810+
756811
// make sure server-generated tags are re-added. MK.
757812
if(tag != null && !tag.equals(newTag)) {
758813
synchronized (this.consumers) {
@@ -775,6 +830,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
775830
}
776831
}
777832

833+
private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
834+
if (this.retryHandler == null) {
835+
T result = recoveryAction.call();
836+
return new RetryResult(entity, result);
837+
} else {
838+
try {
839+
T result = recoveryAction.call();
840+
return new RetryResult(entity, result);
841+
} catch (Exception e) {
842+
RetryContext retryContext = new RetryContext(entity, e, this);
843+
RetryResult retryResult;
844+
if (entity instanceof RecordedQueue) {
845+
retryResult = this.retryHandler.retryQueueRecovery(retryContext);
846+
} else if (entity instanceof RecordedExchange) {
847+
retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
848+
} else if (entity instanceof RecordedBinding) {
849+
retryResult = this.retryHandler.retryBindingRecovery(retryContext);
850+
} else if (entity instanceof RecordedConsumer) {
851+
retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
852+
} else {
853+
throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
854+
}
855+
return retryResult;
856+
}
857+
}
858+
}
859+
778860
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
779861
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
780862
if (b.getDestination().equals(oldName)) {
@@ -825,15 +907,15 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
825907
public void run() {
826908
for (final E entity : entityList) {
827909
if (entity instanceof RecordedExchange) {
828-
recoverExchange((RecordedExchange)entity);
910+
recoverExchange((RecordedExchange)entity, true);
829911
} else if (entity instanceof RecordedQueue) {
830912
final RecordedQueue q = (RecordedQueue) entity;
831-
recoverQueue(q.getName(), q);
913+
recoverQueue(q.getName(), q, true);
832914
} else if (entity instanceof RecordedBinding) {
833-
recoverBinding((RecordedBinding) entity);
915+
recoverBinding((RecordedBinding) entity, true);
834916
} else if (entity instanceof RecordedConsumer) {
835917
final RecordedConsumer c = (RecordedConsumer) entity;
836-
recoverConsumer(c.getConsumerTag(), c);
918+
recoverConsumer(c.getConsumerTag(), c, true);
837919
}
838920
}
839921
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl.recovery;
17+
18+
/**
19+
* Backoff policy for topology recovery retry attempts.
20+
*
21+
* @see DefaultRetryHandler
22+
* @see TopologyRecoveryRetryHandlerBuilder
23+
* @since 4.8.0
24+
*/
25+
public interface BackoffPolicy {
26+
27+
/**
28+
* Wait depending on the current attempt number (1, 2, 3, etc)
29+
* @param attemptNumber current attempt number
30+
* @throws InterruptedException
31+
*/
32+
void backoff(int attemptNumber) throws InterruptedException;
33+
}

0 commit comments

Comments
 (0)