Skip to content

Commit e460887

Browse files
committed
Add optional retry logic to topology recovery
There's no topology recovery retry by default. The default implementation is composable: not all have the recoverable entities have to retry and the retry operations don't have to be only the corresponding entity recovery, but also other operations, like recovering the corresponding channel. Fixes #387 (cherry picked from commit 34e33ea)
1 parent 60961f6 commit e460887

18 files changed

+1160
-155
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.client.impl.nio.NioParams;
3030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
3131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
32+
import com.rabbitmq.client.impl.recovery.RetryHandler;
3233
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3334

3435
import java.io.IOException;
@@ -192,6 +193,13 @@ public class ConnectionFactory implements Cloneable {
192193
*/
193194
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
194195

196+
/**
197+
* Retry handler for topology recovery.
198+
* Default is no retry.
199+
* @since 5.4.0
200+
*/
201+
private RetryHandler topologyRecoveryRetryHandler;
202+
195203
/** @return the default host to use for connections */
196204
public String getHost() {
197205
return host;
@@ -1087,6 +1095,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10871095
result.setErrorOnWriteListener(errorOnWriteListener);
10881096
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
10891097
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
1098+
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
10901099
return result;
10911100
}
10921101

@@ -1454,4 +1463,13 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
14541463
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
14551464
}
14561465

1466+
/**
1467+
* Set retry handler for topology recovery.
1468+
* Default is no retry.
1469+
* @param topologyRecoveryRetryHandler
1470+
* @since 5.4.0
1471+
*/
1472+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
1473+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
1474+
}
14571475
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
23+
import com.rabbitmq.client.impl.recovery.RetryHandler;
2324
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2425

2526
import java.util.Map;
@@ -51,6 +52,7 @@ public class ConnectionParams {
5152
private int workPoolTimeout = -1;
5253
private TopologyRecoveryFilter topologyRecoveryFilter;
5354
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
55+
private RetryHandler topologyRecoveryRetryHandler;
5456

5557
private ExceptionHandler exceptionHandler;
5658
private ThreadFactory threadFactory;
@@ -257,4 +259,11 @@ public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringConditi
257259
return connectionRecoveryTriggeringCondition;
258260
}
259261

262+
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
263+
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
264+
}
265+
266+
public RetryHandler getTopologyRecoveryRetryHandler() {
267+
return topologyRecoveryRetryHandler;
268+
}
260269
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 87 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
9595

9696
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
9797

98+
private final RetryHandler retryHandler;
99+
98100
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
99101
this(params, f, new ListAddressResolver(addrs));
100102
}
@@ -115,6 +117,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
115117
this.channels = new ConcurrentHashMap<>();
116118
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
117119
letAllPassFilter() : params.getTopologyRecoveryFilter();
120+
121+
this.retryHandler = params.getTopologyRecoveryRetryHandler();
118122
}
119123

120124
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -125,12 +129,9 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
125129
// we should trigger the error handling and the recovery only once
126130
if (errorOnWriteLock.tryLock()) {
127131
try {
128-
Thread recoveryThread = threadFactory.newThread(new Runnable() {
129-
@Override
130-
public void run() {
131-
AMQConnection c = (AMQConnection) connection;
132-
c.handleIoError(exception);
133-
}
132+
Thread recoveryThread = threadFactory.newThread(() -> {
133+
AMQConnection c = (AMQConnection) connection;
134+
c.handleIoError(exception);
134135
});
135136
recoveryThread.setName("RabbitMQ Error On Write Thread");
136137
recoveryThread.start();
@@ -630,6 +631,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
630631
}
631632
}
632633

634+
void recoverChannel(AutorecoveringChannel channel) throws IOException {
635+
channel.automaticallyRecover(this, this.delegate);
636+
}
637+
633638
private void notifyRecoveryListenersComplete() {
634639
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
635640
f.handleRecovery(this);
@@ -651,16 +656,16 @@ private void recoverTopology(final ExecutorService executor) {
651656
if (executor == null) {
652657
// recover entities in serial on the main connection thread
653658
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
654-
recoverExchange(exchange);
659+
recoverExchange(exchange, true);
655660
}
656661
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
657-
recoverQueue(entry.getKey(), entry.getValue());
662+
recoverQueue(entry.getKey(), entry.getValue(), true);
658663
}
659664
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
660-
recoverBinding(b);
665+
recoverBinding(b, true);
661666
}
662667
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
663-
recoverConsumer(entry.getKey(), entry.getValue());
668+
recoverConsumer(entry.getKey(), entry.getValue(), true);
664669
}
665670
} else {
666671
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
@@ -680,11 +685,19 @@ private void recoverTopology(final ExecutorService executor) {
680685
}
681686
}
682687

