Skip to content

Commit 842cb42

Browse files
committed
Merge branch '4.0.x-stable' into 4.1.x-stable
2 parents e4fded4 + 8bd7c8c commit 842cb42

File tree

5 files changed

+171
-8
lines changed

5 files changed

+171
-8
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -912,19 +912,28 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
912912
return conn;
913913
} else {
914914
List<Address> addrs = addressResolver.getAddresses();
915-
IOException lastException = null;
915+
Exception lastException = null;
916916
for (Address addr : addrs) {
917917
try {
918918
FrameHandler handler = fhFactory.create(addr);
919-
AMQConnection conn = new AMQConnection(params, handler, metricsCollector);
919+
AMQConnection conn = createConnection(params, handler, metricsCollector);
920920
conn.start();
921921
this.metricsCollector.newConnection(conn);
922922
return conn;
923923
} catch (IOException e) {
924924
lastException = e;
925+
} catch (TimeoutException te) {
926+
lastException = te;
925927
}
926928
}
927-
throw (lastException != null) ? lastException : new IOException("failed to connect");
929+
if (lastException != null) {
930+
if (lastException instanceof IOException) {
931+
throw (IOException) lastException;
932+
} else if (lastException instanceof TimeoutException) {
933+
throw (TimeoutException) lastException;
934+
}
935+
}
936+
throw new IOException("failed to connect");
928937
}
929938
}
930939

@@ -952,6 +961,10 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
952961
return result;
953962
}
954963

964+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
965+
return new AMQConnection(params, frameHandler, metricsCollector);
966+
}
967+
955968
/**
956969
* Create a new broker connection.
957970
*

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,42 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa
5151
* @return an interface to the connection
5252
* @throws java.io.IOException if it encounters a problem
5353
*/
54-
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
55-
IOException lastException = null;
54+
// package protected API, made public for testing only
55+
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
56+
Exception lastException = null;
5657
List<Address> shuffled = shuffle(addressResolver.getAddresses());
5758

5859
for (Address addr : shuffled) {
5960
try {
6061
FrameHandler frameHandler = factory.create(addr);
61-
RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler, metricsCollector);
62+
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
6263
conn.start();
6364
metricsCollector.newConnection(conn);
6465
return conn;
6566
} catch (IOException e) {
6667
lastException = e;
68+
} catch (TimeoutException te) {
69+
lastException = te;
6770
}
6871
}
6972

70-
throw (lastException != null) ? lastException : new IOException("failed to connect");
73+
if (lastException != null) {
74+
if (lastException instanceof IOException) {
75+
throw (IOException) lastException;
76+
} else if (lastException instanceof TimeoutException) {
77+
throw (TimeoutException) lastException;
78+
}
79+
}
80+
throw new IOException("failed to connect");
7181
}
7282

7383
private static List<Address> shuffle(List<Address> addrs) {
7484
List<Address> list = new ArrayList<Address>(addrs);
7585
Collections.shuffle(list);
7686
return list;
7787
}
88+
89+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
90+
return new RecoveryAwareAMQConnection(params, handler, metricsCollector);
91+
}
7892
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
DnsRecordIpAddressResolverTests.class,
4646
StandardMetricsCollectorTest.class,
4747
DnsSrvRecordAddressResolverTest.class,
48-
JavaNioTest.class
48+
JavaNioTest.class,
49+
ConnectionFactoryTest.class,
50+
RecoveryAwareAMQConnectionFactoryTest.class
4951
})
5052
public class ClientTests {
5153

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.ConnectionFactory;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.AMQConnection;
23+
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.FrameHandler;
25+
import com.rabbitmq.client.impl.FrameHandlerFactory;
26+
import org.junit.Test;
27+
28+
import java.io.IOException;
29+
import java.util.Queue;
30+
import java.util.concurrent.ArrayBlockingQueue;
31+
import java.util.concurrent.TimeoutException;
32+
33+
import static org.junit.Assert.assertSame;
34+
import static org.mockito.Mockito.*;
35+
36+
public class ConnectionFactoryTest {
37+
38+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
39+
@Test public void tryNextAddressIfTimeoutExceptionNoAutoRecovery() throws IOException, TimeoutException {
40+
final AMQConnection connectionThatThrowsTimeout = mock(AMQConnection.class);
41+
final AMQConnection connectionThatSucceeds = mock(AMQConnection.class);
42+
final Queue<AMQConnection> connections = new ArrayBlockingQueue<AMQConnection>(10);
43+
connections.add(connectionThatThrowsTimeout);
44+
connections.add(connectionThatSucceeds);
45+
ConnectionFactory connectionFactory = new ConnectionFactory() {
46+
47+
@Override
48+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector) {
49+
return connections.poll();
50+
}
51+
52+
@Override
53+
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
54+
return mock(FrameHandlerFactory.class);
55+
}
56+
};
57+
connectionFactory.setAutomaticRecoveryEnabled(false);
58+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
59+
doNothing().when(connectionThatSucceeds).start();
60+
Connection returnedConnection = connectionFactory.newConnection(
61+
new Address[] { new Address("host1"), new Address("host2") }
62+
);
63+
assertSame(connectionThatSucceeds, returnedConnection);
64+
}
65+
66+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Address;
19+
import com.rabbitmq.client.AddressResolver;
20+
import com.rabbitmq.client.Connection;
21+
import com.rabbitmq.client.MetricsCollector;
22+
import com.rabbitmq.client.impl.ConnectionParams;
23+
import com.rabbitmq.client.impl.FrameHandler;
24+
import com.rabbitmq.client.impl.FrameHandlerFactory;
25+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection;
26+
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.Queue;
33+
import java.util.concurrent.ArrayBlockingQueue;
34+
import java.util.concurrent.TimeoutException;
35+
36+
import static org.junit.Assert.assertSame;
37+
import static org.mockito.Mockito.*;
38+
39+
public class RecoveryAwareAMQConnectionFactoryTest {
40+
41+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/262
42+
@Test public void tryNextAddressIfTimeoutException() throws IOException, TimeoutException {
43+
final RecoveryAwareAMQConnection connectionThatThrowsTimeout = mock(RecoveryAwareAMQConnection.class);
44+
final RecoveryAwareAMQConnection connectionThatSucceeds = mock(RecoveryAwareAMQConnection.class);
45+
final Queue<RecoveryAwareAMQConnection> connections = new ArrayBlockingQueue<RecoveryAwareAMQConnection>(10);
46+
connections.add(connectionThatThrowsTimeout);
47+
connections.add(connectionThatSucceeds);
48+
AddressResolver addressResolver = new AddressResolver() {
49+
@Override
50+
public List<Address> getAddresses() throws IOException {
51+
return Arrays.asList(new Address("host1"), new Address("host2"));
52+
}
53+
};
54+
RecoveryAwareAMQConnectionFactory connectionFactory = new RecoveryAwareAMQConnectionFactory(
55+
new ConnectionParams(), mock(FrameHandlerFactory.class), addressResolver
56+
) {
57+
@Override
58+
protected RecoveryAwareAMQConnection createConnection(ConnectionParams params, FrameHandler handler, MetricsCollector metricsCollector) {
59+
return connections.poll();
60+
}
61+
};
62+
doThrow(TimeoutException.class).when(connectionThatThrowsTimeout).start();
63+
doNothing().when(connectionThatSucceeds).start();
64+
Connection returnedConnection = connectionFactory.newConnection();
65+
assertSame(connectionThatSucceeds, returnedConnection);
66+
}
67+
68+
}

0 commit comments

Comments
 (0)