Skip to content

Commit 5e0c25a

Browse files
committed
Add abstraction to create NIO frame queue
Allows to use other data structure implementations than JDK's ArrayBlockingQueue and BlockingQueue. References #410 (cherry picked from commit 8c9577d)
1 parent 0252e2c commit 5e0c25a

File tree

5 files changed

+153
-12
lines changed

5 files changed

+153
-12
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.TimeUnit;
5+
6+
/**
7+
* Bridge between {@link NioQueue} and JDK's {@link BlockingQueue}.
8+
*
9+
* @see NioQueue
10+
* @since 5.5.0
11+
*/
12+
public class BlockingQueueNioQueue implements NioQueue {
13+
14+
private final BlockingQueue<WriteRequest> delegate;
15+
private final int writeEnqueuingTimeoutInMs;
16+
17+
public BlockingQueueNioQueue(BlockingQueue<WriteRequest> delegate, int writeEnqueuingTimeoutInMs) {
18+
this.delegate = delegate;
19+
this.writeEnqueuingTimeoutInMs = writeEnqueuingTimeoutInMs;
20+
}
21+
22+
@Override
23+
public boolean offer(WriteRequest writeRequest) throws InterruptedException {
24+
return this.delegate.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
25+
}
26+
27+
@Override
28+
public int size() {
29+
return this.delegate.size();
30+
}
31+
32+
@Override
33+
public WriteRequest poll() {
34+
return this.delegate.poll();
35+
}
36+
37+
@Override
38+
public boolean isEmpty() {
39+
return this.delegate.isEmpty();
40+
}
41+
}

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import com.rabbitmq.client.SslEngineConfigurator;
2121

2222
import javax.net.ssl.SSLEngine;
23+
import java.util.concurrent.ArrayBlockingQueue;
2324
import java.util.concurrent.ExecutorService;
2425
import java.util.concurrent.ThreadFactory;
26+
import java.util.function.Function;
2527

2628
import static com.rabbitmq.client.SslEngineConfigurators.ENABLE_HOSTNAME_VERIFICATION;
2729

