From a676f1c1f1212f0e83f5f47cd61a1c3ead8dd84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 21 Jul 2025 16:43:16 +0200 Subject: [PATCH] Refactor BlockingCell with modern concurrency utilities "Modern" as in "from the 21st century". --- .../com/rabbitmq/utility/BlockingCell.java | 59 ++++++++----------- .../utility/BlockingValueOrException.java | 4 +- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/rabbitmq/utility/BlockingCell.java b/src/main/java/com/rabbitmq/utility/BlockingCell.java index a78d3a88e..03d694344 100644 --- a/src/main/java/com/rabbitmq/utility/BlockingCell.java +++ b/src/main/java/com/rabbitmq/utility/BlockingCell.java @@ -16,27 +16,24 @@ package com.rabbitmq.utility; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * Simple one-shot IPC mechanism. Essentially a one-place buffer that cannot be emptied once filled. */ public class BlockingCell { - /** Indicator of not-yet-filledness */ - private boolean _filled = false; - /** Will be null until a value is supplied, and possibly still then. */ - private T _value; + private final AtomicReference value = new AtomicReference<>(); + private final CountDownLatch latch = new CountDownLatch(1); private static final long NANOS_IN_MILLI = 1000L * 1000L; private static final long INFINITY = -1; - /** Instantiate a new BlockingCell waiting for a value of the specified type. */ - public BlockingCell() { - // no special handling required in default constructor - } - /** * Wait for a value, and when one arrives, return it (without clearing it). If there's already a value present, there's no need to wait - the existing value * is returned. @@ -44,11 +41,9 @@ public BlockingCell() { * * @throws InterruptedException if this thread is interrupted */ - public synchronized T get() throws InterruptedException { - while (!_filled) { - wait(); - } - return _value; + public T get() throws InterruptedException { + this.latch.await(); + return this.value.get(); } /** @@ -60,30 +55,27 @@ public synchronized T get() throws InterruptedException { * @return the waited-for value * @throws InterruptedException if this thread is interrupted */ - public synchronized T get(long timeout) throws InterruptedException, TimeoutException { + public T get(long timeout) throws InterruptedException, TimeoutException { if (timeout == INFINITY) return get(); if (timeout < 0) { throw new IllegalArgumentException("Timeout cannot be less than zero"); } - long now = System.nanoTime() / NANOS_IN_MILLI; - long maxTime = now + timeout; - while (!_filled && (now = (System.nanoTime() / NANOS_IN_MILLI)) < maxTime) { - wait(maxTime - now); - } + boolean done = this.latch.await(timeout, MILLISECONDS); - if (!_filled) + if (!done) { throw new TimeoutException(); + } - return _value; + return this.value.get(); } /** * As get(), but catches and ignores InterruptedException, retrying until a value appears. * @return the waited-for value */ - public synchronized T uninterruptibleGet() { + public T uninterruptibleGet() { boolean wasInterrupted = false; try { while (true) { @@ -110,7 +102,7 @@ public synchronized T uninterruptibleGet() { * @param timeout timeout in milliseconds. -1 means 'infinity': never time out * @return the waited-for value */ - public synchronized T uninterruptibleGet(int timeout) throws TimeoutException { + public T uninterruptibleGet(int timeout) throws TimeoutException { long now = System.nanoTime() / NANOS_IN_MILLI; long runTime = now + timeout; boolean wasInterrupted = false; @@ -128,7 +120,6 @@ public synchronized T uninterruptibleGet(int timeout) throws TimeoutException { Thread.currentThread().interrupt(); } } - throw new TimeoutException(); } @@ -136,13 +127,12 @@ public synchronized T uninterruptibleGet(int timeout) throws TimeoutException { * Store a value in this BlockingCell, throwing {@link IllegalStateException} if the cell already has a value. * @param newValue the new value to store */ - public synchronized void set(T newValue) { - if (_filled) { + public void set(T newValue) { + if (this.value.compareAndSet(null, newValue)) { + this.latch.countDown(); + } else { throw new IllegalStateException("BlockingCell can only be set once"); } - _value = newValue; - _filled = true; - notifyAll(); } /** @@ -150,11 +140,12 @@ public synchronized void set(T newValue) { * @return true if this call to setIfUnset actually updated the BlockingCell; false if the cell already had a value. * @param newValue the new value to store */ - public synchronized boolean setIfUnset(T newValue) { - if (_filled) { + public boolean setIfUnset(T newValue) { + if (this.value.compareAndSet(null, newValue)) { + this.latch.countDown(); + return true; + } else { return false; } - set(newValue); - return true; } } diff --git a/src/main/java/com/rabbitmq/utility/BlockingValueOrException.java b/src/main/java/com/rabbitmq/utility/BlockingValueOrException.java index 332ab3ddb..654e93681 100644 --- a/src/main/java/com/rabbitmq/utility/BlockingValueOrException.java +++ b/src/main/java/com/rabbitmq/utility/BlockingValueOrException.java @@ -21,11 +21,11 @@ public class BlockingValueOrException> extends BlockingCell> { public void setValue(V v) { - super.set(ValueOrException.makeValue(v)); + super.set(ValueOrException.makeValue(v)); } public void setException(E e) { - super.set(ValueOrException.makeException(e)); + super.set(ValueOrException.makeException(e)); } public V uninterruptibleGetValue() throws E {