Skip to content

Commit cf7b7a9

Browse files
merge bug26396 into default
2 parents 4c036d8 + b48b916 commit cf7b7a9

File tree

5 files changed

+130
-12
lines changed

5 files changed

+130
-12
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,23 @@ public AMQCommand rpc(Method method) throws IOException {
11861186
return exnWrappingRpc(method);
11871187
}
11881188

1189+
@Override
1190+
public void enqueueRpc(RpcContinuation k) {
1191+
synchronized (_channelMutex) {
1192+
super.enqueueRpc(k);
1193+
dispatcher.setUnlimited(true);
1194+
}
1195+
}
1196+
1197+
@Override
1198+
public RpcContinuation nextOutstandingRpc() {
1199+
synchronized (_channelMutex) {
1200+
RpcContinuation res = super.nextOutstandingRpc();
1201+
if (res != null) dispatcher.setUnlimited(false);
1202+
return res;
1203+
}
1204+
}
1205+
11891206
private void handleAckNack(long seqNo, boolean multiple, boolean nack) {
11901207
if (multiple) {
11911208
unconfirmedSet.headSet(seqNo + 1).clear();

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public void quiesce() {
6262
this.shuttingDown = true;
6363
}
6464

65+
public void setUnlimited(boolean unlimited) {
66+
this.workService.unlimit(channel, unlimited);
67+
}
68+
6569
public void handleConsumeOk(final Consumer delegate,
6670
final String consumerTag) {
6771
executeUnlessShuttingDown(

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public void registerKey(Channel channel) {
6464
this.workPool.registerKey(channel);
6565
}
6666

67+
public void unlimit(Channel channel, boolean unlimited) {
68+
this.workPool.unlimit(channel, unlimited);
69+
}
70+
6771
public void addWork(Channel channel, Runnable runnable) {
6872
if (this.workPool.addWorkItem(channel, runnable)) {
6973
this.executor.execute(new WorkPoolRunnable());

src/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
import java.util.Collection;
44
import java.util.HashMap;
55
import java.util.HashSet;
6+
import java.util.LinkedList;
67
import java.util.Map;
78
import java.util.Set;
8-
import java.util.concurrent.BlockingQueue;
9-
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.Semaphore;
10+
import java.util.concurrent.atomic.AtomicInteger;
1011

1112
/**
1213
* This is a generic implementation of the <q>Channels</q> specification
@@ -69,12 +70,97 @@
6970
public class WorkPool<K, W> {
7071
private static final int MAX_QUEUE_LENGTH = 1000;
7172

73+
// This is like a LinkedBlockingQueue of limited length except you can turn the limit
74+
// on and off. And it only has the methods we need.
75+
//
76+
// This class is partly synchronised because:
77+
//
78+
// a) we cannot make put(T) synchronised as it may block indefinitely. Therefore we
79+
// only lock before modifying the list.
80+
// b) we don't want to make setUnlimited() synchronised as it is called frequently by
81+
// the channel.
82+
// c) anyway the issue with setUnlimited() is not that it be synchronised itself but
83+
// that calls to it should alternate between false and true. We assert this, but
84+
// it should not be able to go wrong because the RPC calls in AMQChannel and
85+
// ChannelN are all protected by the _channelMutex; we can't have more than one
86+
// outstanding RPC or finish the same RPC twice.
87+
88+
private class WorkQueue {
89+
private LinkedList<W> list;
90+
private boolean unlimited;
91+
private int maxLengthWhenLimited;
92+
93+
private WorkQueue(int maxLengthWhenLimited) {
94+
this.list = new LinkedList<W>();
95+
this.unlimited = false; // Just for assertions
96+
this.maxLengthWhenLimited = maxLengthWhenLimited;
97+
}
98+
99+
public void put(W w) throws InterruptedException {
100+
if (list.size() > maxLengthWhenLimited) {
101+
acquireSemaphore();
102+
}
103+
synchronized (this) {
104+
list.add(w);
105+
}
106+
}
107+
108+
public synchronized W poll() {
109+
W res = list.poll();
110+
111+
if (list.size() <= maxLengthWhenLimited) {
112+
releaseSemaphore();
113+
}
114+
115+
return res;
116+
}
117+
118+
public void setUnlimited(boolean unlimited) {
119+
assert this.unlimited != unlimited;
120+
this.unlimited = unlimited;
121+
if (unlimited) {
122+
increaseUnlimited();
123+
}
124+
else {
125+
decreaseUnlimited();
126+
}
127+
}
128+
129+
public boolean isEmpty() {
130+
return list.isEmpty();
131+
}
132+
}
133+
72134
/** An injective queue of <i>ready</i> clients. */
73135
private final SetQueue<K> ready = new SetQueue<K>();
74136
/** The set of clients which have work <i>in progress</i>. */
75137
private final Set<K> inProgress = new HashSet<K>();
76138
/** The pool of registered clients, with their work queues. */
77-
private final Map<K, BlockingQueue<W>> pool = new HashMap<K, BlockingQueue<W>>();
139+
private final Map<K, WorkQueue> pool = new HashMap<K, WorkQueue>();
140+
141+
// The semaphore should only be used when unlimitedQueues == 0, otherwise we ignore it and
142+
// thus don't block the connection.
143+
private Semaphore semaphore = new Semaphore(1);
144+
private AtomicInteger unlimitedQueues = new AtomicInteger(0);
145+
146+
private void acquireSemaphore() throws InterruptedException {
147+
if (unlimitedQueues.get() == 0) {
148+
semaphore.acquire();
149+
}
150+
}
151+
152+
private void releaseSemaphore() {
153+
semaphore.release();
154+
}
155+
156+
private void increaseUnlimited() {
157+
unlimitedQueues.getAndIncrement();
158+
semaphore.release();
159+
}
160+
161+
private void decreaseUnlimited() {
162+
unlimitedQueues.getAndDecrement();
163+
}
78164

79165
/**
80166
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -86,7 +172,16 @@ public class WorkPool<K, W> {
86172
public void registerKey(K key) {
87173
synchronized (this) {
88174
if (!this.pool.containsKey(key)) {
89-
this.pool.put(key, new LinkedBlockingQueue<W>(MAX_QUEUE_LENGTH));
175+
this.pool.put(key, new WorkQueue(MAX_QUEUE_LENGTH));
176+
}
177+
}
178+
}
179+
180+
public void unlimit(K key, boolean unlimited) {
181+
synchronized (this) {
182+
WorkQueue queue = this.pool.get(key);
183+
if (queue != null) {
184+
queue.setUnlimited(unlimited);
90185
}
91186
}
92187
}
@@ -128,7 +223,7 @@ public K nextWorkBlock(Collection<W> to, int size) {
128223
synchronized (this) {
129224
K nextKey = readyToInProgress();
130225
if (nextKey != null) {
131-
BlockingQueue<W> queue = this.pool.get(nextKey);
226+
WorkQueue queue = this.pool.get(nextKey);
132227
drainTo(queue, to, size);
133228
}
134229
return nextKey;
@@ -137,13 +232,12 @@ public K nextWorkBlock(Collection<W> to, int size) {
137232

138233
/**
139234
* Private implementation of <code><b>drainTo</b></code> (not implemented for <code><b>LinkedList&lt;W&gt;</b></code>s).
140-
* @param <W> element type
141235
* @param deList to take (poll) elements from
142236
* @param c to add elements to
143237
* @param maxElements to take from deList
144238
* @return number of elements actually taken
145239
*/
146-
private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int maxElements) {
240+
private int drainTo(WorkQueue deList, Collection<W> c, int maxElements) {
147241
int n = 0;
148242
while (n < maxElements) {
149243
W first = deList.poll();
@@ -165,7 +259,7 @@ private static <W> int drainTo(BlockingQueue<W> deList, Collection<W> c, int max
165259
* &mdash; <i>as a result of this work item</i>
166260
*/
167261
public boolean addWorkItem(K key, W item) {
168-
BlockingQueue<W> queue;
262+
WorkQueue queue;
169263
synchronized (this) {
170264
queue = this.pool.get(key);
171265
}
@@ -213,7 +307,7 @@ public boolean finishWorkBlock(K key) {
213307
}
214308

215309
private boolean moreWorkItems(K key) {
216-
BlockingQueue<W> leList = this.pool.get(key);
310+
WorkQueue leList = this.pool.get(key);
217311
return leList != null && !leList.isEmpty();
218312
}
219313

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ public static void add(TestSuite suite) {
4040
suite.addTestSuite(Recover.class);
4141
suite.addTestSuite(Reject.class);
4242
suite.addTestSuite(Transactions.class);
43-
// TODO uncomment after we fix bug 26396 one way or the other
44-
//suite.addTestSuite(RequeueOnConnectionClose.class);
45-
//suite.addTestSuite(RequeueOnChannelClose.class);
43+
suite.addTestSuite(RequeueOnConnectionClose.class);
44+
suite.addTestSuite(RequeueOnChannelClose.class);
4645
suite.addTestSuite(DurableOnTransient.class);
4746
suite.addTestSuite(NoRequeueOnCancel.class);
4847
suite.addTestSuite(Bug20004Test.class);

0 commit comments

Comments
 (0)