NullPointerException while waiting for query result on BigQuery #1689

@Astrac

Description

I have a complex application that uses Apache Flink to stream events from Kafka to BigQuery; the application first queries the dataset to get the latest offset for each topic/partition. This is what the code running the query looks like:

// Run the offsets query, retrying on failure, with a random initial delay
// so that parallel queries don't all hit the API at the same time.
var response = retryUntilSuccess(maxFetchOffsetAttempts, maxFetchOffsetDelay) { () =>
  Thread.sleep(Random.nextInt(maxFetchOffsetDelay.toMillis.toInt))
  bigQuery.query(
    QueryRequest
      .newBuilder(offsetsQuery)
      .setUseLegacySql(false)
      .setDefaultDataset(config.dataset)
      .build()
  )
}

// Poll until the query job completes; the getQueryResults call below is
// where the NullPointerException is thrown.
while (!response.jobCompleted()) {
  logger.warn(
    s"Offsets for ${record.tableName} are not ready, will retry in 1 second, jobId: ${response.getJobId()}")
  Thread.sleep(1000)
  response = bigQuery.getQueryResults(response.getJobId())
}

val result = response.getResult()

This is written in Scala, but it is mostly a translation of the example in the Javadocs. The retryUntilSuccess function retries the request multiple times, and there is a random wait (currently up to 30 seconds) before each request, so as to handle the fact that many queries fire in parallel and may hit API limits.
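For context, retryUntilSuccess is our own helper, not part of the BigQuery client. A minimal sketch of what it does (the real implementation differs in details; maxAttempts and delay here are illustrative parameter names):

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical reconstruction of retryUntilSuccess: runs `action` up to
// `maxAttempts` times, sleeping `delay` between failed attempts, and
// rethrows the last failure once all attempts are exhausted.
def retryUntilSuccess[A](maxAttempts: Int, delay: FiniteDuration)(action: () => A): A =
  Try(action()) match {
    case Success(result) => result
    case Failure(_) if maxAttempts > 1 =>
      Thread.sleep(delay.toMillis)
      retryUntilSuccess(maxAttempts - 1, delay)(action)
    case Failure(err) => throw err
  }
```

So by the time the polling loop runs, the query request itself has already succeeded at least once.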

When I run this code I almost always get the following exception:

java.lang.NullPointerException
	at com.google.cloud.bigquery.JobId.fromPb(JobId.java:111)
	at com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:635)
	at com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:619)
	at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.startingOffsets$lzycompute(BigQuerySink.scala:89)
	at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.startingOffsets(BigQuerySink.scala:73)
	at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.open(BigQuerySink.scala:136)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
	at java.lang.Thread.run(Thread.java:745)

The line in our code where this exception happens is the call to getQueryResults. I am sure I am not passing a null there, as I can see the job id in the logs just before the failure; I have also checked that the response doesn't contain any errors by calling getExecutionErrors. I say it "almost always fails" because over some tens of attempts it managed to run a couple of times without hitting the exception.

Could this be a bug in the way API limits are handled by the client? Do you have any advice on how to work around this issue?
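In the meantime, one workaround I am considering (a sketch under the assumption that the NPE is transient and tied to rate limiting, not a guaranteed fix) is to retry the getQueryResults call itself whenever it throws; retryOnNpe below is a hypothetical helper, not part of the client library:

```scala
// Hypothetical workaround: retry a call that intermittently throws
// NullPointerException (as bigQuery.getQueryResults appears to do here),
// sleeping between attempts. Any other exception, or an NPE on the last
// attempt, propagates to the caller unchanged.
def retryOnNpe[A](attempts: Int, delayMs: Long)(call: => A): A =
  try call
  catch {
    case _: NullPointerException if attempts > 1 =>
      Thread.sleep(delayMs)
      retryOnNpe(attempts - 1, delayMs)(call)
  }
```

The polling line would then become something like `response = retryOnNpe(5, 1000L)(bigQuery.getQueryResults(response.getJobId()))`, though that only papers over the problem rather than fixing it.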

Thanks!

Metadata

Labels

- api: bigquery (Issues related to the BigQuery API)
- priority: p2 (Moderately-important priority; fix may not be included in next release)
- type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns)
