elasticsearch_v8
Publishes messages into an Elasticsearch index. If the index does not exist, this output creates it using dynamic mapping.
The elasticsearch_v8 output is based on the go-elasticsearch/v8 library. For full information about breaking changes from previous versions, see Elastic’s Migrating to 8.0 guide.
To help configure your own elasticsearch_v8 output, this page includes example pipeline configurations.
# Common configuration fields, showing default values
output:
  label: ""
  elasticsearch_v8:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: ${!counter()}-${!timestamp_unix()} # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: "" # Optional
      check: "" # Optional
# All configuration fields, showing default values
output:
  label: ""
  elasticsearch_v8:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: ${!counter()}-${!timestamp_unix()} # No default (required)
    pipeline: "" # Optional
    routing: "" # Optional
    retry_on_conflict: 0
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: "" # Optional
      root_cas_file: "" # Optional
      client_certs: []
    max_in_flight: 64
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Set values dynamically
You can use function interpolations to dynamically set values for the id and index fields, as well as other fields where function interpolations are supported. When message batches are sent, interpolations are performed per message.
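As a minimal sketch of per-message interpolation, the fragment below derives both the index name and the document ID from each message. The `tenant` and `order_id` fields, the `orders-` prefix, and the URL are hypothetical and only illustrate the pattern.

```yaml
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    # Hypothetical: send each message to an index named after its `tenant` field.
    index: 'orders-${! json("tenant") }'
    action: index
    # Hypothetical: use the message's `order_id` field as the document ID.
    id: '${! json("order_id") }'
```

Because interpolations are resolved per message, two messages in the same batch can end up in different indexes.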
Performance
For improved performance, this output sends:
- Multiple messages in parallel. Adjust the max_in_flight field value to tune the maximum number of in-flight messages (or message batches).
- Messages as batches. You can configure batches at both input and output level. For more information, see Message Batching.
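The two settings can be combined. The following is a sketch only: the values are illustrative starting points rather than recommendations, and the index name and URL are placeholders.

```yaml
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: my_target_index
    action: index
    id: ${!counter()}-${!timestamp_unix()}
    max_in_flight: 128 # Illustrative value, raised from the default of 64.
    batching:
      count: 500 # Flush once 500 messages have accumulated.
      period: 1s # Also flush an incomplete batch after one second.
```

Suitable values depend on document size and cluster capacity, so it is worth measuring throughput before and after changing them.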
Fields
urls
A list of URLs to connect to. This output attempts to connect to each URL in the list, in order, until a successful connection is established. If an item in the list contains commas, it is split into multiple URLs.
Type: array
# Examples
urls:
- http://localhost:9200
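To illustrate the comma-splitting behavior, the sketch below shows two ways of listing the same pair of nodes. The host names are assumptions made for this example.

```yaml
# A single list item that contains a comma...
urls:
  - http://es-node-1:9200,http://es-node-2:9200
# ...is split into the same two URLs as this list:
urls:
  - http://es-node-1:9200
  - http://es-node-2:9200
```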
index
The Elasticsearch index where messages are published. This field supports interpolation functions.
Type: string
Default: ""
action
The action to perform on each document. This field supports interpolation functions.
For more information on how the update action works, see Example pipelines.
Type: string
Default: ""
Options: index, update, delete
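Because this field supports interpolation functions, the action can be chosen per message. The sketch below assumes each message carries a hypothetical boolean `deleted` field and stores the chosen action in a hypothetical metadata key named `es_action`.

```yaml
output:
  processors:
    # Hypothetical: pick the action from a `deleted` flag on the message.
    - mapping: |
        meta id = this.id
        meta es_action = if this.deleted == true { "delete" } else { "index" }
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: my_target_index
    id: ${! @id }
    action: ${! @es_action } # Resolves to one of the options listed above.
```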
id
Define the ID for indexed messages. Use function interpolations to dynamically create a unique ID for each message.
Type: string
Default: ""
# Examples
${!counter()}-${!timestamp_unix()}
pipeline
Specify the ID of a pipeline to preprocess incoming documents before they are published (optional). This field supports interpolation functions.
Type: string
Default: ""
routing
The routing key to use for the document. This field supports interpolation functions.
Type: string
Default: ""
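A minimal sketch that sets both pipeline and routing follows. The pipeline ID `my_ingest_pipeline` and the `customer_id` field are hypothetical, and the pipeline is assumed to already exist in Elasticsearch.

```yaml
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: my_target_index
    action: index
    id: ${! json("id") }
    pipeline: my_ingest_pipeline # Hypothetical pipeline ID.
    routing: ${! json("customer_id") } # Hypothetical field used as the routing key.
```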
retry_on_conflict
The number of times to retry an update operation when a version conflict occurs.
Type: int
Default: 0
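This setting is mainly relevant when several writers may update the same document at once. The sketch below reuses the scripted update from the Example pipelines section and adds an illustrative retry count of 3.

```yaml
output:
  processors:
    - mapping: |
        meta id = this.id
        root.script.source = "ctx._source.counter += 1"
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: my_target_index
    id: ${! @id }
    action: update
    retry_on_conflict: 3 # Illustrative value; the default of 0 does not retry.
```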
tls.enabled
Enable custom TLS settings. By default, custom settings are disabled.
Type: bool
Default: false
tls.skip_cert_verify
Whether to skip server-side certificate verification.
Type: bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
Specify a root certificate authority to use (optional). This is a string, representing a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
tls.root_cas_file
Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples
root_cas_file: ./root_cas.pem
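Putting the TLS fields together, a minimal sketch for connecting to a cluster whose certificate is signed by a private certificate authority might look like this. The `https` URL, index name, and file path are assumptions.

```yaml
output:
  elasticsearch_v8:
    urls: ['https://localhost:9200']
    index: my_target_index
    action: index
    id: ${!counter()}-${!timestamp_unix()}
    tls:
      enabled: true # Turns on the custom TLS settings below.
      root_cas_file: ./root_cas.pem # Assumed path to the CA certificate chain.
```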
tls.client_certs
A list of client certificates to use. For each certificate, specify values for either the cert and key fields, or the cert_file and key_file fields.
Type: array
Default: []
# Examples
client_certs:
  - cert: foo
    key: bar
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
tls.client_certs[].key
The plain text certificate key to use.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
The pbeWithMD5AndDES-CBC algorithm does not authenticate ciphertext, and is vulnerable to padding oracle attacks, which may allow an attacker to recover the plain text password.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples
password: foo
password: ${KEY_PASSWORD}
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64
basic_auth.password
A password to authenticate with.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
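One way to keep the password out of the configuration file is to read it from an environment variable, as in the sketch below. The user name `elastic` and the variable name `ES_PASSWORD` are assumptions made for this example.

```yaml
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: my_target_index
    action: index
    id: ${!counter()}-${!timestamp_unix()}
    basic_auth:
      enabled: true
      username: elastic # Hypothetical user name.
      password: ${ES_PASSWORD} # Hypothetical environment variable holding the secret.
```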
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.count
The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
The period after which an incomplete batch is flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.
Type: array
# Examples
processors:
  - archive:
      format: concatenate
processors:
  - archive:
      format: lines
processors:
  - archive:
      format: json_array
Example pipelines
- Update documents
- Index documents from Redpanda
- Index documents from AWS S3
Update documents
To update documents in the target index, the top level of the request body must include at least one of the following fields:
- doc: Performs partial updates on a document.
- upsert: Updates an existing document or inserts a document if it doesn’t exist.
- script: Performs an update using a scripting language, such as Elasticsearch’s Painless scripting language.
The following examples show how to configure mapping processors with this output to achieve different types of updates.
Example 1: Partial document update
output:
  processors:
    # Sets the metadata ID field to the message ID, then
    # performs a partial update on the document.
    - mapping: |
        meta id = this.id
        root.doc = this
  elasticsearch_v8:
    urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
    index: my_target_index # The name of the Elasticsearch index.
    id: ${! @id } # Sets the document ID to the value of the metadata ID field.
    action: update # The action to perform on each document.
Example 2: Scripted update
output:
  processors:
    # Sets the metadata ID field to the message ID, then
    # increments the counter field by `1` using a script.
    - mapping: |
        meta id = this.id
        root.script.source = "ctx._source.counter += 1"
  elasticsearch_v8:
    urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
    index: my_target_index # The name of the Elasticsearch index.
    id: ${! @id } # Sets the document ID to the value of the metadata ID field.
    action: update # The action to perform on each document.
Example 3: Upsert
output:
  processors:
    # Sets the metadata ID field to the message ID.
    # If a document with that ID exists, updates its `product_price` to 100.
    # If the document does not exist, inserts a new document with that ID
    # and the `product_price` set to 50.
    - mapping: |
        meta id = this.id
        root.doc.product_price = 100
        root.upsert.product_price = 50
  elasticsearch_v8:
    urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
    index: my_target_index # The name of the Elasticsearch index.
    id: ${! @id } # Sets the document ID to the value of the metadata ID field.
    action: update # The action to perform on each document.
For more information on the structures and behaviors of doc, upsert, and script fields, see the Elasticsearch Update API.
Index documents from Redpanda
This example reads messages from a Redpanda cluster and writes them to an Elasticsearch index using a field from the message as the document ID.
# Reads messages from a Redpanda cluster.
input:
  redpanda:
    seed_brokers: [localhost:19092] # The address of the Redpanda broker.
    topics: ["product_code"] # The topic to consume messages from.
    consumer_group: "rpcn3" # The consumer group ID.
  processors:
    # Sets the metadata ID field to the message ID and
    # sets the root of the message to the message content.
    - mapping: |
        meta id = this.id
        root = this
# Writes messages to the specified Elasticsearch index.
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
    index: "product_code" # The name of the Elasticsearch index.
    action: "index" # The action to perform on each document.
    id: ${! meta("id") } # Sets the document ID to the value of the metadata ID field.
Index documents from AWS S3
This example reads messages from an AWS S3 bucket and writes them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.
# Reads messages from an AWS S3 bucket.
input:
  aws_s3:
    bucket: "my_bucket" # The name of the S3 bucket.
    prefix: "prod_inventory/" # A prefix to filter objects in the bucket.
    scanner:
      to_the_end: {} # Reads each object to the end and delivers it as a single message.
# Writes messages to the specified Elasticsearch index.
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
    index: "current_prod_inventory" # The name of the Elasticsearch index.
    action: "index" # The action to perform on each document.
    id: ${! meta("s3_key") } # Sets the document ID to the S3 key.