From 4d7154705c39bdae20adb267ef2d2b0eef43b45d Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 23 Aug 2025 01:32:41 +0000 Subject: [PATCH] Refactor load_table methods to reduce code duplication This commit refactors the `load_table_from_uri`, `load_table_from_file`, `load_table_from_dataframe`, and `load_table_from_json` methods in the BigQuery client. A new private helper method, `_prepare_load_job`, has been introduced to centralize the creation and configuration of `LoadJob` objects. This eliminates redundant code for setting up job IDs, projects, and locations. A second helper, `_prepare_load_config`, was also added to standardize the handling of `LoadJobConfig` objects, merging user-provided configurations with client defaults. These changes make the loading methods more concise, easier to read, and simpler to maintain. --- google/cloud/bigquery/client.py | 141 +++++++++++++++++--------------- 1 file changed, 73 insertions(+), 68 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 4ca2cb428..3e1aca962 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2490,6 +2490,51 @@ def api_request(*args, **kwargs): page_size=page_size, ) + def _prepare_load_config( + self, job_config: Optional[LoadJobConfig] = None + ) -> LoadJobConfig: + """Helper to construct a load job configuration. + + Args: + job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + A user-supplied job configuration. + + Returns: + google.cloud.bigquery.job.LoadJobConfig: + The job configuration to use for a load job. + """ + if job_config is not None: + _verify_job_config_type(job_config, LoadJobConfig) + else: + job_config = job.LoadJobConfig() + + return job_config._fill_from_default(self._default_load_job_config) + + def _prepare_load_job( + self, + destination: Union[Table, TableReference, str], + job_config: LoadJobConfig, + job_id: Optional[str] = None, + job_id_prefix: Optional[str] = None, + location: Optional[str] = None, + project: Optional[str] = None, + source_uris: Optional[Sequence[str]] = None, + ) -> job.LoadJob: + """Helper for `load_table_from_` methods to prepare a LoadJob.""" + job_id = _make_job_id(job_id, job_id_prefix) + + if project is None: + project = self.project + + if location is None: + location = self.location + + job_ref = job._JobReference(job_id, project=project, location=location) + + destination = _table_arg_to_table_ref(destination, default_project=self.project) + + return job.LoadJob(job_ref, source_uris, destination, self, job_config) + def load_table_from_uri( self, source_uris: Union[str, Sequence[str]], @@ -2547,31 +2592,21 @@ def load_table_from_uri( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ - job_id = _make_job_id(job_id, job_id_prefix) - - if project is None: - project = self.project - - if location is None: - location = self.location - - job_ref = job._JobReference(job_id, project=project, location=location) - if isinstance(source_uris, str): source_uris = [source_uris] - destination = _table_arg_to_table_ref(destination, default_project=self.project) - - if job_config is not None: - _verify_job_config_type(job_config, LoadJobConfig) - else: - job_config = job.LoadJobConfig() - - new_job_config = job_config._fill_from_default(self._default_load_job_config) + new_job_config = self._prepare_load_config(job_config) + load_job = self._prepare_load_job( + destination, + new_job_config, + job_id=job_id, + job_id_prefix=job_id_prefix, + location=location, + project=project, + source_uris=source_uris, + ) - load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config) load_job._begin(retry=retry, timeout=timeout) - return load_job def load_table_from_file( @@ -2647,25 +2682,15 @@ def load_table_from_file( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ - job_id = _make_job_id(job_id, job_id_prefix) - - if project is None: - project = self.project - - if location is None: - location = self.location - - destination = _table_arg_to_table_ref(destination, default_project=self.project) - job_ref = job._JobReference(job_id, project=project, location=location) - - if job_config is not None: - _verify_job_config_type(job_config, LoadJobConfig) - else: - job_config = job.LoadJobConfig() - - new_job_config = job_config._fill_from_default(self._default_load_job_config) - - load_job = job.LoadJob(job_ref, None, destination, self, new_job_config) + new_job_config = self._prepare_load_config(job_config) + load_job = self._prepare_load_job( + destination, + new_job_config, + job_id=job_id, + job_id_prefix=job_id_prefix, + location=location, + project=project, + ) job_resource = load_job.to_api_repr() if rewind: @@ -2796,14 +2821,7 @@ def load_table_from_dataframe( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ - job_id = _make_job_id(job_id, job_id_prefix) - - if job_config is not None: - _verify_job_config_type(job_config, LoadJobConfig) - else: - job_config = job.LoadJobConfig() - - new_job_config = job_config._fill_from_default(self._default_load_job_config) + new_job_config = self._prepare_load_config(job_config) supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET} if new_job_config.source_format is None: @@ -2830,9 +2848,6 @@ def load_table_from_dataframe( # pyarrow is now the only supported parquet engine. raise ValueError("This method requires pyarrow to be installed") - if location is None: - location = self.location - # If table schema is not provided, we try to fetch the existing table # schema, and check if dataframe schema is compatible with it - except # for WRITE_TRUNCATE jobs, the existing schema does not matter then. @@ -2877,8 +2892,14 @@ def load_table_from_dataframe( stacklevel=2, ) + # We need a unique suffix for every load_table_from_dataframe call to + # avoid collisions. + # See: https://github.com/googleapis/python-bigquery/issues/1363 + session_suffix = uuid.uuid4().hex tmpfd, tmppath = tempfile.mkstemp( - suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower()) + suffix="_job_{}.{}".format( + session_suffix, new_job_config.source_format.lower() + ) ) os.close(tmpfd) @@ -3012,15 +3033,7 @@ def load_table_from_json( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ - job_id = _make_job_id(job_id, job_id_prefix) - - if job_config is not None: - _verify_job_config_type(job_config, LoadJobConfig) - else: - job_config = job.LoadJobConfig() - - new_job_config = job_config._fill_from_default(self._default_load_job_config) - + new_job_config = self._prepare_load_config(job_config) new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON # In specific conditions, we check if the table alread exists, and/or @@ -3040,14 +3053,6 @@ def load_table_from_json( else: new_job_config.autodetect = False - if project is None: - project = self.project - - if location is None: - location = self.location - - destination = _table_arg_to_table_ref(destination, default_project=self.project) - data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows) encoded_str = data_str.encode() data_file = io.BytesIO(encoded_str)