salesforce_sink
Writes messages to Salesforce, routing each Kafka topic to its own sObject configuration.
Consumes batches of messages and writes them to Salesforce. Each message must have a topic field (set by the per-topic processor) and a data field containing the Salesforce record fields. The topic is used to look up the correct topic_mappings entry which defines the sObject, operation, and write mode.
Realtime mode uses the sObject Collections REST API (synchronous, up to 200 records/call). Bulk mode uses the Bulk API 2.0 (asynchronous, polls until complete).
-
Common
-
Advanced
outputs:
label: ""
salesforce_sink:
org_url: "" # No default (required)
client_id: "" # No default (required)
client_secret: "" # No default (required)
api_version: v65.0
bulk_batch_size: 1000
max_concurrent_bulk_jobs: 10
bulk_poll_interval: 5s
batch_period: 5s
max_in_flight: 1
topic_mappings: [] # No default (required)
outputs:
label: ""
salesforce_sink:
org_url: "" # No default (required)
client_id: "" # No default (required)
client_secret: "" # No default (required)
api_version: v65.0
bulk_batch_size: 1000
max_concurrent_bulk_jobs: 10
bulk_poll_interval: 5s
batch_period: 5s
max_in_flight: 1
topic_mappings: [] # No default (required)
http:
timeout: 5s
tls:
enabled: false
skip_cert_verify: false
enable_renegotiation: false
root_cas: ""
root_cas_file: ""
client_certs: []
proxy_url: ""
disable_http2: false
tps_limit: 0
tps_burst: 1
backoff:
initial_interval: 1s
max_interval: 30s
max_retries: 3
tcp:
connect_timeout: 0s
keep_alive:
idle: 15s
interval: 15s
count: 9
tcp_user_timeout: 0s
http:
max_idle_conns: 100
max_idle_conns_per_host: 0
max_conns_per_host: 64
idle_conn_timeout: 1m30s
tls_handshake_timeout: 10s
expect_continue_timeout: 1s
response_header_timeout: 0s
disable_keep_alives: false
disable_compression: false
max_response_header_bytes: 1048576
max_response_body_bytes: 10485760
write_buffer_size: 4096
read_buffer_size: 4096
h2:
strict_max_concurrent_requests: false
max_decoder_header_table_size: 4096
max_encoder_header_table_size: 4096
max_read_frame_size: 16384
max_receive_buffer_per_connection: 1048576
max_receive_buffer_per_stream: 1048576
send_ping_timeout: 0s
ping_timeout: 15s
write_byte_timeout: 0s
access_log_level: ""
access_log_body_limit: 0
Fields
api_version
Salesforce REST API version to target, prefixed with v. Affects endpoint paths (/services/data/{api_version}/…) and available fields/objects. Must be supported by your org — check Setup → Company Information. Older versions may lack recent fields.
Type: string
Default: v65.0
# Examples:
api_version: v65.0
# ---
api_version: v62.0
bulk_batch_size
Number of records per bulk job. Also controls the output batch size.
Type: int
Default: 1000
bulk_poll_interval
How often to poll Salesforce for bulk job completion status.
Type: string
Default: 5s
client_secret
Client secret for the Salesforce Connected App.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. |
Type: string
http
HTTP client configuration for Salesforce REST calls (OAuth token endpoint and, where applicable, data queries).
Type: object
http.access_log_body_limit
Maximum bytes of request/response body to include in logs. 0 to skip body logging.
Type: int
Default: 0
http.access_log_level
Log level for HTTP request/response logging. Empty disables logging.
Type: string
Default: ""
Options: `, `TRACE, DEBUG, INFO, WARN, ERROR
http.backoff
Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active.
Type: object
http.backoff.initial_interval
Initial interval between retries on 429 responses.
Type: string
Default: 1s
http.backoff.max_interval
Maximum interval between retries on 429 responses.
Type: string
Default: 30s
http.http
HTTP transport settings controlling connection pooling, timeouts, and HTTP/2.
Type: object
http.http.disable_compression
Disable automatic decompression of gzip responses.
Type: bool
Default: false
http.http.disable_keep_alives
Disable HTTP keep-alive connections; each request uses a new connection.
Type: bool
Default: false
http.http.expect_continue_timeout
Maximum time to wait for a server’s 100-continue response before sending the body. 0 means the body is sent immediately.
Type: string
Default: 1s
http.http.h2.max_decoder_header_table_size
Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_encoder_header_table_size
Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB.
Type: int
Default: 4096
http.http.h2.max_read_frame_size
Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB.
Type: int
Default: 16384
http.http.h2.max_receive_buffer_per_connection
Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.max_receive_buffer_per_stream
Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB.
Type: int
Default: 1048576
http.http.h2.ping_timeout
Timeout waiting for a PING response before closing the connection.
Type: string
Default: 15s
http.http.h2.send_ping_timeout
Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks.
Type: string
Default: 0s
http.http.h2.strict_max_concurrent_requests
When true, new requests block when a connection’s concurrency limit is reached instead of opening a new connection.
Type: bool
Default: false
http.http.h2.write_byte_timeout
Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout.
Type: string
Default: 0s
http.http.idle_conn_timeout
How long an idle connection remains in the pool before being closed. 0 disables the timeout.
Type: string
Default: 1m30s
http.http.max_conns_per_host
Maximum total connections (active + idle) per host. 0 means unlimited.
Type: int
Default: 64
http.http.max_idle_conns
Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited.
Type: int
Default: 100
http.http.max_idle_conns_per_host
Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1.
Type: int
Default: 0
http.http.max_response_body_bytes
Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit.
Type: int
Default: 10485760
http.http.max_response_header_bytes
Maximum bytes of response headers to allow.
Type: int
Default: 1048576
http.http.response_header_timeout
Maximum time to wait for response headers after writing the full request. 0 disables the timeout.
Type: string
Default: 0s
http.http.tls_handshake_timeout
Maximum time to wait for a TLS handshake to complete. 0 disables the timeout.
Type: string
Default: 10s
http.http.write_buffer_size
Size in bytes of the per-connection write buffer.
Type: int
Default: 4096
http.tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
http.tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
http.tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s
http.tcp.keep_alive.interval
Duration between keep-alive probes. Zero defaults to 15s.
Type: string
Default: 15s
http.tcp.tcp_user_timeout
Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.
Type: string
Default: 0s
http.tls.client_certs[]
A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: object
Default: []
# Examples:
client_certs:
- cert: foo
key: bar
# ---
client_certs:
- cert_file: ./example.pem
key_file: ./example.key
http.tls.client_certs[].key
A plain text certificate key to use.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. |
Type: string
Default: ""
http.tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. |
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
http.tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
http.tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. |
Type: string
Default: ""
# Examples:
root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
http.tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
http.tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
max_concurrent_bulk_jobs
Maximum number of bulk jobs polling concurrently in the background. Each in-flight job buffers its CSV payload in memory. Lower this value if memory usage is a concern.
Type: int
Default: 10
max_in_flight
Maximum number of batches to send concurrently. Increasing this value improves real-time write throughput.
Type: int
Default: 1
org_url
Salesforce instance base URL (for example, https://your-domain.salesforce.com).
Type: string
# Examples:
org_url: https://acme.my.salesforce.com
# ---
org_url: https://acme--staging.sandbox.my.salesforce.com
topic_mappings[]
Per-topic Salesforce write configuration. Each entry maps a Kafka topic to an sObject and write settings.
Type: object
topic_mappings[].all_or_none
Real-time only: rolls back the entire batch if any record fails.
Type: bool
Default: false
topic_mappings[].external_id_field
External ID field name. Required for upsert operations.
Type: string
Default: ""
topic_mappings[].mode
Write mode: realtime (sObject Collections API) or bulk (Bulk API 2.0).
Type: string
Default: realtime
topic_mappings[].operation
Write operation: insert, update, upsert, or delete.
Type: string
Default: upsert