This repository was archived by the owner on Mar 6, 2026. It is now read-only.
perf: if page_size or max_results is set on QueryJob.result(), use to download first page of results
#1942
Merged
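For context, the optimization this PR describes can be exercised from the public API roughly as follows. This is an illustrative sketch, not code from the PR: the project, dataset, table, and query are placeholders, and it assumes the google-cloud-bigquery client library with application default credentials available.

```python
def main():
    # Deferred import so the sketch stays importable without the
    # google-cloud-bigquery package installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT name FROM `my_project.my_dataset.my_table`")

    # With this PR, passing page_size (or max_results) here also sizes the
    # first page fetched while waiting for the query to finish, instead of
    # waiting with maxResults=0 and only fetching rows afterwards.
    for row in job.result(page_size=100):
        print(row["name"])
```

The commit messages suggest `client.query_and_wait(query, page_size=100)` exercises the same path.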
Changes from all 8 commits:
4e9f57c  perf: if `page_size` or `max_results` is set on `QueryJob.result()`, … (tswast)
bd88ddf  add unit tests for query_and_wait (tswast)
2241c13  Merge branch 'main' into b344008814-page_size-query_and_wait (tswast)
f77eba9  populate maxResults on page 2 (tswast)
1c66608  fix maxResults (tswast)
82add16  Merge remote-tracking branch 'origin/b344008814-page_size-query_and_w… (tswast)
4267bdf  fix coverage (tswast)
d4f4bb9  Merge branch 'main' into b344008814-page_size-query_and_wait (Linchin)
Diff (from all commits):

@@ -1385,7 +1385,10 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
         raise

     def _reload_query_results(
-        self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
+        self,
+        retry: "retries.Retry" = DEFAULT_RETRY,
+        timeout: Optional[float] = None,
+        page_size: int = 0,
     ):
         """Refresh the cached query results unless already cached and complete.
@@ -1395,6 +1398,9 @@ def _reload_query_results(
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
+            page_size (int):
+                Maximum number of rows in a single response. See maxResults in
+                the jobs.getQueryResults REST API.
         """
         # Optimization: avoid a call to jobs.getQueryResults if it's already
         # been fetched, e.g. from jobs.query first page of results.
|
@@ -1425,7 +1431,14 @@ def _reload_query_results( | |
|
|
||
| # If an explicit timeout is not given, fall back to the transport timeout | ||
| # stored in _blocking_poll() in the process of polling for job completion. | ||
| transport_timeout = timeout if timeout is not None else self._transport_timeout | ||
| if timeout is not None: | ||
| transport_timeout = timeout | ||
| else: | ||
| transport_timeout = self._transport_timeout | ||
|
|
||
| # Handle PollingJob._DEFAULT_VALUE. | ||
| if not isinstance(transport_timeout, (float, int)): | ||
| transport_timeout = None | ||
|
|
||
| self._query_results = self._client._get_query_results( | ||
| self.job_id, | ||
|
|
@@ -1434,6 +1447,7 @@ def _reload_query_results( | |
| timeout_ms=timeout_ms, | ||
| location=self.location, | ||
| timeout=transport_timeout, | ||
| page_size=page_size, | ||
| ) | ||
|
|
||
| def result( # type: ignore # (incompatible with supertype) | ||
|
@@ -1515,11 +1529,25 @@ def result(  # type: ignore  # (incompatible with supertype)
             # actually correspond to a finished query job.
         )

+        # Setting max_results should be equivalent to setting page_size with
+        # regards to allowing the user to tune how many results to download
+        # while we wait for the query to finish. See internal issue:
+        # 344008814.
+        if page_size is None and max_results is not None:
Contributor: Just for my education, why is setting max_results equivalent to setting page_size here?

Contributor (author): Imagine the user has selected a value for `max_results`. Also, it means the user is expecting to download the results and do something with them, which means it wouldn't be a waste to ask for the first page of results while we wait for the query to finish.
+            page_size = max_results
+
         # When timeout has default sentinel value ``object()``, do not pass
         # anything to invoke default timeouts in subsequent calls.
-        kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
+        done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
+        reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
+        list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
         if type(timeout) is not object:
-            kwargs["timeout"] = timeout
+            done_kwargs["timeout"] = timeout
+            list_rows_kwargs["timeout"] = timeout
+            reload_query_results_kwargs["timeout"] = timeout
+
+        if page_size is not None:
+            reload_query_results_kwargs["page_size"] = page_size

         try:
             retry_do_query = getattr(self, "_retry_do_query", None)
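The kwargs-splitting in the hunk above can be condensed into a runnable sketch. `split_result_kwargs` is a hypothetical helper named for this illustration; the sentinel filtering (`type(timeout) is not object`) and the `max_results` fallback mirror the diff.

```python
# A bare object() distinguishes "caller passed nothing" from "caller
# passed None", which is itself a meaningful timeout value.
_DEFAULT_SENTINEL = object()


def split_result_kwargs(timeout=_DEFAULT_SENTINEL, page_size=None, max_results=None):
    # max_results doubles as a first-page size hint when page_size is unset.
    if page_size is None and max_results is not None:
        page_size = max_results

    done_kwargs = {}
    reload_query_results_kwargs = {}
    list_rows_kwargs = {}
    # Only bare object() sentinels are filtered out; None passes through.
    if type(timeout) is not object:
        done_kwargs["timeout"] = timeout
        list_rows_kwargs["timeout"] = timeout
        reload_query_results_kwargs["timeout"] = timeout

    # Only the jobs.getQueryResults call (via _reload_query_results)
    # accepts a page size while waiting.
    if page_size is not None:
        reload_query_results_kwargs["page_size"] = page_size

    return done_kwargs, reload_query_results_kwargs, list_rows_kwargs
```

Splitting the single `kwargs` dict into three lets the `page_size` argument flow only to the call that understands it, without changing what the `done()` and row-listing calls receive.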
@@ -1562,7 +1590,7 @@ def is_job_done():
                 # rateLimitExceeded errors are ambiguous. We want to know if
                 # the query job failed and not just the call to
                 # jobs.getQueryResults.
-                if self.done(retry=retry, **kwargs):
+                if self.done(retry=retry, **done_kwargs):
                     # If it's already failed, we might as well stop.
                     job_failed_exception = self.exception()
                     if job_failed_exception is not None:
@@ -1599,14 +1627,16 @@ def is_job_done():
                     # response from the REST API. This ensures we aren't
                     # making any extra API calls if the previous loop
                     # iteration fetched the finished job.
-                    self._reload_query_results(retry=retry, **kwargs)
+                    self._reload_query_results(
+                        retry=retry, **reload_query_results_kwargs
+                    )
                     return True

                 # Call jobs.getQueryResults with max results set to 0 just to
                 # wait for the query to finish. Unlike most methods,
                 # jobs.getQueryResults hangs as long as it can to ensure we
                 # know when the query has finished as soon as possible.
-                self._reload_query_results(retry=retry, **kwargs)
+                self._reload_query_results(retry=retry, **reload_query_results_kwargs)

                 # Even if the query is finished now according to
                 # jobs.getQueryResults, we'll want to reload the job status if
@@ -1679,8 +1709,9 @@ def is_job_done():
         # We know that there's at least 1 row, so only treat the response from
         # jobs.getQueryResults / jobs.query as the first page of the
         # RowIterator response if there are any rows in it. This prevents us
-        # from stopping the iteration early because we're missing rows and
-        # there's no next page token.
+        # from stopping the iteration early in the cases where we set
+        # maxResults=0. In that case, we're missing rows and there's no next
+        # page token.
         first_page_response = self._query_results._properties
         if "rows" not in first_page_response:
             first_page_response = None
@@ -1699,7 +1730,7 @@ def is_job_done():
             query_id=self.query_id,
             first_page_response=first_page_response,
             num_dml_affected_rows=self._query_results.num_dml_affected_rows,
-            **kwargs,
+            **list_rows_kwargs,
         )
         rows._preserve_order = _contains_order_by(self.query)
         return rows
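The "only treat the response as a first page if it has rows" guard in the last hunk can be sketched as a runnable check. `usable_first_page` is a hypothetical helper name; the row payloads use the REST API's `rows` / `f` / `v` shape for illustration.

```python
def usable_first_page(query_results_properties):
    # A jobs.getQueryResults response fetched with maxResults=0 carries no
    # "rows" key and no next page token. Seeding the RowIterator with it
    # would end iteration after zero rows, so fall back to fetching the
    # first page separately instead.
    if "rows" not in query_results_properties:
        return None
    return query_results_properties
```

This is why the optimization is safe: a sized response (from `page_size`/`max_results`) is reused as page one, while the old-style empty wait response is discarded.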
Reviewer: Also for my education, why is the timestamp option related to maxResults?

Author: If we accept any results in the response, we want to make sure the data is encoded correctly. Without this option, BigQuery encodes timestamps as floating-point numbers, which results in a loss of microsecond-level precision.

The reason it matters is that previously we always asked for 0 rows, in which case we didn't include this option.

I suppose we could always include it, but it's not necessary if we don't download any data.
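The precision point can be demonstrated with a short, self-contained example. The timestamp value is arbitrary, and the exact option name is an assumption (likely `formatOptions.useInt64Timestamp` on jobs.getQueryResults); the code only shows why seconds-as-float is inexact while int64 microseconds are exact.

```python
from decimal import Decimal

# A timestamp with microsecond precision, as seconds since the epoch.
ts_text = "1717000000.123456"

# Encoded as a double (the non-int64 encoding), the value cannot be
# represented exactly: .123456 is not a dyadic fraction, so the nearest
# double differs from the true value at the sub-microsecond level.
as_float = float(ts_text)
float_error = abs(Decimal(as_float) - Decimal(ts_text))

# Encoded as int64 microseconds, the same instant is exact (and well
# below 2**53, so no integer precision is lost anywhere).
as_micros = 1717000000123456
exact = Decimal(as_micros) / Decimal(1_000_000)
```

Because the sub-microsecond rounding error accumulates in formatting and round-trips, the client asks for the int64 encoding whenever rows may actually be returned.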