683-
private void recoverExchange(final RecordedExchange x) {
688+
private void recoverExchange(RecordedExchange x, boolean retry) {
684689
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
685690
try {
686691
if (topologyRecoveryFilter.filterExchange(x)) {
687-
x.recover();
692+
if (retry) {
693+
final RecordedExchange entity = x;
694+
x = (RecordedExchange) wrapRetryIfNecessary(x, () -> {
695+
entity.recover();
696+
return null;
697+
}).getRecordedEntity();
698+
} else {
699+
x.recover();
700+
}
688701
LOGGER.debug("{} has recovered", x);
689702
}
690703
} catch (Exception cause) {
@@ -695,12 +708,20 @@ private void recoverExchange(final RecordedExchange x) {
695708
}
696709
}
697710

698-
private void recoverQueue(final String oldName, final RecordedQueue q) {
699711

712+
void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
700713
try {
701714
if (topologyRecoveryFilter.filterQueue(q)) {
702715
LOGGER.debug("Recovering {}", q);
703-
q.recover();
716+
if (retry) {
717+
final RecordedQueue entity = q;
718+
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
719+
entity.recover();
720+
return null;
721+
}).getRecordedEntity();
722+
} else {
723+
q.recover();
724+
}
704725
String newName = q.getName();
705726
if (!oldName.equals(newName)) {
706727
// make sure server-named queues are re-added with
@@ -731,10 +752,18 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
731752
}
732753
}
733754

734-
private void recoverBinding(final RecordedBinding b) {
755+
private void recoverBinding(RecordedBinding b, boolean retry) {
735756
try {
736757
if (this.topologyRecoveryFilter.filterBinding(b)) {
737-
b.recover();
758+
if (retry) {
759+
final RecordedBinding entity = b;
760+
b = (RecordedBinding) wrapRetryIfNecessary(b, () -> {
761+
entity.recover();
762+
return null;
763+
}).getRecordedEntity();
764+
} else {
765+
b.recover();
766+
}
738767
LOGGER.debug("{} has recovered", b);
739768
}
740769
} catch (Exception cause) {
@@ -745,11 +774,20 @@ private void recoverBinding(final RecordedBinding b) {
745774
}
746775
}
747776

748-
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
777+
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
749778
try {
750779
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
751780
LOGGER.debug("Recovering {}", consumer);
752-
String newTag = consumer.recover();
781+
String newTag = null;
782+
if (retry) {
783+
final RecordedConsumer entity = consumer;
784+
RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover());
785+
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
786+
newTag = (String) retryResult.getResult();
787+
} else {
788+
newTag = consumer.recover();
789+
}
790+
753791
// make sure server-generated tags are re-added. MK.
754792
if(tag != null && !tag.equals(newTag)) {
755793
synchronized (this.consumers) {
@@ -772,6 +810,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
772810
}
773811
}
774812

813+
private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
814+
if (this.retryHandler == null) {
815+
T result = recoveryAction.call();
816+
return new RetryResult(entity, result);
817+
} else {
818+
try {
819+
T result = recoveryAction.call();
820+
return new RetryResult(entity, result);
821+
} catch (Exception e) {
822+
RetryContext retryContext = new RetryContext(entity, e, this);
823+
RetryResult retryResult;
824+
if (entity instanceof RecordedQueue) {
825+
retryResult = this.retryHandler.retryQueueRecovery(retryContext);
826+
} else if (entity instanceof RecordedExchange) {
827+
retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
828+
} else if (entity instanceof RecordedBinding) {
829+
retryResult = this.retryHandler.retryBindingRecovery(retryContext);
830+
} else if (entity instanceof RecordedConsumer) {
831+
retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
832+
} else {
833+
throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
834+
}
835+
return retryResult;
836+
}
837+
}
838+
}
839+
775840
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
776841
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
777842
if (b.getDestination().equals(oldName)) {
@@ -820,15 +885,15 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
820885
callables.add(Executors.callable(() -> {
821886
for (final E entity : entityList) {
822887
if (entity instanceof RecordedExchange) {
823-
recoverExchange((RecordedExchange)entity);
888+
recoverExchange((RecordedExchange)entity, true);
824889
} else if (entity instanceof RecordedQueue) {
825890
final RecordedQueue q = (RecordedQueue) entity;
826-
recoverQueue(q.getName(), q);
891+
recoverQueue(q.getName(), q, true);
827892
} else if (entity instanceof RecordedBinding) {
828-
recoverBinding((RecordedBinding) entity);
893+
recoverBinding((RecordedBinding) entity, true);
829894
} else if (entity instanceof RecordedConsumer) {
830895
final RecordedConsumer c = (RecordedConsumer) entity;
831-
recoverConsumer(c.getConsumerTag(), c);
896+
recoverConsumer(c.getConsumerTag(), c, true);
832897
}
833898
}
834899
}));
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl.recovery;
17+
18+
/**
19+
* Backoff policy for topology recovery retry attempts.
20+
*
21+
* @see DefaultRetryHandler
22+
* @see TopologyRecoveryRetryHandlerBuilder
23+
* @since 5.4.0
24+
*/
25+
@FunctionalInterface
26+
public interface BackoffPolicy {
27+
28+
/**
29+
* Wait depending on the current attempt number (1, 2, 3, etc)
30+
* @param attemptNumber current attempt number
31+
* @throws InterruptedException
32+
*/
33+
void backoff(int attemptNumber) throws InterruptedException;
34+
}

0 commit comments

Comments
 (0)