[Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel#7737

Open
leaves12138 wants to merge 3 commits into apache:master from leaves12138:fix-postpone-fixed-bucket-channel-computer

Conversation

@leaves12138
Contributor

Summary

Fix a bug in PostponeFixedBucketChannelComputer where all records in the same partition are routed to the same downstream channel in batch mode, causing only one subtask to actually process data.

Root Cause

In the channel() method, the variable bucket was set to the total number of buckets for the partition (from knownNumBuckets). Since this value is the same for all records in the same partition, ChannelComputer.select(partition, bucket, numChannels) always returns the same channel — all records go to one subtask.

Fix

Compute a per-record bucket by hashing the trimmedPrimaryKey and taking modulo numBuckets. This distributes records with different primary keys across different channels/subtasks, similar to how ChannelComputer.startChannel handles Integer.MIN_VALUE.
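The effect described above can be demonstrated with a minimal, self-contained sketch. The `select` method below is a simplified stand-in for `ChannelComputer.select` (hash partition and bucket together, map into the channel range), not Paimon's actual implementation; the numbers are illustrative only:

```java
import java.util.HashSet;
import java.util.Set;

public class ChannelSketch {
    // Simplified stand-in for ChannelComputer.select: combine the partition
    // hash with the bucket, then map into the channel range.
    static int select(int partitionHash, int bucket, int numChannels) {
        return Math.abs(31 * partitionHash + bucket) % numChannels;
    }

    public static void main(String[] args) {
        int numChannels = 4;
        int partitionHash = 7; // all records share one partition
        int numBuckets = 8;    // total bucket count for that partition

        // Buggy behavior: "bucket" is the constant total bucket count,
        // so every record in the partition lands on the same channel.
        Set<Integer> buggy = new HashSet<>();
        for (int key = 0; key < 100; key++) {
            buggy.add(select(partitionHash, numBuckets, numChannels));
        }

        // Fixed behavior: derive a per-record bucket from the primary key
        // hash modulo numBuckets, spreading records across channels.
        Set<Integer> fixed = new HashSet<>();
        for (int key = 0; key < 100; key++) {
            int bucket = Math.abs(Integer.hashCode(key) % numBuckets);
            fixed.add(select(partitionHash, bucket, numChannels));
        }

        System.out.println("buggy channels hit: " + buggy.size()); // 1
        System.out.println("fixed channels hit: " + fixed.size()); // 4
    }
}
```

With the constant bucket, only one of the four channels ever receives data; with the per-record bucket, all four do.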

Testing

Verified the logic matches the pattern used in RowDataChannelComputer (fixed bucket tables) and PostponeBucketChannelComputer (streaming postpone tables).

… same channel

Previously, the channel() method used the total number of buckets
(knownNumBuckets) as the bucket parameter for ChannelComputer.select().
Since all records in the same partition share the same total bucket count,
they were all routed to the same downstream channel, causing only one
subtask to process data in batch mode.

This fix computes a per-record bucket by hashing the trimmedPrimaryKey
and taking modulo numBuckets, so records with different primary keys are
distributed across different channels/subtasks.
@JingsongLi
Contributor

@leaves12138 Add the test.

```diff
  BinaryRow partition = partitionKeyExtractor.partition(record);
- int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ int hash = partitionKeyExtractor.trimmedPrimaryKey(record).hashCode();
```
Contributor

Just use FixedBucketRowKeyExtractor?

Contributor Author

Fixed automatically by Codex. PostponeFixedBucketChannelComputer now reuses FixedBucketRowKeyExtractor, with FixedBucketRowKeyExtractor#bucket(int numBuckets) added so postpone fixed-bucket writes can still use the per-partition bucket count from knownNumBuckets.
