redpanda_common
Deprecated in 4.68.0
This component is deprecated and will be removed in the next major version release. Please consider moving onto the unified |
Sends data to a Redpanda (Kafka) broker, using credentials from a common redpanda configuration block.
To avoid duplicating Redpanda cluster credentials in your redpanda_common input, output, or any other components in your data pipeline, you can use a single redpanda configuration block. For more details, see the Pipeline example.
Introduced in version 4.39.0.
If you need to move topic data between Redpanda clusters or other Apache Kafka clusters, consider using the redpanda input and output instead.
Common

output:
  label: ""
  redpanda_common:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: "" # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Advanced

output:
  label: ""
  redpanda_common:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: "" # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    timestamp_ms: "" # No default (optional)
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Pipeline example
This data pipeline reads data from topic_A and topic_B on a Redpanda cluster, and then writes the data to topic_C on the same cluster. The cluster details are configured within the redpanda configuration block, so you only need to configure them once. This is a useful feature when you have multiple inputs and outputs in the same data pipeline that need to connect to the same cluster.
input:
  redpanda_common:
    topics: [ topic_A, topic_B ]

output:
  redpanda_common:
    topic: topic_C
    key: ${! @id }

redpanda:
  seed_brokers: [ "127.0.0.1:9092" ]
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-512
      password: bar
      username: foo
Fields
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
key
A key to populate for each message (optional). This field supports interpolation functions.
Type: string
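As an illustration of key interpolation, the following sketch derives the message key from a field in each JSON payload. The topic name orders and the field path customer.id are hypothetical; substitute names from your own data.

```yaml
output:
  redpanda_common:
    topic: orders # hypothetical topic name
    # Assumes each message is a JSON document with a customer.id field (hypothetical).
    key: ${! json("customer.id") }
```

With a key-hashing partitioner, messages that share a key are typically routed to the same partition, which is why a stable identifier is a common choice here.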
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this number to improve throughput until performance plateaus.
Type: int
Default: 10
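A minimal sketch of tuning throughput, assuming a hypothetical topic named events. The values are illustrative starting points rather than recommendations; measure throughput before and after changing them.

```yaml
output:
  redpanda_common:
    topic: events # hypothetical topic name
    max_in_flight: 64 # raised from the default of 10
    batching:
      count: 500 # flush after 500 messages...
      period: 1s # ...or after one second, whichever comes first
```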
metadata
Configure which metadata values are added to messages as headers. This allows you to pass additional context information along with your messages.
Type: object
metadata.include_patterns[]
Provide a list of regular expression (RE2) patterns to match metadata keys against.
Type: array
Default: []
# Examples:
include_patterns:
  - .*
# ---
include_patterns:
  - _timestamp_unix$
metadata.include_prefixes[]
Provide a list of explicit metadata key prefixes to match against.
Type: array
Default: []
# Examples:
include_prefixes:
  - foo_
  - bar_
# ---
include_prefixes:
  - kafka_
# ---
include_prefixes:
  - content-
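To show where these fields sit within a full output, here is a minimal sketch that sends metadata keys starting with a hypothetical trace_ prefix as headers. The topic name is also an assumption.

```yaml
output:
  redpanda_common:
    topic: events # hypothetical topic name
    metadata:
      include_prefixes:
        - trace_ # hypothetical prefix; matching metadata keys are added as headers
```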
partition
Set a partition for each message (optional). This field is only relevant when the partitioner is set to manual.
This field supports interpolation functions. The interpolation string must resolve to a valid integer.
Type: string
# Examples:
partition: ${! meta("partition") }
timestamp_ms
Set a timestamp (in milliseconds) for each message (optional). When left empty, the current timestamp is used. This field supports interpolation functions.
Type: string
# Examples:
timestamp_ms: ${! timestamp_unix_milli() }
# ---
timestamp_ms: ${! metadata("kafka_timestamp_ms") }