salesforce_sink

Writes messages to Salesforce, routing each Kafka topic to its own sObject configuration.

Introduced in version 4.85.0.

Consumes batches of messages and writes them to Salesforce. Each message must have a topic field (set by the per-topic processor) and a data field containing the Salesforce record fields. The topic is used to look up the matching topic_mappings entry, which defines the sObject, operation, and write mode.
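
As a rough illustration of that shape, a single message might look like the following, rendered as YAML for readability. This is only a sketch: it assumes topic and data are top-level keys of a structured payload, and the topic name accounts and the record fields Name and External_Id__c are hypothetical.

```yaml
# Hypothetical message as seen by salesforce_sink (shape only).
topic: accounts            # matched against topic_mappings[].topic
data:                      # Salesforce record fields for the mapped sObject
  Name: Acme Corp
  External_Id__c: A-1001   # hypothetical custom field
```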

Realtime mode uses the sObject Collections REST API (synchronous, up to 200 records per call). Bulk mode uses the Bulk API 2.0 (asynchronous; the output polls until the job completes).
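
Because mode is set per mapping, both modes can presumably be combined in one output. A minimal sketch, assuming hypothetical topic names (leads, order_events) and a hypothetical custom object Order_Event__c; connection fields are left out for brevity:

```yaml
salesforce_sink:
  # org_url, client_id, and client_secret omitted for brevity
  topic_mappings:
    - topic: leads             # hypothetical topic
      sobject: Lead
      operation: insert
      mode: realtime           # sObject Collections API, synchronous
    - topic: order_events      # hypothetical topic
      sobject: Order_Event__c  # hypothetical custom object
      operation: insert
      mode: bulk               # Bulk API 2.0, asynchronous
```

A low-volume topic that needs immediate feedback would likely suit realtime, while a high-volume topic that tolerates delay would likely suit bulk.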

outputs:
  label: ""
  salesforce_sink:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    restapi_version: v65.0
    request_timeout: 30s
    max_retries: 10
    bulk_batch_size: 1000
    max_concurrent_bulk_jobs: 10
    bulk_poll_interval: 5s
    batch_period: 5s
    max_in_flight: 1
    topic_mappings: [] # No default (required)

Fields

batch_period

Maximum period to wait before flushing an incomplete batch.

Type: string

Default: 5s

bulk_batch_size

Number of records per bulk job. Also controls the output batch size.

Type: int

Default: 1000

bulk_poll_interval

How often to poll Salesforce for bulk job completion status.

Type: string

Default: 5s

client_id

Client ID for the Salesforce Connected App.

Type: string

client_secret

Client secret for the Salesforce Connected App.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
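
One way to keep the secret out of the configuration file is to reference it from the environment. The sketch below assumes the runtime supports ${VAR}-style interpolation in config values; the variable names are hypothetical.

```yaml
salesforce_sink:
  org_url: https://your-domain.salesforce.com
  client_id: "${SALESFORCE_CLIENT_ID}"          # hypothetical variable name
  client_secret: "${SALESFORCE_CLIENT_SECRET}"  # hypothetical variable name
```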

max_concurrent_bulk_jobs

Maximum number of bulk jobs polling concurrently in the background. Each in-flight job buffers its CSV payload in memory. Lower this value if memory usage is a concern.

Type: int

Default: 10
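
If each job carries up to bulk_batch_size records, the defaults allow roughly 10 × 1000 = 10,000 records' worth of CSV to be buffered at once. A sketch of a more conservative setup follows; the values are illustrative, not recommendations.

```yaml
salesforce_sink:
  bulk_batch_size: 500          # fewer records per bulk job
  max_concurrent_bulk_jobs: 2   # about 2 x 500 = 1,000 records buffered at most
```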

max_in_flight

Maximum number of batches to send concurrently. Increasing this value can improve write throughput in realtime mode.

Type: int

Default: 1

max_retries

Maximum number of retries on a 429 Too Many Requests error.

Type: int

Default: 10

org_url

Salesforce instance base URL (for example, https://your-domain.salesforce.com).

Type: string

request_timeout

Timeout for HTTP requests.

Type: string

Default: 30s

restapi_version

Salesforce REST API version to use (for example, v65.0).

Type: string

Default: v65.0

topic_mappings[]

Per-topic Salesforce write configuration. Each entry maps a Kafka topic to an sObject and write settings.

Type: object

topic_mappings[].all_or_none

Applies to realtime mode only. When true, rolls back the entire batch if any record fails.

Type: bool

Default: false

topic_mappings[].external_id_field

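External ID field name. Required for upsert operations. Because operation defaults to upsert, set this field unless the mapping uses a different operation.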

Type: string

Default: ""
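
A minimal upsert mapping might look like this. The topic name and the custom field External_Id__c are hypothetical, and the sketch assumes each record in data carries a value for that field.

```yaml
topic_mappings:
  - topic: accounts                     # hypothetical topic
    sobject: Account
    operation: upsert                   # the default, shown for clarity
    external_id_field: External_Id__c   # hypothetical custom field
```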

topic_mappings[].mode

Write mode: realtime (sObject Collections API) or bulk (Bulk API 2.0).

Type: string

Default: realtime

topic_mappings[].operation

Write operation: insert, update, upsert, or delete.

Type: string

Default: upsert

topic_mappings[].sobject

Salesforce sObject API name (for example, Account, Contact, MyObject__c).

Type: string

topic_mappings[].topic

Kafka topic name to match against the message’s topic field.

Type: string