Skip to content

Multi-threaded topology recovery #370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public class ConnectionFactory implements Cloneable {

private boolean automaticRecovery = true;
private boolean topologyRecovery = true;

private int topologyRecoveryThreads = 1;

// long is used to make sure the users can use both ints
// and longs safely. It is unlikely that anybody'd need
// to use recovery intervals > Integer.MAX_VALUE in practice.
Expand Down Expand Up @@ -339,7 +340,7 @@ public void setUri(String uriString)
setUri(new URI(uriString));
}

private String uriDecode(String s) {
private static String uriDecode(String s) {
try {
// URLDecode decodes '+' to a space, as for
// form encoding. So protect plus signs.
Expand Down Expand Up @@ -523,7 +524,6 @@ public void setSocketFactory(SocketFactory factory) {
*
* @see #setSocketConfigurator(SocketConfigurator)
*/
@SuppressWarnings("unused")
public SocketConfigurator getSocketConfigurator() {
return socketConf;
}
Expand Down Expand Up @@ -701,7 +701,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
* @return true if topology recovery is enabled, false otherwise
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
@SuppressWarnings("unused")
public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
}
Expand All @@ -714,6 +713,15 @@ public boolean isTopologyRecoveryEnabled() {
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}

public int getTopologyRecoveryThreadCount() {
return topologyRecoveryThreads;
}

// TODO Document that your exception handler method should be thread safe
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
this.topologyRecoveryThreads = topologyRecoveryThreads;
}

public void setMetricsCollector(MetricsCollector metricsCollector) {
this.metricsCollector = metricsCollector;
Expand Down Expand Up @@ -1013,6 +1021,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setNetworkRecoveryInterval(networkRecoveryInterval);
result.setRecoveryDelayHandler(recoveryDelayHandler);
result.setTopologyRecovery(topologyRecovery);
result.setTopologyRecoveryThreadCount(topologyRecoveryThreads);
result.setExceptionHandler(exceptionHandler);
result.setThreadFactory(threadFactory);
result.setHandshakeTimeout(handshakeTimeout);
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ConnectionParams {
private long networkRecoveryInterval;
private RecoveryDelayHandler recoveryDelayHandler;
private boolean topologyRecovery;
private int topologyRecoveryThreads = 1;
private int channelRpcTimeout;
private boolean channelShouldCheckRpcResponseType;
private ErrorOnWriteListener errorOnWriteListener;
Expand Down Expand Up @@ -114,10 +115,14 @@ public RecoveryDelayHandler getRecoveryDelayHandler() {
public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
}

public int getTopologyRecoveryThreadCount() {
return topologyRecoveryThreads;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
return threadFactory;
}

public int getChannelRpcTimeout() {
return channelRpcTimeout;
Expand Down Expand Up @@ -174,6 +179,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand
public void setTopologyRecovery(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}

public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
this.topologyRecoveryThreads = topologyRecoveryThreads;
}

public void setExceptionHandler(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -525,7 +527,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
this.consumerRecoveryListeners.remove(listener);
}

synchronized private void beginAutomaticRecovery() throws InterruptedException {
private synchronized void beginAutomaticRecovery() throws InterruptedException {
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0));

this.notifyRecoveryListenersStarted();
Expand All @@ -534,18 +536,16 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException {
if (newConn == null) {
return;
}

LOGGER.debug("Connection {} has recovered", newConn);
this.addAutomaticRecoveryListener(newConn);
this.recoverShutdownListeners(newConn);
this.recoverBlockedListeners(newConn);
this.recoverChannels(newConn);
// don't assign new delegate connection until channel recovery is complete
this.delegate = newConn;
if(this.params.isTopologyRecoveryEnabled()) {
this.recoverEntities();
this.recoverConsumers();
if (this.params.isTopologyRecoveryEnabled()) {
recoverTopology(params.getTopologyRecoveryThreadCount());
}

this.notifyRecoveryListenersComplete();
}

Expand Down Expand Up @@ -593,6 +593,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
for (AutorecoveringChannel ch : this.channels.values()) {
try {
ch.automaticallyRecover(this, newConn);
LOGGER.debug("Channel {} has recovered", ch);
} catch (Throwable t) {
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
}
Expand All @@ -610,113 +611,124 @@ private void notifyRecoveryListenersStarted() {
f.handleRecoveryStarted(this);
}
}

private void recoverEntities() {
private void recoverTopology(final int recoveryThreads) throws InterruptedException {
// The recovery sequence is the following:
//
// 1. Recover exchanges
// 2. Recover queues
// 3. Recover bindings
// 4. Recover consumers
recoverExchanges();
recoverQueues();
recoverBindings();
}

private void recoverExchanges() {
// recorded exchanges are guaranteed to be
// non-predefined (we filter out predefined ones
// in exchangeDeclare). MK.
for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) {
if (recoveryThreads > 1) {
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new pool of threads can be a concern in some environments, so maybe we should just add (yet) another executor service in the connection factory. Parallel topology recovery would occur only if this executor service is not null (so we could get rid of the recoveryThreads property).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
x.recover();
} catch (Exception cause) {
final String message = "Caught an exception while recovering exchange " + x.getName() +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
// invokeAll will block until all callables are completed
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could get the Futures back, check there's the same number as the inputs and they are all done, log a warning if not.

executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
} finally {
executor.shutdownNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check it returns an empty list and log a warning if not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer calling shutdown since the executor is passed in and needs to be reused on subsequent recoveries

}
} else {
// recover entities in serial on the main connection thread
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
recoverExchange(exchange);
}
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
recoverQueue(entry.getKey(), entry.getValue());
}
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
recoverBinding(b);
}
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
recoverConsumer(entry.getKey(), entry.getValue());
}
}
}

private void recoverQueues() {
for (Map.Entry<String, RecordedQueue> entry : Utility.copy(this.recordedQueues).entrySet()) {
String oldName = entry.getKey();
RecordedQueue q = entry.getValue();
try {
q.recover();
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
this.recordedQueues.put(newName, q);
private void recoverExchange(final RecordedExchange x) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
x.recover();
LOGGER.debug("{} has recovered", x);
} catch (Exception cause) {
final String message = "Caught an exception while recovering exchange " + x.getName() +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
}
}

private void recoverQueue(final String oldName, final RecordedQueue q) {
LOGGER.debug("Recovering {}", q);
try {
q.recover();
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
this.recordedQueues.put(newName, q);
}
for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
}
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
LOGGER.debug("{} has recovered", q);
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
}
}

