Skip to content

Commit f75f61d

Browse files
author
Simon MacMullen
committed
Partial revert of be191879c24a to bring the convenience method back.
1 parent 0307e72 commit f75f61d

File tree

4 files changed

+35
-6
lines changed

4 files changed

+35
-6
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,23 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
628628
*/
629629
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
630630

631+
/**
632+
* Start a non-nolocal, non-exclusive consumer, with
633+
* a server-generated consumerTag and specified arguments.
634+
* @param queue the name of the queue
635+
* @param autoAck true if the server should consider messages
636+
* acknowledged once delivered; false if the server should expect
637+
* explicit acknowledgements
638+
* @param arguments a set of arguments for the consume
639+
* @param callback an interface to the consumer object
640+
* @return the consumerTag generated by the server
641+
* @throws java.io.IOException if an error is encountered
642+
* @see com.rabbitmq.client.AMQP.Basic.Consume
643+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
644+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
645+
*/
646+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
647+
631648
/**
632649
* Start a non-nolocal, non-exclusive consumer.
633650
* @param queue the name of the queue

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,14 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
946946
return basicConsume(queue, autoAck, "", callback);
947947
}
948948

949+
/** Public API - {@inheritDoc} */
950+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
951+
Consumer callback)
952+
throws IOException
953+
{
954+
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
955+
}
956+
949957
/** Public API - {@inheritDoc} */
950958
public String basicConsume(String queue, boolean autoAck, String consumerTag,
951959
Consumer callback)

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,10 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, Co
304304
return basicConsume(queue, autoAck, consumerTag, false, false, null, callback);
305305
}
306306

307+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException {
308+
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
309+
}
310+
307311
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException {
308312
final String result = delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback);
309313
recordConsumer(result, queue, autoAck, exclusive, arguments, callback);

test/src/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
2323
Channel ch = connection.createChannel();
2424
String queue = ch.queueDeclare().getQueue();
2525
try {
26-
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
26+
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
2727
fail("Validation should fail for " + args);
2828
} catch (IOException ioe) {
2929
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -37,14 +37,14 @@ public void testConsumerPriorities() throws Exception {
3737
QueueingConsumer highConsumer = new QueueingConsumer(channel);
3838
QueueingConsumer medConsumer = new QueueingConsumer(channel);
3939
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40-
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41-
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42-
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
40+
String high = channel.basicConsume(queue, true, args(1), highConsumer);
41+
String med = channel.basicConsume(queue, true, medConsumer);
42+
channel.basicConsume(queue, true, args(-1), lowConsumer);
4343

4444
publish(queue, COUNT, "high");
45-
channel.basicCancel("high");
45+
channel.basicCancel(high);
4646
publish(queue, COUNT, "med");
47-
channel.basicCancel("med");
47+
channel.basicCancel(med);
4848
publish(queue, COUNT, "low");
4949

5050
assertContents(highConsumer, COUNT, "high");

0 commit comments

Comments
 (0)