Skip to content

[python] Add self-contained Ray datasource and top-level read_paimon/write_paimon API#7740

Open
TheR1sing3un wants to merge 7 commits intoapache:masterfrom
TheR1sing3un:py-ray-datasource-top-level
Open

[python] Add self-contained Ray datasource and top-level read_paimon/write_paimon API#7740
TheR1sing3un wants to merge 7 commits intoapache:masterfrom
TheR1sing3un:py-ray-datasource-top-level

Conversation

@TheR1sing3un
Copy link
Copy Markdown
Member

@TheR1sing3un TheR1sing3un commented Apr 29, 2026

Purpose

Today, reading a Paimon table into a Ray Dataset requires the caller to first build a TableRead by hand:

catalog = CatalogFactory.create({"warehouse": ...})
table = catalog.get_table("db.table")
rb = table.new_read_builder()
read = rb.new_read()
splits = rb.new_scan().plan().splits()
ds = read.to_ray(splits)

That works for the existing TableRead.to_ray() helper, but it forces every user to repeat the same catalog → table → builder boilerplate. The Iceberg integration has long had a single-line IcebergDatasource(table_identifier, catalog_options, ...), and that's the missing surface here.

This PR makes RayDatasource self-contained and adds a top-level facade so reading and writing a Paimon table from Ray is one call.

Datasource refactor

  • RayDatasource(table_identifier, catalog_options, predicate=None, projection=None, limit=None) is the new constructor. The catalog, table, splits and read_type are loaded lazily via @property (table, splits, read_type), so the object is cheap to instantiate and easy to ship across Ray workers.
  • The legacy entry point — TableRead.to_ray() — keeps working through a new RayDatasource._from_table_read(table_read, splits) classmethod that wraps an already-resolved (table_read, splits) pair without a second catalog round-trip.
  • The read closure is hoisted to a module-level _paimon_read_task generator that yields one Arrow table per batch. This:
    • avoids the closure-over-self serialization overhead documented in ray-project/ray#49107;
    • keeps memory proportional to one batch rather than the whole split chunk.

New module: pypaimon.ray

from pypaimon.ray import read_paimon, write_paimon

ds = read_paimon("db.table", catalog_options={"warehouse": "/path"})
write_paimon(ds, "db.table", catalog_options={"warehouse": "/path"})

read_paimon(table_identifier, catalog_options, *, filter=None, projection=None, limit=None, ray_remote_args=None, concurrency=None, override_num_blocks=None, **read_args) -> ray.data.Dataset

write_paimon(dataset, table_identifier, catalog_options, *, overwrite=False, concurrency=None, ray_remote_args=None) -> None

Linked issue

N/A — surfaced when wiring pypaimon into a Ray-based ingestion pipeline alongside Iceberg, where users expected a read_paimon/write_paimon facade analogous to read_iceberg/write_iceberg.

Tests

New pypaimon/tests/ray_integration_test.py (8 cases):

  • test_read_paimon_basic — round-trip of three rows.
  • test_read_paimon_with_projection — projection narrowing.
  • test_read_paimon_with_filter — predicate pushdown via PredicateBuilder.
  • test_read_paimon_empty_table — empty datasets work.
  • test_write_paimon_basic — write-then-read round-trip via the facade.
  • test_write_paimon_overwriteoverwrite=True replaces data.
  • test_read_paimon_primary_key — PK upsert + merge-on-read row count and values.
  • test_read_paimon_invalid_override_num_blocks — guards override_num_blocks < 1.

Existing ray_data_test / ray_sink_test (which exercise TableRead.to_ray() and PaimonDatasink directly) continue to pass via the _from_table_read bridge — no behavioural change for those paths.

Local: pytest pypaimon/tests/{ray_data_test,ray_sink_test,ray_integration_test}.py → 26 passed; flake8 --config=dev/cfg.ini clean.

API and format

Public Python API additions (pypaimon.ray.read_paimon, pypaimon.ray.write_paimon). The existing RayDatasource constructor signature changes shape (now takes table_identifier + catalog_options instead of table_read + splits). The only in-tree caller of the old form, TableRead.to_ray(), is migrated to RayDatasource._from_table_read() in the same change. No external project should be importing RayDatasource directly today, but the bridge classmethod is kept available for callers that do.

No file format change.

Documentation

Public docstrings on RayDatasource, read_paimon, and write_paimon describe the new contract and parameters.

Generative AI disclosure

Drafted with assistance from an AI coding tool; the design follows Iceberg's IcebergDatasource pattern, and every behavioural guarantee made by the new facade is exercised by a test in ray_integration_test.py.

…write_paimon API

The existing RayDatasource needs the caller to first build a TableRead
(via Catalog -> Table -> ReadBuilder -> TableRead) and then pass it in
together with the planned splits. That is fine for the
TableRead.to_ray() helper, but it means there is no single-line API for
"read this Paimon table into a Ray Dataset" — every user has to repeat
the catalog/table/builder boilerplate.

Mirror Iceberg's IcebergDatasource: make RayDatasource self-contained
and add a top-level facade.

  * RayDatasource(table_identifier, catalog_options, predicate=None,
    projection=None, limit=None) is the new constructor. The catalog,
    table, splits and read_type are loaded lazily via @Property, so the
    object is cheap to instantiate and easy to ship across Ray workers.
  * The legacy entry point (TableRead.to_ray) keeps working through a
    new RayDatasource._from_table_read(table_read, splits) classmethod
    that wraps an already-resolved (table_read, splits) pair without a
    second catalog round-trip.
  * Refactor the read closure into a module-level _paimon_read_task
    generator that yields one Arrow table per batch. This avoids the
    closure-over-self serialization overhead (ray-project/ray#49107) and
    keeps memory proportional to one batch instead of the whole chunk.

Add the new pypaimon.ray module:

  * pypaimon.ray.read_paimon(table_identifier, catalog_options, *,
    filter=None, projection=None, limit=None, ray_remote_args=None,
    concurrency=None, override_num_blocks=None, **read_args) ->
    ray.data.Dataset.
  * pypaimon.ray.write_paimon(dataset, table_identifier, catalog_options,
    *, overwrite=False, concurrency=None, ray_remote_args=None) -> None.

Tests:

  * New pypaimon/tests/ray_integration_test.py covers basic read,
    column projection, predicate pushdown, empty table, basic write,
    overwrite, primary-key upsert + read-back, and the
    override_num_blocks<1 guard.
  * Existing ray_data_test / ray_sink_test (which exercise
    TableRead.to_ray and PaimonDatasink directly) continue to pass via
    the _from_table_read bridge — no behavioural change for those
    paths.
Comment thread paimon-python/pypaimon/read/datasource/ray_datasource.py Outdated
Comment thread paimon-python/pypaimon/read/datasource/ray_datasource.py Outdated
Comment thread paimon-python/pypaimon/tests/ray_integration_test.py
Three review comments from JingsongLi on apache#7740:

1. Extract _ensure_planned() helper. The splits and read_type properties
   each duplicated `if self._x is None: self._plan()`; both now route
   through a single _ensure_planned() that runs the ReadBuilder plan
   once and populates both fields together.

2. _from_table_read no longer bypasses __init__ via cls.__new__. Added
   a private _resolved=(table, splits, read_type) sentinel parameter to
   __init__; when supplied the catalog/identifier path is skipped and
   the pre-resolved values are used directly. _from_table_read now
   forwards through __init__, so any future field added to __init__ is
   automatically initialized for both construction paths. Validation
   added for the public path (table_identifier and catalog_options
   required when _resolved is None).

3. Added test_read_paimon_with_limit to ray_integration_test.py: writes
   10 rows across two partitions (forces two raw-convertible splits)
   and asserts limit=3 causes the scan plan to drop the second split,
   so the resulting Ray Dataset row count is < 10. The full unbounded
   read serves as a sanity baseline (count == 10). The assertion uses
   `< 10` rather than `== N` because Paimon's scan-time limit is a
   per-split cap (whole-split granularity), not a row-exact hard limit
   — row-exact short-circuiting in the reader is a separate follow-up.

Tests: pypaimon/tests/ray_integration_test.py 9/9 pass.
Lint: flake8 clean.
@TheR1sing3un
Copy link
Copy Markdown
Member Author

Thanks for the review! Addressed all three comments in c4e86cd:

  1. _ensure_planned() helpersplits and read_type properties now share a single entry point that runs the ReadBuilder plan once and populates both fields together, instead of each property doing its own if x is None: self._plan() check.

  2. _from_table_read no longer bypasses __init__ — added a private _resolved=(table, splits, read_type) sentinel parameter to __init__. When supplied, the catalog/identifier path is skipped and the pre-resolved values are used directly. _from_table_read now forwards through __init__, so any future field added to __init__ is automatically initialized for both construction paths. Also added validation: table_identifier and catalog_options are required when _resolved is None.

  3. limit test case — added test_read_paimon_with_limit to ray_integration_test.py. Writes 10 rows across two partitions (forces two raw-convertible splits) and asserts limit=3 causes the scan to drop the second split (Ray Dataset row count < 10), with the full unbounded read as sanity baseline. The assertion uses < 10 rather than exact == N because Paimon's scan-time limit is per-split (whole-split granularity at this layer); row-exact short-circuiting in the reader is a separate follow-up.

Tests: pypaimon/tests/ray_integration_test.py 9/9 pass, flake8 clean. Ready for re-review.

@TheR1sing3un TheR1sing3un requested a review from JingsongLi April 30, 2026 07:25
def __init__(
self,
table_identifier: Optional[str] = None,
catalog_options: Optional[Dict[str, str]] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should do some abstraction, such as providing a SplitProvider to obtain splits.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should do some abstraction, such as providing a SplitProvider to obtain splits.

Do you mean to extract the logic of how to find the corresponding read splits through identifiers, predicates, limits, and projections into a separate method? To achieve possible reuse in the future? For example, the integration of the high level into the daft engine

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean you should do some abstraction, introduce SplitProvider ABC, and implement two implementations.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean you should do some abstraction, introduce SplitProvider ABC, and implement two implementations.

Thank you for your reminder. I'll make it happen

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean you should do some abstraction, introduce SplitProvider ABC, and implement two implementations.

done~

TheR1sing3un pushed a commit to TheR1sing3un/incubator-paimon that referenced this pull request May 2, 2026
Per JingsongLi's feedback on apache#7740 (line 79), the catalog -> table ->
ReadBuilder -> Scan -> splits chain that lived inline in RayDatasource
is now hidden behind a SplitProvider ABC with two implementations:

- CatalogSplitProvider: builds the chain from a table identifier and
  catalog options. Used by the public read_paimon facade. Caches the
  ReadBuilder plan so splits / read_type are resolved together exactly
  once, mirroring the previous _ensure_planned helper.
- PreResolvedSplitProvider: wraps an already-resolved (table, splits,
  read_type, predicate) tuple. Used by the TableRead.to_ray bridge to
  skip the catalog round-trip.

RayDatasource.__init__ now takes a SplitProvider via a keyword-only
split_provider= argument; the public table_identifier / catalog_options
path constructs CatalogSplitProvider for callers transparently. The
previous _resolved sentinel is removed in favor of this cleaner
injection point.

display_name() is added to the ABC so RayDatasource.get_name() can stay
provider-agnostic instead of branching on isinstance.

Tests: split_provider_test.py covers both implementations - lazy
planning + caching, identifier/options validation, propagation of
predicate / projection / limit (limit verified by split-pruning on a
multi-commit table), and the pre-resolved passthrough. All existing
ray_data_test / ray_sink_test / ray_integration_test cases continue to
pass through the bridge.
Per JingsongLi's feedback on apache#7740 (line 79), the catalog -> table ->
ReadBuilder -> Scan -> splits chain that lived inline in RayDatasource
is now hidden behind a SplitProvider ABC with two implementations:

- CatalogSplitProvider: builds the chain from a table identifier and
  catalog options. Used by the public read_paimon facade. Caches the
  ReadBuilder plan so splits / read_type are resolved together exactly
  once, mirroring the previous _ensure_planned helper.
- PreResolvedSplitProvider: wraps an already-resolved (table, splits,
  read_type, predicate) tuple. Used by the TableRead.to_ray bridge to
  skip the catalog round-trip.

RayDatasource.__init__ now takes a SplitProvider via a keyword-only
split_provider= argument; the public table_identifier / catalog_options
path constructs CatalogSplitProvider for callers transparently. The
previous _resolved sentinel is removed in favor of this cleaner
injection point.

display_name() is added to the ABC so RayDatasource.get_name() can stay
provider-agnostic instead of branching on isinstance.

Tests: split_provider_test.py covers both implementations - lazy
planning + caching, identifier/options validation, propagation of
predicate / projection / limit (limit verified by split-pruning on a
multi-commit table), and the pre-resolved passthrough. All existing
ray_data_test / ray_sink_test / ray_integration_test cases continue to
pass through the bridge.
@TheR1sing3un TheR1sing3un force-pushed the py-ray-datasource-top-level branch from 944d144 to b863ad9 Compare May 2, 2026 15:12
Drop the inline scan args (table_identifier / catalog_options /
predicate / projection / limit) from RayDatasource.__init__. Once the
SplitProvider abstraction was introduced these duplicated
CatalogSplitProvider's constructor and created a footgun where passing
both a split_provider= kwarg and the inline args would silently drop
the inline args.

The constructor is now `RayDatasource(split_provider)`. Callers build
the provider themselves:

- pypaimon.ray.read_paimon constructs CatalogSplitProvider.
- TableRead.to_ray constructs PreResolvedSplitProvider.

The _from_table_read classmethod is removed since the bridge is now a
plain two-line caller-side construct.
Now that RayDatasource is a thin shell over a SplitProvider, the
public-looking @Property wrappers around the provider's API were
duplication, not abstraction. Remove them and call the provider
directly from the two internal users (estimate_inmemory_data_size,
get_read_tasks).

Removed:
- split_provider property (no caller anywhere in the repo).
- table / splits / read_type / predicate properties (only used inside
  this class, mirror SplitProvider one-for-one, and would force every
  future SplitProvider method addition to add a parallel property).

Also drop two dead defensive checks in get_read_tasks:
- hasattr(split, 'merged_row_count') -- defined on the Split ABC, so
  the check is always true.
- hasattr(split, 'row_count') -- abstract Split property, always true.

The hasattr guards on file_size / file_paths stay: those are not on
the ABC and the only Split implementation today (DataSplit) carries
them as concrete attributes.
The previous refactor hoisted the per-task read function to module
level. The justifications given in the PR description -- "avoids
closure-over-self serialization overhead" and "memory proportional to
one batch" -- did not actually depend on module-level placement: the
nested form already used default-arg early binding to avoid capturing
self, and was already a generator. Module-level only added a small
pickle-stability benefit at the cost of an extra public-looking name.

Inline it back as a nested function with the same default-arg binding
pattern. partial(_read_task, chunk_splits) replaces the longer
partial(_paimon_read_task, chunk_splits, table=..., predicate=..., ...)
call site.
Minimise the read-task-related diff vs master: keep the original inner
function name (_get_read_task), type annotations, docstring, and the
intermediate `get_read_task = partial(_get_read_task, ...)` step as-is.
The only change in the read-task area is now where its inputs (table,
predicate, read_type, splits, schema) come from -- they're sourced from
the SplitProvider instead of self.table_read.
@TheR1sing3un TheR1sing3un requested a review from JingsongLi May 2, 2026 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants