elasticsearch_v9
Publishes messages into an Elasticsearch index. If the index does not exist, the output creates one with a dynamic mapping.
Both the id and index fields can be dynamically set using function interpolations. When sending batched messages these interpolations are performed per message part.
Introduced in version 4.77.0.
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in-flight messages (or message batches) with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. You can compose a batch at both the input and output level. For more information, see Message Batching.
-
Common
-
Advanced
outputs:
label: ""
elasticsearch_v9:
urls: [] # No default (required)
index: "" # No default (required)
action: "" # No default (required)
id: "" # No default (required)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
outputs:
label: ""
elasticsearch_v9:
urls: [] # No default (required)
index: "" # No default (required)
action: "" # No default (required)
id: "" # No default (required)
pipeline: ""
routing: ""
retry_on_conflict: 0
tls:
enabled: false
skip_cert_verify: false
enable_renegotiation: false
root_cas: ""
root_cas_file: ""
client_certs: []
max_in_flight: 64
basic_auth:
enabled: false
username: ""
password: ""
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
Fields
action
The action to perform on each document. This field must resolve to one of the following action types: index, update, delete, create or upsert. This field supports interpolation functions.
For more information on how the update action works, see Updating Documents. For examples of how to use the create and upsert actions, see Create Documents and Upserting Documents respectively.
Type: string
basic_auth
Configure basic authentication credentials for connecting to Elasticsearch. When enabled, these credentials are sent with each request to authenticate with the cluster.
Type: object
basic_auth.password
A password to authenticate with.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
batching
Configure a batching policy.
Type: object
# Examples:
batching:
byte_size: 5000
count: 0
period: 1s
# ---
batching:
count: 10
period: 1s
# ---
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
batching.byte_size
The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that returns a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
- archive:
format: concatenate
# ---
processors:
- archive:
format: lines
# ---
processors:
- archive:
format: json_array
id
Define the ID for indexed messages. Use function interpolations to dynamically create a unique ID for each message.
This field supports interpolation functions.
Type: string
# Examples:
id: ${!counter()}-${!timestamp_unix()}
index
The Elasticsearch index where messages are published. This field supports interpolation functions.
Type: string
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64
pipeline
Specify the ID of a pipeline to preprocess incoming documents before they are published (optional). This field supports interpolation functions.
Type: string
Default: ""
retry_on_conflict
The number of times to retry an update operation when a version conflict occurs.
Type: int
Default: 0
routing
The routing key to use for the document. This field supports interpolation functions.
Type: string
Default: ""
tls
Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.
Type: object
tls.client_certs[]
A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.
You must set tls.enabled: true for the client certificates to take effect.
Certificate pairing rules: For each certificate item, provide either:
-
Inline PEM data using both
certandkeyor -
File paths using both
cert_fileandkey_file.
Mixing inline and file-based values within the same item is not supported.
Type: object
Default: []
# Examples:
client_certs:
- cert: foo
key: bar
# ---
client_certs:
- cert_file: ./example.pem
key_file: ./example.key
tls.client_certs[].key
A plain text certificate key to use.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Requires version 3.45.0 or later.
Type: bool
Default: false
tls.enabled
Whether to enable TLS for secure connections. Set to true to enable TLS encryption. Required to be true for other TLS options (like client_certs, root_cas, etc.) to take effect.
Type: bool
Default: false
tls.root_cas
Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
tls.root_cas_file
Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
tls.skip_cert_verify
Whether to skip server-side certificate verification. Set to true only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.
Type: bool
Default: false
Examples
Updating Documents
When updating documents, the request body should contain a combination of a doc, upsert, and/or script fields at the top level, this should be done via mapping processors. doc updates using a partial document, script performs an update using a scripting language such as the built in Painless language, and upsert updates an existing document or inserts a new one if it doesn’t exist. For more information on the structures and behaviors of these fields, please see the Elasticsearch Update API
# Partial document update
output:
processors:
- mapping: |
meta id = this.id
# Performs a partial update on the document.
root.doc = this
elasticsearch_v9:
urls: [localhost:9200]
index: foo
id: ${! @id }
action: update
# Scripted update
output:
processors:
- mapping: |
meta id = this.id
# Increments the field "counter" by 1.
root.script.source = "ctx._source.counter += 1"
elasticsearch_v9:
urls: [localhost:9200]
index: foo
id: ${! @id }
action: update
# Upsert
output:
processors:
- mapping: |
meta id = this.id
# If the product with the ID exists, its price will be updated to 50.
# If the product does not exist, a new document with ID 1 and a price
# of 100 will be inserted.
root.doc.product_price = 50
root.upsert.product_price = 100
elasticsearch_v9:
urls: [localhost:9200]
index: foo
id: ${! @id }
action: update
Indexing documents from Redpanda
Here we read messages from a Redpanda cluster and write them to an Elasticsearch index using a field from the message as the ID for the Elasticsearch document.
input:
redpanda:
seed_brokers: [localhost:19092]
topics: ["things"]
consumer_group: "rpcn3"
processors:
- mapping: |
meta id = this.id
root = this
output:
elasticsearch_v9:
urls: ['http://localhost:9200']
index: "things"
action: "index"
id: ${! meta("id") }
Indexing documents from S3
Here we read messages from a AWS S3 bucket and write them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.
input:
aws_s3:
bucket: "my-cool-bucket"
prefix: "bug-facts/"
scanner:
to_the_end: {}
output:
elasticsearch_v9:
urls: ['http://localhost:9200']
index: "cool-bug-facts"
action: "index"
id: ${! meta("s3_key") }
Create Documents
When using the create action, a new document will be created if the document ID does not already exist. If the document ID already exists, the operation will fail.
output:
elasticsearch_v9:
urls: [localhost:9200]
index: foo
id: ${! json("id") }
action: create
Upserting Documents
When using the upsert action, if the document ID already exists, it will be updated. If the document ID does not exist, a new document will be inserted. The request body should contain the document to be indexed.
output:
processors:
- mapping: |
meta id = this.id
root = this.doc
elasticsearch_v9:
urls: [localhost:9200]
index: foo
id: ${! @id }
action: upsert