Skip to content

Commit 6ff2da5

Browse files
committed
Add abstraction to create NIO frame queue
Allows to use other data structure implementations than JDK's ArrayBlockingQueue and BlockingQueue. References #410 (cherry picked from commit 8c9577d) (cherry picked from commit 5e0c25a) Conflicts: src/main/java/com/rabbitmq/client/impl/nio/NioParams.java
1 parent 6de69db commit 6ff2da5

File tree

5 files changed

+153
-12
lines changed

5 files changed

+153
-12
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.TimeUnit;
5+
6+
/**
7+
* Bridge between {@link NioQueue} and JDK's {@link BlockingQueue}.
8+
*
9+
* @see NioQueue
10+
* @since 5.5.0
11+
*/
12+
public class BlockingQueueNioQueue implements NioQueue {
13+
14+
private final BlockingQueue<WriteRequest> delegate;
15+
private final int writeEnqueuingTimeoutInMs;
16+
17+
public BlockingQueueNioQueue(BlockingQueue<WriteRequest> delegate, int writeEnqueuingTimeoutInMs) {
18+
this.delegate = delegate;
19+
this.writeEnqueuingTimeoutInMs = writeEnqueuingTimeoutInMs;
20+
}
21+
22+
@Override
23+
public boolean offer(WriteRequest writeRequest) throws InterruptedException {
24+
return this.delegate.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
25+
}
26+
27+
@Override
28+
public int size() {
29+
return this.delegate.size();
30+
}
31+
32+
@Override
33+
public WriteRequest poll() {
34+
return this.delegate.poll();
35+
}
36+
37+
@Override
38+
public boolean isEmpty() {
39+
return this.delegate.isEmpty();
40+
}
41+
}

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222

2323
import javax.net.ssl.SSLEngine;
2424
import java.io.IOException;
25+
import java.util.concurrent.ArrayBlockingQueue;
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.ThreadFactory;
28+
import java.util.function.Function;
2729

2830
import static com.rabbitmq.client.SslEngineConfigurators.ENABLE_HOSTNAME_VERIFICATION;
2931

