elasticsearch_v9

Available in: Self-Managed

Publishes messages into an Elasticsearch index. If the index does not exist, the output creates one with a dynamic mapping.

Both the id and index fields can be set dynamically using function interpolations. When sending batched messages, these interpolations are performed per message part.

Introduced in version 4.77.0.

Performance

This output benefits from sending multiple messages in flight in parallel. You can tune the maximum number of in-flight messages (or message batches) with the max_in_flight field.

This output also benefits from sending messages as a batch. You can compose a batch at both the input and output level. For more information, see Message Batching.

Common

    output:
      label: ""
      elasticsearch_v9:
        urls: [] # No default (required)
        index: "" # No default (required)
        action: "" # No default (required)
        id: "" # No default (required)
        max_in_flight: 64
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

Advanced

    output:
      label: ""
      elasticsearch_v9:
        urls: [] # No default (required)
        index: "" # No default (required)
        action: "" # No default (required)
        id: "" # No default (required)
        pipeline: ""
        routing: ""
        retry_on_conflict: 0
        tls:
          enabled: false
          skip_cert_verify: false
          enable_renegotiation: false
          root_cas: ""
          root_cas_file: ""
          client_certs: []
        max_in_flight: 64
        basic_auth:
          enabled: false
          username: ""
          password: ""
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

Fields

action

The action to perform on each document. This field must resolve to one of the following action types: index, update, delete, create, or upsert.

This field supports interpolation functions.

For more information on how the update action works, see Updating Documents.
For examples of how to use the create and upsert actions, see Create Documents and Upserting Documents respectively.

Type: string

basic_auth

Allows you to specify basic authentication.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool
Default: false

basic_auth.password

A password to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

basic_auth.username

A username to authenticate as.

Type: string
Default: ""

batching

Configure a batching policy.

Type: object

    # Examples:
    batching:
      byte_size: 5000
      count: 0
      period: 1s
    # ---
    batching:
      count: 10
      period: 1s
    # ---
    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples:
    check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string
Default: ""

    # Examples:
    period: 1s
    # ---
    period: 1m
    # ---
    period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor

    # Examples:
    processors:
      - archive:
          format: concatenate
    # ---
    processors:
      - archive:
          format: lines
    # ---
    processors:
      - archive:
          format: json_array

id

Define the ID for indexed messages. Use function interpolations to dynamically create a unique ID for each message.

This field supports interpolation functions.

Type: string

    # Examples:
    id: ${!counter()}-${!timestamp_unix()}

index

The Elasticsearch index where messages are published.

This field supports interpolation functions.

Type: string

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64

pipeline

Specify the ID of a pipeline to preprocess incoming documents before they are published (optional).

This field supports interpolation functions.

Type: string
Default: ""

retry_on_conflict

The number of times to retry an update operation when a version conflict occurs.

Type: int
Default: 0

routing

The routing key to use for the document.

This field supports interpolation functions.

Type: string
Default: ""

tls

Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication, where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.

Type: object

tls.client_certs[]

A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates. You must set tls.enabled: true for the client certificates to take effect.

Certificate pairing rules: For each certificate item, provide either:

- Inline PEM data using both cert and key, or
- File paths using both cert_file and key_file.
Mixing inline and file-based values within the same item is not supported.

Type: object
Default: []

    # Examples:
    client_certs:
      - cert: foo
        key: bar
    # ---
    client_certs:
      - cert_file: ./example.pem
        key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string
Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string
Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples:
    password: foo
    # ---
    password: ${KEY_PASSWORD}

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool
Default: false

tls.enabled

Whether to enable TLS for secure connections. Set to true to enable TLS encryption. This field must be true for other TLS options (such as client_certs and root_cas) to take effect.

Type: bool
Default: false

tls.root_cas

Specify a root certificate authority to use (optional).
This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples:
    root_cas: |-
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.

Type: string
Default: ""

    # Examples:
    root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server-side certificate verification. Set to true only in testing environments, because disabling certificate validation reduces security. This may be necessary with self-signed certificates or during development, but it should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.

Type: bool
Default: false

urls[]

A list of URLs to connect to. This output attempts to connect to each URL in the list, in order, until a successful connection is established. If an item in the list contains commas, it is split into multiple URLs.

Type: array

    # Examples:
    urls:
      - "http://localhost:9200"

Examples

Updating Documents

When updating documents, the request body should contain a combination of doc, upsert, and/or script fields at the top level. Build this body using mapping processors.
The doc field updates a document using a partial document, the script field performs an update using a scripting language such as the built-in Painless language, and the upsert field updates an existing document or inserts a new one if it doesn’t exist. For more information on the structures and behaviors of these fields, see the Elasticsearch Update API.

    # Partial document update
    output:
      processors:
        - mapping: |
            meta id = this.id
            # Performs a partial update on the document.
            root.doc = this
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: foo
        id: ${! @id }
        action: update

    # Scripted update
    output:
      processors:
        - mapping: |
            meta id = this.id
            # Increments the field "counter" by 1.
            root.script.source = "ctx._source.counter += 1"
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: foo
        id: ${! @id }
        action: update

    # Upsert
    output:
      processors:
        - mapping: |
            meta id = this.id
            # If the product with the ID exists, its price will be updated to 50.
            # If the product does not exist, a new document with that ID and a
            # price of 100 will be inserted.
            root.doc.product_price = 50
            root.upsert.product_price = 100
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: foo
        id: ${! @id }
        action: update

Indexing documents from Redpanda

Here we read messages from a Redpanda cluster and write them to an Elasticsearch index, using a field from the message as the ID for the Elasticsearch document.

    input:
      redpanda:
        seed_brokers: [localhost:19092]
        topics: ["things"]
        consumer_group: "rpcn3"
      processors:
        - mapping: |
            meta id = this.id
            root = this
    output:
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: "things"
        action: "index"
        id: ${! meta("id") }

Indexing documents from S3

Here we read messages from an AWS S3 bucket and write them to an Elasticsearch index, using the S3 key as the ID for the Elasticsearch document.

    input:
      aws_s3:
        bucket: "my-cool-bucket"
        prefix: "bug-facts/"
        scanner:
          to_the_end: {}
    output:
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: "cool-bug-facts"
        action: "index"
        id: ${! meta("s3_key") }

Create Documents

When using the create action, a new document is created if the document ID does not already exist. If the document ID already exists, the operation fails.

    output:
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: foo
        id: ${! json("id") }
        action: create

Upserting Documents

When using the upsert action, if the document ID already exists, the document is updated. If the document ID does not exist, a new document is inserted. The request body should contain the document to be indexed.

    output:
      processors:
        - mapping: |
            meta id = this.id
            root = this.doc
      elasticsearch_v9:
        urls: ['http://localhost:9200']
        index: foo
        id: ${! @id }
        action: upsert
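Combining TLS, basic authentication, and batching

The field reference above describes tls, basic_auth, batching, and interpolated index values separately. The following is a minimal sketch of how they might be combined in one output, not a recommended production configuration. It assumes each message is a JSON document with id and type fields (hypothetical), that the ES_USERNAME and ES_PASSWORD environment variables are set (hypothetical names), and that ./root_cas.pem contains the CA that signed the cluster certificate. The batch sizes are illustrative only.

```yaml
output:
  elasticsearch_v9:
    urls: ['https://localhost:9200']
    # Assumption: messages carry a lowercase "type" field; the index name
    # is resolved per message part.
    index: 'events-${! json("type") }'
    action: index
    # Assumption: messages carry a unique "id" field.
    id: ${! json("id") }
    max_in_flight: 64
    tls:
      enabled: true
      # Trust a custom CA instead of setting skip_cert_verify.
      root_cas_file: ./root_cas.pem
    basic_auth:
      enabled: true
      # Credentials are read from the environment rather than written inline.
      username: ${ES_USERNAME}
      password: ${ES_PASSWORD}
    batching:
      # Flush after 100 messages or 1 second, whichever comes first.
      count: 100
      period: 1s
```

Supplying the password through an environment variable follows the guidance on sensitive fields, and pointing root_cas_file at a trusted CA avoids disabling certificate verification.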