# sql_select

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [connect-full.txt](https://docs.redpanda.com/connect-full.txt)

---
title: sql_select
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: inputs/sql_select
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: inputs/sql_select.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/components/pages/inputs/sql_select.adoc
page-git-created-date: "2024-05-24"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/connect/components/inputs/sql_select.md -->

**Type:** Input ▼

[Input](https://docs.redpanda.com/connect/components/inputs/sql_select/)[Processor](https://docs.redpanda.com/connect/components/processors/sql_select/)

**Available in:** [Cloud](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/sql_select/%20%22View%20the%20Cloud%20version%20of%20this%20component%22), Self-Managed

Executes a select query and creates a message for each row received.

Introduced in version 3.59.0.

#### Common

```yml
inputs:
  label: ""
  sql_select:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    table: "" # No default (required)
    columns: [] # No default (required)
    where: "" # No default (optional)
    args_mapping: "" # No default (optional)
    auto_replay_nacks: true
```

#### Advanced

```yml
inputs:
  label: ""
  sql_select:
    driver: "" # No default (required)
    dsn: "" # No default (required)
    table: "" # No default (required)
    columns: [] # No default (required)
    where: "" # No default (optional)
    args_mapping: "" # No default (optional)
    prefix: "" # No default (optional)
    suffix: "" # No default (optional)
    auto_replay_nacks: true
    init_files: [] # No default (optional)
    init_statement: "" # No default (optional)
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: "" # No default (optional)
```

Once the rows from the query are exhausted this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a [sequence](https://docs.redpanda.com/connect/components/inputs/sequence/) to execute).

## [](#examples)Examples

### [](#consume-a-table-postgresql)Consume a Table (PostgreSQL)

Here we define a pipeline that will consume all rows from a table created within the last hour by comparing the unix timestamp stored in the row column "created\_at":

```yaml
input:
  sql_select:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
    table: footable
    columns: [ '*' ]
    where: created_at >= ?
    args_mapping: |
      root = [
        now().ts_unix() - 3600
      ]
```

## [](#fields)Fields

### [](#args_mapping)`args_mapping`

An optional [Bloblang mapping](https://docs.redpanda.com/connect/guides/bloblang/about/) which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `where`.

**Type**: `string`

```yaml
# Examples:
args_mapping: root = [ "article", now().ts_format("2006-01-02") ]
```

### [](#auto_replay_nacks)`auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

**Type**: `bool`

**Default**: `true`

### [](#columns)`columns[]`

A list of columns to select.

**Type**: `array`

```yaml
# Examples:
columns:
  - "*"

# ---

columns:
  - foo
  - bar
  - baz
```

### [](#conn_max_idle)`conn_max_idle`

An optional maximum number of connections in the idle connection pool. If conn\_max\_open is greater than 0 but less than the new conn\_max\_idle, then the new conn\_max\_idle will be reduced to match the conn\_max\_open limit. If `value ⇐ 0`, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

**Type**: `int`

**Default**: `2`

### [](#conn_max_idle_time)`conn_max_idle_time`

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If `value ⇐ 0`, connections are not closed due to a connections idle time.

**Type**: `string`

### [](#conn_max_life_time)`conn_max_life_time`

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If `value ⇐ 0`, connections are not closed due to a connections age.

**Type**: `string`

### [](#conn_max_open)`conn_max_open`

An optional maximum number of open connections to the database. If conn\_max\_idle is greater than 0 and the new conn\_max\_open is less than conn\_max\_idle, then conn\_max\_idle will be reduced to match the new conn\_max\_open limit. If `value ⇐ 0`, then there is no limit on the number of open connections. The default is 0 (unlimited).

**Type**: `int`

### [](#driver)`driver`

A database [driver](#drivers) to use.

**Type**: `string`

**Options**: `mysql`, `postgres`, `pgx`, `clickhouse`, `mssql`, `sqlite`, `oracle`, `snowflake`, `trino`, `gocosmos`, `spanner`, `databricks`

### [](#dsn)`dsn`

A Data Source Name to identify the target database.

#### [](#drivers)Drivers

The following is a list of supported drivers, their placeholder style, and their respective DSN formats:

| Driver | Data Source Name Format |
| --- | --- |
| clickhouse | clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&…​&paramN=valueN] |
| mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&…​&paramN=valueN] |
| postgres and pgx | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&…​] |
| mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&…​] |
| sqlite | file:/path/to/filename.db[?param&=value1&…​] |
| oracle | oracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3 |
| snowflake | username[:password]@account_identifier/dbname/schemaname[?param1=value&…​&paramN=valueN] |
| trino | http[s]://user[:pass]@host[:port][?parameters] |
| gocosmos | AccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>][;Version=<cosmosdb-api-version>][;DefaultDb/Db=<db-name>][;AutoId=<true/false>][;InsecureSkipVerify=<true/false>] |
| spanner | projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE] |
| databricks | token:<access-token>@<server-hostname>:<port>/<http-path> |

