Skip to content

Commit 9a23627

Browse files
committed
Handle protocol version mismatch in frame builder
References #319
1 parent 4e282fe commit 9a23627

File tree

4 files changed

+128
-31
lines changed

4 files changed

+128
-31
lines changed

src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java

Lines changed: 84 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.MalformedFrameException;
2020
import com.rabbitmq.client.impl.Frame;
2121

22+
import java.io.DataInputStream;
2223
import java.io.IOException;
2324
import java.nio.ByteBuffer;
2425
import java.nio.channels.ReadableByteChannel;
@@ -32,6 +33,7 @@
3233
* start where it left off when the {@link NioLoop} comes back to
3334
* this connection.
3435
* This class is not thread safe.
36+
*
3537
* @since 4.4.0
3638
*/
3739
public class FrameBuilder {
@@ -41,17 +43,14 @@ public class FrameBuilder {
4143
protected final ReadableByteChannel channel;
4244

4345
protected final ByteBuffer applicationBuffer;
44-
45-
private int frameType;
46-
private int frameChannel;
47-
private byte [] framePayload;
48-
49-
private int bytesRead = 0;
50-
5146
// to store the bytes of the outstanding data
5247
// 3 byte-long because the longest we read is an unsigned int
5348
// (not need to store the latest byte)
54-
private final int [] frameBuffer = new int[3];
49+
private final int[] frameBuffer = new int[3];
50+
private int frameType;
51+
private int frameChannel;
52+
private byte[] framePayload;
53+
private int bytesRead = 0;
5554

5655
public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {
5756
this.channel = channel;
@@ -63,15 +62,19 @@ public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {
6362
* This method returns null f a frame could not have been fully built from
6463
* the network. The client must then retry later (typically
6564
* when the channel notifies it has something to read).
65+
*
6666
* @return a complete frame or null if a frame couldn't have been fully built
6767
* @throws IOException
68+
* @see Frame#readFrom(DataInputStream)
6869
*/
6970
public Frame readFrame() throws IOException {
70-
while(somethingToRead()) {
71+
while (somethingToRead()) {
7172
if (bytesRead == 0) {
7273
// type
73-
// FIXME check first byte isn't 'A' and thus a header indicating protocol version mismatch
7474
frameType = readFromBuffer();
75+
if (frameType == 'A') {
76+
handleProtocolVersionMismatch();
77+
}
7578
} else if (bytesRead == 1) {
7679
// channel 1/2
7780
frameBuffer[0] = readFromBuffer();
@@ -108,24 +111,17 @@ public Frame readFrame() throws IOException {
108111
return null;
109112
}
110113

111-
private int read() throws IOException {
112-
return NioHelper.read(channel, applicationBuffer);
113-
}
114-
115-
private int readFromBuffer() {
116-
return applicationBuffer.get() & 0xff;
117-
}
118-
119114
/**
120115
* Tells whether there's something to read in the application buffer or not.
121116
* Tries to read from the network if necessary.
117+
*
122118
* @return true if there's something to read in the application buffer
123119
* @throws IOException
124120
*/
125121
protected boolean somethingToRead() throws IOException {
126-
if(!applicationBuffer.hasRemaining()) {
122+
if (!applicationBuffer.hasRemaining()) {
127123
applicationBuffer.clear();
128-
int read = read();
124+
int read = NioHelper.read(channel, applicationBuffer);
129125
applicationBuffer.flip();
130126
if (read > 0) {
131127
return true;
@@ -136,4 +132,72 @@ protected boolean somethingToRead() throws IOException {
136132
return true;
137133
}
138134
}
135+
136+
private int readFromBuffer() {
137+
return applicationBuffer.get() & 0xff;
138+
}
139+
140+
/**
141+
* Handle a protocol version mismatch.
142+
* @return
143+
* @throws IOException
144+
* @see Frame#protocolVersionMismatch(DataInputStream)
145+
*/
146+
private void handleProtocolVersionMismatch() throws IOException {
147+
// Probably an AMQP.... header indicating a version mismatch
148+
// Otherwise meaningless, so try to read the version,
149+
// and throw an exception, whether we read the version
150+
// okay or not.
151+
// Try to read everything from the network, this header
152+
// is small and should never require several network reads.
153+
byte[] expectedBytes = new byte[] { 'M', 'Q', 'P' };
154+
int expectedBytesCount = 0;
155+
while (somethingToRead() && expectedBytesCount < 3) {
156+
// We expect the letters M, Q, P in that order: generate an informative error if they're not found
157+
int nextByte = readFromBuffer();
158+
if (nextByte != expectedBytes[expectedBytesCount]) {
159+
throw new MalformedFrameException("Invalid AMQP protocol header from server: expected character " +
160+
expectedBytes[expectedBytesCount] + ", got " + nextByte);
161+
}
162+
expectedBytesCount++;
163+
}
164+
165+
if (expectedBytesCount != 3) {
166+
throw new MalformedFrameException("Invalid AMQP protocol header from server: read only "
167+
+ (expectedBytesCount + 1) + " byte(s) instead of 4");
168+
}
169+
170+
int[] signature = new int[4];
171+
172+
for (int i = 0; i < 4; i++) {
173+
if (somethingToRead()) {
174+
signature[i] = readFromBuffer();
175+
} else {
176+
throw new MalformedFrameException("Invalid AMQP protocol header from server");
177+
}
178+
}
179+
180+
MalformedFrameException x;
181+
182+
if (signature[0] == 1 &&
183+
signature[1] == 1 &&
184+
signature[2] == 8 &&
185+
signature[3] == 0) {
186+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
187+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
188+
", server is 0-8");
189+
} else {
190+
String sig = "";
191+
for (int i = 0; i < 4; i++) {
192+
if (i != 0)
193+
sig += ",";
194+
sig += signature[i];
195+
}
196+
197+
x = new MalformedFrameException("AMQP protocol version mismatch; we are version " +
198+
AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION +
199+
", server sent signature " + sig);
200+
}
201+
throw x;
202+
}
139203
}

src/main/java/com/rabbitmq/client/impl/nio/HeaderWriteRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
*/
2626
public class HeaderWriteRequest implements WriteRequest {
2727

28+
public static final WriteRequest SINGLETON = new HeaderWriteRequest();
29+
30+
private HeaderWriteRequest() { }
31+
2832
@Override
2933
public void handle(DataOutputStream outputStream) throws IOException {
3034
outputStream.write("AMQP".getBytes("US-ASCII"));

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public Queue<WriteRequest> getWriteQueue() {
120120
}
121121

122122
public void sendHeader() throws IOException {
123-
sendWriteRequest(new HeaderWriteRequest());
123+
sendWriteRequest(HeaderWriteRequest.SINGLETON);
124124
}
125125

126126
public void write(Frame frame) throws IOException {
@@ -192,12 +192,8 @@ boolean continueReading() throws IOException {
192192
if (!plainIn.hasRemaining() && !cipherIn.hasRemaining()) {
193193
// need to try to read something
194194
cipherIn.clear();
195-
196-
// FIXME this logic may be simplified:
197-
// flipping cipherIn and return cipherIn.hasRemaining should be enough
198-
199195
int bytesRead = NioHelper.read(channel, cipherIn);
200-
if (bytesRead <= 0) {
196+
if (bytesRead == 0) {
201197
return false;
202198
} else {
203199
cipherIn.flip();

src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.test;
1717

1818
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.MalformedFrameException;
1920
import com.rabbitmq.client.impl.Frame;
2021
import com.rabbitmq.client.impl.nio.FrameBuilder;
2122
import org.junit.Test;
@@ -31,6 +32,7 @@
3132
import static org.hamcrest.Matchers.notNullValue;
3233
import static org.hamcrest.Matchers.nullValue;
3334
import static org.junit.Assert.assertThat;
35+
import static org.junit.Assert.fail;
3436

3537
/**
3638
*
@@ -47,7 +49,7 @@ public class FrameBuilderTest {
4749

4850
@Test
4951
public void buildFrameInOneGo() throws IOException {
50-
buffer = ByteBuffer.wrap(new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end()});
52+
buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end() });
5153
builder = new FrameBuilder(channel, buffer);
5254
Frame frame = builder.readFrame();
5355
assertThat(frame, notNullValue());
@@ -58,10 +60,10 @@ public void buildFrameInOneGo() throws IOException {
5860

5961
@Test
6062
public void buildFramesInOneGo() throws IOException {
61-
byte[] frameContent = new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end()};
63+
byte[] frameContent = new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end() };
6264
int nbFrames = 13;
6365
byte[] frames = new byte[frameContent.length * nbFrames];
64-
for(int i = 0; i < nbFrames; i++) {
66+
for (int i = 0; i < nbFrames; i++) {
6567
for (int j = 0; j < frameContent.length; j++) {
6668
frames[i * frameContent.length + j] = frameContent[j];
6769
}
@@ -82,7 +84,7 @@ public void buildFramesInOneGo() throws IOException {
8284

8385
@Test
8486
public void buildFrameInSeveralCalls() throws IOException {
85-
buffer = ByteBuffer.wrap(new byte[]{1, 0, 0, 0, 0, 0, 3, 1, 2});
87+
buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2 });
8688
builder = new FrameBuilder(channel, buffer);
8789
Frame frame = builder.readFrame();
8890
assertThat(frame, nullValue());
@@ -98,12 +100,43 @@ public void buildFrameInSeveralCalls() throws IOException {
98100
assertThat(frame.getPayload().length, is(3));
99101
}
100102

103+
@Test
104+
public void protocolMismatchHeader() throws IOException {
105+
ByteBuffer[] buffers = new ByteBuffer[] {
106+
ByteBuffer.wrap(new byte[] { 'A' }),
107+
ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q' }),
108+
ByteBuffer.wrap(new byte[] { 'A', 'N', 'Q', 'P' }),
109+
ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P' }),
110+
ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 8 }),
111+
ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 8, 0 }),
112+
ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 9, 1 })
113+
};
114+
String[] messages = new String[] {
115+
"Invalid AMQP protocol header from server: read only 1 byte(s) instead of 4",
116+
"Invalid AMQP protocol header from server: read only 3 byte(s) instead of 4",
117+
"Invalid AMQP protocol header from server: expected character 77, got 78",
118+
"Invalid AMQP protocol header from server",
119+
"Invalid AMQP protocol header from server",
120+
"AMQP protocol version mismatch; we are version 0-9-1, server is 0-8",
121+
"AMQP protocol version mismatch; we are version 0-9-1, server sent signature 1,1,9,1"
122+
};
123+
124+
for (int i = 0; i < buffers.length; i++) {
125+
builder = new FrameBuilder(channel, buffers[i]);
126+
try {
127+
builder.readFrame();
128+
fail("protocol header not correct, exception should have been thrown");
129+
} catch (MalformedFrameException e) {
130+
assertThat(e.getMessage(), is(messages[i]));
131+
}
132+
}
133+
}
134+
101135
byte b(int b) {
102136
return (byte) b;
103137
}
104138

105139
byte end() {
106140
return (byte) AMQP.FRAME_END;
107141
}
108-
109142
}

0 commit comments

Comments
 (0)