Closed
Labels
enhancement (New feature or request)
Description
While a Flink job was running, an error in a TaskManager caused the entire job to fail and stop, but the JobManager log shows only the following exception (NoRestartException):
Caused by: com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) ~[?:?]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) ~[?:?]
at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:486) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$2(BaseRichOutputFormat.java:504) ~[?:?]
at java.util.ArrayList.forEach(Unknown Source) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:504) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$1(BaseRichOutputFormat.java:460) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
This does not reveal the direct cause of the job failure.
Below is the Flink job's task state transition, which still does not show why the job failed:
2025-05-09 00:28:32,983 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: 192.168.3.35:3306:dwddata -> Process -> (dwd_st_was_r_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: st_was_r, dwd_river_r_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: st_river_r, dwd_rsvr_r_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: st_rsvr_r, dwd_st_soil_r_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: st_soil_r, dwd_rain_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: st_pptn_r) (1/1) (70b2f99eefc30cc27eae85a8b2a50cba_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:34143-3ae566 @ localhost (dataPort=40245).
com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:105) ~[?:?]
at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:79) ~[?:?]
at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:140) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:486) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$2(BaseRichOutputFormat.java:504) ~[?:?]
at java.util.ArrayList.forEach(Unknown Source) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:504) ~[?:?]
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$1(BaseRichOutputFormat.java:460) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
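Therefore, the com.dtstack.chunjun.throwable.NoRestartException exception class should be removed, and the exception raised in the TaskManager should instead be reported directly in the JobManager log.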
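A possible alternative to removing the class outright is to chain the original write error as the cause of the exception, so that the JobManager log prints it under a "Caused by:" line. The sketch below is only an illustration of that idea under stated assumptions: NoRestartSketchException and DirtyCollectorSketch are hypothetical stand-ins, not the real ChunJun classes (whose constructors and fields are not shown in this issue), and the "Data too long" error is an invented example of a root cause.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for com.dtstack.chunjun.throwable.NoRestartException.
// Assumption: the exception can carry a cause via the standard Throwable chain.
class NoRestartSketchException extends RuntimeException {
    NoRestartSketchException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for the dirty-data collector seen in the stack trace.
class DirtyCollectorSketch {
    private final long maxConsumed;
    private long consumed;

    DirtyCollectorSketch(long maxConsumed) {
        this.maxConsumed = maxConsumed;
    }

    // Counts one dirty record; once the limit is exceeded, fails the job
    // while keeping the write error that made the record dirty as the cause.
    void offer(String record, Throwable writeError) {
        consumed++;
        if (consumed > maxConsumed) {
            throw new NoRestartSketchException(
                    "The dirty consumer shutdown, due to the consumed count exceed the max-consumed ["
                            + maxConsumed + "]",
                    writeError);
        }
    }

    // Renders a throwable the way a logger would print it, cause chain included.
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw, true));
        return sw.toString();
    }

    public static void main(String[] args) {
        DirtyCollectorSketch collector = new DirtyCollectorSketch(0);
        // Invented root cause, for illustration only.
        Throwable root = new IllegalStateException("Data too long for column 'name'");
        try {
            collector.offer("row-1", root);
        } catch (NoRestartSketchException e) {
            System.out.println(stackTraceOf(e));
        }
    }
}
```

With max-consumed set to 0, as in the log above, the first dirty record triggers the exception, and the printed trace then contains both the shutdown message and a "Caused by:" section naming the original error.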