gcp_spanner_cdc
Creates an input that consumes from a spanner change stream.
Introduced in version 4.56.0.
Common configuration (showing default values):

input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true

Advanced configuration (all fields, showing default values):

input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    heartbeat_interval: 10s
    metadata_table: ""
    min_watermark_cache_ttl: 5s
    allowed_mod_types: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true
Consumes change records from a Google Cloud Spanner change stream. This input allows you to track and process database changes in real-time, making it useful for data replication, event-driven architectures, and maintaining derived data stores.
The input reads from the specified change stream within a Spanner database and converts each change record into a message. The message payload contains the change record in JSON format, and metadata is added with details about the Spanner instance, database, and stream.
Change streams provide a way to track mutations to your Spanner database tables. For more information about Spanner change streams, refer to the Google Cloud documentation.
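A minimal end-to-end config might look like the following sketch. Only the four required fields are set; the project, instance, database, and stream names are hypothetical placeholders, and the stdout output is used only so that change records are easy to inspect.

```yaml
input:
  gcp_spanner_cdc:
    # Hypothetical identifiers: replace with your own resources
    project_id: my-project
    instance_id: my-instance
    database_id: my-database
    stream_id: my_change_stream

output:
  # Print each change record message to standard output
  stdout: {}
```

All other fields keep the defaults listed in the configuration above, so reading starts at the current time and continues without an end bound.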
Fields
allowed_mod_types[]
The modification types to process. If not specified, all modification types are processed. Allowed values: INSERT, UPDATE, and DELETE.
Type: array
# Examples:
allowed_mod_types:
  - INSERT
  - UPDATE
  - DELETE
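As a sketch, a consumer that only cares about rows being created or removed could skip updates entirely. The resource names are hypothetical placeholders.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    # UPDATE records are not processed; only inserts and deletes are emitted
    allowed_mod_types:
      - INSERT
      - DELETE
```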
auto_replay_nacks
Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.
Set auto_replay_nacks to false to delete rejected messages instead. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, because the original shape of the data is discarded immediately upon consumption and mutation.
Type: bool
Default: true
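The following sketch disables replays. The trade-off is that any change record rejected by the output is deleted rather than retried, so this setting suits pipelines that can tolerate losing occasional records. The resource names are hypothetical placeholders.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    # Rejected (nacked) messages are deleted instead of being replayed
    auto_replay_nacks: false
```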
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The total size (in bytes) at which a batch is flushed, meaning it is passed on for processing or delivery. When the combined size of all messages in the batch reaches or exceeds this limit, the batch is immediately sent to the next stage (such as a processor or output).
Set to 0 to disable size-based batching. When disabled, messages are flushed based on other conditions (such as batching.count or batching.period).
Type: int
Default: 0
batching.check
A Bloblang query that returns a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch should be flushed. Set the value to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
The length of time after which an incomplete batch should be flushed regardless of its size. Supported time units are ns, us, ms, s, m, and h. For example, 1s flushes a batch after one second.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
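The batching conditions can be combined, in which case a batch is flushed as soon as any one of them is met. The sketch below uses illustrative limits, not tuned recommendations, and hypothetical resource names.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    batching:
      # Flush at 100 messages, at roughly 1 MB of payload, or once 5s
      # have elapsed, whichever happens first
      count: 100
      byte_size: 1000000
      period: 5s
```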
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, so any attempt to split it into smaller batches with these processors will be ignored.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
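Putting the batching policy and a batch processor together, the following sketch turns each flushed batch of change records into a single message holding a JSON array. The limits and resource names are hypothetical.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    batching:
      count: 50
      period: 2s
      processors:
        # Merge each flushed batch into one message containing a JSON array
        - archive:
            format: json_array
```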
credentials_json
The base64-encoded contents of a JSON credentials file for authenticating to GCP with a service account. If not provided, Application Default Credentials (ADC) are used.
For more information about how to create a service account and obtain the credentials JSON, see the Google Cloud documentation.
Type: string
Default: ""
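To keep the key out of the config file, the value can be supplied from the environment. This sketch assumes environment variable interpolation with the ${...} syntax is available in your config; GCP_CREDENTIALS_B64 is a hypothetical variable name, and the other identifiers are placeholders.

```yaml
input:
  gcp_spanner_cdc:
    # GCP_CREDENTIALS_B64 is a hypothetical environment variable holding the
    # base64-encoded key file, for example produced with
    # `base64 -w0 key.json` (GNU coreutils)
    credentials_json: "${GCP_CREDENTIALS_B64}"
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
```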
database_id
The ID of the Spanner database to read from. This is the name of the database as it appears in the Spanner console or API.
For more information about how to create a Spanner database, see the Google Cloud documentation.
Type: string
end_timestamp
The timestamp at which to stop reading change records from the change stream. This is an optional field that allows you to limit the range of change records processed by the input.
The timestamp should be in RFC3339 format, such as 2023-10-01T00:00:00Z. If not provided, the read has no end bound and the input continues to consume new change records as they are committed.
Type: string
Default: ""
# Examples:
end_timestamp: 2022-01-01T00:00:00Z
heartbeat_interval
The interval at which to send heartbeat messages to the output. Heartbeat messages are sent to indicate that the input is still active and processing changes. This can help prevent timeouts in downstream systems.
Supported time units are ns, us, ms, s, m, and h. For example, 1s sends a heartbeat every second.
Type: string
Default: 10s
instance_id
The ID of the Spanner instance to read from. This is the name of the instance as it appears in the Spanner console or API.
For more information about how to create a Spanner instance, see the Google Cloud documentation.
Type: string
metadata_table
The name of the table in which to store metadata. If empty, the input uses cdc_metadata_<stream_id>.
Type: string
Default: ""
min_watermark_cache_ttl
How long the minimum watermark is cached, which sets how frequently the input queries Spanner for it. Supported time units are ns, us, ms, s, m, and h.
Type: string
Default: 5s
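The advanced fields can be set together, as in the sketch below. The values are illustrative rather than recommended settings, and the metadata table name is a hypothetical example that overrides the cdc_metadata_<stream_id> default.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    # Hypothetical explicit metadata table name
    metadata_table: my_change_stream_metadata
    # Heartbeat every 30 seconds instead of the 10s default
    heartbeat_interval: 30s
    # Cache the minimum watermark for 10 seconds between Spanner queries
    min_watermark_cache_ttl: 10s
```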
project_id
The ID of the GCP project that contains the Spanner instance and database. This is the name of the project as it appears in the GCP console or API.
For more information about how to create a GCP project, see the Google Cloud documentation.
Type: string
start_timestamp
The timestamp at which to start reading change records from the change stream. This is an optional field that allows you to limit the range of change records processed by the input.
The timestamp should be in RFC3339 format, such as 2023-10-01T00:00:00Z. If not provided, the input starts reading from the current time.
Type: string
Default: ""
# Examples:
start_timestamp: 2022-01-01T00:00:00Z
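To replay a fixed window of history, start_timestamp and end_timestamp can be combined, as in this sketch. The dates are arbitrary examples (quoted so they are read as strings), the resource names are hypothetical, and Spanner only serves change records that are still within the change stream's retention period.

```yaml
input:
  gcp_spanner_cdc:
    project_id: my-project       # hypothetical
    instance_id: my-instance     # hypothetical
    database_id: my-database     # hypothetical
    stream_id: my_change_stream  # hypothetical
    # Read only changes within this example window, then stop reading
    start_timestamp: "2024-01-01T00:00:00Z"
    end_timestamp: "2024-01-02T00:00:00Z"
```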
stream_id
The name of the change stream to track. The stream must exist in the Spanner database. To create a change stream, follow the Google Cloud documentation.
Type: string