Skip to content

Refactor BlockingCell with modern concurrency utilities #1657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 25 additions & 34 deletions src/main/java/com/rabbitmq/utility/BlockingCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,34 @@

package com.rabbitmq.utility;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Simple one-shot IPC mechanism. Essentially a one-place buffer that cannot be emptied once filled.
*/
public class BlockingCell<T> {
/** Indicator of not-yet-filledness */
private boolean _filled = false;

/** Will be null until a value is supplied, and possibly still then. */
private T _value;
private final AtomicReference<T> value = new AtomicReference<>();
private final CountDownLatch latch = new CountDownLatch(1);

private static final long NANOS_IN_MILLI = 1000L * 1000L;

private static final long INFINITY = -1;

/** Instantiate a new BlockingCell waiting for a value of the specified type. */
public BlockingCell() {
// no special handling required in default constructor
}

/**
* Wait for a value, and when one arrives, return it (without clearing it). If there's already a value present, there's no need to wait - the existing value
* is returned.
* @return the waited-for value
*
* @throws InterruptedException if this thread is interrupted
*/
public synchronized T get() throws InterruptedException {
while (!_filled) {
wait();
}
return _value;
public T get() throws InterruptedException {
this.latch.await();
return this.value.get();
}

/**
Expand All @@ -60,30 +55,27 @@ public synchronized T get() throws InterruptedException {
* @return the waited-for value
* @throws InterruptedException if this thread is interrupted
*/
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
public T get(long timeout) throws InterruptedException, TimeoutException {
if (timeout == INFINITY) return get();

if (timeout < 0) {
throw new IllegalArgumentException("Timeout cannot be less than zero");
}

long now = System.nanoTime() / NANOS_IN_MILLI;
long maxTime = now + timeout;
while (!_filled && (now = (System.nanoTime() / NANOS_IN_MILLI)) < maxTime) {
wait(maxTime - now);
}
boolean done = this.latch.await(timeout, MILLISECONDS);

if (!_filled)
if (!done) {
throw new TimeoutException();
}

return _value;
return this.value.get();
}

/**
* As get(), but catches and ignores InterruptedException, retrying until a value appears.
* @return the waited-for value
*/
public synchronized T uninterruptibleGet() {
public T uninterruptibleGet() {
boolean wasInterrupted = false;
try {
while (true) {
Expand All @@ -110,7 +102,7 @@ public synchronized T uninterruptibleGet() {
* @param timeout timeout in milliseconds. -1 means 'infinity': never time out
* @return the waited-for value
*/
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
public T uninterruptibleGet(int timeout) throws TimeoutException {
long now = System.nanoTime() / NANOS_IN_MILLI;
long runTime = now + timeout;
boolean wasInterrupted = false;
Expand All @@ -128,33 +120,32 @@ public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
Thread.currentThread().interrupt();
}
}

throw new TimeoutException();
}

/**
* Store a value in this BlockingCell, throwing {@link IllegalStateException} if the cell already has a value.
* @param newValue the new value to store
*/
public synchronized void set(T newValue) {
if (_filled) {
public void set(T newValue) {
if (this.value.compareAndSet(null, newValue)) {
this.latch.countDown();
} else {
throw new IllegalStateException("BlockingCell can only be set once");
}
_value = newValue;
_filled = true;
notifyAll();
}

/**
* Store a value in this BlockingCell if it doesn't already have a value.
* @return true if this call to setIfUnset actually updated the BlockingCell; false if the cell already had a value.
* @param newValue the new value to store
*/
public synchronized boolean setIfUnset(T newValue) {
if (_filled) {
public boolean setIfUnset(T newValue) {
if (this.value.compareAndSet(null, newValue)) {
this.latch.countDown();
return true;
} else {
return false;
}
set(newValue);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public class BlockingValueOrException<V, E extends Throwable & SensibleClone<E>>
extends BlockingCell<ValueOrException<V, E>>
{
public void setValue(V v) {
super.set(ValueOrException.<V, E>makeValue(v));
super.set(ValueOrException.makeValue(v));
}

public void setException(E e) {
super.set(ValueOrException.<V, E>makeException(e));
super.set(ValueOrException.makeException(e));
}

public V uninterruptibleGetValue() throws E {
Expand Down