Skip to content

Commit 8bd7c8c

Browse files
Merge pull request #270 from rabbitmq/rabbitmq-java-client-262
Handle TimeoutException on connection creation
2 parents 1735cb4 + a77deb9 commit 8bd7c8c

File tree

5 files changed

+171
-8
lines changed

5 files changed

+171
-8
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -901,19 +901,28 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
901901
return conn;
902902
} else {
903903
List<Address> addrs = addressResolver.getAddresses();
904-
IOException lastException = null;
904+
Exception lastException = null;
905905
for (Address addr : addrs) {
906906
try {
907907
FrameHandler handler = fhFactory.create(addr);
908-
AMQConnection conn = new AMQConnection(params, handler, metricsCollector);
908+
AMQConnection conn = createConnection(params, handler, metricsCollector);
909909
conn.start();
910910
this.metricsCollector.newConnection(conn);
911911
return conn;
912912
} catch (IOException e) {
913913
lastException = e;
914+
} catch (TimeoutException te) {
915+
lastException = te;
914916
}
915917
}
916-
throw (lastException != null) ? lastException : new IOException("failed to connect");
918+
if (lastException != null) {
919+
if (lastException instanceof IOException) {
920+
throw (IOException) lastException;
921+
} else if (lastException instanceof TimeoutException) {
922+
throw (TimeoutException) lastException;
923+
}
924+
}
925+
throw new IOException("failed to connect");
917926
}
918927
}
919928

@@ -940,6 +949,10 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
940949
return result;
941950
}
942951

952+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
953+
return new AMQConnection(params, frameHandler, metricsCollector);
954+
}
955+
943956
/**
944957
* Create a new broker connection.
945958
*

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,42 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
5151
* @return an interface to the connection
5252
* @throws java.io.IOException if it encounters a problem
5353
*/
54-
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
55-
IOException lastException = null;
54+
// package protected API, made public for testing only
55+
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
56+
Exception lastException = null;
5657
List<Address> shuffled = shuffle(addressResolver.getAddresses());
5758

5859
for (Address addr : shuffled) {
5960
try {
6061
FrameHandler frameHandler = factory.create(addr);
61-
RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler, metricsCollector);
62+
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
6263
conn.start();
6364
metricsCollector.newConnection(conn);
6465
return conn;
6566
} catch (IOException e) {
6667
lastException = e;
68+
} catch (TimeoutException te) {
69+
lastException = te;
6770
}
6871
}
6972

70-
throw (lastException != null) ? lastException : new IOException("failed to connect");
73+
if (lastException != null) {
74+
if (lastException instanceof IOException) {
75+
throw (IOException) lastException;
76+
} else if (lastException instanceof TimeoutException) {
77+
throw (TimeoutException) lastException;
78+
}
79+
}
80+
throw new IOException("failed to connect");
7181
}
7282

7383
private static List<Address> shuffle(List<Address> addrs) {
7484
List<Address> list = new ArrayList<Address>(addrs);
7585
Collections.shuffle(list);
7686
return list;
7787
}
88+
89+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
90+
return new RecoveryAwareAMQConnection(params, handler, metricsCollector);
91+
}
7892
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
DnsRecordIpAddressResolverTests.class,
4444
StandardMetricsCollectorTest.class,
4545
DnsSrvRecordAddressResolverTest.class,
46-
JavaNioTest.class
46+
JavaNioTest.class,
47+
ConnectionFactoryTest.class,
48+
RecoveryAwareAMQConnectionFactoryTest.class
4749
})
4850
public class ClientTests {
4951

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.ConnectionFactory;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.AMQConnection;
23+
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.FrameHandler;
25+
import com.rabbitmq.client.impl.FrameHandlerFactory;
26+
import org.junit.Test;
27+
28+
import java.io.IOException;
29+
import java.util.Queue;
30+
import java.util.concurrent.ArrayBlockingQueue;
31+
import java.util.concurrent.TimeoutException;
32+
33+
import static org.junit.Assert.assertSame;
34+
import static org.mockito.Mockito.*;
35+
36+
public class ConnectionFactoryTest {
37+
38+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
39+
@Test public void tryNextAddressIfTimeoutExceptionNoAutoRecovery() throws IOException, TimeoutException {
40+
final AMQConnection connectionThatThrowsTimeout = mock(AMQConnection.class);
41+
final AMQConnection connectionThatSucceeds = mock(AMQConnection.class);
42+
final Queue<AMQConnection> connections = new ArrayBlockingQueue<AMQConnection>(10);
43+
connections.add(connectionThatThrowsTimeout);
44+
connections.add(connectionThatSucceeds);
45+
ConnectionFactory connectionFactory = new ConnectionFactory() {
46+
47+
@Override
48+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
49+
return connections.poll();
50+
}
51+
52+
@Override
53+
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
54+
return mock(FrameHandlerFactory.class);
55+
}
56+
};
57+
connectionFactory.setAutomaticRecoveryEnabled(false);
58+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
59+
doNothing().when(connectionThatSucceeds).start();
60+
Connection returnedConnection = connectionFactory.newConnection(
61+
new Address[] { new Address("host1"), new Address("host2") }
62+
);
63+
assertSame(connectionThatSucceeds, returnedConnection);
64+
}
65+
66+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.AddressResolver;
20+
import com.rabbitmq.client.Connection;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.ConnectionParams;
23+
import com.rabbitmq.client.impl.FrameHandler;
24+
import com.rabbitmq.client.impl.FrameHandlerFactory;
25+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection;
26+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.Queue;
33+
import java.util.concurrent.ArrayBlockingQueue;
34+
import java.util.concurrent.TimeoutException;
35+
36+
import static org.junit.Assert.assertSame;
37+
import static org.mockito.Mockito.*;
38+
39+
public class RecoveryAwareAMQConnectionFactoryTest {
40+
41+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
42+
@Test public void tryNextAddressIfTimeoutException() throws IOException, TimeoutException {
43+
final RecoveryAwareAMQConnection connectionThatThrowsTimeout = mock(RecoveryAwareAMQConnection.class);
44+
final RecoveryAwareAMQConnection connectionThatSucceeds = mock(RecoveryAwareAMQConnection.class);
45+
final Queue<RecoveryAwareAMQConnection> connections = new ArrayBlockingQueue<RecoveryAwareAMQConnection>(10);
46+
connections.add(connectionThatThrowsTimeout);
47+
connections.add(connectionThatSucceeds);
48+
AddressResolver addressResolver = new AddressResolver() {
49+
@Override
50+
public List<Address> getAddresses() throws IOException {
51+
return Arrays.asList(new Address("host1"), new Address("host2"));
52+
}
53+
};
54+
RecoveryAwareAMQConnectionFactory connectionFactory = new RecoveryAwareAMQConnectionFactory(
55+
new ConnectionParams(), mock(FrameHandlerFactory.class), addressResolver
56+
) {
57+
@Override
58+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
59+
return connections.poll();
60+
}
61+
};
62+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
63+
doNothing().when(connectionThatSucceeds).start();
64+
Connection returnedConnection = connectionFactory.newConnection();
65+
assertSame(connectionThatSucceeds, returnedConnection);
66+
}
67+
68+
}

0 commit comments

Comments
 (0)