Skip to content

Read CSV format text from stdin or memory#53

Closed
heymind wants to merge 1 commit intoapache:masterfrom
heymind:csvread
Closed

Read CSV format text from stdin or memory#53
heymind wants to merge 1 commit intoapache:masterfrom
heymind:csvread

Conversation

@heymind
Copy link
Copy Markdown
Contributor

@heymind heymind commented Apr 25, 2021

Which issue does this PR close?

Close Jira issue ARROW-12306

Rationale for this change

Let CsvFile and CsvExec support reading from a reader (not only files).

What changes are included in this PR?

This pr adds the following new pub functions:

  • CsvFile::try_new_from_reader
  • CsvFile::try_new_from_reader_infer_schema
  • CsvExec::try_new_from_reader
  • CsvStream::try_new_from_reader

Are there any user-facing changes?

No.

@heymind heymind closed this Apr 25, 2021
unkloud pushed a commit to unkloud/datafusion that referenced this pull request Mar 23, 2025
HairstonE pushed a commit to HairstonE/datafusion that referenced this pull request Oct 7, 2025
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 2, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 2, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 2, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 2, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 2, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 3, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 3, 2026
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant