From 4cac3681e4af1020c8ddbd5e6bfffce812c22e0c Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 26 Feb 2026 15:56:42 -0500 Subject: [PATCH 1/9] test for map forwarder using simple buffer Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index 91a0c2cdba..805e49cb20 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -359,6 +359,189 @@ where Ok((task, isb_reader)) } +#[cfg(test)] +mod simplebuffer_tests { + use super::*; + use std::collections::HashMap; + use std::sync::Arc; + use std::time::Duration; + + use bytes::Bytes; + use chrono::Utc; + use numaflow::map; + use numaflow_shared::server_info::MapMode; + use numaflow_testing::simplebuffer::SimpleBuffer; + use tokio_util::sync::CancellationToken; + + use crate::config::pipeline::isb::BufferFullStrategy::RetryUntilSuccess; + use crate::config::pipeline::isb::{BufferReaderConfig, BufferWriterConfig, Stream}; + use crate::config::pipeline::{ToVertexConfig, VertexType}; + use crate::mapper::test_utils::MapperTestHandle; + use crate::message::{IntOffset, Message, MessageID, Offset}; + use crate::pipeline::isb::ISBWriter; + use crate::pipeline::isb::reader::{ISBReaderComponents, ISBReaderOrchestrator}; + use crate::pipeline::isb::simplebuffer::{SimpleBufferAdapter, WithSimpleBuffer}; + use crate::pipeline::isb::writer::{ISBWriterOrchestrator, ISBWriterOrchestratorComponents}; + use crate::tracker::Tracker; + + struct SimpleCat; + + #[tonic::async_trait] + impl map::Mapper for SimpleCat { + async fn map(&self, input: map::MapRequest) -> Vec { + vec![map::Message::new(input.value).with_keys(input.keys)] + } + } + + /// Helper to write test messages into a SimpleBufferAdapter via its ISBWriter. + async fn write_test_messages(adapter: &SimpleBufferAdapter, count: usize) { + let writer = adapter.writer(); + for i in 0..count { + let msg = Message { + typ: Default::default(), + keys: Arc::from(vec![format!("key-{}", i)]), + tags: None, + value: Bytes::from(format!("payload-{}", i)), + offset: Offset::Int(IntOffset::new(i as i64, 0)), + event_time: Utc::now(), + watermark: None, + id: MessageID { + vertex_name: "test-in".into(), + index: i as i32, + offset: format!("{}", i).into(), + }, + headers: Arc::new(HashMap::new()), + metadata: None, + is_late: false, + ack_handle: None, + }; + writer.write(msg).await.expect("write should succeed"); + } + } + + // End-to-end test for map forwarder using SimpleBuffer. + // Reads from a SimpleBuffer-backed ISB, maps through a cat UDF, and writes + // to another SimpleBuffer-backed ISB. No external infrastructure required. + #[tokio::test(flavor = "multi_thread")] + async fn test_map_forwarder_with_single_stream() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Input buffer + let input_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "input-buffer")); + + // Write all the messages to the input buffer so that mapper can read these + write_test_messages(&input_adapter, MESSAGE_COUNT).await; + + // Output buffer + let output_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "output-buffer")); + + // ISB Reader Orchestrator + let input_stream = Stream::new("input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Map".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, input_adapter.reader(), None) + .await + .unwrap(); + + // ISB Writer Orchestrator + let output_stream = Stream::new("output-buffer", "test-out", 0); + let writer_config = BufferWriterConfig { + streams: vec![output_stream.clone()], + buffer_full_strategy: RetryUntilSuccess, + ..Default::default() + }; + + let mut writers = HashMap::new(); + writers.insert(output_stream.name, output_adapter.writer()); + + let writer_components = ISBWriterOrchestratorComponents:: { + config: vec![ToVertexConfig { + name: "test-out", + partitions: 1, + writer_config, + conditions: None, + to_vertex_type: VertexType::Sink, + }], + writers, + paf_concurrency: 100, + watermark_handle: None, + vertex_type: VertexType::MapUDF, + }; + + let isb_writer = ISBWriterOrchestrator::::new(writer_components); + + // Mapper + let MapperTestHandle { + mapper, + server_handle: _server_handle, + } = MapperTestHandle::create_mapper( + SimpleCat, + tracker.clone(), + MapMode::Unary, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + // Create and start the MapForwarder + let forwarder = MapForwarder::::new(isb_reader, mapper, isb_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + // Wait until all messages appear in the output buffer + let result = tokio::time::timeout(Duration::from_secs(10), async { + loop { + if output_adapter.pending_count() >= MESSAGE_COUNT { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + result.is_ok(), + "Timed out waiting for messages in output buffer. Got {} of {}", + output_adapter.pending_count(), + MESSAGE_COUNT, + ); + + assert_eq!( + output_adapter.pending_count(), + MESSAGE_COUNT, + "All messages should be forwarded to the output buffer" + ); + + // Shutdown + cln_token.cancel(); + forwarder_handle.abort(); + } +} + #[cfg(test)] mod tests { use super::*; From b96ceaaa538d30c755d6a4d6f58fb65a2dfe2145 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 26 Feb 2026 15:58:40 -0500 Subject: [PATCH 2/9] update test doc comment Signed-off-by: Vaibhav Tiwari --- rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index 805e49cb20..4df1b2775e 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -421,7 +421,7 @@ mod simplebuffer_tests { // End-to-end test for map forwarder using SimpleBuffer. // Reads from a SimpleBuffer-backed ISB, maps through a cat UDF, and writes - // to another SimpleBuffer-backed ISB. No external infrastructure required. + // to another SimpleBuffer-backed ISB #[tokio::test(flavor = "multi_thread")] async fn test_map_forwarder_with_single_stream() { const MESSAGE_COUNT: usize = 10; From 02897313b4115f6616b501635c84d02fa327b6a1 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 26 Feb 2026 16:23:41 -0500 Subject: [PATCH 3/9] Add test with reader and writer using same simplebuffer Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 122 +++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index 4df1b2775e..8c4c215219 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -421,9 +421,9 @@ mod simplebuffer_tests { // End-to-end test for map forwarder using SimpleBuffer. // Reads from a SimpleBuffer-backed ISB, maps through a cat UDF, and writes - // to another SimpleBuffer-backed ISB + // to a DIFFERENT SimpleBuffer-backed ISB #[tokio::test(flavor = "multi_thread")] - async fn test_map_forwarder_with_single_stream() { + async fn test_map_forwarder_with_single_stream_different_isb() { const MESSAGE_COUNT: usize = 10; let cln_token = CancellationToken::new(); @@ -540,6 +540,124 @@ mod simplebuffer_tests { cln_token.cancel(); forwarder_handle.abort(); } + + // End-to-end test for map forwarder using SimpleBuffer. + // Reads from a SimpleBuffer-backed ISB, maps through a cat UDF, and writes + // to the SAME SimpleBuffer-backed ISB + #[tokio::test(flavor = "multi_thread")] + async fn test_map_forwarder_with_single_stream_same_isb() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + let simple_buffer = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "simple-buffer")); + + // Write all the messages to the input buffer so that mapper can read these + write_test_messages(&simple_buffer, MESSAGE_COUNT).await; + + // ISB Reader Orchestrator + let input_stream = Stream::new("input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Map".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, simple_buffer.reader(), None) + .await + .unwrap(); + + // ISB Writer Orchestrator + let output_stream = Stream::new("output-buffer", "test-out", 0); + let writer_config = BufferWriterConfig { + streams: vec![output_stream.clone()], + buffer_full_strategy: RetryUntilSuccess, + ..Default::default() + }; + + let mut writers = HashMap::new(); + writers.insert(output_stream.name, simple_buffer.writer()); + + let writer_components = ISBWriterOrchestratorComponents:: { + config: vec![ToVertexConfig { + name: "test-out", + partitions: 1, + writer_config, + conditions: None, + to_vertex_type: VertexType::Sink, + }], + writers, + paf_concurrency: 100, + watermark_handle: None, + vertex_type: VertexType::MapUDF, + }; + + let isb_writer = ISBWriterOrchestrator::::new(writer_components); + + // Mapper + let MapperTestHandle { + mapper, + server_handle: _server_handle, + } = MapperTestHandle::create_mapper( + SimpleCat, + tracker.clone(), + MapMode::Unary, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + // Create and start the MapForwarder + let forwarder = MapForwarder::::new(isb_reader, mapper, isb_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + // Wait until all messages appear in the output buffer + let result = tokio::time::timeout(Duration::from_secs(10), async { + loop { + if simple_buffer.pending_count() >= MESSAGE_COUNT { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + result.is_ok(), + "Timed out waiting for messages in output buffer. Got {} of {}", + simple_buffer.pending_count(), + MESSAGE_COUNT, + ); + + assert_eq!( + simple_buffer.pending_count(), + MESSAGE_COUNT, + "All messages should be forwarded to the output buffer" + ); + + // Shutdown + cln_token.cancel(); + forwarder_handle.abort(); + } } #[cfg(test)] From d4e913af755c79ed5dda723c453ab5bda955e894 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sat, 28 Feb 2026 17:04:43 -0500 Subject: [PATCH 4/9] Add a simple buffer test with multiple streams Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 163 +++++++++++++++++- 1 file changed, 161 insertions(+), 2 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index 8c4c215219..4a9838f6f8 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -360,7 +360,7 @@ where } #[cfg(test)] -mod simplebuffer_tests { +mod simple_buffer_tests { use super::*; use std::collections::HashMap; use std::sync::Arc; @@ -393,7 +393,7 @@ mod simplebuffer_tests { } } - /// Helper to write test messages into a SimpleBufferAdapter via its ISBWriter. + /// Helper to create and write test messages into a SimpleBufferAdapter via its ISBWriter. async fn write_test_messages(adapter: &SimpleBufferAdapter, count: usize) { let writer = adapter.writer(); for i in 0..count { @@ -658,6 +658,165 @@ mod simplebuffer_tests { cln_token.cancel(); forwarder_handle.abort(); } + + // End-to-end test for map forwarder using SimpleBuffer. + // Reads from an ISB backed by multiple SimpleBuffers, maps through a cat UDF, and writes + // to the SAME ISB + #[tokio::test(flavor = "multi_thread")] + async fn test_map_forwarder_with_multi_stream_same_isb() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Mapper + let MapperTestHandle { + mapper, + server_handle: _server_handle, + } = MapperTestHandle::create_mapper( + SimpleCat, + tracker.clone(), + MapMode::Unary, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + let simple_buffers = vec![ + SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "simple-buffer-0")), + SimpleBufferAdapter::new(SimpleBuffer::new(1000, 1, "simple-buffer-1")), + SimpleBufferAdapter::new(SimpleBuffer::new(1000, 2, "simple-buffer-2")), + SimpleBufferAdapter::new(SimpleBuffer::new(1000, 3, "simple-buffer-3")), + SimpleBufferAdapter::new(SimpleBuffer::new(1000, 4, "simple-buffer-4")), + ]; + + // Write all the messages to the input buffer so that mapper can read these + for simple_buffer in &simple_buffers { + write_test_messages(simple_buffer, MESSAGE_COUNT).await; + } + + let output_streams = vec![ + Stream::new("default-test-forwarder-for-map-vertex-out-0", "test", 0), + Stream::new("default-test-forwarder-for-map-vertex-out-1", "test", 1), + Stream::new("default-test-forwarder-for-map-vertex-out-2", "test", 2), + Stream::new("default-test-forwarder-for-map-vertex-out-3", "test", 3), + Stream::new("default-test-forwarder-for-map-vertex-out-4", "test", 4), + ]; + + let mut writers = HashMap::new(); + let writer_config = BufferWriterConfig { + streams: output_streams.clone(), + buffer_full_strategy: RetryUntilSuccess, + ..Default::default() + }; + // ISB Writer Orchestrator + for (i, output_stream) in (&output_streams).iter().enumerate() { + writers.insert(output_stream.name, simple_buffers[i].writer()); + } + + let writer_components = ISBWriterOrchestratorComponents:: { + config: vec![ToVertexConfig { + name: "test-out", + partitions: 5, + writer_config, + conditions: None, + to_vertex_type: VertexType::Sink, + }], + writers, + paf_concurrency: 100, + watermark_handle: None, + vertex_type: VertexType::MapUDF, + }; + + let isb_writer = ISBWriterOrchestrator::::new(writer_components); + + // Unique names for the streams we use in this test + let input_streams = vec![ + Stream::new("default-test-forwarder-for-map-vertex-in-0", "test", 0), + Stream::new("default-test-forwarder-for-map-vertex-in-1", "test", 1), + Stream::new("default-test-forwarder-for-map-vertex-in-2", "test", 2), + Stream::new("default-test-forwarder-for-map-vertex-in-3", "test", 3), + Stream::new("default-test-forwarder-for-map-vertex-in-4", "test", 4), + ]; + + let buf_reader_config = BufferReaderConfig { + streams: input_streams.clone(), + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let mut forwarder_handles = vec![]; + for (i, input_stream) in (&input_streams).iter().enumerate() { + // ISB Reader Orchestrator + + let reader_components = ISBReaderComponents { + vertex_type: "Map".to_string(), + stream: input_stream.clone(), + config: buf_reader_config.clone(), + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, simple_buffers[i].reader(), None) + .await + .unwrap(); + + // Create and start the MapForwarder + let forwarder = MapForwarder::::new( + isb_reader, + mapper.clone(), + isb_writer.clone(), + ) + .await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = + tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + forwarder_handles.push(forwarder_handle); + } + + // Wait until all messages appear in the output buffer + let result = tokio::time::timeout(Duration::from_secs(10), async { + for simple_buffer in &simple_buffers { + loop { + if simple_buffer.pending_count() >= MESSAGE_COUNT { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + }) + .await; + + for simple_buffer in simple_buffers { + assert!( + result.is_ok(), + "Timed out waiting for messages in output buffer. Got {} of {}", + simple_buffer.pending_count(), + MESSAGE_COUNT, + ); + + assert_eq!( + simple_buffer.pending_count(), + MESSAGE_COUNT, + "All messages should be forwarded to the output buffer" + ); + } + + // Shutdown + cln_token.cancel(); + for forwarder_handle in forwarder_handles { + forwarder_handle.abort(); + } + } } #[cfg(test)] From c8c21d97dfe5eb8e434dc43ab831efb14593241b Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sat, 28 Feb 2026 19:53:28 -0500 Subject: [PATCH 5/9] Update the multi stream test to use large message count Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 130 +++++++++++++----- 1 file changed, 95 insertions(+), 35 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index 4a9838f6f8..a2c02b8e04 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -378,10 +378,10 @@ mod simple_buffer_tests { use crate::config::pipeline::{ToVertexConfig, VertexType}; use crate::mapper::test_utils::MapperTestHandle; use crate::message::{IntOffset, Message, MessageID, Offset}; - use crate::pipeline::isb::ISBWriter; use crate::pipeline::isb::reader::{ISBReaderComponents, ISBReaderOrchestrator}; use crate::pipeline::isb::simplebuffer::{SimpleBufferAdapter, WithSimpleBuffer}; use crate::pipeline::isb::writer::{ISBWriterOrchestrator, ISBWriterOrchestratorComponents}; + use crate::pipeline::isb::{ISBReader, ISBWriter}; use crate::tracker::Tracker; struct SimpleCat; @@ -513,7 +513,7 @@ mod simple_buffer_tests { let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); // Wait until all messages appear in the output buffer - let result = tokio::time::timeout(Duration::from_secs(10), async { + let result = tokio::time::timeout(Duration::from_secs(3), async { loop { if output_adapter.pending_count() >= MESSAGE_COUNT { break; @@ -631,7 +631,7 @@ mod simple_buffer_tests { let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); // Wait until all messages appear in the output buffer - let result = tokio::time::timeout(Duration::from_secs(10), async { + let result = tokio::time::timeout(Duration::from_secs(3), async { loop { if simple_buffer.pending_count() >= MESSAGE_COUNT { break; @@ -661,10 +661,10 @@ mod simple_buffer_tests { // End-to-end test for map forwarder using SimpleBuffer. // Reads from an ISB backed by multiple SimpleBuffers, maps through a cat UDF, and writes - // to the SAME ISB + // to an ISB backed by a different set of SimpleBuffers #[tokio::test(flavor = "multi_thread")] - async fn test_map_forwarder_with_multi_stream_same_isb() { - const MESSAGE_COUNT: usize = 10; + async fn test_map_forwarder_with_multi_streams() { + const MESSAGE_COUNT: usize = 1000; let cln_token = CancellationToken::new(); let tracker = Tracker::new(None, cln_token.clone()); @@ -685,25 +685,30 @@ mod simple_buffer_tests { ) .await; - let simple_buffers = vec![ - SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "simple-buffer-0")), - SimpleBufferAdapter::new(SimpleBuffer::new(1000, 1, "simple-buffer-1")), - SimpleBufferAdapter::new(SimpleBuffer::new(1000, 2, "simple-buffer-2")), - SimpleBufferAdapter::new(SimpleBuffer::new(1000, 3, "simple-buffer-3")), - SimpleBufferAdapter::new(SimpleBuffer::new(1000, 4, "simple-buffer-4")), + let output_simple_buffers = vec![ + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 0, "output-simple-buffer-0")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 1, "output-simple-buffer-1")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 2, "output-simple-buffer-2")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 3, "output-simple-buffer-3")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 4, "output-simple-buffer-4")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 5, "output-simple-buffer-5")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 6, "output-simple-buffer-6")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 7, "output-simple-buffer-7")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 8, "output-simple-buffer-8")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 9, "output-simple-buffer-9")), ]; - // Write all the messages to the input buffer so that mapper can read these - for simple_buffer in &simple_buffers { - write_test_messages(simple_buffer, MESSAGE_COUNT).await; - } - let output_streams = vec![ Stream::new("default-test-forwarder-for-map-vertex-out-0", "test", 0), Stream::new("default-test-forwarder-for-map-vertex-out-1", "test", 1), Stream::new("default-test-forwarder-for-map-vertex-out-2", "test", 2), Stream::new("default-test-forwarder-for-map-vertex-out-3", "test", 3), Stream::new("default-test-forwarder-for-map-vertex-out-4", "test", 4), + Stream::new("default-test-forwarder-for-map-vertex-out-5", "test", 5), + Stream::new("default-test-forwarder-for-map-vertex-out-6", "test", 6), + Stream::new("default-test-forwarder-for-map-vertex-out-7", "test", 7), + Stream::new("default-test-forwarder-for-map-vertex-out-8", "test", 8), + Stream::new("default-test-forwarder-for-map-vertex-out-9", "test", 9), ]; let mut writers = HashMap::new(); @@ -712,15 +717,16 @@ mod simple_buffer_tests { buffer_full_strategy: RetryUntilSuccess, ..Default::default() }; + // ISB Writer Orchestrator for (i, output_stream) in (&output_streams).iter().enumerate() { - writers.insert(output_stream.name, simple_buffers[i].writer()); + writers.insert(output_stream.name, output_simple_buffers[i].writer()); } let writer_components = ISBWriterOrchestratorComponents:: { config: vec![ToVertexConfig { name: "test-out", - partitions: 5, + partitions: 10, writer_config, conditions: None, to_vertex_type: VertexType::Sink, @@ -733,6 +739,24 @@ mod simple_buffer_tests { let isb_writer = ISBWriterOrchestrator::::new(writer_components); + let input_simple_buffers = vec![ + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 0, "input-simple-buffer-0")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 1, "input-simple-buffer-1")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 2, "input-simple-buffer-2")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 3, "input-simple-buffer-3")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 4, "input-simple-buffer-4")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 5, "input-simple-buffer-5")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 6, "input-simple-buffer-6")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 7, "input-simple-buffer-7")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 8, "input-simple-buffer-8")), + SimpleBufferAdapter::new(SimpleBuffer::new(10000, 9, "input-simple-buffer-9")), + ]; + + // Write all the messages to the input buffer so that mapper can read these + for simple_buffer in &input_simple_buffers { + write_test_messages(simple_buffer, MESSAGE_COUNT).await; + } + // Unique names for the streams we use in this test let input_streams = vec![ Stream::new("default-test-forwarder-for-map-vertex-in-0", "test", 0), @@ -740,6 +764,11 @@ mod simple_buffer_tests { Stream::new("default-test-forwarder-for-map-vertex-in-2", "test", 2), Stream::new("default-test-forwarder-for-map-vertex-in-3", "test", 3), Stream::new("default-test-forwarder-for-map-vertex-in-4", "test", 4), + Stream::new("default-test-forwarder-for-map-vertex-in-5", "test", 5), + Stream::new("default-test-forwarder-for-map-vertex-in-6", "test", 6), + Stream::new("default-test-forwarder-for-map-vertex-in-7", "test", 7), + Stream::new("default-test-forwarder-for-map-vertex-in-8", "test", 8), + Stream::new("default-test-forwarder-for-map-vertex-in-9", "test", 9), ]; let buf_reader_config = BufferReaderConfig { @@ -764,10 +793,13 @@ mod simple_buffer_tests { cln_token: cln_token.clone(), }; - let isb_reader: ISBReaderOrchestrator = - ISBReaderOrchestrator::new(reader_components, simple_buffers[i].reader(), None) - .await - .unwrap(); + let isb_reader: ISBReaderOrchestrator = ISBReaderOrchestrator::new( + reader_components, + input_simple_buffers[i].reader(), + None, + ) + .await + .unwrap(); // Create and start the MapForwarder let forwarder = MapForwarder::::new( @@ -783,11 +815,11 @@ mod simple_buffer_tests { forwarder_handles.push(forwarder_handle); } - // Wait until all messages appear in the output buffer - let result = tokio::time::timeout(Duration::from_secs(10), async { - for simple_buffer in &simple_buffers { + // Wait until all messages exit the input buffer + let input_result = tokio::time::timeout(Duration::from_secs(10), async { + for simple_buffer in &input_simple_buffers { loop { - if simple_buffer.pending_count() >= MESSAGE_COUNT { + if simple_buffer.pending_count() == 0 { break; } tokio::time::sleep(Duration::from_millis(50)).await; @@ -796,21 +828,49 @@ mod simple_buffer_tests { }) .await; - for simple_buffer in simple_buffers { - assert!( - result.is_ok(), - "Timed out waiting for messages in output buffer. Got {} of {}", - simple_buffer.pending_count(), - MESSAGE_COUNT, - ); + assert!( + input_result.is_ok(), + "Timed out waiting for messages to exit input buffers." + ); + // Ensure that all the messages have left the input buffers + for simple_buffer in input_simple_buffers { assert_eq!( simple_buffer.pending_count(), - MESSAGE_COUNT, + 0, "All messages should be forwarded to the output buffer" ); } + let output_buffer_count = output_simple_buffers.len(); + let output_result = tokio::time::timeout(Duration::from_secs(10), async { + loop { + // Ensure that all the messages have reached output buffers. + // Each buffer may have different number of messages, so check the total count + let mut total_output_messages = 0; + + for simple_buffer in &output_simple_buffers { + println!( + "Output buffer pending count: {}, {}", + simple_buffer.pending_count(), + simple_buffer.writer().name() + ); + total_output_messages += simple_buffer.pending_count(); + } + + if total_output_messages == MESSAGE_COUNT * output_buffer_count { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + output_result.is_ok(), + "All messages should be forwarded to the output buffer" + ); + // Shutdown cln_token.cancel(); for forwarder_handle in forwarder_handles { From 157900605ac38f47118e8d2e0f3648446d92baa6 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sun, 1 Mar 2026 00:34:28 -0500 Subject: [PATCH 6/9] Add scenarios for all types of maps Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 161 +++++++++++++++--- 1 file changed, 138 insertions(+), 23 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index a2c02b8e04..e147935a14 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -368,9 +368,14 @@ mod simple_buffer_tests { use bytes::Bytes; use chrono::Utc; - use numaflow::map; + use numaflow::batchmap::{BatchResponse, Datum}; + use numaflow::mapstream::MapStreamRequest; + use numaflow::{batchmap, map, mapstream}; use numaflow_shared::server_info::MapMode; use numaflow_testing::simplebuffer::SimpleBuffer; + use tokio::sync::mpsc; + use tokio::sync::mpsc::{Receiver, Sender}; + use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::config::pipeline::isb::BufferFullStrategy::RetryUntilSuccess; @@ -393,6 +398,29 @@ mod simple_buffer_tests { } } + #[tonic::async_trait] + impl batchmap::BatchMapper for SimpleCat { + async fn batchmap(&self, input: Receiver) -> Vec { + let mut batch_responses = vec![]; + let mut input = input; + while let Some(datum) = input.recv().await { + let mut message = BatchResponse::from_id(datum.id); + message.append(batchmap::Message::new(datum.value).with_keys(datum.keys)); + batch_responses.push(message); + } + batch_responses + } + } + + #[tonic::async_trait] + impl mapstream::MapStreamer for SimpleCat { + async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { + let mut input = input; + let message = mapstream::Message::new(input.value).with_keys(input.keys); + tx.send(message).await.expect("send should succeed"); + } + } + /// Helper to create and write test messages into a SimpleBufferAdapter via its ISBWriter. async fn write_test_messages(adapter: &SimpleBufferAdapter, count: usize) { let writer = adapter.writer(); @@ -662,28 +690,19 @@ mod simple_buffer_tests { // End-to-end test for map forwarder using SimpleBuffer. // Reads from an ISB backed by multiple SimpleBuffers, maps through a cat UDF, and writes // to an ISB backed by a different set of SimpleBuffers - #[tokio::test(flavor = "multi_thread")] - async fn test_map_forwarder_with_multi_streams() { + async fn test_map_forwarder_with_multi_streams( + mapper_test_handle: MapperTestHandle, + tracker: Tracker, + batch_size: usize, + cln_token: CancellationToken, + ) { const MESSAGE_COUNT: usize = 1000; - let cln_token = CancellationToken::new(); - let tracker = Tracker::new(None, cln_token.clone()); - let batch_size = 500; - // Mapper let MapperTestHandle { mapper, server_handle: _server_handle, - } = MapperTestHandle::create_mapper( - SimpleCat, - tracker.clone(), - MapMode::Unary, - batch_size, - Duration::from_secs(10), - Duration::from_secs(10), - 10, - ) - .await; + } = mapper_test_handle; let output_simple_buffers = vec![ SimpleBufferAdapter::new(SimpleBuffer::new(10000, 0, "output-simple-buffer-0")), @@ -850,11 +869,6 @@ mod simple_buffer_tests { let mut total_output_messages = 0; for simple_buffer in &output_simple_buffers { - println!( - "Output buffer pending count: {}, {}", - simple_buffer.pending_count(), - simple_buffer.writer().name() - ); total_output_messages += simple_buffer.pending_count(); } @@ -871,12 +885,113 @@ mod simple_buffer_tests { "All messages should be forwarded to the output buffer" ); - // Shutdown cln_token.cancel(); for forwarder_handle in forwarder_handles { forwarder_handle.abort(); } } + + // Test all map types with multi streams test scenario + #[tokio::test(flavor = "multi_thread")] + async fn test_all_map_types_multi_stream() { + let batch_size = 500; + + // Test unary map + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let mapper_test_handle = MapperTestHandle::create_mapper( + SimpleCat, + tracker.clone(), + MapMode::Unary, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + test_map_forwarder_with_multi_streams( + mapper_test_handle, + tracker, + batch_size, + cln_token.clone(), + ) + .await; + + // Test batch map + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let mapper_test_handle = MapperTestHandle::create_batch_mapper( + SimpleCat, + tracker.clone(), + MapMode::Batch, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + test_map_forwarder_with_multi_streams( + mapper_test_handle, + tracker, + batch_size, + cln_token.clone(), + ) + .await; + + // Test map streamer + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let mapper_test_handle = MapperTestHandle::create_map_streamer( + SimpleCat, + tracker.clone(), + MapMode::Stream, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + test_map_forwarder_with_multi_streams( + mapper_test_handle, + tracker, + batch_size, + cln_token.clone(), + ) + .await; + } + + struct PanicMap; + + #[tonic::async_trait] + impl map::Mapper for PanicMap { + async fn map(&self, _input: map::MapRequest) -> Vec { + panic!("PanicCat panicked!"); + } + } + + #[tonic::async_trait] + impl batchmap::BatchMapper for PanicMap { + async fn batchmap( + &self, + _input: mpsc::Receiver, + ) -> Vec { + panic!("PanicBatchMap panicked!"); + } + } + + #[tonic::async_trait] + impl mapstream::MapStreamer for PanicMap { + async fn map_stream( + &self, + _input: mapstream::MapStreamRequest, + _tx: Sender, + ) { + panic!("Streaming map panicked!"); + } + } } #[cfg(test)] From c8f4dff62da4db61a38dbf333363b2f6f1c7e25b Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sun, 1 Mar 2026 15:32:21 -0500 Subject: [PATCH 7/9] Add panic map scenarios for all types of maps Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 181 ++++++++++++++++-- 1 file changed, 170 insertions(+), 11 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index e147935a14..ace05c0cb1 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -375,7 +375,6 @@ mod simple_buffer_tests { use numaflow_testing::simplebuffer::SimpleBuffer; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; - use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::config::pipeline::isb::BufferFullStrategy::RetryUntilSuccess; @@ -383,10 +382,10 @@ mod simple_buffer_tests { use crate::config::pipeline::{ToVertexConfig, VertexType}; use crate::mapper::test_utils::MapperTestHandle; use crate::message::{IntOffset, Message, MessageID, Offset}; + use crate::pipeline::isb::ISBWriter; use crate::pipeline::isb::reader::{ISBReaderComponents, ISBReaderOrchestrator}; use crate::pipeline::isb::simplebuffer::{SimpleBufferAdapter, WithSimpleBuffer}; use crate::pipeline::isb::writer::{ISBWriterOrchestrator, ISBWriterOrchestratorComponents}; - use crate::pipeline::isb::{ISBReader, ISBWriter}; use crate::tracker::Tracker; struct SimpleCat; @@ -415,7 +414,6 @@ mod simple_buffer_tests { #[tonic::async_trait] impl mapstream::MapStreamer for SimpleCat { async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { - let mut input = input; let message = mapstream::Message::new(input.value).with_keys(input.keys); tx.send(message).await.expect("send should succeed"); } @@ -441,6 +439,7 @@ mod simple_buffer_tests { headers: Arc::new(HashMap::new()), metadata: None, is_late: false, + // ack handle is anyways gonna be overridden during isb read processing ack_handle: None, }; writer.write(msg).await.expect("write should succeed"); @@ -738,8 +737,14 @@ mod simple_buffer_tests { }; // ISB Writer Orchestrator - for (i, output_stream) in (&output_streams).iter().enumerate() { - writers.insert(output_stream.name, output_simple_buffers[i].writer()); + for (i, output_stream) in output_streams.iter().enumerate() { + writers.insert( + output_stream.name, + output_simple_buffers + .get(i) + .expect("Invalid output stream index") + .writer(), + ); } let writer_components = ISBWriterOrchestratorComponents:: { @@ -773,7 +778,7 @@ mod simple_buffer_tests { // Write all the messages to the input buffer so that mapper can read these for simple_buffer in &input_simple_buffers { - write_test_messages(simple_buffer, MESSAGE_COUNT).await; + write_test_messages(simple_buffer, MESSAGE_COUNT).await } // Unique names for the streams we use in this test @@ -797,7 +802,7 @@ mod simple_buffer_tests { }; let mut forwarder_handles = vec![]; - for (i, input_stream) in (&input_streams).iter().enumerate() { + for (i, input_stream) in input_streams.iter().enumerate() { // ISB Reader Orchestrator let reader_components = ISBReaderComponents { @@ -814,7 +819,10 @@ mod simple_buffer_tests { let isb_reader: ISBReaderOrchestrator = ISBReaderOrchestrator::new( reader_components, - input_simple_buffers[i].reader(), + input_simple_buffers + .get(i) + .expect("Invalid input stream index") + .reader(), None, ) .await @@ -968,7 +976,7 @@ mod simple_buffer_tests { #[tonic::async_trait] impl map::Mapper for PanicMap { async fn map(&self, _input: map::MapRequest) -> Vec { - panic!("PanicCat panicked!"); + panic!("PanicMap panicked!"); } } @@ -978,7 +986,7 @@ mod simple_buffer_tests { &self, _input: mpsc::Receiver, ) -> Vec { - panic!("PanicBatchMap panicked!"); + panic!("Batch PanicMap panicked!"); } } @@ -989,9 +997,160 @@ mod simple_buffer_tests { _input: mapstream::MapStreamRequest, _tx: Sender, ) { - panic!("Streaming map panicked!"); + panic!("Streaming PanicMap panicked!"); } } + + //#[tokio::test(flavor = "multi_thread")] + async fn test_panic_map_forwarder_with_simple_buffer( + mapper_test_handle: MapperTestHandle, + tracker: Tracker, + batch_size: usize, + cln_token: CancellationToken, + ) { + const MESSAGE_COUNT: usize = 10; + + // Input buffer + let input_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "input-buffer")); + + // Write all the messages to the input buffer so that mapper can read these + write_test_messages(&input_adapter, MESSAGE_COUNT).await; + + // Output buffer + let output_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "output-buffer")); + + // ISB Reader Orchestrator + let input_stream = Stream::new("input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Map".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, input_adapter.reader(), None) + .await + .unwrap(); + + // ISB Writer Orchestrator + let output_stream = Stream::new("output-buffer", "test-out", 0); + let writer_config = BufferWriterConfig { + streams: vec![output_stream.clone()], + buffer_full_strategy: RetryUntilSuccess, + ..Default::default() + }; + + let mut writers = HashMap::new(); + writers.insert(output_stream.name, output_adapter.writer()); + + let writer_components = ISBWriterOrchestratorComponents:: { + config: vec![ToVertexConfig { + name: "test-out", + partitions: 1, + writer_config, + conditions: None, + to_vertex_type: VertexType::Sink, + }], + writers, + paf_concurrency: 100, + watermark_handle: None, + vertex_type: VertexType::MapUDF, + }; + + let isb_writer = ISBWriterOrchestrator::::new(writer_components); + + // Mapper + let MapperTestHandle { + mapper, + server_handle: _server_handle, + } = mapper_test_handle; + + // Create and start the MapForwarder + let forwarder = MapForwarder::::new(isb_reader, mapper, isb_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + // Wait for the forwarder to start + tokio::time::sleep(Duration::from_millis(500)).await; + + assert_eq!( + input_adapter.pending_count() + input_adapter.in_flight_count(), + MESSAGE_COUNT, + "All messages should be forwarded to the output buffer" + ); + + assert_eq!( + output_adapter.pending_count() + output_adapter.in_flight_count(), + 0, + "All messages should be forwarded to the output buffer" + ); + + // Shutdown + cln_token.cancel(); + forwarder_handle.abort(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_all_map_types_for_panic() { + let batch_size = 500; + + // Test unary map + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let mapper_test_handle = MapperTestHandle::create_mapper( + PanicMap, + tracker.clone(), + MapMode::Unary, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + test_panic_map_forwarder_with_simple_buffer( + mapper_test_handle, + tracker, + batch_size, + cln_token, + ) + .await; + + // Test map streamer + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let mapper_test_handle = MapperTestHandle::create_map_streamer( + PanicMap, + tracker.clone(), + MapMode::Stream, + batch_size, + Duration::from_secs(10), + Duration::from_secs(10), + 10, + ) + .await; + + test_panic_map_forwarder_with_simple_buffer( + mapper_test_handle, + tracker, + batch_size, + cln_token.clone(), + ) + .await; + } } #[cfg(test)] From ecc092d35934981ed84b6bc0f1d44c247bc37884 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sun, 1 Mar 2026 16:18:43 -0500 Subject: [PATCH 8/9] Add timeout for forwarder handles instead of aborting explicitly Signed-off-by: Vaibhav Tiwari --- .../src/pipeline/forwarder/map_forwarder.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index c0d3a43498..d15ec9859c 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -900,7 +900,12 @@ mod simple_buffer_tests { cln_token.cancel(); for forwarder_handle in forwarder_handles { - forwarder_handle.abort(); + let forwarder_result = + tokio::time::timeout(Duration::from_secs(1), forwarder_handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder should have completed successfully after cancellation" + ); } } @@ -1105,7 +1110,11 @@ mod simple_buffer_tests { // Shutdown cln_token.cancel(); - forwarder_handle.abort(); + let forwarder_result = tokio::time::timeout(Duration::from_secs(2), forwarder_handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder should have completed successfully after cancellation" + ); } #[tokio::test(flavor = "multi_thread")] From 0de74a9122cf1eff508ab1cedc343104fe4d485e Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Mon, 2 Mar 2026 16:11:15 -0500 Subject: [PATCH 9/9] Remove the streaming panic test Signed-off-by: Vaibhav Tiwari --- rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs | 2 ++ rust/numaflow-core/src/pipeline/isb/simplebuffer.rs | 7 +++++++ rust/numaflow-testing/src/simplebuffer.rs | 5 +++++ rust/numaflow-testing/src/simplebuffer/buffer.rs | 7 +++++++ 4 files changed, 21 insertions(+) diff --git a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs index d15ec9859c..1f4d2ed547 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/map_forwarder.rs @@ -1144,6 +1144,7 @@ mod simple_buffer_tests { .await; // Test map streamer + /* Due to #3268, this test is flaky let cln_token = CancellationToken::new(); let tracker = Tracker::new(None, cln_token.clone()); let mapper_test_handle = MapperTestHandle::create_map_streamer( @@ -1164,6 +1165,7 @@ mod simple_buffer_tests { cln_token.clone(), ) .await; + */ } } diff --git a/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs b/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs index 835fc36479..5076be8f2e 100644 --- a/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs +++ b/rust/numaflow-core/src/pipeline/isb/simplebuffer.rs @@ -65,6 +65,13 @@ impl SimpleBufferAdapter { pub fn in_flight_count(&self) -> usize { self.buffer.in_flight_count() } + + /// Get the current number of acked messages in the buffer. + /// This can be used during testing to identify the count of messages that have been acked. + #[allow(dead_code)] + pub fn acked_count(&self) -> usize { + self.buffer.acked_count() + } } /// Adapter that wraps `SimpleReader` and implements the `ISBReader` trait. diff --git a/rust/numaflow-testing/src/simplebuffer.rs b/rust/numaflow-testing/src/simplebuffer.rs index f0ba4823ee..e6f6d7ae35 100644 --- a/rust/numaflow-testing/src/simplebuffer.rs +++ b/rust/numaflow-testing/src/simplebuffer.rs @@ -135,6 +135,11 @@ impl SimpleBuffer { self.state.read().in_flight_count() } + /// Get the current number of acked messages. + pub fn acked_count(&self) -> usize { + self.state.read().acked_count() + } + /// Get the current buffer usage as a fraction. pub fn usage(&self) -> f64 { self.state.read().usage() diff --git a/rust/numaflow-testing/src/simplebuffer/buffer.rs b/rust/numaflow-testing/src/simplebuffer/buffer.rs index 545b9f2564..262626f2c7 100644 --- a/rust/numaflow-testing/src/simplebuffer/buffer.rs +++ b/rust/numaflow-testing/src/simplebuffer/buffer.rs @@ -132,6 +132,13 @@ impl BufferState { .count() } + pub(crate) fn acked_count(&self) -> usize { + self.slots + .iter() + .filter(|s| s.state == MessageState::Acked) + .count() + } + /// Reclaim acked slots from the front of the buffer. pub(crate) fn reclaim_acked(&mut self) { while let Some(front) = self.slots.front() {