Skip to content

Commit c244435

Browse files
committed
Check qos, heartbeat, max channel are unsigned shorts
To avoid truncation and subtle bugs. Fixes #640 (cherry picked from commit 733788e) Conflicts: src/main/java/com/rabbitmq/client/impl/ChannelN.java src/test/java/com/rabbitmq/client/test/ClientTests.java src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java
1 parent ece4a62 commit c244435

File tree

7 files changed

+159
-52
lines changed

7 files changed

+159
-52
lines changed

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -214,42 +214,49 @@ public interface Channel extends ShutdownNotifier {
214214

215215
/**
216216
* Request specific "quality of service" settings.
217-
*
217+
* <p>
218218
* These settings impose limits on the amount of data the server
219219
* will deliver to consumers before requiring acknowledgements.
220220
* Thus they provide a means of consumer-initiated flow control.
221-
* @see com.rabbitmq.client.AMQP.Basic.Qos
222-
* @param prefetchSize maximum amount of content (measured in
223-
* octets) that the server will deliver, 0 if unlimited
221+
* <p>
222+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
223+
*
224+
* @param prefetchSize maximum amount of content (measured in
225+
* octets) that the server will deliver, 0 if unlimited
224226
* @param prefetchCount maximum number of messages that the server
225-
* will deliver, 0 if unlimited
226-
* @param global true if the settings should be applied to the
227-
* entire channel rather than each consumer
227+
* will deliver, 0 if unlimited
228+
* @param global true if the settings should be applied to the
229+
* entire channel rather than each consumer
228230
* @throws java.io.IOException if an error is encountered
231+
* @see com.rabbitmq.client.AMQP.Basic.Qos
229232
*/
230233
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
231234

232235
/**
233236
* Request a specific prefetchCount "quality of service" settings
234237
* for this channel.
238+
* <p>
239+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
235240
*
236-
* @see #basicQos(int, int, boolean)
237241
* @param prefetchCount maximum number of messages that the server
238-
* will deliver, 0 if unlimited
239-
* @param global true if the settings should be applied to the
240-
* entire channel rather than each consumer
242+
* will deliver, 0 if unlimited
243+
* @param global true if the settings should be applied to the
244+
* entire channel rather than each consumer
241245
* @throws java.io.IOException if an error is encountered
246+
* @see #basicQos(int, int, boolean)
242247
*/
243248
void basicQos(int prefetchCount, boolean global) throws IOException;
244249

245250
/**
246251
* Request a specific prefetchCount "quality of service" settings
247252
* for this channel.
253+
* <p>
254+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
248255
*
249-
* @see #basicQos(int, int, boolean)
250256
* @param prefetchCount maximum number of messages that the server
251-
* will deliver, 0 if unlimited
257+
* will deliver, 0 if unlimited
252258
* @throws java.io.IOException if an error is encountered
259+
* @see #basicQos(int, int, boolean)
253260
*/
254261
void basicQos(int prefetchCount) throws IOException;
255262

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
*/
6262
public class ConnectionFactory implements Cloneable {
6363

64+
private static final int MAX_UNSIGNED_SHORT = 65535;
65+
6466
/** Default user name */
6567
public static final String DEFAULT_USER = "guest";
6668
/** Default password */
@@ -402,10 +404,16 @@ public int getRequestedChannelMax() {
402404
}
403405

404406
/**
405-
* Set the requested maximum channel number
407+
* Set the requested maximum channel number.
408+
* <p>
409+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
410+
*
406411
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
407412
*/
408413
public void setRequestedChannelMax(int requestedChannelMax) {
414+
if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) {
415+
throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT);
416+
}
409417
this.requestedChannelMax = requestedChannelMax;
410418
}
411419

@@ -495,10 +503,16 @@ public int getShutdownTimeout() {
495503
* Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval.
496504
* If server heartbeat timeout is configured to a non-zero value, this method can only be used
497505
* to lower the value; otherwise any value provided by the client will be used.
506+
* <p>
507+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
508+
*
498509
* @param requestedHeartbeat the initially requested heartbeat timeout, in seconds; zero for none
499510
* @see <a href="https://rabbitmq.com/heartbeats.html">RabbitMQ Heartbeats Guide</a>
500511
*/
501512
public void setRequestedHeartbeat(int requestedHeartbeat) {
513+
if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) {
514+
throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT);
515+
}
502516
this.requestedHeartbeat = requestedHeartbeat;
503517
}
504518

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import com.rabbitmq.client.*;
1918
import com.rabbitmq.client.Method;
19+
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
2121
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
2222
import com.rabbitmq.utility.BlockingCell;
2323
import com.rabbitmq.utility.Utility;
24-
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

