Skip to content

proto: serialize and dedupe dynamic filters v2#21807

Merged
adriangb merged 6 commits intoapache:mainfrom
jayshrivastava:js/dedupe-dynamic-filter-inner-state-v2
May 1, 2026
Merged

proto: serialize and dedupe dynamic filters v2#21807
adriangb merged 6 commits intoapache:mainfrom
jayshrivastava:js/dedupe-dynamic-filter-inner-state-v2

Conversation

@jayshrivastava
Copy link
Copy Markdown
Contributor

@jayshrivastava jayshrivastava commented Apr 23, 2026

Which issue does this PR close?

Informs: datafusion-contrib/datafusion-distributed#180
Closes: #20418

Rationale for this change

Consider you have a plan with a HashJoinExec and DataSourceExec

HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)

You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning:

  1. When you deserialize the plan, both the HashJoinExec and DataSourceExec should have pointers to the same DynamicFilterPhysicalExpr
  2. The DynamicFilterPhysicalExpr should be updated during execution by the HashJoinExec and the DataSourceExec should filter out rows

This does not happen today for a few reasons, a couple of which this PR aims to address

  1. DynamicFilterPhysicalExpr is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as Literal) due to the PhysicalExpr::snapshot() API
  2. Even if DynamicFilterPhysicalExpr survives round-tripping, the one pushed down to the DataSourceExec often has different children. In this case, you have two DynamicFilterPhysicalExpr which
    do not survive deduping, causing referential integrity to be lost.

What changes are included in this PR?

This PR aims to fix those problems by:

  1. Removing the snapshot() call from the serialization process
  2. Adding protos for DynamicFilterPhysicalExpr so it can be serialized and deserialized
  3. Removing Arc-based deduplication. We now only dedupe on
    expression_id if the PhysicalExpr reports a expression_id.
    After this change, only DynamicFilterPhysicalExpr reports an expression_id
    to be deduped.
  4. expression_id is now just a random u64. Since a given query likely
    only has a few DynamicFilterPhysicalExpr instances, the odds of a
    collision are very low
  5. There's no need for a DedupingSerializer anymore since the
    expression_id is already stored in the dynamic filter proto itself

Future work:

  1. Serialize dynamic filters in HashJoinExec, AggregateExec and SortExec
  2. Add tests which actually execute plans after deserialization and assert that dynamic filtering is functional
  3. Add proto converters to the PhysicalExtensionCodec trait so implementors can utilize deduping logic

Are these changes tested?

  • adds tests which roundtrip dynamic filters and assert that referential
    integrity is maintained
  • removes tests that test Arc-based deduplication and session id
    rotation since we don't support that anymore

Are there any user-facing changes?

  • The default codec does not call snapshot() on PhysicalExpr during serialization anymore. This means that DynamicFilterPhysicalExpr are now serialized and deserialized without snapshotting.
  • All PhysicalExpr are not deduped anymore. Only DynamicFilterPhysicalExpr is

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates proto Related to proto crate physical-plan Changes to the physical-plan crate labels Apr 23, 2026
@jayshrivastava jayshrivastava force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch from 004aa52 to 23fc7f1 Compare April 23, 2026 23:33
@github-actions github-actions Bot removed the physical-plan Changes to the physical-plan crate label Apr 23, 2026
Comment thread datafusion/proto/src/physical_plan/mod.rs
@jayshrivastava jayshrivastava force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch 2 times, most recently from 64b5889 to c042c08 Compare April 24, 2026 00:14
@jayshrivastava jayshrivastava changed the title Js/dedupe dynamic filter inner state v2 proto: serialize and dedupe dynamic filters Apr 24, 2026
@jayshrivastava jayshrivastava marked this pull request as ready for review April 24, 2026 00:17
@jayshrivastava jayshrivastava changed the title proto: serialize and dedupe dynamic filters proto: serialize and dedupe dynamic filters v2 Apr 24, 2026
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs Outdated
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs Outdated
Comment thread datafusion/proto/proto/datafusion.proto Outdated
Comment thread datafusion/proto/src/physical_plan/mod.rs
Comment thread datafusion/proto/src/physical_plan/to_proto.rs
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs
Copy link
Copy Markdown
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking very nice @jayshrivastava 👍🏻

Lets leave this open for a day to see if anyone has thoughts on the public APIs being tweaked.

cc @stuhood iirc you mentioned you also plan to ser/de dynamic filters across process (but not node) boundaries

Comment thread datafusion/physical-expr-common/src/physical_expr.rs
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs
@jayshrivastava
Copy link
Copy Markdown
Contributor Author

jayshrivastava commented Apr 24, 2026

@adriangb I've addressed the comments in the last commit. Let me know what you think!

@jayshrivastava
Copy link
Copy Markdown
Contributor Author

Oh TYFR! That was fast

Lets leave this open for a day to see if anyone has thoughts on the public APIs being tweaked.

No problem. I can merge some time next week 👍🏽

@adriangb adriangb force-pushed the js/dedupe-dynamic-filter-inner-state-v2 branch from 586648b to 09c2d34 Compare April 24, 2026 19:47
@adriangb
Copy link
Copy Markdown
Contributor

FYI I hit rebase

Copy link
Copy Markdown
Contributor

@stuhood stuhood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Comment thread datafusion/physical-expr-common/src/physical_expr.rs
Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs Outdated
pub is_complete: bool,
}

// TODO: Include expression_id in debug output.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally even in the DisplayAs representation for plans, I would think. It's actually possibly the most important thing to have in the plan when rendering a dynamic filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep this PR small, so I hid the expression_id to avoid 1 test that was failing. I have code ready to address this TODO which I plan to publish as a next step after this PR is merged. This PR is just an incremental step.

// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
// like `HashJoinExec`, `AggregateExec`,  `SortExec` do not serialize their
// dynamic filter. This causes round trips to fail on the `expression_id`
// because it is regenerated on deserialization.

Comment on lines +401 to +406
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
/// proto deserialization to preserve `expression_id` across a roundtrip
/// rather than minting a fresh one.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be generic via the shared state proposal? Not sure what is up with that. Ditto fn inner.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared state discussion seems promising! I'm hoping it will be an easy migration. I imagine that we would end up storing all Inners in the TaskContext instead of in the actual DynamicFilterPhysicalExpr and looking up the Inners via the expression_id. It's still an ongoing discussion though so I can't be sure how the migration will look.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that would make sense to me

Comment thread datafusion/physical-expr/Cargo.toml Outdated
itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
petgraph = "0.8.3"
rand = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the conventions are, but: introducing random order to the physical-expr crate seems like a big deal? Is this being used to generate something that could be deterministic from some sort of context instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question! I was thinking about a few alternatives 😄

  • We could hash all the exprs in the DynamicFilterPhysicalExpr to get an id. However, this wouldn't solve the shared Inner state linking problem. We don't want this type of identifier - we want an identifier for the inner state specifically.
  • We could use the Arc address of the Inner struct, but it's a bit of a smell to rely on pointer addresses - for example, IDs derived from Arc pointers are only valid until the Arc is dropped. This is what the old code used and something I used as well in initial versions of this PR

A rand u64 is not bad. Realistically, we just need to not have a collision between distinct dynamic filters in a plan. I figure that the probability of more than 2 or 3 distinct dynamic filters in a query must be very low already. And the probability of 2 or 3 rand u64s colliding is negligible.

Lmk what you think!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm less worried about collisions, and more worried about non-determinism (causing flaky tests, different plans to be generated randomly, etc). The other annoying aspect of random ids is that they are huge. If you go ahead and actually render this everywhere, a random ID is going to take up a lot more space than one generated on a context (...starting from 0, etc).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the ids are also random (generated from a mashup of arc pointer address, process id, etc.). So in that sense it's no better or worse. But I do agree something deterministically generated from context would be better.

The only alternative that occurs to me is a process level atomic. Not sure if that might cause some locking, etc. Do you have any other suggestions?

Lastly: we can always change this. As long as there is no API contract on what this number is going to be we could replace it at any point if it becomes a problem.

Informs: datafusion-contrib/datafusion-distributed#180
Closes: apache#20418

Consider you have a plan with a `HashJoinExec` and `DataSourceExec`
```
HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)
```

You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning:
1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr`
2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`  and the `DataSourceExec` should filter out rows

This does not happen today for a few reasons, a couple of which this PR aims to address
1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API
2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which
do not survive deduping, causing referential integrity to be lost.

This PR aims to fix those problems by:
1. Removing the `snapshot()` call from the serialization process
2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized
3. Removing `Arc`-based deduplication. We now only dedupe on
   `expression_id` if the `PhysicalExpr` reports a `expression_id`.
   After this change, only `DynamicFilterPhysicalExpr` reports an `expression_id`
   to be deduped.
4. `expression_id` is now just a random u64. Since a given query likely
   only has a few `DynamicFilterPhysicalExpr` instances, the odds of a
   collision are very low
5. There's no need for a `DedupingSerializer` anymore since the
   `expression_id` is already stored in the dynamic filter proto itself

Testing
- adds tests which roundtrip dynamic filters and assert that referential
  integrity is maintained
- removes tests that test `Arc`-based deduplication and session id
  rotation
@adriangb
Copy link
Copy Markdown
Contributor

@stuhood I think based on the convo your review is not blocking but just wanted to check before we go ahead with this? @jayshrivastava do you think you could ping @LiaCastaneda to take a critical eye at this? We're on iteration 2 of this whole system so I'm really hoping we can get it right this time 🙏🏻

@LiaCastaneda
Copy link
Copy Markdown
Contributor

👋 Sorry, I’ve been a bit detached from dynamic filtering. I’ll make some time to take a look at this today.

Copy link
Copy Markdown
Contributor

@LiaCastaneda LiaCastaneda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach makes sense to me, It's nice that we don't need two expression_ids just for dynamic filters, and DynamicFilterPhysicalExpr is handled like any other expression during serialization. I have a few API style questions, but I think the overall approach is much cleaner now. Thanks for the hard work Jay and Adrian!

