Skip to content

Commit 4f900f0

Browse files
committed
Dispatch heartbeat failure in dedicated thread in NIO mode
Otherwise connection recovery can occur in the same NIO thread, leading to a deadlock. Fixes #413 (cherry picked from commit 46d3d2e)
1 parent 5a97bc1 commit 4f900f0

File tree

1 file changed

+34
-17
lines changed

1 file changed

+34
-17
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void run() {
7777
long now = System.currentTimeMillis();
7878
if ((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) {
7979
try {
80-
state.getConnection().handleHeartbeatFailure();
80+
handleHeartbeatFailure(state);
8181
} catch (Exception e) {
8282
LOGGER.warn("Error after heartbeat failure of connection {}", state.getConnection());
8383
} finally {
@@ -267,33 +267,50 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
267267
}
268268
}
269269

270+
protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) {
271+
if (needToDispatchIoError(state)) {
272+
dispatchShutdownToConnection(
273+
() -> state.getConnection().handleHeartbeatFailure(),
274+
state.getConnection().toString()
275+
);
276+
} else {
277+
try {
278+
state.close();
279+
} catch (IOException e) {
280+
281+
}
282+
}
283+
}
284+
270285
protected boolean needToDispatchIoError(final SocketChannelFrameHandlerState state) {
271286
return state.getConnection().isOpen();
272287
}
273288

274289
protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState state, final Throwable ex) {
275-
// In case of recovery after the shutdown,
276-
// the new connection shouldn't be initialized in
277-
// the NIO thread, to avoid a deadlock.
278-
Runnable shutdown = () -> state.getConnection().handleIoError(ex);
279-
if (executorService() == null) {
280-
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
281-
Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name);
282-
shutdownThread.start();
283-
} else {
284-
executorService().submit(shutdown);
285-
}
290+
dispatchShutdownToConnection(
291+
() -> state.getConnection().handleIoError(ex),
292+
state.getConnection().toString()
293+
);
286294
}
287295

288296
protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
289-
Runnable shutdown = () -> state.getConnection().doFinalShutdown();
297+
dispatchShutdownToConnection(
298+
() -> state.getConnection().doFinalShutdown(),
299+
state.getConnection().toString()
300+
);
301+
}
302+
303+
protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable, String connectionName) {
304+
// In case of recovery after the shutdown,
305+
// the new connection shouldn't be initialized in
306+
// the NIO thread, to avoid a deadlock.
290307
if (this.connectionShutdownExecutor != null) {
291-
connectionShutdownExecutor.execute(shutdown);
308+
connectionShutdownExecutor.execute(connectionShutdownRunnable);
292309
} else if (executorService() != null) {
293-
executorService().execute(shutdown);
310+
executorService().execute(connectionShutdownRunnable);
294311
} else {
295-
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
296-
Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name);
312+
String name = "rabbitmq-connection-shutdown-" + connectionName;
313+
Thread shutdownThread = Environment.newThread(threadFactory(), connectionShutdownRunnable, name);
297314
shutdownThread.start();
298315
}
299316
}

0 commit comments

Comments
 (0)