Skip to content

Commit d455e9a

Browse files
lynxplutoihostage
authored andcommitted
ES7.x/8.x spec support. ES6 support dropped
1 parent e3795a1 commit d455e9a

File tree

14 files changed

+74
-82
lines changed

14 files changed

+74
-82
lines changed

java/build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ object Versions {
1616
const val jacoco = "0.8.2"
1717
const val jackson = "2.9.7"
1818
const val elasticsearch = "7.16.2"
19-
const val testcontainers = "1.18.3"
19+
const val testcontainers = "1.20.0"
2020
}
2121

2222
val lagomVersion = project.properties["lagomVersion"] as String? ?: Versions.lagom

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/ElasticDocument.kt

+16-16
Original file line numberDiff line numberDiff line change
@@ -30,61 +30,61 @@ interface ElasticDocument : ElasticService {
3030
* Add document to index.
3131
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html)
3232
*/
33-
fun indexWithId(index: String, type: String, id: String): ServiceCall<ByteString, IndexResult>
33+
fun indexWithId(index: String, id: String): ServiceCall<ByteString, IndexResult>
3434

3535
/**
3636
* Retrieve document with meta from index.
3737
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html)
3838
*/
39-
fun get(index: String, type: String, id: String): ServiceCall<NotUsed, ByteString>
39+
fun get(index: String, id: String): ServiceCall<NotUsed, ByteString>
4040

4141
/**
4242
* Retrieve document (only source) from index.
4343
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html#_source)
4444
*/
45-
fun getSource(index: String, type: String, id: String): ServiceCall<NotUsed, ByteString>
45+
fun getSource(index: String, id: String): ServiceCall<NotUsed, ByteString>
4646

4747
/**
4848
* Check the document exists on index.
4949
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html#docs-get)
5050
*/
51-
fun exists(index: String, type: String, id: String): ServiceCall<NotUsed, Done>
51+
fun exists(index: String, id: String): ServiceCall<NotUsed, Done>
5252

5353
/**
5454
* Check the document source exists on index.
5555
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html#_source)
5656
*/
57-
fun existsSource(index: String, type: String, id: String): ServiceCall<NotUsed, Done>
57+
fun existsSource(index: String, id: String): ServiceCall<NotUsed, Done>
5858

5959
/**
6060
* Executing bulk requests.
6161
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)
6262
*/
63-
fun bulk(index: String, type: String): ServiceCall<BulkRequest, BulkResult>
63+
fun bulk(index: String): ServiceCall<BulkRequest, BulkResult>
6464

6565
/**
6666
* Executes update document requests.
6767
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html)
6868
*/
69-
fun update(index: String, type: String, id: String): ServiceCall<ByteString, UpdateResult>
69+
fun update(index: String, id: String): ServiceCall<ByteString, UpdateResult>
7070

7171
/**
7272
* Executes delete document requests.
7373
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html)
7474
*/
75-
fun delete(index: String, type: String, id: String): ServiceCall<NotUsed, DeleteResult>
75+
fun delete(index: String, id: String): ServiceCall<NotUsed, DeleteResult>
7676

