Read CSV format text from stdin or memory#54
Conversation
alamb
left a comment
There was a problem hiding this comment.
I think this PR is good -- the only thing that is wonky to me is that if you Clone one of these readers it won't work. However, in this case the error will be clear so I think this PR can be merged and if anyone cares about the Clone behavior we can fix it in a follow on PR.
Thanks again @heymind
What do you think @Dandandan / @andygrove ?
Codecov Report
@@ Coverage Diff @@
## master #54 +/- ##
==========================================
- Coverage 76.24% 76.20% -0.05%
==========================================
Files 134 134
Lines 23051 23199 +148
==========================================
+ Hits 17576 17679 +103
- Misses 5475 5520 +45
Continue to review full report at Codecov.
|
| Path(String), | ||
|
|
||
| /// Read CSV data from a reader | ||
| Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>), |
There was a problem hiding this comment.
How would this look if we would support multiple readers/partitions?
There was a problem hiding this comment.
Also I am wondering, if we do it like this, we would we need to do the same for json/xml/etc sources that should support essentially the same to avoid reimplementing it for each format.
There was a problem hiding this comment.
I suggest we file a follow on ticket for this work (supporting Reader input for JSON / XML sources). As you say I don't think it needs to be part of this PR.
|
Looks good to me, thanks @heymind . |
fix: More dangling references (apache#54) * fix: More dangling references * test: Add tests for remove_dangling_identifiers UPSTREAM NOTE: This PR was attempted to be upstreamed in apache#13405 - but it was not accepted due to the complexity it brought. Phillip needs to figure out what a good solution that solves our problem and can be upstreamed is.
fix: More dangling references (apache#54) * fix: More dangling references * test: Add tests for remove_dangling_identifiers UPSTREAM NOTE: This PR was attempted to be upstreamed in apache#13405 - but it was not accepted due to the complexity it brought. Phillip needs to figure out what a good solution that solves our problem and can be upstreamed is.
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
…ceSorting ## Summary Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`. ## Problem `EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused: 1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure. 2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans. 3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators. DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule. ## Changes ### `EnsureRequirements` rule (new) - Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()` - Replaces both rules in the default optimizer chain - 53 comprehensive tests including idempotency verification ### Distribution-aware `pushdown_sorts` (fix) - Add `distribution_requirement` field to `ParentRequirements` - New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions - Propagate distribution through recursion with `stronger_distribution()` - Reset distribution below partition-merging nodes (SPM, single-partition outputs) ### Fix `EnforceDistribution` fetch preservation (apache#14150) - `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce - `add_merge_on_top()` re-applies saved fetch to new operators ## Testing | Suite | Result | |-------|--------| | EnsureRequirements (new) | 53 passed | | enforce_sorting (existing) | 124 passed, 0 regressions | | enforce_distribution (existing) | 66 passed, 0 regressions | | SLT (465 files) | 1 pre-existing failure only | | **Total** | **243 unit + 464 SLT, 0 new failures** | Idempotency verified: - All partition counts 1-64 - Triple + 10x consecutive optimization passes - SortMergeJoin, HashJoin, Window, Aggregate topologies - PR apache#53/apache#54 regression scenarios - apache#14150 fetch preservation across passes Closes: apache#14150 Part of: apache#21973
Migrate from apache/arrow#10066.
Which issue does this PR close?
Close Jira issue ARROW-12306. Closes #198
Rationale for this change
Let CsvFile and CsvExec support reading from a reader (not only files)
What changes are included in this PR?
This pr adds the following new pub functions:
CsvFile::try_new_from_readerCsvFile::try_new_from_reader_infer_schemaCsvExec::try_new_from_readerCsvStream::try_new_from_readerAre there any user-facing changes?
No.