[do-not-merge] Wide-schema parquet read perf — visibility for #21968 (#21987)

Draft
adriangb wants to merge 6 commits into apache:main from pydantic:adrian/wide-schema-perf

Conversation


adriangb (Contributor) commented May 2, 2026

⚠️ This PR is not meant to be merged as-is. It exists to make the
code changes from a wide-schema-perf investigation visible from
#21968 — a more
careful breakout into landable PRs (and upstreaming the arrow-rs
companion changes) is the next step. Posting as a draft so the diff
is browsable.

Companion arrow-rs draft: apache/arrow-rs#9882

What this branch does

Speeds up parquet reads on schemas with hundreds to thousands of
columns where the query touches only a handful. On a 1024-col × 256-file
synthetic dataset, the warm wide-vs-narrow ratio drops from ~2× to
~1.7× and the cold ratio from ~30× to ~22×. With collect_statistics=false,
the cold ratio drops further to ~3.5×.

Changes:

  • statistics_from_parquet_metadata: was O(N²)/file because each
    iteration called StatisticsConverter::try_new, which scanned all
    parquet leaves. Precompute logical→parquet leaf indices once and
    use a new low-overhead from_arrow_field constructor (added on the
    arrow-rs side). O(N²)/file → O(N)/file; sketched after this list.
  • apply_file_schema_type_coercions: short-circuit before building
    the full lookup HashMap when no field needs a transform; return
    None when no field actually changed (it previously returned
    Some(<identical schema>) in that case, forcing a wasted
    ArrowReaderMetadata rebuild per file).
  • DefaultFilesMetadataCache: store memory_size next to each
    entry — no more per-put / per-evict structural walks.
  • CachedParquetMetaData: OnceLock<ArrowReaderMetadata> so warm
    cache hits become an Arc-clone (~4 ns) instead of re-walking the
    parquet schema (~190 µs at 1024 cols). Plus a single-slot
    Mutex<Option<(supplied_schema_ptr, ArrowReaderMetadata)>> for the
    post-coercion build.
  • CachedParquetFileReader::get_arrow_reader_metadata: implements
    the new arrow-rs trait method, returning fully-built
    ArrowReaderMetadata from cache for both base and post-coercion
    configurations. prepare_filters made async so the coercion
    rebuild also routes through the cache-aware reader.
  • New wide_schema_microbench covering try_new vs cached clone,
    apply_coercions no-op, PruningPredicate::try_new, and
    StatisticsConverter try_new vs from_arrow_field.
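
A minimal sketch of the leaf-index idea from the first bullet, using plain
string slices in place of the real parquet SchemaDescriptor / arrow Schema
types (the leaf_indices helper is illustrative, not the actual DataFusion
function):

```rust
use std::collections::HashMap;

/// Sketch of the O(N²) -> O(N) change: instead of rescanning every parquet
/// leaf for each logical column (what StatisticsConverter::try_new effectively
/// did per column), build the name -> leaf-index map once and do O(1) lookups.
fn leaf_indices(parquet_leaf_names: &[&str], logical_fields: &[&str]) -> Vec<Option<usize>> {
    // One pass over the parquet leaves: O(N).
    let by_name: HashMap<&str, usize> = parquet_leaf_names
        .iter()
        .enumerate()
        .map(|(idx, name)| (*name, idx))
        .collect();
    // One O(1) lookup per logical field instead of a rescan per field.
    logical_fields
        .iter()
        .map(|field| by_name.get(field).copied())
        .collect()
}

fn main() {
    let leaves = ["id", "value", "count", "ts"];
    let fields = ["value", "not_in_file"];
    assert_eq!(leaf_indices(&leaves, &fields), vec![Some(1), None]);
}
```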

Full investigation log: see report.md on the branch.

Status

  • All targeted tests pass (-p datafusion-datasource-parquet,
    -p datafusion-execution, arrow-rs parquet schema/arrow_reader).
  • The 16 row_filter / row_group_filter failures in the parquet
    datasource crate are environmental (need the parquet-testing
    submodule) and reproduce on main.
  • This depends on the arrow-rs branch with companion changes (see PR
    link above); the workspace's patch.crates-io points at it.

adriangb and others added 6 commits May 1, 2026 10:42

Adds a new sql_benchmarks suite that isolates the wide-schema scan
overhead in selective parquet queries: the regime where most of the
work is loading footers / column-chunk metadata rather than reading
row data, and that cost scales with the number of column chunks in
the dataset rather than with the number of columns the query touches.

The wide_schema suite has two subgroups (selected via BENCH_SUBGROUP):

  - wide:   1024 cols x 256 files x 50k rows (~225 MB) — the workload
  - narrow:    8 cols x 256 files x 50k rows (~110 MB) — internal
              baseline, only meaningful as a comparison point

Both share row count, file count, and per-file row-group structure;
only schema width differs. All 4 queries run on both subgroups so
every wide number has a directly comparable narrow baseline.

A new gen_wide_data binary synthesizes both datasets in ~60 s with no
external data source. The 8-column base schema (id, value, count, ts,
category, flag, status, text) carries deterministic data; copies 2..N
(the suffix-renamed widening columns) are zero-filled (zero rather than
null, since reader-side null-array shortcuts mute the slowdown by
~35%).
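
A sketch of the widening step using the arrow Schema/Field types; the widen
helper and the "_{i}" suffix naming are illustrative assumptions, not
necessarily what gen_wide_data does:

```rust
use arrow::datatypes::{Field, Schema};

/// Replicate the 8-column base schema out to `copies * 8` columns by
/// appending renamed copies of each base field (suffix naming is assumed).
fn widen(base: &Schema, copies: usize) -> Schema {
    let mut fields: Vec<Field> = base.fields().iter().map(|f| f.as_ref().clone()).collect();
    for i in 2..=copies {
        for f in base.fields() {
            fields.push(Field::new(
                format!("{}_{i}", f.name()),
                f.data_type().clone(),
                f.is_nullable(),
            ));
        }
    }
    Schema::new(fields)
}
```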

Query coverage:

  - Q01: filter + project + ORDER BY + LIMIT (TopK shortcut)
  - Q02: project 1 column with tight filter + LIMIT 1
  - Q03: tight filter + small projection, no sort
  - Q04: two low-cardinality string filters + a non-stat-prunable
         modulo predicate for tight selectivity (~0.005 % match rate),
         project two columns, no LIMIT or ORDER BY

For Q04 specifically, cold-start datafusion-cli shows a ~15x slowdown
for wide vs narrow; EXPLAIN ANALYZE shows metadata_load_time scaling 141x
while bloom_filter_eval_time and statistics_eval_time stay flat.
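
For reference, a Q04-shaped query through the Rust API; the table path, the
literal values, and the modulo constant are placeholders, and the real
statement lives in the new sql_benchmarks suite:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Placeholder path for the generated wide dataset.
    ctx.register_parquet("wide", "benchmarks/data/wide_schema/wide", ParquetReadOptions::default())
        .await?;
    // Two low-cardinality string filters plus a modulo predicate that column
    // statistics cannot prune; project two columns, no LIMIT or ORDER BY.
    let df = ctx
        .sql("SELECT value, ts FROM wide \
              WHERE category = 'a' AND status = 'ok' AND id % 20000 = 0")
        .await?;
    df.show().await?;
    Ok(())
}
```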

bench.sh adds:
  - data wide_schema:  synthesizes both wide and narrow datasets
  - run wide_schema:   runs the wide subgroup, then the narrow
                       baseline subgroup, for query-by-query comparison

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Wide-schema (e.g. 1024-column) parquet datasets exposed several
per-file CPU costs that scale with the schema width even when the
query touches only a handful of columns. With 256 files this turned
into a 15-30x slowdown vs. an 8-column dataset on the same row count
and on identical bytes scanned. This commit collects the DataFusion
changes; companion changes live in arrow-rs (`adrian/wide-schema-perf`
on pydantic/arrow-rs).

Changes:

- `statistics_from_parquet_metadata`: was O(N_columns^2) per file
  because each iteration called `StatisticsConverter::try_new` which
  scanned all parquet leaves looking for the matching column. Now we
  precompute a logical -> parquet leaf index map once (O(N)) and use
  the new `StatisticsConverter::from_arrow_field` constructor in the
  loop. The redundant `parquet_column` lookup inside
  `summarize_column_statistics` is also dropped.
- `apply_file_schema_type_coercions`: short-circuit before building
  the 1024-entry HashMap when no transform is needed, instead of
  building it as a side effect of the check.
- `DefaultFilesMetadataCache`: store the entry's `memory_size` next
  to it so `put` / `evict_entries` / `remove` don't repeatedly walk
  the metadata structure to recompute it.
- `CachedParquetMetaData`: now carries a `OnceLock<CachedArrowView>`
  with the per-file arrow `Schema` and `FieldLevels`. Built lazily
  from the cached parquet metadata so the per-leaf walk happens once
  per file, not once per query (see the sketch after this list).
- `CachedParquetFileReader`: implement the new
  `AsyncFileReader::get_arrow_reader_metadata` so that warm metadata
  cache hits return a fully-built `ArrowReaderMetadata` (using
  `from_field_levels`) instead of going through `try_new` and
  re-walking the parquet schema.
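
The lazy view is the standard OnceLock get-or-init pattern; a minimal sketch
with placeholder types (ArrowView and the leaf count stand in for the real
Schema/FieldLevels construction from parquet metadata):

```rust
use std::sync::{Arc, OnceLock};

/// Placeholder for the derived arrow view (Schema + FieldLevels in the real code).
struct ArrowView {
    column_count: usize,
}

struct CachedMeta {
    parquet_leaf_count: usize,            // stands in for the cached parquet metadata
    arrow_view: OnceLock<Arc<ArrowView>>, // built at most once per cached file
}

impl CachedMeta {
    fn arrow_view(&self) -> Arc<ArrowView> {
        Arc::clone(self.arrow_view.get_or_init(|| {
            // The per-leaf schema walk runs only for the first query that
            // touches this file; later queries just Arc-clone the result.
            Arc::new(ArrowView { column_count: self.parquet_leaf_count })
        }))
    }
}
```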

Together these turn the warm-cache wide-vs-narrow ratio from ~2x down
to ~1.7x and the cold-start ratio from ~30x down to ~22x. The
remaining gap is dominated by metadata cache thrashing on the default
50MB limit (wide metadata is ~1.5MB/file × 256 files ~= 400MB) and by
per-file work in the morsel planner / predicate construction; those
will be addressed separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Follow-on to the wide-schema work. Two functional changes plus a
microbenchmark to measure them.

- `apply_file_schema_type_coercions` now returns `None` when no field
  actually needed coercion. Before this it could walk every field, find
  no transformation, build a 1024-element vec of `Arc::clone`d fields,
  and still return `Some(<identical schema>)` — which caused the caller
  to rebuild `ArrowReaderMetadata` for nothing (sketched after this list).
- `CachedParquetMetaData` now caches the constructed `ArrowReaderMetadata`
  itself instead of caching `(Schema, FieldLevels)` and re-assembling it
  on every cache hit. The cache hit path is now an `ArrowReaderMetadata::clone`
  (~4 ns: a few `Arc::clone`s) instead of a `FieldLevels::clone` (which
  deep-clones the `ParquetField` tree: hundreds of ns, scaling with node count).
  Adds an unused-but-public `coerced_arrow_reader_metadata` slot for the
  follow-up that will memoise the post-coercion build.
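
The shape of the early return, with placeholder field types (needs_coercion
and the utf8_view flag are illustrative, not the real DataFusion coercion
logic):

```rust
use std::sync::Arc;

/// Stand-in for an arrow Field; just enough for the sketch.
#[derive(Clone)]
struct Field {
    name: String,
    utf8_view: bool,
}

/// Hypothetical per-field check: does this file-level field need rewriting
/// to match the table schema?
fn needs_coercion(table: &Field, file: &Field) -> bool {
    table.utf8_view != file.utf8_view
}

/// Return None when nothing changes, so the caller keeps the original schema
/// (and its cached ArrowReaderMetadata) instead of rebuilding an identical copy.
fn apply_coercions(table: &[Arc<Field>], file: &[Arc<Field>]) -> Option<Vec<Arc<Field>>> {
    // Cheap pre-scan: bail out before allocating the 1024-element vec or any lookup map.
    if table.iter().zip(file).all(|(t, f)| !needs_coercion(t, f)) {
        return None;
    }
    let coerced = table
        .iter()
        .zip(file)
        .map(|(t, f)| {
            if needs_coercion(t, f) {
                Arc::new(Field { utf8_view: t.utf8_view, name: f.name.clone() })
            } else {
                Arc::clone(f)
            }
        })
        .collect();
    Some(coerced)
}
```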

Adds `wide_schema_microbench` covering:
- `ArrowReaderMetadata::try_new` vs `clone_cached` across 8 / 64 / 256 /
  1024 columns. Confirms the cache turns ~190 µs at 1024 cols into ~4 ns.
- `apply_file_schema_type_coercions` no-op cost (~10 ns/col linear).
- `PruningPredicate::try_new` for a tiny predicate against wide schemas
  (already constant ~2 µs — predicate-driven, not schema-driven).
- `StatisticsConverter::try_new` vs `from_arrow_field` (~30% faster).
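
The comparisons above follow the usual criterion shape; a rough, self-contained
analogue (rebuilding a wide schema vs. Arc-cloning a cached one stands in for
try_new vs. clone_cached, which the real bench measures on actual parquet
metadata):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};

fn bench_schema_paths(c: &mut Criterion) {
    // 1024-column schema, analogous to the wide dataset.
    let fields: Vec<Field> = (0..1024)
        .map(|i| Field::new(format!("c{i}"), DataType::Int64, true))
        .collect();
    let cached = Arc::new(Schema::new(fields.clone()));

    // Rebuild from scratch on every iteration...
    c.bench_function("rebuild_1024_col_schema", |b| {
        b.iter(|| Schema::new(fields.clone()))
    });
    // ...vs. the warm path: a cheap Arc clone of the cached value.
    c.bench_function("clone_cached_1024_col_schema", |b| b.iter(|| Arc::clone(&cached)));
}

criterion_group!(benches, bench_schema_paths);
criterion_main!(benches);
```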

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Make `prepare_filters` async so it can call
`AsyncFileReader::get_arrow_reader_metadata` for the post-coercion
rebuild instead of using `ArrowReaderMetadata::try_new` directly. The
override on `CachedParquetFileReader` now also handles the
supplied-schema case by delegating to
`CachedParquetMetaData::coerced_arrow_reader_metadata` — its single-slot
cache hits when subsequent files / queries supply the same coerced
schema (Arc::ptr_eq).
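
A minimal sketch of the single-slot, pointer-keyed cache (Schema and Meta are
placeholders for the real arrow Schema and ArrowReaderMetadata types):

```rust
use std::sync::{Arc, Mutex};

/// Single-slot cache keyed by Arc pointer identity, standing in for the
/// coerced_arrow_reader_metadata slot.
struct CoercedSlot<Schema, Meta> {
    slot: Mutex<Option<(Arc<Schema>, Meta)>>,
}

impl<Schema, Meta: Clone> CoercedSlot<Schema, Meta> {
    fn get_or_build(&self, supplied: &Arc<Schema>, build: impl FnOnce() -> Meta) -> Meta {
        let mut slot = self.slot.lock().unwrap();
        match slot.as_ref() {
            // Warm hit: the same schema Arc as last time, so return a cheap clone.
            Some((schema, meta)) if Arc::ptr_eq(schema, supplied) => meta.clone(),
            _ => {
                // Miss: build once and remember it for the next file / query.
                let meta = build();
                *slot = Some((Arc::clone(supplied), meta.clone()));
                meta
            }
        }
    }
}
```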

In practice this cuts the per-file `try_new` cost on warm wide-schema
queries: ~190 µs/file at 1024 cols becomes a 4 ns clone whenever the
supplied-schema Arc matches the cached one.

Also add a research-log section to report.md walking through the dead
ends (skipping the page index regressed the warm path; the early return in
apply_file_schema_type_coercions is bypassed by `force_view_types=true`)
and what remains.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Largest cold-path discovery so far: `datafusion.execution.collect_statistics`
defaults to true, and on a 1024-col × 256-file dataset it roughly doubles
to triples cold latency (655 ms vs 253 ms with it off) by eagerly
fetching and computing per-file stats during planning that the query then
ignores. Documented as a tunable, with notes on what a deeper fix
(lazy per-column stats inference) would look like.
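
To reproduce, the setting can be disabled per session; in datafusion-cli that
is `SET datafusion.execution.collect_statistics = false;`, and via the Rust
API a minimal sketch looks like:

```rust
use datafusion::prelude::*;

fn main() {
    // Skip eager per-file statistics collection during planning; on the
    // 1024-col × 256-file dataset this is the 655 ms vs 253 ms cold-path gap.
    let config = SessionConfig::new().with_collect_statistics(false);
    let _ctx = SessionContext::new_with_config(config);
    // ...register the parquet dataset and run queries as usual.
}
```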

Also added the cold-cache-sweep numbers and the post-coercion
`coerced_arrow_reader_metadata` writeup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

…nges

Final pass numbers (Q04, profiling build):
- wide @50m cold: 1010 ms → 615 ms (−39%)
- wide @50m hot:  108 ms → 87 ms  (−19%)
- wide @2g cold:  830 ms → 510 ms (−39%)
- wide @2g hot:   47 ms → 38 ms   (−19%)
- wide @50m cold with collect_statistics=false: ~257 ms (−75%)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

github-actions Bot commented May 2, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
error: `cargo metadata` exited with an error: error: failed to load source for dependency `arrow`

Caused by:
  Unable to update /home/arrow-rs/arrow

Caused by:
  failed to read `/home/arrow-rs/arrow/Cargo.toml`

Caused by:
  No such file or directory (os error 2)

Labels

datasource (Changes to the datasource crate) · execution (Related to the execution crate)
