diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index db7f36f3d93e..57ced8fac0c1 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,29 +187,50 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
-def dataframe_to_bq_schema(dataframe):
+def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
-    TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
-    schema for a subset of columns.
-
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame for which the client determines the BigQuery schema.
+        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+            A BigQuery schema. Use this argument to override the autodetected
+            type for some or all of the DataFrame columns.
 
     Returns:
         Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
             The automatically determined schema. Returns None if the type of
            any column cannot be determined.
    """
-    bq_schema = []
+    if bq_schema:
+        for field in bq_schema:
+            if field.field_type in schema._STRUCT_TYPES:
+                raise ValueError(
+                    "Uploading dataframes with struct (record) column types "
+                    "is not supported. See: "
+                    "https://github.com/googleapis/google-cloud-python/issues/8191"
+                )
+        bq_schema_index = {field.name: field for field in bq_schema}
+    else:
+        bq_schema_index = {}
+
+    bq_schema_out = []
     for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+        # Use provided type from schema, if present.
+        bq_field = bq_schema_index.get(column)
+        if bq_field:
+            bq_schema_out.append(bq_field)
+            continue
+
+        # Otherwise, try to automatically determine the type based on the
+        # pandas dtype.
         bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
         if not bq_type:
+            warnings.warn("Unable to determine type of column '{}'.".format(column))
             return None
         bq_field = schema.SchemaField(column, bq_type)
-        bq_schema.append(bq_field)
-    return tuple(bq_schema)
+        bq_schema_out.append(bq_field)
+    return tuple(bq_schema_out)
 
 
 def dataframe_to_arrow(dataframe, bq_schema):
@@ -217,7 +238,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Arrow table.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
            columns in the DataFrame.
@@ -255,7 +276,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
 
     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to convert to Parquet file.
+            DataFrame to convert to Parquet file.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
            columns in the DataFrame.
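The merge logic introduced above can be summarized outside the library. Below is a minimal, runnable Python sketch of the same control flow; `SchemaField` is a namedtuple stand-in and `merge_schema` is a hypothetical helper invented for illustration, not the library's API. Only the override-then-autodetect flow mirrors the new dataframe_to_bq_schema.

# Illustrative sketch only: columns named in the partial schema keep their
# declared type, the rest fall back to dtype-based autodetection, and the
# whole result is None (caller falls back to parquet autodetection) if any
# remaining column's type is unknown.
import collections
import warnings

SchemaField = collections.namedtuple("SchemaField", ["name", "field_type"])

_PANDAS_DTYPE_TO_BQ = {"int64": "INTEGER", "float64": "FLOAT", "bool": "BOOLEAN"}


def merge_schema(columns, dtype_names, bq_schema=()):
    index = {field.name: field for field in bq_schema}
    out = []
    for column, dtype_name in zip(columns, dtype_names):
        if column in index:
            out.append(index[column])  # the caller's override wins
            continue
        bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype_name)
        if not bq_type:
            warnings.warn("Unable to determine type of column '{}'.".format(column))
            return None
        out.append(SchemaField(column, bq_type))
    return tuple(out)


# "scores" has float64 dtype but is declared INTEGER by the caller.
print(merge_schema(["scores", "count"], ["float64", "int64"],
                   (SchemaField("scores", "INTEGER"),)))
# -> (SchemaField(name='scores', field_type='INTEGER'),
#     SchemaField(name='count', field_type='INTEGER'))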
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 1b13ee126a5d..da169cb55bf2 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -61,7 +61,6 @@
 from google.cloud.bigquery.retry import DEFAULT_RETRY
 from google.cloud.bigquery.routine import Routine
 from google.cloud.bigquery.routine import RoutineReference
-from google.cloud.bigquery.schema import _STRUCT_TYPES
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import _table_arg_to_table
 from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1532,28 +1531,14 @@ def load_table_from_dataframe(
         if location is None:
             location = self.location
 
-        if not job_config.schema:
-            autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
-
-            # Only use an explicit schema if we were able to determine one
-            # matching the dataframe. If not, fallback to the pandas to_parquet
-            # method.
-            if autodetected_schema:
-                job_config.schema = autodetected_schema
+        job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
+            dataframe, job_config.schema
+        )
 
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
 
         try:
-            if job_config.schema:
-                for field in job_config.schema:
-                    if field.field_type in _STRUCT_TYPES:
-                        raise ValueError(
-                            "Uploading dataframes with struct (record) column types "
-                            "is not supported. See: "
-                            "https://github.com/googleapis/google-cloud-python/issues/8191"
-                        )
-
             if pyarrow and job_config.schema:
                 if parquet_compression == "snappy":  # adjust the default value
                     parquet_compression = parquet_compression.upper()
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 59a72297ed87..1422c3c7cb60 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -743,21 +743,22 @@ def test_load_table_from_dataframe_w_nulls(self):
         )
         num_rows = 100
         nulls = [None] * num_rows
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": nulls,
-                "bytes_col": nulls,
-                "date_col": nulls,
-                "dt_col": nulls,
-                "float_col": nulls,
-                "geo_col": nulls,
-                "int_col": nulls,
-                "num_col": nulls,
-                "str_col": nulls,
-                "time_col": nulls,
-                "ts_col": nulls,
-            }
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", nulls),
+                ("bytes_col", nulls),
+                ("date_col", nulls),
+                ("dt_col", nulls),
+                ("float_col", nulls),
+                ("geo_col", nulls),
+                ("int_col", nulls),
+                ("num_col", nulls),
+                ("str_col", nulls),
+                ("time_col", nulls),
+                ("ts_col", nulls),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -796,7 +797,7 @@ def test_load_table_from_dataframe_w_required(self):
         )
 
         records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
-        dataframe = pandas.DataFrame(records)
+        dataframe = pandas.DataFrame(records, columns=["name", "age"])
         job_config = bigquery.LoadJobConfig(schema=table_schema)
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
@@ -847,44 +848,58 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
             # https://jira.apache.org/jira/browse/ARROW-2587
             # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
         )
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": [True, None, False],
-                "bytes_col": [b"abc", None, b"def"],
-                "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
-                "dt_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0),
-                    None,
-                    datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
-                ],
-                "float_col": [float("-inf"), float("nan"), float("inf")],
-                "geo_col": [
-                    "POINT(30 10)",
-                    None,
-                    "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
-                ],
-                "int_col": [-9223372036854775808, None, 9223372036854775807],
-                "num_col": [
-                    decimal.Decimal("-99999999999999999999999999999.999999999"),
-                    None,
-                    decimal.Decimal("99999999999999999999999999999.999999999"),
-                ],
-                "str_col": ["abc", None, "def"],
-                "time_col": [
-                    datetime.time(0, 0, 0),
-                    None,
-                    datetime.time(23, 59, 59, 999999),
-                ],
-                "ts_col": [
-                    datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-                    None,
-                    datetime.datetime(
-                        9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
-                    ),
-                ],
-            },
-            dtype="object",
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", [True, None, False]),
+                ("bytes_col", [b"abc", None, b"def"]),
+                (
+                    "date_col",
+                    [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
+                ),
+                (
+                    "dt_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0),
+                        None,
+                        datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                    ],
+                ),
+                ("float_col", [float("-inf"), float("nan"), float("inf")]),
+                (
+                    "geo_col",
+                    [
+                        "POINT(30 10)",
+                        None,
+                        "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+                    ],
+                ),
+                ("int_col", [-9223372036854775808, None, 9223372036854775807]),
+                (
+                    "num_col",
+                    [
+                        decimal.Decimal("-99999999999999999999999999999.999999999"),
+                        None,
+                        decimal.Decimal("99999999999999999999999999999.999999999"),
+                    ],
+                ),
+                ("str_col", [u"abc", None, u"def"]),
+                (
+                    "time_col",
+                    [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
+                ),
+                (
+                    "ts_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                        None,
+                        datetime.datetime(
+                            9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
+                        ),
+                    ],
+                ),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 8a2a1228cd65..1fd6d87487ae 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5230,7 +5230,7 @@ def test_load_table_from_dataframe(self):
         from google.cloud.bigquery import job
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(
@@ -5265,7 +5265,7 @@ def test_load_table_from_dataframe_w_client_location(self):
         from google.cloud.bigquery import job
 
         client = self._make_client(location=self.LOCATION)
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(
@@ -5300,7 +5300,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
         from google.cloud.bigquery import job
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
         job_config = job.LoadJobConfig()
 
@@ -5432,6 +5432,192 @@ def test_load_table_from_dataframe_struct_fields_error(self):
         assert "struct" in err_msg
         assert "not support" in err_msg
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_partial_schema(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        df_data = collections.OrderedDict(
+            [
+                ("int_col", [1, 2, 3]),
+                ("int_as_float_col", [1.0, float("nan"), 3.0]),
+                ("float_col", [1.0, 2.0, 3.0]),
+                ("bool_col", [True, False, True]),
+                (
+                    "dt_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ),
+                ),
+                (
+                    "ts_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ).dt.tz_localize(pytz.utc),
+                ),
+                ("string_col", [u"abc", None, u"def"]),
+                ("bytes_col", [b"abc", b"def", None]),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        schema = (
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("string_col", "STRING"),
+            SchemaField("bytes_col", "BYTES"),
+        )
+        job_config = job.LoadJobConfig(schema=schema)
+        with load_patch as load_table_from_file:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert tuple(sent_config.schema) == (
+            SchemaField("int_col", "INTEGER"),
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("float_col", "FLOAT"),
+            SchemaField("bool_col", "BOOLEAN"),
+            SchemaField("dt_col", "DATETIME"),
+            SchemaField("ts_col", "TIMESTAMP"),
+            SchemaField("string_col", "STRING"),
+            SchemaField("bytes_col", "BYTES"),
+        )
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_partial_schema_extra_types(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        df_data = collections.OrderedDict(
+            [
+                ("int_col", [1, 2, 3]),
+                ("int_as_float_col", [1.0, float("nan"), 3.0]),
+                ("string_col", [u"abc", None, u"def"]),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        schema = (
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("string_col", "STRING"),
+            SchemaField("unknown_col", "BYTES"),
+        )
+        job_config = job.LoadJobConfig(schema=schema)
+        with load_patch as load_table_from_file:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert tuple(sent_config.schema) == (
+            SchemaField("int_col", "INTEGER"),
+            SchemaField("int_as_float_col", "INTEGER"),
+            SchemaField("string_col", "STRING"),
+        )
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        df_data = collections.OrderedDict(
+            [
+                ("string_col", [u"abc", u"def", u"ghi"]),
+                ("unknown_col", [b"jkl", None, b"mno"]),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        schema = (SchemaField("string_col", "STRING"),)
+        job_config = job.LoadJobConfig(schema=schema)
+        with load_patch as load_table_from_file, warnings.catch_warnings(
+            record=True
+        ) as warned:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        assert warned  # there should be at least one warning
+        unknown_col_warnings = [
+            warning for warning in warned if "unknown_col" in str(warning)
+        ]
+        assert unknown_col_warnings
+        assert unknown_col_warnings[0].category == UserWarning
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert sent_config.schema is None
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
@@ -5440,8 +5626,8 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
         from google.cloud.bigquery import job
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
-        dataframe = pandas.DataFrame(records)
+        records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}]
+        dataframe = pandas.DataFrame(records, columns=["name", "age"])
         schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
         job_config = job.LoadJobConfig(schema=schema)
@@ -5486,7 +5672,7 @@ def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}]
         dataframe = pandas.DataFrame(records)
         schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
         job_config = job.LoadJobConfig(schema=schema)
@@ -5516,7 +5702,7 @@
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
         client = self._make_client()
-        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
         load_patch = mock.patch(
@@ -5553,7 +5739,7 @@ def test_load_table_from_dataframe_w_nulls(self):
 
         client = self._make_client()
         records = [{"name": None, "age": None}, {"name": None, "age": None}]
-        dataframe = pandas.DataFrame(records)
+        dataframe = pandas.DataFrame(records, columns=["name", "age"])
 
         schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")]
         job_config = job.LoadJobConfig(schema=schema)
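Taken together, these changes let callers pass a partial schema through LoadJobConfig and have the client fill in the remaining columns from the DataFrame dtypes. A hedged usage sketch follows; the table ID is a placeholder, and installed pandas/pyarrow plus default credentials and project are assumed.

import pandas
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project

dataframe = pandas.DataFrame(
    {"int_as_float_col": [1.0, float("nan"), 3.0], "float_col": [1.0, 2.0, 3.0]},
    columns=["int_as_float_col", "float_col"],
)

# Override only the ambiguous column; float_col is still autodetected as FLOAT.
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("int_as_float_col", "INTEGER")]
)

table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config
)
load_job.result()  # wait for the load job to finish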