-
Notifications
You must be signed in to change notification settings - Fork 416
Closed
Description
Feature Request / Improvement
# Minimal reproduction setup: build a five-row Arrow table, append it to a
# freshly created table with no partition spec, then rename a column and add
# an identity partition on the renamed column.
arrow_table = pa.Table.from_arrays(
    [
        pa.array([2, 3, 4, 5, 6]),
        pa.array([
            datetime(2021, 5, 19),
            datetime(2022, 7, 25),
            datetime(2023, 3, 22),
            datetime(2024, 7, 17),
            datetime(2025, 2, 22),
        ]),
    ],
    names=['idx', 'ts'],
)

# Start from a clean slate. The drop is best-effort (presumably because the
# table may not exist on the first run). Catch `Exception` instead of using a
# bare `except:` so KeyboardInterrupt and SystemExit still propagate.
try:
    cat.drop_table('default.test2')
except Exception:
    pass

# No partition spec is passed here, so the data appended below is written
# before any partition field exists.
tbl = cat.create_table(
    'default.test2',
    schema=Schema(
        NestedField(1, 'idx', LongType()),
        NestedField(2, 'ts', TimestampType()),
    ),
)
tbl.append(arrow_table)

# Evolve the table in a single transaction: rename 'idx' to 'id' first, then
# add an identity partition field that refers to the column by its new name.
with tbl.transaction() as tx:
    with tx.update_schema() as schema:
        schema.rename_column('idx', 'id')
    with tx.update_spec() as spec:
        spec.add_field('id', IdentityTransform())
tbl.delete('id == 4')

Raises:
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
Cell In[25], line 2
1 # Should rewrite the original unpartitioned manifest
----> 2 tbl.delete('id == 4')
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:1611, in Table.delete(self, delete_filter, snapshot_properties)
1603 """
1604 Shorthand for deleting rows from the table.
1605
(...)
1608 snapshot_properties: Custom properties to be added to the snapshot summary
1609 """
1610 with self.transaction() as tx:
-> 1611 tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:593, in Transaction.delete(self, delete_filter, snapshot_properties)
590 if isinstance(delete_filter, str):
591 delete_filter = _parse_row_filter(delete_filter)
--> 593 with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
594 delete_snapshot.delete_by_predicate(delete_filter)
596 # Check if there are any files that require an actual rewrite of a data file
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:2090, in UpdateTableMetadata.__exit__(self, _, value, traceback)
2088 def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
2089 """Close and commit the change."""
-> 2090 self.commit()
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:2086, in UpdateTableMetadata.commit(self)
2085 def commit(self) -> None:
-> 2086 self._transaction._apply(*self._commit())
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3291, in DeleteFiles._commit(self)
3288 def _commit(self) -> UpdatesAndRequirements:
3289 # Only produce a commit when there is something to delete
3290 if self.files_affected:
-> 3291 return super()._commit()
3292 else:
3293 return (), ()
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3197, in _SnapshotProducer._commit(self)
3196 def _commit(self) -> UpdatesAndRequirements:
-> 3197 new_manifests = self._manifests()
3198 next_sequence_number = self._transaction.table_metadata.next_sequence_number()
3200 summary = self._summary(self.snapshot_properties)
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3157, in _SnapshotProducer._manifests(self)
3154 delete_manifests = executor.submit(_write_delete_manifest)
3155 existing_manifests = executor.submit(self._existing_manifests)
-> 3157 return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
File /usr/local/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
456 raise CancelledError()
457 elif self._state == FINISHED:
--> 458 return self.__get_result()
459 else:
460 raise TimeoutError()
File /usr/local/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
406 self = None
File /usr/local/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
55 return
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
File /usr/local/lib/python3.10/site-packages/pyiceberg/table/__init__.py:3146, in _SnapshotProducer._manifests.<locals>._write_delete_manifest()
3138 with write_manifest(
3139 format_version=self._transaction.table_metadata.format_version,
3140 spec=self._transaction.table_metadata.spec(),
(...)
3143 snapshot_id=self._snapshot_id,
3144 ) as writer:
3145 for delete_entry in deleted_entries:
-> 3146 writer.add_entry(delete_entry)
3147 return [writer.to_manifest_file()]
3148 else:
File /usr/local/lib/python3.10/site-packages/pyiceberg/manifest.py:816, in ManifestWriter.add_entry(self, entry)
809 if (
810 (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
811 and entry.sequence_number is not None
812 and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
813 ):
814 self._min_sequence_number = entry.sequence_number
--> 816 self._writer.write_block([self.prepare_entry(entry)])
817 return self
File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/file.py:281, in AvroOutputFile.write_block(self, objects)
279 block_content_encoder = BinaryEncoder(output_stream=in_memory)
280 for obj in objects:
--> 281 self.writer.write(block_content_encoder, obj)
282 block_content = in_memory.getvalue()
284 self.encoder.write_int(len(objects))
File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
172 for pos, writer in self.field_writers:
173 # When pos is None, then it is a default value
--> 174 writer.write(encoder, val[pos] if pos is not None else None)
File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
172 for pos, writer in self.field_writers:
173 # When pos is None, then it is a default value
--> 174 writer.write(encoder, val[pos] if pos is not None else None)
File /usr/local/lib/python3.10/site-packages/pyiceberg/avro/writer.py:174, in StructWriter.write(self, encoder, val)
171 def write(self, encoder: BinaryEncoder, val: Record) -> None:
172 for pos, writer in self.field_writers:
173 # When pos is None, then it is a default value
--> 174 writer.write(encoder, val[pos] if pos is not None else None)
File /usr/local/lib/python3.10/site-packages/pyiceberg/typedef.py:188, in Record.__getitem__(self, pos)
186 def __getitem__(self, pos: int) -> Any:
187 """Fetch a value from a Record."""
--> 188 return self.__getattribute__(self._position_to_field_name[pos])
IndexError: tuple index out of range
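We add the entries of the original manifest as deleted, but then try to write them using the latest partition spec rather than the spec they were originally written with, so the partition record does not have the fields the writer expects.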
Metadata
Metadata
Assignees
Labels
No labels