diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b222b358f547..fc42a16409eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2105,9 +2105,12 @@ public static void readSource( // the same order. BoundedSource.BoundedReader reader = streamSource.createReader(options); + T current = null; + boolean hasCurrent = false; try { if (reader.start()) { - outputReceiver.get(rowTag).output(reader.getCurrent()); + current = reader.getCurrent(); + hasCurrent = true; } else { return; } @@ -2120,11 +2123,17 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } + if (hasCurrent) { + outputReceiver.get(rowTag).output(current); + } while (true) { + current = null; + hasCurrent = false; try { if (reader.advance()) { - outputReceiver.get(rowTag).output(reader.getCurrent()); + current = reader.getCurrent(); + hasCurrent = true; } else { return; } @@ -2137,6 +2146,9 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } + if (hasCurrent) { + outputReceiver.get(rowTag).output(current); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 70be676c75fe..1162db9e1a28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -602,8 +602,9 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String resourceId = context.element(); + String resource = null; try { - context.output(fetchResource(this.client, resourceId)); + resource = java.util.Objects.requireNonNull(fetchResource(this.client, resourceId)); } catch (Exception e) { READ_RESOURCE_ERRORS.inc(); LOG.warn( @@ -612,6 +613,9 @@ public void processElement(ProcessContext context) { e); context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e)); } + if (resource != null) { + context.output(resource); + } } private String fetchResource(HealthcareApiClient client, String resourceName) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java index 3647ef7671eb..496b39a923f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java @@ -365,11 +365,15 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String msgId = context.element(); + HL7v2Message message = null; try { - context.output(client.fetchMessage(msgId)); + message = java.util.Objects.requireNonNull(client.fetchMessage(msgId)); } catch (Exception e) { context.output(HL7v2IO.Read.DEAD_LETTER, HealthcareIOError.of(msgId, e)); } + if (message != null) { + context.output(message); + } } } } @@ -487,15 +491,20 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String msgId = context.element().getHl7v2MessageId(); + HL7v2ReadResponse response = null; try { - HL7v2ReadResponse response = - HL7v2ReadResponse.of(context.element().getMetadata(), client.fetchMessage(msgId)); - context.output(response); + response = + java.util.Objects.requireNonNull( + HL7v2ReadResponse.of( + context.element().getMetadata(), client.fetchMessage(msgId))); } catch (Exception e) { HealthcareIOError error = HealthcareIOError.of(context.element(), e); context.output(HL7v2IO.HL7v2Read.DEAD_LETTER, error); } + if (response != null) { + context.output(response); + } } } }