Skip to content

Commit 7579bf8

Browse files
committed
Refactor NIO factories to Java 6
References #410
1 parent 2ab256e commit 7579bf8

File tree

6 files changed

+102
-43
lines changed

6 files changed

+102
-43
lines changed

src/main/java/com/rabbitmq/client/impl/nio/ByteBufferFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* Contract to create {@link ByteBuffer}s.
77
*
88
* @see NioParams
9-
* @since 5.5.0
9+
* @since 4.9.0
1010
*/
1111
public interface ByteBufferFactory {
1212

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,49 @@
11
package com.rabbitmq.client.impl.nio;
22

33
import java.nio.ByteBuffer;
4-
import java.util.function.Function;
54

65
/**
76
* Default {@link ByteBufferFactory} that creates heap-based {@link ByteBuffer}s.
8-
* This behavior can be changed by passing in a custom {@link Function<Integer, ByteBuffer>}
7+
* This behavior can be changed by passing in a custom {@link ByteBufferAllocator}
98
* to the constructor.
109
*
1110
* @see NioParams
1211
* @see ByteBufferFactory
13-
* @since 5.5.0
12+
* @since 4.9.0
1413
*/
1514
public class DefaultByteBufferFactory implements ByteBufferFactory {
1615

17-
private final Function<Integer, ByteBuffer> allocator;
16+
private final ByteBufferAllocator allocator;
1817

19-
public DefaultByteBufferFactory(Function<Integer, ByteBuffer> allocator) {
18+
public DefaultByteBufferFactory(ByteBufferAllocator allocator) {
2019
this.allocator = allocator;
2120
}
2221

2322
public DefaultByteBufferFactory() {
24-
this(capacity -> ByteBuffer.allocate(capacity));
23+
this(new ByteBufferAllocator() {
24+
25+
@Override
26+
public ByteBuffer allocate(int capacity) {
27+
return ByteBuffer.allocate(capacity);
28+
}
29+
});
2530
}
2631

2732
@Override
2833
public ByteBuffer createReadBuffer(NioContext nioContext) {
2934
if (nioContext.getSslEngine() == null) {
30-
return allocator.apply(nioContext.getNioParams().getReadByteBufferSize());
35+
return allocator.allocate(nioContext.getNioParams().getReadByteBufferSize());
3136
} else {
32-
return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize());
37+
return allocator.allocate(nioContext.getSslEngine().getSession().getApplicationBufferSize());
3338
}
3439
}
3540

3641
@Override
3742
public ByteBuffer createWriteBuffer(NioContext nioContext) {
3843
if (nioContext.getSslEngine() == null) {
39-
return allocator.apply(nioContext.getNioParams().getWriteByteBufferSize());
44+
return allocator.allocate(nioContext.getNioParams().getWriteByteBufferSize());
4045
} else {
41-
return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize());
46+
return allocator.allocate(nioContext.getSslEngine().getSession().getApplicationBufferSize());
4247
}
4348
}
4449

@@ -56,7 +61,12 @@ protected ByteBuffer createEncryptedByteBuffer(NioContext nioContext) {
5661
if (nioContext.getSslEngine() == null) {
5762
throw new IllegalArgumentException("Encrypted byte buffer should be created only in SSL/TLS context");
5863
} else {
59-
return allocator.apply(nioContext.getSslEngine().getSession().getPacketBufferSize());
64+
return allocator.allocate(nioContext.getSslEngine().getSession().getPacketBufferSize());
6065
}
6166
}
67+
68+
public interface ByteBufferAllocator {
69+
70+
ByteBuffer allocate(int capacity);
71+
}
6272
}

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.ArrayBlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.ThreadFactory;
28-
import java.util.function.Function;
2928

3029
import static com.rabbitmq.client.SslEngineConfigurators.ENABLE_HOSTNAME_VERIFICATION;
3130

@@ -36,11 +35,17 @@
3635
*/
3736
public class NioParams {
3837

39-
static Function<NioContext, NioQueue> DEFAULT_WRITE_QUEUE_FACTORY =
40-
ctx -> new BlockingQueueNioQueue(
41-
new ArrayBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity(), true),
42-
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
43-
);
38+
static NioQueueFactory DEFAULT_WRITE_QUEUE_FACTORY =
39+
new NioQueueFactory() {
40+
41+
@Override
42+
public NioQueue create(NioContext ctx) {
43+
return new BlockingQueueNioQueue(
44+
new ArrayBlockingQueue<WriteRequest>(ctx.getNioParams().getWriteQueueCapacity(), true),
45+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
46+
);
47+
}
48+
};
4449

4550
/**
4651
* size of the byte buffer used for inbound data
@@ -103,16 +108,16 @@ public void configure(SSLEngine sslEngine) throws IOException {
103108
* The factory to create {@link java.nio.ByteBuffer}s.
104109
* The default is to create heap-based {@link java.nio.ByteBuffer}s.
105110
*
106-
* @since 5.5.0
111+
* @since 4.9.0
107112
*/
108113
private ByteBufferFactory byteBufferFactory = new DefaultByteBufferFactory();
109114

110115
/**
111116
* Factory to create a {@link NioQueue}.
112117
*
113-
* @since 5.5.0
118+
* @since 4.9.0
114119
*/
115-
private Function<NioContext, NioQueue> writeQueueFactory =
120+
private NioQueueFactory writeQueueFactory =
116121
DEFAULT_WRITE_QUEUE_FACTORY;
117122

