Skip to content

Commit 70feefc

Browse files
authored
Merge pull request #1657 from rabbitmq/refactor-blocking-cell
Refactor BlockingCell with modern concurrency utilities
2 parents 803bc7c + a676f1c commit 70feefc

File tree

2 files changed

+27
-36
lines changed

2 files changed

+27
-36
lines changed

src/main/java/com/rabbitmq/utility/BlockingCell.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,34 @@
1616

1717
package com.rabbitmq.utility;
1818

19+
import java.util.concurrent.CountDownLatch;
1920
import java.util.concurrent.TimeoutException;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2024

2125
/**
2226
* Simple one-shot IPC mechanism. Essentially a one-place buffer that cannot be emptied once filled.
2327
*/
2428
public class BlockingCell<T> {
25-
/** Indicator of not-yet-filledness */
26-
private boolean _filled = false;
2729

28-
/** Will be null until a value is supplied, and possibly still then. */
29-
private T _value;
30+
private final AtomicReference<T> value = new AtomicReference<>();
31+
private final CountDownLatch latch = new CountDownLatch(1);
3032

3133
private static final long NANOS_IN_MILLI = 1000L * 1000L;
3234

3335
private static final long INFINITY = -1;
3436

35-
/** Instantiate a new BlockingCell waiting for a value of the specified type. */
36-
public BlockingCell() {
37-
// no special handling required in default constructor
38-
}
39-
4037
/**
4138
* Wait for a value, and when one arrives, return it (without clearing it). If there's already a value present, there's no need to wait - the existing value
4239
* is returned.
4340
* @return the waited-for value
4441
*
4542
* @throws InterruptedException if this thread is interrupted
4643
*/
47-
public synchronized T get() throws InterruptedException {
48-
while (!_filled) {
49-
wait();
50-
}
51-
return _value;
44+
public T get() throws InterruptedException {
45+
this.latch.await();
46+
return this.value.get();
5247
}
5348

5449
/**
@@ -60,30 +55,27 @@ public synchronized T get() throws InterruptedException {
6055
* @return the waited-for value
6156
* @throws InterruptedException if this thread is interrupted
6257
*/
63-
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
58+
public T get(long timeout) throws InterruptedException, TimeoutException {
6459
if (timeout == INFINITY) return get();
6560

6661
if (timeout < 0) {
6762
throw new IllegalArgumentException("Timeout cannot be less than zero");
6863
}
6964

70-
long now = System.nanoTime() / NANOS_IN_MILLI;
71-
long maxTime = now + timeout;
72-
while (!_filled && (now = (System.nanoTime() / NANOS_IN_MILLI)) < maxTime) {
73-
wait(maxTime - now);
74-
}
65+
boolean done = this.latch.await(timeout, MILLISECONDS);
7566

76-
if (!_filled)
67+
if (!done) {
7768
throw new TimeoutException();
69+
}
7870

79-
return _value;
71+
return this.value.get();
8072
}
8173

8274
/**
8375
* As get(), but catches and ignores InterruptedException, retrying until a value appears.
8476
* @return the waited-for value
8577
*/
86-
public synchronized T uninterruptibleGet() {
78+
public T uninterruptibleGet() {
8779
boolean wasInterrupted = false;
8880
try {
8981
while (true) {
@@ -110,7 +102,7 @@ public synchronized T uninterruptibleGet() {
110102
* @param timeout timeout in milliseconds. -1 means 'infinity': never time out
111103
* @return the waited-for value
112104
*/
113-
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
105+
public T uninterruptibleGet(int timeout) throws TimeoutException {
114106
long now = System.nanoTime() / NANOS_IN_MILLI;
115107
long runTime = now + timeout;
116108
boolean wasInterrupted = false;
@@ -128,33 +120,32 @@ public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
128120
Thread.currentThread().interrupt();
129121
}
130122
}
131-
132123
throw new TimeoutException();
133124
}
134125

135126
/**
136127
* Store a value in this BlockingCell, throwing {@link IllegalStateException} if the cell already has a value.
137128
* @param newValue the new value to store
138129
*/
139-
public synchronized void set(T newValue) {
140-
if (_filled) {
130+
public void set(T newValue) {
131+
if (this.value.compareAndSet(null, newValue)) {
132+
this.latch.countDown();
133+
} else {
141134
throw new IllegalStateException("BlockingCell can only be set once");
142135
}
143-
_value = newValue;
144-
_filled = true;
145-
notifyAll();
146136
}
147137

148138
/**
149139
* Store a value in this BlockingCell if it doesn't already have a value.
150140
* @return true if this call to setIfUnset actually updated the BlockingCell; false if the cell already had a value.
151141
* @param newValue the new value to store
152142
*/
153-
public synchronized boolean setIfUnset(T newValue) {
154-
if (_filled) {
143+
public boolean setIfUnset(T newValue) {
144+
if (this.value.compareAndSet(null, newValue)) {
145+
this.latch.countDown();
146+
return true;
147+
} else {
155148
return false;
156149
}
157-
set(newValue);
158-
return true;
159150
}
160151
}

src/main/java/com/rabbitmq/utility/BlockingValueOrException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ public class BlockingValueOrException<V, E extends Throwable & SensibleClone<E>>
2121
extends BlockingCell<ValueOrException<V, E>>
2222
{
2323
public void setValue(V v) {
24-
super.set(ValueOrException.<V, E>makeValue(v));
24+
super.set(ValueOrException.makeValue(v));
2525
}
2626

2727
public void setException(E e) {
28-
super.set(ValueOrException.<V, E>makeException(e));
28+
super.set(ValueOrException.makeException(e));
2929
}
3030

3131
public V uninterruptibleGetValue() throws E {

0 commit comments

Comments
 (0)