
feat: Add metadata-only replace API to Table for REPLACE snapshot operations #3131

Open
qzyu999 wants to merge 18 commits into apache:main from qzyu999:feature/core-rewrite-api

Conversation


@qzyu999 qzyu999 commented Mar 9, 2026

Closes #3130

Rationale for this change

In an open PR (#3124, part of #1092), the proposed replace() API accepts a PyArrow dataframe (pa.Table), forcing the table engine to physically serialize data during a metadata transaction commit. This couples execution with the catalog, diverges from Java Iceberg's native RewriteFiles builder behavior, and fails to register under Operation.REPLACE.

This PR redesigns table.replace() and transaction.replace() to accept Iterable[DataFile] inputs. With physical data writing handled externally (e.g., compaction via Ray), the new explicit metadata-only _RewriteFiles SnapshotProducer can swap the data files referenced in the manifests, carrying over the original sequence numbers for DELETED entries so that time travel remains consistent.
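As a usage sketch, here is roughly how a caller might drive the proposed API after an external compaction step. The DataFile and Table definitions below are toy stand-ins written for illustration; only the replace(files_to_delete, files_to_add) shape comes from this PR:

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class DataFile:
    # Toy stand-in for pyiceberg's DataFile
    file_path: str
    record_count: int


class Table:
    # Toy model of the proposed metadata-only replace semantics
    def __init__(self, files: List[DataFile]) -> None:
        self.files = list(files)
        self.history: List[str] = []

    def replace(
        self,
        files_to_delete: Iterable[DataFile],
        files_to_add: Iterable[DataFile],
    ) -> None:
        to_delete, to_add = list(files_to_delete), list(files_to_add)
        if not to_delete and not to_add:
            # replace([], []) short-circuits without producing a commit
            return
        self.files = [f for f in self.files if f not in to_delete] + to_add
        self.history.append("replace")


# Four small files are compacted into one file written externally (e.g., by Ray)
small = [DataFile(f"s3://warehouse/data-{i}.parquet", 100) for i in range(4)]
table = Table(small)
compacted = DataFile("s3://warehouse/compacted-0.parquet", 400)
table.replace(files_to_delete=small, files_to_add=[compacted])
```

The key design point is that the engine never sees row data here; it only swaps file references in metadata.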

Are these changes tested?

Yes.

Comprehensive test coverage has been added in tests/table/test_replace.py. The suite validates:

  1. Context-manager usage, verifying that table history grows as expected (len(table.history())).
  2. Snapshot summaries, asserting that the operation is tagged Operation.REPLACE.
  3. Delta metrics, checking that added/deleted file and record counts are tracked accurately.
  4. Low-level serialization: reading manifests with fetch_manifest_entry(discard_deleted=False), bypassing the high-level discard filter, to assert that entries marked status=DELETED preserve their Avro sequence numbers.
  5. Idempotent edge cases, where replace([], []) short-circuits the commit without mutating history.

Are there any user-facing changes?

Yes.

The method signatures of Table.replace() and Transaction.replace() have changed from the original PR #3124.
They no longer accept a PyArrow DataFrame (df: pa.Table). Instead, they take two arguments,
files_to_delete: Iterable[DataFile] and files_to_add: Iterable[DataFile], following the convention of the Java implementation.

(Please add the changelog label)

AI Disclosure

AI was used to help understand the code base and draft code changes. All code changes have been thoroughly reviewed, ensuring that the code changes are in line with a broader understanding of the codebase.

  • Worth deeper review after AI assistance:
    • test_invalid_operation() in tests/table/test_snapshots.py previously used Operation.REPLACE as the value for an invalid operation, but with this change Operation.REPLACE becomes valid. It now uses a dummy Operation instead.
    • _RewriteFiles in pyiceberg/table/update/snapshot.py overrides the _deleted_entries and _existing_manifests functions. I tried to test thoroughly that this was done correctly, but I suspect the test suite could be made more rigorous here and am open to suggestions on how to do that.

qzyu999 added 4 commits March 9, 2026 15:40
- Fixed positional argument type mismatch for `snapshot_properties` in [_RewriteFiles](iceberg-python/pyiceberg/table/update/snapshot.py)
- Added missing `Catalog` type annotations to pytest fixtures in [test_replace.py](iceberg-python/tests/table/test_replace.py)
- Added strict `is not None` assertions for `table.current_snapshot()` to satisfy mypy Optional checking
- Auto-formatted tests with ruff
…ass enum validation (Operation.REPLACE is valid so we can no longer use it in test_invalid_operation)
Contributor

@kevinjqliu kevinjqliu left a comment


Thanks for the PR.

There are a couple of things we should check (and add as test cases):

Scanning Manifests

  • Every file marked for replacement was found in at least one manifest — if any are missing, abort
  • Matched entries are rewritten as DELETED with the new snapshot ID
  • Unmatched entries are carried over as EXISTING
  • Manifests with no affected files are reused unchanged

New Manifest

  • All incoming replacement files are present with status ADDED
  • The new snapshot ID is applied to every entry

Manifest List

  • Includes the new ADDED manifest
  • Includes all rewritten manifests with DELETED entries
  • Includes all unchanged manifests
  • Includes any pre-existing delete manifests, passed through as-is

Invariant Check

  • Records added must not exceed records removed; if they do, the operation is invalid
  • If the difference is due to prior soft-deletes, confirm those delete files account for it

Snapshot

  • Has a unique snapshot ID
  • Parent points to the previous snapshot
  • Sequence number is exactly previous + 1
  • Operation type is set to "replace"
  • Manifest list path is correct
  • Summary counts (files and records) are accurate
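For illustration, the invariant and snapshot checks above could be condensed into a standalone validation helper. Every name and signature here is hypothetical, not actual PyIceberg code:

```python
def validate_replace(
    added_records: int,
    deleted_records: int,
    parent_sequence: int,
    new_sequence: int,
    operation: str,
) -> None:
    """Hypothetical sanity checks for a metadata-only replace commit."""
    # Invariant: a replace may never add more records than it removes
    if added_records > deleted_records:
        raise ValueError(
            f"records added ({added_records}) exceed records removed ({deleted_records})"
        )
    # The new snapshot's sequence number must be exactly previous + 1
    if new_sequence != parent_sequence + 1:
        raise ValueError(f"expected sequence {parent_sequence + 1}, got {new_sequence}")
    # The snapshot must be tagged with the replace operation
    if operation != "replace":
        raise ValueError(f"expected operation 'replace', got {operation!r}")
```

A test suite could call this helper once per committed snapshot, alongside the manifest-level checks.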

Comment thread pyiceberg/table/__init__.py Outdated
"""
return UpdateStatistics(transaction=self)

def replace(
Contributor


we should not expose replace as a public function, as we cannot guarantee that files_to_delete and files_to_add contain the same records.

I think we should start at _RewriteFiles

Author


Hi @kevinjqliu, I think that logic makes sense, as it would be dangerous for users to use these without being able to enforce the underlying expectations of the values input to these functions. Removed them in 94bd87e

Comment thread tests/table/test_replace.py Outdated
Comment on lines +84 to +93
assert len(manifest_files) == 2 # One for ADDED, one for DELETED

# Check that sequence numbers were handled properly natively by verifying the manifest contents
entries = []
for manifest in manifest_files:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
entries.append(entry)

# One entry for ADDED (new file), one for DELETED (old file)
assert len(entries) == 2

Author


Hi @kevinjqliu, I've addressed this in the latest 33aaef0, where the status of each file is tested.


qzyu999 commented Mar 28, 2026

Hi @kevinjqliu, apologies for the delay, and thank you for taking the time to review the PR again; I understand you are quite busy. I've addressed all your points in the latest set of tests in 33aaef0, expanding the suite to integrate those requirements across a broad set of cases.

There are, however, two minor issues I noticed:

  • Requirement: If the difference is due to prior soft-deletes, confirm those delete files account for it
    • This would require, however, that _RewriteFiles be scoped to handle delete manifests, but currently it only handles data files. Handling delete manifests would let us perform REPLACE operations on delete files as well. For example, the purpose of this PR is to allow compaction of data files, but in theory we could also compact delete files for the case where someone has run many delete operations producing many small files.
    • I think this is definitely worth working on, but perhaps not in this PR. The Java code seems to handle it well. I am thinking that after we merge this REPLACE, we can work on the data compaction issue next; after that we can come back to _RewriteFiles for delete manifests and then work on metadata compaction.
  • Another, more minor, issue I noticed while running the tests: doing fast_append() on files that are DataFileContent.POSITION_DELETES does not yet label the resulting manifests as ManifestContent.DELETES. IIUC this is because _SnapshotProducer._manifests() (which fast_append relies on under the hood) currently always creates standard ManifestContent.DATA writers; it does not yet inspect the incoming file's content type to route POSITION_DELETES into a dedicated ManifestContent.DELETES writer. I worked around this in my test by scanning the manifest entries directly rather than relying on the manifest's label, but I wanted to flag it for the roadmap for when we build out full merge-on-read write support.
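The workaround described above (classifying by each entry's own content rather than the manifest-level label) can be sketched with toy stand-ins. DataFileContent mirrors the real enum's shape, while ManifestEntry and delete_entries are simplified for illustration:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class DataFileContent(Enum):
    # Mirrors the shape of pyiceberg's DataFileContent enum
    DATA = 0
    POSITION_DELETES = 1


@dataclass
class ManifestEntry:
    # Simplified stand-in: real entries carry a full DataFile struct
    file_path: str
    content: DataFileContent


def delete_entries(manifests: List[List[ManifestEntry]]) -> List[ManifestEntry]:
    # Classify by each entry's own file content instead of trusting
    # the manifest-level DATA/DELETES label, which fast_append does not yet set
    return [
        e for m in manifests for e in m
        if e.content is DataFileContent.POSITION_DELETES
    ]


manifests = [
    [ManifestEntry("data-1.parquet", DataFileContent.DATA)],
    [ManifestEntry("pos-del-1.parquet", DataFileContent.POSITION_DELETES)],
]
```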

Member

@geruh geruh left a comment


Hey @qzyu999, thanks for raising this. I think the direction is right here; I just have a few design questions before going more in depth on the implementation details.

return []

def _existing_manifests(self) -> list[ManifestFile]:
"""To determine if there are any existing manifests."""
Member


This logic looks nearly identical to OverwriteFiles._existing_manifests() and could probably use some clean Python OOP across the different snapshot classes. Let's create a helper in the _SnapshotProducer class; then Overwrite can add its additional logic on top, and Rewrite can just use it.

Author


Hi @geruh, thank you for the feedback! I agree that the two pieces of code are highly similar; in b0a770c I created a _get_existing_manifests function in _SnapshotProducer that is reused in both _OverwriteFiles and _RewriteFiles.


def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to rewrite
if self._deleted_data_files or self._added_data_files:
Member


I think we can replicate the _DeleteFiles logic here by using @cached_property on the _compute_deletes function, especially since _commit() calls self._deleted_entries() for validation and then calls the super commit to write and get the delete entries.

def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]:
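A minimal sketch of the suggested caching pattern, assuming functools.cached_property; the class body and return values are toy stand-ins, not the PR's actual code:

```python
from functools import cached_property


class _RewriteFiles:
    # Toy stand-in demonstrating why cached_property helps _commit()
    def __init__(self) -> None:
        self.scans = 0

    @cached_property
    def _compute_deletes(self):
        # Expensive manifest scan: runs once, result cached on the instance
        self.scans += 1
        return (["rewritten-manifest.avro"], ["deleted-entry"], True)

    def _commit(self):
        existing, deleted, rewritten = self._compute_deletes   # first access computes
        _, deleted_again, _ = self._compute_deletes            # cache hit, no rescan
        return deleted_again


producer = _RewriteFiles()
```

With cached_property, the validation pass in _commit() and the later write pass share one scan instead of recomputing.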

Author


Hi @geruh, great suggestion, I've applied the changes in c8162a8.

)

def replace(self) -> _RewriteFiles:
return _RewriteFiles(
Member

@geruh geruh Apr 15, 2026


I'm somewhat confused by the naming, since we are introducing a user-facing API replace while the underlying snapshot operation is a rewrite. Should we rename it to rewrite() for consistency? Unless I'm missing something?

Author


Hi @geruh, you bring up a good point, and it's something I noticed seemed off along the way. The reason why we have this discrepancy is because we're mirroring what's found in the Java code itself.

I named the Python API replace() to accurately reflect the Operation.REPLACE snapshot string it generates, while keeping the internal class named _RewriteFiles to match the Java builder logic.

That said, if you feel strongly about matching the Java API's user-facing method (rewrite()) rather than the snapshot operation (replace()), I'm happy to rename the public method to rewrite() for consistency. Let me know what you prefer!

manifests_after = snapshot_after.manifests(table.io)
manifest_paths_after = [m.manifest_path for m in manifests_after]

assert delete_manifest_path in manifest_paths_after
Member


We should also test to ensure that original sequence numbers are carried over from before the rewrite.
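A sketch of the suggested check, using plain dicts as stand-ins for manifest entries; the helper name and entry shape are hypothetical:

```python
from typing import Dict, List


def sequence_numbers_preserved(
    before: Dict[str, int],
    after_entries: List[dict],
) -> bool:
    # Each entry rewritten as DELETED must keep the data sequence number
    # it carried before the rewrite; ADDED entries are not constrained here
    return all(
        entry["sequence_number"] == before[entry["path"]]
        for entry in after_entries
        if entry["status"] == "DELETED"
    )


before = {"data-1.parquet": 7, "data-2.parquet": 9}
after_entries = [
    {"path": "data-1.parquet", "status": "DELETED", "sequence_number": 7},
    {"path": "compacted.parquet", "status": "ADDED", "sequence_number": 10},
]
```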

Author

@qzyu999 qzyu999 Apr 16, 2026


Hi @geruh, great idea, I've added a test to check the sequence numbers in d939b67.

from pyiceberg.typedef import Record


def test_replace_internally(catalog: Catalog) -> None:
Member


Tests are on the right track. Can we add a few more, like:

  • a test with multiple files for matching sets of rows
  • a test against a partitioned table
  • a no-op replace, to ensure the resulting state matches the Java implementation

Author


Hi @geruh, these are all great test ideas, I've added them in d939b67.

added_records = sum(f.record_count for f in self._added_data_files)
deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)

if added_records > deleted_records:
Member


Where are you seeing this invariant? It seems correct, since the spec says a rewrite must be "logically equivalent". The check is reasonable as a safety guard, but what happens when delete file rewriting is added? Then these numbers could be incorrect.

Author


Hi @geruh, thanks for flagging this, you're right that this is a safety guard, but it doesn't yet factor in future changes when adding delete file rewriting. Should we add something like this?

# Note: This physical record count invariant is a sanity guard for data file 
# compaction to ensure no data is accidentally duplicated or invented. 
# TODO: This will need to be evolved into a logical record count validation 
# once PyIceberg supports rewriting delete files (Merge-on-Read).
added_records = sum(f.record_count for f in self._added_data_files)
deleted_records = sum(entry.data_file.record_count for entry in deleted_entries)

if added_records > deleted_records:
    raise ValueError(f"Invalid replace: records added ({added_records}) exceeds records removed ({deleted_records})")

This logical record count validation would involve something like having the _commit method do the following, which the codebase currently cannot do:

  • Identify associated delete files: for every DataFile being deleted, find every position delete or equality delete file that points to it.
  • Calculate the "subtraction": subtract those delete row counts from the physical record_count of the old files to find the old logical count.
  • Compare: verify that old logical count == new logical count.

The current _RewriteFiles implementation is "blind" to deletes; it only tracks _added_data_files and _deleted_data_files. I believe this can be part of a full MoR implementation, something I would love to work on after finishing these maintenance tasks.

Otherwise, I can also remove it from _RewriteFiles and move forward, WDYT?
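The logical-count comparison described above could be sketched as follows; both helpers are hypothetical and ignore the per-file matching of delete files for simplicity:

```python
def logical_count(physical_records: int, soft_deleted_records: int) -> int:
    # Logical rows = physical rows minus rows voided by associated delete files
    return physical_records - soft_deleted_records


def is_logically_equivalent(
    old_physical: int,
    old_soft_deleted: int,
    new_records: int,
) -> bool:
    # A rewrite is valid when the new files hold exactly the surviving rows
    return logical_count(old_physical, old_soft_deleted) == new_records
```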


qzyu999 commented Apr 16, 2026

Hi @geruh, thanks for the awesome feedback, I've responded to each of your replies, PTAL.
