elasticsearch_v8

Available in: Cloud, Self-Managed

Publishes messages into an Elasticsearch index. If the index does not exist, this output creates it using dynamic mapping.

The elasticsearch_v8 output is based on the go-elasticsearch/v8 library. For full information about breaking changes from previous versions, see Elastic’s Migrating to 8.0 guide.

To help configure your own elasticsearch_v8 output, this page includes example pipeline configurations.

Introduced in version 4.47.0.

Common

    # Common configuration fields, showing default values
    output:
      label: ""
      elasticsearch_v8:
        urls: [] # No default (required)
        index: "" # No default (required)
        action: "" # No default (required)
        id: ${!counter()}-${!timestamp_unix()} # No default (required)
        max_in_flight: 64
        batching:
          count: 0
          byte_size: 0
          period: "" # Optional
          check: "" # Optional

Advanced

    # All configuration fields, showing default values
    output:
      label: ""
      elasticsearch_v8:
        urls: [] # No default (required)
        index: "" # No default (required)
        action: "" # No default (required)
        id: ${!counter()}-${!timestamp_unix()} # No default (required)
        pipeline: "" # Optional
        routing: "" # Optional
        retry_on_conflict: 0
        tls:
          enabled: false
          skip_cert_verify: false
          enable_renegotiation: false
          root_cas: "" # Optional
          root_cas_file: "" # Optional
          client_certs: []
        max_in_flight: 64
        basic_auth:
          enabled: false
          username: ""
          password: ""
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

Set values dynamically

You can use function interpolations to dynamically set values for the id and index fields, as well as other fields where function interpolations are supported. When message batches are sent, interpolations are performed per message.

Performance

For improved performance, this output sends:

Multiple messages in parallel. Adjust the max_in_flight field value to tune the maximum number of in-flight messages (or message batches).
Messages as batches. You can configure batches at both input and output level. For more information, see Message Batching.

Fields

urls

A list of URLs to connect to. This output attempts to connect to each URL in the list, in order, until a successful connection is established. If an item in the list contains commas, it is split into multiple URLs.

Type: array

    # Examples
    urls:
      - http://localhost:9200

index

The Elasticsearch index where messages are published. This field supports interpolation functions.

Type: string
Default: ""

action

The action to perform on each document. This field supports interpolation functions. For more information on how the update action works, see Example pipelines.

Type: string
Default: ""
Options: index, update, delete

id

Define the ID for indexed messages. Use function interpolations to dynamically create a unique ID for each message.

Type: string
Default: ""

    # Examples
    ${!counter()}-${!timestamp_unix()}

pipeline

Specify the ID of a pipeline to preprocess incoming documents before they are published (optional). This field supports interpolation functions.

Type: string
Default: ""

routing

The routing key to use for the document. This field supports interpolation functions.

Type: string
Default: ""

retry_on_conflict

The number of times to retry an update operation when a version conflict occurs.

Type: int
Default: 0

tls

Override system defaults with custom TLS settings.

Type: object

tls.enabled

Enable custom TLS settings. By default, custom settings are disabled.

Type: bool
Default: false

tls.skip_cert_verify

Whether to skip server-side certificate verification.

Type: bool
Default: false

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Type: bool
Default: false
Requires version 3.45.0 or newer

tls.root_cas

Specify a root certificate authority to use (optional).
This is a string, representing a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples
    root_cas: |-
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

    # Examples
    root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate, specify values for either the cert and key fields, or cert_file and key_file fields.

Type: array
Default: []

    # Examples
    client_certs:
      - cert: foo
        key: bar

    client_certs:
      - cert_file: ./example.pem
        key_file: ./example.key

tls.client_certs[].cert

The plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].key

The plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string
Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string
Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. The pbeWithMD5AndDES-CBC algorithm does not authenticate ciphertext, and is vulnerable to padding oracle attacks, which may allow an attacker to recover the plain text password.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples
    password: foo

    password: ${KEY_PASSWORD}

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64

basic_auth

Allows you to specify basic authentication.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool
Default: false

basic_auth.username

The username to use for authentication.

Type: string
Default: ""

basic_auth.password

A password to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

batching

Allows you to configure a batching policy.

Type: object

    # Examples
    batching:
      byte_size: 5000
      count: 0
      period: 1s

    batching:
      count: 10
      period: 1s

    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.period

The period after which an incomplete batch is flushed regardless of its size.

Type: string
Default: ""

    # Examples
    period: 1s

    period: 1m

    period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples
    check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.
Type: array

    # Examples
    processors:
      - archive:
          format: concatenate

    processors:
      - archive:
          format: lines

    processors:
      - archive:
          format: json_array

Example pipelines

This section covers three scenarios: Update documents, Index documents from Redpanda, and Index documents from AWS S3.

Update documents

To update documents in the target index, the top level of the request body must include at least one of the following fields:

doc: Performs partial updates on a document.
upsert: Updates an existing document or inserts a document if it doesn’t exist.
script: Performs an update using a scripting language, such as Elasticsearch’s Painless scripting language.

The following examples show how to configure mapping processors with this output to achieve different types of updates.

Example 1: Partial document update

    output:
      processors:
        # Sets the metadata ID field to the message ID then
        # performs a partial update on the document.
        - mapping: |
            meta id = this.id
            root.doc = this
      elasticsearch_v8:
        urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
        index: my_target_index # The name of the Elasticsearch index.
        id: ${! @id } # Sets the document ID to the value of the metadata ID field.
        action: update # The action to perform on each document.

Example 2: Scripted update

    output:
      processors:
        # Sets the metadata ID field to the message ID then
        # increments the counter field by `1` using a script.
        - mapping: |
            meta id = this.id
            root.script.source = "ctx._source.counter += 1"
      elasticsearch_v8:
        urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
        index: my_target_index # The name of the Elasticsearch index.
        id: ${! @id } # Sets the document ID to the value of the metadata ID field.
        action: update # The action to perform on each document.

Example 3: Upsert

    output:
      processors:
        # Sets the metadata ID field to the message ID.
        # If a document with that ID exists, update its `product_price` to 100.
        # If the document does not exist, insert a new document with that ID
        # and the `product_price` set to 50.
        - mapping: |
            meta id = this.id
            root.doc.product_price = 100
            root.upsert.product_price = 50
      elasticsearch_v8:
        urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
        index: my_target_index # The name of the Elasticsearch index.
        id: ${! @id } # Sets the document ID to the value of the metadata ID field.
        action: update # The action to perform on each document.

For more information on the structures and behaviors of doc, upsert, and script fields, see the Elasticsearch Update API.

Index documents from Redpanda

Reads messages from a Redpanda cluster and writes them to an Elasticsearch index using a field from the message as the document ID.

    # Reads messages from a Redpanda cluster.
    input:
      redpanda:
        seed_brokers: [localhost:19092] # The address of the Redpanda broker.
        topics: ["product_code"] # The topic to consume messages from.
        consumer_group: "rpcn3" # The consumer group ID.
      processors:
        # Sets the metadata ID field to the message ID and
        # sets the root of the message to the message content.
        - mapping: |
            meta id = this.id
            root = this

    # Writes messages to the specified Elasticsearch index.
    output:
      elasticsearch_v8:
        urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
        index: "product_code" # The name of the Elasticsearch index.
        action: "index" # The action to perform on each document.
        id: ${! meta("id") } # Sets the document ID to the value of the metadata ID field.

Index documents from AWS S3

Reads messages from an AWS S3 bucket and writes them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.

    # Reads messages from an AWS S3 bucket.
    input:
      aws_s3:
        bucket: "my_bucket" # The name of the S3 bucket.
        prefix: "prod_inventory/" # A prefix to filter objects in the bucket.
        scanner:
          to_the_end: {} # Reads each object through to its end as a single message.

    # Writes messages to the specified Elasticsearch index.
    output:
      elasticsearch_v8:
        urls: ['http://localhost:9200'] # The URL of the Elasticsearch server.
        index: "current_prod_inventory" # The name of the Elasticsearch index.
        action: "index" # The action to perform on each document.
        id: ${! meta("s3_key") } # Sets the document ID to the S3 key.
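
The example pipelines above leave authentication, TLS, and batching at their defaults. As a rough illustration of how the basic_auth, tls, and batching fields from the Fields section might be combined, here is a minimal sketch rather than a tested configuration. The URL, index name, username, CA file path, and the ES_PASSWORD environment variable are placeholder assumptions, and the batching values are illustrative, not recommendations.

```yaml
# Sketch only: every name and value below is a placeholder assumption.
output:
  elasticsearch_v8:
    urls: ['https://localhost:9200'] # Assumes the server is reachable over HTTPS.
    index: "my_target_index" # Placeholder index name.
    action: "index"
    id: ${!counter()}-${!timestamp_unix()} # Per-message ID, as in the id field example.
    max_in_flight: 64 # The default; increase to send more batches in parallel.
    tls:
      enabled: true
      root_cas_file: ./root_cas.pem # Placeholder path to the CA certificate chain.
    basic_auth:
      enabled: true
      username: "elastic_user" # Placeholder username.
      password: "${ES_PASSWORD}" # Hypothetical environment variable; keeps the secret out of the file.
    batching:
      count: 100 # Flush once 100 messages have accumulated...
      period: 1s # ...or after 1 second, whichever happens first.
```

Setting both count and period bounds the batch size and the time a message can wait in an incomplete batch. Reading the password from an environment variable follows the guidance on sensitive fields given for basic_auth.password.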