# sql_raw


---
title: sql_raw
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/outputs/sql_raw
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/outputs/sql_raw.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/outputs/sql_raw.adoc
page-git-created-date: "2024-09-09"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/sql_raw.md -->

**Type:** Output


**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/outputs/sql_raw/ "View the Self-Managed version of this component")

Executes an arbitrary SQL query for each message.

#### Common

```yml
output:
  label: ""
  sql_raw:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    query: "" # No default (optional)
    args_mapping: "" # No default (optional)
    queries: [] # No default (optional)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

#### Advanced

```yml
output:
  label: ""
  sql_raw:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    query: "" # No default (optional)
    unsafe_dynamic_query: false
    args_mapping: "" # No default (optional)
    queries: [] # No default (optional)
    max_in_flight: 64
    init_files: [] # No default (optional)
    init_statement: "" # No default (optional)
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

For some scenarios where you might use this output, see [Examples](#examples).

## [](#fields)Fields

### [](#args_mapping)`args_mapping`

An optional [Bloblang mapping](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that includes the same number of values in an array as the placeholder arguments in the [`query`](#query) field.

**Type**: `string`

```yaml
# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

# ---

args_mapping: root = [ meta("user.id") ]
```

### [](#batching)`batching`

Allows you to configure a [batching policy](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

The number of bytes at which the batch should be flushed. Set to `0` to disable size-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

The number of messages at which the batch should be flushed. Set to `0` to disable count-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

A period in which an incomplete batch should be flushed regardless of its size.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#conn_max_idle)`conn_max_idle`

An optional maximum number of connections in the idle connection pool. If `conn_max_open` is greater than 0 but less than `conn_max_idle`, then `conn_max_idle` is reduced to match the `conn_max_open` limit. If the value is `<= 0`, no idle connections are retained. The default is currently 2, but this may change in a future release.

**Type**: `int`

**Default**: `2`

### [](#conn_max_idle_time)`conn_max_idle_time`

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If the value is `<= 0`, connections are not closed because of their idle time.

**Type**: `string`

### [](#conn_max_life_time)`conn_max_life_time`

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If the value is `<= 0`, connections are not closed because of their age.

**Type**: `string`

### [](#conn_max_open)`conn_max_open`

An optional maximum number of open connections to the database. If `conn_max_idle` is greater than 0 and `conn_max_open` is less than `conn_max_idle`, then `conn_max_idle` is reduced to match the `conn_max_open` limit. If the value is `<= 0`, there is no limit on the number of open connections. The default is 0 (unlimited).

**Type**: `int`
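
As a rough sketch of how the four connection pool fields fit together, the following config caps the pool at 16 open connections and retains up to 4 idle ones. The specific values, the DSN, and the `footable` table are illustrative assumptions, not recommended settings. The duration strings (`5m`, `1h`) are assumed to use the same format as `batching.period`.

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    query: "INSERT INTO footable (foo) VALUES ($1);"
    args_mapping: root = [ this.foo ]
    # Hard upper bound on concurrent connections to the database.
    conn_max_open: 16
    # Kept at or below conn_max_open so it is not reduced automatically.
    conn_max_idle: 4
    # Close connections that have sat unused for five minutes.
    conn_max_idle_time: 5m
    # Recycle every connection after one hour, regardless of activity.
    conn_max_life_time: 1h
```

Keeping `conn_max_idle` at or below `conn_max_open` avoids relying on the automatic reduction described above.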

### [](#driver)`driver`

A database [driver](#drivers) to use.

**Type**: `string`

**Options**: `mysql`, `postgres`, `pgx`, `clickhouse`, `mssql`, `sqlite`, `oracle`, `snowflake`, `trino`, `gocosmos`, `spanner`, `databricks`

### [](#dsn)`dsn`

A Data Source Name to identify the target database.

#### [](#drivers)Drivers

The following table lists the supported drivers and their respective DSN formats:

| Driver | Data Source Name Format |
| --- | --- |
| clickhouse | clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&…​&paramN=valueN] |
| mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&…​&paramN=valueN] |
| postgres and pgx | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…​] |
| mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&…​] |
| sqlite | file:/path/to/filename.db[?param1=value1&…] |
| oracle | oracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3 |
| snowflake | username[:password]@account_identifier/dbname/schemaname[?param1=value&…​&paramN=valueN] |
| trino | http[s]://user[:pass]@host[:port][?parameters] |
| gocosmos | AccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>][;Version=<cosmosdb-api-version>][;DefaultDb/Db=<db-name>][;AutoId=<true/false>][;InsecureSkipVerify=<true/false>] |
| spanner | projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE] |
| databricks | token:<access-token>@<server-hostname>:<port>/<http-path> |

The `postgres` and `pgx` drivers enforce SSL by default. You can override this with the parameter `sslmode=disable` if required. The `pgx` driver is an alternative to the standard `postgres` (pq) driver and comes with extra functionality such as support for array insertion.

The `snowflake` driver supports multiple DSN formats. See [the docs](https://pkg.go.dev/github.com/snowflakedb/gosnowflake#hdr-Connection_String) for more details. For [key pair authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication), the DSN has the following format: `<snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>`. You can construct the value for the `privateKey` parameter from an unencrypted RSA private key file `rsa_key.p8` using `openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0` (on macOS, you can use `gbasenc` instead of `basenc` if you install `coreutils` via Homebrew). If you have a password-encrypted private key, you can decrypt it using `openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8`. Also, make sure fields such as the username are URL-encoded.

The [`gocosmos`](https://pkg.go.dev/github.com/microsoft/gocosmos) driver is still experimental, but it has support for [hierarchical partition keys](https://learn.microsoft.com/en-us/azure/cosmos-db/hierarchical-partition-keys) as well as [cross-partition queries](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-query-container#cross-partition-query). Please refer to the [SQL notes](https://github.com/microsoft/gocosmos/blob/main/SQL.md) for details.

**Type**: `string`

```yaml
# Examples:
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

# ---

dsn: foouser:foopassword@tcp(localhost:3306)/foodb

# ---

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

# ---

dsn: oracle://foouser:foopass@localhost:1521/service_name

# ---

dsn: token:dapi1234567890ab@dbc-a1b2345c-d6e7.cloud.databricks.com:443/sql/1.0/warehouses/abc123def456
```
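
For the Snowflake key pair format described above, a full output config might look like the following sketch. The angle-bracket placeholders are copied from the DSN format and must be replaced with your own values. The `footable` table and its columns are hypothetical.

```yaml
output:
  sql_raw:
    driver: snowflake
    # Replace each <placeholder>; URL-encode values such as the username.
    dsn: "<snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>"
    # Snowflake uses question mark placeholders.
    query: "INSERT INTO footable (id, name) VALUES (?, ?);"
    args_mapping: root = [ this.id, this.name ]
```

The DSN is quoted here so that characters such as `&` and `<` are never misread by a YAML parser.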

### [](#init_files)`init_files[]`

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Take care to ensure that the statements are idempotent, so that they do not cause issues when run multiple times after service restarts. If both `init_statement` and `init_files` are specified, the `init_statement` is executed _after_ the `init_files`.

If a statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.

**Type**: `array`

```yaml
# Examples:
init_files:
  - ./init/*.sql

# ---

init_files:
  - ./foo.sql
  - ./bar.sql
```

### [](#init_statement)`init_statement`

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both `init_statement` and `init_files` are specified, the `init_statement` is executed _after_ the `init_files`.

If the statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.

**Type**: `string`

```yaml
# Examples:
init_statement: |-

  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
```
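
To illustrate the ordering between `init_files` and `init_statement`, here is a minimal sketch that uses both. The file path, DSN, and table definition are assumptions for illustration only.

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    # Runs first: every file matching the glob (hypothetical path).
    init_files:
      - ./init/*.sql
    # Runs second, after all init_files. IF NOT EXISTS keeps it idempotent.
    init_statement: |
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        primary key (foo)
      );
    query: "INSERT INTO some_table (foo, bar) VALUES ($1, $2) ON CONFLICT (foo) DO NOTHING;"
    args_mapping: root = [ this.foo, this.bar ]
```

Because a failed init statement only logs a warning, it may be worth checking the logs after the first connection to confirm that the table was created.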

### [](#max_in_flight)`max_in_flight`

The maximum number of database statements to execute in parallel.

**Type**: `int`

**Default**: `64`

### [](#queries)`queries[]`

A list of database statements to run in addition to your main [`query`](#query). If you specify multiple queries, they are executed within a single transaction. For more information, see [Examples](#examples).

**Type**: `object`

### [](#queries-args_mapping)`queries[].args_mapping`

An optional [Bloblang mapping](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.

**Type**: `string`

```yaml
# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

# ---

args_mapping: root = [ meta("user.id") ]
```

### [](#queries-query)`queries[].query`

The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (`?`), whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2`, and so on). The style to use is outlined in this table:

| Driver | Placeholder Style |
| --- | --- |
| `clickhouse` | Dollar sign |
| `mysql` | Question mark |
| `postgres` | Dollar sign |
| `pgx` | Dollar sign |
| `mssql` | Question mark |
| `sqlite` | Question mark |
| `oracle` | Colon |
| `snowflake` | Question mark |
| `trino` | Question mark |
| `gocosmos` | Colon |

**Type**: `string`

### [](#queries-when)`queries[].when`

An optional [Bloblang mapping](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that, when set, is evaluated for each message to determine whether to execute this query. The mapping should return a boolean value. The first query in the list whose `when` condition evaluates to `true` (or that has no `when` condition) is executed. This enables conditional query routing based on message content or metadata without requiring `unsafe_dynamic_query`.

**Type**: `string`

```yaml
# Examples:
when: root = meta("kafka_tombstone_message") == "true"

# ---

when: root = this.operation == "delete"
```
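
The first-match behavior described above can be sketched as a three-way route. This example assumes each message carries a hypothetical `operation` field, along with `id` and `name` fields. The DSN and the `footable` table are placeholders.

```yaml
output:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    queries:
      # Checked first: runs only for messages flagged as deletes.
      - when: root = this.operation == "delete"
        query: "DELETE FROM footable WHERE id = ?;"
        args_mapping: root = [ this.id ]
      # Checked second: runs only for messages flagged as updates.
      - when: root = this.operation == "update"
        query: "UPDATE footable SET name = ? WHERE id = ?;"
        args_mapping: root = [ this.name, this.id ]
      # No `when` condition: acts as the fallback for all other messages.
      - query: "INSERT INTO footable (id, name) VALUES (?, ?);"
        args_mapping: root = [ this.id, this.name ]
```

Placing the entry without a `when` condition last matters here: an unconditional entry earlier in the list would match before the conditional entries are evaluated.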

### [](#query)`query`

The query to execute.

You must include the correct placeholders for the specified database driver. Some drivers use question marks (`?`), whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2`, and so on).

| Driver | Placeholder Style |
| --- | --- |
| clickhouse | Dollar sign ($) |
| gocosmos | Colon (:) |
| mysql | Question mark (?) |
| mssql | Question mark (?) |
| oracle | Colon (:) |
| pgx | Dollar sign ($) |
| postgres | Dollar sign ($) |
| snowflake | Question mark (?) |
| spanner | Question mark (?) |
| sqlite | Question mark (?) |
| trino | Question mark (?) |

**Type**: `string`

```yaml
# Examples:
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
```
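
For comparison, the same insert written in the other two placeholder styles might look like this sketch. The `footable` table is the same illustrative one used in the question mark example.

```yaml
# Dollar sign style (for example clickhouse or postgres):
query: "INSERT INTO footable (foo, bar, baz) VALUES ($1, $2, $3);"

# ---

# Colon style (for example oracle or gocosmos):
query: "INSERT INTO footable (foo, bar, baz) VALUES (:1, :2, :3);"
```

In each case, `args_mapping` must still produce an array with one value per placeholder, in order.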

### [](#unsafe_dynamic_query)`unsafe_dynamic_query`

Whether to enable [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries) in the query. Take great care to ensure that your queries are defended against injection attacks.

**Type**: `bool`

**Default**: `false`

## [](#examples)Examples

### [](#table-insert-mysql)Table Insert (MySQL)

This example inserts rows into a table, populating the `id`, `name`, and `topic` columns with values extracted from messages and metadata:

```yaml
output:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (id, name, topic) VALUES (?, ?, ?);"
    args_mapping: |
      root = [
        this.user.id,
        this.user.name,
        meta("kafka_topic"),
      ]
```

### [](#dynamically-creating-tables-postgresql)Dynamically Creating Tables (PostgreSQL)

This example creates output tables dynamically and, within the same transaction, inserts a record into the newly created table:

```yaml
output:
  processors:
    - mapping: |
        root = this
        # Prevent SQL injection when using unsafe_dynamic_query
        meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
  sql_raw:
    driver: postgres
    dsn: postgres://localhost/postgres
    unsafe_dynamic_query: true
    queries:
      - query: |
          CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
      - query: |
          INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
          ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
        args_mapping: |
          root = [ this.id, this.document.string() ]
```

### [](#conditional-cdc-queries-postgresql)Conditional CDC Queries (PostgreSQL)

Route messages to different SQL operations based on message metadata. Tombstone messages trigger a DELETE, while all other messages perform an upsert. All operations within a batch execute in a single transaction, ordered by Kafka partition.

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://localhost/postgres
    max_in_flight: 8
    batching:
      count: 100
      period: 100ms
    queries:
      - when: 'root = meta("kafka_tombstone_message") == "true"'
        query: 'DELETE FROM users WHERE id = $1'
        args_mapping: 'root = [this.id]'
      - query: |
          INSERT INTO users (id, name, updated_at)
          VALUES ($1, $2, $3)
          ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            updated_at = EXCLUDED.updated_at
        args_mapping: 'root = [this.id, this.name, this.updated_at]'
```