Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,49 @@ def text(self):
assert isinstance(result, TrinoResult)


def test_trino_result_iterator_survives_transient_error():
    """Check that a TrinoResult iterator stays usable after a row source error.

    The fake row source raises IOError exactly once, on its third ``__next__``
    call. That failing call still advances the source's internal counter, so
    the values observed through the result are [1], [2], <IOError>, [4], [5]
    and finally StopIteration.
    """

    class FlakyRows:
        """Self-iterating row source that fails once on the third call."""

        def __init__(self):
            self._calls = 0
            self._already_failed = False

        def __iter__(self):
            return self

        def __next__(self):
            self._calls += 1
            if self._calls > 5:
                raise StopIteration
            if self._calls == 3 and not self._already_failed:
                # Fail a single time only; later calls proceed normally.
                self._already_failed = True
                raise IOError("S3 segment download failed")
            return [self._calls]

    class FinishedQuery:
        """Query stub that reports itself finished and has no more pages."""

        finished = True

        def fetch(self):
            return []

    rows = iter(TrinoResult(FinishedQuery(), FlakyRows()))

    # The first two rows arrive before the injected failure.
    for expected in ([1], [2]):
        assert next(rows) == expected

    # The third pull surfaces the underlying I/O error to the caller.
    with pytest.raises(IOError, match="S3 segment download failed"):
        next(rows)

    # Key: the iterator keeps producing rows after the error.
    for expected in ([4], [5]):
        assert next(rows) == expected

    # Exhaustion is still reported the normal way.
    with pytest.raises(StopIteration):
        next(rows)


def test_delay_exponential_without_jitter():
max_delay = 1200.0
get_delay = _DelayExponential(base=5, jitter=False, max_delay=max_delay)
Expand Down
32 changes: 24 additions & 8 deletions trino/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,15 +821,19 @@ class TrinoResult:
"""
Represent the result of a Trino query as an iterator on rows.

This class implements the iterator protocol as a generator type
https://docs.python.org/3/library/stdtypes.html#generator-types
This class implements the iterator protocol using __next__ so that
transient exceptions during iteration do not permanently kill the
iterator (unlike a generator, whose frame is finalized after an
unhandled exception).
"""

def __init__(self, query, rows: List[Any]):
self._query = query
# Initial rows from the first POST request
self._rows = rows
self._rownumber = 0
self._row_iter: Optional[Iterator[Any]] = None
self._next_rows = None

@property
def rows(self):
Expand All @@ -844,15 +848,27 @@ def rownumber(self) -> int:
return self._rownumber

def __iter__(self):
return self

def __next__(self):
# Lazy init: prefetch the next batch before exposing current rows.
# A query only transitions to a FINISHED state when the results are fully consumed:
# The reception of the data is acknowledged by calling the next_uri before exposing the data through dbapi.
while not self._query.finished or self._rows is not None:
next_rows = self._query.fetch() if not self._query.finished else None
for row in self._rows:
self._rownumber += 1
yield row
if self._row_iter is None:
self._row_iter = iter(self._rows)
self._next_rows = self._query.fetch() if not self._query.finished else None

self._rows = next_rows
while True:
try:
row = next(self._row_iter)
self._rownumber += 1
return row
except StopIteration:
if self._next_rows is None:
raise
self._rows = self._next_rows
self._row_iter = iter(self._rows)
self._next_rows = self._query.fetch() if not self._query.finished else None
Comment on lines +857 to +871

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to work correctly for the spooling case, when I/O errors originate from next(self._row_iter).

But what if the error is raised by self._query.fetch() instead?

As I understand it, in the spooling case this is not an issue, since self._query.fetch() is not called again once the list of segments has been fetched right at the beginning. Except maybe when the list does not fit in a single response page? But it can happen if the direct protocol is used and self._query.fetch() is called to get the next result page.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately, what contract do we want the iterator to have?

Do we want to guarantee that it can be reused after an I/O error, and return more data without duplication, possibly skipping over some rows? It works that way for spooling, but I'm not sure it works that way for the direct protocol. For example, an error in self._query.fetch() on line 871 will leave self._next_rows unchanged, and I think the iterator would go over those rows a second time after the error.

@azawlocki-sbdt azawlocki-sbdt Jun 30, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thought: if we assume S3 communication is less reliable than coordinator communication, as #598 seems to assume, maybe we should just have a separate retry/backoff policy for S3 downloads?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hashhar, you're the reporter of #598, WDYT?



class TrinoQuery:
Expand Down
Loading