Skip to content

Commit 6de69db

Browse files
committed
Add ByteBufferFactory for NIO mode
This allows to create heap-based or direct ByteBuffer instances. References #410 (cherry picked from commit bb2c47a) (cherry picked from commit 0252e2c)
1 parent 32520b4 commit 6de69db

File tree

7 files changed

+234
-6
lines changed

7 files changed

+234
-6
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import java.nio.ByteBuffer;
4+
5+
/**
6+
* Contract to create {@link ByteBuffer}s.
7+
*
8+
* @see NioParams
9+
* @since 5.5.0
10+
*/
11+
public interface ByteBufferFactory {
12+
13+
/**
14+
* Create the {@link ByteBuffer} that contains inbound frames.
15+
* This buffer is the network buffer for plain connections.
16+
* When using SSL/TLS, this buffer isn't directly connected to
17+
* the network, the encrypted read buffer is.
18+
*
19+
* @param nioContext
20+
* @return
21+
*/
22+
ByteBuffer createReadBuffer(NioContext nioContext);
23+
24+
/**
25+
* Create the {@link ByteBuffer} that contains outbound frames.
26+
* This buffer is the network buffer for plain connections.
27+
* When using SSL/TLS, this buffer isn't directed connected to
28+
* the network, the encrypted write buffer is.
29+
*
30+
* @param nioContext
31+
* @return
32+
*/
33+
ByteBuffer createWriteBuffer(NioContext nioContext);
34+
35+
/**
36+
* Create the network read {@link ByteBuffer}.
37+
* This buffer contains encrypted frames read from the network.
38+
* The {@link javax.net.ssl.SSLEngine} decrypts frame and pass them
39+
* over to the read buffer.
40+
*
41+
* @param nioContext
42+
* @return
43+
*/
44+
ByteBuffer createEncryptedReadBuffer(NioContext nioContext);
45+
46+
/**
47+
* Create the network write {@link ByteBuffer}.
48+
* This buffer contains encrypted outbound frames. These
49+
* frames come from the write buffer that sends them through
50+
* the {@link javax.net.ssl.SSLContext} for encryption to
51+
* this buffer.
52+
*
53+
* @param nioContext
54+
* @return
55+
*/
56+
ByteBuffer createEncryptedWriteBuffer(NioContext nioContext);
57+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.function.Function;
5+
6+
/**
7+
* Default {@link ByteBufferFactory} that creates heap-based {@link ByteBuffer}s.
8+
* This behavior can be changed by passing in a custom {@link Function<Integer, ByteBuffer>}
9+
* to the constructor.
10+
*
11+
* @see NioParams
12+
* @see ByteBufferFactory
13+
* @since 5.5.0
14+
*/
15+
public class DefaultByteBufferFactory implements ByteBufferFactory {
16+
17+
private final Function<Integer, ByteBuffer> allocator;
18+
19+
public DefaultByteBufferFactory(Function<Integer, ByteBuffer> allocator) {
20+
this.allocator = allocator;
21+
}
22+
23+
public DefaultByteBufferFactory() {
24+
this(capacity -> ByteBuffer.allocate(capacity));
25+
}
26+
27+
@Override
28+
public ByteBuffer createReadBuffer(NioContext nioContext) {
29+
if (nioContext.getSslEngine() == null) {
30+
return allocator.apply(nioContext.getNioParams().getReadByteBufferSize());
31+
} else {
32+
return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize());
33+
}
34+
}
35+
36+
@Override
37+
public ByteBuffer createWriteBuffer(NioContext nioContext) {
38+
if (nioContext.getSslEngine() == null) {
39+
return allocator.apply(nioContext.getNioParams().getWriteByteBufferSize());
40+
} else {
41+
return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize());
42+
}
43+
}
44+
45+
@Override
46+
public ByteBuffer createEncryptedReadBuffer(NioContext nioContext) {
47+
return createEncryptedByteBuffer(nioContext);
48+
}
49+
50+
@Override
51+
public ByteBuffer createEncryptedWriteBuffer(NioContext nioContext) {
52+
return createEncryptedByteBuffer(nioContext);
53+
}
54+
55+
protected ByteBuffer createEncryptedByteBuffer(NioContext nioContext) {
56+
if (nioContext.getSslEngine() == null) {
57+
throw new IllegalArgumentException("Encrypted byte buffer should be created only in SSL/TLS context");
58+
} else {
59+
return allocator.apply(nioContext.getSslEngine().getSession().getPacketBufferSize());
60+
}
61+
}
62+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import javax.net.ssl.SSLEngine;
4+
5+
/**
6+
* Context when creating resources for a NIO-based connection.
7+
*
8+
* @see ByteBufferFactory
9+
* @since 5.5.0
10+
*/
11+
public class NioContext {
12+
13+
private final NioParams nioParams;
14+
15+
private final SSLEngine sslEngine;
16+
17+
NioContext(NioParams nioParams, SSLEngine sslEngine) {
18+
this.nioParams = nioParams;
19+
this.sslEngine = sslEngine;
20+
}
21+
22+
/**
23+
* NIO params.
24+
*
25+
* @return
26+
*/
27+
public NioParams getNioParams() {
28+
return nioParams;
29+
}
30+
31+
/**
32+
* {@link SSLEngine} for SSL/TLS connection.
33+
* Null for plain connection.
34+
*
35+
* @return
36+
*/
37+
public SSLEngine getSslEngine() {
38+
return sslEngine;
39+
}
40+
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandler
4848
this.socketChannelFrameHandlerFactory = socketChannelFrameHandlerFactory;
4949
this.executorService = nioParams.getNioExecutor();
5050
this.threadFactory = nioParams.getThreadFactory();
51-
this.readBuffer = ByteBuffer.allocate(nioParams.getReadByteBufferSize());
52-
this.writeBuffer = ByteBuffer.allocate(nioParams.getWriteByteBufferSize());
51+
NioContext nioContext = new NioContext(nioParams, null);
52+
this.readBuffer = nioParams.getByteBufferFactory().createReadBuffer(nioContext);
53+
this.writeBuffer = nioParams.getByteBufferFactory().createWriteBuffer(nioContext);
5354
}
5455