Comment on lines +86 to +90
/// **Warning:** exposed publicly solely so that proto (de)serialization in
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
/// or its layout as a stable API.
#[derive(Clone)]
pub struct Inner {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we hide this somehow instead of using a warning comment? Exposing this leaves the door open to callers messing with the inner dynamic filter's Arc count -- which isn't great either. It's strange to me that Rust doesn't provide a way to do this currently.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alas, yes, I hate exposing this. But as far as I can tell there is no way around it: #21835.

There's a huge, ugly PR up at #21949.

Would appreciate any input on the issue or PR if you have time.

impl Inner {
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
expression_id: random::<u64>(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use something more standard like uuid or an atomic counter across all expressions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines +892 to 895
// When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing,
// this id is used by the `DeduplicatingProtoConverter` to preserve referential
// integrity across serde roundtrips for different expressions with the same id.
optional uint64 expr_id = 30;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't remember we had this already 🤔 we had an id in the proto but not in the trait, I guess this suggests the original intent was always to have expressions carry their own identity in the trait as well

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to remove snapshot() api now that it's not used? (maybe as a follow up) i think the only expression that implements it is the dynamic filter.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still used in Parquet pruning. But I agree it never served as many purposes as we thought it would, it's probably best to remove it and add a downcast for DynamicFilterPhysicalExpr or something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do this as a quick follow up. Will post a PR momentarily.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened this PR #21975. The base is main, so only the last commit is relevant.

@adriangb
Copy link
Copy Markdown
Contributor

@jayshrivastava since both Lía and Stu suggested the same thing (use a deterministic incrementing counter) wdyt of adding something like:

use std::sync::atomic::{AtomicUsize, Ordering};

struct ExpressionIdCounter {
    inner: AtomicUsize,
}

impl ExpressionIdCounter {
    pub const fn new(initial_value: usize) -> Self {
        Self {
            inner: AtomicUsize::new(initial_value),
        }
    }

  pub fn get_next(&self) -> usize {
          self.inner.fetch_add(1, Ordering::Relaxed)
      }
}

static EXPR_ID: ExpressionIdCounter = ExpressionIdCounter::new(0);

I'd keep it private for now, but if we want to re-use it we just make it pub and move it.

I don't see any other blockers to merging this!

@stuhood
Copy link
Copy Markdown
Contributor

stuhood commented Apr 30, 2026

@stuhood I think based on the convo your review is not blocking but just wanted to check before we go ahead with this?

Definitely not blocking! Still building context.

@jayshrivastava
Copy link
Copy Markdown
Contributor Author

@adriangb I added deterministic expression ids 👍🏽

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 1, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning origin/main
    Building datafusion-physical-expr v53.1.0 (current)
       Built [  31.711s] (current)
     Parsing datafusion-physical-expr v53.1.0 (current)
      Parsed [   0.047s] (current)
    Building datafusion-physical-expr v53.1.0 (baseline)
       Built [  25.680s] (baseline)
     Parsing datafusion-physical-expr v53.1.0 (baseline)
      Parsed [   0.046s] (baseline)
    Checking datafusion-physical-expr v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.411s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  59.932s] datafusion-physical-expr
    Building datafusion-physical-expr-common v53.1.0 (current)
       Built [  20.531s] (current)
     Parsing datafusion-physical-expr-common v53.1.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-physical-expr-common v53.1.0 (baseline)
       Built [  20.170s] (baseline)
     Parsing datafusion-physical-expr-common v53.1.0 (baseline)
      Parsed [   0.022s] (baseline)
    Checking datafusion-physical-expr-common v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.264s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  42.538s] datafusion-physical-expr-common
    Building datafusion-proto v53.1.0 (current)
       Built [  55.046s] (current)
     Parsing datafusion-proto v53.1.0 (current)
      Parsed [   0.144s] (current)
    Building datafusion-proto v53.1.0 (baseline)
       Built [  54.815s] (baseline)
     Parsing datafusion-proto v53.1.0 (baseline)
      Parsed [   0.145s] (baseline)
    Checking datafusion-proto v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   2.199s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/enum_variant_added.ron

Failed in:
  variant ExprType:DynamicFilter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1393
  variant ExprType:DynamicFilter in /home/runner/work/datafusion/datafusion/datafusion/proto/src/generated/prost.rs:1393

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [ 115.358s] datafusion-proto

jayshrivastava added a commit to jayshrivastava/datafusion that referenced this pull request May 1, 2026
See apache#21807

`PhysicalExpr::snapshot` isn't required because it only applies to
dynamic filters. The only place that its used is in
`pruning_predicate.rs`. Since there's only one use case, we can just
downcast to `DynamicFilterPhysicalExpr` and call `current()`.

Remove `PhysicalExpr::snapshot` and associated code.

Yes, the existing tests should cover the exact same functionality.
Filter pushdown still workers with dynamic filters; we use `current()`
instead of `snapshot()` to capture the expression.

Yes. `PhysicalExpr::snapshot()` no longer exists. Users should
downcast to `DynamicFilterPhysicalExpr` and call `current()` intead of
calling `snapshot()`.

Similarly, in `datafusion/ffi`, `FFI_PhysicalExpr` no longer has a
`snapshot()` method.
@adriangb
Copy link
Copy Markdown
Contributor

adriangb commented May 1, 2026

Thanks @jayshrivastava !

@adriangb adriangb added this pull request to the merge queue May 1, 2026
Merged via the queue into apache:main with commit 948cd09 May 1, 2026
40 checks passed
jayshrivastava added a commit to jayshrivastava/datafusion that referenced this pull request May 1, 2026
See apache#21807

`PhysicalExpr::snapshot` isn't required because it only applies to
dynamic filters. The only place that its used is in
`pruning_predicate.rs`. Since there's only one use case, we can just
downcast to `DynamicFilterPhysicalExpr` and call `current()`.

Remove `PhysicalExpr::snapshot` and associated code.

Yes, the existing tests should cover the exact same functionality.
Filter pushdown still workers with dynamic filters; we use `current()`
instead of `snapshot()` to capture the expression.

Yes. `PhysicalExpr::snapshot()` no longer exists. Users should
downcast to `DynamicFilterPhysicalExpr` and call `current()` intead of
calling `snapshot()`.

Similarly, in `datafusion/ffi`, `FFI_PhysicalExpr` no longer has a
`snapshot()` method.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-expr Changes to the physical-expr crates proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Serialize dynamic filters across network boundaries

5 participants