118123
public NioParams() {
@@ -419,15 +424,14 @@ public ByteBufferFactory getByteBufferFactory() {
419424
* @param writeQueueFactory the factory to use
420425
* @return this {@link NioParams} instance
421426
* @see NioQueue
422-
* @since 5.5.0
427+
* @since 4.9.0
423428
*/
424-
public NioParams setWriteQueueFactory(
425-
Function<NioContext, NioQueue> writeQueueFactory) {
429+
public NioParams setWriteQueueFactory(NioQueueFactory writeQueueFactory) {
426430
this.writeQueueFactory = writeQueueFactory;
427431
return this;
428432
}
429433

430-
public Function<NioContext, NioQueue> getWriteQueueFactory() {
434+
public NioQueueFactory getWriteQueueFactory() {
431435
return writeQueueFactory;
432436
}
433437
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
/**
4+
* Contract to create {@link NioQueue}.
5+
*
6+
* @see NioQueue
7+
* @since 4.9.0
8+
*/
9+
public interface NioQueueFactory {
10+
11+
/**
12+
* Create a {@link NioQueue} instance
13+
*
14+
* @param nioContext
15+
* @return
16+
*/
17+
NioQueue create(NioContext nioContext);
18+
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
7878
NioContext nioContext = new NioContext(nioParams, sslEngine);
7979

8080
this.writeQueue = nioParams.getWriteQueueFactory() == null ?
81-
NioParams.DEFAULT_WRITE_QUEUE_FACTORY.apply(nioContext) :
82-
nioParams.getWriteQueueFactory().apply(nioContext);
81+
NioParams.DEFAULT_WRITE_QUEUE_FACTORY.create(nioContext) :
82+
nioParams.getWriteQueueFactory().create(nioContext);
8383

8484
this.sslEngine = sslEngine;
8585
if(this.sslEngine == null) {

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
import com.rabbitmq.client.*;
44
import com.rabbitmq.client.impl.nio.BlockingQueueNioQueue;
55
import com.rabbitmq.client.impl.nio.DefaultByteBufferFactory;
6+
import com.rabbitmq.client.impl.nio.NioContext;
67
import com.rabbitmq.client.impl.nio.NioParams;
8+
import com.rabbitmq.client.impl.nio.NioQueue;
9+
import com.rabbitmq.client.impl.nio.NioQueueFactory;
10+
import com.rabbitmq.client.impl.nio.WriteRequest;
711
import org.junit.After;
812
import org.junit.Before;
913
import org.junit.Test;
@@ -144,15 +148,22 @@ public void nioLoopCleaning() throws Exception {
144148
NioParams nioParams = new NioParams();
145149
nioParams.setReadByteBufferSize(baseCapacity / 2);
146150
nioParams.setWriteByteBufferSize(baseCapacity / 4);
147-
List<ByteBuffer> byteBuffers = new CopyOnWriteArrayList<>();
148-
cf.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(capacity -> {
149-
ByteBuffer bb = ByteBuffer.allocate(capacity);
150-
byteBuffers.add(bb);
151-
return bb;
151+
final List<ByteBuffer> byteBuffers = new CopyOnWriteArrayList<ByteBuffer>();
152+
cf.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(new DefaultByteBufferFactory.ByteBufferAllocator() {
153+
154+
@Override
155+
public ByteBuffer allocate(int capacity) {
156+
ByteBuffer bb = ByteBuffer.allocate(capacity);
157+
byteBuffers.add(bb);
158+
return bb;
159+
}
152160
})));
153161

154-
try (Connection c = cf.newConnection()) {
162+
Connection c = cf.newConnection();
163+
try {
155164
sendAndVerifyMessage(c, 100);
165+
} finally {
166+
TestUtils.close(c);
156167
}
157168

158169
assertThat(byteBuffers, hasSize(2));
@@ -163,25 +174,41 @@ public void nioLoopCleaning() throws Exception {
163174
@Test public void directByteBuffers() throws Exception {
164175
ConnectionFactory cf = new ConnectionFactory();
165176
cf.useNio();
166-
cf.setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(capacity -> ByteBuffer.allocateDirect(capacity))));
167-
try (Connection c = cf.newConnection()) {
177+
cf.setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(new DefaultByteBufferFactory.ByteBufferAllocator() {
178+
179+
@Override
180+
public ByteBuffer allocate(int capacity) {
181+
return ByteBuffer.allocateDirect(capacity);
182+
}
183+
})));
184+
Connection c = cf.newConnection();
185+
try {
168186
sendAndVerifyMessage(c, 100);
187+
} finally {
188+
TestUtils.close(c);
169189
}
170190
}
171191

172192
@Test public void customWriteQueue() throws Exception {
173193
ConnectionFactory cf = new ConnectionFactory();
174194
cf.useNio();
175-
AtomicInteger count = new AtomicInteger(0);
176-
cf.setNioParams(new NioParams().setWriteQueueFactory(ctx -> {
177-
count.incrementAndGet();
178-
return new BlockingQueueNioQueue(
179-
new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()),
180-
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
181-
);
195+
final AtomicInteger count = new AtomicInteger(0);
196+
cf.setNioParams(new NioParams().setWriteQueueFactory(new NioQueueFactory() {
197+
198+
@Override
199+
public NioQueue create(NioContext ctx) {
200+
count.incrementAndGet();
201+
return new BlockingQueueNioQueue(
202+
new LinkedBlockingQueue<WriteRequest>(ctx.getNioParams().getWriteQueueCapacity()),
203+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
204+
);
205+
}
182206
}));
183-
try (Connection c = cf.newConnection()) {
207+
Connection c = cf.newConnection();
208+
try {
184209
sendAndVerifyMessage(c, 100);
210+
} finally {
211+
TestUtils.close(c);
185212
}
186213
assertEquals(1, count.get());
187214
}

0 commit comments

Comments
 (0)