[Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel #7737
Open
leaves12138 wants to merge 3 commits into apache:master
… same channel

Previously, the channel() method used the total number of buckets (knownNumBuckets) as the bucket parameter for ChannelComputer.select(). Since all records in the same partition share the same total bucket count, they were all routed to the same downstream channel, causing only one subtask to process data in batch mode.

This fix computes a per-record bucket by hashing the trimmedPrimaryKey and taking modulo numBuckets, so records with different primary keys are distributed across different channels/subtasks.
Contributor
@leaves12138 Add the test.
JingsongLi
reviewed
May 3, 2026
  BinaryRow partition = partitionKeyExtractor.partition(record);
- int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ int hash = partitionKeyExtractor.trimmedPrimaryKey(record).hashCode();
Contributor
Just use FixedBucketRowKeyExtractor?
Contributor
Author
Fixed automatically by Codex. PostponeFixedBucketChannelComputer now reuses FixedBucketRowKeyExtractor, with FixedBucketRowKeyExtractor#bucket(int numBuckets) added so postpone fixed-bucket writes can still use the per-partition bucket count from knownNumBuckets.
Summary
Fix a bug in PostponeFixedBucketChannelComputer where all records in the same partition are routed to the same downstream channel in batch mode, causing only one subtask to actually process data.

Root Cause
In the channel() method, the variable bucket was set to the total number of buckets for the partition (from knownNumBuckets). Since this value is the same for all records in the same partition, ChannelComputer.select(partition, bucket, numChannels) always returns the same channel, so all records go to one subtask.

Fix
Compute a per-record bucket by hashing the trimmedPrimaryKey and taking modulo numBuckets. This distributes records with different primary keys across different channels/subtasks, similar to how ChannelComputer.startChannel handles Integer.MIN_VALUE.

Testing
Verified the logic matches the pattern used in RowDataChannelComputer (fixed bucket tables) and PostponeBucketChannelComputer (streaming postpone tables).
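
The root cause and fix described above can be illustrated with a small standalone sketch. It does not use the Paimon classes: select and startChannel below are simplified stand-ins that assume ChannelComputer.select offsets a partition-derived start channel by the bucket and wraps around numChannels. Plain int hashes stand in for the partition and trimmedPrimaryKey hash codes, and Math.floorMod for the modulo step is also an assumption, as are all class and method names.

```java
import java.util.Set;
import java.util.TreeSet;

public class ChannelRoutingSketch {

    // Assumed stand-in for ChannelComputer.startChannel.
    static int startChannel(int partitionHash, int numChannels) {
        // Math.abs(Integer.MIN_VALUE) is still negative, so remap it first.
        int hash = partitionHash == Integer.MIN_VALUE ? Integer.MAX_VALUE : partitionHash;
        return Math.abs(hash) % numChannels;
    }

    // Assumed stand-in for ChannelComputer.select(partition, bucket, numChannels).
    static int select(int partitionHash, int bucket, int numChannels) {
        return (startChannel(partitionHash, numChannels) + bucket) % numChannels;
    }

    // Before the fix: the partition's total bucket count is passed as the bucket,
    // so every record in the partition gets the same channel.
    static Set<Integer> distinctChannelsBefore(
            int numRecords, int partitionHash, int numBuckets, int numChannels) {
        Set<Integer> channels = new TreeSet<>();
        for (int i = 0; i < numRecords; i++) {
            channels.add(select(partitionHash, numBuckets, numChannels));
        }
        return channels;
    }

    // After the fix: a per-record bucket is derived from the key hash
    // (record i uses i as its hypothetical primary-key hash).
    static Set<Integer> distinctChannelsAfter(
            int numRecords, int partitionHash, int numBuckets, int numChannels) {
        Set<Integer> channels = new TreeSet<>();
        for (int keyHash = 0; keyHash < numRecords; keyHash++) {
            int bucket = Math.floorMod(keyHash, numBuckets);
            channels.add(select(partitionHash, bucket, numChannels));
        }
        return channels;
    }

    public static void main(String[] args) {
        // 8 records, one partition, 4 buckets, 4 downstream channels.
        System.out.println(distinctChannelsBefore(8, 42, 4, 4)); // prints [2]
        System.out.println(distinctChannelsAfter(8, 42, 4, 4));  // prints [0, 1, 2, 3]
    }
}
```

With numBuckets equal to numChannels (the default that computeIfAbsent falls back to in the diff), the old code adds a full multiple of numChannels to the start channel, which is why every record lands on the start channel itself.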