[do-not-merge] Wide-schema parquet read perf — visibility for #21968 #21987
Draft
adriangb wants to merge 6 commits into apache:main from
Conversation
Adds a new sql_benchmarks suite that isolates the wide-schema scan
overhead in selective parquet queries: the regime where most of the
work is loading footers / column-chunk metadata rather than reading
row data, a cost that scales with the number of column chunks in
the dataset rather than with the number of columns the query touches.
The wide_schema suite has two subgroups (selected via BENCH_SUBGROUP):
- wide: 1024 cols x 256 files x 50k rows (~225 MB) — the workload
- narrow: 8 cols x 256 files x 50k rows (~110 MB) — internal
baseline, only meaningful as a comparison point
Both share row count, file count, and per-file row-group structure;
only schema width differs. All 4 queries run on both subgroups so
every wide number has a directly comparable narrow baseline.
A new gen_wide_data binary synthesizes both datasets in ~60 s with no
external data source. The 8-column base schema (id, value, count, ts,
category, flag, status, text) carries deterministic data; the
suffix-renamed copies 2..N used to widen the schema are zero-filled
(zero rather than null, since reader-side null-array shortcuts mute
the slowdown by ~35%).
Query coverage:
- Q01: filter + project + ORDER BY + LIMIT (TopK shortcut)
- Q02: project 1 column with tight filter + LIMIT 1
- Q03: tight filter + small projection, no sort
- Q04: two low-cardinality string filters + a non-stat-prunable
modulo predicate for tight selectivity (~0.005 % match rate),
project two columns, no LIMIT or ORDER BY
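A hedged sketch of the Q04 shape through the DataFusion API; the table path, column names, and literals below are illustrative assumptions, not the benchmark's exact query:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("wide", "data/wide_schema/wide", ParquetReadOptions::default())
        .await?;
    // Two low-cardinality string filters plus a modulo predicate that
    // row-group statistics cannot prune; project two columns, no LIMIT
    // or ORDER BY.
    let df = ctx
        .sql(
            "SELECT id, value FROM wide \
             WHERE category = 'c03' AND status = 'active' AND id % 20000 = 0",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```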
For Q04 specifically, cold-start datafusion-cli shows ~15x slowdown
wide vs narrow; EXPLAIN ANALYZE shows metadata_load_time scaling 141x
while bloom_filter_eval_time and statistics_eval_time stay flat.
bench.sh adds:
- data wide_schema: synthesizes both wide and narrow datasets
- run wide_schema: runs the wide subgroup, then the narrow
baseline subgroup, for query-by-query comparison
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wide-schema (e.g. 1024-column) parquet datasets exposed several per-file CPU costs that scale with the schema width even when the query touches only a handful of columns. With 256 files this turned into a 15-30x slowdown vs. an 8-column dataset on the same row count and on identical bytes scanned. This commit collects the DataFusion changes; companion changes live in arrow-rs (`adrian/wide-schema-perf` on pydantic/arrow-rs).

Changes:
- `statistics_from_parquet_metadata`: was O(N_columns^2) per file because each iteration called `StatisticsConverter::try_new`, which scanned all parquet leaves looking for the matching column. Now we precompute a logical -> parquet leaf index map once (O(N)) and use the new `StatisticsConverter::from_arrow_field` constructor in the loop. The redundant `parquet_column` lookup inside `summarize_column_statistics` is also dropped.
- `apply_file_schema_type_coercions`: short-circuit before building the 1024-entry HashMap when no transform is needed, instead of building it as a side effect of the check.
- `DefaultFilesMetadataCache`: store the entry's `memory_size` next to it so `put` / `evict_entries` / `remove` don't repeatedly walk the metadata structure to recompute it.
- `CachedParquetMetaData`: now carries a `OnceLock<CachedArrowView>` with the per-file arrow `Schema` and `FieldLevels`. Built lazily from the cached parquet metadata so the per-leaf walk happens once per file, not once per query.
- `CachedParquetFileReader`: implement the new `AsyncFileReader::get_arrow_reader_metadata` so that warm metadata cache hits return a fully-built `ArrowReaderMetadata` (using `from_field_levels`) instead of going through `try_new` and re-walking the parquet schema.

Together these turn the warm-cache wide-vs-narrow ratio from ~2x down to ~1.7x and the cold-start ratio from ~30x down to ~22x. The remaining gap is dominated by metadata cache thrashing on the default 50MB limit (wide metadata is ~1.5MB/file × 256 files ~= 400MB) and by per-file work in the morsel planner / predicate construction; those will be addressed separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
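As a rough illustration of the index-map change, a minimal sketch with generic names (the real code lives in statistics_from_parquet_metadata and pairs the map with the new `StatisticsConverter::from_arrow_field` constructor from the companion arrow-rs branch):

```rust
use std::collections::HashMap;

// One O(N) pass over the parquet leaves builds a name -> leaf-index map...
fn leaf_index_map(parquet_leaf_names: &[&str]) -> HashMap<String, usize> {
    parquet_leaf_names
        .iter()
        .enumerate()
        .map(|(idx, name)| (name.to_string(), idx))
        .collect()
}

fn main() {
    let leaves = ["id", "value", "count", "ts"];
    let map = leaf_index_map(&leaves);
    // ...so each per-column lookup in the stats loop is O(1) instead of a
    // fresh scan of all leaves (the old O(N^2)-per-file behaviour).
    assert_eq!(map.get("value"), Some(&1));
}
```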
Follow-on to the wide-schema work. Two functional changes plus a microbenchmark to measure them.

- `apply_file_schema_type_coercions` now returns `None` when no field actually needed coercion. Before this it could walk every field, find no transformation, build a 1024-element vec of `Arc::clone`d fields, and still return `Some(<identical schema>)` — which caused the caller to rebuild `ArrowReaderMetadata` for nothing.
- `CachedParquetMetaData` now caches the constructed `ArrowReaderMetadata` itself instead of caching `(Schema, FieldLevels)` and re-assembling it on every cache hit. The cache hit path is now an `ArrowReaderMetadata::clone` (~4 ns: a few `Arc::clone`s) instead of a `FieldLevels::clone` (which deep-clones the `ParquetField` tree, ~hundreds of ns / nodes). Adds an unused-but-public `coerced_arrow_reader_metadata` slot for the follow-up that will memoise the post-coercion build.

Adds `wide_schema_microbench` covering:
- `ArrowReaderMetadata::try_new` vs `clone_cached` across 8 / 64 / 256 / 1024 columns. Confirms the cache turns ~190 µs at 1024 cols into ~4 ns.
- `apply_file_schema_type_coercions` no-op cost (~10 ns/col linear).
- `PruningPredicate::try_new` for a tiny predicate against wide schemas (already constant ~2 µs — predicate-driven, not schema-driven).
- `StatisticsConverter::try_new` vs `from_arrow_field` (~30% faster).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
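A minimal sketch of the lazy-caching pattern described above; the real type stores an `ArrowReaderMetadata` inside `CachedParquetMetaData`, and `Metadata` / `CachedEntry` below are self-contained stand-ins:

```rust
use std::sync::{Arc, OnceLock};

#[derive(Clone)]
struct Metadata {
    schema: Arc<str>, // stand-in for the Arc-backed pieces of ArrowReaderMetadata
}

struct CachedEntry {
    arrow_metadata: OnceLock<Metadata>,
}

impl CachedEntry {
    // The first caller pays the expensive build (~190 µs at 1024 cols per the
    // numbers above); every later hit is just a clone of a few Arcs.
    fn arrow_metadata(&self, build: impl FnOnce() -> Metadata) -> Metadata {
        self.arrow_metadata.get_or_init(build).clone()
    }
}

fn main() {
    let entry = CachedEntry { arrow_metadata: OnceLock::new() };
    let first = entry.arrow_metadata(|| Metadata { schema: Arc::from("wide schema") });
    let second = entry.arrow_metadata(|| unreachable!("built only once"));
    assert!(Arc::ptr_eq(&first.schema, &second.schema));
}
```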
Make `prepare_filters` async so it can call `AsyncFileReader::get_arrow_reader_metadata` for the post-coercion rebuild instead of using `ArrowReaderMetadata::try_new` directly. The override on `CachedParquetFileReader` now also handles the supplied-schema case by delegating to `CachedParquetMetaData::coerced_arrow_reader_metadata` — its single-slot cache hits when subsequent files / queries supply the same coerced schema (`Arc::ptr_eq`).

In practice this cuts the per-file `try_new` cost on warm wide-schema queries: ~190 µs/file at 1024 cols becomes a 4 ns clone whenever the supplied-schema Arc matches the cached one.

Also add a research-log section to report.md walking through the dead ends (skip-page-index regressed warm; the early return in `apply_file_schema_type_coercions` is bypassed by `force_view_types=true`) and what remains.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
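A minimal sketch of the single-slot, pointer-identity-keyed cache described above; `ReaderMetadata`, `CoercedSlot`, and `get_or_build` are stand-ins, not the real API:

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct ReaderMetadata;

#[derive(Default)]
struct CoercedSlot {
    // Arc<str> stands in for the Arc<Schema> supplied by the caller.
    slot: Mutex<Option<(Arc<str>, ReaderMetadata)>>,
}

impl CoercedSlot {
    fn get_or_build(&self, supplied: &Arc<str>, build: impl FnOnce() -> ReaderMetadata) -> ReaderMetadata {
        let mut guard = self.slot.lock().unwrap();
        if let Some((cached, md)) = guard.as_ref() {
            if Arc::ptr_eq(cached, supplied) {
                return md.clone(); // warm hit: a handful of ns instead of ~190 µs
            }
        }
        // Miss (or a different supplied schema): build and remember it.
        let md = build();
        *guard = Some((Arc::clone(supplied), md.clone()));
        md
    }
}

fn main() {
    let cache = CoercedSlot::default();
    let coerced: Arc<str> = Arc::from("coerced schema");
    cache.get_or_build(&coerced, || ReaderMetadata);
    cache.get_or_build(&coerced, || unreachable!("same Arc should hit the slot"));
}
```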
Largest cold-path discovery so far: `datafusion.execution.collect_statistics` defaults to true, and on a 1024-col × 256-file dataset it can double or triple cold latency (655 ms vs 253 ms with it off) by eagerly fetching and computing per-file stats during planning that the query then ignores. Documented as a tunable, with notes on what a deeper fix (lazy per-column stats inference) would look like.

Also added the cold-cache-sweep numbers and the post-coercion `coerced_arrow_reader_metadata` writeup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
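A hedged example of that tunable (not part of this PR's changes): disabling eager per-file statistics collection for a session, either programmatically or via SQL.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Programmatically at session setup...
    let config = SessionConfig::new().with_collect_statistics(false);
    let ctx = SessionContext::new_with_config(config);

    // ...or via SQL (e.g. in datafusion-cli):
    ctx.sql("SET datafusion.execution.collect_statistics = false").await?;
    Ok(())
}
```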
…nges

Final pass numbers (Q04, profiling build):
- wide @50m cold: 1010 ms → 615 ms (−39%)
- wide @50m hot: 108 ms → 87 ms (−19%)
- wide @2g cold: 830 ms → 510 ms (−39%)
- wide @2g hot: 47 ms → 38 ms (−19%)
- wide @50m cold with collect_statistics=false: ~257 ms (−75%)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
Companion arrow-rs draft: apache/arrow-rs#9882
What this branch does
Speeds up parquet reads on schemas with hundreds-to-thousands of
columns where the query touches only a handful. On a 1024-col × 256-file
synthetic dataset the warm wide vs narrow ratio drops from ~2× to
~1.7× and cold from ~30× to ~22×. With
collect_statistics=false, cold drops to ~3.5×.
Changes:
- `statistics_from_parquet_metadata`: was O(N²)/file because each iteration called `StatisticsConverter::try_new`, which scanned all parquet leaves. Precompute logical→parquet leaf indices once and use a new low-overhead `from_arrow_field` constructor (added on the arrow-rs side). O(N²)/file → O(N)/file.
- `apply_file_schema_type_coercions`: short-circuit before building the full lookup HashMap when nothing transforms; return `None` when no field actually changed (was returning `Some(<identical schema>)` in the latter case, forcing a wasted `ArrowReaderMetadata` rebuild per file). See the sketch after this list.
- `DefaultFilesMetadataCache`: store `memory_size` next to each entry — no more per-put / per-evict structural walks.
- `CachedParquetMetaData`: `OnceLock<ArrowReaderMetadata>` so warm cache hits become an Arc-clone (~4 ns) instead of re-walking the parquet schema (~190 µs at 1024 cols). Plus a single-slot `Mutex<Option<(supplied_schema_ptr, ArrowReaderMetadata)>>` for the post-coercion build.
- `CachedParquetFileReader::get_arrow_reader_metadata`: implements the new arrow-rs trait method, returning fully-built `ArrowReaderMetadata` from cache for both base and post-coercion configurations.
- `prepare_filters` made async so the coercion rebuild also routes through the cache-aware reader.
- `wide_schema_microbench` covering try_new vs cached clone, apply_coercions no-op, PruningPredicate::try_new, and StatisticsConverter try_new vs from_arrow_field.
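As a rough illustration of the apply_file_schema_type_coercions short-circuit above, a minimal sketch (the `needs_coercion` rule, the Utf8 → Utf8View example, and the function name are assumptions; the real function applies DataFusion's file-schema coercion rules):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Return None when no field needs coercion so the caller keeps the original
// ArrowReaderMetadata; only allocate and rebuild when something changes.
fn apply_coercions_sketch(file_schema: &SchemaRef) -> Option<SchemaRef> {
    // Stand-in for the real per-field coercion rules.
    let needs_coercion = |f: &Field| matches!(f.data_type(), DataType::Utf8);

    // Check first: on an all-no-op 1024-column schema this is the only work.
    if !file_schema.fields().iter().any(|f| needs_coercion(f)) {
        return None;
    }

    let coerced: Vec<_> = file_schema
        .fields()
        .iter()
        .map(|f| {
            if needs_coercion(f) {
                Arc::new(Field::new(f.name().clone(), DataType::Utf8View, f.is_nullable()))
            } else {
                Arc::clone(f)
            }
        })
        .collect();
    Some(Arc::new(Schema::new(coerced)))
}
```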
Full investigation log: see report.md on the branch.

Status
- Tests: `-p datafusion-datasource-parquet`, `-p datafusion-execution`, arrow-rs parquet schema/arrow_reader. The `row_filter`/`row_group_filter` failures in the parquet datasource crate are environmental (need the `parquet-testing` submodule) and reproduce on main.
- Uses the companion arrow-rs branch (link above); the workspace's `patch.crates-io` points at it.