Skip to content

Commit 518c340

Browse files
committed
Handle TimeoutException in ConnectionFactory
When auto-recovery is disabled. References #262
1 parent 1735cb4 commit 518c340

File tree

2 files changed

+86
-3
lines changed

2 files changed

+86
-3
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -901,19 +901,28 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
901901
return conn;
902902
} else {
903903
List<Address> addrs = addressResolver.getAddresses();
904-
IOException lastException = null;
904+
Exception lastException = null;
905905
for (Address addr : addrs) {
906906
try {
907907
FrameHandler handler = fhFactory.create(addr);
908-
AMQConnection conn = new AMQConnection(params, handler, metricsCollector);
908+
AMQConnection conn = createConnection(params, handler, metricsCollector);
909909
conn.start();
910910
this.metricsCollector.newConnection(conn);
911911
return conn;
912912
} catch (IOException e) {
913913
lastException = e;
914+
} catch (TimeoutException te) {
915+
lastException = te;
914916
}
915917
}
916-
throw (lastException != null) ? lastException : new IOException("failed to connect");
918+
if (lastException != null) {
919+
if (lastException instanceof IOException) {
920+
throw (IOException) lastException;
921+
} else if (lastException instanceof TimeoutException) {
922+
throw (TimeoutException) lastException;
923+
}
924+
}
925+
throw new IOException("failed to connect");
917926
}
918927
}
919928

@@ -940,6 +949,10 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
940949
return result;
941950
}
942951

952+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
953+
return new AMQConnection(params, frameHandler, metricsCollector);
954+
}
955+
943956
/**
944957
* Create a new broker connection.
945958
*
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.ConnectionFactory;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.AMQConnection;
23+
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.FrameHandler;
25+
import com.rabbitmq.client.impl.FrameHandlerFactory;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
import org.mockito.Mockito;
29+
30+
import java.io.IOException;
31+
import java.util.Queue;
32+
import java.util.concurrent.ArrayBlockingQueue;
33+
import java.util.concurrent.TimeoutException;
34+
35+
import static org.junit.Assert.assertSame;
36+
import static org.mockito.Mockito.doNothing;
37+
import static org.mockito.Mockito.doThrow;
38+
import static org.mockito.Mockito.mock;
39+
40+
public class ConnectionFactoryTest {
41+
42+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
43+
@Test public void tryNextAddressIfTimeoutExceptionNoAutoRecovery() throws IOException, TimeoutException {
44+
final AMQConnection connectionThatThrowsTimeout = mock(AMQConnection.class);
45+
final AMQConnection connectionThatSucceeds = mock(AMQConnection.class);
46+
final Queue<AMQConnection> connections = new ArrayBlockingQueue<AMQConnection>(10);
47+
connections.add(connectionThatThrowsTimeout);
48+
connections.add(connectionThatSucceeds);
49+
ConnectionFactory connectionFactory = new ConnectionFactory() {
50+
51+
@Override
52+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
53+
return connections.poll();
54+
}
55+
56+
@Override
57+
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
58+
return mock(FrameHandlerFactory.class);
59+
}
60+
};
61+
connectionFactory.setAutomaticRecoveryEnabled(false);
62+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
63+
doNothing().when(connectionThatSucceeds).start();
64+
Connection returnedConnection = connectionFactory.newConnection(
65+
new Address[] { new Address("host1"), new Address("host2") }
66+
);
67+
assertSame(connectionThatSucceeds, returnedConnection);
68+
}
69+
70+
}

0 commit comments

Comments
 (0)