[fix](streaming-job) recompute derived fields after replay and ALTER#62936
Conversation
|
/review |
|
run buildall |
There was a problem hiding this comment.
Pull request overview
Fixes StreamingInsertJob recovery/ALTER behavior by re-deriving transient scheduling and data-quality window fields from jobProperties.max_interval, ensuring consistent behavior after image replay and after ALTER PROPERTIES.
Changes:
- Add
recomputeDerivedFields()to rebuildsampleWindowMs, refreshtimerDefinition.interval, and reset sampling-window counters. - Invoke derived-field recomputation from
init(),gsonPostProcess(), andmodifyPropertiesInternal(). - Add unit tests covering derived-field recomputation and ALTER refresh behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Centralizes and re-applies derived-field computation on create/replay/ALTER paths. |
| fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java | Adds tests verifying recomputation updates the sample window and scheduler interval, and resets counters. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
run external |
c5c43ce to
0ce6766
Compare
|
run buildall |
2 similar comments
|
run buildall |
|
run buildall |
|
run cloud_p0 |
1 similar comment
|
run cloud_p0 |
|
/review |
There was a problem hiding this comment.
Review completed for the full PR diff. I did not find additional distinct issues beyond the existing inline review threads already raised for StreamingInsertJob:
modifyPropertiesInternal()/recomputeDerivedFields()updates are not protected by the job write lock whilecommitOffset()uses the same sampling fields under the write lock.sampleWindowMsand related interval calculations can overflow for very largemax_intervalvalues because validation currently has no upper bound.
I did not add duplicate inline comments for those already-known threads.
Critical checkpoint conclusions:
- Goal: The PR addresses the intended replay/ALTER bug by centralizing recomputation of
sampleWindowMs, scheduler interval, and sampling counters fromjobProperties.max_intervalon create, image replay, and ALTER paths. - Scope: The code change is small and focused, with one helper and three call sites.
- Concurrency: The modified ALTER path touches shared job sampling/timer state; the known existing thread covers the missing job-level write lock. No separate concurrency issue was found.
- Lifecycle/replay:
gsonPostProcess()now rebuilds the non-persisted derived fields afterjobPropertiesis restored; I did not find another replay incompatibility in this patch. - Configuration/compatibility: No new config item or wire/storage format is introduced. The behavior change is limited to applying existing
max_intervalconsistently. - Parallel paths: Create, ALTER, and gson replay paths are covered by the new helper; task creation reads the refreshed
jobPropertiesas before. - Tests: Added unit coverage checks recomputation, ALTER refresh, and counter reset. The two existing review threads identify remaining missing/unsafe cases not covered by these tests.
- Persistence/transactions/data correctness: No transaction visibility or EditLog serialization format change was introduced; the existing job update log persists the updated job state after ALTER.
- Observability/performance: No new logging or metrics appear necessary for this small derived-state recomputation; no additional performance issue was found.
- User focus: No additional user-provided review focus was specified.
…62936) ### What problem does this PR solve? Problem Summary: `StreamingInsertJob` initializes two derived fields from `jobProperties.max_interval` in `init()`: - `sampleWindowMs` = `max_interval * 10 * 1000` — used by `checkDataQuality()` for the `load.max_filter_ratio` time window - `jobConfig.timerDefinition.interval` = `max_interval` — used by `JobScheduler` to compute next trigger time Neither is persisted in the gson image, and neither is refreshed in two paths: 1. **gson replay (`gsonPostProcess`)**: after FE checkpoint restart, `sampleWindowMs` stays at default `0`. The time-window check `(now - sampleStartTime) > sampleWindowMs` is then always true, so the sample window expires on every commit. The window-accumulation contract used by `load.max_filter_ratio` degrades to single-batch judgment, and a job recovered from image can be wrongly paused on a small bad batch that should be diluted by the surrounding window. 2. **ALTER PROPERTIES (`modifyPropertiesInternal`)**: changing `max_interval` only updates `properties` and `jobProperties`. Neither `sampleWindowMs` nor `timerDefinition.interval` is refreshed. The scheduler keeps reading the old interval (the new value never reaches `JobExecutionConfiguration.getTriggerDelayTimes`), so ALTER `max_interval` never takes effect — not even after FE restart, since image carries the stale `interval` too. ### Fix Extract a single `recomputeDerivedFields()` that re-derives all transient state from `jobProperties`: - `sampleWindowMs = maxIntervalSec * 10 * 1000` - `timerDefinition.interval = maxIntervalSec` - reset `sampleStartTime` / `sampleWindowScannedRows` / `sampleWindowFilteredRows` Call it at every entry point where `jobProperties` is rebuilt: - `init()` (job creation) - `gsonPostProcess()` (image replay) - `modifyPropertiesInternal()` (ALTER PROPERTIES) Resetting the sample counters on ALTER is intentional: changing `max_interval` redefines the window itself, so accumulated counts from the old window have no meaningful interpretation in the new one. ### Release note Fix streaming insert job sample window and scheduler interval not being restored after FE checkpoint replay or ALTER PROPERTIES.
What problem does this PR solve?
Problem Summary:
StreamingInsertJobinitializes two derived fields fromjobProperties.max_intervalininit():sampleWindowMs=max_interval * 10 * 1000— used bycheckDataQuality()for theload.max_filter_ratiotime windowjobConfig.timerDefinition.interval=max_interval— used byJobSchedulerto compute next trigger timeNeither is persisted in the gson image, and neither is refreshed in two paths:
gson replay (
gsonPostProcess): after FE checkpoint restart,sampleWindowMsstays at default0. The time-window check(now - sampleStartTime) > sampleWindowMsis then always true, so the sample window expires on every commit. The window-accumulation contract used byload.max_filter_ratiodegrades to single-batch judgment, and a job recovered from image can be wrongly paused on a small bad batch that should be diluted by the surrounding window.ALTER PROPERTIES (
modifyPropertiesInternal): changingmax_intervalonly updatespropertiesandjobProperties. NeithersampleWindowMsnortimerDefinition.intervalis refreshed. The scheduler keeps reading the old interval (the new value never reachesJobExecutionConfiguration.getTriggerDelayTimes), so ALTERmax_intervalnever takes effect — not even after FE restart, since image carries the staleintervaltoo.Fix
Extract a single
recomputeDerivedFields()that re-derives all transient state fromjobProperties:sampleWindowMs = maxIntervalSec * 10 * 1000timerDefinition.interval = maxIntervalSecsampleStartTime/sampleWindowScannedRows/sampleWindowFilteredRowsCall it at every entry point where
jobPropertiesis rebuilt:init()(job creation)gsonPostProcess()(image replay)modifyPropertiesInternal()(ALTER PROPERTIES)Resetting the sample counters on ALTER is intentional: changing
max_intervalredefines the window itself, so accumulated counts from the old window have no meaningful interpretation in the new one.Release note
Fix streaming insert job sample window and scheduler interval not being restored after FE checkpoint replay or ALTER PROPERTIES.
Check List (For Author)
Test
Behavior changed:
load.max_filter_ratio. (2) ALTERmax_intervalnow takes effect on the scheduler within the next batch tick (~10 minutes); previously it was silently ignored.Does this need documentation?
Check List (For Reviewer who merge this PR)