
Initial work for file format writer API#3119

Open
nssalian wants to merge 7 commits intoapache:mainfrom
nssalian:file-format-initial-work

Conversation


@nssalian (Contributor) commented Mar 3, 2026

Initial work for #3100. Since this is a large change, I'm doing it in parts (similar to the AuthManager work) so it's easier to review and to move the existing code around.

Rationale for this change

Introduces the pluggable file format writer API: FileFormatWriter, FileFormatModel, and FileFormatFactory in pyiceberg/io/fileformat.py. Moves DataFileStatistics out of pyarrow.py, with a re-export for backward compatibility. The move is forward-looking: the idea is to keep the statistics generic as we add additional formats.

This is the first part of work for #3100. No behavioral changes; the write path remains hardcoded to Parquet.

Are these changes tested?

Yes. tests/io/test_fileformat.py tests the backward-compatible import of DataFileStatistics.

Are there any user-facing changes?

No

@nssalian nssalian marked this pull request as ready for review March 3, 2026 19:01

@nssalian (author) commented Mar 6, 2026

CC: @kevinjqliu @Fokko @geruh for review

Comment thread pyiceberg/io/pyarrow.py
OutputFile,
OutputStream,
)
from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics
Contributor:
Suggested change:
- from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics
+ from pyiceberg.io.fileformat import DataFileStatistics


_result: DataFileStatistics | None = None

@abstractmethod
def write(self, table: pa.Table) -> None:
Contributor:
A table looks to be the logical starting point, but I think an iterator of RecordBatches would also make sense. WDYT @kevinjqliu
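To illustrate the alternative being floated, here is a minimal sketch of a writer whose `write` consumes an iterator of batches rather than one materialized table. The `BatchWriter` and `ListWriter` names are hypothetical, and a plain dict stands in for `pa.RecordBatch` so the sketch stays self-contained; the real interface in this PR takes a `pa.Table`.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, List


class BatchWriter(ABC):
    """Hypothetical writer that consumes an iterator of record batches."""

    @abstractmethod
    def write_batches(self, batches: Iterator[Any]) -> None:
        """Append each batch to the open file without materializing the full table."""


class ListWriter(BatchWriter):
    # Toy implementation: collects batches in memory to show the call pattern.
    def __init__(self) -> None:
        self.batches: List[Any] = []

    def write_batches(self, batches: Iterator[Any]) -> None:
        for batch in batches:
            self.batches.append(batch)


writer = ListWriter()
writer.write_batches(iter([{"id": [1, 2]}, {"id": [3]}]))
print(len(writer.batches))  # 2
```

The streaming shape keeps memory bounded for large writes, and a table-based caller can still adapt via something like `table.to_batches()` in Arrow.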

def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
return Record(*[self._partition_value(field, schema) for field in partition_spec.fields])

def to_serialized_dict(self) -> dict[str, Any]:
Contributor:
Might be nice to change this into a TypedDict as a return type

Contributor (author):
I moved it over from the original implementation. I can switch to a TypedDict in a follow-up when I wire it through, if that works?
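For reference, the TypedDict version might look like the sketch below. The field names are hypothetical, mirroring the statistics fields visible elsewhere in this PR; the actual return shape would come from `DataFileStatistics`, and the standalone `to_serialized_dict` function here is a stand-in for the method.

```python
from typing import Dict, TypedDict


class SerializedDataFileStatistics(TypedDict):
    # Hypothetical fields, mirroring the counts tracked by DataFileStatistics.
    record_count: int
    value_counts: Dict[int, int]
    null_value_counts: Dict[int, int]
    nan_value_counts: Dict[int, int]


def to_serialized_dict() -> SerializedDataFileStatistics:
    # Stand-in for the method, so callers get key-level type checking.
    return {
        "record_count": 0,
        "value_counts": {},
        "null_value_counts": {},
        "nan_value_counts": {},
    }


stats = to_serialized_dict()
```

The payoff is that type checkers flag misspelled or missing keys at the call site, with no runtime cost over a plain dict.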

Comment thread pyiceberg/io/fileformat.py
Comment on lines +175 to +178
def get(cls, file_format: FileFormat) -> FileFormatModel:
if file_format not in cls._registry:
raise ValueError(f"No writer registered for {file_format}. Available: {list(cls._registry.keys())}")
return cls._registry[file_format]
Contributor:
I think PyIceberg diverges a bit from Java on this point. PyIceberg could have multiple implementations for Parquet, for example (Arrow/fsspec). Maybe we want something similar to the FileIO loading:

SCHEMA_TO_FILE_IO: dict[str, list[str]] = {

Contributor (author):

I implemented the FileFormatFactory as the Python equivalent of Java's FormatModelRegistry, keyed by FileFormat alone since Python only has Arrow (vs Java needing (FileFormat, Class<?>) for Spark/Flink/Generic). Let me know if you think it's worth adding a property-based override.
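If a FileIO-style fallback were wanted later, it could look like the sketch below: each format maps to an ordered list of candidate implementations and the first available one wins, echoing `SCHEMA_TO_FILE_IO`. The module paths and `resolve_writer` helper are hypothetical, and dict membership stands in for a real import attempt.

```python
from typing import Dict, List

# Hypothetical mapping; the module paths do not exist in this PR.
FORMAT_TO_WRITER: Dict[str, List[str]] = {
    "parquet": [
        "pyiceberg.io.pyarrow.ArrowParquetModel",
        "pyiceberg.io.fsspec.FsspecParquetModel",
    ],
}


def resolve_writer(file_format: str, available: Dict[str, object]) -> object:
    """Return the first registered implementation that is actually loadable."""
    for path in FORMAT_TO_WRITER.get(file_format, []):
        if path in available:  # stands in for importing the module path
            return available[path]
    raise ValueError(f"No writer available for {file_format}")
```

This keeps the single-format key for the common case while leaving room for a property-based override to reorder or pin a specific implementation.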

@nssalian nssalian requested a review from Fokko March 27, 2026 15:09
@nssalian (author) commented:

@Fokko @kevinjqliu @geruh PTAL

@Fokko Fokko requested a review from kevinjqliu March 31, 2026 13:09
@geruh geruh self-requested a review April 1, 2026 07:33
@nssalian (author) commented:

@geruh @kevinjqliu PTAL

@geruh (Member) left a comment:

Sorry for the late review here @nssalian, and thanks for starting this. I think a single format key is the right call for our python impl, since pyarrow is our universal data model (so far). I did a quick pass here lmk what you think!


def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the context manager, closing the writer and caching statistics."""
if exc_type is not None:
Member:
Is this inverted? The exception branch calls close() and then swallows the error and returns, meaning _result gets populated with stats from a bad write. What if instead we do something like:

def __exit__(self, exc_type, exc_val, exc_tb):
    if exc_type is not None:
        try:
            self.close()
        except Exception:
            pass
        return
    self._result = self.close()

Similar to our logic in the manifest writer exit method.

Contributor (author):
Great catch. I'll fix this.

value_counts: dict[int, int]
null_value_counts: dict[int, int]
nan_value_counts: dict[int, int]
column_aggregates: dict[int, StatsAggregator]
Member:
I still don't know how I feel about this. I think for now it's okay, since we are working mostly with Parquet, but in ORC it would use the stripe metadata.

What we know is that the _partition_value() and partition() methods currently depend on column_aggregates to infer partition values from min/max. These could work from the serialized bounds instead; but if that refactor is too much, we could alternatively keep DataFileStatistics in the pyarrow module and introduce the shared type in your next phase, as mentioned, when the Parquet writer is actually extracted.

Contributor (author):
The rest of the class (to_serialized_dict(), counts, sizes) is already format-agnostic. It's just the column_aggregates that is the concern.

For this PR, it's a pure move with no behavioral change. When I'm adding ORC write support, I'll refactor _partition_value() to work from serialized bounds (or define a minimal protocol that both Parquet row group stats and ORC stripe stats can satisfy). That way the refactor happens alongside a concrete second format.

Let me know what you think.
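As a sketch of what such a minimal protocol could look like: both Parquet row-group stats and ORC stripe stats would only need to expose serialized per-field bounds, and partition inference would work from those. The `ColumnBounds`, `DictBounds`, and `partition_value` names are hypothetical, not part of this PR.

```python
from typing import Dict, Optional, Protocol


class ColumnBounds(Protocol):
    """Hypothetical minimal surface both Parquet and ORC stats could satisfy."""

    def lower_bound(self, field_id: int) -> Optional[bytes]: ...
    def upper_bound(self, field_id: int) -> Optional[bytes]: ...


class DictBounds:
    # Toy implementation backed by plain dicts of serialized bounds.
    def __init__(self, lower: Dict[int, bytes], upper: Dict[int, bytes]) -> None:
        self._lower, self._upper = lower, upper

    def lower_bound(self, field_id: int) -> Optional[bytes]:
        return self._lower.get(field_id)

    def upper_bound(self, field_id: int) -> Optional[bytes]:
        return self._upper.get(field_id)


def partition_value(bounds: ColumnBounds, field_id: int) -> Optional[bytes]:
    # An identity partition value can only be inferred when the file's
    # lower and upper bound for the field coincide.
    lo, hi = bounds.lower_bound(field_id), bounds.upper_bound(field_id)
    return lo if lo == hi else None
```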


@classmethod
def register(cls, model: FileFormatModel) -> None:
cls._registry[model.format] = model
Member:
Will this logic overwrite models that are already registered once we add that logic? It seems like the Java implementation throws.

https://github.com/apache/iceberg/blob/8f30d8350bdac64e67e3778cc9489f07a57bc2e7/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java#L88-L101

Contributor (author):
I'll throw a ValueError to match it. Will fix.
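The duplicate guard could look like the sketch below, matching Java's FormatModelRegistry behaviour of throwing instead of silently overwriting. `FileFormatModel` and `FileFormatFactory` here are simplified stand-ins for the classes introduced in this PR.

```python
from typing import Dict


class FileFormatModel:
    """Simplified stand-in for the model class in this PR."""

    def __init__(self, format: str) -> None:
        self.format = format


class FileFormatFactory:
    _registry: Dict[str, FileFormatModel] = {}

    @classmethod
    def register(cls, model: FileFormatModel) -> None:
        # Refuse duplicates rather than overwriting, mirroring Java.
        if model.format in cls._registry:
            raise ValueError(f"Writer already registered for {model.format}")
        cls._registry[model.format] = model


FileFormatFactory.register(FileFormatModel("parquet"))
```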

from pyiceberg.io.fileformat import DataFileStatistics, FileFormatWriter


def test_backward_compat_import() -> None:
Member:
Can we add some tests for the new abstractions, like getting an unknown format and registering duplicates as mentioned above? Also, is there an easy way to do a happy-path roundtrip?

Contributor (author):
Since there is no FileFormatModel implementation yet, I don't know if the roundtrip is possible. I can add it in the next PR, where I add the model for Parquet. But I'll add those unknown and duplicate tests like you mentioned.

@nssalian nssalian requested review from Fokko and geruh April 17, 2026 21:55