5556
void initStateIfNecessary() throws IOException {

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ public void configure(SSLEngine sslEngine) throws IOException {
9191
*/
9292
private ExecutorService connectionShutdownExecutor;
9393

94+
/**
95+
* The factory to create {@link java.nio.ByteBuffer}s.
96+
* The default is to create heap-based {@link java.nio.ByteBuffer}s.
97+
*
98+
* @since 5.5.0
99+
*/
100+
private ByteBufferFactory byteBufferFactory = new DefaultByteBufferFactory();
101+
94102
public NioParams() {
95103
}
96104

@@ -105,6 +113,7 @@ public NioParams(NioParams nioParams) {
105113
setSocketChannelConfigurator(nioParams.getSocketChannelConfigurator());
106114
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
107115
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
116+
setByteBufferFactory(nioParams.getByteBufferFactory());
108117
}
109118

110119
/**
@@ -364,4 +373,24 @@ public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdow
364373
this.connectionShutdownExecutor = connectionShutdownExecutor;
365374
return this;
366375
}
376+
377+
/**
378+
* Set the factory to create {@link java.nio.ByteBuffer}s.
379+
* <p>
380+
* The default implementation creates heap-based {@link java.nio.ByteBuffer}s.
381+
*
382+
* @param byteBufferFactory the factory to use
383+
* @return this {@link NioParams} instance
384+
* @see ByteBufferFactory
385+
* @see DefaultByteBufferFactory
386+
* @since 5.5.0
387+
*/
388+
public NioParams setByteBufferFactory(ByteBufferFactory byteBufferFactory) {
389+
this.byteBufferFactory = byteBufferFactory;
390+
return this;
391+
}
392+
393+
public ByteBufferFactory getByteBufferFactory() {
394+
return byteBufferFactory;
395+
}
367396
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,11 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
9898

9999
} else {
100100
this.ssl = true;
101-
this.plainOut = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
102-
this.cipherOut = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
103-
this.plainIn = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
104-
this.cipherIn = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
101+
NioContext nioContext = new NioContext(nioParams, sslEngine);
102+
this.plainOut = nioParams.getByteBufferFactory().createWriteBuffer(nioContext);
103+
this.cipherOut = nioParams.getByteBufferFactory().createEncryptedWriteBuffer(nioContext);
104+
this.plainIn = nioParams.getByteBufferFactory().createReadBuffer(nioContext);
105+
this.cipherIn = nioParams.getByteBufferFactory().createEncryptedReadBuffer(nioContext);
105106

106107
this.outputStream = new DataOutputStream(
107108
new SslEngineByteBufferOutputStream(sslEngine, plainOut, cipherOut, channel)

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package com.rabbitmq.client.test;
22

33
import com.rabbitmq.client.*;
4+
import com.rabbitmq.client.impl.nio.DefaultByteBufferFactory;
45
import com.rabbitmq.client.impl.nio.NioParams;
56
import org.junit.After;
67
import org.junit.Before;
78
import org.junit.Test;
89

910
import java.io.IOException;
11+
import java.nio.ByteBuffer;
12+
import java.util.List;
1013
import java.util.concurrent.*;
1114

15+
import static org.hamcrest.Matchers.hasSize;
16+
import static org.hamcrest.Matchers.isOneOf;
17+
import static org.junit.Assert.assertThat;
1218
import static org.junit.Assert.assertTrue;
1319

1420
/**
@@ -128,6 +134,38 @@ public void nioLoopCleaning() throws Exception {
128134
}
129135
}
130136

137+
@Test public void byteBufferFactory() throws Exception {
138+
ConnectionFactory cf = new ConnectionFactory();
139+
cf.useNio();
140+
int baseCapacity = 32768;
141+
NioParams nioParams = new NioParams();
142+
nioParams.setReadByteBufferSize(baseCapacity / 2);
143+
nioParams.setWriteByteBufferSize(baseCapacity / 4);
144+
List<ByteBuffer> byteBuffers = new CopyOnWriteArrayList<>();
145+
cf.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(capacity -> {
146+
ByteBuffer bb = ByteBuffer.allocate(capacity);
147+
byteBuffers.add(bb);
148+
return bb;
149+
})));
150+
151+
try (Connection c = cf.newConnection()) {
152+
sendAndVerifyMessage(c, 100);
153+
}
154+
155+
assertThat(byteBuffers, hasSize(2));
156+
assertThat(byteBuffers.get(0).capacity(), isOneOf(nioParams.getReadByteBufferSize(), nioParams.getWriteByteBufferSize()));
157+
assertThat(byteBuffers.get(1).capacity(), isOneOf(nioParams.getReadByteBufferSize(), nioParams.getWriteByteBufferSize()));
158+
}
159+
160+
@Test public void directByteBuffers() throws Exception {
161+
ConnectionFactory cf = new ConnectionFactory();
162+
cf.useNio();
163+
cf.setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(capacity -> ByteBuffer.allocateDirect(capacity))));
164+
try (Connection c = cf.newConnection()) {
165+
sendAndVerifyMessage(c, 100);
166+
}
167+
}
168+
131169
private void sendAndVerifyMessage(Connection connection, int size) throws Exception {
132170
CountDownLatch latch = new CountDownLatch(1);
133171
boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);

0 commit comments

Comments
 (0)