Skip to content

Commit ef393cb

Browse files
Avoid mutating collections we iterate over during recovery
1 parent d64b292 commit ef393cb

File tree

2 files changed

+30
-9
lines changed

2 files changed

+30
-9
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.net.ConnectException;
2020
import java.net.InetAddress;
2121
import java.util.ArrayList;
22+
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.ConcurrentHashMap;
@@ -439,7 +440,8 @@ private void recoverExchanges() {
439440
}
440441

441442
private void recoverQueues() {
442-
for (Map.Entry<String, RecordedQueue> entry : this.recordedQueues.entrySet()) {
443+
Map<String, RecordedQueue> copy = new HashMap<String, RecordedQueue>(this.recordedQueues);
444+
for (Map.Entry<String, RecordedQueue> entry : copy.entrySet()) {
443445
String oldName = entry.getKey();
444446
RecordedQueue q = entry.getValue();
445447
try {
@@ -473,7 +475,8 @@ private void recoverBindings() {
473475
}
474476

475477
private void recoverConsumers() {
476-
for (Map.Entry<String, RecordedConsumer> entry : this.consumers.entrySet()) {
478+
Map<String, RecordedConsumer> copy = new HashMap<String, RecordedConsumer>(this.consumers);
479+
for (Map.Entry<String, RecordedConsumer> entry : copy.entrySet()) {
477480
String tag = entry.getKey();
478481
RecordedConsumer consumer = entry.getValue();
479482

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import com.rabbitmq.tools.Host;
1010

1111
import java.io.IOException;
12+
import java.util.List;
13+
import java.util.ArrayList;
1214
import java.util.UUID;
1315
import java.util.concurrent.CountDownLatch;
1416
import java.util.concurrent.TimeUnit;
@@ -259,17 +261,33 @@ public void testThatDeletedQueueDoesNotReappearOnRecover() throws IOException, I
259261
}
260262
}
261263

262-
public void testThatCancelledConsumerDoesNotReappearOnRecover() throws IOException, InterruptedException {
263-
String q = UUID.randomUUID().toString();
264-
channel.queueDeclare(q, false, false, false, null);
265-
String tag = channel.basicConsume(q, new DefaultConsumer(channel));
264+
public void testConsumerRecoveryWithManyConsumers() throws IOException, InterruptedException {
265+
String q = channel.queueDeclare(UUID.randomUUID().toString(), false, false, false, null).getQueue();
266+
final int n = 1024;
267+
for (int i = 0; i < n; i++) {
268+
channel.basicConsume(q, new DefaultConsumer(channel));
269+
}
266270
AMQP.Queue.DeclareOk ok1 = channel.queueDeclarePassive(q);
267-
assertEquals(1, ok1.getConsumerCount());
268-
channel.basicCancel(tag);
271+
assertEquals(n, ok1.getConsumerCount());
269272
closeAndWaitForRecovery();
270273
expectChannelRecovery(channel);
271274
AMQP.Queue.DeclareOk ok2 = channel.queueDeclarePassive(q);
272-
assertEquals(0, ok2.getConsumerCount());
275+
assertEquals(n, ok2.getConsumerCount());
276+
277+
}
278+
279+
public void testQueueRecoveryWithManyQueues() throws IOException, InterruptedException, TimeoutException {
280+
List<String> qs = new ArrayList<String>();
281+
final int n = 1024;
282+
for (int i = 0; i < n; i++) {
283+
qs.add(channel.queueDeclare(UUID.randomUUID().toString(), true, false, false, null).getQueue());
284+
}
285+
closeAndWaitForRecovery();
286+
expectChannelRecovery(channel);
287+
for(String q : qs) {
288+
expectQueueRecovery(channel, q);
289+
channel.queueDelete(q);
290+
}
273291
}
274292

275293
public void testChannelRecoveryCallback() throws IOException, InterruptedException {

0 commit comments

Comments
 (0)