feat: Add Spark-compatible xxhash64 and murmur3 hash functions #19627
andygrove wants to merge 12 commits into apache:main
Conversation
SELECT xxhash64('hello');
----
-4367754540140381902
scala> spark.sql("SELECT xxhash64('hello')").show()
+--------------------+
| xxhash64(hello)|
+--------------------+
|-4367754540140381902|
+--------------------+
SELECT xxhash64(1);
----
-7001672635703045582
scala> spark.sql("SELECT xxhash64(cast(1 as long))").show()
+---------------------------+
|xxhash64(CAST(1 AS BIGINT))|
+---------------------------+
| -7001672635703045582|
+---------------------------+
SELECT hash('hello');
----
-1008564952
scala> spark.sql("SELECT hash('hello')").show()
+-----------+
|hash(hello)|
+-----------+
|-1008564952|
+-----------+
@shehabgamin fyi
    }
}

fn hash_column_murmur3(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
It looks like support for DataType::Dictionary may be missing. In the Sail codebase, we copied the logic from Comet, where the Dictionary type is handled. However, I’m not sure whether Comet’s implementation has changed since we copied it.
In Sail, the relevant logic can be found here:
- https://github.com/lakehq/sail/blob/540fb8350ab676dfd0c302fafb4176b11fb0ee84/crates/sail-function/src/scalar/hash/spark_murmur3_hash.rs#L68
- https://github.com/lakehq/sail/blob/540fb8350ab676dfd0c302fafb4176b11fb0ee84/crates/sail-function/src/scalar/hash/utils.rs#L12
Based on the attribution comments in those files, the corresponding Comet sources appear to come from the following commit:
- https://github.com/apache/datafusion-comet/blob/bfd7054c02950219561428463d3926afaf8edbba/native/spark-expr/src/spark_hash.rs
- https://github.com/apache/datafusion-comet/blob/bfd7054c02950219561428463d3926afaf8edbba/native/spark-expr/src/scalar_funcs/hash_expressions.rs#L28-L70
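For reference, a rough sketch of what the Comet/Sail logic linked above does (hypothetical helper name, only one dictionary key type shown; hash_column_murmur3 is the per-column routine from this PR's diff): unpack the dictionary with take, then hash the flattened values with the existing per-type code.

use arrow::array::{Array, ArrayRef, DictionaryArray};
use arrow::compute::take;
use arrow::datatypes::{DataType, Int32Type};
use datafusion_common::{exec_err, Result};

fn hash_dictionary_sketch(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
    if let DataType::Dictionary(key_type, _) = col.data_type() {
        match key_type.as_ref() {
            DataType::Int32 => {
                let dict = col
                    .as_any()
                    .downcast_ref::<DictionaryArray<Int32Type>>()
                    .unwrap();
                // Expand the dictionary into a plain values array, then reuse
                // the existing per-type hashing on the unpacked array.
                let unpacked = take(dict.values().as_ref(), dict.keys(), None)?;
                hash_column_murmur3(&unpacked, hashes)?;
            }
            other => return exec_err!("unsupported dictionary key type {other:?}"),
        }
    }
    Ok(())
}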
Besides the Dictionary type, it seems FixedSizeBinary is not handled either.
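And a similarly rough sketch of a FixedSizeBinary arm (hypothetical; each row is already a byte slice, so it can feed the byte-oriented hash helper from this PR directly):

use arrow::array::{Array, ArrayRef, FixedSizeBinaryArray};
use datafusion_common::Result;

fn hash_fixed_size_binary_sketch(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
    let array = col
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    for (i, hash) in hashes.iter_mut().enumerate() {
        if array.is_valid(i) {
            // value(i) returns the row's fixed-width &[u8]
            *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
        }
    }
    Ok(())
}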
Thanks, I'll address this in the next few days. Moving to draft for now.
@andygrove Thanks for pinging and mentioning. The PR is in good shape, and I'm glad it's being contributed back upstream.
#[inline]
fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
    XxHash64::oneshot(seed, data.as_ref())
}
Suggested change:
-#[inline]
-fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
-    XxHash64::oneshot(seed, data.as_ref())
-}
+#[inline]
+fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: i64) -> i64 {
+    XxHash64::oneshot(seed as u64, data.as_ref()) as i64
+}
I wonder if it's worth doing this to make it easier to compute the resulting i64 array, without needing to convert the u64 hash vec to an i64 vec (unless the compiler optimizes this away anyway 🤔)
I took a look at this, and it may make the code a little cleaner, but also increases the size of the diff on this PR. Perhaps we could consider this as a follow-on cleanup?
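For illustration, a minimal sketch of what the suggestion would buy, assuming the i64-returning variant were adopted (hypothetical function name; the PR currently keeps the u64 helper and converts afterwards):

use arrow::array::Int64Array;

// With an i64-returning hash helper, the per-row hash buffer can feed the
// result array directly, with no `map(|h| h as i64)` conversion pass.
fn build_xxhash64_result(hashes: Vec<i64>) -> Int64Array {
    Int64Array::from(hashes)
}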
#[inline]
fn mix_k1(mut k1: i32) -> i32 {
    k1 = k1.mul_wrapping(0xcc9e2d51u32 as i32);
    k1 = k1.rotate_left(15);
    k1.mul_wrapping(0x1b873593u32 as i32)
}
Do we need to provide a link to where this source code was extracted from? I'm assuming it was ported from some other implementation?
@advancedxy do you remember where this came from?
This is the original implementation of MurmurHash3: https://github.com/aappleby/smhasher/blob/0ff96f7835817a27d0487325b6c16033e2992eb5/src/MurmurHash3.cpp#L102
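For context, a small sketch (not code from this PR) of how a mix_k1-style block mix feeds the running hash in the Murmur3_x86_32 reference linked above; the constants and rotation amounts come from that file:

// Reference body loop, per 4-byte block:
//   k1 *= c1; k1 = rotl(k1, 15); k1 *= c2;                  <- mix_k1
//   h1 ^= k1; h1 = rotl(h1, 13); h1 = h1 * 5 + 0xe6546b64;  <- mix_h1
#[inline]
fn mix_h1(mut h1: i32, k1: i32) -> i32 {
    h1 ^= k1;
    h1 = h1.rotate_left(13);
    h1.wrapping_mul(5).wrapping_add(0xe6546b64u32 as i32)
}
// Running state per block: h1 = mix_h1(h1, mix_k1(block));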
Instead of the specific line, we could just leave a comment at the top referencing that file.
@advancedxy do you remember where this came from?
The murmur3 hash was added to the repo in the initial PR/commit; maybe @sunchao knows/remembers where it originally came from.
However, I believe Spark's murmur3 hash is based on Guava's Murmur3_32HashFunction, which this murmur3 hash code might also reference.
Sorry! I also don't quite remember where this came from :( I searched a few possible places where I think it could be from, but none of them match.
Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
Hey, may I ask if there are plans to support
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Use args.number_rows instead of manual row count detection
- Use ColumnarValue::values_to_arrays in murmur3_hash
- Remove big-endian cfg conditional (arrow-rs doesn't target big endian)
- Add source attribution comment for MurmurHash3 algorithm

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
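A rough sketch of what the first two bullet points look like in practice (hypothetical shape; the field names follow the current DataFusion ScalarFunctionArgs API, and hash_column_murmur3 is the per-column routine quoted earlier in this thread):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};

fn murmur3_invoke_sketch(args: ScalarFunctionArgs) -> Result<ColumnarValue> {
    // Row count comes from the framework, not from inspecting the inputs.
    let num_rows = args.number_rows;
    // Expands any scalar arguments into arrays of length num_rows.
    let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;

    // Spark seeds its murmur3-based hash() with 42.
    let mut hashes: Vec<u32> = vec![42; num_rows];
    for col in &arrays {
        hash_column_murmur3(col, &mut hashes)?;
    }

    let result: Vec<i32> = hashes.into_iter().map(|h| h as i32).collect();
    Ok(ColumnarValue::Array(Arc::new(Int32Array::from(result))))
}

The actual code additionally returns a Scalar rather than a one-element array when num_rows == 1, as shown in a later hunk in this thread.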
…tests

Extract shared type-dispatch logic into a `create_hashes_internal!` macro in utils.rs (ported from Comet's hash_funcs/utils.rs), adding null-count fast paths and support for List, LargeList, FixedSizeList, Struct, and Map types. Refactor xxhash64 and murmur3_hash to use the shared macro, reducing ~200 lines of duplicated type dispatch per file. Add unit tests for boundary values, emoji/CJK strings, float edge cases, struct and list hashing, and corresponding SLT coverage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
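To illustrate the dispatch pattern such a macro factors out, here is a much-reduced, hypothetical sketch (one DataType arm only; the real create_hashes_internal! covers many more types, null-count fast paths, and nested types, and spark_compatible_murmur3_hash is the helper from this PR):

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, Result};

// One arm per DataType: downcast to the concrete array, then fold each
// non-null value into that row's running hash.
macro_rules! dispatch_hash {
    ($col:expr, $hashes:expr, $hash_fn:expr) => {
        match $col.data_type() {
            DataType::Int32 => {
                let array = $col.as_any().downcast_ref::<Int32Array>().unwrap();
                for (i, hash) in $hashes.iter_mut().enumerate() {
                    if array.is_valid(i) {
                        // Hash the value's little-endian bytes, seeded with
                        // the running hash for this row.
                        *hash = $hash_fn(array.value(i).to_le_bytes(), *hash);
                    }
                }
            }
            other => return exec_err!("hash not implemented for {other:?}"),
        }
    };
}

fn hash_int32_column_sketch(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
    dispatch_hash!(col, hashes, spark_compatible_murmur3_hash);
    Ok(())
}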
I'm not planning on working on this since Spark does not support it.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@shehabgamin @advancedxy @Jefffrey @mbutrovich I have addressed most of the feedback and also updated this PR to reflect changes in Comet's implementation since this PR was created: complex types are now fully supported, there are performance improvements, and some macros have been added to reduce duplicate code. I will mark it as ready for review once CI is green.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
let result_array = Int32Array::from(result);

if num_rows == 1 {
    Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(
        result_array.value(0),
    ))))
} else {
    Ok(ColumnarValue::Array(Arc::new(result_array)))
}
nit: I think we could move the result array initialization into the else branch to avoid the Int32Array allocation for single-row calls.
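A minimal sketch of the suggested shape (hypothetical function and variable names; result is the Vec<i32> of per-row hashes built just above):

use std::sync::Arc;
use arrow::array::Int32Array;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;

// The scalar path reads the value straight from the Vec, so no Int32Array
// is allocated when the function is called with single-row (scalar) inputs.
fn build_murmur3_result(result: Vec<i32>, num_rows: usize) -> ColumnarValue {
    if num_rows == 1 {
        ColumnarValue::Scalar(ScalarValue::Int32(Some(result[0])))
    } else {
        ColumnarValue::Array(Arc::new(Int32Array::from(result)))
    }
}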
// Convert to Int64
let result: Vec<i64> = hashes.into_iter().map(|h| h as i64).collect();
let result_array = Int64Array::from(result);
advancedxy left a comment
2 minor comments, which could be addressed in a follow-up PR.
LGTM.
if !first_col {
    let unpacked = take(dict_array.values().as_ref(), dict_array.keys(), None)?;
    hash_column_murmur3(&unpacked, hashes, false)?;
} else {
What's the reason for this difference in behaviour for first_col?
/// Spark-compatible murmur3 hash algorithm
#[inline]
pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
Suggested change:
-pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
+fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 {
    first_col: bool,
) -> Result<()> {
    // Handle Dictionary types separately (turbofish syntax not supported in macros)
    if let DataType::Dictionary(key_type, _) = col.data_type() {
Could we use downcast_dictionary_array here to cut down some boilerplate?
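A rough sketch of what that might look like (assumptions: the import path for the macro, which is re-exported from arrow_array, and the three-argument hash_column_murmur3 signature from the diff above):

use arrow::array::{downcast_dictionary_array, Array, ArrayRef};
use arrow::compute::take;
use datafusion_common::Result;

// The macro generates the per-key-type downcast, replacing the hand-written
// match on DataType::Dictionary(key_type, _).
fn hash_column_murmur3_sketch(col: &ArrayRef, hashes: &mut [u32], first_col: bool) -> Result<()> {
    downcast_dictionary_array!(
        col => {
            // col is now the downcasted DictionaryArray; unpack and recurse.
            let unpacked = take(col.values().as_ref(), col.keys(), None)?;
            hash_column_murmur3(&unpacked, hashes, first_col)
        }
        _ => {
            // Non-dictionary columns fall through to the normal dispatch.
            hash_column_murmur3(col, hashes, first_col)
        }
    )
}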
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
I'm going to open fresh PRs for the two hash functions.
Which issue does this PR close?
Rationale for this change
Donate some hash functions from Comet so that other projects can benefit from them.
The functions were initially implemented in Comet by @advancedxy.
What changes are included in this PR?
I used Claude Code to copy the code from Comet and add SLT tests. I manually verified that the expected values match Spark for a few cases, just to be sure that the code is correct.
Are these changes tested?
Yes, tests are part of the PR.
Are there any user-facing changes?
No