private void recoverBindings() {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
try {
b.recover();
} catch (Exception cause) {
String message = "Caught an exception while recovering binding between " + b.getSource() +
" and " + b.getDestination() + ": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
}
private void recoverBinding(final RecordedBinding b) {
try {
b.recover();
LOGGER.debug("{} has recovered", b);
} catch (Exception cause) {
String message = "Caught an exception while recovering binding between " + b.getSource() +
" and " + b.getDestination() + ": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
}
}

private void recoverConsumers() {
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) {
String tag = entry.getKey();
RecordedConsumer consumer = entry.getValue();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Recovering consumer {}", consumer);
}
try {
String newTag = consumer.recover();
// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
LOGGER.debug("Recovering {}", consumer);
try {
String newTag = consumer.recover();
// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
}

for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Consumer {} has recovered", consumer);
}
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
LOGGER.debug("{} has recovered", consumer);
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
}
}

Expand All @@ -735,6 +747,42 @@ private void propagateQueueNameChangeToConsumers(String oldName, String newName)
}
}
}

private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel(final Collection<E> entities) {
// map entities by channel
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<AutorecoveringChannel, List<E>>();
for (final E entity : entities) {
final AutorecoveringChannel channel = entity.getChannel();
List<E> list = map.get(channel);
if (list == null) {
map.put(channel, list = new ArrayList<E>());
}
list.add(entity);
}
// now create a runnable per channel
final List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
for (final List<E> entityList : map.values()) {
callables.add(Executors.callable(new Runnable() {
@Override
public void run() {
for (final E entity : entityList) {
if (entity instanceof RecordedExchange) {
recoverExchange((RecordedExchange)entity);
} else if (entity instanceof RecordedQueue) {
final RecordedQueue q = (RecordedQueue) entity;
recoverQueue(q.getName(), q);
} else if (entity instanceof RecordedBinding) {
recoverBinding((RecordedBinding) entity);
} else if (entity instanceof RecordedConsumer) {
final RecordedConsumer c = (RecordedConsumer) entity;
recoverConsumer(c.getConsumerTag(), c);
}
}
}
}));
}
return callables;
}

void recordQueueBinding(AutorecoveringChannel ch,
String queue,
Expand Down
Loading