Skip to content

[FLINK-38267] Only call channel state rescaling logic for exchange with channel state to avoid UnsupportedOperationException #26931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

1996fanrui
Copy link
Member

@1996fanrui 1996fanrui commented Aug 20, 2025

What is the purpose of the change

Job cannot be recovered from UC(unaligned checkpoint) after rescaling, and the exception is:

java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. 
Did you change the partitioner to forward or rescale? 
It may also help to add an explicit shuffle().

When one upstream task has multiple output exchanges, which including UC SUPPORTED exchanges(likes hash or rebalance) and at least one UC UNSUPPORTED exchanges(likes Forward or rescale).

Or when one downstream task has multiple input exchanges, which including UC SUPPORTED exchanges(likes hash or rebalance) and at least one UC UNSUPPORTED exchanges(likes Forward or rescale).

See more from FLINK-38267

Brief change log

The implemented solution was to make the state redistribution logic more granular by checking for in-flight data on a per-exchange basis instead of a per-task basis.

  • [FLINK-38267][checkpoint] Refactor hasInputState and hasOutputState related logic in TaskStateAssignment
    • Precise State Tracking: The TaskStateAssignment class was refactored to no longer use a simple boolean flag. It now precisely tracks which specific input gates and result partitions contain in-flight data.
  • [FLINK-38267] Only call channel state rescaling logic for exchange with channel state to avoid UnsupportedOperationException
    • Per-Channel/Partition Checks: The core redistribution methods, reDistributeInputChannelStates and reDistributeResultSubpartitionStates, were modified. Their internal logic now iterates through each input gate or output partition and uses new helper methods (hasInFlightDataForInputGate and hasInFlightDataForResultPartition) to check if that specific channel has state.
    • Conditional Logic: The state redistribution logic is now wrapped in a conditional block. It is only invoked for a channel if the per-exchange check passes. This ensures that stateless exchanges (like forward or rescale) are correctly skipped, avoiding the exception.

This approach fixes the bug by applying the redistribution logic only where it is actually needed, allowing jobs with mixed partitioner types to rescale from an unaligned checkpoint successfully.

Verifying this change

Doing

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 20, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui force-pushed the 38267/fix-UC-multi-exchanges branch 2 times, most recently from 1e6b2e7 to 4f49de1 Compare August 20, 2025 20:39
Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to do some manual testing in the meantime

new InflightDataGateOrPartitionRescalingDescriptor(
new int[0],
RescaleMappings.identity(0, 0),
java.util.Collections.emptySet(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe it can be imported.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! And updated

Comment on lines 509 to 510
Arrays.asList(inputResult.getProducer().getProducedDataSets())
.indexOf(inputResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that this is working all the time because IntermediateResult has no valid equals implementation and just uses == on the references?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the current code implementation, it works well since we do not create multiple IntermediateResult for same object.

But I think it is not a good way to find the partitionIndex, I have updated the code that find partitionIndex via the IntermediateDataSetID.

Comment on lines 532 to 533
int gateIndex =
downstreamAssignment.executionJobVertex.getInputs().indexOf(producedResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

* an unaligned checkpoint.
*/
@RunWith(Parameterized.class)
public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone through the existing and these new UC tests which are providing excellent coverage. What I've not yet found is cases which skip rescaling when an edge has no in-flight/channel state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ITCase has 4 DAGs, and all of them have forward or rescale exchange.

These 2 exchanges does not support unaligned checkpoint, resulting in the corresponding edge has no in-flight/channel state.

Comment on lines 145 to 155
// Decrease the memory segment size to avoid the test is so slow for some reasons:
// 1. Recovery phase needs to consume all inflight buffers
// 2. Forward or rescale exchange does not support unaligned checkpoint.
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("1 kb"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet understand these comments, could you elaborate on this? On my side if I remove it then it makes tests just flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet understand these comments, could you elaborate on this?

I added more comments, I hope it is clear.

On my side if I remove it then it makes tests just flaky.

Sorry, do you mean is it slow or failed on your local? As I understand, it get slower, but should not be failed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remove the mentioned line then majority of the time the last test fails, but in general it's flaky🤷🏻‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed to execute the class 4-5 times to see the following failure:

java.lang.IllegalStateException: Job has entered FAILED state, but expecting [RUNNING]

Copy link
Member Author

@1996fanrui 1996fanrui Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to run this ITCase 10 times on my local after removing conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("1 kb"));, it works well.

Could you please mvn install this PR in your local, and try run test again. If the flink-runtime code is master branch, this ITCase will be failed (It is exactly why I introduce this ITCase, it could reproduce the FLINK-38267 bug). I encountered this issue during development as well, and it works well after installing this PR to local.

If it is still failed after re-installing, would you mind providing the detailed exception info and exception stack? why Job has entered FAILED state? Thank you very much!

@@ -204,15 +266,16 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) {
return assignment.getInputMapping(assignmentIndex, recompute);
},
outputSubtaskMappings,
this::getOutputMapping))
this::getOutputMapping,
false))
.build();
}

