Initial work for file format writer API #3119
Conversation
CC: @kevinjqliu @Fokko @geruh for review
```python
    OutputFile,
    OutputStream,
)
from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics
```
Suggested change:
```diff
-from pyiceberg.io.fileformat import DataFileStatistics as DataFileStatistics
+from pyiceberg.io.fileformat import DataFileStatistics
```
mypy wasn't happy about this previously: https://github.com/apache/iceberg-python/actions/runs/22681243975/job/65752048019
```python
_result: DataFileStatistics | None = None

@abstractmethod
def write(self, table: pa.Table) -> None:
```
A table looks to be the logical starting point, but I think an iterator of RecordBatches would also make sense. WDYT @kevinjqliu
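A hypothetical sketch of what a batch-oriented variant could look like; the names `BatchWriter` and `write_batches` are illustrative, not part of the PR, and a toy list-based batch type stands in for `pa.RecordBatch`:

```python
from abc import ABC, abstractmethod
from typing import Generic, Iterator, List, TypeVar

B = TypeVar("B")  # the batch type; in pyiceberg this would be pa.RecordBatch


class BatchWriter(ABC, Generic[B]):
    """Illustrative writer that consumes an iterator of batches instead of a whole table."""

    @abstractmethod
    def write_batches(self, batches: Iterator[B]) -> None: ...


class ListCollector(BatchWriter[List[int]]):
    """Toy implementation that just collects rows, to show the call shape."""

    def __init__(self) -> None:
        self.rows: List[int] = []

    def write_batches(self, batches: Iterator[List[int]]) -> None:
        # streaming consumption: batches never need to be materialized as one table
        for batch in batches:
            self.rows.extend(batch)


collector = ListCollector()
collector.write_batches(iter([[1, 2], [3]]))
```

The advantage of the iterator shape is that callers with a `pa.Table` can still pass `table.to_batches()`, while streaming producers avoid materializing the whole table.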
```python
def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
    return Record(*[self._partition_value(field, schema) for field in partition_spec.fields])

def to_serialized_dict(self) -> dict[str, Any]:
```
Might be nice to change this into a TypedDict as a return type
I moved it over from the original implementation. I can do a TypedDict in a follow-up when I wire it through, if that works?
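For illustration, a minimal sketch of the TypedDict idea; the name `SerializedStatistics` and the subset of keys are assumptions, since the real `DataFileStatistics` dict has more fields:

```python
from typing import Dict, TypedDict


class SerializedStatistics(TypedDict):
    # hypothetical keys; the real serialized dict carries the full set of stats fields
    record_count: int
    value_counts: Dict[int, int]
    null_value_counts: Dict[int, int]


def to_serialized_dict() -> SerializedStatistics:
    # mypy can now check both the keys and the value types at every call site
    return {"record_count": 3, "value_counts": {1: 3}, "null_value_counts": {1: 0}}


stats = to_serialized_dict()
```

Compared to `dict[str, Any]`, a misspelled key or a wrongly typed value becomes a type error instead of a runtime surprise.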
```python
def get(cls, file_format: FileFormat) -> FileFormatModel:
    if file_format not in cls._registry:
        raise ValueError(f"No writer registered for {file_format}. Available: {list(cls._registry.keys())}")
    return cls._registry[file_format]
```
I think PyIceberg diverges a bit from Java on this point. PyIceberg could have multiple implementations for Parquet, for example (Arrow/fsspec). Maybe we want something similar to the FileIO loading:
iceberg-python/pyiceberg/io/__init__.py, line 303 in 82f6040
I implemented the FileFormatFactory as the Python equivalent of Java's FormatModelRegistry, keyed by FileFormat alone since Python only has Arrow (vs Java needing (FileFormat, Class<?>) for Spark/Flink/Generic). Let me know if you think it's worth adding a property-based override.
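A sketch of what a property-based override on top of a format-keyed registry might look like, loosely following how FileIO implementations are resolved; the property key `write.<format>.model-impl`, the function name `resolve_model`, and the string-valued registry are all made up for illustration:

```python
from typing import Dict, Optional

# stand-in registry: format name -> model implementation identifier
_registry: Dict[str, str] = {"parquet": "ArrowParquetModel"}


def resolve_model(file_format: str, properties: Optional[Dict[str, str]] = None) -> str:
    """Resolve a model for a format, letting a table/catalog property override the default."""
    props = properties or {}
    override = props.get(f"write.{file_format}.model-impl")
    if override is not None:
        # an explicit override wins, analogous to py-io-impl for FileIO
        return override
    try:
        return _registry[file_format]
    except KeyError:
        raise ValueError(f"No writer registered for {file_format}") from None
```

This keeps the single-key registry as the default path while leaving room for an fsspec-backed Parquet model later without changing the key type.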
@Fokko @kevinjqliu @geruh PTAL

@geruh @kevinjqliu PTAL
geruh left a comment
Sorry for the late review here @nssalian, and thanks for starting this. I think a single format key is the right call for our Python impl, since pyarrow is our universal data model (so far). I did a quick pass here; let me know what you think!
```python
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit the context manager, closing the writer and caching statistics."""
    if exc_type is not None:
```
Is this inverted? The exception branch calls close(), swallows the error, and returns, meaning `_result` gets populated with stats from a bad write. What if instead we do something like:
```python
def __exit__(self, exc_type, exc_val, exc_tb):
    if exc_type is not None:
        try:
            self.close()
        except Exception:
            pass
        return
    self._result = self.close()
```
Similar to our logic in the manifest writer exit method.
Great catch. I'll fix this.
```python
value_counts: dict[int, int]
null_value_counts: dict[int, int]
nan_value_counts: dict[int, int]
column_aggregates: dict[int, StatsAggregator]
```
I still don't know how I feel about this. I think for now it's okay since we are working with mostly Parquet, but ORC would use the stripe metadata instead.
The `_partition_value()` and `partition()` methods currently depend on `column_aggregates` to infer partition values from min/max. These could work from the serialized bounds instead; if that refactoring is too much, we could alternatively keep `DataFileStatistics` in the pyarrow class and introduce the shared type in your next phase, as mentioned, when the Parquet writer is actually extracted.
The rest of the class (to_serialized_dict(), counts, sizes) is already format-agnostic. It's just the column_aggregates that is the concern.
For this PR, it's a pure move with no behavioral change. When I'm adding ORC write support, I'll refactor _partition_value() to work from serialized bounds (or define a minimal protocol that both Parquet row group stats and ORC stripe stats can satisfy). That way the refactor happens alongside a concrete second format.
Let me know what you think.
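One possible shape for that minimal protocol, sketched with made-up names (`ColumnBounds`, `RowGroupStats`, `identity_partition_value`); the idea is that both Parquet row-group stats and ORC stripe stats could satisfy it:

```python
from typing import Dict, Optional, Protocol, Tuple


class ColumnBounds(Protocol):
    # hypothetical protocol: only the lower/upper bounds per field id are required
    def lower_bound(self, field_id: int) -> Optional[object]: ...
    def upper_bound(self, field_id: int) -> Optional[object]: ...


class RowGroupStats:
    """Toy stand-in for Parquet row-group (or ORC stripe) statistics."""

    def __init__(self, bounds: Dict[int, Tuple[object, object]]) -> None:
        self._bounds = bounds

    def lower_bound(self, field_id: int) -> Optional[object]:
        pair = self._bounds.get(field_id)
        return pair[0] if pair else None

    def upper_bound(self, field_id: int) -> Optional[object]:
        pair = self._bounds.get(field_id)
        return pair[1] if pair else None


def identity_partition_value(stats: ColumnBounds, field_id: int) -> Optional[object]:
    # an identity partition value is only well-defined when the file holds a single value,
    # i.e. the column's lower and upper bounds coincide
    lo, hi = stats.lower_bound(field_id), stats.upper_bound(field_id)
    return lo if lo is not None and lo == hi else None
```

With a protocol like this, the partition inference no longer cares whether the bounds came from `column_aggregates` or from format-specific metadata.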
```python
@classmethod
def register(cls, model: FileFormatModel) -> None:
    cls._registry[model.format] = model
```
Will this logic overwrite the models we support once we add that logic? It seems like the Java implementation throws.
I'll throw a ValueError to match it. Will fix
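A minimal sketch of the duplicate guard, using a stand-in class (`FileFormatFactoryDemo` keyed by plain strings, whereas the real factory keys on `FileFormat`):

```python
from typing import Dict


class FileFormatFactoryDemo:
    """Stand-in sketch of the duplicate-registration check."""

    _registry: Dict[str, object] = {}

    @classmethod
    def register(cls, fmt: str, model: object) -> None:
        # mirror Java's behavior: refuse to silently overwrite an existing model
        if fmt in cls._registry:
            raise ValueError(f"A model is already registered for {fmt}: {cls._registry[fmt]}")
        cls._registry[fmt] = model


FileFormatFactoryDemo.register("parquet", "ArrowParquetModel")
try:
    FileFormatFactoryDemo.register("parquet", "AnotherModel")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```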
```python
from pyiceberg.io.fileformat import DataFileStatistics, FileFormatWriter


def test_backward_compat_import() -> None:
```
Can we add some tests for the new abstractions, like getting an unknown format, and duplicates as mentioned above? Also, is there an easy way to do the happy-path roundtrip?
Since there is no FileFormatModel implementation yet, I don't know if the roundtrip is possible. I can add it in the next PR, where I wire up the model for Parquet. But I'll add those unknown-format and duplicate tests like you mentioned.
Initial work for #3100. Since this is a large change, doing it in parts, similar to the `AuthManager`, so it's easier to review and move the existing code around.

Rationale for this change
Introduces the pluggable file format writer API: `FileFormatWriter`, `FileFormatModel`, and `FileFormatFactory` in `pyiceberg/io/fileformat.py`. Moves `DataFileStatistics` from `pyarrow.py` with a re-export for backward compatibility. The move is forward-looking: the idea is to keep the stats generic as we add additional formats.
This is the first part of work for #3100. No behavioral changes; the write path remains hardcoded to Parquet.
Are these changes tested?
Yes.
`tests/io/test_fileformat.py` tests the backward-compatible import of `DataFileStatistics`.

Are there any user-facing changes?
No