in elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala [37:178]
def create[T](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper)
: pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
create(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
/**
* Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`.
* The result status is part of the [[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]] and must be checked for
* successful execution.
*
* Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*
* @param messageWriter converts type `T` to a `String` containing valid JSON
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T])
: pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
scaladsl.ElasticsearchFlow
.create(elasticsearchParams, settings, messageWriter)
.asJava
/**
* Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `passThrough` of type `C`.
* The result status is part of the [[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]] and must be checked for
* successful execution.
*
* Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*
* @param objectMapper Jackson object mapper converting type `T` to JSON
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper): pekko.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
createWithPassThrough(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
/**
* Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `passThrough` of type `C`.
* The result status is part of the [[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]] and must be checked for
* successful execution.
*
* Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*
* @param messageWriter converts type `T` to a `String` containing valid JSON
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]): pekko.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
scaladsl.ElasticsearchFlow
.createWithPassThrough(elasticsearchParams, settings, messageWriter)
.asJava
/**
* Create a flow to update Elasticsearch with
* [[java.util.List[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `passThrough` of type `C`.
* The result status is part of the [[java.util.List[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]]]
* and must be checked for successful execution.
*
* Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*
* @param objectMapper Jackson object mapper converting type `T` to JSON
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper): pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
java.util.List[WriteResult[T, C]], NotUsed] =
createBulk(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
/**
* Create a flow to update Elasticsearch with
* [[java.util.List[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `passThrough` of type `C`.
* The result status is part of the [[java.util.List[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]]]
* and must be checked for successful execution.
*
* Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*
* @param messageWriter converts type `T` to a `String` containing valid JSON
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]): pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
java.util.List[WriteResult[T, C]], NotUsed] = pekko.stream.scaladsl
.Flow[java.util.List[WriteMessage[T, C]]]
.map(_.asScala.toIndexedSeq)
.via(
scaladsl.ElasticsearchFlow
.createBulk(elasticsearchParams, settings, messageWriter))
.map(_.asJava)
.asJava
/**
* Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `context` of type `C`.
* The result status is part of the [[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]] and must be checked for
* successful execution.
*
* @param objectMapper Jackson object mapper converting type `T` to JSON
* @throws IllegalArgumentException When settings configure retrying.
*/
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper)
: pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
createWithContext(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
/**
* Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
* with `context` of type `C`.
* The result status is part of the [[pekko.stream.connectors.elasticsearch.WriteResult WriteResult]] and must be checked for
* successful execution.
*
* @param messageWriter converts type `T` to a `String` containing valid JSON
* @throws IllegalArgumentException When settings configure retrying.
*/
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T])
: pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
scaladsl.ElasticsearchFlow
.createWithContext(elasticsearchParams, settings, messageWriter)
.asJava
private final class JacksonWriter[T](mapper: ObjectMapper) extends MessageWriter[T] {
override def convert(message: T): String =
mapper.writeValueAsString(message)
}