diff --git a/src/main/java/com/rabbitmq/client/RecoveryListener.java b/src/main/java/com/rabbitmq/client/RecoveryListener.java index a3374414e2..4caf77e323 100644 --- a/src/main/java/com/rabbitmq/client/RecoveryListener.java +++ b/src/main/java/com/rabbitmq/client/RecoveryListener.java @@ -36,4 +36,12 @@ public interface RecoveryListener { * @param recoverable a {@link Recoverable} connection. */ void handleRecoveryStarted(Recoverable recoverable); + + /** + * Invoked before automatic topology recovery starts. + * This means that the connection and channel recovery has completed + * and that exchange/queue/binding/consumer recovery is about to begin. + * @param recoverable a {@link Recoverable} connection. + */ + default void handleTopologyRecoveryStarted(Recoverable recoverable) {} } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 94b24aa7c2..1886b2ec1a 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -560,7 +560,10 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) { } private synchronized void beginAutomaticRecovery() throws InterruptedException { - this.wait(this.params.getRecoveryDelayHandler().getDelay(0)); + final long delay = this.params.getRecoveryDelayHandler().getDelay(0); + if (delay > 0) { + this.wait(delay); + } this.notifyRecoveryListenersStarted(); @@ -576,6 +579,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException { // don't assign new delegate connection until channel recovery is complete this.delegate = newConn; if (this.params.isTopologyRecoveryEnabled()) { + notifyTopologyRecoveryListenersStarted(); recoverTopology(params.getTopologyRecoveryExecutor()); } this.notifyRecoveryListenersComplete(); @@ -650,6 +654,12 @@ private void notifyRecoveryListenersStarted() { } } + private void notifyTopologyRecoveryListenersStarted() { + for (RecoveryListener f : Utility.copy(this.recoveryListeners)) { + f.handleTopologyRecoveryStarted(this); + } + } + private void recoverTopology(final ExecutorService executor) { // The recovery sequence is the following: // 1. Recover exchanges diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index ac43c15faf..e8e66d2ac7 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -170,7 +170,7 @@ public String getPassword() { // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135 @Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException { final List events = new CopyOnWriteArrayList(); - final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete + final CountDownLatch latch = new CountDownLatch(3); // one when started, another when complete connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { @@ -202,6 +202,10 @@ public void handleRecovery(Recoverable recoverable) { public void handleRecoveryStarted(Recoverable recoverable) { latch.countDown(); } + @Override + public void handleTopologyRecoveryStarted(Recoverable recoverable) { + latch.countDown(); + } }); assertThat(connection.isOpen()).isTrue(); closeAndWaitForRecovery();