Skip to content

Commit c5e63f0

Browse files
Recover channel shutdown hooks
1 parent d0a770e commit c5e63f0

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
public class AutorecoveringChannel implements Channel, Recoverable {
3131
private RecoveryAwareChannelN delegate;
3232
private AutorecoveringConnection connection;
33+
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
3334
private List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
3435
private List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
3536
private List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
@@ -379,11 +380,16 @@ public Command rpc(Method method) throws IOException {
379380
return delegate.rpc(method);
380381
}
381382

383+
/**
384+
* @see Connection#addShutdownListener(com.rabbitmq.client.ShutdownListener)
385+
*/
382386
public void addShutdownListener(ShutdownListener listener) {
387+
this.shutdownHooks.add(listener);
383388
delegate.addShutdownListener(listener);
384389
}
385390

386391
public void removeShutdownListener(ShutdownListener listener) {
392+
this.shutdownHooks.remove(listener);
387393
delegate.removeShutdownListener(listener);
388394
}
389395

@@ -417,13 +423,20 @@ public void automaticallyRecover(AutorecoveringConnection connection, Connection
417423
this.delegate = (RecoveryAwareChannelN) connDelegate.createChannel(this.getChannelNumber());
418424
this.delegate.inheritOffsetFrom(defunctChannel);
419425

426+
this.recoverShutdownListeners();
420427
this.recoverReturnListeners();
421428
this.recoverConfirmListeners();
422429
this.recoverFlowListeners();
423430
this.recoverState();
424431
this.notifyRecoveryListeners();
425432
}
426433

434+
private void recoverShutdownListeners() {
435+
for (ShutdownListener sh : this.shutdownHooks) {
436+
this.delegate.addShutdownListener(sh);
437+
}
438+
}
439+
427440
private void recoverReturnListeners() {
428441
for(ReturnListener rl : this.returnListeners) {
429442
this.delegate.addReturnListener(rl);

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testConnectionRecoveryWithDisabledTopologyRecovery() throws IOExcept
6868
}
6969
}
7070

71-
public void testShutdownHooksRecovery() throws IOException, InterruptedException {
71+
public void testShutdownHooksRecoveryOnConnection() throws IOException, InterruptedException {
7272
final CountDownLatch latch = new CountDownLatch(2);
7373
connection.addShutdownListener(new ShutdownListener() {
7474
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -82,6 +82,22 @@ public void shutdownCompleted(ShutdownSignalException cause) {
8282
wait(latch);
8383
}
8484

85+
public void testShutdownHooksRecoveryOnChannel() throws IOException, InterruptedException {
86+
final CountDownLatch latch = new CountDownLatch(3);
87+
channel.addShutdownListener(new ShutdownListener() {
88+
public void shutdownCompleted(ShutdownSignalException cause) {
89+
latch.countDown();
90+
}
91+
});
92+
assertTrue(connection.isOpen());
93+
closeAndWaitForRecovery();
94+
assertTrue(connection.isOpen());
95+
closeAndWaitForRecovery();
96+
assertTrue(connection.isOpen());
97+
connection.close();
98+
wait(latch);
99+
}
100+
85101
public void testBlockedListenerRecovery() throws IOException, InterruptedException {
86102
final CountDownLatch latch = new CountDownLatch(2);
87103
connection.addBlockedListener(new BlockedListener() {

0 commit comments

Comments
 (0)