Skip to content

Commit 9d0284a

Browse files
committed
Add test for blocked connection
And make sure channel shutdown listener is called. (cherry picked from commit d292064) Conflicts: src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java
1 parent 4bb5343 commit 9d0284a

File tree

5 files changed

+52
-34
lines changed

5 files changed

+52
-34
lines changed

src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom
2+
// Inc. and/or its subsidiaries.
23
//
34
// This software, the RabbitMQ Java client library, is triple-licensed under the
45
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -18,11 +19,18 @@
1819
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
1920
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
2021
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2123

2224
import com.rabbitmq.client.Channel;
2325
import com.rabbitmq.client.Connection;
2426
import com.rabbitmq.client.ConnectionFactory;
27+
28+
import java.time.Duration;
2529
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeoutException;
31+
32+
import com.rabbitmq.client.MessageProperties;
33+
import org.junit.jupiter.api.Test;
2634
import org.junit.jupiter.params.ParameterizedTest;
2735
import org.junit.jupiter.params.provider.ValueSource;
2836

@@ -59,4 +67,34 @@ void errorInBlockListenerShouldCloseConnection(boolean nio) throws Exception {
5967
waitAtMost(() -> !c.isOpen());
6068
}
6169

70+
@Test
71+
void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception {
72+
long confirmTimeout = Duration.ofSeconds(2).toMillis();
73+
ConnectionFactory cf = TestUtils.connectionFactory();
74+
Connection c = cf.newConnection();
75+
boolean blocked = false;
76+
try {
77+
CountDownLatch blockedLatch = new CountDownLatch(1);
78+
c.addBlockedListener(reason -> blockedLatch.countDown(), () -> {});
79+
Channel ch = c.createChannel();
80+
CountDownLatch chShutdownLatch = new CountDownLatch(1);
81+
ch.addShutdownListener(cause -> chShutdownLatch.countDown());
82+
ch.confirmSelect();
83+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
84+
ch.waitForConfirmsOrDie(confirmTimeout);
85+
block();
86+
blocked = true;
87+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
88+
assertThat(blockedLatch).is(completed());
89+
ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes());
90+
assertThatThrownBy(() -> ch.waitForConfirmsOrDie(confirmTimeout))
91+
.isInstanceOf(TimeoutException.class);
92+
assertThat(chShutdownLatch).is(completed());
93+
} finally {
94+
if (blocked) {
95+
unblock();
96+
}
97+
c.close();
98+
}
99+
}
62100
}

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,12 @@ protected void clearResourceAlarm(String source) throws IOException {
315315
Host.clearResourceAlarm(source);
316316
}
317317

318-
protected void block() throws IOException, InterruptedException {
318+
protected void block() throws IOException {
319319
Host.rabbitmqctl("set_vm_memory_high_watermark 0.000000001");
320320
setResourceAlarm("disk");
321321
}
322322

323-
protected void unblock() throws IOException, InterruptedException {
323+
protected void unblock() throws IOException {
324324
Host.rabbitmqctl("set_vm_memory_high_watermark 0.4");
325325
clearResourceAlarm("disk");
326326
}

src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@ protected void createResources() throws IOException, TimeoutException {
3939
@Override
4040
protected void releaseResources() throws IOException {
4141
channel.queueDelete(queue);
42-
try {
43-
unblock();
44-
} catch (InterruptedException e) {
45-
e.printStackTrace();
46-
}
42+
unblock();
4743
}
4844

4945
@Test public void shutdownListener() throws Exception {
@@ -78,13 +74,7 @@ protected void releaseResources() throws IOException {
7874
final CountDownLatch latch = new CountDownLatch(1);
7975
try(Connection connection = TestUtils.connectionFactory().newConnection()) {
8076
connection.addBlockedListener(
81-
reason -> {
82-
try {
83-
unblock();
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
}
87-
},
77+
reason -> unblock(),
8878
() -> latch.countDown()
8979
);
9080
block();

src/test/java/com/rabbitmq/client/test/server/BlockedConnection.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,7 @@
3535

3636
public class BlockedConnection extends BrokerTestCase {
3737
protected void releaseResources() throws IOException {
38-
try {
39-
unblock();
40-
} catch (InterruptedException e) {
41-
e.printStackTrace();
42-
}
38+
unblock();
4339
}
4440
// this test first opens a connection, then triggers
4541
// and alarm and blocks
@@ -79,14 +75,10 @@ private Connection connection(final CountDownLatch latch) throws IOException, Ti
7975
Connection connection = factory.newConnection();
8076
connection.addBlockedListener(new BlockedListener() {
8177
public void handleBlocked(String reason) throws IOException {
82-
try {
83-
unblock();
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
}
78+
unblock();
8779
}
8880

89-
public void handleUnblocked() throws IOException {
81+
public void handleUnblocked() {
9082
latch.countDown();
9183
}
9284
});

src/test/java/com/rabbitmq/tools/Host.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,7 @@ public static String node_portB()
254254
}
255255

256256
public static String rabbitmqctlCommand() {
257-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
258-
if (rabbitmqCtl == null) {
259-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
260-
}
257+
String rabbitmqCtl = rabbitmqctl();
261258
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
262259
String containerId = rabbitmqCtl.split(":")[1];
263260
return "docker exec " + containerId + " rabbitmqctl";
@@ -266,11 +263,12 @@ public static String rabbitmqctlCommand() {
266263
}
267264
}
268265

266+
private static String rabbitmqctl() {
267+
return System.getProperty("rabbitmqctl.bin", "DOCKER:rabbitmq");
268+
}
269+
269270
public static boolean isOnDocker() {
270-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
271-
if (rabbitmqCtl == null) {
272-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
273-
}
271+
String rabbitmqCtl = rabbitmqctl();
274272
return rabbitmqCtl.startsWith(DOCKER_PREFIX);
275273
}
276274

0 commit comments

Comments
 (0)