salesforce_cdc

Subscribes to one or more Salesforce Pub/Sub topics in parallel and emits a message per event. Topics may be:

  • /data/<sObject>ChangeEvent — per-sObject CDC channel.

  • /data/ChangeEvents — CDC firehose (every CDC-enabled sObject).

  • /event/<EventName>__e — custom Platform Event.

  • /event/<StandardEventName> — standard Platform Event (e.g. LoginEventStream).

  • A bare sObject name (e.g. Account) is shorthand for /data/AccountChangeEvent.

Introduced in version 4.90.3.

Optionally runs a REST snapshot for the CDC sObjects before opening the streaming subscriptions, so the pipeline sees the current state plus continuous changes. Per-topic replay state persists in a cache resource so each subscription resumes across restarts independently.
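
As a rough sketch of how the snapshot, streaming, and checkpoint pieces fit together, the configuration below snapshots two sObjects, then streams their changes, and keeps replay state in a Redis cache. The org URL, environment variable names, cache label, and Redis address are placeholders chosen for illustration, and the top-level input key is assumed to be the standard one for this pipeline format.

```yaml
input:
  salesforce_cdc:
    org_url: https://acme.my.salesforce.com    # placeholder org
    client_id: ${SALESFORCE_CLIENT_ID}         # assumed environment variable name
    client_secret: ${SALESFORCE_CLIENT_SECRET} # assumed environment variable name
    topics:
      - Account   # shorthand for /data/AccountChangeEvent
      - Contact   # shorthand for /data/ContactChangeEvent
    stream_snapshot: true                      # REST snapshot first, then streaming
    checkpoint_cache: persistent_cache         # must match the cache label below

cache_resources:
  - label: persistent_cache
    redis:
      url: redis://localhost:6379              # placeholder address
```

Because the replay IDs live in the cache rather than in the process, restarting this pipeline should resume each topic from its last stored position instead of repeating the snapshot.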

When to use this input

Use salesforce_cdc for:

  • Continuous ingestion with both historical state (snapshot) and live changes (CDC).

  • Real-time custom or standard Platform Events.

  • Mixed CDC + Platform Event pipelines under a single component.

Use a different Salesforce input instead if:

  • You only need a one-off extract or periodic SOQL query — use salesforce.

  • You need a GraphQL query (cross-object in one request) — use salesforce_graphql.

Common configuration fields, showing default values:

input:
  label: ""
  salesforce_cdc:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    topics: [] # No default (required)
    stream_snapshot: true
    replay_preset: latest
    snapshot_max_batch_size: 2000
    stream_batch_size: 100
    max_parallel_snapshot_objects: 1
    checkpoint_cache: "" # No default (required)
    checkpoint_cache_key: salesforce_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Advanced configuration fields, showing default values:

input:
  label: ""
  salesforce_cdc:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    topics: [] # No default (required)
    stream_snapshot: true
    replay_preset: latest
    snapshot_max_batch_size: 2000
    stream_batch_size: 100
    max_parallel_snapshot_objects: 1
    checkpoint_cache: "" # No default (required)
    checkpoint_cache_key: salesforce_cdc
    checkpoint_limit: 1024
    auto_replay_nacks: true
    grpc:
      reconnect_base_delay: 500ms
      reconnect_max_delay: 30s
      reconnect_max_attempts: 0
      shutdown_timeout: 10s
      buffer_size: 1000
    http:
      timeout: 5s
      tls:
        enabled: false
        skip_cert_verify: false
        enable_renegotiation: false
        root_cas: ""
        root_cas_file: ""
        client_certs: []
      proxy_url: ""
      disable_http2: false
      tps_limit: 0
      tps_burst: 1
      backoff:
        initial_interval: 1s
        max_interval: 30s
        max_retries: 3
      tcp:
        connect_timeout: 0s
        keep_alive:
          idle: 15s
          interval: 15s
          count: 9
        tcp_user_timeout: 0s
      http:
        max_idle_conns: 100
        max_idle_conns_per_host: 0
        max_conns_per_host: 64
        idle_conn_timeout: 1m30s
        tls_handshake_timeout: 10s
        expect_continue_timeout: 1s
        response_header_timeout: 0s
        disable_keep_alives: false
        disable_compression: false
        max_response_header_bytes: 1048576
        max_response_body_bytes: 10485760
        write_buffer_size: 4096
        read_buffer_size: 4096
        h2:
          strict_max_concurrent_requests: false
          max_decoder_header_table_size: 4096
          max_encoder_header_table_size: 4096
          max_read_frame_size: 16384
          max_receive_buffer_per_connection: 1048576
          max_receive_buffer_per_stream: 1048576
          send_ping_timeout: 0s
          ping_timeout: 15s
          write_byte_timeout: 0s
      access_log_level: ""
      access_log_body_limit: 0
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

Fields

api_version

Salesforce REST API version to target, prefixed with v. Affects endpoint paths (/services/data/{api_version}/…) and the fields and objects available. The version must be supported by your org; check Setup → Company Information. Older versions may lack recent fields.

Type: string

Default: v65.0

# Examples:
api_version: v65.0

# ---

api_version: v62.0

auto_replay_nacks

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: bool

Default: true

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

checkpoint_cache

Name of the cache resource used to persist snapshot cursor and per-topic replay IDs across restarts. The cache must be declared under the top-level cache_resources block. Choose a durable cache (Redis, Postgres, DynamoDB) for production; in-memory caches lose checkpoints on restart.

Type: string

# Examples:
checkpoint_cache: persistent_cache

checkpoint_cache_key

Key inside the checkpoint cache where this input’s state is stored. Change this value when running multiple salesforce_cdc inputs against the same cache resource, so that their checkpoints do not collide.

Type: string

Default: salesforce_cdc
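
To illustrate the collision concern, here is a minimal sketch of two salesforce_cdc inputs that share one cache resource but store state under different keys. The broker wrapper, the two orgs, the key names, and the environment variable names are assumptions made for the example.

```yaml
input:
  broker:
    inputs:
      - salesforce_cdc:
          org_url: https://acme.my.salesforce.com
          client_id: ${SALESFORCE_PROD_CLIENT_ID}         # assumed variable name
          client_secret: ${SALESFORCE_PROD_CLIENT_SECRET} # assumed variable name
          topics:
            - Account
          checkpoint_cache: persistent_cache
          checkpoint_cache_key: salesforce_cdc_prod       # hypothetical key
      - salesforce_cdc:
          org_url: https://acme--staging.sandbox.my.salesforce.com
          client_id: ${SALESFORCE_SANDBOX_CLIENT_ID}
          client_secret: ${SALESFORCE_SANDBOX_CLIENT_SECRET}
          topics:
            - Account
          checkpoint_cache: persistent_cache
          checkpoint_cache_key: salesforce_cdc_sandbox    # hypothetical key
```

With the default key on both inputs, each one would write its state to the same cache entry, which is the collision this field is meant to avoid.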

checkpoint_limit

Maximum number of unacknowledged batches in flight (per topic) before that topic pauses reading. Prevents unbounded memory growth when downstream components stall. Higher values increase throughput in steady state; lower values bound memory under backpressure.

Type: int

Default: 1024

client_id

Consumer Key of the Salesforce Connected App authorized for the OAuth Client Credentials flow. Create the Connected App under Setup → App Manager → New Connected App, enable OAuth settings, enable the Client Credentials Flow under Flow Enablement, then copy the Consumer Key from Manage Consumer Details.

Type: string

client_secret

Consumer Secret of the Salesforce Connected App, paired with client_id. Sensitive — prefer environment variable interpolation (${SALESFORCE_CLIENT_SECRET}) over inlining.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
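
A short sketch of the interpolation approach mentioned above, so that neither credential appears in the configuration file. SALESFORCE_CLIENT_ID is an assumed variable name; SALESFORCE_CLIENT_SECRET is the name used in the field description.

```yaml
salesforce_cdc:
  client_id: ${SALESFORCE_CLIENT_ID}          # assumed variable name
  client_secret: ${SALESFORCE_CLIENT_SECRET}  # resolved from the environment at startup
```

Both variables need to be set in the environment of the process that loads the configuration.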

grpc

gRPC transport tuning for the Pub/Sub API connection.

Type: object

grpc.buffer_size

Size of the internal gRPC event receive buffer.

Type: int

Default: 1000

grpc.reconnect_base_delay

Base delay for gRPC reconnection backoff.

Type: string

Default: 500ms

grpc.reconnect_max_attempts

Maximum number of gRPC reconnection attempts. 0 means unlimited.

Type: int

Default: 0

grpc.reconnect_max_delay

Maximum delay for gRPC reconnection backoff.

Type: string

Default: 30s

grpc.shutdown_timeout

Timeout for graceful gRPC client shutdown.

Type: string

Default: 10s
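
The gRPC fields above could be combined as in the following sketch, which slows down and bounds reconnection for an org with an unstable link. All values are illustrative rather than recommended, and this reference does not state how the delay grows between the base and the maximum.

```yaml
salesforce_cdc:
  grpc:
    reconnect_base_delay: 1s    # starting delay (default 500ms)
    reconnect_max_delay: 1m     # cap on the delay (default 30s)
    reconnect_max_attempts: 20  # stop after 20 attempts (default 0 = unlimited)
    shutdown_timeout: 10s       # default
    buffer_size: 5000           # larger receive buffer (default 1000)
```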

http

HTTP client configuration for Salesforce REST calls (OAuth token endpoint and, where applicable, data queries).

Type: object

http.access_log_body_limit

Maximum bytes of request/response body to include in logs. Set to 0 to skip body logging.

Type: int

Default: 0

http.access_log_level

Log level for HTTP request/response logging. Empty disables logging.

Type: string

Default: ""

Options: "", TRACE, DEBUG, INFO, WARN, ERROR
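
As an example of the two access log fields working together, the sketch below turns on request logging for troubleshooting. The level and byte limit are arbitrary illustrative values.

```yaml
salesforce_cdc:
  http:
    access_log_level: DEBUG      # "" (the default) disables access logging
    access_log_body_limit: 1024  # log up to 1024 bytes of each body; 0 skips bodies
```

Since this client also calls the OAuth token endpoint, body logging may capture sensitive values, so it is probably best kept at 0 outside of debugging sessions.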

http.backoff

Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active.

Type: object

http.backoff.initial_interval

Initial interval between retries on 429 responses.

Type: string

Default: 1s

http.backoff.max_interval

Maximum interval between retries on 429 responses.

Type: string

Default: 30s

http.backoff.max_retries

Maximum number of retries on 429 responses.

Type: int

Default: 3
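
A sketch of a more patient 429 policy, for orgs that regularly hit API limits during the snapshot phase. The values are illustrative, and what happens once the retries are exhausted is not described in this reference.

```yaml
salesforce_cdc:
  http:
    backoff:
      initial_interval: 2s  # first wait after a 429 (default 1s)
      max_interval: 1m      # upper bound on the wait (default 30s)
      max_retries: 5        # retries per request (default 3)
```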

http.disable_http2

Disable HTTP/2 and force HTTP/1.1.

Type: bool

Default: false

http.http

HTTP transport settings controlling connection pooling, timeouts, and HTTP/2.

Type: object

http.http.disable_compression

Disable automatic decompression of gzip responses.

Type: bool

Default: false

http.http.disable_keep_alives

Disable HTTP keep-alive connections; each request uses a new connection.

Type: bool

Default: false

http.http.expect_continue_timeout

Maximum time to wait for a server’s 100-continue response before sending the body. 0 means the body is sent immediately.

Type: string

Default: 1s

http.http.h2

HTTP/2-specific transport settings. Only applied when HTTP/2 is enabled.

Type: object

http.http.h2.max_decoder_header_table_size

Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB.

Type: int

Default: 4096

http.http.h2.max_encoder_header_table_size

Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB.

Type: int

Default: 4096

http.http.h2.max_read_frame_size

Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB.

Type: int

Default: 16384

http.http.h2.max_receive_buffer_per_connection

Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB.

Type: int

Default: 1048576

http.http.h2.max_receive_buffer_per_stream

Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB.

Type: int

Default: 1048576

http.http.h2.ping_timeout

Timeout waiting for a PING response before closing the connection.

Type: string

Default: 15s

http.http.h2.send_ping_timeout

Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks.

Type: string

Default: 0s

http.http.h2.strict_max_concurrent_requests

When true, new requests block when a connection’s concurrency limit is reached instead of opening a new connection.

Type: bool

Default: false

http.http.h2.write_byte_timeout

Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout.

Type: string

Default: 0s

http.http.idle_conn_timeout

How long an idle connection remains in the pool before being closed. 0 disables the timeout.

Type: string

Default: 1m30s

http.http.max_conns_per_host

Maximum total connections (active + idle) per host. 0 means unlimited.

Type: int

Default: 64

http.http.max_idle_conns

Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited.

Type: int

Default: 100

http.http.max_idle_conns_per_host

Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1.

Type: int

Default: 0

http.http.max_response_body_bytes

Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit.

Type: int

Default: 10485760

http.http.max_response_header_bytes

Maximum bytes of response headers to allow.

Type: int

Default: 1048576

http.http.read_buffer_size

Size in bytes of the per-connection read buffer.

Type: int

Default: 4096

http.http.response_header_timeout

Maximum time to wait for response headers after writing the full request. 0 disables the timeout.

Type: string

Default: 0s

http.http.tls_handshake_timeout

Maximum time to wait for a TLS handshake to complete. 0 disables the timeout.

Type: string

Default: 10s

http.http.write_buffer_size

Size in bytes of the per-connection write buffer.

Type: int

Default: 4096

http.proxy_url

HTTP proxy URL. Empty string disables proxying.

Type: string

Default: ""

http.tcp

TCP socket configuration.

Type: object

http.tcp.connect_timeout

Maximum amount of time a dial will wait for a connection to complete. Zero disables the timeout.

Type: string

Default: 0s

http.tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

http.tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

http.tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

http.tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

http.tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.

Type: string

Default: 0s
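
The constraint between keep_alive.idle and tcp_user_timeout is easy to get wrong, so here is a minimal sketch that satisfies it. The durations are illustrative only.

```yaml
salesforce_cdc:
  http:
    tcp:
      connect_timeout: 5s
      keep_alive:
        idle: 30s            # greater than tcp_user_timeout, as required
        interval: 10s
        count: 3
      tcp_user_timeout: 20s  # Linux only; ignored on other platforms
```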

http.timeout

HTTP request timeout.

Type: string

Default: 5s

http.tls

Custom TLS settings can be used to override system defaults.

Type: object

http.tls.client_certs[]

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar


# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

http.tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

http.tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

http.tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

http.tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

http.tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}

http.tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

http.tls.enabled

Whether custom TLS settings are enabled.

Type: bool

Default: false

http.tls.root_cas

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

http.tls.root_cas_file

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

http.tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool

Default: false
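
Putting the TLS fields together, the following sketch trusts a custom root CA and presents a client certificate, as might be needed behind a TLS-inspecting proxy. The file paths are the placeholder paths from the field examples above.

```yaml
salesforce_cdc:
  http:
    tls:
      enabled: true                  # custom TLS settings are off by default
      root_cas_file: ./root_cas.pem  # placeholder path
      client_certs:
        - cert_file: ./example.pem   # placeholder path
          key_file: ./example.key    # placeholder path
      skip_cert_verify: false        # keep server verification on
```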

http.tps_burst

Maximum burst size for rate limiting.

Type: int

Default: 1

http.tps_limit

Rate limit in requests per second. 0 disables rate limiting.

Type: float

Default: 0
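
A sketch of the two rate limit fields used together. It assumes the usual reading of such a pair (a sustained rate plus a short burst allowance), which this reference does not spell out; the numbers are illustrative.

```yaml
salesforce_cdc:
  http:
    tps_limit: 5   # sustained rate in requests per second; 0 disables limiting
    tps_burst: 10  # maximum burst size (default 1)
```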

max_parallel_snapshot_objects

Number of sObjects snapshotted concurrently during the REST snapshot phase. Each in-flight snapshot consumes one HTTP connection and Salesforce API call quota. Default 1 serializes the work — raise when snapshotting many sObjects and your API limits permit.

Type: int

Default: 1
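
For example, a pipeline that snapshots four sObjects might raise the parallelism so that all of them are fetched at once. This is only a sketch; whether it is safe depends on the API call quota available to your org.

```yaml
salesforce_cdc:
  topics:
    - Account
    - Contact
    - Opportunity
    - MyCustom__c
  stream_snapshot: true
  max_parallel_snapshot_objects: 4  # one in-flight snapshot per sObject
  snapshot_max_batch_size: 2000     # largest allowed page size, fewest round trips
```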

org_url

Salesforce instance base URL for your org, protocol included and no trailing slash. Used as the base for both the OAuth token endpoint and REST queries. Production orgs use https://{my-domain}.my.salesforce.com; sandboxes use https://{my-domain}.sandbox.my.salesforce.com. Legacy instance URLs (https://na123.salesforce.com) still work but My Domain URLs are strongly recommended by Salesforce.

Type: string

# Examples:
org_url: https://acme.my.salesforce.com

# ---

org_url: https://acme--staging.sandbox.my.salesforce.com

replay_preset

Initial replay position used per topic only on first run (when no checkpoint exists in the cache); ignored once a topic’s replay ID has been written.

  • latest: Start from new events only; changes published before the subscription connects are skipped.

  • earliest: Replay from the retention start (24h standard, 72h with enhanced retention). Use to recover missed events after outages.

Type: string

Default: latest

Options: latest, earliest
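
As an illustration, the sketch below skips the snapshot and starts each topic from the oldest retained event on its first run. The topic names are examples; once a replay ID has been checkpointed for a topic, replay_preset no longer applies to it.

```yaml
salesforce_cdc:
  topics:
    - Account
    - /event/Order__e
  stream_snapshot: false   # no REST snapshot; stream only
  replay_preset: earliest  # first run only: begin at the start of the retention window
```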

snapshot_max_batch_size

Page size for the REST snapshot query — records per /query response. Must be between 200 and 2000 per Salesforce REST API limits. Larger pages reduce HTTP round trips; smaller pages reduce peak memory per fetch.

Type: int

Default: 2000

# Examples:
snapshot_max_batch_size: 2000

# ---

snapshot_max_batch_size: 500

stream_batch_size

Number of events requested per gRPC Fetch call, per topic. Higher values improve throughput at the cost of peak batch memory; lower values give steadier latency under load.

Type: int

Default: 100

# Examples:
stream_batch_size: 100

# ---

stream_batch_size: 500

stream_snapshot

When true (default), paginate a full REST snapshot of every CDC sObject in topics before opening any streaming subscription. When false, skip the snapshot and start streaming immediately. Platform Event topics (/event/…) are never snapshotted because they have no REST equivalent; they are streamed only.

Type: bool

Default: true

topics[]

Pub/Sub topics to subscribe to. Each entry is one of: a bare sObject name (Account, shorthand for /data/AccountChangeEvent), an explicit CDC channel (/data/AccountChangeEvent), the CDC firehose (/data/ChangeEvents), or a Platform Event topic (/event/Order__e, /event/LoginEventStream). Each topic gets its own gRPC subscription with an independent replay cursor.

Type: array

# Examples:
topics:
  - Account
  - Contact

# ---

topics:
  - /data/ChangeEvents

# ---

topics:
  - Account
  - /event/Order__e

# ---

topics:
  - Opportunity
  - MyCustom__c
  - /event/Sync_Requested__e