salesforce_sink

Writes messages to Salesforce, routing each Kafka topic to its own sObject configuration.

Consumes batches of messages and writes them to Salesforce. Each message must have a topic field (set by the per-topic processor) and a data field containing the Salesforce record fields. The topic is used to look up the correct topic_mappings entry which defines the sObject, operation, and write mode.

Realtime mode uses the sObject Collections REST API (synchronous, up to 200 records/call). Bulk mode uses the Bulk API 2.0 (asynchronous, polls until complete).

  • Common

  • Advanced

outputs:
  label: ""
  salesforce_sink:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    restapi_version: v65.0
    request_timeout: 30s
    max_retries: 10
    bulk_batch_size: 1000
    max_concurrent_bulk_jobs: 10
    bulk_poll_interval: 5s
    batch_period: 5s
    max_in_flight: 1
    topic_mappings: [] # No default (required)
outputs:
  label: ""
  salesforce_sink:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    restapi_version: v65.0
    request_timeout: 30s
    max_retries: 10
    bulk_batch_size: 1000
    max_concurrent_bulk_jobs: 10
    bulk_poll_interval: 5s
    batch_period: 5s
    max_in_flight: 1
    topic_mappings: [] # No default (required)

Fields

batch_period

Maximum period to wait before flushing an incomplete batch.

Type: string

Default: 5s

bulk_batch_size

Number of records per bulk job. Also controls the output batch size.

Type: int

Default: 1000

bulk_poll_interval

How often to poll Salesforce for bulk job completion status.

Type: string

Default: 5s

client_id

Client ID for the Salesforce Connected App.

Type: string

client_secret

Client secret for the Salesforce Connected App.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

max_concurrent_bulk_jobs

Maximum number of bulk jobs polling concurrently in the background. Each in-flight job buffers its CSV payload in memory. Lower this value if memory usage is a concern.

Type: int

Default: 10

max_in_flight

Maximum number of batches to send concurrently. Increasing this value improves real-time write throughput.

Type: int

Default: 1

max_retries

Maximum number of retries on a 429 Too Many Requests error.

Type: int

Default: 10

org_url

Salesforce instance base URL (for example, https://your-domain.salesforce.com).

Type: string

request_timeout

Timeout for HTTP requests.

Type: string

Default: 30s

restapi_version

Salesforce REST API version to use (for example, v65.0).

Type: string

Default: v65.0

topic_mappings[]

Per-topic Salesforce write configuration. Each entry maps a Kafka topic to an sObject and write settings.

Type: object

topic_mappings[].all_or_none

Real-time only: rolls back the entire batch if any record fails.

Type: bool

Default: false

topic_mappings[].external_id_field

External ID field name. Required for upsert operations.

Type: string

Default: ""

topic_mappings[].mode

Write mode: realtime (sObject Collections API) or bulk (Bulk API 2.0).

Type: string

Default: realtime

topic_mappings[].operation

Write operation: insert, update, upsert, or delete.

Type: string

Default: upsert

topic_mappings[].sobject

Salesforce sObject API name (for example, Account, Contact, MyObject__c).

Type: string

topic_mappings[].topic

Kafka topic name to match against the message’s topic field.

Type: string