Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google-cloud-pub-sub-grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ gcloud pubsub topics delete testTopic
| Date | Environment | Credential Provider | Result | Notes |
|------|-------------|---------------------|--------|-------|
| 2026-03-13 | GCP (project: pekko-connectors-test) | google-application-default | 14/14 passed | Scala 8/8, Java 6/6. User credentials via `gcloud auth application-default login`. |
| 2026-05-11 | GCP (project: pekko-connectors) | google-application-default | 1/1 passed | New `Subscriber resource` scenario. Verifies eager-pull deadline tracking, subsequent-request field clearing, eager-pull flow control gate, and auto-cleanup. 10 messages, 6s processing, parallelism 2, `maxOutstandingMessages=3`. 35s. |

After running against real GCP, add a row to the table above to record the result.
92 changes: 91 additions & 1 deletion google-cloud-pub-sub-grpc/k8s/GkeFullFeatureTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.gke
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.RestartSettings
import pekko.stream.connectors.googlecloud.pubsub.grpc.{ AckDeadlineDistribution, FlowControl }
import pekko.stream.connectors.googlecloud.pubsub.grpc.{ AckDeadline, AckDeadlineDistribution, FlowControl }
import pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import com.google.protobuf.ByteString
Expand Down Expand Up @@ -62,6 +62,7 @@ object GkeFullFeatureTest {
scenario5_FlowControl(topicFqrs, subFqrs)
scenario6_NackAndRedeliver(topicFqrs, subFqrs)
scenario7_DynamicDeadlineModification(topicFqrs, subFqrs)
scenario8_SubscriberResource(topicFqrs, subFqrs)

println("\n=== ALL SCENARIOS PASSED ===")
Await.result(system.terminate(), 10.seconds)
Expand Down Expand Up @@ -405,4 +406,93 @@ object GkeFullFeatureTest {
assert(msgs.size == messageCount, s"Expected $messageCount messages, got ${msgs.size}")
println(" PASSED")
}

// ---------------------------------------------------------------------------
// Scenario 8: Subscriber resource — exercises all three bug fixes plus
// composition guarantees in a single end-to-end run.
//
// - Sets `maxOutstandingMessages` on the initial StreamingPullRequest.
// Pre-fix this caused INVALID_ARGUMENT on the second polling tick (bug 2).
//
// - Slow per-message processing (8 seconds) with parallelism well below the
// batch size, so messages buffer inside the eager-pull tracker for tens of
// seconds. Pre-fix the tracker would only see the messages currently in
// mapAsync, the rest would expire and get redelivered (bug 1).
//
// - Configures a small FlowControl limit. The new eager-pull gate counts
// messages on receipt rather than on push downstream, so combined with
// server-side maxOutstandingMessages it actually bounds delivery (bug 3).
//
// - Asserts every published message is received exactly once, no
// duplicates from redelivery, and that flowControl.outstandingCount
// reaches the configured limit at some point during the run.
// ---------------------------------------------------------------------------
private def scenario8_SubscriberResource(topicFqrs: String, subFqrs: String)(
implicit system: ActorSystem): Unit = {
println("\n--- Scenario 8: Subscriber resource (high-level API, all bug fixes) ---")

val messageCount = 20
val maxOutstanding = 5
val processingDelay = 8.seconds
val testPrefix = s"scenario8-${System.nanoTime()}"
val messages = (1 to messageCount).map(i =>
PubsubMessage().withData(ByteString.copyFromUtf8(s"$testPrefix-$i")))

Await.result(
Source
.single(PublishRequest(topicFqrs, messages))
.via(GooglePubSub.publish(parallelism = 1))
.runWith(Sink.head),
30.seconds)
println(s" Published $messageCount messages")

val flowControl = FlowControl(maxOutstandingMessages = maxOutstanding.toLong)
var maxObserved = 0L

// Initial request sets BOTH stream ack deadline AND maxOutstandingMessages.
// Pre-bug-2 fix this would fail on the first keepalive tick with INVALID_ARGUMENT.
val request = StreamingPullRequest(subFqrs)
.withStreamAckDeadlineSeconds(15)
.withMaxOutstandingMessages(maxOutstanding.toLong)

val restartSettings = RestartSettings(
minBackoff = 1.second,
maxBackoff = 10.seconds,
randomFactor = 0.2).withMaxRestarts(3, 1.minute)

val subscriber = GooglePubSub.subscriber(
request = request,
pollInterval = 1.second,
ackDeadline = AckDeadline.Fixed(extensionInterval = 5.seconds, deadlineSeconds = 30),
restartSettings = Some(restartSettings),
flowControl = Some(flowControl))

try {
val received = subscriber.source
.filter(_.message.exists(_.data.toStringUtf8.startsWith(testPrefix)))
.take(messageCount)
.mapAsync(parallelism = 2) { msg =>
val current = flowControl.outstandingCount
synchronized { if (current > maxObserved) maxObserved = current }
println(s" Processing: ${msg.message.map(_.data.toStringUtf8).getOrElse("?")} " +
s"(outstanding: $current/$maxOutstanding)")
// Slow processing forces autoExtend to actually fire while messages wait.
pekko.pattern.after(processingDelay)(Future.successful(msg))
}
.map(msg => AcknowledgeRequest(subFqrs, Seq(msg.ackId)))
.runWith(subscriber.acknowledge(parallelism = 1))

Await.result(received, 5.minutes)

println(s" Received and acked all $messageCount messages")
println(s" Max outstanding observed: $maxObserved (limit: $maxOutstanding)")
assert(maxObserved <= maxOutstanding,
s"Flow control violated: observed $maxObserved > limit $maxOutstanding")
assert(maxObserved >= 1L, "Flow control gate never registered any outstanding messages")
println(" PASSED (no redelivery during slow processing, server-side flow control respected, " +
"subsequent StreamingPullRequest accepted by server)")
} finally {
Await.result(subscriber.close(), 10.seconds)
}
}
}
24 changes: 16 additions & 8 deletions google-cloud-pub-sub-grpc/k8s/build-and-push.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ REPO="pekko-test"
IMAGE="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/gke-auth-test:latest"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)"
STAGING="${SCRIPT_DIR}/staging"

