@@ -77,7 +77,7 @@ public void run() {
77
77
long now = System .currentTimeMillis ();
78
78
if ((now - state .getLastActivity ()) > state .getConnection ().getHeartbeat () * 1000 * 2 ) {
79
79
try {
80
- state . getConnection (). handleHeartbeatFailure ();
80
+ handleHeartbeatFailure (state );
81
81
} catch (Exception e ) {
82
82
LOGGER .warn ("Error after heartbeat failure of connection {}" , state .getConnection ());
83
83
} catch (AssertionError e ) {
@@ -270,48 +270,68 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
270
270
}
271
271
}
272
272
273
+ protected void handleHeartbeatFailure (final SocketChannelFrameHandlerState state ) {
274
+ if (needToDispatchIoError (state )) {
275
+ dispatchShutdownToConnection (
276
+ new Runnable () {
277
+
278
+ @ Override
279
+ public void run () {
280
+ state .getConnection ().handleHeartbeatFailure ();
281
+ }
282
+ },
283
+ state .getConnection ().toString ()
284
+ );
285
+ } else {
286
+ try {
287
+ state .close ();
288
+ } catch (IOException e ) {
289
+
290
+ }
291
+ }
292
+ }
293
+
273
294
protected boolean needToDispatchIoError (final SocketChannelFrameHandlerState state ) {
274
295
return state .getConnection ().isOpen ();
275
296
}
276
297
277
298
protected void dispatchIoErrorToConnection (final SocketChannelFrameHandlerState state , final Throwable ex ) {
278
- // In case of recovery after the shutdown,
279
- // the new connection shouldn't be initialized in
280
- // the NIO thread, to avoid a deadlock.
281
- Runnable shutdown = new Runnable () {
299
+ dispatchShutdownToConnection (
300
+ new Runnable () {
282
301
283
- @ Override
284
- public void run () {
285
- try {
302
+ @ Override
303
+ public void run () {
286
304
state .getConnection ().handleIoError (ex );
287
- } catch (AssertionError e ) {
288
- LOGGER .warn ("Assertion error during error dispatching to connection: " + e .getMessage ());
289
305
}
290
- }
291
- };
292
- if (executorService () == null ) {
293
- String name = "rabbitmq-connection-shutdown-" + state .getConnection ();
294
- Thread shutdownThread = Environment .newThread (threadFactory (), shutdown , name );
295
- shutdownThread .start ();
296
- } else {
297
- executorService ().submit (shutdown );
298
- }
306
+ },
307
+ state .getConnection ().toString ()
308
+ );
299
309
}
300
310
301
311
protected void dispatchShutdownToConnection (final SocketChannelFrameHandlerState state ) {
302
- Runnable shutdown = new Runnable () {
303
- @ Override
304
- public void run () {
305
- state .getConnection ().doFinalShutdown ();
306
- }
307
- };
312
+ dispatchShutdownToConnection (
313
+ new Runnable () {
314
+
315
+ @ Override
316
+ public void run () {
317
+ state .getConnection ().doFinalShutdown ();
318
+ }
319
+ },
320
+ state .getConnection ().toString ()
321
+ );
322
+ }
323
+
324
+ protected void dispatchShutdownToConnection (Runnable connectionShutdownRunnable , String connectionName ) {
325
+ // In case of recovery after the shutdown,
326
+ // the new connection shouldn't be initialized in
327
+ // the NIO thread, to avoid a deadlock.
308
328
if (this .connectionShutdownExecutor != null ) {
309
- connectionShutdownExecutor .execute (shutdown );
329
+ connectionShutdownExecutor .execute (connectionShutdownRunnable );
310
330
} else if (executorService () != null ) {
311
- executorService ().execute (shutdown );
331
+ executorService ().execute (connectionShutdownRunnable );
312
332
} else {
313
- String name = "rabbitmq-connection-shutdown-" + state . getConnection () ;
314
- Thread shutdownThread = Environment .newThread (threadFactory (), shutdown , name );
333
+ String name = "rabbitmq-connection-shutdown-" + connectionName ;
334
+ Thread shutdownThread = Environment .newThread (threadFactory (), connectionShutdownRunnable , name );
315
335
shutdownThread .start ();
316
336
}
317
337
}
0 commit comments