Please note that the `postgres` and `pgx` drivers enforce SSL by default, you can override this with the parameter `sslmode=disable` if required. The `pgx` driver is an alternative to the standard `postgres` (pq) driver and comes with extra functionality such as support for array insertion.

The `snowflake` driver supports multiple DSN formats. Please consult [the docs](https://pkg.go.dev/github.com/snowflakedb/gosnowflake#hdr-Connection_String) for more details. For [key pair authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication), the DSN has the following format: `<snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>`, where the value for the `privateKey` parameter can be constructed from an unencrypted RSA private key file `rsa_key.p8` using `openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0` (you can use `gbasenc` instead of `basenc` on OSX if you install `coreutils` via Homebrew). If you have a password-encrypted private key, you can decrypt it using `openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8`. Also, make sure fields such as the username are URL-encoded.

The [`gocosmos`](https://pkg.go.dev/github.com/microsoft/gocosmos) driver is still experimental, but it has support for [hierarchical partition keys](https://learn.microsoft.com/en-us/azure/cosmos-db/hierarchical-partition-keys) as well as [cross-partition queries](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-query-container#cross-partition-query). Please refer to the [SQL notes](https://github.com/microsoft/gocosmos/blob/main/SQL.md) for details.

**Type**: `string`

```yaml
# Examples:
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

# ---

dsn: foouser:foopassword@tcp(localhost:3306)/foodb

# ---

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

# ---

dsn: oracle://foouser:foopass@localhost:1521/service_name

# ---

dsn: token:dapi1234567890ab@dbc-a1b2345c-d6e7.cloud.databricks.com:443/sql/1.0/warehouses/abc123def456
```

### [](#init_files)`init_files[]`

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both `init_statement` and `init_files` are specified the `init_statement` is executed _after_ the `init_files`.

If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Requires version 4.10.0 or later.

**Type**: `array`

```yaml
# Examples:
init_files:
  - ./init/*.sql

# ---

init_files:
  - ./foo.sql
  - ./bar.sql
```

### [](#init_statement)`init_statement`

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both `init_statement` and `init_files` are specified the `init_statement` is executed _after_ the `init_files`.

If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Requires version 4.10.0 or later.

**Type**: `string`

```yaml
# Examples:
init_statement: |-

  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
```

### [](#prefix)`prefix`

An optional prefix to prepend to the select query (before SELECT).

**Type**: `string`

### [](#suffix)`suffix`

An optional suffix to append to the select query.

**Type**: `string`

### [](#table)`table`

The table to select from.

**Type**: `string`

```yaml
# Examples:
table: foo
```

### [](#where)`where`

An optional where clause to add. Placeholder arguments are populated with the `args_mapping` field. Placeholders should always be question marks, and will automatically be converted to dollar syntax when the postgres or clickhouse drivers are used.

**Type**: `string`

```yaml
# Examples:
where: type = ? and created_at > ?

# ---

where: user_id = ?
```