# gcp_spanner_cdc

Available in: Self-Managed

License: This component requires an enterprise license. You can either upgrade to an Enterprise Edition license, or generate a trial license key that's valid for 30 days.

Creates an input that consumes from a Spanner change stream.

Introduced in version 4.56.0.

### Common

```yaml
input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true
```

### Advanced

```yaml
input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    heartbeat_interval: 10s
    metadata_table: ""
    min_watermark_cache_ttl: 5s
    allowed_mod_types: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true
```

Consumes change records from a Google Cloud Spanner change stream. This input lets you track and process database changes in real time, making it useful for data replication, event-driven architectures, and maintaining derived data stores.

The input reads from a specified change stream within a Spanner database and converts each change record into a message. The message payload contains the change records in JSON format, and metadata is added with details about the Spanner instance, database, and stream.

Change streams provide a way to track mutations to your Spanner database tables. For more information about Spanner change streams, refer to the Google Cloud documentation.
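For instance, the following is a minimal sketch of a pipeline that prints change records to stdout. All resource IDs are hypothetical placeholders, and the sketch assumes Application Default Credentials are available in the environment.

```yaml
# A minimal sketch: print Spanner change records to stdout.
# All resource IDs below are placeholders; substitute your own.
input:
  gcp_spanner_cdc:
    project_id: my-project
    instance_id: my-instance
    database_id: my-database
    stream_id: my_change_stream
    # credentials_json is omitted, so Application Default
    # Credentials (ADC) are used for authentication.

output:
  stdout: {}
```

Because each message payload is a JSON change record, it can be inspected or transformed with standard processors before delivery.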
## Fields

### allowed_mod_types[]

A list of modification types to process. If not specified, all modification types are processed.

Type: `array`

Allowed values: `INSERT`, `UPDATE`, `DELETE`

```yaml
# Examples:
allowed_mod_types:
  - INSERT
  - UPDATE
  - DELETE
```

### auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure. Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, as the original shape of the data is discarded immediately upon consumption and mutation.

Type: `bool`

Default: `true`

### batching

Allows you to configure a batching policy.

Type: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### batching.byte_size

The maximum total size (in bytes) that a batch can reach before it is passed on for processing or delivery (flushed). When the combined size of all messages in the batch exceeds this limit, the batch is immediately sent to the next stage (such as a processor or output). Set to `0` to disable size-based batching. When disabled, messages are flushed based on other conditions (such as `batching.count` or `batching.period`).

Type: `int`

Default: `0`

### batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: `string`

Default: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### batching.count

The number of messages at which the batch should be flushed. Set the value to `0` to disable count-based batching.

Type: `int`

Default: `0`

### batching.period

The length of time after which an incomplete batch should be flushed regardless of its size. Supported time units are `ns`, `us`, `ms`, `s`, `m`, and `h`. For example, `1s` flushes a batch after one second.

Type: `string`

Default: `""`

```yaml
# Examples:
period: 1s

period: 1m

period: 500ms
```

### batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, so any attempt to split the batch into smaller batches using these processors is ignored.

Type: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

### credentials_json

Base64-encoded JSON credentials file for authenticating to GCP with a service account. If not provided, Application Default Credentials (ADC) are used. For more information about how to create a service account and obtain the credentials JSON, see the Google Cloud documentation.

Type: `string`

Default: `""`

### database_id

The ID of the Spanner database to read from. This is the name of the database as it appears in the Spanner console or API. For more information about how to create a Spanner database, see the Google Cloud documentation.

Type: `string`

### end_timestamp

The timestamp at which to stop reading change records from the change stream. This optional field lets you limit the range of change records the input processes. The timestamp must be in RFC 3339 format, such as `2023-10-01T00:00:00Z`. If not provided, the input reads all available change records up to the current time.

Type: `string`

Default: `""`

```yaml
# Examples:
end_timestamp: 2022-01-01T00:00:00Z
```

### heartbeat_interval

The interval at which to send heartbeat messages to the output. Heartbeat messages indicate that the input is still active and processing changes, which can help prevent timeouts in downstream systems. Supported time units are `ns`, `us`, `ms`, `s`, `m`, and `h`. For example, `1s` sends a heartbeat every second.

Type: `string`

Default: `10s`

### instance_id

The ID of the Spanner instance to read from. This is the name of the instance as it appears in the Spanner console or API. For more information about how to create a Spanner instance, see the Google Cloud documentation.

Type: `string`

### metadata_table

The table in which to store metadata (default: `cdc_metadata_<stream_id>`).

Type: `string`

Default: `""`

### min_watermark_cache_ttl

Sets how frequently to query Spanner for the minimum watermark.

Type: `string`

Default: `5s`

### project_id

The ID of the GCP project that contains the Spanner instance and database. This is the name of the project as it appears in the GCP console or API. For more information about how to create a GCP project, see the Google Cloud documentation.

Type: `string`

### start_timestamp

The timestamp at which to start reading change records from the change stream. This optional field lets you limit the range of change records the input processes. The timestamp must be in RFC 3339 format, such as `2023-10-01T00:00:00Z` (default: current time).

Type: `string`

Default: `""`

```yaml
# Examples:
start_timestamp: 2022-01-01T00:00:00Z
```

### stream_id

The name of the change stream to track. The stream must exist in the Spanner database. To create a change stream, follow the Google Cloud documentation.

Type: `string`
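To tie several of these fields together, the following is a sketch of a bounded replay that reads only `INSERT` and `UPDATE` records captured during January 2022 and stops once `end_timestamp` is reached. All resource IDs are placeholders, and the heartbeat interval shown is an arbitrary illustrative value.

```yaml
# A sketch of a bounded replay: process only inserts and updates
# recorded during January 2022, then stop at end_timestamp.
# All resource IDs below are placeholders.
input:
  gcp_spanner_cdc:
    project_id: my-project
    instance_id: my-instance
    database_id: my-database
    stream_id: my_change_stream
    start_timestamp: 2022-01-01T00:00:00Z
    end_timestamp: 2022-02-01T00:00:00Z
    allowed_mod_types:
      - INSERT
      - UPDATE
    # Send a heartbeat every 30 seconds to help prevent
    # timeouts in downstream systems.
    heartbeat_interval: 30s
```

Omitting `end_timestamp` instead turns this into an open-ended subscription that continues from `start_timestamp` through all new changes.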