@@ -32,6 +34,12 @@
3234
*/
3335
public class NioParams {
3436

37+
static Function<? super NioContext, ? extends NioQueue> DEFAULT_WRITE_QUEUE_FACTORY =
38+
ctx -> new BlockingQueueNioQueue(
39+
new ArrayBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity(), true),
40+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
41+
);
42+
3543
/**
3644
* size of the byte buffer used for inbound data
3745
*/
@@ -93,6 +101,14 @@ public class NioParams {
93101
*/
94102
private ByteBufferFactory byteBufferFactory = new DefaultByteBufferFactory();
95103

104+
/**
105+
* Factory to create a {@link NioQueue}.
106+
*
107+
* @since 5.5.0
108+
*/
109+
private Function<? super NioContext, ? extends NioQueue> writeQueueFactory =
110+
DEFAULT_WRITE_QUEUE_FACTORY;
111+
96112
public NioParams() {
97113
}
98114

@@ -108,6 +124,7 @@ public NioParams(NioParams nioParams) {
108124
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
109125
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
110126
setByteBufferFactory(nioParams.getByteBufferFactory());
127+
setWriteQueueFactory(nioParams.getWriteQueueFactory());
111128
}
112129

113130
/**
@@ -384,4 +401,24 @@ public NioParams setByteBufferFactory(ByteBufferFactory byteBufferFactory) {
384401
public ByteBufferFactory getByteBufferFactory() {
385402
return byteBufferFactory;
386403
}
404+
405+
/**
406+
* Set the factory to create {@link NioQueue}s.
407+
* <p>
408+
* The default uses a {@link ArrayBlockingQueue}.
409+
*
410+
* @param writeQueueFactory the factory to use
411+
* @return this {@link NioParams} instance
412+
* @see NioQueue
413+
* @since 5.5.0
414+
*/
415+
public NioParams setWriteQueueFactory(
416+
Function<? super NioContext, ? extends NioQueue> writeQueueFactory) {
417+
this.writeQueueFactory = writeQueueFactory;
418+
return this;
419+
}
420+
421+
public Function<? super NioContext, ? extends NioQueue> getWriteQueueFactory() {
422+
return writeQueueFactory;
423+
}
387424
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
/**
4+
* Contract to exchange frame between application threads and NIO thread.
5+
* <p>
6+
* This is a simplified subset of {@link java.util.concurrent.BlockingQueue}.
7+
* This interface is considered a SPI and is likely to move between
8+
* minor and patch releases.
9+
*
10+
* @see NioParams
11+
* @since 5.5.0
12+
*/
13+
public interface NioQueue {
14+
15+
/**
16+
* Enqueue a frame, block if the queue is full.
17+
*
18+
* @param writeRequest
19+
* @return
20+
* @throws InterruptedException
21+
*/
22+
boolean offer(WriteRequest writeRequest) throws InterruptedException;
23+
24+
/**
25+
* Get the current size of the queue.
26+
*
27+
* @return
28+
*/
29+
int size();
30+
31+
/**
32+
* Retrieves and removes the head of this queue,
33+
* or returns {@code null} if this queue is empty.
34+
*
35+
* @return the head of this queue, or {@code null} if this queue is empty
36+
*/
37+
WriteRequest poll();
38+
39+
/**
40+
* Returns {@code true} if the queue contains no element.
41+
*
42+
* @return {@code true} if the queue contains no element
43+
*/
44+
boolean isEmpty();
45+
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626
import java.nio.ByteBuffer;
2727
import java.nio.channels.SelectionKey;
2828
import java.nio.channels.SocketChannel;
29-
import java.util.Queue;
30-
import java.util.concurrent.ArrayBlockingQueue;
31-
import java.util.concurrent.BlockingQueue;
32-
import java.util.concurrent.TimeUnit;
3329

3430
/**
3531
*
@@ -43,7 +39,7 @@ public class SocketChannelFrameHandlerState {
4339

4440
private final SocketChannel channel;
4541

46-
private final BlockingQueue<WriteRequest> writeQueue;
42+
private final NioQueue writeQueue;
4743

4844
private volatile AMQConnection connection;
4945

@@ -54,8 +50,6 @@ public class SocketChannelFrameHandlerState {
5450

5551
private final SelectorHolder readSelectorState;
5652

57-
private final int writeEnqueuingTimeoutInMs;
58-
5953
final boolean ssl;
6054

6155
final SSLEngine sslEngine;
@@ -80,8 +74,13 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
8074
this.channel = channel;
8175
this.readSelectorState = nioLoopsState.readSelectorState;
8276
this.writeSelectorState = nioLoopsState.writeSelectorState;
83-
this.writeQueue = new ArrayBlockingQueue<WriteRequest>(nioParams.getWriteQueueCapacity(), true);
84-
this.writeEnqueuingTimeoutInMs = nioParams.getWriteEnqueuingTimeoutInMs();
77+
78+
NioContext nioContext = new NioContext(nioParams, sslEngine);
79+
80+
this.writeQueue = nioParams.getWriteQueueFactory() == null ?
81+
NioParams.DEFAULT_WRITE_QUEUE_FACTORY.apply(nioContext) :
82+
nioParams.getWriteQueueFactory().apply(nioContext);
83+
8584
this.sslEngine = sslEngine;
8685
if(this.sslEngine == null) {
8786
this.ssl = false;
@@ -98,7 +97,6 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
9897

9998
} else {
10099
this.ssl = true;
101-
NioContext nioContext = new NioContext(nioParams, sslEngine);
102100
this.plainOut = nioParams.getByteBufferFactory().createWriteBuffer(nioContext);
103101
this.cipherOut = nioParams.getByteBufferFactory().createEncryptedWriteBuffer(nioContext);
104102
this.plainIn = nioParams.getByteBufferFactory().createReadBuffer(nioContext);
@@ -116,7 +114,7 @@ public SocketChannel getChannel() {
116114
return channel;
117115
}
118116

119-
public Queue<WriteRequest> getWriteQueue() {
117+
public NioQueue getWriteQueue() {
120118
return writeQueue;
121119
}
122120

@@ -130,7 +128,7 @@ public void write(Frame frame) throws IOException {
130128

131129
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
132130
try {
133-
boolean offered = this.writeQueue.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
131+
boolean offered = this.writeQueue.offer(writeRequest);
134132
if(offered) {
135133
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
136134
this.readSelectorState.selector.wakeup();

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.rabbitmq.client.test;
22

33
import com.rabbitmq.client.*;
4+
import com.rabbitmq.client.impl.nio.BlockingQueueNioQueue;
45
import com.rabbitmq.client.impl.nio.DefaultByteBufferFactory;
56
import com.rabbitmq.client.impl.nio.NioParams;
67
import org.junit.After;
@@ -11,9 +12,11 @@
1112
import java.nio.ByteBuffer;
1213
import java.util.List;
1314
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.AtomicInteger;
1416

1517
import static org.hamcrest.Matchers.hasSize;
1618
import static org.hamcrest.Matchers.isOneOf;
19+
import static org.junit.Assert.assertEquals;
1720
import static org.junit.Assert.assertThat;
1821
import static org.junit.Assert.assertTrue;
1922

@@ -166,6 +169,23 @@ public void nioLoopCleaning() throws Exception {
166169
}
167170
}
168171

172+
@Test public void customWriteQueue() throws Exception {
173+
ConnectionFactory cf = new ConnectionFactory();
174+
cf.useNio();
175+
AtomicInteger count = new AtomicInteger(0);
176+
cf.setNioParams(new NioParams().setWriteQueueFactory(ctx -> {
177+
count.incrementAndGet();
178+
return new BlockingQueueNioQueue(
179+
new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()),
180+
ctx.getNioParams().getWriteEnqueuingTimeoutInMs()
181+
);
182+
}));
183+
try (Connection c = cf.newConnection()) {
184+
sendAndVerifyMessage(c, 100);
185+
}
186+
assertEquals(1, count.get());
187+
}
188+
169189
private void sendAndVerifyMessage(Connection connection, int size) throws Exception {
170190
CountDownLatch latch = new CountDownLatch(1);
171191
boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size);

0 commit comments

Comments
 (0)