Skip to content

Commit a77deb9

Browse files
committed
Handle TimeoutException in RecoveryAwareAMQConnectionFactory
Fixes #262
1 parent 518c340 commit a77deb9

File tree

4 files changed

+90
-10
lines changed

4 files changed

+90
-10
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,42 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
5151
* @return an interface to the connection
5252
* @throws java.io.IOException if it encounters a problem
5353
*/
54-
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
55-
IOException lastException = null;
54+
// package protected API, made public for testing only
55+
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
56+
Exception lastException = null;
5657
List<Address> shuffled = shuffle(addressResolver.getAddresses());
5758

5859
for (Address addr : shuffled) {
5960
try {
6061
FrameHandler frameHandler = factory.create(addr);
61-
RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler, metricsCollector);
62+
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
6263
conn.start();
6364
metricsCollector.newConnection(conn);
6465
return conn;
6566
} catch (IOException e) {
6667
lastException = e;
68+
} catch (TimeoutException te) {
69+
lastException = te;
6770
}
6871
}
6972

70-
throw (lastException != null) ? lastException : new IOException("failed to connect");
73+
if (lastException != null) {
74+
if (lastException instanceof IOException) {
75+
throw (IOException) lastException;
76+
} else if (lastException instanceof TimeoutException) {
77+
throw (TimeoutException) lastException;
78+
}
79+
}
80+
throw new IOException("failed to connect");
7181
}
7282

7383
private static List<Address> shuffle(List<Address> addrs) {
7484
List<Address> list = new ArrayList<Address>(addrs);
7585
Collections.shuffle(list);
7686
return list;
7787
}
88+
89+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
90+
return new RecoveryAwareAMQConnection(params, handler, metricsCollector);
91+
}
7892
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
DnsRecordIpAddressResolverTests.class,
4444
StandardMetricsCollectorTest.class,
4545
DnsSrvRecordAddressResolverTest.class,
46-
JavaNioTest.class
46+
JavaNioTest.class,
47+
ConnectionFactoryTest.class,
48+
RecoveryAwareAMQConnectionFactoryTest.class
4749
})
4850
public class ClientTests {
4951

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,15 @@
2323
import com.rabbitmq.client.impl.ConnectionParams;
2424
import com.rabbitmq.client.impl.FrameHandler;
2525
import com.rabbitmq.client.impl.FrameHandlerFactory;
26-
import org.junit.Assert;
2726
import org.junit.Test;
28-
import org.mockito.Mockito;
2927

3028
import java.io.IOException;
3129
import java.util.Queue;
3230
import java.util.concurrent.ArrayBlockingQueue;
3331
import java.util.concurrent.TimeoutException;
3432

3533
import static org.junit.Assert.assertSame;
36-
import static org.mockito.Mockito.doNothing;
37-
import static org.mockito.Mockito.doThrow;
38-
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.*;
3935

4036
public class ConnectionFactoryTest {
4137

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.AddressResolver;
20+
import com.rabbitmq.client.Connection;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.ConnectionParams;
23+
import com.rabbitmq.client.impl.FrameHandler;
24+
import com.rabbitmq.client.impl.FrameHandlerFactory;
25+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection;
26+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.Queue;
33+
import java.util.concurrent.ArrayBlockingQueue;
34+
import java.util.concurrent.TimeoutException;
35+
36+
import static org.junit.Assert.assertSame;
37+
import static org.mockito.Mockito.*;
38+
39+
public class RecoveryAwareAMQConnectionFactoryTest {
40+
41+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
42+
@Test public void tryNextAddressIfTimeoutException() throws IOException, TimeoutException {
43+
final RecoveryAwareAMQConnection connectionThatThrowsTimeout = mock(RecoveryAwareAMQConnection.class);
44+
final RecoveryAwareAMQConnection connectionThatSucceeds = mock(RecoveryAwareAMQConnection.class);
45+
final Queue<RecoveryAwareAMQConnection> connections = new ArrayBlockingQueue<RecoveryAwareAMQConnection>(10);
46+
connections.add(connectionThatThrowsTimeout);
47+
connections.add(connectionThatSucceeds);
48+
AddressResolver addressResolver = new AddressResolver() {
49+
@Override
50+
public List<Address> getAddresses() throws IOException {
51+
return Arrays.asList(new Address("host1"), new Address("host2"));
52+
}
53+
};
54+
RecoveryAwareAMQConnectionFactory connectionFactory = new RecoveryAwareAMQConnectionFactory(
55+
new ConnectionParams(), mock(FrameHandlerFactory.class), addressResolver
56+
) {
57+
@Override
58+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
59+
return connections.poll();
60+
}
61+
};
62+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
63+
doNothing().when(connectionThatSucceeds).start();
64+
Connection returnedConnection = connectionFactory.newConnection();
65+
assertSame(connectionThatSucceeds, returnedConnection);
66+
}
67+
68+
}

0 commit comments

Comments
 (0)