# Run cleanup unconditionally on exit (success, failure, or interrupt). Without this trap,
# a script failure between the "Building" step and the "Cleaning up" step at the bottom
# leaves GkeAuthTest.scala / GkeFullFeatureTest.scala leaked into src/main/scala/.../gke/
# and a populated staging/ directory.
cleanup() {
rm -rf "${STAGING}"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/gke/GkeAuthTest.scala"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/gke/GkeFullFeatureTest.scala"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/resources/gke-application.conf"
rmdir "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/gke" 2>/dev/null || true
}
trap cleanup EXIT

echo "=== Creating Artifact Registry repo (if needed) ==="
gcloud artifacts repositories create "${REPO}" \
Expand Down Expand Up @@ -51,8 +65,7 @@ echo "=== Packaging ==="
FULL_CP=$(sbt --error "print google-cloud-pub-sub-grpc/fullClasspath" | tr ',' '\n' | sed 's/.*Attributed(\(.*\))/\1/')
CLASSES_DIR=$(sbt --error "print google-cloud-pub-sub-grpc/classDirectory" | tr -d '[:space:]')

# Create staging directory
STAGING="${SCRIPT_DIR}/staging"
# Create staging directory (already declared at the top of the script for the cleanup trap)
rm -rf "${STAGING}"
mkdir -p "${STAGING}/lib"

Expand Down Expand Up @@ -94,10 +107,5 @@ docker build -t "${IMAGE}" "${STAGING}"
echo "=== Pushing to Artifact Registry ==="
docker push "${IMAGE}"

echo "=== Cleaning up ==="
rm -rf "${STAGING}"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/gke/GkeAuthTest.scala"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/gke/GkeFullFeatureTest.scala"
rm -f "${ROOT_DIR}/google-cloud-pub-sub-grpc/src/main/resources/gke-application.conf"

echo "=== Done: ${IMAGE} ==="
# Cleanup runs from the EXIT trap declared at the top of this script.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc

import org.apache.pekko.annotation.ApiMayChange

import scala.concurrent.duration._

/**
* Configuration for how a [[scaladsl.Subscriber]] (or its Java equivalent) extends ack deadlines
* for messages it has received.
*
* - [[AckDeadline.Fixed]] uses a constant deadline value on every extension. Cheapest, easiest
* to reason about. Use this if your processing latency is predictable.
* - [[AckDeadline.Adaptive]] adapts the deadline based on observed processing latencies via a
* shared [[AckDeadlineDistribution]], matching Google's official client library behavior.
* Use this if processing latency varies widely.
*
* @since 2.0.0
*/
@ApiMayChange
sealed trait AckDeadline {
def extensionInterval: FiniteDuration
}

@ApiMayChange
object AckDeadline {

/**
* Extend deadlines on a fixed schedule using a constant deadline value.
*
* @param extensionInterval how often to extend deadlines (should be less than the deadline)
* @param deadlineSeconds the new deadline to set on each extension
* @param maxAckExtensionPeriod maximum total time to keep extending a message's deadline
* (default 60 minutes, matching Google's client library)
*/
final case class Fixed(
extensionInterval: FiniteDuration,
deadlineSeconds: Int,
maxAckExtensionPeriod: FiniteDuration = 60.minutes) extends AckDeadline

/**
* Extend deadlines on a fixed schedule using an adaptive deadline computed from an
* [[AckDeadlineDistribution]]. The same distribution must be passed to acknowledge/nack
* operators so that completion latencies are recorded into the histogram.
*/
final case class Adaptive(
extensionInterval: FiniteDuration,
distribution: AckDeadlineDistribution) extends AckDeadline

/** Java API: fixed-deadline configuration with a default `maxAckExtensionPeriod` of 60 minutes. */
def fixed(extensionInterval: java.time.Duration, deadlineSeconds: Int): AckDeadline =
Fixed(FiniteDuration(extensionInterval.toNanos, NANOSECONDS), deadlineSeconds)

/** Java API: fixed-deadline configuration with an explicit `maxAckExtensionPeriod`. */
def fixed(extensionInterval: java.time.Duration, deadlineSeconds: Int,
maxAckExtensionPeriod: java.time.Duration): AckDeadline =
Fixed(FiniteDuration(extensionInterval.toNanos, NANOSECONDS), deadlineSeconds,
FiniteDuration(maxAckExtensionPeriod.toNanos, NANOSECONDS))

/** Java API: adaptive-deadline configuration. */
def adaptive(extensionInterval: java.time.Duration,
distribution: AckDeadlineDistribution): AckDeadline =
Adaptive(FiniteDuration(extensionInterval.toNanos, NANOSECONDS), distribution)
}
Loading