salesforce_sink
Writes messages to Salesforce, routing each Kafka topic to its own sObject configuration.
Introduced in version 4.85.0.
Consumes batches of messages and writes them to Salesforce. Each message must have a topic field (set by the per-topic processor) and a data field containing the Salesforce record fields. The topic is used to look up the correct topic_mappings entry which defines the sObject, operation, and write mode.
Realtime mode uses the sObject Collections REST API (synchronous, up to 200 records/call). Bulk mode uses the Bulk API 2.0 (asynchronous, polls until complete).
-
Common
-
Advanced
outputs:
label: ""
salesforce_sink:
org_url: "" # No default (required)
client_id: "" # No default (required)
client_secret: "" # No default (required)
api_version: v65.0
bulk_batch_size: 1000
max_concurrent_bulk_jobs: 10
bulk_poll_interval: 5s
batch_period: 5s
max_in_flight: 1
topic_mappings: [] # No default (required)
outputs:
label: ""
salesforce_sink:
org_url: "" # No default (required)
client_id: "" # No default (required)
client_secret: "" # No default (required)
api_version: v65.0
bulk_batch_size: 1000
max_concurrent_bulk_jobs: 10
bulk_poll_interval: 5s
batch_period: 5s
max_in_flight: 1
topic_mappings: [] # No default (required)
http:
timeout: 5s
tls:
enabled: false
skip_cert_verify: false
enable_renegotiation: false
root_cas: ""
root_cas_file: ""
client_certs: []
proxy_url: ""
disable_http2: false
tps_limit: 0
tps_burst: 1
backoff:
initial_interval: 1s
max_interval: 30s
max_retries: 3
tcp:
connect_timeout: 0s
keep_alive:
idle: 15s
interval: 15s
count: 9
tcp_user_timeout: 0s
http:
max_idle_conns: 100
max_idle_conns_per_host: 0
max_conns_per_host: 64
idle_conn_timeout: 1m30s
tls_handshake_timeout: 10s
expect_continue_timeout: 1s
response_header_timeout: 0s
disable_keep_alives: false
disable_compression: false
max_response_header_bytes: 1048576
max_response_body_bytes: 10485760
write_buffer_size: 4096
read_buffer_size: 4096
h2:
strict_max_concurrent_requests: false
max_decoder_header_table_size: 4096
max_encoder_header_table_size: 4096
max_read_frame_size: 16384
max_receive_buffer_per_connection: 1048576
max_receive_buffer_per_stream: 1048576
send_ping_timeout: 0s
ping_timeout: 15s
write_byte_timeout: 0s
access_log_level: ""
access_log_body_limit: 0
Fields
api_version
Salesforce REST API version to target, prefixed with v. Affects endpoint paths (/services/data/{api_version}/…) and available fields/objects. Must be supported by your org — check Setup → Company Information. Older versions may lack recent fields.
Type: string
Default: v65.0
# Examples:
api_version: v65.0
# ---
api_version: v62.0
bulk_batch_size
Number of records per bulk job. Also controls the output batch size.
Type: int
Default: 1000
bulk_poll_interval
How often to poll Salesforce for bulk job completion status.
Type: string
Default: 5s
client_secret
Client secret for the Salesforce Connected App.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
http
HTTP client configuration for Salesforce REST calls (OAuth token endpoint and, where applicable, data queries).
Type: object
http.access_log_body_limit
Maximum bytes of request/response body to include in logs. 0 to skip body logging.
Type: int
Default: 0
http.access_log_level
Log level for HTTP request/response logging. Empty disables logging.
Type: string
Default: ""
Options: `, `TRACE, DEBUG, INFO, WARN, ERROR
http.backoff
Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active.
Type: object
http.backoff.initial_interval
Initial interval between retries on 429 responses.
Type: string
Default: 1s
http.backoff.max_interval
Maximum interval between retries on 429 responses.
Type: string
Default: 30s
http.http
HTTP transport settings controlling connection pooling, timeouts, and HTTP/2.
Type: object
http.http.disable_compression
Disable automatic decompression of gzip responses.
Type: bool
Default: false
http.http.disable_keep_alives
Disable HTTP keep-alive connections; each request uses a new connection.
Type: bool
Default: false
http.http.expect_continue_timeout
Maximum time to wait for a server’s 100-continue response before sending the body. 0 means the body is sent immediately.
Type: string
Default: 1s
http.http.h2.max_decoder_header_table_size
Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_encoder_header_table_size
Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_read_frame_size
Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB.
Type: int
Default: 16384
http.http.h2.max_receive_buffer_per_connection
Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.max_receive_buffer_per_stream
Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.ping_timeout
Timeout waiting for a PING response before closing the connection.
Type: string
Default: 15s
http.http.h2.send_ping_timeout
Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks.
Type: string
Default: 0s
http.http.h2.strict_max_concurrent_requests
When true, new requests block when a connection’s concurrency limit is reached instead of opening a new connection.
Type: bool
Default: false
http.http.h2.write_byte_timeout
Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout.
Type: string
Default: 0s
http.http.idle_conn_timeout
How long an idle connection remains in the pool before being closed. 0 disables the timeout.
Type: string
Default: 1m30s
http.http.max_conns_per_host
Maximum total connections (active + idle) per host. 0 means unlimited.
Type: int
Default: 64
http.http.max_idle_conns
Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited.
Type: int
Default: 100
http.http.max_idle_conns_per_host
Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1.
Type: int
Default: 0
http.http.max_response_body_bytes
Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit.
Type: int
Default: 10485760
http.http.max_response_header_bytes
Maximum bytes of response headers to allow.
Type: int
Default: 1048576
http.http.response_header_timeout
Maximum time to wait for response headers after writing the full request. 0 disables the timeout.
Type: string
Default: 0s
http.http.tls_handshake_timeout
Maximum time to wait for a TLS handshake to complete. 0 disables the timeout.
Type: string
Default: 10s
http.http.write_buffer_size
Size in bytes of the per-connection write buffer.
Type: int
Default: 4096
http.tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
http.tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
http.tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s
http.tcp.keep_alive.interval
Duration between keep-alive probes. Zero defaults to 15s.
Type: string
Default: 15s
http.tcp.tcp_user_timeout
Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.
Type: string
Default: 0s
http.tls.client_certs[]
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: object
Default: []
# Examples:
client_certs:
- cert: foo
key: bar
# ---
client_certs:
- cert_file: ./example.pem
key_file: ./example.key
http.tls.client_certs[].key
A plain text certificate key to use.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
http.tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
http.tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Requires version 3.45.0 or later.
Type: bool
Default: false
http.tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
http.tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
http.tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
max_concurrent_bulk_jobs
Maximum number of bulk jobs polling concurrently in the background. Each in-flight job buffers its CSV payload in memory. Lower this value if memory usage is a concern.
Type: int
Default: 10
max_in_flight
Maximum number of batches to send concurrently. Increasing this value improves real-time write throughput.
Type: int
Default: 1
org_url
Salesforce instance base URL (for example, https://your-domain.salesforce.com).
Type: string
# Examples:
org_url: https://acme.my.salesforce.com
# ---
org_url: https://acme--staging.sandbox.my.salesforce.com
topic_mappings[]
Per-topic Salesforce write configuration. Each entry maps a Kafka topic to an sObject and write settings.
Type: object
topic_mappings[].all_or_none
Real-time only: rolls back the entire batch if any record fails.
Type: bool
Default: false
topic_mappings[].external_id_field
External ID field name. Required for upsert operations.
Type: string
Default: ""
topic_mappings[].mode
Write mode: realtime (sObject Collections API) or bulk (Bulk API 2.0).
Type: string
Default: realtime
topic_mappings[].operation
Write operation: insert, update, upsert, or delete.
Type: string
Default: upsert