From ebd48f56496bbef755de5135a21e85d31e04b7a4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 8 Apr 2024 12:35:12 +0200 Subject: [PATCH 1/5] try to test opensearch2 --- .github/workflows/check-build-test.yml | 2 +- docker-compose.yml | 9 +- .../java/docs/javadsl/OpensearchV2Test.java | 445 +++++++++++++++ .../docs/scaladsl/OpensearchV2Spec.scala | 527 ++++++++++++++++++ 4 files changed, 981 insertions(+), 2 deletions(-) create mode 100644 elasticsearch/src/test/java/docs/javadsl/OpensearchV2Test.java create mode 100644 elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml index 5135bc59d..ebbaa348d 100644 --- a/.github/workflows/check-build-test.yml +++ b/.github/workflows/check-build-test.yml @@ -85,7 +85,7 @@ jobs: - { connector: couchbase, pre_cmd: 'docker-compose up -d couchbase_prep' } - { connector: csv } - { connector: dynamodb, pre_cmd: 'docker-compose up -d dynamodb' } - - { connector: elasticsearch, pre_cmd: 'docker-compose up -d elasticsearch6 elasticsearch7 opensearch1' } + - { connector: elasticsearch, pre_cmd: 'docker-compose up -d elasticsearch6 elasticsearch7 opensearch1 opensearch2' } - { connector: file } - { connector: ftp, pre_cmd: './scripts/ftp-servers.sh' } - { connector: geode, pre_cmd: 'docker-compose up -d geode' } diff --git a/docker-compose.yml b/docker-compose.yml index dc0e6a2de..6f50b1c61 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -86,12 +86,19 @@ services: environment: - "discovery.type=single-node" opensearch1: - image: opensearchproject/opensearch:1.3.1 + image: opensearchproject/opensearch:1.3.14 ports: - "9203:9200" environment: - "discovery.type=single-node" - "DISABLE_SECURITY_PLUGIN=true" + opensearch2: + image: opensearchproject/opensearch:2.13.0 + ports: + - "9204:9200" + environment: + - "discovery.type=single-node" + - "DISABLE_SECURITY_PLUGIN=true" ftp: image: stilliard/pure-ftpd:latest ports: diff --git a/elasticsearch/src/test/java/docs/javadsl/OpensearchV2Test.java b/elasticsearch/src/test/java/docs/javadsl/OpensearchV2Test.java new file mode 100644 index 000000000..2eb6b03e8 --- /dev/null +++ b/elasticsearch/src/test/java/docs/javadsl/OpensearchV2Test.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package docs.javadsl; + +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.stream.connectors.elasticsearch.*; +import org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchFlow; +import org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchSink; +import org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchSource; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +public class OpensearchV2Test extends ElasticsearchTestBase { + @BeforeClass + public static void setup() throws IOException { + setupBase(); + + prepareIndex(9204, OpensearchApiVersion.V1); + } + + @AfterClass + public static void shutdown() throws IOException { + cleanIndex(); + } + + @Test + public void typedStream() throws Exception { + // Copy source/book to sink2/book through JsObject stream + // #run-typed + OpensearchSourceSettings sourceSettings = + OpensearchSourceSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1); + OpensearchWriteSettings sinkSettings = + OpensearchWriteSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1); + + Source, NotUsed> source = + ElasticsearchSource.typed( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + sourceSettings, + Book.class); + CompletionStage f1 = + source + .map(m -> WriteMessage.createIndexMessage(m.id(), m.source())) + .runWith( + ElasticsearchSink.create( + constructElasticsearchParams("sink2", "_doc", OpensearchApiVersion.V1), + sinkSettings, + new ObjectMapper()), + system); + // #run-typed + + f1.toCompletableFuture().get(); + + flushAndRefresh("sink2"); + + // Assert docs in sink2/book + CompletionStage> f2 = + ElasticsearchSource.typed( + constructElasticsearchParams("sink2", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), system); + + List result = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = + Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production"); + + Collections.sort(result); + assertEquals(expect, result); + } + + @Test + public void jsObjectStream() throws Exception { + // Copy source/book to sink1/book through JsObject stream + // #run-jsobject + OpensearchSourceSettings sourceSettings = + OpensearchSourceSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1); + OpensearchWriteSettings sinkSettings = + OpensearchWriteSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1); + + Source>, NotUsed> source = + ElasticsearchSource.create( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + sourceSettings); + CompletionStage f1 = + source + .map(m -> WriteMessage.createIndexMessage(m.id(), m.source())) + .runWith( + ElasticsearchSink.create( + constructElasticsearchParams("sink1", "_doc", OpensearchApiVersion.V1), + sinkSettings, + new ObjectMapper()), + system); + // #run-jsobject + + f1.toCompletableFuture().get(); + + flushAndRefresh("sink1"); + + // Assert docs in sink1/_doc + CompletionStage> f2 = + ElasticsearchSource.create( + constructElasticsearchParams("sink1", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5)) + .map(m -> (String) m.source().get("title")) + .runWith(Sink.seq(), system); + + List result = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = + Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production"); + + Collections.sort(result); + assertEquals(expect, result); + } + + @Test + public void flow() throws Exception { + // Copy source/book to sink3/book through JsObject stream + // #run-flow + CompletionStage>> f1 = + ElasticsearchSource.typed( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + Book.class) + .map(m -> WriteMessage.createIndexMessage(m.id(), m.source())) + .via( + ElasticsearchFlow.create( + constructElasticsearchParams("sink3", "_doc", OpensearchApiVersion.V1), + OpensearchWriteSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + new ObjectMapper())) + .runWith(Sink.seq(), system); + // #run-flow + + List> result1 = f1.toCompletableFuture().get(); + flushAndRefresh("sink3"); + + for (WriteResult aResult1 : result1) { + assertEquals(true, aResult1.success()); + } + + // Assert docs in sink3/book + CompletionStage> f2 = + ElasticsearchSource.typed( + constructElasticsearchParams("sink3", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withConnection(connectionSettings) + .withBufferSize(5), + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), system); + + List result2 = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = + Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production"); + + Collections.sort(result2); + assertEquals(expect, result2); + } + + @Test + public void stringFlow() throws Exception { + // Copy source/book to sink3/book through JsObject stream + String indexName = "sink3-0"; + // #string + CompletionStage>> write = + Source.from( + Arrays.asList( + WriteMessage.createIndexMessage("1", "{\"title\": \"Das Parfum\"}"), + WriteMessage.createIndexMessage("2", "{\"title\": \"Faust\"}"), + WriteMessage.createIndexMessage( + "3", "{\"title\": \"Die unendliche Geschichte\"}"))) + .via( + ElasticsearchFlow.create( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + OpensearchWriteSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + StringMessageWriter.getInstance())) + .runWith(Sink.seq(), system); + // #string + + List> result1 = write.toCompletableFuture().get(); + flushAndRefresh(indexName); + + for (WriteResult aResult1 : result1) { + assertEquals(true, aResult1.success()); + } + + CompletionStage> f2 = + ElasticsearchSource.typed( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), system); + + List result2 = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = Arrays.asList("Das Parfum", "Die unendliche Geschichte", "Faust"); + + Collections.sort(result2); + assertEquals(expect, result2); + } + + @Test + public void testMultipleOperations() throws Exception { + // #multiple-operations + // Create, update, upsert and delete documents in sink8/book + List> requests = + Arrays.asList( + WriteMessage.createIndexMessage("00001", new Book("Book 1")), + WriteMessage.createUpsertMessage("00002", new Book("Book 2")), + WriteMessage.createUpsertMessage("00003", new Book("Book 3")), + WriteMessage.createUpdateMessage("00004", new Book("Book 4")), + WriteMessage.createDeleteMessage("00002")); + + Source.from(requests) + .via( + ElasticsearchFlow.create( + constructElasticsearchParams("sink8", "_doc", OpensearchApiVersion.V1), + OpensearchWriteSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1), + new ObjectMapper())) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(); + // #multiple-operations + + flushAndRefresh("sink8"); + + // Assert docs in sink8/book + CompletionStage> f2 = + ElasticsearchSource.typed( + constructElasticsearchParams("sink8", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1), + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), system); + + List result2 = new ArrayList<>(f2.toCompletableFuture().get()); + List expect = Arrays.asList("Book 1", "Book 3"); + Collections.sort(result2); + + assertEquals(expect, result2); + } + + @Test + public void testKafkaExample() throws Exception { + // #kafka-example + // We're going to pretend we got messages from kafka. + // After we've written them to Elastic, we want + // to commit the offset to Kafka + + List messagesFromKafka = + Arrays.asList( + new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)), + new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)), + new KafkaMessage(new Book("Book 3"), new KafkaOffset(2))); + + final KafkaCommitter kafkaCommitter = new KafkaCommitter(); + + CompletionStage kafkaToOs = + Source.from(messagesFromKafka) // Assume we get this from Kafka + .map( + kafkaMessage -> { + Book book = kafkaMessage.book; + String id = book.title; + + // Transform message so that we can write to elastic + return WriteMessage.createIndexMessage(id, book) + .withPassThrough(kafkaMessage.offset); + }) + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough( + constructElasticsearchParams("sink6", "_doc", OpensearchApiVersion.V1), + OpensearchWriteSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + new ObjectMapper())) + .map( + result -> { + if (!result.success()) + throw new RuntimeException("Failed to write message to elastic"); + // Commit to kafka + kafkaCommitter.commit(result.message().passThrough()); + return NotUsed.getInstance(); + }) + .runWith(Sink.ignore(), system); + // #kafka-example + kafkaToOs.toCompletableFuture().get(5, TimeUnit.SECONDS); // Wait for it to complete + flushAndRefresh("sink6"); + + // Make sure all messages was committed to kafka + assertEquals(Arrays.asList(0, 1, 2), kafkaCommitter.committedOffsets); + + // Assert that all docs were written to elastic + List result2 = + ElasticsearchSource.typed( + constructElasticsearchParams("sink6", "_doc", OpensearchApiVersion.V1), + "{\"match_all\": {}}", + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1), + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), system) // Run it + .toCompletableFuture() + .get(); // Wait for it to complete + + assertEquals( + messagesFromKafka.stream().map(m -> m.book.title).sorted().collect(Collectors.toList()), + result2.stream().sorted().collect(Collectors.toList())); + } + + @Test + public void testUsingSearchParams() throws Exception { + + String indexName = "test_using_search_params_versions_java"; + String typeName = "_doc"; + + List docs = + Arrays.asList( + new TestDoc("1", "a1", "b1", "c1"), + new TestDoc("2", "a2", "b2", "c2"), + new TestDoc("3", "a3", "b3", "c3")); + + // Insert document + Source.from(docs) + .map((TestDoc d) -> WriteMessage.createIndexMessage(d.id, d)) + .via( + ElasticsearchFlow.create( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + OpensearchWriteSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1) + .withBufferSize(5), + new ObjectMapper())) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(); + + flushAndRefresh(indexName); + + // #custom-search-params + // Search for docs and ask elastic to only return some fields + + Map searchParams = new HashMap<>(); + searchParams.put("query", "{\"match_all\": {}}"); + searchParams.put("_source", "[\"id\", \"a\", \"c\"]"); + + List result = + ElasticsearchSource.typed( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + searchParams, // <-- Using searchParams + OpensearchSourceSettings.create(connectionSettings) + .withApiVersion(OpensearchApiVersion.V1), + TestDoc.class, + new ObjectMapper()) + .map( + o -> { + return o.source(); // These documents will only have property id, a and c (not + }) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(); + // #custom-search-params + flushAndRefresh(indexName); + + assertEquals( + docs.size(), + result.stream() + .filter( + d -> { + return d.a != null && d.b == null; + }) + .collect(Collectors.toList()) + .size()); + } +} diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala new file mode 100644 index 000000000..703423c0b --- /dev/null +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package docs.scaladsl + +import org.apache.pekko +import pekko.http.scaladsl.model.Uri.Path +import pekko.http.scaladsl.model.{ HttpMethods, HttpRequest, Uri } +import pekko.stream.connectors.elasticsearch.{ + ElasticsearchConnectionSettings, + OpensearchApiVersion, + OpensearchConnectionSettings, + ReadResult, + StringMessageWriter, + WriteMessage, + WriteResult +} +import pekko.stream.connectors.elasticsearch.scaladsl.{ ElasticsearchFlow, ElasticsearchSink, ElasticsearchSource } +import pekko.stream.connectors.elasticsearch._ +import pekko.stream.scaladsl.{ Sink, Source } +import pekko.testkit.TestKit +import pekko.{ Done, NotUsed } +import spray.json.jsonReader + +import scala.collection.immutable +import scala.concurrent.Future +import spray.json._ + +class OpensearchV2Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils { + + private val connectionSettings: ElasticsearchConnectionSettings = OpensearchConnectionSettings( + "http://localhost:9204") + private val baseSourceSettings = OpensearchSourceSettings(connectionSettings).withApiVersion(OpensearchApiVersion.V1) + private val baseWriteSettings = OpensearchWriteSettings(connectionSettings).withApiVersion(OpensearchApiVersion.V1) + + override protected def beforeAll() = { + insertTestData(connectionSettings) + } + + override def afterAll() = { + val deleteRequest = HttpRequest(HttpMethods.DELETE) + .withUri(Uri(connectionSettings.baseUrl).withPath(Path("/_all"))) + http.singleRequest(deleteRequest).futureValue + + TestKit.shutdownActorSystem(system) + } + + "Un-typed Opensearch connector" should { + "consume and publish Json documents" in { + val indexName = "sink2" + + // #run-jsobject + val copy = ElasticsearchSource + .create( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings) + .map { (message: ReadResult[spray.json.JsObject]) => + val book: Book = jsonReader[Book].read(message.source) + WriteMessage.createIndexMessage(message.id, book) + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + // #run-jsobject + + copy.futureValue shouldBe Done + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, + indexName).futureValue should contain allElementsOf Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production") + } + } + + "Typed Opensearch connector" should { + "consume and publish documents as specific type" in { + val indexName = "sink2" + + // #run-typed + val copy = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings) + .map { (message: ReadResult[Book]) => + WriteMessage.createIndexMessage(message.id, message.source) + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + // #run-typed + + copy.futureValue shouldBe Done + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, + indexName).futureValue should contain allElementsOf Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production") + } + } + + "ElasticsearchFlow" should { + "store documents and pass failed documents to downstream" in { + val indexName = "sink3" + // #run-flow + val copy = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings) + .map { (message: ReadResult[Book]) => + WriteMessage.createIndexMessage(message.id, message.source) + } + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + .runWith(Sink.seq) + // #run-flow + + // Assert no errors + copy.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production") + } + + "store properly formatted JSON from Strings" in { + val indexName = "sink3-0" + + // #string + val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source( + immutable.Seq( + WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint), + WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint), + WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via( + ElasticsearchFlow.create( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings, + StringMessageWriter)) + .runWith(Sink.seq) + // #string + + // Assert no errors + write.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.sorted shouldEqual Seq( + "Das Parfum", + "Die unendliche Geschichte", + "Faust") + } + + "kafka-example - store documents and pass Responses with passThrough" in { + + // #kafka-example + // We're going to pretend we got messages from kafka. + // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1"), KafkaOffset(0)), + KafkaMessage(Book("Book 2"), KafkaOffset(1)), + KafkaMessage(Book("Book 3"), KafkaOffset(2))) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { (kafkaMessage: KafkaMessage) => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + // #kafka-example + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, + indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .map(_.book.title) + } + + "kafka-example - store documents and pass Responses with passThrough in bulk" in { + + // We're going to pretend we got messages from kafka. + // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1"), KafkaOffset(0)), + KafkaMessage(Book("Book 2"), KafkaOffset(1)), + KafkaMessage(Book("Book 3"), KafkaOffset(2))) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-bulk" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { (kafkaMessage: KafkaMessage) => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .grouped(2) + .via( // write to elastic + ElasticsearchFlow.createBulk[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + .map(_.map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + }) + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, + indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .map(_.book.title) + } + + "kafka-example - store documents and pass Responses with passThrough skipping some w/ NOP" in { + + // We're going to pretend we got messages from kafka. + // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book A", shouldSkip = Some(true)), KafkaOffset(0)), + KafkaMessage(Book("Book 1"), KafkaOffset(1)), + KafkaMessage(Book("Book 2"), KafkaOffset(2)), + KafkaMessage(Book("Book B", shouldSkip = Some(true)), KafkaOffset(3)), + KafkaMessage(Book("Book 3"), KafkaOffset(4)), + KafkaMessage(Book("Book C", shouldSkip = Some(true)), KafkaOffset(5))) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-nop" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { (kafkaMessage: KafkaMessage) => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + if (book.shouldSkip.getOrElse(false)) + WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset) + else + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2, 3, 4, 5) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, + indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .filterNot(_.book.shouldSkip.getOrElse(false)) + .map(_.book.title) + } + + "kafka-example - skip all NOP documents and pass Responses with passThrough" in { + + // We're going to pretend we got messages from kafka. + // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1", shouldSkip = Some(true)), KafkaOffset(0)), + KafkaMessage(Book("Book 2", shouldSkip = Some(true)), KafkaOffset(1)), + KafkaMessage(Book("Book 3", shouldSkip = Some(true)), KafkaOffset(2))) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-none" + register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below + + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { (kafkaMessage: KafkaMessage) => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + if (book.shouldSkip.getOrElse(false)) + WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset) + else + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.toList shouldBe List("dummy") + } + + "handle multiple types of operations correctly" in { + val indexName = "sink8" + // #multiple-operations + val requests = List[WriteMessage[Book, NotUsed]]( + WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")), + WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")), + WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")), + WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")), + WriteMessage.createCreateMessage(id = "00005", source = Book("Book 5")), + WriteMessage.createDeleteMessage(id = "00002")) + + val writeResults = Source(requests) + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + baseWriteSettings)) + .runWith(Sink.seq) + // #multiple-operations + + val results = writeResults.futureValue + results should have size requests.size + // Assert no errors except a missing document for a update request + val errorMessages = results.flatMap(_.errorReason) + errorMessages should have size 1 + errorMessages.head shouldEqual "[_doc][00004]: document missing" + flushAndRefresh(connectionSettings, indexName) + + // Assert docs in sink8/_doc + val readBooks = ElasticsearchSource( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + """{"match_all": {}}""", + baseSourceSettings).map { message => + message.source + } + .runWith(Sink.seq) + + // Docs should contain both columns + readBooks.futureValue.sortBy(_.fields("title").compactPrint) shouldEqual Seq( + Book("Book 1").toJson, + Book("Book 3").toJson, + Book("Book 5").toJson) + } + + "use indexName supplied in message if present" in { + // Copy source/_doc to sink2/_doc through typed stream + + // #custom-index-name-example + val customIndexName = "custom-index" + + val writeCustomIndex = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings) + .map { (message: ReadResult[Book]) => + WriteMessage + .createIndexMessage(message.id, message.source) + .withIndexName(customIndexName) // Setting the index-name to use for this document + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams("this-is-not-the-index-we-are-using", "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings)) + // #custom-index-name-example + + writeCustomIndex.futureValue shouldBe Done + flushAndRefresh(connectionSettings, customIndexName) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, customIndexName).futureValue.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production") + } + } + + "ElasticsearchSource" should { + "be able to use custom searchParams" in { + import spray.json._ + import DefaultJsonProtocol._ + + case class TestDoc(id: String, a: String, b: Option[String], c: String) + + implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply) + + val indexName = "custom-search-params-test-scala" + val typeName = "_doc" + + val docs = List( + TestDoc("1", "a1", Some("b1"), "c1"), + TestDoc("2", "a2", Some("b2"), "c2"), + TestDoc("3", "a3", Some("b3"), "c3")) + + // insert new documents + val writes = Source(docs) + .map { doc => + WriteMessage.createIndexMessage(doc.id, doc) + } + .via( + ElasticsearchFlow.create[TestDoc]( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + baseWriteSettings.withBufferSize(5))) + .runWith(Sink.seq) + + writes.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + // #custom-search-params + // Search for docs and ask elastic to only return some fields + + val readWithSearchParameters = ElasticsearchSource + .typed[TestDoc]( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + searchParams = Map( + "query" -> """ {"match_all": {}} """, + "_source" -> """ ["id", "a", "c"] """), + baseSourceSettings) + .map { message => + message.source + } + .runWith(Sink.seq) + // #custom-search-params + + assert(readWithSearchParameters.futureValue.toList.sortBy(_.id) == docs.map(_.copy(b = None))) + + } + } +} From a54d208bd41a44e3bb799f303dd92ebc180ef5e4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 8 Apr 2024 12:56:37 +0200 Subject: [PATCH 2/5] rename --- .../{ElasticsearchV5Test.java => ElasticsearchV6Test.java} | 2 +- .../{ElasticsearchV5Spec.scala => ElasticsearchV6Spec.scala} | 2 +- .../src/test/scala/docs/scaladsl/OpensearchV2Spec.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename elasticsearch/src/test/java/docs/javadsl/{ElasticsearchV5Test.java => ElasticsearchV6Test.java} (99%) rename elasticsearch/src/test/scala/docs/scaladsl/{ElasticsearchV5Spec.scala => ElasticsearchV6Spec.scala} (99%) diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java similarity index 99% rename from elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java rename to elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java index d719bb3d7..f76b89867 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java @@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class ElasticsearchV5Test extends ElasticsearchTestBase { +public class ElasticsearchV6Test extends ElasticsearchTestBase { @BeforeClass public static void setup() throws IOException { setupBase(); diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala similarity index 99% rename from elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala rename to elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala index 7bc0bb25b..0228e86e7 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala @@ -36,7 +36,7 @@ import scala.collection.immutable import scala.concurrent.Future import spray.json._ -class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils { +class ElasticsearchV6Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils { private val connectionSettings: ElasticsearchConnectionSettings = ElasticsearchConnectionSettings( "http://localhost:9201") diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala index 703423c0b..36e27c989 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala @@ -419,7 +419,7 @@ class OpensearchV2Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils // Assert no errors except a missing document for a update request val errorMessages = results.flatMap(_.errorReason) errorMessages should have size 1 - errorMessages.head shouldEqual "[_doc][00004]: document missing" + errorMessages.head shouldEqual "[][00004]: document missing" flushAndRefresh(connectionSettings, indexName) // Assert docs in sink8/_doc From 610b72552577bbd1fd0275867ee705649ae6c4f0 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 8 Apr 2024 13:10:25 +0200 Subject: [PATCH 3/5] Update elasticsearch.md --- docs/src/main/paradox/elasticsearch.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/src/main/paradox/elasticsearch.md b/docs/src/main/paradox/elasticsearch.md index c34f565a7..510a7b216 100644 --- a/docs/src/main/paradox/elasticsearch.md +++ b/docs/src/main/paradox/elasticsearch.md @@ -74,7 +74,7 @@ Use `ElasticsearchSource.typed` and `ElasticsearchSink.create` to create source @java[The data is converted to and from JSON by Jackson's ObjectMapper.] Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-typed } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-typed } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-typed } @@ -84,7 +84,7 @@ Java Use `ElasticsearchSource.create` and `ElasticsearchSink.create` to create source and sink. Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-jsobject } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-jsobject } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-jsobject } @@ -103,7 +103,7 @@ In the above examples, `WriteMessage` is used as the input to `ElasticsearchSink | WriteMessage.createDeleteMessage | Delete an existing document. If there is no document with the specified `id`, do nothing. | Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #multiple-operations } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #multiple-operations } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #multiple-operations } @@ -186,7 +186,7 @@ You can also build flow stages with @apidoc[ElasticsearchFlow$]. The API is similar to creating Sinks. Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-flow } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-flow } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-flow } @@ -197,7 +197,7 @@ Java Elasticsearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the pre-defined `StringMessageWriter` to avoid any conversions. For any other JSON technologies, implement a @scala[`MessageWriter[T]`]@java[`MessageWriter`]. Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #string } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #string } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #string } @@ -209,7 +209,7 @@ Java When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to Elastic. Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #kafka-example } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #kafka-example } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #kafka-example } @@ -221,7 +221,7 @@ When working with index-patterns using wildcards, you might need to specify a cu index-name for each document: Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #custom-index-name-example } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #custom-index-name-example } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #custom-index-name-example } @@ -245,7 +245,7 @@ The easiest way of using Elasticsearch-source, is to just specify the query-para like specifying which fields to return and so on. In such cases you can instead use 'searchParams' instead: Scala -: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #custom-search-params } +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #custom-search-params } Java : @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #custom-search-params } From daf8827b3e7a8f6b4d676391c9eaefac431ab68b Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 8 Apr 2024 14:14:56 +0200 Subject: [PATCH 4/5] Update OpensearchV2Spec.scala --- .../src/test/scala/docs/scaladsl/OpensearchV2Spec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala index 36e27c989..f9c5250dd 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV2Spec.scala @@ -419,7 +419,7 @@ class OpensearchV2Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils // Assert no errors except a missing document for a update request val errorMessages = results.flatMap(_.errorReason) errorMessages should have size 1 - errorMessages.head shouldEqual "[][00004]: document missing" + errorMessages.head shouldEqual "[00004]: document missing" flushAndRefresh(connectionSettings, indexName) // Assert docs in sink8/_doc From a41e082e5a9d159420cc986f91de8d96de6e1f7a Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 8 Apr 2024 14:35:33 +0200 Subject: [PATCH 5/5] fix links --- docs/src/main/paradox/elasticsearch.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/src/main/paradox/elasticsearch.md b/docs/src/main/paradox/elasticsearch.md index 510a7b216..0d4ec4685 100644 --- a/docs/src/main/paradox/elasticsearch.md +++ b/docs/src/main/paradox/elasticsearch.md @@ -77,7 +77,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-typed } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-typed } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #run-typed } ### With JSON source @@ -87,7 +87,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-jsobject } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-jsobject } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #run-jsobject } ### Writing to Elasticsearch @@ -106,7 +106,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #multiple-operations } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #multiple-operations } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #multiple-operations } ### Source configuration @@ -189,7 +189,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #run-flow } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-flow } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #run-flow } ### Storing documents from Strings @@ -200,7 +200,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #string } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #string } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #string } @@ -212,7 +212,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #kafka-example } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #kafka-example } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #kafka-example } ### Specifying custom index-name for every document @@ -248,7 +248,7 @@ Scala : @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV6Spec.scala) { #custom-search-params } Java -: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #custom-search-params } +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV6Test.java) { #custom-search-params } #### Routing