Support convert_to_state for AVG accumulator #11734
|
There is something strange going on with AVG in this query -- it is giving different answers when convert to state is enabled vs not. Maybe it is due to float rounding, but I am not confident |
Kind of expected -- result set is sorted by I've got different results after two consecutive runs even on main branch (without any skipped aggregation). |
|
I debugged this a bit more and I think the issue may be that |
|
Oh, the |
|
This PR is failing like the following when running the TPCH benchmarks. I think there may be a bug related to types in the intermediates. I will keep debugging.
Update: turns out it is q18:
(venv) andrewlamb@Andrews-MBP-2:~/Software/datafusion/benchmarks/data/tpch_sf1$ /Users/andrewlamb/Software/datafusion/datafusion-cli/target/debug/datafusion-cli -f ../../queries/q18.sql
Update: filed real issue here #11832 |
|
LGTM, thank you @alamb. Regarding q28 slowdown from PR description -- I suppose it's not a stable slowdown, and just a result on single benchmark run (since the regexp over |
That is my understanding too. I also have high hopes that the StringView work will make that query in particular faster as well |
2010YOUY01
left a comment
This looks great, thank you
| /// │false│ │ │NULL │ │NULL │ | ||
| /// │false│ │true │ │true │ | ||
| /// └─────┘ └─────┘ └─────┘ | ||
| /// array opt_filter output nulls |
Looks like output nulls has a typo, should be false; true; false; false; false?
Yes you are correct -- thank you for catching that. I fixed it in 149406b
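The corrected "output nulls" column can be reproduced with a small stand-alone sketch. This is not the code from the PR: `filtered_validity` is a hypothetical helper, plain `bool` slices stand in for Arrow null buffers, and the first three input rows are assumptions chosen to be consistent with the reviewer's corrected output (only the last two rows are visible in the diff excerpt). The rule it illustrates is that an output slot is valid only when the input is non-null and the filter is exactly true.

```rust
/// Hypothetical helper (not a DataFusion API): an output slot is valid only
/// when the input value is non-null AND the filter is exactly `Some(true)`.
/// A NULL or false filter entry marks the slot as null.
fn filtered_validity(input_valid: &[bool], opt_filter: Option<&[Option<bool>]>) -> Vec<bool> {
    match opt_filter {
        // No filter: the output validity is just the input validity.
        None => input_valid.to_vec(),
        Some(filter) => input_valid
            .iter()
            .zip(filter)
            .map(|(&valid, &keep)| valid && keep == Some(true))
            .collect(),
    }
}

fn main() {
    // Assumed inputs: validity of the input array and a nullable filter.
    let input_valid = [true, true, true, false, false];
    let filter = [None, Some(true), Some(false), None, Some(true)];

    let out = filtered_validity(&input_valid, Some(&filter[..]));

    // Matches the sequence given in the review comment.
    assert_eq!(out, vec![false, true, false, false, false]);
    println!("{:?}", out);
}
```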
| 4 11 14 | ||
| 5 8 7 | ||
|
|
||
| # Test avg for tinyint / float |
just wondering why the test is only for tinyint / floats?
The idea was that the AVG accumulator is already tested elsewhere -- this test is only to exercise the partial aggregate skipping logic
|
Thanks @comphead -- we are making progress here slowly -- but I am pretty stoked to see it 🚀 |
|
Nice work 🎉 |
|
I am very excited to get the @Rachelint is also working on some good stuff. |
## Which issue does this PR close?

- Part of apache#17964.

## Rationale for this change

SparkAvg's AvgGroupsAccumulator doesn't implement supports_convert_to_state (defaults to false), which prevents the skip-partial-aggregation optimization from kicking in for queries that use Spark's avg().

I ran into this while benchmarking a Spark Connect engine built on DataFusion. On TPC-H q17 at SF10, the partial aggregate for avg(l_quantity) grouped by l_partkey (~2M groups out of 60M rows) was not triggering skip-aggregation:

| Metric | Without convert_to_state | With convert_to_state |
|--------|-------------------------|-----------------------|
| Partial aggregate memory | 923 MB | 40 MB |
| Partial aggregate elapsed | 4.75s | 109ms |

The skip-aggregation probe (apache#11627) detects when a partial aggregate isn't reducing cardinality and falls back to passing rows through as state directly. This needs convert_to_state so the accumulator can produce [sum, count] state arrays from raw input. The built-in Avg already has this (apache#11734), but it wasn't carried over when SparkAvg was migrated from Comet in apache#17871.

## What changes are included in this PR?

Adds convert_to_state() and supports_convert_to_state() to AvgGroupsAccumulator in datafusion-spark. Follows the same approach as the built-in Avg, adapted for SparkAvg's differences:

- State order is [sum, count] (vs [count, sum] in the built-in)
- Count type is Int64 (vs UInt64 in the built-in)
- Null handling uses NullBuffer::union directly instead of pulling in datafusion-functions-aggregate-common as a dep

Also cleaned up the fully-qualified arrow::array::BooleanArray references in update_batch / merge_batch since adding BooleanArray to the import block triggered the unused_qualifications lint.

## Are these changes tested?

Yes, unit tests covering basic conversion, null propagation, filter handling, and a roundtrip through merge_batch to verify the converted state produces correct results end-to-end.

## Are there any user-facing changes?

No. Queries using avg() through the Spark function registry will automatically benefit from skip-partial-aggregation on high-cardinality groupings.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
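To make the [sum, count] state in the commit message above concrete, here is a minimal std-only sketch of the idea. It is not the Arrow-based implementation. `RowState`, `convert_to_state`, and `merge_and_finish` are hypothetical stand-ins that use `Option` in place of null buffers. Each raw row becomes the state a one-row group would have produced, and a later merge step folds those states into per-group averages, which is the roundtrip the unit tests are described as covering.

```rust
use std::collections::HashMap;

/// Per-row intermediate state in the (sum, count) order described for SparkAvg.
/// `None` marks a null state (null input or a row removed by the filter).
type RowState = Option<(f64, i64)>;

/// Sketch of convert_to_state: map each raw input row straight to its state,
/// without grouping anything.
fn convert_to_state(values: &[Option<f64>], opt_filter: Option<&[bool]>) -> Vec<RowState> {
    values
        .iter()
        .enumerate()
        .map(|(i, v)| {
            // With no filter, every row is kept.
            let keep = opt_filter.map_or(true, |f| f[i]);
            match (v, keep) {
                (Some(x), true) => Some((*x, 1)),
                _ => None,
            }
        })
        .collect()
}

/// Sketch of the downstream merge: fold states into per-group (sum, count),
/// skip null states, then divide. Groups with only null states are omitted here.
fn merge_and_finish(group_ids: &[u32], states: &[RowState]) -> HashMap<u32, f64> {
    let mut acc: HashMap<u32, (f64, i64)> = HashMap::new();
    for (g, s) in group_ids.iter().zip(states) {
        if let Some((sum, count)) = s {
            let e = acc.entry(*g).or_insert((0.0, 0));
            e.0 += sum;
            e.1 += count;
        }
    }
    acc.into_iter()
        .map(|(g, (sum, count))| (g, sum / count as f64))
        .collect()
}

fn main() {
    let values = [Some(1.0), None, Some(3.0), Some(5.0)];
    let group_ids = [0, 0, 0, 1];

    let states = convert_to_state(&values, None);
    let avgs = merge_and_finish(&group_ids, &states);

    // Group 0 averages 1.0 and 3.0 (the null row is skipped); group 1 has only 5.0.
    assert_eq!(avgs[&0], 2.0);
    assert_eq!(avgs[&1], 5.0);
    println!("group 0 = {}, group 1 = {}", avgs[&0], avgs[&1]);
}
```

The sketch stores one `Option` per row for clarity; the columnar version described above keeps separate sum and count arrays that share a validity mask.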
Note: There are ~20 lines of code in this PR, the rest is documentation and tests
Which issue does this PR close?
Avg aggregate: implement convert_to_state #11816
Rationale for this change
To take advantage of the benefits of #11627 a new method must be implemented for each GroupsAccumulator.
At least one ClickBench query (the one in #6937) uses AVG so let's implement that
What changes are included in this PR?
convert_to_state for AVG accumulator
Are these changes tested?
Yes with new unit tests
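The rationale above depends on the skip-partial-aggregation decision from #11627, which only applies when every accumulator in the query supports convert_to_state. The following is a rough sketch of that kind of probe, not the DataFusion implementation: `SkipProbe`, its field names, and the thresholds used in `main` are all made up for illustration.

```rust
/// Hypothetical probe state; names and thresholds are illustrative only.
struct SkipProbe {
    rows_threshold: usize,
    ratio_threshold: f64,
    /// Skipping is only possible if every accumulator can convert raw rows to state.
    all_support_convert_to_state: bool,
    input_rows: usize,
    should_skip: bool,
    locked: bool,
}

impl SkipProbe {
    fn new(rows_threshold: usize, ratio_threshold: f64, all_support_convert_to_state: bool) -> Self {
        Self {
            rows_threshold,
            ratio_threshold,
            all_support_convert_to_state,
            input_rows: 0,
            should_skip: false,
            locked: false,
        }
    }

    /// Record one input batch together with the number of distinct groups seen so far.
    /// Once enough rows have been observed, decide a single time whether to skip.
    fn update(&mut self, batch_rows: usize, total_groups: usize) {
        if self.locked {
            return;
        }
        self.input_rows += batch_rows;
        if self.input_rows >= self.rows_threshold {
            let ratio = total_groups as f64 / self.input_rows as f64;
            // A ratio near 1.0 means grouping is barely reducing the row count.
            self.should_skip =
                self.all_support_convert_to_state && ratio >= self.ratio_threshold;
            self.locked = true;
        }
    }
}

fn main() {
    // Assumed thresholds: probe 100 rows, skip if groups/rows >= 0.8.
    let mut probe = SkipProbe::new(100, 0.8, true);
    probe.update(50, 50);
    assert!(!probe.should_skip); // not enough rows observed yet
    probe.update(50, 95);
    assert!(probe.should_skip); // 95 groups out of 100 rows

    // The same data with an accumulator lacking convert_to_state never skips.
    let mut unsupported = SkipProbe::new(100, 0.8, false);
    unsupported.update(100, 95);
    assert!(!unsupported.should_skip);
    println!("ok");
}
```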
Performance benchmarks:
Clickbench on the whole looks better
Interestingly my benchmark shows Q31 and Q32 get faster (they both have avg):
datafusion/benchmarks/queries/clickbench/queries.sql
Lines 32 to 33 in 4e278ca
But Q28 gets slower
datafusion/benchmarks/queries/clickbench/queries.sql
Line 29 in 4e278ca
Are there any user-facing changes?
Faster performance