diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 11064375d62e..690f231aa801 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "37345", - "modification": 49 -} + "modification": 50 +} diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 4ef6c392254b..7bad85b5cb6f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -171,6 +171,15 @@ def _bq_uuid(seed=None): return str(hashlib.md5(seed.encode('utf8')).hexdigest()) +def _bq_uuid_list(list): + cs = hashlib.sha256() + for item in list: + cs.update(bigquery_tools.get_hashable_destination(item).encode('utf-8')) + # separator + cs.update(b'\x00') + return cs.hexdigest() + + class _ShardDestinations(beam.DoFn): """Adds a shard number to the key of the KV element. @@ -589,9 +598,7 @@ def process( write_disposition = 'WRITE_APPEND' wait_for_job = False - chunk_job_name = copy_job_name_base - if len(chunks) > 1: - chunk_job_name = f"{copy_job_name_base}_{i}" + chunk_job_name = '%s_%s' % (copy_job_name_base, _bq_uuid_list(chunk)) _LOGGER.info( "Triggering copy job %s from %s to %s (write_disposition: %s)",