Skip to content

Commit d72ee8a

Browse files
authored
Merge pull request #278 from vikinghawk/4.1.x-stable
basicCancel and basicConsume honor rpc timeout.
2 parents e309568 + d42fab7 commit d72ee8a

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
3939

4040
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
4141

42-
private static final int NO_RPC_TIMEOUT = 0;
42+
protected static final int NO_RPC_TIMEOUT = 0;
4343

4444
/**
4545
* Protected; used instead of synchronizing on the channel itself,
@@ -64,7 +64,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6464
public volatile boolean _blockContent = false;
6565

6666
/** Timeout for RPC calls */
67-
private final int _rpcTimeout;
67+
protected final int _rpcTimeout;
6868

6969
/**
7070
* Construct a channel on the given connection, with the given channel number.
@@ -243,24 +243,38 @@ private AMQCommand privateRpc(Method m)
243243
try {
244244
return k.getReply(_rpcTimeout);
245245
} catch (TimeoutException e) {
246-
try {
247-
// clean RPC channel state
248-
nextOutstandingRpc();
249-
markRpcFinished();
250-
} catch(Exception ex) {
251-
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
252-
}
253-
throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
246+
throw wrapTimeoutException(m, e);
254247
}
255248
}
256249
}
250+
251+
private void cleanRpcChannelState() {
252+
try {
253+
// clean RPC channel state
254+
nextOutstandingRpc();
255+
markRpcFinished();
256+
} catch (Exception ex) {
257+
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
258+
}
259+
}
260+
261+
/** Cleans RPC channel state after a timeout and wraps the TimeoutException in a ChannelContinuationTimeoutException */
262+
protected ChannelContinuationTimeoutException wrapTimeoutException(final Method m, final TimeoutException e) {
263+
cleanRpcChannelState();
264+
return new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
265+
}
257266

258267
private AMQCommand privateRpc(Method m, int timeout)
259268
throws IOException, ShutdownSignalException, TimeoutException {
260269
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
261270
rpc(m, k);
262271

263-
return k.getReply(timeout);
272+
try {
273+
return k.getReply(timeout);
274+
} catch (TimeoutException e) {
275+
cleanRpcChannelState();
276+
throw e;
277+
}
264278
}
265279

266280
public void rpc(Method m, RpcContinuation k)

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import com.rabbitmq.client.impl.AMQImpl.Tx;
3838
import com.rabbitmq.utility.Utility;
3939

40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
4043
/**
4144
* Main interface to AMQP protocol functionality. Public API -
4245
* Implementation of all AMQChannels except channel zero.
@@ -49,6 +52,7 @@
4952
*/
5053
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
5154
private static final String UNSPECIFIED_OUT_OF_BAND = "";
55+
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);
5256

5357
/** Map from consumer tag to {@link Consumer} instance.
5458
* <p/>
@@ -1239,18 +1243,26 @@ public String transformReply(AMQCommand replyCommand) {
12391243
}
12401244
};
12411245

1242-
rpc(new Basic.Consume.Builder()
1243-
.queue(queue)
1244-
.consumerTag(consumerTag)
1245-
.noLocal(noLocal)
1246-
.noAck(autoAck)
1247-
.exclusive(exclusive)
1248-
.arguments(arguments)
1249-
.build(),
1250-
k);
1246+
final Method m = new Basic.Consume.Builder()
1247+
.queue(queue)
1248+
.consumerTag(consumerTag)
1249+
.noLocal(noLocal)
1250+
.noAck(autoAck)
1251+
.exclusive(exclusive)
1252+
.arguments(arguments)
1253+
.build();
1254+
rpc(m, k);
12511255

12521256
try {
1253-
return k.getReply();
1257+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
1258+
return k.getReply();
1259+
} else {
1260+
try {
1261+
return k.getReply(_rpcTimeout);
1262+
} catch (TimeoutException e) {
1263+
throw wrapTimeoutException(m, e);
1264+
}
1265+
}
12541266
} catch(ShutdownSignalException ex) {
12551267
throw wrap(ex);
12561268
}
@@ -1267,17 +1279,27 @@ public void basicCancel(final String consumerTag)
12671279
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
12681280
@Override
12691281
public Consumer transformReply(AMQCommand replyCommand) {
1270-
replyCommand.getMethod();
1282+
if (!(replyCommand.getMethod() instanceof Basic.CancelOk))
1283+
LOGGER.warn("Received reply {} was not of expected method Basic.CancelOk", replyCommand.getMethod());
12711284
_consumers.remove(consumerTag); //may already have been removed
12721285
dispatcher.handleCancelOk(originalConsumer, consumerTag);
12731286
return originalConsumer;
12741287
}
12751288
};
12761289

1277-
rpc(new Basic.Cancel(consumerTag, false), k);
1278-
1290+
final Method m = new Basic.Cancel(consumerTag, false);
1291+
rpc(m, k);
1292+
12791293
try {
1280-
k.getReply(); // discard result
1294+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
1295+
k.getReply(); // discard result
1296+
} else {
1297+
try {
1298+
k.getReply(_rpcTimeout);
1299+
} catch (TimeoutException e) {
1300+
throw wrapTimeoutException(m, e);
1301+
}
1302+
}
12811303
} catch(ShutdownSignalException ex) {
12821304
throw wrap(ex);
12831305
}

0 commit comments

Comments
 (0)