public boolean hasUpstreamOutputStates() {
if (hasUpstreamOutputStates == null) {
hasUpstreamOutputStates =
Arrays.stream(getUpstreamAssignments())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue/risk is not introduced here but maybe we could add a check while we're hanging around here. Namely getUpstreamAssignments is depending on executionJobVertex.getInputs() which is nullable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not completely understand this comment.

Do you mean to add checkState(executionJobVertex.getInputs() != null) or something like that here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that's enough to be on the safe side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionJobVertex#inputs is @Nullable, but ExecutionJobVertex#getInputs is not sincegetInputs() checked the isInitialized(), and inputs never be null once is isInitialized.

    public List<IntermediateResult> getInputs() {
        checkState(isInitialized());
        return inputs;
    }

WDYT?

@1996fanrui 1996fanrui marked this pull request as ready for review August 21, 2025 13:40
Copy link
Member Author

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, all comments are addressed.

new InflightDataGateOrPartitionRescalingDescriptor(
new int[0],
RescaleMappings.identity(0, 0),
java.util.Collections.emptySet(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! And updated

Comment on lines 509 to 510
Arrays.asList(inputResult.getProducer().getProducedDataSets())
.indexOf(inputResult);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the current code implementation, it works well since we do not create multiple IntermediateResult for same object.

But I think it is not a good way to find the partitionIndex, I have updated the code that find partitionIndex via the IntermediateDataSetID.

Comment on lines 532 to 533
int gateIndex =
downstreamAssignment.executionJobVertex.getInputs().indexOf(producedResult);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

* an unaligned checkpoint.
*/
@RunWith(Parameterized.class)
public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogger {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ITCase has 4 DAGs, and all of them have forward or rescale exchange.

These 2 exchanges does not support unaligned checkpoint, resulting in the corresponding edge has no in-flight/channel state.

Comment on lines 145 to 155
// Decrease the memory segment size to avoid the test is so slow for some reasons:
// 1. Recovery phase needs to consume all inflight buffers
// 2. Forward or rescale exchange does not support unaligned checkpoint.
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("1 kb"));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet understand these comments, could you elaborate on this?

I added more comments, I hope it is clear.

On my side if I remove it then it makes tests just flaky.

Sorry, do you mean is it slow or failed on your local? As I understand, it get slower, but should not be failed.

@@ -204,15 +266,16 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) {
return assignment.getInputMapping(assignmentIndex, recompute);
},
outputSubtaskMappings,
this::getOutputMapping))
this::getOutputMapping,
false))
.build();
}

public boolean hasUpstreamOutputStates() {
if (hasUpstreamOutputStates == null) {
hasUpstreamOutputStates =
Arrays.stream(getUpstreamAssignments())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not completely understand this comment.

Do you mean to add checkState(executionJobVertex.getInputs() != null) or something like that here?

.getInfos().stream()
.mapToInt(ResultSubpartitionInfo::getPartitionIdx);
} else {
throw new IllegalStateException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be driven? If so please could you add a test to drive this throw?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no uncovered implementation so can't be tested now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a defense mechanism to warn developers if other subclasses are introduced in the future.

Unless we introduce a test subclass for this testing.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Aug 21, 2025
… exchange with channel state to avoid UnsupportedOperationException
@1996fanrui 1996fanrui force-pushed the 38267/fix-UC-multi-exchanges branch from 6d7bec8 to 2e58646 Compare August 21, 2025 18:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants