
doris_stream_load

Writes batches of messages into Apache Doris using Stream Load.

Introduced in version 4.96.0.

Each output batch is encoded into a single Doris Stream Load request. The output first contacts a Doris frontend (FE) node and follows the Stream Load redirect to a backend (BE) node before uploading the batch body. A batch is only acknowledged when Doris reports success.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in the batching documentation.
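As an illustrative sketch (the values are assumptions, not tuned recommendations), the two settings can be combined so that each Stream Load request carries roughly 10,000 messages or about 5 MB of data, is flushed at least every five seconds, and up to four requests run in parallel. Since each batch becomes one Stream Load request, the batching policy effectively sets the request size:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    max_in_flight: 4       # at most 4 concurrent Stream Load requests
    batching:
      count: 10000         # flush after 10,000 messages,
      byte_size: 5000000   # or once about 5 MB of message data has accumulated,
      period: 5s           # or after 5 seconds, whichever comes first
```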

# Common config fields, showing default values
output:
  label: ""
  doris_stream_load:
    fe_urls: []
    database: "" # No default (required)
    table: "" # No default (required)
    username: "" # No default (required)
    password: "" # No default (required)
    format: json
    read_json_by_line: true
    strip_outer_array: false
    columns: []
    label_prefix: redpanda_connect
    timeout: 30s
    max_in_flight: 8
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

# All config fields, showing default values
output:
  label: ""
  doris_stream_load:
    url: "" # No default (optional)
    fe_urls: []
    database: "" # No default (required)
    table: "" # No default (required)
    username: "" # No default (required)
    password: "" # No default (required)
    query_port: 9030
    format: json
    read_json_by_line: true
    strip_outer_array: false
    jsonpaths: ""
    json_root: ""
    columns: []
    where: ""
    column_separator: ","
    line_delimiter: "\n"
    group_commit: ""
    max_filter_ratio: 0
    partitions: []
    temporary_partitions: []
    skip_lines: 0
    empty_field_as_null: false
    trim_double_quotes: false
    num_as_string: false
    fuzzy_parse: false
    label_prefix: redpanda_connect
    headers: {}
    strict_mode: false
    timeout: 30s
    max_in_flight: 8
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Fields

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

column_separator

Column separator used for csv format.

Type: string

Default: ,
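A minimal CSV sketch, assuming each message is already a single pipe-separated row without a trailing newline (for example `1|alice|2024-01-01 00:00:00`) and that the target table has the hypothetical columns id, name and ts:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    format: csv
    column_separator: "|"    # fields within a row are separated by a pipe
    line_delimiter: "\n"     # rows in the request body are separated by newlines
    columns: [id, name, ts]  # CSV fields, in order, map to these table columns
```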

columns[]

Optional Doris columns header. When set, the values are joined with commas into a single header value.

Type: array

Default: []
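Because the values are joined with commas, a fragment like the one below (the column names are hypothetical) would reach Doris as the single header value `id,name,ts`:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    columns: [id, name, ts]  # sent to Doris as "columns: id,name,ts"
```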

database

Target Doris database.

Type: string

# Examples:
database: test_db

empty_field_as_null

Whether Doris should treat empty input fields as NULL.

Type: bool

Default: false

fe_urls[]

A list of Doris FE HTTP URLs. The output tries these FE endpoints in order, failing over to the next one on a per-request basis, and rotates the starting FE across requests.

Type: array

Default: []

# Examples:
fe_urls:
  - "http://fe1:8030"
  - "http://fe2:8030"

format

Body format sent to Doris Stream Load.

Type: string

Default: json

Options: json, csv

fuzzy_parse

Whether Doris should enable fuzzy_parse for JSON inputs.

Type: bool

Default: false

group_commit

Optional Doris group_commit mode. Valid values are sync_mode, async_mode, and off_mode. When left empty, Doris uses its server default behavior.

Type: string

Default: ""
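A fragment enabling asynchronous group commit. async_mode is one of the three values listed above; whether it suits your latency and durability requirements depends on your Doris deployment, so treat this as a sketch rather than a recommendation:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    group_commit: async_mode  # passed to Doris as the group_commit mode
```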

headers

Optional additional static Stream Load headers. Reserved Doris headers configured by this component take precedence.

Type: object

Default: {}
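A sketch of passing an extra Stream Load header that this output does not expose as its own field. The header `timezone` is assumed here as an example of a Doris Stream Load header; check which headers your Doris version accepts. Any reserved header that the output sets itself would still take precedence over a value given here:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    headers:
      timezone: Asia/Shanghai  # hypothetical extra header, sent as-is
```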

json_root

Optional Doris json_root header value.

Type: string

Default: ""

jsonpaths

Optional Doris jsonpaths header value.

Type: string

Default: ""
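A sketch of extracting nested fields with jsonpaths, assuming messages shaped like `{"id": 1, "user": {"name": "alice"}, "ts": "2024-01-01 00:00:00"}` and a table with the hypothetical columns id, name and ts. In Doris, the paths are matched to the columns by position:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    format: json
    jsonpaths: '["$.id", "$.user.name", "$.ts"]'  # one JSON path per target column
    columns: [id, name, ts]                        # matched to jsonpaths by position
```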

label_prefix

Prefix used when generating Doris labels.

Type: string

Default: redpanda_connect

line_delimiter

Line delimiter used for csv batches.

Type: string

Default: "\n"

max_filter_ratio

Optional Doris max_filter_ratio header value.

Type: float

Default: 0
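A fragment that tolerates some bad rows instead of rejecting the whole load. In Doris, max_filter_ratio is the largest fraction of rows (between 0 and 1) that may be filtered out before the load fails; 0.1 is an illustrative value. Because a batch is acknowledged when Doris reports success, rows filtered within this ratio would presumably be dropped while the batch is still acknowledged:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    max_filter_ratio: 0.1  # the load fails only if more than 10% of rows are filtered
    strict_mode: true      # ask Doris to filter rows whose column conversion fails
```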

max_in_flight

Maximum number of parallel in-flight Doris Stream Load requests.

Type: int

Default: 8

num_as_string

Whether Doris should treat JSON numbers as strings.

Type: bool

Default: false

partitions[]

Optional Doris partitions header. Values are joined with commas.

Type: array

Default: []
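A fragment restricting the load to two partitions (the partition names are hypothetical). The values are joined into the single header value `p20240101,p20240102`; temporary_partitions is configured the same way:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    partitions: [p20240101, p20240102]  # joined as "p20240101,p20240102"
```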

password

Doris password.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
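Redpanda Connect supports environment variable interpolation in config files, so one way to keep the password out of the file is to read it from the environment. DORIS_PASSWORD is a hypothetical variable name:

```yaml
output:
  doris_stream_load:
    # ... connection, database and table fields omitted ...
    username: root
    password: "${DORIS_PASSWORD}"  # resolved from the environment when the config is loaded
```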

query_port

Doris FE MySQL query port, used by ConnectionTest to verify that the target database and table exist. Set to 0 to disable query-port checks.

Type: int

Default: 9030
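If the FE MySQL port is not reachable from where the pipeline runs (for example because of network policy), the check can be disabled with a fragment like the one below. In that case a missing database or table would presumably only surface when a Stream Load request fails:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    query_port: 0  # disables the query-port check of the database and table
```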

read_json_by_line

Encode a JSON batch as newline-delimited JSON and set the Doris header read_json_by_line=true.

Type: bool

Default: true

skip_lines

Optional Doris skip_lines header.

Type: int

Default: 0

strict_mode

Whether to set Doris strict_mode=true.

Type: bool

Default: false

strip_outer_array

Encode a JSON batch as a single JSON array and set the Doris header strip_outer_array=true.

Type: bool

Default: false
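The two JSON encodings differ only in how a batch is laid out in the request body. A sketch of switching from the default newline-delimited layout to a single JSON array follows; turning read_json_by_line off alongside it is an assumption about how the two options are meant to be combined:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    format: json
    read_json_by_line: false  # do not send newline-delimited JSON
    strip_outer_array: true   # send the batch as [{...},{...}] for Doris to unwrap
```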

table

Target Doris table.

Type: string

# Examples:
table: events

temporary_partitions[]

Optional Doris temporary_partitions header. Values are joined with commas.

Type: array

Default: []

timeout

Timeout for each Doris HTTP request.

Type: string

Default: 30s

trim_double_quotes

Whether Doris should trim double quotes in CSV/text imports.

Type: bool

Default: false

url

Backward-compatible single Doris FE HTTP URL, for example http://fe_host:8030. When fe_urls is set, it takes precedence.

Type: string

# Examples:
url: http://127.0.0.1:8030
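A fragment with both fields set, for example while migrating an older config. Because fe_urls takes precedence, requests would go to fe1 and fe2 rather than to the url value:

```yaml
output:
  doris_stream_load:
    # ... database, table and credential fields omitted ...
    url: http://127.0.0.1:8030  # legacy single-FE setting
    fe_urls:                    # takes precedence over url when set
      - "http://fe1:8030"
      - "http://fe2:8030"
```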

username

Doris username.

Type: string

# Examples:
username: root

where

Optional Doris where header for filtering imported rows.

Type: string

Default: ""
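A fragment that keeps only some rows. The column name status is hypothetical, and the expression uses Doris filter syntax, so it is evaluated by Doris rather than by Redpanda Connect:

```yaml
output:
  doris_stream_load:
    # ... connection, database, table and credential fields omitted ...
    where: "status = 'active'"  # rows that do not match are not imported
```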

Examples

JSON lines from stdin

Read newline-delimited JSON messages and write them into Doris using one Stream Load request per batch.

input:
  stdin:
    scanner:
      lines: {}

output:
  doris_stream_load:
    url: http://127.0.0.1:8030
    database: test_db
    table: events
    username: root
    password: secret
    format: json
    read_json_by_line: true
    columns: [id, name, ts]