elasticsearch_v9

Publishes messages into an Elasticsearch index. If the index does not exist, the output creates one with a dynamic mapping.

Both the id and index fields can be dynamically set using function interpolations. When sending batched messages these interpolations are performed per message part.

Introduced in version 4.77.0.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in-flight messages (or message batches) with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. You can compose a batch at both the input and output level. For more information, see Message Batching.

  • Common

  • Advanced

outputs:
  label: ""
  elasticsearch_v9:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: "" # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
outputs:
  label: ""
  elasticsearch_v9:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: "" # No default (required)
    pipeline: ""
    routing: ""
    retry_on_conflict: 0
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    max_in_flight: 64
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Fields

action

The action to perform on each document. This field must resolve to one of the following action types: index, update, delete, create or upsert. This field supports interpolation functions.

For more information on how the update action works, see Updating Documents. For examples of how to use the create and upsert actions, see Create Documents and Upserting Documents respectively.

Type: string

basic_auth

Allows you to specify basic authentication.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.password

A password to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

basic_auth.username

A username to authenticate as.

Type: string

Default: ""

batching

Configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

id

Define the ID for indexed messages. Use function interpolations to dynamically create a unique ID for each message.

This field supports interpolation functions.

Type: string

# Examples:
id: ${!counter()}-${!timestamp_unix()}

index

The Elasticsearch index where messages are published. This field supports interpolation functions.

Type: string

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 64

pipeline

Specify the ID of a pipeline to preprocess incoming documents before they are published (optional). This field supports interpolation functions.

Type: string

Default: ""

retry_on_conflict

The number of times to retry an update operation when a version conflict occurs.

Type: int

Default: 0

routing

The routing key to use for the document. This field supports interpolation functions.

Type: string

Default: ""

tls

Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.

Type: object

tls.client_certs[]

A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.

You must set tls.enabled: true for the client certificates to take effect.

Certificate pairing rules: For each certificate item, provide either:

  • Inline PEM data using both cert and key or

  • File paths using both cert_file and key_file.

Mixing inline and file-based values within the same item is not supported.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar


# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

tls.enabled

Whether to enable TLS for secure connections. Set to true to enable TLS encryption. Required to be true for other TLS options (like client_certs, root_cas, etc.) to take effect.

Type: bool

Default: false

tls.root_cas

Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server-side certificate verification. Set to true only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.

Type: bool

Default: false

urls[]

A list of URLs to connect to. This output attempts to connect to each URL in the list, in order, until a successful connection is established. If an item in the list contains commas, it is split into multiple URLs.

Type: array

# Examples:
urls:
  - "http://localhost:9200"

Examples

Updating Documents

When updating documents, the request body should contain a combination of a doc, upsert, and/or script fields at the top level, this should be done via mapping processors. doc updates using a partial document, script performs an update using a scripting language such as the built in Painless language, and upsert updates an existing document or inserts a new one if it doesn’t exist. For more information on the structures and behaviors of these fields, please see the Elasticsearch Update API

# Partial document update
output:
  processors:
    - mapping: |
        meta id = this.id
        # Performs a partial update on the document.
        root.doc = this
  elasticsearch_v9:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update

# Scripted update
output:
  processors:
    - mapping: |
        meta id = this.id
        # Increments the field "counter" by 1.
        root.script.source = "ctx._source.counter += 1"
  elasticsearch_v9:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update

# Upsert
output:
  processors:
    - mapping: |
        meta id = this.id
        # If the product with the ID exists, its price will be updated to 50.
        # If the product does not exist, a new document with ID 1 and a price
        # of 100 will be inserted.
        root.doc.product_price = 50
        root.upsert.product_price = 100
  elasticsearch_v9:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update

Indexing documents from Redpanda

Here we read messages from a Redpanda cluster and write them to an Elasticsearch index using a field from the message as the ID for the Elasticsearch document.

input:
  redpanda:
    seed_brokers: [localhost:19092]
    topics: ["things"]
    consumer_group: "rpcn3"
  processors:
    - mapping: |
        meta id = this.id
        root = this
output:
  elasticsearch_v9:
    urls: ['http://localhost:9200']
    index: "things"
    action: "index"
    id: ${! meta("id") }

Indexing documents from S3

Here we read messages from a AWS S3 bucket and write them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.

input:
  aws_s3:
    bucket: "my-cool-bucket"
    prefix: "bug-facts/"
    scanner:
      to_the_end: {}
output:
  elasticsearch_v9:
    urls: ['http://localhost:9200']
    index: "cool-bug-facts"
    action: "index"
    id: ${! meta("s3_key") }

Create Documents

When using the create action, a new document will be created if the document ID does not already exist. If the document ID already exists, the operation will fail.

output:
  elasticsearch_v9:
    urls: [localhost:9200]
    index: foo
    id: ${! json("id") }
    action: create

Upserting Documents

When using the upsert action, if the document ID already exists, it will be updated. If the document ID does not exist, a new document will be inserted. The request body should contain the document to be indexed.

output:
  processors:
    - mapping: |
        meta id = this.id
        root = this.doc
  elasticsearch_v9:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: upsert