diff --git a/src/databricks/sql/backend/thrift_backend.py b/src/databricks/sql/backend/thrift_backend.py index e23f3389b..353e05461 100644 --- a/src/databricks/sql/backend/thrift_backend.py +++ b/src/databricks/sql/backend/thrift_backend.py @@ -669,6 +669,20 @@ def _poll_for_status(self, op_handle): ) return self.make_request(self._client.GetOperationStatus, req) + def _heartbeat_poll(self, op_handle): + """ + Single-shot GetOperationStatus for the result-set heartbeat. Bypasses + make_request() so a transient failure does NOT stall inside the + driver's long retry budget — ResultHeartbeatManager counts failures + itself and self-stops after MAX_CONSECUTIVE_FAILURES. + """ + req = ttypes.TGetOperationStatusReq( + operationHandle=op_handle, + getProgressUpdate=False, + ) + with self._request_lock: + return self._client.GetOperationStatus(req) + def _create_arrow_table(self, t_row_set, lz4_compressed, schema_bytes, description): if t_row_set.columns is not None: ( diff --git a/src/databricks/sql/backend/thrift_result_heartbeat_manager.py b/src/databricks/sql/backend/thrift_result_heartbeat_manager.py new file mode 100644 index 000000000..6a6a0b9d9 --- /dev/null +++ b/src/databricks/sql/backend/thrift_result_heartbeat_manager.py @@ -0,0 +1,177 @@ +""" +Background heartbeat for the Thrift backend. + +Why this exists +--------------- +The warehouse evicts an operation/command handle after roughly 20-25 minutes +of driver idleness. Once that happens, any subsequent TFetchResults against +the handle returns HTTP 404 / RESOURCE_DOES_NOT_EXIST and the result set is +permanently broken — the driver's retry policy classifies the error as +non-retryable. + +This manager keeps the handle alive while a consumer is slowly draining +results. While a ThriftResultSet has rows still pending on the server, a +daemon thread issues a periodic GetOperationStatus against the operation +handle. 
The keepalive stops as soon as the server has finished delivering +data (last TFetchResults returns hasMoreRows=False) or the result set is +closed. + +Design mirrors the C# ADBC driver's DatabricksOperationStatusPoller. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Optional + +from databricks.sql.thrift_api.TCLIService import ttypes + +logger = logging.getLogger(__name__) + + +class ResultHeartbeatManager: + """Per-ResultSet background keepalive against operation-handle eviction.""" + + DEFAULT_INTERVAL_SECONDS = 60 + DEFAULT_STOP_TIMEOUT_SECONDS = 5.0 + MAX_CONSECUTIVE_FAILURES = 10 + + # Operation states that indicate the server has released the handle (or + # is about to). No point continuing to heartbeat against any of these. + # FINISHED_STATE is intentionally NOT terminal: it means query execution + # finished but the handle is still alive for result streaming. + _TERMINAL_STATES = frozenset( + { + ttypes.TOperationState.CANCELED_STATE, + ttypes.TOperationState.CLOSED_STATE, + ttypes.TOperationState.ERROR_STATE, + ttypes.TOperationState.TIMEDOUT_STATE, + ttypes.TOperationState.UKNOWN_STATE, + } + ) + + def __init__( + self, + *, + backend, + op_handle, + interval_seconds: int, + statement_id_hex: str, + ) -> None: + self._backend = backend + self._op_handle = op_handle + self._interval_seconds = interval_seconds + self._statement_id_hex = statement_id_hex + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._consecutive_failures = 0 + # Successful poll count — exposed for tests / ad-hoc debugging. + # Intentionally not surfaced through the telemetry pipeline; see the + # plan's "Telemetry: not added" section for why. + self._poll_count = 0 + self._lock = threading.Lock() + + def start(self) -> None: + """ + Spawn the daemon thread. 
Calling twice is a no-op with a warning — + not an exception, because this guard sits in ResultSet construction + and a defensive failure should not abort the user's query. + """ + with self._lock: + if self._thread is not None: + logger.warning( + "ResultHeartbeatManager.start() called twice for " + "statement %s; ignoring", + self._statement_id_hex, + ) + return + self._thread = threading.Thread( + target=self._run, + name="databricks-sql-heartbeat-%s" % self._statement_id_hex, + daemon=True, + ) + self._thread.start() + logger.debug( + "heartbeat manager started for statement %s " "(interval=%ss)", + self._statement_id_hex, + self._interval_seconds, + ) + + def stop(self, timeout: float = DEFAULT_STOP_TIMEOUT_SECONDS) -> None: + """ + Signal the loop to exit, then join with a bounded timeout. + + Idempotent. If the join elapses without the thread terminating + (e.g. wedged in a blocking socket read), emit a single DEBUG log + line and return — the daemon thread will die with the interpreter. + """ + with self._lock: + self._stop_event.set() + thread = self._thread + if thread is None: + return + thread.join(timeout=timeout) + if thread.is_alive(): + logger.debug( + "heartbeat thread for statement %s did not terminate " + "within %ss; letting daemon thread die with interpreter", + self._statement_id_hex, + timeout, + ) + + def _run(self) -> None: + # Event.wait returns True if the event was set during the wait + # (i.e. stop was signaled), in which case we exit cleanly. + while not self._stop_event.wait(self._interval_seconds): + if not self._poll_once(): + return + + def _poll_once(self) -> bool: + """ + Issue a single GetOperationStatus. Return True to keep polling, + False to self-stop the manager. 
+ """ + try: + resp = self._backend._heartbeat_poll(self._op_handle) + except Exception as e: + self._consecutive_failures += 1 + logger.debug( + "heartbeat poll failed for statement %s " + "(consecutive_failures=%d): %s", + self._statement_id_hex, + self._consecutive_failures, + e, + ) + if self._consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES: + logger.warning( + "heartbeat manager stopping after %d consecutive " + "failures for statement %s", + self._consecutive_failures, + self._statement_id_hex, + ) + return False + return True + + self._consecutive_failures = 0 + self._poll_count += 1 + state = getattr(resp, "operationState", None) + state_name = ( + ttypes.TOperationState._VALUES_TO_NAMES.get(state, str(state)) + if state is not None + else "None" + ) + logger.debug( + "heartbeat poll ok for statement %s (state=%s)", + self._statement_id_hex, + state_name, + ) + if state in self._TERMINAL_STATES: + logger.debug( + "heartbeat poll for statement %s observed terminal " + "operation state %s; stopping", + self._statement_id_hex, + state_name, + ) + return False + return True diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index fe52f0c79..ce10d08c0 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -264,6 +264,14 @@ def read(self) -> Optional[OAuthToken]: # (True by default) # use_cloud_fetch # Enable use of cloud fetch to extract large query results in parallel via cloud storage + # enable_heartbeat + # When True (default), each Thrift ResultSet that still has rows pending on the server + # spawns a daemon thread that periodically issues TGetOperationStatus against the + # operation handle to keep it alive past the warehouse's idle-eviction window + # (~20-25 min). Pass enable_heartbeat=False to opt out. + # heartbeat_interval_seconds + # Interval between heartbeat polls in seconds (default 60). Has no effect when + # enable_heartbeat is False. 
logger.debug( "Connection.__init__(server_hostname=%s, http_path=%s)", @@ -295,6 +303,11 @@ def read(self) -> Optional[OAuthToken]: self.disable_pandas = kwargs.get("_disable_pandas", False) self.lz4_compression = kwargs.get("enable_query_result_lz4_compression", True) self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True) + # Per-ResultSet background GetOperationStatus keepalive against + # server-side operation-handle idle eviction. See + # backend/thrift_result_heartbeat_manager.py for details. + self.enable_heartbeat = kwargs.get("enable_heartbeat", True) + self.heartbeat_interval_seconds = kwargs.get("heartbeat_interval_seconds", 60) self._cursors = [] # type: List[Cursor] self.telemetry_batch_size = kwargs.get( "telemetry_batch_size", TelemetryClientFactory.DEFAULT_BATCH_SIZE diff --git a/src/databricks/sql/result_set.py b/src/databricks/sql/result_set.py index 6c4c3a43a..cccbb83e1 100644 --- a/src/databricks/sql/result_set.py +++ b/src/databricks/sql/result_set.py @@ -163,6 +163,14 @@ def fetchall_arrow(self) -> "pyarrow.Table": """Fetch all remaining rows as an Arrow table.""" pass + def _stop_heartbeat(self) -> None: + """ + Stop any background heartbeat associated with this result set. + + Base-class no-op; the Thrift result set overrides this. + """ + return None + def close(self) -> None: """ Close the result set. @@ -171,6 +179,10 @@ def close(self) -> None: been closed on the server for some other reason, issue a request to the server to close it. """ try: + # Stop the heartbeat BEFORE close_command so the manager doesn't + # race against the close RPC over the same Thrift transport. + self._stop_heartbeat() + if self.results is not None: self.results.close() else: @@ -222,6 +234,10 @@ def __init__( :param has_more_rows: Whether there are more rows to fetch """ self.num_chunks = 0 + # Initialize before any code path that could call _stop_heartbeat + # (e.g. _fill_results_buffer below, if the initial fetch flips + # has_more_rows to False). 
+ self._heartbeat_manager = None # Initialize ThriftResultSet-specific attributes self._use_cloud_fetch = use_cloud_fetch @@ -270,6 +286,45 @@ def __init__( if not self.results: self._fill_results_buffer() + # Start the background keepalive once the result set is fully + # constructed and we know the server still has more rows to deliver. + # This must happen AFTER the initial _fill_results_buffer above, + # because that call may flip has_more_rows to False. + if self._heartbeat_eligible(): + from databricks.sql.backend.thrift_result_heartbeat_manager import ( + ResultHeartbeatManager, + ) + + self._heartbeat_manager = ResultHeartbeatManager( + backend=self.backend, + op_handle=self.command_id.to_thrift_handle(), + interval_seconds=connection.heartbeat_interval_seconds, + statement_id_hex=self.command_id.to_hex_guid(), + ) + self._heartbeat_manager.start() + + def _heartbeat_eligible(self) -> bool: + if not getattr(self.connection, "enable_heartbeat", False): + return False + if self.has_been_closed_server_side: + return False + if not self.has_more_rows: + return False + # Defensive: command_id can be None in tests / mocks. Also, + # to_thrift_handle returns None for non-Thrift command IDs. + if self.command_id is None: + return False + return self.command_id.to_thrift_handle() is not None + + def _stop_heartbeat(self) -> None: + manager = self._heartbeat_manager + if manager is None: + return + # Clear the attribute first so re-entry is a no-op even if stop() + # itself is slow. 
+ self._heartbeat_manager = None + manager.stop() + def _fill_results_buffer(self): results, has_more_rows, result_links_count = self.backend.fetch_results( command_id=self.command_id, @@ -286,6 +341,13 @@ def _fill_results_buffer(self): self.has_more_rows = has_more_rows self.num_chunks += result_links_count + # Server has finished delivering rows for this statement — no point + # keeping the operation handle alive even if the local buffer still + # holds rows the consumer hasn't drained. Matches C# ADBC's stop at + # end-of-results inside ReadNextRecordBatchAsync. + if not has_more_rows: + self._stop_heartbeat() + def _convert_columnar_table(self, table): column_names = [c[0] for c in self.description] ResultRow = Row(*column_names) diff --git a/tests/e2e/test_heartbeat.py b/tests/e2e/test_heartbeat.py new file mode 100644 index 000000000..dd3e9ef53 --- /dev/null +++ b/tests/e2e/test_heartbeat.py @@ -0,0 +1,110 @@ +""" +End-to-end tests for the Thrift result-set heartbeat. + +Requires a real warehouse and the standard env vars: + DATABRICKS_SERVER_HOSTNAME + DATABRICKS_HTTP_PATH + DATABRICKS_TOKEN + +Skipped automatically when any of those are not set. Runs in seconds, not +the full 30-min idle window — we use a tiny heartbeat_interval_seconds to +verify the loop is alive, rather than waiting for actual handle eviction. 
+""" + +import os +import time +from contextlib import contextmanager + +import pytest + +import databricks.sql as sql + + +pytestmark = pytest.mark.skipif( + not ( + os.getenv("DATABRICKS_SERVER_HOSTNAME") + and os.getenv("DATABRICKS_HTTP_PATH") + and os.getenv("DATABRICKS_TOKEN") + ), + reason="DATABRICKS_SERVER_HOSTNAME / DATABRICKS_HTTP_PATH / DATABRICKS_TOKEN not set", +) + + +@contextmanager +def _connect(**extra): + conn = sql.connect( + server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"], + http_path=os.environ["DATABRICKS_HTTP_PATH"], + access_token=os.environ["DATABRICKS_TOKEN"], + use_cloud_fetch=False, + **extra, + ) + try: + yield conn + finally: + conn.close() + + +def _prime_for_more_rows(cursor): + """ + Execute a query and drain the direct-results batch so that subsequent + fetches must round-trip to the server. Guarantees has_more_rows=True + on the active ResultSet — the path the heartbeat protects. + """ + cursor.arraysize = 10 + cursor.execute("SELECT id FROM range(0, 100000)") + first = cursor.fetchmany(10) + assert len(first) == 10 + sanity = cursor.fetchmany(10) + assert len(sanity) == 10 + + +class TestThriftHeartbeat: + def test_heartbeat_polls_during_idle(self): + with _connect(enable_heartbeat=True, heartbeat_interval_seconds=2) as conn: + with conn.cursor() as cursor: + _prime_for_more_rows(cursor) + + rs = cursor.active_result_set + mgr = rs._heartbeat_manager + assert mgr is not None, ( + "heartbeat manager should be constructed when " "has_more_rows=True" + ) + assert mgr._thread is not None and mgr._thread.is_alive() + + # Wait > 2× interval so the loop has issued multiple polls. + time.sleep(5) + + assert mgr._poll_count >= 2, ( + f"expected >= 2 heartbeat polls in 5s at interval=2s, " + f"got {mgr._poll_count}" + ) + + # Pull more rows; the heartbeat should still be alive while + # the server has data left. 
+ cursor.fetchmany(10) + assert mgr._thread.is_alive() + + def test_heartbeat_disabled_skips_manager(self): + with _connect(enable_heartbeat=False) as conn: + with conn.cursor() as cursor: + cursor.arraysize = 10 + cursor.execute("SELECT id FROM range(0, 100000)") + cursor.fetchmany(10) + rs = cursor.active_result_set + assert rs._heartbeat_manager is None + + def test_heartbeat_stops_on_close(self): + with _connect(enable_heartbeat=True, heartbeat_interval_seconds=2) as conn: + with conn.cursor() as cursor: + _prime_for_more_rows(cursor) + rs = cursor.active_result_set + mgr = rs._heartbeat_manager + assert mgr is not None and mgr._thread.is_alive() + + rs.close() + + # stop() joins with timeout=5s, so the thread must be dead + # by the time close() returns. + assert not mgr._thread.is_alive() + assert rs._heartbeat_manager is None diff --git a/tests/unit/test_result_heartbeat_manager.py b/tests/unit/test_result_heartbeat_manager.py new file mode 100644 index 000000000..4ec960e45 --- /dev/null +++ b/tests/unit/test_result_heartbeat_manager.py @@ -0,0 +1,159 @@ +"""Unit tests for ResultHeartbeatManager.""" + +import threading +import time +import unittest +from unittest.mock import Mock + +from databricks.sql.backend.thrift_result_heartbeat_manager import ( + ResultHeartbeatManager, +) +from databricks.sql.thrift_api.TCLIService import ttypes + + +def _resp(state): + return Mock(operationState=state) + + +def _wait_until(predicate, timeout=2.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(0.005) + return False + + +class ResultHeartbeatManagerTest(unittest.TestCase): + def _make_manager(self, backend, interval_seconds=0.01): + return ResultHeartbeatManager( + backend=backend, + op_handle=Mock(name="op_handle"), + interval_seconds=interval_seconds, + statement_id_hex="test-stmt", + ) + + # ---- lifecycle ---------------------------------------------------- + + def 
test_start_spawns_daemon_thread_that_polls(self): + backend = Mock() + backend._heartbeat_poll.return_value = _resp( + ttypes.TOperationState.RUNNING_STATE + ) + mgr = self._make_manager(backend) + mgr.start() + + try: + self.assertTrue( + _wait_until(lambda: backend._heartbeat_poll.call_count >= 2), + "heartbeat thread did not poll twice in 2s", + ) + self.assertTrue(mgr._thread.daemon) + self.assertTrue(mgr._thread.is_alive()) + finally: + mgr.stop() + + self.assertFalse(mgr._thread.is_alive()) + + def test_stop_before_start_is_safe(self): + mgr = self._make_manager(Mock()) + mgr.stop() # must not raise + + def test_stop_twice_is_idempotent(self): + backend = Mock() + backend._heartbeat_poll.return_value = _resp( + ttypes.TOperationState.RUNNING_STATE + ) + mgr = self._make_manager(backend) + mgr.start() + mgr.stop() + mgr.stop() # must not raise + + def test_double_start_is_logged_warning_no_op(self): + backend = Mock() + backend._heartbeat_poll.return_value = _resp( + ttypes.TOperationState.RUNNING_STATE + ) + mgr = self._make_manager(backend) + mgr.start() + first_thread = mgr._thread + try: + mgr.start() # must not raise + self.assertIs(mgr._thread, first_thread) + finally: + mgr.stop() + + # ---- terminal-state self-stop ------------------------------------ + + def test_terminal_state_self_stops_the_thread(self): + for state in ( + ttypes.TOperationState.CLOSED_STATE, + ttypes.TOperationState.CANCELED_STATE, + ttypes.TOperationState.ERROR_STATE, + ttypes.TOperationState.TIMEDOUT_STATE, + ttypes.TOperationState.UKNOWN_STATE, + ): + with self.subTest(state=state): + backend = Mock() + backend._heartbeat_poll.return_value = _resp(state) + mgr = self._make_manager(backend) + mgr.start() + self.assertTrue( + _wait_until(lambda: not mgr._thread.is_alive()), + f"thread did not self-stop on terminal state {state}", + ) + + def test_finished_state_keeps_polling(self): + backend = Mock() + backend._heartbeat_poll.return_value = _resp( + 
ttypes.TOperationState.FINISHED_STATE + ) + mgr = self._make_manager(backend) + mgr.start() + try: + self.assertTrue( + _wait_until(lambda: backend._heartbeat_poll.call_count >= 3), + "FINISHED_STATE should NOT terminate the heartbeat", + ) + self.assertTrue(mgr._thread.is_alive()) + finally: + mgr.stop() + + # ---- failure counter --------------------------------------------- + + def test_self_stops_after_max_consecutive_failures(self): + backend = Mock() + backend._heartbeat_poll.side_effect = RuntimeError("boom") + mgr = self._make_manager(backend) + mgr.start() + self.assertTrue( + _wait_until(lambda: not mgr._thread.is_alive(), timeout=3.0), + "thread did not self-stop after consecutive failures", + ) + self.assertEqual( + backend._heartbeat_poll.call_count, + ResultHeartbeatManager.MAX_CONSECUTIVE_FAILURES, + ) + + def test_success_resets_failure_counter(self): + backend = Mock() + seq = [RuntimeError("x")] * 5 + [_resp(ttypes.TOperationState.RUNNING_STATE)] + backend._heartbeat_poll.side_effect = ( + seq + [_resp(ttypes.TOperationState.RUNNING_STATE)] * 10 + ) + + mgr = self._make_manager(backend) + mgr.start() + try: + self.assertTrue( + _wait_until(lambda: backend._heartbeat_poll.call_count >= len(seq) + 2), + "polling did not continue past the recovery point", + ) + self.assertTrue(mgr._thread.is_alive()) + self.assertEqual(mgr._consecutive_failures, 0) + finally: + mgr.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_thrift_result_set_heartbeat.py b/tests/unit/test_thrift_result_set_heartbeat.py new file mode 100644 index 000000000..b5a05dd8a --- /dev/null +++ b/tests/unit/test_thrift_result_set_heartbeat.py @@ -0,0 +1,155 @@ +""" +Integration tests for ResultHeartbeatManager wiring into ThriftResultSet. + +These verify the lifecycle hooks (start in __init__, stop in close(), stop +when _fill_results_buffer flips has_more_rows to False) — not the manager +itself, which is covered in test_result_heartbeat_manager.py. 
+""" + +import unittest +from unittest.mock import Mock, patch + +from databricks.sql.backend.thrift_backend import ThriftDatabricksClient +from databricks.sql.backend.types import ( + BackendType, + CommandId, + CommandState, + ExecuteResponse, +) +from databricks.sql.result_set import ThriftResultSet +from databricks.sql.thrift_api.TCLIService import ttypes + + +def _make_thrift_command_id() -> CommandId: + handle = ttypes.TOperationHandle( + operationId=ttypes.THandleIdentifier(guid=b"a" * 16, secret=b"b" * 16), + operationType=0, + hasResultSet=True, + modifiedRowCount=None, + ) + return CommandId.from_thrift_handle(handle) + + +def _make_connection(enable_heartbeat=True, interval=60): + conn = Mock() + conn.enable_heartbeat = enable_heartbeat + conn.heartbeat_interval_seconds = interval + conn.heartbeat_request_timeout_seconds = 30 + conn.disable_pandas = False + conn.open = True + return conn + + +def _execute_response(*, has_been_closed_server_side=False): + return ExecuteResponse( + command_id=_make_thrift_command_id(), + status=CommandState.SUCCEEDED, + description=[("c", "int", None, None, None, None, None)], + has_been_closed_server_side=has_been_closed_server_side, + lz4_compressed=False, + is_staging_operation=False, + arrow_schema_bytes=None, + result_format=None, + ) + + +def _make_backend(): + backend = Mock(spec=ThriftDatabricksClient) + backend.fetch_results.return_value = (Mock(), True, 0) + return backend + + +@patch( + "databricks.sql.backend.thrift_result_heartbeat_manager." "ResultHeartbeatManager" +) +class ThriftResultSetHeartbeatWiringTest(unittest.TestCase): + def _make_rs( + self, + *, + has_more_rows=True, + has_been_closed_server_side=False, + enable_heartbeat=True, + backend=None, + ): + backend = backend or _make_backend() + # Provide a pre-built results queue so __init__ doesn't call + # _fill_results_buffer() before we get to the eligibility check. 
+ results_queue = Mock() + return ( + ThriftResultSet( + connection=_make_connection(enable_heartbeat=enable_heartbeat), + execute_response=_execute_response( + has_been_closed_server_side=has_been_closed_server_side + ), + thrift_client=backend, + has_more_rows=has_more_rows, + ), + backend, + results_queue, + ) + + def test_starts_when_has_more_rows_true(self, mgr_cls): + rs, _, _ = self._make_rs() + mgr_cls.assert_called_once() + rs._heartbeat_manager.start.assert_called_once() + + def test_does_not_start_when_no_more_rows(self, mgr_cls): + # When has_more_rows=False the eligibility check should short-circuit + # BEFORE the initial _fill_results_buffer can flip anything. + backend = _make_backend() + backend.fetch_results.return_value = (Mock(), False, 0) + rs, _, _ = self._make_rs(has_more_rows=False, backend=backend) + mgr_cls.assert_not_called() + self.assertIsNone(rs._heartbeat_manager) + + def test_does_not_start_when_closed_server_side(self, mgr_cls): + rs, _, _ = self._make_rs(has_been_closed_server_side=True) + mgr_cls.assert_not_called() + + def test_does_not_start_when_flag_disabled(self, mgr_cls): + rs, _, _ = self._make_rs(enable_heartbeat=False) + mgr_cls.assert_not_called() + + def test_close_stops_heartbeat_first(self, mgr_cls): + rs, backend, _ = self._make_rs() + manager = rs._heartbeat_manager + self.assertIsNotNone(manager) + + rs.close() + manager.stop.assert_called_once() + self.assertIsNone(rs._heartbeat_manager) + + def test_close_twice_is_idempotent(self, mgr_cls): + rs, _, _ = self._make_rs() + manager = rs._heartbeat_manager + + rs.close() + rs.close() + manager.stop.assert_called_once() + + def test_fill_results_buffer_stops_when_server_finished(self, mgr_cls): + backend = _make_backend() + rs, _, _ = self._make_rs(backend=backend) + manager = rs._heartbeat_manager + + # Server delivers its last batch and signals no more rows. 
+ backend.fetch_results.return_value = (Mock(), False, 0) + rs._fill_results_buffer() + + manager.stop.assert_called_once() + self.assertIsNone(rs._heartbeat_manager) + + def test_fill_results_buffer_keeps_running_when_more_rows(self, mgr_cls): + backend = _make_backend() + rs, _, _ = self._make_rs(backend=backend) + manager = rs._heartbeat_manager + + backend.fetch_results.return_value = (Mock(), True, 0) + rs._fill_results_buffer() + + manager.stop.assert_not_called() + self.assertIs(rs._heartbeat_manager, manager) + + +if __name__ == "__main__": + unittest.main()