Skip to content

Commit d292064

Browse files
committed
Add test for blocked connection
And make sure channel shutdown listener is called.
1 parent 1c4a40c commit d292064

File tree

5 files changed

+61
-34
lines changed

5 files changed

+61
-34
lines changed

src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom
2+
// Inc. and/or its subsidiaries.
23
//
34
// This software, the RabbitMQ Java client library, is triple-licensed under the
45
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -18,11 +19,27 @@
1819
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
1920
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
2021
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
import static org.assertj.core.api.Assertions.fail;
2124

2225
import com.rabbitmq.client.Channel;
2326
import com.rabbitmq.client.Connection;
2427
import com.rabbitmq.client.ConnectionFactory;
28+
29+
import java.io.IOException;
30+
import java.time.Duration;
2531
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
34+
35+
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
36+
import com.rabbitmq.client.DeliverCallback;
37+
import com.rabbitmq.client.Delivery;
38+
import com.rabbitmq.client.MessageProperties;
39+
import com.rabbitmq.client.ShutdownListener;
40+
import com.rabbitmq.client.ShutdownSignalException;
41+
import org.assertj.core.api.Assertions;
42+
import org.junit.jupiter.api.Test;
2643
import org.junit.jupiter.params.ParameterizedTest;
2744
import org.junit.jupiter.params.provider.ValueSource;
2845

@@ -59,4 +76,34 @@ void errorInBlockListenerShouldCloseConnection(boolean nio) throws Exception {
5976
waitAtMost(() -> !c.isOpen());
6077
}
6178

79+
@Test
80+
void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
81+
long confirmTimeout = Duration.ofSeconds(2).toMillis();
82+
ConnectionFactory cf = TestUtils.connectionFactory();
83+
Connection c = cf.newConnection();
84+
boolean blocked = false;
85+
try {
86+
CountDownLatch blockedLatch = new CountDownLatch(1);
87+
c.addBlockedListener(reason -> blockedLatch.countDown(), () -> {});
88+
Channel ch = c.createChannel();
89+
CountDownLatch chShutdownLatch = new CountDownLatch(1);
90+
ch.addShutdownListener(cause -> chShutdownLatch.countDown());
91+
ch.confirmSelect();
92+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
93+
ch.waitForConfirmsOrDie(confirmTimeout);
94+
block();
95+
blocked = true;
96+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
97+
assertThat(blockedLatch).is(completed());
98+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
99+
assertThatThrownBy(() -> ch.waitForConfirmsOrDie(confirmTimeout))
100+
.isInstanceOf(TimeoutException.class);
101+
assertThat(chShutdownLatch).is(completed());
102+
} finally {
103+
if (blocked) {
104+
unblock();
105+
}
106+
c.close();
107+
}
108+
}
62109
}

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,12 @@ protected void clearResourceAlarm(String source) throws IOException {
315315
Host.clearResourceAlarm(source);
316316
}
317317

318-
protected void block() throws IOException, InterruptedException {
318+
protected void block() throws IOException {
319319
Host.rabbitmqctl("set_vm_memory_high_watermark 0.000000001");
320320
setResourceAlarm("disk");
321321
}
322322

323-
protected void unblock() throws IOException, InterruptedException {
323+
protected void unblock() throws IOException {
324324
Host.rabbitmqctl("set_vm_memory_high_watermark 0.4");
325325
clearResourceAlarm("disk");
326326
}

src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@ protected void createResources() throws IOException, TimeoutException {
3939
@Override
4040
protected void releaseResources() throws IOException {
4141
channel.queueDelete(queue);
42-
try {
43-
unblock();
44-
} catch (InterruptedException e) {
45-
e.printStackTrace();
46-
}
42+
unblock();
4743
}
4844

4945
@Test public void shutdownListener() throws Exception {
@@ -78,13 +74,7 @@ protected void releaseResources() throws IOException {
7874
final CountDownLatch latch = new CountDownLatch(1);
7975
try(Connection connection = TestUtils.connectionFactory().newConnection()) {
8076
connection.addBlockedListener(
81-
reason -> {
82-
try {
83-
unblock();
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
}
87-
},
77+
reason -> unblock(),
8878
() -> latch.countDown()
8979
);
9080
block();

src/test/java/com/rabbitmq/client/test/server/BlockedConnection.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,7 @@
3535

3636
public class BlockedConnection extends BrokerTestCase {
3737
protected void releaseResources() throws IOException {
38-
try {
39-
unblock();
40-
} catch (InterruptedException e) {
41-
e.printStackTrace();
42-
}
38+
unblock();
4339
}
4440
// this test first opens a connection, then triggers
4541
// and alarm and blocks
@@ -79,14 +75,10 @@ private Connection connection(final CountDownLatch latch) throws IOException, Ti
7975
Connection connection = factory.newConnection();
8076
connection.addBlockedListener(new BlockedListener() {
8177
public void handleBlocked(String reason) throws IOException {
82-
try {
83-
unblock();
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
}
78+
unblock();
8779
}
8880

89-
public void handleUnblocked() throws IOException {
81+
public void handleUnblocked() {
9082
latch.countDown();
9183
}
9284
});

src/test/java/com/rabbitmq/tools/Host.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,7 @@ public static String node_portB()
254254
}
255255

256256
public static String rabbitmqctlCommand() {
257-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
258-
if (rabbitmqCtl == null) {
259-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
260-
}
257+
String rabbitmqCtl = rabbitmqctl();
261258
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
262259
String containerId = rabbitmqCtl.split(":")[1];
263260
return "docker exec " + containerId + " rabbitmqctl";
@@ -266,11 +263,12 @@ public static String rabbitmqctlCommand() {
266263
}
267264
}
268265

266+
private static String rabbitmqctl() {
267+
return System.getProperty("rabbitmqctl.bin", "DOCKER:rabbitmq");
268+
}
269+
269270
public static boolean isOnDocker() {
270-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
271-
if (rabbitmqCtl == null) {
272-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
273-
}
271+
String rabbitmqCtl = rabbitmqctl();
274272
return rabbitmqCtl.startsWith(DOCKER_PREFIX);
275273
}
276274

0 commit comments

Comments
 (0)