salesforce_cdc
Subscribes to one or more Salesforce Pub/Sub topics in parallel and emits a message per event. Topics may be:
- /data/<sObject>ChangeEvent: per-sObject CDC channel.
- /data/ChangeEvents: CDC firehose (every CDC-enabled sObject).
- /event/<EventName>__e: custom Platform Event.
- /event/<StandardEventName>: standard Platform Event (e.g. LoginEventStream).
- A bare sObject name (e.g. Account) is shorthand for /data/AccountChangeEvent.
Optionally runs a REST snapshot for the CDC sObjects before opening the streaming subscriptions, so the pipeline sees the current state plus continuous changes. Per-topic replay state persists in a cache resource so each subscription resumes across restarts independently.
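The behavior described above can be sketched as a minimal configuration. This is an illustration under assumptions, not a recommended setup: the cache label persistent_cache, the Redis URL, and the environment variable names are placeholders chosen for the example.

```yaml
input:
  salesforce_cdc:
    org_url: https://acme.my.salesforce.com
    client_id: ${SALESFORCE_CLIENT_ID}          # assumed environment variable name
    client_secret: ${SALESFORCE_CLIENT_SECRET}
    topics:
      - Account           # shorthand for /data/AccountChangeEvent
      - /event/Order__e   # custom Platform Event, never snapshotted
    stream_snapshot: true                 # REST snapshot of Account before streaming starts
    checkpoint_cache: persistent_cache    # must match a label under cache_resources

cache_resources:
  - label: persistent_cache
    redis:
      url: redis://localhost:6379         # placeholder address
```

Each of the two topics gets its own subscription, and each one's replay position is stored in the referenced cache.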
When to use this input
Use salesforce_cdc for:
- Continuous ingestion with both historical state (snapshot) and live changes (CDC).
- Real-time custom or standard Platform Events.
- Mixed CDC + Platform Event pipelines under a single component.
Use a different Salesforce input instead if:
- You only need a one-off extract or periodic SOQL query: use salesforce.
- You need a GraphQL query (cross-object in one request): use salesforce_graphql.
Common configuration
input:
  label: ""
  salesforce_cdc:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    topics: [] # No default (required)
    stream_snapshot: true
    replay_preset: latest
    snapshot_max_batch_size: 2000
    stream_batch_size: 100
    max_parallel_snapshot_objects: 1
    checkpoint_cache: "" # No default (required)
    checkpoint_cache_key: salesforce_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Advanced configuration
input:
  label: ""
  salesforce_cdc:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    topics: [] # No default (required)
    stream_snapshot: true
    replay_preset: latest
    snapshot_max_batch_size: 2000
    stream_batch_size: 100
    max_parallel_snapshot_objects: 1
    checkpoint_cache: "" # No default (required)
    checkpoint_cache_key: salesforce_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    grpc:
      reconnect_base_delay: 500ms
      reconnect_max_delay: 30s
      reconnect_max_attempts: 0
      shutdown_timeout: 10s
      buffer_size: 1000
    http:
      timeout: 5s
      tls:
        enabled: false
        skip_cert_verify: false
        enable_renegotiation: false
        root_cas: ""
        root_cas_file: ""
        client_certs: []
      proxy_url: ""
      disable_http2: false
      tps_limit: 0
      tps_burst: 1
      backoff:
        initial_interval: 1s
        max_interval: 30s
        max_retries: 3
      tcp:
        connect_timeout: 0s
        keep_alive:
          idle: 15s
          interval: 15s
          count: 9
        tcp_user_timeout: 0s
      http:
        max_idle_conns: 100
        max_idle_conns_per_host: 0
        max_conns_per_host: 64
        idle_conn_timeout: 1m30s
        tls_handshake_timeout: 10s
        expect_continue_timeout: 1s
        response_header_timeout: 0s
        disable_keep_alives: false
        disable_compression: false
        max_response_header_bytes: 1048576
        max_response_body_bytes: 10485760
        write_buffer_size: 4096
        read_buffer_size: 4096
        h2:
          strict_max_concurrent_requests: false
          max_decoder_header_table_size: 4096
          max_encoder_header_table_size: 4096
          max_read_frame_size: 16384
          max_receive_buffer_per_connection: 1048576
          max_receive_buffer_per_stream: 1048576
          send_ping_timeout: 0s
          ping_timeout: 15s
          write_byte_timeout: 0s
      access_log_level: ""
      access_log_body_limit: 0
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Fields
api_version
Salesforce REST API version to target, prefixed with v. Affects endpoint paths (/services/data/{api_version}/…) and available fields/objects. Must be supported by your org — check Setup → Company Information. Older versions may lack recent fields.
Type: string
Default: v65.0
# Examples:
api_version: v65.0
# ---
api_version: v62.0
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
checkpoint_cache
Name of the cache resource used to persist snapshot cursor and per-topic replay IDs across restarts. The cache must be declared under the top-level cache_resources block. Choose a durable cache (Redis, Postgres, DynamoDB) for production; in-memory caches lose checkpoints on restart.
Type: string
# Examples:
checkpoint_cache: persistent_cache
checkpoint_cache_key
Key inside the checkpoint cache where this input’s state is stored. Change when running multiple salesforce_cdc inputs against the same cache resource to avoid collisions.
Type: string
Default: salesforce_cdc
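As a sketch of the collision case described above, two salesforce_cdc inputs can share one cache resource if each writes its state under its own key. The broker wrapper, the cache label persistent_cache, the key names, and the environment variable names are assumptions made for this example.

```yaml
input:
  broker:
    inputs:
      - salesforce_cdc:
          org_url: https://acme.my.salesforce.com
          client_id: ${PROD_CLIENT_ID}            # assumed variable names
          client_secret: ${PROD_CLIENT_SECRET}
          topics: [ Account ]
          checkpoint_cache: persistent_cache
          checkpoint_cache_key: salesforce_cdc_prod      # state for the production org
      - salesforce_cdc:
          org_url: https://acme--staging.sandbox.my.salesforce.com
          client_id: ${SANDBOX_CLIENT_ID}
          client_secret: ${SANDBOX_CLIENT_SECRET}
          topics: [ Account ]
          checkpoint_cache: persistent_cache
          checkpoint_cache_key: salesforce_cdc_sandbox   # separate key, same cache
```

With the default key on both inputs, the two would read and overwrite the same checkpoint entry.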
checkpoint_limit
Maximum number of unacknowledged batches in flight (per topic) before that topic pauses reading. Prevents unbounded memory growth when downstream components stall. Higher values increase throughput in steady state; lower values bound memory under backpressure.
Type: int
Default: 1024
client_id
Consumer Key of the Salesforce Connected App authorized for the OAuth Client Credentials flow. Create the Connected App under Setup → App Manager → New Connected App, enable OAuth settings, enable the Client Credentials Flow under Flow Enablement, then copy the Consumer Key from Manage Consumer Details.
Type: string
client_secret
Consumer Secret of the Salesforce Connected App, paired with client_id. Sensitive — prefer environment variable interpolation (${SALESFORCE_CLIENT_SECRET}) over inlining.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
grpc.reconnect_max_attempts
Maximum number of gRPC reconnection attempts. 0 means unlimited.
Type: int
Default: 0
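A hedged sketch of bounding reconnection instead of retrying forever. The values are illustrative, and the reading of reconnect_base_delay and reconnect_max_delay as lower and upper bounds on the wait between attempts is inferred from their names in the configuration listing, not from a documented description.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, topics, and checkpoint_cache omitted for brevity
    grpc:
      reconnect_base_delay: 1s     # assumed: starting delay between attempts
      reconnect_max_delay: 1m      # assumed: cap on the delay between attempts
      reconnect_max_attempts: 10   # stop after 10 attempts; 0 would mean unlimited
```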
http
HTTP client configuration for Salesforce REST calls (OAuth token endpoint and, where applicable, data queries).
Type: object
http.access_log_body_limit
Maximum bytes of request/response body to include in logs. 0 to skip body logging.
Type: int
Default: 0
http.access_log_level
Log level for HTTP request/response logging. Empty disables logging.
Type: string
Default: ""
Options: "", TRACE, DEBUG, INFO, WARN, ERROR
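For troubleshooting, the two access log fields can be combined as in the sketch below. The 1024-byte limit is an arbitrary illustrative value.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, topics, and checkpoint_cache omitted for brevity
    http:
      access_log_level: DEBUG        # log each HTTP request/response at DEBUG
      access_log_body_limit: 1024    # include up to 1024 bytes of each body; 0 skips bodies
```

Request and response bodies can contain record data, so a non-zero body limit is probably best kept to short debugging sessions.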
http.backoff
Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active.
Type: object
http.backoff.initial_interval
Initial interval between retries on 429 responses.
Type: string
Default: 1s
http.backoff.max_interval
Maximum interval between retries on 429 responses.
Type: string
Default: 30s
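A minimal sketch of a more patient backoff for orgs that often hit rate limits. The values are illustrative. max_retries appears in the configuration listing but is not described on this page, so treating it as the number of retries per request is an assumption.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, topics, and checkpoint_cache omitted for brevity
    http:
      backoff:
        initial_interval: 2s   # first wait after a 429 response
        max_interval: 2m       # upper bound on the wait between retries
        max_retries: 5         # assumed meaning: retries before the request fails
```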
http.http
HTTP transport settings controlling connection pooling, timeouts, and HTTP/2.
Type: object
http.http.disable_compression
Disable automatic decompression of gzip responses.
Type: bool
Default: false
http.http.disable_keep_alives
Disable HTTP keep-alive connections; each request uses a new connection.
Type: bool
Default: false
http.http.expect_continue_timeout
Maximum time to wait for a server’s 100-continue response before sending the body. 0 means the body is sent immediately.
Type: string
Default: 1s
http.http.h2.max_decoder_header_table_size
Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_encoder_header_table_size
Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_read_frame_size
Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB.
Type: int
Default: 16384
http.http.h2.max_receive_buffer_per_connection
Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.max_receive_buffer_per_stream
Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.ping_timeout
Timeout waiting for a PING response before closing the connection.
Type: string
Default: 15s
http.http.h2.send_ping_timeout
Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks.
Type: string
Default: 0s
http.http.h2.strict_max_concurrent_requests
When true, new requests block when a connection’s concurrency limit is reached instead of opening a new connection.
Type: bool
Default: false
http.http.h2.write_byte_timeout
Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout.
Type: string
Default: 0s
http.http.idle_conn_timeout
How long an idle connection remains in the pool before being closed. 0 disables the timeout.
Type: string
Default: 1m30s
http.http.max_conns_per_host
Maximum total connections (active + idle) per host. 0 means unlimited.
Type: int
Default: 64
http.http.max_idle_conns
Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited.
Type: int
Default: 100
http.http.max_idle_conns_per_host
Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1.
Type: int
Default: 0
http.http.max_response_body_bytes
Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit.
Type: int
Default: 10485760
http.http.max_response_header_bytes
Maximum bytes of response headers to allow.
Type: int
Default: 1048576
http.http.response_header_timeout
Maximum time to wait for response headers after writing the full request. 0 disables the timeout.
Type: string
Default: 0s
http.http.tls_handshake_timeout
Maximum time to wait for a TLS handshake to complete. 0 disables the timeout.
Type: string
Default: 10s
http.http.write_buffer_size
Size in bytes of the per-connection write buffer.
Type: int
Default: 4096
http.tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
http.tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
http.tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s
http.tcp.keep_alive.interval
Duration between keep-alive probes. Zero defaults to 15s.
Type: string
Default: 15s
http.tcp.tcp_user_timeout
Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.
Type: string
Default: 0s
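The constraint between keep_alive.idle and tcp_user_timeout can be illustrated with the sketch below. All values are illustrative; the only requirement taken from the field descriptions is that idle (30s) stays greater than tcp_user_timeout (20s).

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, topics, and checkpoint_cache omitted for brevity
    http:
      tcp:
        connect_timeout: 5s      # give up on a dial after 5 seconds
        keep_alive:
          idle: 30s              # must be greater than tcp_user_timeout
          interval: 15s
          count: 3               # drop the connection after 3 unanswered probes
        tcp_user_timeout: 20s    # Linux only; ignored on other platforms
```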
http.tls.client_certs[]
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: object
Default: []
# Examples:
client_certs:
  - cert: foo
    key: bar
# ---
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
http.tls.client_certs[].key
A plain text certificate key to use.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
http.tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
http.tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
http.tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
http.tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
http.tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
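A hedged sketch of custom TLS settings for the REST client, for example when traffic to Salesforce passes through a gateway that presents certificates signed by a private CA. The scenario and the file path are assumptions for illustration.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, topics, and checkpoint_cache omitted for brevity
    http:
      tls:
        enabled: true
        root_cas_file: ./root_cas.pem   # hypothetical path to the private CA chain
        skip_cert_verify: false         # keep server certificate verification on
```

Supplying the CA through root_cas_file keeps verification enabled, which is generally preferable to setting skip_cert_verify to true.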
max_parallel_snapshot_objects
Number of sObjects snapshotted concurrently during the REST snapshot phase. Each in-flight snapshot consumes one HTTP connection and Salesforce API call quota. Default 1 serializes the work — raise when snapshotting many sObjects and your API limits permit.
Type: int
Default: 1
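As an illustration of tuning the snapshot phase, the sketch below snapshots four sObjects at once with a smaller page size. The values are arbitrary and should be checked against the API limits of your org.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, and checkpoint_cache omitted for brevity
    topics: [ Account, Contact, Opportunity, Case ]
    stream_snapshot: true
    max_parallel_snapshot_objects: 4   # all four sObjects snapshotted concurrently
    snapshot_max_batch_size: 1000      # within the allowed 200 to 2000 range
```

Raising the parallelism shortens the snapshot phase at the cost of more simultaneous HTTP connections and faster consumption of API call quota.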
org_url
Salesforce instance base URL for your org, protocol included and no trailing slash. Used as the base for both the OAuth token endpoint and REST queries. Production orgs use https://{my-domain}.my.salesforce.com; sandboxes use https://{my-domain}.sandbox.my.salesforce.com. Legacy instance URLs (https://na123.salesforce.com) still work but My Domain URLs are strongly recommended by Salesforce.
Type: string
# Examples:
org_url: https://acme.my.salesforce.com
# ---
org_url: https://acme--staging.sandbox.my.salesforce.com
replay_preset
Initial replay position for each topic, used only on first run (when no checkpoint exists in the cache). Ignored once a topic’s replay ID has been written.
- latest: Start from new events only; any changes between the prior run and the time the input connects are skipped.
- earliest: Replay from the start of the retention window (24h standard, 72h with enhanced retention). Use to recover missed events after outages.
Type: string
Default: latest
Options: latest, earliest
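A minimal sketch of a Platform Event pipeline that replays everything still retained on its first run. The checkpoint key name is hypothetical. Because replay_preset applies only when no checkpoint exists, the sketch uses a key with no stored state so that earliest takes effect.

```yaml
input:
  salesforce_cdc:
    # org_url, client_id, client_secret, and checkpoint_cache omitted for brevity
    topics: [ /event/Order__e ]
    stream_snapshot: false                       # no CDC sObjects here, so nothing to snapshot
    replay_preset: earliest                      # used only while no replay ID is stored
    checkpoint_cache_key: salesforce_cdc_orders  # hypothetical key with no prior checkpoint
```

On later restarts the stored replay ID takes precedence and replay_preset is ignored.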
snapshot_max_batch_size
Page size for the REST snapshot query — records per /query response. Must be between 200 and 2000 per Salesforce REST API limits. Larger pages reduce HTTP round trips; smaller pages reduce peak memory per fetch.
Type: int
Default: 2000
# Examples:
snapshot_max_batch_size: 2000
# ---
snapshot_max_batch_size: 500
stream_batch_size
Number of events requested per gRPC Fetch call, per topic. Higher values improve throughput at the cost of peak batch memory; lower values give steadier latency under load.
Type: int
Default: 100
# Examples:
stream_batch_size: 100
# ---
stream_batch_size: 500
stream_snapshot
When true (default), paginate a full REST snapshot of every CDC sObject in topics before opening any streaming subscription. When false, skip the snapshot and start streaming immediately. Platform Event topics (/event/…) are always skipped — they have no REST equivalent.
Type: bool
Default: true
topics[]
Pub/Sub topics to subscribe to. Each entry is one of: a bare sObject name (Account → /data/AccountChangeEvent), an explicit CDC channel (/data/AccountChangeEvent), the CDC firehose (/data/ChangeEvents), or a Platform Event topic (/event/Order__e, /event/LoginEventStream). Each topic gets its own gRPC subscription with an independent replay cursor.
Type: array
# Examples:
topics:
- Account
- Contact
# ---
topics:
- /data/ChangeEvents
# ---
topics:
- Account
- /event/Order__e
# ---
topics:
- Opportunity
- MyCustom__c
- /event/Sync_Requested__e