sql_insert
Inserts a row into an SQL database for each message.
Introduced in version 3.59.0.
Common
output:
  label: ""
  sql_insert:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    table: "" # No default (required)
    columns: [] # No default (required)
    args_mapping: "" # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Advanced
output:
  label: ""
  sql_insert:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    table: "" # No default (required)
    columns: [] # No default (required)
    args_mapping: "" # No default (required)
    prefix: "" # No default (optional)
    suffix: "" # No default (optional)
    options: [] # No default (optional)
    max_in_flight: 64
    init_files: [] # No default (optional)
    init_statement: "" # No default (optional)
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Examples
Table Insert (MySQL)
Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:
output:
  sql_insert:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable
    columns: [ id, name, topic ]
    args_mapping: |
      root = [
        this.user.id,
        this.user.name,
        meta("kafka_topic"),
      ]
Dynamic SQL operations
The table and columns fields are static strings that do not support Bloblang interpolation. For dynamic table names, dynamic column lists, DELETE operations, or any other SQL that sql_insert cannot express, use the sql_raw output instead.
There is no dedicated sql_delete output. To delete rows, use sql_raw with a DELETE statement:
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/mydb?sslmode=disable
    query: "DELETE FROM my_table WHERE id = $1"
    args_mapping: root = [ this.id ]
To insert into a table determined at runtime, use sql_raw with unsafe_dynamic_query: true, which enables Bloblang interpolation in the query field.
Interpolating user-supplied values into a query can introduce SQL injection risks. Always validate or sanitize the interpolated value beforehand.
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/mydb?sslmode=disable
    unsafe_dynamic_query: true
    query: 'INSERT INTO ${! this.table_name } (id, value) VALUES ($1, $2)'
    args_mapping: root = [ this.id, this.value ]
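One way to validate the interpolated value is to drop messages whose table name is not on a fixed allowlist before they reach the output. The following is a minimal sketch rather than a complete safeguard: the table names events and audit_log are hypothetical, and it assumes a mapping processor runs ahead of the sql_raw output.

```yaml
pipeline:
  processors:
    - mapping: |
        # Hypothetical allowlist: drop the message unless table_name is
        # exactly one of the expected tables. Messages that pass the
        # check are left unchanged.
        root = if this.table_name != "events" && this.table_name != "audit_log" { deleted() }
```

Dropping is only one possible policy. Routing rejected messages to a separate output for inspection may suit some pipelines better.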
Fields
args_mapping
A Bloblang mapping that must evaluate to an array of values containing one element for each column specified in columns, in the same order.
Type: string
# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
# ---
args_mapping: root = [ meta("user.id") ]
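As an illustration of keeping the array aligned with columns, the sketch below maps three columns to three values and supplies a fallback when a field is missing. The table, column names, and document fields are hypothetical; .or() is the Bloblang method that substitutes a value when the target is null or absent.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    # Three columns, so args_mapping must produce exactly three values.
    columns: [ id, name, topic ]
    args_mapping: |
      # Element order follows the columns list: id, name, topic.
      root = [
        this.user.id,
        this.user.name.or("unknown"),
        meta("kafka_topic"),
      ]
```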
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
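To show where a batching policy sits relative to the other sql_insert fields, here is a sketch that combines a count limit with a period. The thresholds are arbitrary illustrations rather than recommendations, and the table and columns are hypothetical.

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable
    columns: [ id, name ]
    args_mapping: root = [ this.user.id, this.user.name ]
    batching:
      # Flush once 100 messages have accumulated...
      count: 100
      # ...or 5 seconds after the batch was started, whichever comes first.
      period: 5s
```

Pairing count with a period keeps a partially filled batch from waiting indefinitely during quiet spells.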
batching.byte_size
The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
A period after which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than conn_max_idle, then conn_max_idle is reduced to match the conn_max_open limit. If the value is <= 0, no idle connections are retained. The default is currently 2. This may change in a future release.
Type: int
Default: 2
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and conn_max_open is less than conn_max_idle, then conn_max_idle is reduced to match the conn_max_open limit. If the value is <= 0, there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
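The four connection pool fields are usually tuned together. The sketch below shows one possible combination. The numbers are illustrative only, the table and columns are hypothetical, and the duration strings assume the same format used by batching.period (for example 5m or 1h).

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    columns: [ id ]
    args_mapping: root = [ this.id ]
    # Cap the total number of open connections.
    conn_max_open: 10
    # Keep up to 5 idle connections for reuse. This stays below
    # conn_max_open, so it is not reduced.
    conn_max_idle: 5
    # Allow connections that have been idle for 5 minutes to be closed.
    conn_max_idle_time: 5m
    # Stop reusing any connection once it is an hour old.
    conn_max_life_time: 1h
```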
driver
A database driver to use.
Type: string
Options: mysql, postgres, pgx, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner, databricks
dsn
A Data Source Name to identify the target database.
Drivers
The following is a list of supported drivers, their placeholder style, and their respective DSN formats:
| Driver | Data Source Name Format |
|---|---|
| spanner | projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE] |
Please note that the postgres and pgx drivers enforce SSL by default. You can override this with the parameter sslmode=disable if required.
The pgx driver is an alternative to the standard postgres (pq) driver and comes with extra functionality such as support for array insertion.
The snowflake driver supports multiple DSN formats. Please consult the docs for more details. For key pair authentication, the DSN has the following format: <snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>, where the value for the privateKey parameter can be constructed from an unencrypted RSA private key file rsa_key.p8 using openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0 (you can use gbasenc instead of basenc on OSX if you install coreutils via Homebrew). If you have a password-encrypted private key, you can decrypt it using openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8. Also, make sure fields such as the username are URL-encoded.
The gocosmos driver is still experimental, but it has support for hierarchical partition keys as well as cross-partition queries. Please refer to the SQL notes for details.
Type: string
# Examples:
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
# ---
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
# ---
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
# ---
dsn: oracle://foouser:foopass@localhost:1521/service_name
# ---
dsn: token:dapi1234567890ab@dbc-a1b2345c-d6e7.cloud.databricks.com:443/sql/1.0/warehouses/abc123def456
init_files[]
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified, init_statement is executed after init_files.
If a statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.
Requires version 4.10.0 or later.
Type: array
# Examples:
init_files:
  - ./init/*.sql
# ---
init_files:
  - ./foo.sql
  - ./bar.sql
init_statement
An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
If both init_statement and init_files are specified, init_statement is executed after init_files.
If the statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.
Requires version 4.10.0 or later.
Type: string
# Examples:
init_statement: |-
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
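When init_files and init_statement are used together, the files run first and the statement runs afterwards. The sketch below relies on that ordering, with hypothetical file paths, table, and index names. Both steps use IF NOT EXISTS so that they stay idempotent across restarts.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    columns: [ id, name ]
    args_mapping: root = [ this.user.id, this.user.name ]
    # Executed first: hypothetical schema files, for example one that
    # runs CREATE TABLE IF NOT EXISTS footable (...).
    init_files:
      - ./init/*.sql
    # Executed after the files, so it can rely on the table existing.
    init_statement: |-
      CREATE INDEX IF NOT EXISTS footable_name_idx ON footable (name);
```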
options[]
A list of keyword options to add before the INTO clause of the query.
Type: array
# Examples:
options:
  - DELAYED
  - IGNORE
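As a sketch of how options affects the generated statement, the configuration below uses the MySQL IGNORE keyword. The table and columns are hypothetical, and the SQL in the comment is an approximation of the resulting statement rather than captured output.

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable
    columns: [ id, name ]
    args_mapping: root = [ this.user.id, this.user.name ]
    # Keywords are placed between INSERT and INTO, giving roughly:
    #   INSERT IGNORE INTO footable (id, name) VALUES (?, ?)
    options:
      - IGNORE
```

The accepted keywords depend on the database, so an option that is valid for one driver may be rejected by another.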