7777
@JvmDefault
7878
override fun descriptor(): Descriptor {
7979
return named("elastic-document").withCalls(
80-
restCall<ByteString, IndexResult>(PUT, "/:index/:type/:id", ElasticDocument::indexWithId.javaMethod),
81-
restCall<NotUsed, ByteString>(GET, "/:index/:type/:id", ElasticDocument::get.javaMethod),
82-
restCall<NotUsed, Done>(HEAD, "/:index/:type/:id", ElasticDocument::exists.javaMethod),
83-
restCall<NotUsed, ByteString>(GET, "/:index/:type/:id/_source", ElasticDocument::getSource.javaMethod),
84-
restCall<NotUsed, Done>(HEAD, "/:index/:type/:id/_source", ElasticDocument::existsSource.javaMethod),
85-
restCall<BulkRequest, BulkResult>(POST, "/:index/:type/_bulk", ElasticDocument::bulk.javaMethod),
86-
restCall<ByteString, UpdateResult>(POST, "/:index/:type/:id/_update", ElasticDocument::update.javaMethod),
87-
restCall<NotUsed, DeleteResult>(DELETE, "/:index/:type/:id", ElasticDocument::delete.javaMethod)
80+
restCall<ByteString, IndexResult>(PUT, "/:index/_doc/:id", ElasticDocument::indexWithId.javaMethod),
81+
restCall<NotUsed, ByteString>(GET, "/:index/_doc/:id", ElasticDocument::get.javaMethod),
82+
restCall<NotUsed, Done>(HEAD, "/:index/_doc/:id", ElasticDocument::exists.javaMethod),
83+
restCall<NotUsed, ByteString>(GET, "/:index/_source/:id", ElasticDocument::getSource.javaMethod),
84+
restCall<NotUsed, Done>(HEAD, "/:index/_source/:id", ElasticDocument::existsSource.javaMethod),
85+
restCall<BulkRequest, BulkResult>(POST, "/:index/_bulk", ElasticDocument::bulk.javaMethod),
86+
restCall<ByteString, UpdateResult>(POST, "/:index/_update/:id", ElasticDocument::update.javaMethod),
87+
restCall<NotUsed, DeleteResult>(DELETE, "/:index/_doc/:id", ElasticDocument::delete.javaMethod)
8888
)
8989
.withSerializerFactory(ElasticSerializerFactory(objectMapper()))
9090
}

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/dsl/Document.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ open class Document<T> @JvmOverloads constructor(
88
@get:JsonProperty("_index")
99
var index: String = "",
1010
@get:JsonProperty("_type")
11-
var type: String = "",
11+
var type: String? = "",
1212
@get:JsonProperty("_id")
1313
var id: String = "",
1414
@get:JsonProperty("_version")

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/dsl/bulk/BulkResult.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ data class BulkResult(
2525
@get:JsonProperty("_index")
2626
open val index: String,
2727
@get:JsonProperty("_type")
28-
open val type: String,
28+
open val type: String?,
2929
@get:JsonProperty("_id")
3030
open val id: String,
3131
open val status: Long = -1,
@@ -37,7 +37,7 @@ data class BulkResult(
3737

3838
data class BulkCreateResult(
3939
override val index: String,
40-
override val type: String,
40+
override val type: String?,
4141
override val id: String,
4242
override val status: Long,
4343
override val result: String?,
@@ -46,7 +46,7 @@ data class BulkResult(
4646

4747
data class BulkIndexResult(
4848
override val index: String,
49-
override val type: String,
49+
override val type: String?,
5050
override val id: String,
5151
override val status: Long,
5252
override val result: String?,
@@ -55,7 +55,7 @@ data class BulkResult(
5555

5656
data class BulkUpdateResult(
5757
override val index: String,
58-
override val type: String,
58+
override val type: String?,
5959
override val id: String,
6060
override val status: Long,
6161
override val result: String?,
@@ -64,7 +64,7 @@ data class BulkResult(
6464

6565
data class BulkDeleteResult(
6666
override val index: String,
67-
override val type: String,
67+
override val type: String?,
6868
override val id: String,
6969
override val status: Long,
7070
override val result: String?,

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/dsl/delete/DeleteResult.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
44

55
data class DeleteResult(
66
@JsonProperty("_index") val index: String,
7-
@JsonProperty("_type") val type: String,
7+
@JsonProperty("_type") val type: String?,
88
@JsonProperty("_id") val id: String,
99
@JsonProperty("_version") val version: Int,
1010
val result: String? = null

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/dsl/index/IndexResult.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
44

55
data class IndexResult(
66
@JsonProperty("_index") val index: String,
7-
@JsonProperty("_type") val type: String,
7+
@JsonProperty("_type") val type: String?,
88
@JsonProperty("_id") val id: String,
99
@JsonProperty("_version") val version: Int,
1010
val result: String? = null

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/document/dsl/update/UpdateResult.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
44

55
data class UpdateResult(
66
@JsonProperty("_index") val index: String,
7-
@JsonProperty("_type") val type: String,
7+
@JsonProperty("_type") val type: String?,
88
@JsonProperty("_id") val id: String,
99
@JsonProperty("_version") val version: Int,
1010
val result: String? = null

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/search/ElasticSearch.kt

-20
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,6 @@ import kotlin.reflect.jvm.javaMethod
1919
*/
2020
interface ElasticSearch : ElasticService {
2121

22-
/**
23-
* Search documents with across multiple types within an index, and across multiple indices.
24-
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type)
25-
*/
26-
fun search(indices: List<String>, types: List<String>): ServiceCall<SearchRequest, ByteString>
27-
28-
/**
29-
* Search documents an index with the specified type.
30-
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type)
31-
*/
32-
fun search(index: String, type: String): ServiceCall<SearchRequest, ByteString>
33-
3422
/**
3523
* Search documents across all types within an index.
3624
* See also [Elasticsearch Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-multi-index-type)
@@ -58,18 +46,10 @@ interface ElasticSearch : ElasticService {
5846
@JvmDefault
5947
override fun descriptor(): Descriptor {
6048
return named("elastic-search").withCalls(
61-
restCall<SearchRequest, ByteString>(
62-
GET, "/:indices/:types/_search",
63-
forceKF<ElasticSearch.(List<String>, List<String>) -> ServiceCall<*, *>>(ElasticSearch::search).javaMethod
64-
),
6549
restCall<SearchRequest, ByteString>(
6650
GET, "/:indices/_search",
6751
forceKF<ElasticSearch.(List<String>) -> ServiceCall<*, *>>(ElasticSearch::search).javaMethod
6852
),
69-
restCall<SearchRequest, ByteString>(
70-
GET, "/:index/:type/_search",
71-
forceKF<ElasticSearch.(String, String) -> ServiceCall<*, *>>(ElasticSearch::search).javaMethod
72-
),
7353
restCall<SearchRequest, ByteString>(
7454
GET, "/:index/_search",
7555
forceKF<ElasticSearch.(String) -> ServiceCall<*, *>>(ElasticSearch::search).javaMethod

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/search/ScrollSearchSourceStage.kt

+18-8
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,15 @@ import org.taymyr.lagom.elasticsearch.search.dsl.SearchResult
1313
import org.taymyr.lagom.elasticsearch.search.dsl.query.Order
1414
import java.util.LinkedList
1515
import java.util.Queue
16+
import java.util.concurrent.CompletionStage
17+
import java.util.function.Function
1618

1719
/**
1820
* The [GraphStage] stage of scrolling search results.
1921
*/
2022
class ScrollSearchSourceStage<T : SearchResult<*>>(
21-
val index: String,
22-
val type: String,
23-
val request: SearchRequest,
24-
val resultClass: Class<T>,
25-
val client: ElasticSearch
23+
private val request: SearchRequest,
24+
private val searchAction: Function<SearchRequest, CompletionStage<T>>
2625
) : GraphStage<SourceShape<T>>() {
2726
private val out = Outlet.create<T>("ScrollSearchSourceStage.out")
2827
private val shape = SourceShape(out)
@@ -48,7 +47,7 @@ class ScrollSearchSourceStage<T : SearchResult<*>>(
4847
}
4948

5049
private fun searchAfter(r: SearchRequest) {
51-
client.search(index, type).invoke(r, resultClass).whenComplete { result, throwable ->
50+
searchAction.apply(r).whenComplete { result, throwable ->
5251
if (throwable != null) {
5352
searchFailureCallback.invoke(throwable)
5453
} else {
@@ -94,7 +93,18 @@ class ScrollSearchSourceStage<T : SearchResult<*>>(
9493
* Creates a [Source] of scrolling search results.
9594
*/
9695
@JvmStatic
97-
fun <T : SearchResult<*>> scrollSearchSource(index: String, docType: String, request: SearchRequest, resultClass: Class<T>, client: ElasticSearch) =
98-
Source.fromGraph(ScrollSearchSourceStage(index, docType, request, resultClass, client))
96+
fun <T : SearchResult<*>> scrollSearchSource(index: String, request: SearchRequest, resultClass: Class<T>, client: ElasticSearch) =
97+
Source.fromGraph(
98+
ScrollSearchSourceStage(request) { client.search(index).invoke(it, resultClass) }
99+
)
100+
101+
/**
102+
* Creates a [Source] of scrolling search results.
103+
*/
104+
@JvmStatic
105+
fun <T : SearchResult<*>> scrollSearchSource(request: SearchRequest, searchAction: Function<SearchRequest, CompletionStage<T>>) =
106+
Source.fromGraph(
107+
ScrollSearchSourceStage(request, searchAction)
108+
)
99109
}
100110
}

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/search/SearchScroller.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ import java.util.concurrent.CompletionStage
1414
*/
1515
@Deprecated("It is recommended to use ScrollSearchSource instead")
1616
class SearchScroller(
17-
val elasticSearch: ElasticSearch,
18-
val index: String,
19-
val type: String
17+
private val elasticSearch: ElasticSearch,
18+
private val index: String
2019
) {
2120
fun <DocType, ResultType : SearchResult<out DocType>> searchAfter(
2221
request: SearchRequest,
@@ -37,7 +36,7 @@ class SearchScroller(
3736
cf: CompletableFuture<List<DocType>>,
3837
accum: PSequence<DocType>
3938
) {
40-
elasticSearch.search(index, type).invoke(request, resultType).whenComplete { result, throwable ->
39+
elasticSearch.search(index).invoke(request, resultType).whenComplete { result, throwable ->
4140
if (result != null) {
4241
if (result.sources.isNullOrEmpty()) {
4342
cf.complete(accum)

java/src/main/kotlin/org/taymyr/lagom/elasticsearch/search/dsl/HitResult.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
44

55
data class HitResult<T>(
66
@JsonProperty("_index") val index: String,
7-
@JsonProperty("_type") val type: String,
7+
@JsonProperty("_type") val type: String?,
88
@JsonProperty("_id") val id: String,
99
@JsonProperty("_score") val score: Double,
1010
@JsonProperty("_source") val source: T,

java/src/test/java/org/taymyr/lagom/elasticsearch/AbstractElasticsearchIT.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.testcontainers.elasticsearch.ElasticsearchContainer;
1515
import org.testcontainers.junit.jupiter.Container;
1616
import org.testcontainers.junit.jupiter.Testcontainers;
17+
import org.testcontainers.utility.DockerImageName;
1718
import play.api.Configuration;
1819
import scala.collection.immutable.Map$;
1920

@@ -31,7 +32,7 @@
3132
@Testcontainers
3233
public class AbstractElasticsearchIT {
3334

34-
static final String ELASTIC_VERSION = "7.16.2";
35+
static final String ELASTIC_VERSION = "7.17.28";
3536

3637
protected static Config config;
3738
protected static ActorSystem actorSystem;
@@ -42,7 +43,10 @@ public class AbstractElasticsearchIT {
4243
protected static ElasticDocument elasticDocument;
4344

4445
@Container
45-
protected static final ElasticsearchContainer elastic = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + ELASTIC_VERSION);
46+
protected static final ElasticsearchContainer elastic = new ElasticsearchContainer(
47+
DockerImageName.parse("elasticsearch:" + ELASTIC_VERSION)
48+
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch:" + ELASTIC_VERSION)
49+
);
4650

4751
@BeforeAll
4852
static void beforeAll() throws URISyntaxException {

java/src/test/java/org/taymyr/lagom/elasticsearch/document/ElasticDocumentIT.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -36,63 +36,63 @@ void shouldWorkCorrect() throws InterruptedException, ExecutionException, Timeou
3636
}
3737

3838
private void indexDocument() throws InterruptedException, ExecutionException, TimeoutException {
39-
IndexResult result = eventually(invoke(elasticDocument.indexWithId("test", "sample", "1"),
39+
IndexResult result = eventually(invoke(elasticDocument.indexWithId("test", "1"),
4040
new TestDocument("user", "message")));
4141
assertThat(result.getIndex()).isEqualTo("test");
42-
assertThat(result.getType()).isEqualTo("sample");
42+
assertThat(result.getType()).isEqualTo("_doc");
4343
}
4444

4545
private void getDocument() throws InterruptedException, ExecutionException, TimeoutException {
46-
IndexedTestDocument result = eventually(invoke(elasticDocument.get("test", "sample", "1"), IndexedTestDocument.class));
46+
IndexedTestDocument result = eventually(invoke(elasticDocument.get("test", "1"), IndexedTestDocument.class));
4747
assertThat(result.getIndex()).isEqualTo("test");
48-
assertThat(result.getType()).isEqualTo("sample");
48+
assertThat(result.getType()).isEqualTo("_doc");
4949
assertThat(result.getId()).isEqualTo("1");
5050
assertThat(result.getSource().getUser()).isEqualTo("user.bulkUpdate");
5151
assertThat(result.getSource().getMessage()).isEqualTo("message");
5252
}
5353

5454
private void getSource() throws InterruptedException, ExecutionException, TimeoutException {
55-
TestDocument result = eventually(invoke(elasticDocument.getSource("test", "sample", "1"), TestDocument.class));
55+
TestDocument result = eventually(invoke(elasticDocument.getSource("test", "1"), TestDocument.class));
5656
assertThat(result.getUser()).isEqualTo("user.update");
5757
assertThat(result.getMessage()).isEqualTo("message");
5858
}
5959

6060
private void checkExists() throws InterruptedException, ExecutionException, TimeoutException {
61-
Done result = eventually(elasticDocument.exists("test", "sample", "1").invoke());
61+
Done result = eventually(elasticDocument.exists("test", "1").invoke());
6262
assertThat(result).isEqualTo(Done.getInstance());
63-
result = eventually(elasticDocument.existsSource("test", "sample", "1").invoke());
63+
result = eventually(elasticDocument.existsSource("test", "1").invoke());
6464
assertThat(result).isEqualTo(Done.getInstance());
6565
}
6666

6767
private void bulkUpdate() throws InterruptedException, ExecutionException, TimeoutException {
6868
BulkRequest request = BulkRequest.of(
6969
new BulkUpdate("1", new IndexedTestDocument(new TestDocument("user.bulkUpdate", "message")))
7070
);
71-
BulkResult result = eventually(elasticDocument.bulk("test", "sample").invoke(request));
71+
BulkResult result = eventually(elasticDocument.bulk("test").invoke(request));
7272
assertThat(result.isErrors()).isFalse();
7373
assertThat(result.getItems()).hasSize(1);
7474
assertThat(result.getItems().get(0).getStatus()).isEqualTo(200);
7575
assertThat(result.getItems().get(0).getResult()).isEqualTo("updated");
7676
assertThat(result.getItems().get(0).getError()).isNull();
7777
assertThat(result.getItems().get(0).getIndex()).isEqualTo("test");
78-
assertThat(result.getItems().get(0).getType()).isEqualTo("sample");
78+
assertThat(result.getItems().get(0).getType()).isEqualTo("_doc");
7979
assertThat(result.getItems().get(0).getId()).isEqualTo("1");
8080
}
8181

8282
private void update() throws InterruptedException, ExecutionException, TimeoutException {
8383
TestDocument doc = new TestDocument("user.update", "message");
8484
UpdateRequest updateRequest = DocUpdateRequest.builder().doc(doc).build();
85-
UpdateResult result = eventually(invoke(elasticDocument.update("test", "sample", "1"), updateRequest));
85+
UpdateResult result = eventually(invoke(elasticDocument.update("test", "1"), updateRequest));
8686
assertThat(result.getIndex()).isEqualTo("test");
87-
assertThat(result.getType()).isEqualTo("sample");
87+
assertThat(result.getType()).isEqualTo("_doc");
8888
assertThat(result.getId()).isEqualTo("1");
8989
assertThat(result.getResult()).isEqualTo("updated");
9090
}
9191

9292
private void delete() throws InterruptedException, ExecutionException, TimeoutException {
93-
DeleteResult result = eventually(elasticDocument.delete("test", "sample", "1").invoke());
93+
DeleteResult result = eventually(elasticDocument.delete("test", "1").invoke());
9494
assertThat(result.getIndex()).isEqualTo("test");
95-
assertThat(result.getType()).isEqualTo("sample");
95+
assertThat(result.getType()).isEqualTo("_doc");
9696
assertThat(result.getId()).isEqualTo("1");
9797
assertThat(result.getResult()).isEqualTo("deleted");
9898
}

0 commit comments

Comments
 (0)