@@ -34,6 +36,12 @@
3436
*/
3537
public class NioParams {
3638

39+
static Function<? super NioContext, ? extends NioQueue> DEFAULT_WRITE_QUEUE_FACTORY =
40+
ctx -> new BlockingQueueNioQueue(
41+
new ArrayBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity(), true),
42+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
43+
);
44+
3745
/**
3846
* size of the byte buffer used for inbound data
3947
*/
@@ -99,6 +107,14 @@ public void configure(SSLEngine sslEngine) throws IOException {
99107
*/
100108
private ByteBufferFactory byteBufferFactory = new DefaultByteBufferFactory();
101109

110+
/**
111+
* Factory to create a {@link NioQueue}.
112+
*
113+
* @since 5.5.0
114+
*/
115+
private Function<? super NioContext, ? extends NioQueue> writeQueueFactory =
116+
DEFAULT_WRITE_QUEUE_FACTORY;
117+
102118
public NioParams() {
103119
}
104120

@@ -114,6 +130,7 @@ public NioParams(NioParams nioParams) {
114130
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
115131
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
116132
setByteBufferFactory(nioParams.getByteBufferFactory());
133+
setWriteQueueFactory(nioParams.getWriteQueueFactory());
117134
}
118135

119136
/**
@@ -393,4 +410,24 @@ public NioParams setByteBufferFactory(ByteBufferFactory byteBufferFactory) {
393410
public ByteBufferFactory getByteBufferFactory() {
394411
return byteBufferFactory;
395412
}
413+
414+
/**
415+
* Set the factory to create {@link NioQueue}s.
416+
* <p>
417+
* The default uses a {@link ArrayBlockingQueue}.
418+
*
419+
* @param writeQueueFactory the factory to use
420+
* @return this {@link NioParams} instance
421+
* @see NioQueue
422+
* @since 5.5.0
423+
*/
424+
public NioParams setWriteQueueFactory(
425+
Function<? super NioContext, ? extends NioQueue> writeQueueFactory) {
426+
this.writeQueueFactory = writeQueueFactory;
427+
return this;
428+
}
429+
430+
public Function<? super NioContext, ? extends NioQueue> getWriteQueueFactory() {
431+
return writeQueueFactory;
432+
}
396433
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
/**
4+
* Contract to exchange frame between application threads and NIO thread.
5+
* <p>
6+
* This is a simplified subset of {@link java.util.concurrent.BlockingQueue}.
7+
* This interface is considered a SPI and is likely to move between
8+
* minor and patch releases.
9+
*
10+
* @see NioParams
11+
* @since 5.5.0
12+
*/
13+
public interface NioQueue {
14+
15+
/**
16+
* Enqueue a frame, block if the queue is full.
17+
*
18+
* @param writeRequest
19+
* @return
20+
* @throws InterruptedException
21+
*/
22+
boolean offer(WriteRequest writeRequest) throws InterruptedException;
23+
24+
/**
25+
* Get the current size of the queue.
26+
*
27+
* @return
28+
*/
29+
int size();
30+
31+
/**
32+
* Retrieves and removes the head of this queue,
33+
* or returns {@code null} if this queue is empty.
34+
*
35+
* @return the head of this queue, or {@code null} if this queue is empty
36+
*/
37+
WriteRequest poll();
38+
39+
/**
40+
* Returns {@code true} if the queue contains no element.
41+
*
42+
* @return {@code true} if the queue contains no element
43+
*/
44+
boolean isEmpty();
45+
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626
import java.nio.ByteBuffer;
2727
import java.nio.channels.SelectionKey;
2828
import java.nio.channels.SocketChannel;
29-
import java.util.Queue;
30-
import java.util.concurrent.ArrayBlockingQueue;
31-
import java.util.concurrent.BlockingQueue;
32-
import java.util.concurrent.TimeUnit;
3329

3430
/**
3531
*
@@ -43,7 +39,7 @@ public class SocketChannelFrameHandlerState {
4339

4440
private final SocketChannel channel;
4541

46-
private final BlockingQueue<WriteRequest> writeQueue;
42+
private final NioQueue writeQueue;
4743

4844
private volatile AMQConnection connection;
4945

@@ -54,8 +50,6 @@ public class SocketChannelFrameHandlerState {
5450

5551
private final SelectorHolder readSelectorState;
5652

57-
private final int writeEnqueuingTimeoutInMs;
58-
5953
final boolean ssl;
6054

6155
final SSLEngine sslEngine;
@@ -80,8 +74,13 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
8074
this.channel = channel;
8175
this.readSelectorState = nioLoopsState.readSelectorState;
8276
this.writeSelectorState = nioLoopsState.writeSelectorState;
83-
this.writeQueue = new ArrayBlockingQueue<WriteRequest>(nioParams.getWriteQueueCapacity(), true);
84-
this.writeEnqueuingTimeoutInMs = nioParams.getWriteEnqueuingTimeoutInMs();
77+
78+
NioContext nioContext = new NioContext(nioParams, sslEngine);
79+
80+
this.writeQueue = nioParams.getWriteQueueFactory() == null ?
81+
NioParams.DEFAULT_WRITE_QUEUE_FACTORY.apply(nioContext) :
82+
nioParams.getWriteQueueFactory().apply(nioContext);
83+
8584
this.sslEngine = sslEngine;
8685
if(this.sslEngine == null) {
8786
this.ssl = false;
@@ -98,7 +97,6 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
9897

9998
} else {
10099
this.ssl = true;
101-
NioContext nioContext = new NioContext(nioParams, sslEngine);
102100
this.plainOut = nioParams.getByteBufferFactory().createWriteBuffer(nioContext);
103101
this.cipherOut = nioParams.getByteBufferFactory().createEncryptedWriteBuffer(nioContext);
104102
this.plainIn = nioParams.getByteBufferFactory().createReadBuffer(nioContext);
@@ -116,7 +114,7 @@ public SocketChannel getChannel() {
116114
return channel;
117115
}
118116

119-
public Queue<WriteRequest> getWriteQueue() {
117+
public NioQueue getWriteQueue() {
120118
return writeQueue;
121119
}
122120

@@ -130,7 +128,7 @@ public void write(Frame frame) throws IOException {
130128

131129
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
132130
try {
133-
boolean offered = this.writeQueue.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
131+
boolean offered = this.writeQueue.offer(writeRequest);
134132
if(offered) {
135133
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
136134
this.readSelectorState.selector.wakeup();

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.rabbitmq.client.test;
22

33
import com.rabbitmq.client.*;
4+
import com.rabbitmq.client.impl.nio.BlockingQueueNioQueue;
45
import com.rabbitmq.client.impl.nio.DefaultByteBufferFactory;
56
import com.rabbitmq.client.impl.nio.NioParams;
67
import org.junit.After;
@@ -11,9 +12,11 @@
1112
import java.nio.ByteBuffer;
1213
import java.util.List;
1314
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.AtomicInteger;
1416

1517
import static org.hamcrest.Matchers.hasSize;
1618
import static org.hamcrest.Matchers.isOneOf;
19+
import static org.junit.Assert.assertEquals;
1720
import static org.junit.Assert.assertThat;
1821
import static org.junit.Assert.assertTrue;
1922

@@ -166,6 +169,23 @@ public void nioLoopCleaning() throws Exception {
166169
}
167170
}
168171

172+
@Test public void customWriteQueue() throws Exception {
173+
ConnectionFactory cf = new ConnectionFactory();
174+
cf.useNio();
175+
AtomicInteger count = new AtomicInteger(0);
176+
cf.setNioParams(new NioParams().setWriteQueueFactory(ctx -> {
177+
count.incrementAndGet();
178+
return new BlockingQueueNioQueue(
179+
new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()),
180+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
181+
);
182+
}));
183+
try (Connection c = cf.newConnection()) {
184+
sendAndVerifyMessage(c, 100);
185+
}
186+
assertEquals(1, count.get());
187+
}
188+
169189
private void sendAndVerifyMessage(Connection connection, int size) throws Exception {
170190
CountDownLatch latch = new CountDownLatch(1);
171191
boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);

0 commit comments

Comments
 (0)