@@ -265,13 +265,11 @@ public void testThatCancelledConsumerDoesNotReappearOnRecover() throws IOExcepti
265
265
String q = UUID .randomUUID ().toString ();
266
266
channel .queueDeclare (q , false , false , false , null );
267
267
String tag = channel .basicConsume (q , new DefaultConsumer (channel ));
268
- AMQP .Queue .DeclareOk ok1 = channel .queueDeclarePassive (q );
269
- assertEquals (1 , ok1 .getConsumerCount ());
268
+ assertConsumerCount (1 , q );
270
269
channel .basicCancel (tag );
271
270
closeAndWaitForRecovery ();
272
271
expectChannelRecovery (channel );
273
- AMQP .Queue .DeclareOk ok2 = channel .queueDeclarePassive (q );
274
- assertEquals (0 , ok2 .getConsumerCount ());
272
+ assertConsumerCount (0 , q );
275
273
}
276
274
277
275
public void testConsumerRecoveryWithManyConsumers () throws IOException , InterruptedException {
@@ -280,12 +278,10 @@ public void testConsumerRecoveryWithManyConsumers() throws IOException, Interrup
280
278
for (int i = 0 ; i < n ; i ++) {
281
279
channel .basicConsume (q , new DefaultConsumer (channel ));
282
280
}
283
- AMQP .Queue .DeclareOk ok1 = channel .queueDeclarePassive (q );
284
- assertEquals (n , ok1 .getConsumerCount ());
281
+ assertConsumerCount (n , q );
285
282
closeAndWaitForRecovery ();
286
283
expectChannelRecovery (channel );
287
- AMQP .Queue .DeclareOk ok2 = channel .queueDeclarePassive (q );
288
- assertEquals (n , ok2 .getConsumerCount ());
284
+ assertConsumerCount (n , q );
289
285
290
286
}
291
287
@@ -360,6 +356,10 @@ public void handleDelivery(String consumerTag,
360
356
publishingConnection .abort ();
361
357
}
362
358
359
+ private void assertConsumerCount (int exp , String q ) throws IOException {
360
+ assertEquals (exp , channel .queueDeclarePassive (q ).getConsumerCount ());
361
+ }
362
+
363
363
private AMQP .Queue .DeclareOk declareClientNamedQueue (Channel ch , String q ) throws IOException {
364
364
return ch .queueDeclare (q , true , false , false , null );
365
365
}
0 commit comments