Skip to content

Commit 2bf5296

Browse files
dimasacogoluegnes
authored andcommitted
Fix SslEngineByteBufferInputStream to be prepared that read() may still result in BUFFER_UNDERFLOW
1 parent 57fd829 commit 2bf5296

File tree

3 files changed

+50
-47
lines changed

3 files changed

+50
-47
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,11 @@ static int retryRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOEx
4747
return read;
4848
}
4949

50+
static int readWithRetry(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
51+
int bytesRead = NioHelper.read(channel, buffer);
52+
if (bytesRead <= 0) {
53+
bytesRead = NioHelper.retryRead(channel, buffer);
54+
}
55+
return bytesRead;
56+
}
5057
}

src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferInputStream.java

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,57 +43,40 @@ public SslEngineByteBufferInputStream(SSLEngine sslEngine, ByteBuffer plainIn, B
4343

4444
@Override
4545
public int read() throws IOException {
46+
4647
if (plainIn.hasRemaining()) {
4748
return readFromBuffer(plainIn);
48-
} else {
49-
plainIn.clear();
49+
}
50+
51+
plainIn.clear();
52+
53+
while (true) {
54+
5055
SSLEngineResult result = sslEngine.unwrap(cipherIn, plainIn);
5156

5257
switch (result.getStatus()) {
53-
case OK:
54-
plainIn.flip();
55-
if (plainIn.hasRemaining()) {
56-
return readFromBuffer(plainIn);
57-
}
58-
break;
59-
case BUFFER_OVERFLOW:
60-
throw new SSLException("buffer overflow in read");
61-
case BUFFER_UNDERFLOW:
62-
if (cipherIn.hasRemaining()) {
58+
case OK:
59+
plainIn.flip();
60+
if (plainIn.hasRemaining()) {
61+
return readFromBuffer(plainIn);
62+
}
63+
break;
64+
case BUFFER_OVERFLOW:
65+
throw new SSLException("buffer overflow in read");
66+
case BUFFER_UNDERFLOW:
6367
cipherIn.compact();
64-
} else {
65-
cipherIn.clear();
66-
}
67-
68-
int bytesRead = NioHelper.read(channel, cipherIn);
69-
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/307
70-
if (bytesRead <= 0) {
71-
bytesRead = NioHelper.retryRead(channel, cipherIn);
72-
if(bytesRead <= 0) {
68+
int bytesRead = NioHelper.readWithRetry(channel, cipherIn);
69+
if (bytesRead <= 0) {
7370
throw new IllegalStateException("Should be reading something from the network");
7471
}
75-
}
76-
cipherIn.flip();
77-
78-
plainIn.clear();
79-
result = sslEngine.unwrap(cipherIn, plainIn);
80-
81-
if (result.getStatus() != SSLEngineResult.Status.OK) {
82-
throw new SSLException("Unexpected result: " + result);
83-
}
84-
plainIn.flip();
85-
if (plainIn.hasRemaining()) {
86-
return readFromBuffer(plainIn);
87-
}
88-
break;
89-
case CLOSED:
90-
throw new SSLException("closed in read");
91-
default:
92-
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
72+
cipherIn.flip();
73+
break;
74+
case CLOSED:
75+
throw new SSLException("closed in read");
76+
default:
77+
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
9378
}
9479
}
95-
96-
return -1;
9780
}
9881

9982
private int readFromBuffer(ByteBuffer buffer) {

src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,28 @@ public void configure(SSLEngine sslEngine) throws IOException {
9696
}
9797

9898
@Test public void messageSize() throws Exception {
99-
int [] sizes = new int [] {100, 1000, 10 * 1000, 1 * 1000 * 1000, 5 * 1000 * 1000};
100-
for(int size : sizes) {
101-
CountDownLatch latch = new CountDownLatch(1);
102-
connection = basicGetBasicConsume(connection, "tls.nio.queue", latch, size);
103-
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
104-
assertTrue("Message has not been received", messagesReceived);
99+
int[] sizes = new int[]{100, 1000, 10 * 1000, 1 * 1000 * 1000, 5 * 1000 * 1000};
100+
for (int size : sizes) {
101+
sendAndVerifyMessage(size);
105102
}
106103
}
107104

105+
// The purpose of this test is to put some stress on client TLS layer (SslEngineByteBufferInputStream to be specific)
106+
// in an attempt to trigger condition described in https://github.com/rabbitmq/rabbitmq-java-client/issues/317
107+
// Unfortunately it is not guaranteed to be reproducible
108+
@Test public void largeMessagesTlsTraffic() throws Exception {
109+
for (int i = 0; i < 50; i++) {
110+
sendAndVerifyMessage(76390);
111+
}
112+
}
113+
114+
private void sendAndVerifyMessage(int size) throws Exception {
115+
CountDownLatch latch = new CountDownLatch(1);
116+
connection = basicGetBasicConsume(connection, "tls.nio.queue", latch, size);
117+
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
118+
assertTrue("Message has not been received", messagesReceived);
119+
}
120+
108121
private Connection basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize)
109122
throws IOException, TimeoutException {
110123
Channel channel = connection.createChannel();

0 commit comments

Comments
 (0)