|
1 |
| -// Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. |
| 1 | +// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. |
2 | 2 | //
|
3 | 3 | // This software, the RabbitMQ Java client library, is triple-licensed under the
|
4 | 4 | // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
|
|
15 | 15 |
|
16 | 16 | package com.rabbitmq.client.test.ssl;
|
17 | 17 |
|
18 |
| -import com.rabbitmq.client.*; |
| 18 | +import static com.rabbitmq.client.test.TestUtils.basicGetBasicConsume; |
| 19 | +import static org.assertj.core.api.Assertions.assertThat; |
| 20 | +import static org.junit.Assert.assertTrue; |
| 21 | +import static org.junit.Assert.fail; |
| 22 | + |
| 23 | +import com.rabbitmq.client.Connection; |
| 24 | +import com.rabbitmq.client.ConnectionFactory; |
| 25 | +import com.rabbitmq.client.TrustEverythingTrustManager; |
19 | 26 | import com.rabbitmq.client.impl.nio.NioParams;
|
20 | 27 | import com.rabbitmq.client.test.BrokerTestCase;
|
21 | 28 | import com.rabbitmq.client.test.TestUtils;
|
| 29 | +import java.io.IOException; |
| 30 | +import java.net.InetSocketAddress; |
| 31 | +import java.net.SocketTimeoutException; |
22 | 32 | import java.util.Collection;
|
| 33 | +import java.util.concurrent.CountDownLatch; |
| 34 | +import java.util.concurrent.ExecutorService; |
| 35 | +import java.util.concurrent.Executors; |
| 36 | +import java.util.concurrent.TimeUnit; |
| 37 | +import java.util.concurrent.TimeoutException; |
| 38 | +import java.util.concurrent.atomic.AtomicBoolean; |
23 | 39 | import java.util.concurrent.atomic.AtomicReference;
|
24 | 40 | import java.util.stream.Collectors;
|
25 | 41 | import java.util.stream.Stream;
|
26 | 42 | import javax.net.ssl.SSLContext;
|
| 43 | +import javax.net.ssl.SSLEngine; |
27 | 44 | import javax.net.ssl.TrustManager;
|
28 | 45 | import org.junit.Test;
|
| 46 | +import org.netcrusher.core.reactor.NioReactor; |
| 47 | +import org.netcrusher.tcp.TcpCrusher; |
| 48 | +import org.netcrusher.tcp.TcpCrusherBuilder; |
29 | 49 | import org.slf4j.LoggerFactory;
|
30 | 50 |
|
31 |
| -import javax.net.ssl.SSLEngine; |
32 |
| -import java.io.IOException; |
33 |
| -import java.util.concurrent.CountDownLatch; |
34 |
| -import java.util.concurrent.TimeUnit; |
35 |
| -import java.util.concurrent.TimeoutException; |
36 |
| -import java.util.concurrent.atomic.AtomicBoolean; |
37 |
| - |
38 |
| -import static com.rabbitmq.client.test.TestUtils.basicGetBasicConsume; |
39 |
| -import static org.assertj.core.api.Assertions.assertThat; |
40 |
| -import static org.junit.Assert.assertTrue; |
41 |
| -import static org.junit.Assert.fail; |
42 |
| - |
43 | 51 | /**
|
44 | 52 | *
|
45 | 53 | */
|
@@ -149,6 +157,59 @@ public void connectionGetConsumeProtocols() throws Exception {
|
149 | 157 | }
|
150 | 158 | }
|
151 | 159 |
|
| 160 | + @Test |
| 161 | + public void connectionShouldEnforceConnectionTimeout() throws Exception { |
| 162 | + int amqpPort = 5671; // assumes RabbitMQ server running on localhost; |
| 163 | + int amqpProxyPort = TestUtils.randomNetworkPort(); |
| 164 | + |
| 165 | + int connectionTimeout = 3_000; |
| 166 | + int handshakeTimeout = 1_000; |
| 167 | + |
| 168 | + try (NioReactor reactor = new NioReactor(); |
| 169 | + TcpCrusher tcpProxy = |
| 170 | + TcpCrusherBuilder.builder() |
| 171 | + .withReactor(reactor) |
| 172 | + .withBindAddress(new InetSocketAddress(amqpProxyPort)) |
| 173 | + .withConnectAddress("localhost", amqpPort) |
| 174 | + .build()) { |
| 175 | + |
| 176 | + tcpProxy.open(); |
| 177 | + tcpProxy.freeze(); |
| 178 | + |
| 179 | + ConnectionFactory factory = new ConnectionFactory(); |
| 180 | + factory.setHost("localhost"); |
| 181 | + factory.setPort(amqpProxyPort); |
| 182 | + |
| 183 | + factory.useSslProtocol(); |
| 184 | + factory.useNio(); |
| 185 | + |
| 186 | + factory.setConnectionTimeout(connectionTimeout); |
| 187 | + factory.setHandshakeTimeout(handshakeTimeout); |
| 188 | + |
| 189 | + ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| 190 | + try { |
| 191 | + CountDownLatch latch = new CountDownLatch(1); |
| 192 | + executorService.submit( |
| 193 | + () -> { |
| 194 | + try { |
| 195 | + factory.newConnection(); |
| 196 | + latch.countDown(); |
| 197 | + } catch (SocketTimeoutException e) { |
| 198 | + latch.countDown(); |
| 199 | + } catch (Exception e) { |
| 200 | + // not supposed to happen |
| 201 | + } |
| 202 | + }); |
| 203 | + |
| 204 | + boolean connectionCreatedTimedOut = latch.await(10, TimeUnit.SECONDS); |
| 205 | + assertThat(connectionCreatedTimedOut).isTrue(); |
| 206 | + |
| 207 | + } finally { |
| 208 | + executorService.shutdownNow(); |
| 209 | + } |
| 210 | + } |
| 211 | + } |
| 212 | + |
152 | 213 | private void sendAndVerifyMessage(int size) throws Exception {
|
153 | 214 | CountDownLatch latch = new CountDownLatch(1);
|
154 | 215 | boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);
|
|
0 commit comments