@@ -47,6 +46,8 @@ final class Copyright {
4746
*/
4847
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
4948

49+
private static final int MAX_UNSIGNED_SHORT = 65535;
50+
5051
private static final Logger LOGGER = LoggerFactory.getLogger(AMQConnection.class);
5152
// we want socket write and channel shutdown timeouts to kick in after
5253
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
@@ -388,6 +389,11 @@ public void start()
388389
int channelMax =
389390
negotiateChannelMax(this.requestedChannelMax,
390391
connTune.getChannelMax());
392+
393+
if (!checkUnsignedShort(channelMax)) {
394+
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
395+
}
396+
391397
_channelManager = instantiateChannelManager(channelMax, threadFactory);
392398

393399
int frameMax =
@@ -399,6 +405,10 @@ public void start()
399405
negotiatedMaxValue(this.requestedHeartbeat,
400406
connTune.getHeartbeat());
401407

408+
if (!checkUnsignedShort(heartbeat)) {
409+
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
410+
}
411+
402412
setHeartbeat(heartbeat);
403413

404414
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
@@ -588,6 +598,10 @@ private static int negotiatedMaxValue(int clientValue, int serverValue) {
588598
Math.min(clientValue, serverValue);
589599
}
590600

601+
private static boolean checkUnsignedShort(int value) {
602+
return value >= 0 && value <= MAX_UNSIGNED_SHORT;
603+
}
604+
591605
private class MainLoop implements Runnable {
592606

593607
/**

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@
2727
import java.util.concurrent.TimeoutException;
2828

2929
import com.rabbitmq.client.*;
30-
import com.rabbitmq.client.AMQP.BasicProperties;
30+
import com.rabbitmq.client.Connection;
3131
import com.rabbitmq.client.Method;
32-
import com.rabbitmq.client.impl.AMQImpl.Basic;
32+
import com.rabbitmq.client.AMQP.BasicProperties;
3333
import com.rabbitmq.client.impl.AMQImpl.Channel;
34-
import com.rabbitmq.client.impl.AMQImpl.Confirm;
35-
import com.rabbitmq.client.impl.AMQImpl.Exchange;
3634
import com.rabbitmq.client.impl.AMQImpl.Queue;
37-
import com.rabbitmq.client.impl.AMQImpl.Tx;
35+
import com.rabbitmq.client.impl.AMQImpl.*;
3836
import com.rabbitmq.utility.Utility;
39-
4037
import org.slf4j.Logger;
4138
import org.slf4j.LoggerFactory;
4239

@@ -51,6 +48,7 @@
5148
* </pre>
5249
*/
5350
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
51+
private static final int MAX_UNSIGNED_SHORT = 65535;
5452
private static final String UNSPECIFIED_OUT_OF_BAND = "";
5553
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);
5654

@@ -646,7 +644,10 @@ public AMQCommand transformReply(AMQCommand command) {
646644
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
647645
throws IOException
648646
{
649-
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
647+
if (prefetchCount < 0 || prefetchCount > MAX_UNSIGNED_SHORT) {
648+
throw new IllegalArgumentException("Prefetch count must be between 0 and " + MAX_UNSIGNED_SHORT);
649+
}
650+
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
650651
}
651652

652653
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.stream.Stream;
29+
30+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2831

2932
public class ChannelNTest {
3033

@@ -58,4 +61,32 @@ public void callingBasicCancelForUnknownConsumerThrowsException() throws Excepti
5861
channel.basicCancel("does-not-exist");
5962
}
6063

64+
@Test
65+
public void qosShouldBeUnsignedShort() {
66+
AMQConnection connection = Mockito.mock(AMQConnection.class);
67+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
68+
class TestConfig {
69+
int value;
70+
Consumer call;
71+
72+
public TestConfig(int value, Consumer call) {
73+
this.value = value;
74+
this.call = call;
75+
}
76+
}
77+
Consumer qos = value -> channel.basicQos(value);
78+
Consumer qosGlobal = value -> channel.basicQos(value, true);
79+
Consumer qosPrefetchSize = value -> channel.basicQos(10, value, true);
80+
Stream.of(
81+
new TestConfig(-1, qos), new TestConfig(65536, qos)
82+
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal), new TestConfig(config.value, qosPrefetchSize)))
83+
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
84+
}
85+
86+
interface Consumer {
87+
88+
void apply(int value) throws Exception;
89+
90+
}
91+
6192
}

0 commit comments

Comments
 (0)