Skip to content

Commit 1238701

Browse files
Merge pull request #346 from vikinghawk/4.x.x-stable
Add excludeQueueFromRecovery method
2 parents a00e9ff + 19d7d53 commit 1238701

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,28 @@ void deleteRecordedQueue(String queue) {
761761
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
762762
}
763763
}
764+
765+
/**
766+
* Exclude the queue from the list of queues to recover after connection failure.
767+
* Intended to be used in usecases where you want to remove the queue from this connection's recovery list but don't want to delete the queue from the server.
768+
*
769+
* @param queue queue name to exclude from recorded recovery queues
770+
* @param ifUnused if true, the RecordedQueue will only be excluded if no local consumers are using it.
771+
*/
772+
public void excludeQueueFromRecovery(final String queue, final boolean ifUnused) {
773+
if (ifUnused) {
774+
// Note: This is basically the same as maybeDeleteRecordedAutoDeleteQueue except it works for non auto-delete queues as well.
775+
synchronized (this.consumers) {
776+
synchronized (this.recordedQueues) {
777+
if (!hasMoreConsumersOnQueue(this.consumers.values(), queue)) {
778+
deleteRecordedQueue(queue);
779+
}
780+
}
781+
}
782+
} else {
783+
deleteRecordedQueue(queue);
784+
}
785+
}
764786

765787
void recordExchange(String exchange, RecordedExchange x) {
766788
this.recordedExchanges.put(exchange, x);
@@ -789,7 +811,7 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
789811
RecordedQueue q = this.recordedQueues.get(queue);
790812
// last consumer on this connection is gone, remove recorded queue
791813
// if it is auto-deleted. See bug 26364.
792-
if((q != null) && q.isAutoDelete()) {
814+
if(q != null && q.isAutoDelete()) {
793815
deleteRecordedQueue(queue);
794816
}
795817
}
@@ -804,7 +826,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
804826
RecordedExchange x = this.recordedExchanges.get(exchange);
805827
// last binding where this exchange is the source is gone, remove recorded exchange
806828
// if it is auto-deleted. See bug 26364.
807-
if((x != null) && x.isAutoDelete()) {
829+
if(x != null && x.isAutoDelete()) {
808830
deleteRecordedExchange(exchange);
809831
}
810832
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,28 @@ public void queueRecovered(String oldName, String newName) {
559559
// expected
560560
}
561561
}
562+
563+
@Test public void thatExcludedQueueDoesNotReappearOnRecover() throws IOException, InterruptedException {
564+
final String q = "java-client.test.recovery.excludedQueue1";
565+
channel.queueDeclare(q, true, false, false, null);
566+
// now delete it using the delegate so AutorecoveringConnection and AutorecoveringChannel are not aware of it
567+
((AutorecoveringChannel)channel).getDelegate().queueDelete(q);
568+
assertNotNull(((AutorecoveringConnection)connection).getRecordedQueues().get(q));
569+
// exclude the queue from recovery
570+
((AutorecoveringConnection)connection).excludeQueueFromRecovery(q, true);
571+
// verify its not there
572+
assertNull(((AutorecoveringConnection)connection).getRecordedQueues().get(q));
573+
// reconnect
574+
closeAndWaitForRecovery();
575+
expectChannelRecovery(channel);
576+
// verify queue was not recreated
577+
try {
578+
channel.queueDeclarePassive(q);
579+
fail("Expected passive declare to fail");
580+
} catch (IOException ioe) {
581+
// expected
582+
}
583+
}
562584

563585
@Test public void thatCancelledConsumerDoesNotReappearOnRecover() throws IOException, InterruptedException {
564586
String q = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)