sql_raw

Executes a select query and creates a message for each row received.

# Common configuration fields, showing default values
input:
  label: ""
  sql_raw:
    driver: "" # No default (required)
    dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
    query: SELECT * FROM footable WHERE user_id = $1; # No default (required)
    args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
    auto_replay_nacks: true

# All configuration fields, showing default values
input:
  label: ""
  sql_raw:
    driver: "" # No default (required)
    dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
    query: SELECT * FROM footable WHERE user_id = $1; # No default (required)
    args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
    auto_replay_nacks: true
    init_files: [] # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      ) WITHOUT ROWID;
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: 0 # No default (optional)

When the rows from the query are exhausted, this input shuts down, allowing the pipeline to gracefully terminate or for the next input in a sequence to execute.
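
The shutdown behaviour described above can be sketched as follows. This is a minimal sketch, assuming the sequence and file inputs are available alongside sql_raw; the file path is hypothetical.

```yaml
input:
  sequence:
    inputs:
      # Runs first and shuts down once every row has been consumed.
      - sql_raw:
          driver: postgres
          dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
          query: "SELECT name, count(*) FROM person GROUP BY name;"
      # Starts only after the sql_raw input above has finished.
      # The path below is a hypothetical placeholder.
      - file:
          paths: [ ./extra_people.jsonl ]
```

With sql_raw as the only input, the same exhaustion of rows would instead let the whole pipeline terminate.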

Examples

Consumes an SQL table using a query as an input.

Here we perform an aggregate over the names in a table, restricted to rows whose last_updated value is earlier than 3600 seconds ago.

input:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
    query: "SELECT name, count(*) FROM person WHERE last_updated < $1 GROUP BY name;"
    args_mapping: |
      root = [
        now().ts_unix() - 3600
      ]

Fields

args_mapping

An optional Bloblang mapping that evaluates to an array containing one value for each placeholder argument in the query field.

Type: string

# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]
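
To illustrate how the array lines up with the placeholders, here is a minimal sketch using the mysql driver, where each question mark is filled by the array value in the same position. The table, the column names and the PERSON_REGION environment variable are hypothetical.

```yaml
input:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    # Two placeholders, so the mapping must produce two values.
    query: "SELECT id, name FROM person WHERE region = ? AND updated_unix > ?;"
    args_mapping: |
      root = [
        env("PERSON_REGION"),
        now().ts_unix() - 3600
      ]
```

The first value binds to region and the second to updated_unix.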

auto_replay_nacks

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: bool

Default: true

conn_max_idle

An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If the value is <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

Type: int

Default: 2

conn_max_idle_time

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's idle time.

Type: string

conn_max_life_time

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's age.

Type: string

conn_max_open

An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If the value is <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).

Type: int
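
The four connection pool fields can be combined as in the sketch below. The numbers are illustrative only, and the duration strings (5m, 1h) are an assumption about the accepted format, since the fields are documented only as strings.

```yaml
input:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    query: "SELECT * FROM footable WHERE user_id = $1;"
    args_mapping: root = [ 42 ]
    # At most 10 connections open at once (0 would mean unlimited).
    conn_max_open: 10
    # Keep up to 5 idle connections; a value above conn_max_open is reduced to match it.
    conn_max_idle: 5
    # Assumed duration format: close connections idle for more than 5 minutes.
    conn_max_idle_time: 5m
    # Assumed duration format: stop reusing connections older than 1 hour.
    conn_max_life_time: 1h
```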

driver

A database driver to use.

Type: string

Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner

dsn

A Data Source Name to identify the target database.

Type: string

# Examples:
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
dsn: oracle://foouser:foopass@localhost:1521/service_name

init_files[]

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Care should be taken to ensure that the statements are idempotent, so that they do not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified, the init_statement is executed after the init_files.

If a statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.

Type: array

# Examples:
init_files:
  - ./init/*.sql

init_files:
  - ./foo.sql
  - ./bar.sql

init_statement

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both init_statement and init_files are specified, the init_statement is executed after the init_files.

If the statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.

Type: string

# Examples:
init_statement: |-
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
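
As a sketch of how the two initialisation fields interact, the config below sets both. The file paths and table are illustrative, and the statement uses CREATE TABLE IF NOT EXISTS so that it stays idempotent across restarts.

```yaml
input:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    query: "SELECT foo, bar, baz FROM some_table;"
    # Executed first, upon the first connection to the database.
    init_files:
      - ./init/*.sql
    # Executed after the init_files above.
    init_statement: |
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      );
```

A failure in either step would only emit a warning log, so the query would still be attempted.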

query

The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on). The style to use is outlined in this table:

Driver        Placeholder Style
clickhouse    Dollar sign ($)
gocosmos      Colon (:)
mysql         Question mark (?)
mssql         Question mark (?)
oracle        Colon (:)
postgres      Dollar sign ($)
snowflake     Question mark (?)
spanner       Question mark (?)
sqlite        Question mark (?)
trino         Question mark (?)

Type: string

# Examples:
query: SELECT * FROM footable WHERE user_id = $1;
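
For comparison with the dollar sign example, here is a minimal sketch of the same kind of query using the colon style listed for the oracle driver. The status column and the argument values are hypothetical.

```yaml
input:
  sql_raw:
    driver: oracle
    dsn: oracle://foouser:foopass@localhost:1521/service_name
    # Colon placeholders for oracle. With mysql the same conditions would be
    # written "user_id = ? AND status = ?", and with postgres
    # "user_id = $1 AND status = $2".
    query: "SELECT * FROM footable WHERE user_id = :1 AND status = :2"
    args_mapping: root = [ 42, "active" ]
```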