Skip to content

Commit eb18ca4

Browse files
committed
Call consumer shutdown callback even if close times out
References #1651 (cherry picked from commit f9b69f3)
1 parent 9d0284a commit eb18ca4

File tree

4 files changed

+45
-19
lines changed

4 files changed

+45
-19
lines changed

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.CopyOnWriteArrayList;
3434
import java.util.concurrent.CountDownLatch;
3535
import java.util.concurrent.TimeoutException;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
/**
3839
* Main interface to AMQP protocol functionality. Public API -
@@ -609,10 +610,13 @@ protected void close(int closeCode,
609610
signal.initCause(cause);
610611
}
611612

613+
AtomicBoolean finishProcessShutdownSignalCalled = new AtomicBoolean(false);
612614
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
613615
@Override
614616
public AMQCommand transformReply(AMQCommand command) {
615-
ChannelN.this.finishProcessShutdownSignal();
617+
if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) {
618+
ChannelN.this.finishProcessShutdownSignal();
619+
}
616620
return command;
617621
}};
618622
boolean notify = false;
@@ -643,6 +647,13 @@ public AMQCommand transformReply(AMQCommand command) {
643647
if (!abort)
644648
throw ioe;
645649
} finally {
650+
if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) {
651+
try {
652+
ChannelN.this.finishProcessShutdownSignal();
653+
} catch (Exception e) {
654+
LOGGER.info("Error while processing shutdown signal: {}", e.getMessage());
655+
}
656+
}
646657
if (abort || notify) {
647658
// Now we know everything's been cleaned up and there should
648659
// be no more surprises arriving on the wire. Release the

src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
7777
CountDownLatch blockedLatch = new CountDownLatch(1);
7878
c.addBlockedListener(reason -> blockedLatch.countDown(), () -> {});
7979
Channel ch = c.createChannel();
80+
String q = ch.queueDeclare().getQueue();
81+
CountDownLatch consShutdownLatch = new CountDownLatch(1);
82+
ch.basicConsume(q, (ctag, msg) -> { }, (ctag, r) -> consShutdownLatch.countDown());
8083
CountDownLatch chShutdownLatch = new CountDownLatch(1);
8184
ch.addShutdownListener(cause -> chShutdownLatch.countDown());
8285
ch.confirmSelect();
@@ -89,6 +92,7 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
8992
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
9093
assertThatThrownBy(() -> ch.waitForConfirmsOrDie(confirmTimeout))
9194
.isInstanceOf(TimeoutException.class);
95+
assertThat(consShutdownLatch).is(completed());
9296
assertThat(chShutdownLatch).is(completed());
9397
} finally {
9498
if (blocked) {

src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java renamed to src/test/java/com/rabbitmq/client/test/functional/ConsumerNotifications.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@
2828
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.TimeUnit;
3030

31+
import static org.assertj.core.api.Assertions.assertThat;
3132
import static org.junit.jupiter.api.Assertions.assertTrue;
3233
import static org.junit.jupiter.api.Assertions.fail;
3334

34-
public class ConsumerCancelNotification extends BrokerTestCase {
35+
public class ConsumerNotifications extends BrokerTestCase {
3536

3637
private final String queue = "cancel_notification_queue";
3738

@@ -42,7 +43,7 @@ public class ConsumerCancelNotification extends BrokerTestCase {
4243
channel.queueDeclare(queue, false, true, false, null);
4344
Consumer consumer = new DefaultConsumer(channel) {
4445
@Override
45-
public void handleCancel(String consumerTag) throws IOException {
46+
public void handleCancel(String consumerTag) {
4647
try {
4748
result.put(true);
4849
} catch (InterruptedException e) {
@@ -55,7 +56,31 @@ public void handleCancel(String consumerTag) throws IOException {
5556
assertTrue(result.take());
5657
}
5758

58-
class AlteringConsumer extends DefaultConsumer {
59+
@Test public void consumerCancellationHandlerUsesBlockingOperations()
60+
throws IOException, InterruptedException {
61+
final String altQueue = "basic.cancel.fallback";
62+
channel.queueDeclare(queue, false, true, false, null);
63+
64+
CountDownLatch latch = new CountDownLatch(1);
65+
final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch);
66+
67+
channel.basicConsume(queue, consumer);
68+
channel.queueDelete(queue);
69+
70+
latch.await(2, TimeUnit.SECONDS);
71+
}
72+
73+
@Test
74+
void handleShutdownShouldBeCalledWhenChannelIsClosed() throws Exception {
75+
Channel ch = connection.createChannel();
76+
String q = ch.queueDeclare().getQueue();
77+
CountDownLatch latch = new CountDownLatch(1);
78+
ch.basicConsume(q, (ctag, msg) -> {}, (ctag, r) -> latch.countDown());
79+
ch.close();
80+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
81+
}
82+
83+
private static class AlteringConsumer extends DefaultConsumer {
5984
private final String altQueue;
6085
private final CountDownLatch latch;
6186

@@ -81,18 +106,4 @@ public void handleCancel(String consumerTag) {
81106
}
82107
}
83108
}
84-
85-
@Test public void consumerCancellationHandlerUsesBlockingOperations()
86-
throws IOException, InterruptedException {
87-
final String altQueue = "basic.cancel.fallback";
88-
channel.queueDeclare(queue, false, true, false, null);
89-
90-
CountDownLatch latch = new CountDownLatch(1);
91-
final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch);
92-
93-
channel.basicConsume(queue, consumer);
94-
channel.queueDelete(queue);
95-
96-
latch.await(2, TimeUnit.SECONDS);
97-
}
98109
}

src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
DefaultExchange.class,
5454
UnbindAutoDeleteExchange.class,
5555
Confirm.class,
56-
ConsumerCancelNotification.class,
56+
ConsumerNotifications.class,
5757
UnexpectedFrames.class,
5858
PerQueueTTL.class,
5959
PerMessageTTL.class,

0 commit comments

Comments
 (0)