gcp_pubsub

Sends messages to a GCP Cloud Pub/Sub topic. Metadata from messages is sent as attributes.

# Common config fields, showing default values
output:
  label: ""
  gcp_pubsub:
    project: "" # No default (required)
    credentials_json: "" # No default (optional)
    topic: "" # No default (required)
    endpoint: ""
    max_in_flight: 64
    count_threshold: 100
    delay_threshold: 10ms
    byte_threshold: 1000000
    metadata:
      exclude_prefixes: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

# All config fields, showing default values
output:
  label: ""
  gcp_pubsub:
    project: "" # No default (required)
    credentials_json: "" # No default (optional)
    topic: "" # No default (required)
    endpoint: ""
    ordering_key: "" # No default (optional)
    max_in_flight: 64
    count_threshold: 100
    delay_threshold: 10ms
    byte_threshold: 1000000
    publish_timeout: 1m0s
    metadata:
      exclude_prefixes: []
    flow_control:
      max_outstanding_bytes: -1
      max_outstanding_messages: 1000
      limit_exceeded_behavior: block
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

For information on how to set up credentials, see this guide.

Troubleshooting

If you're consistently seeing Failed to send message to gcp_pubsub: context deadline exceeded error logs without any further information, it is possible that you are encountering https://github.com/benthosdev/benthos/issues/1042, which occurs when metadata values contain characters that are not valid UTF-8. This can frequently occur when consuming from Kafka, as the key metadata field may be populated with an arbitrary binary value, but this issue is not exclusive to Kafka.

If you are blocked by this issue, a workaround is to delete either the specific problematic keys:

pipeline:
  processors:
    - mapping: |
        meta kafka_key = deleted()

Or delete all metadata keys with:

pipeline:
  processors:
    - mapping: meta = deleted()

Fields

project

The project ID of the topic to publish to.

Type: string

credentials_json

An optional field for setting Google Service Account credentials as a JSON string. This field contains sensitive information that usually shouldn't be added to a configuration directly; see Manage Secrets before adding it to your configuration.

Type: string

Default: ""
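
As a sketch of one way to keep the value out of the configuration file, the credentials can be supplied through an environment variable, since configuration fields support environment variable interpolation. The variable name GCP_CREDENTIALS_JSON below is an assumption:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: my-topic # hypothetical topic
    credentials_json: "${GCP_CREDENTIALS_JSON}" # resolved from the environment at startup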

topic

The topic to publish to. This field supports interpolation functions.

Type: string
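
As an illustrative sketch, the topic can be set per message from a metadata value; the kafka_topic key below is an assumption and depends on your input:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: '${! meta("kafka_topic") }' # each message is published to the topic named in its metadata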

endpoint

An optional endpoint to override the default of pubsub.googleapis.com:443. This can be used to connect to a region-specific Pub/Sub endpoint. For a list of valid values, see this document.

Type: string

Default: ""

# Examples

endpoint: us-central1-pubsub.googleapis.com:443

endpoint: us-west3-pubsub.googleapis.com:443

ordering_key

The ordering key to use for publishing messages. This field supports interpolation functions.

Type: string
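
As a sketch, the ordering key could be derived from the message payload so that related messages share a key; the customer_id field and topic name below are assumptions:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: orders # hypothetical topic
    ordering_key: '${! json("customer_id") }' # messages with the same customer_id share an ordering key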

max_in_flight

The maximum number of messages to have in flight at a given time. Increasing this may improve throughput.

Type: int

Default: 64

count_threshold

Publish a Pub/Sub buffer when it has this many messages.

Type: int

Default: 100

delay_threshold

Publish a non-empty Pub/Sub buffer after this delay has passed.

Type: string

Default: "10ms"

byte_threshold

Publish a batch when its size in bytes reaches this value.

Type: int

Default: 1000000
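
The three thresholds (count_threshold, delay_threshold and byte_threshold) apply to the Pub/Sub client's internal buffer, and reaching any one of them triggers a publish. An illustrative sketch, with values that are assumptions rather than recommendations:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: my-topic # hypothetical topic
    count_threshold: 500 # publish once 500 messages are buffered...
    delay_threshold: 50ms # ...or 50ms after buffering begins...
    byte_threshold: 5000000 # ...or once the buffer reaches roughly 5MB, whichever comes first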

publish_timeout

The maximum length of time to wait before abandoning a publish attempt for a message.

Type: string

Default: "1m0s"

# Examples

publish_timeout: 10s

publish_timeout: 5m

publish_timeout: 60m

metadata

Specify criteria for which metadata values are sent as attributes. All metadata values are sent by default.

Type: object

metadata.exclude_prefixes

Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages.

Type: array

Default: []
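
For example, following the troubleshooting note above, Kafka-sourced metadata keys can be kept out of the published attributes by excluding their prefix. A sketch, assuming the input populates kafka_-prefixed metadata:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: my-topic # hypothetical topic
    metadata:
      exclude_prefixes:
        - kafka_ # drops kafka_key, kafka_topic, etc. from the message attributes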

flow_control

For a given topic, configures the Pub/Sub client's internal buffer for messages to be published.

Type: object

flow_control.max_outstanding_bytes

Maximum size of buffered messages to be published. If less than or equal to zero, this is disabled.

Type: int

Default: -1

flow_control.max_outstanding_messages

Maximum number of buffered messages to be published. If less than or equal to zero, this is disabled.

Type: int

Default: 1000

flow_control.limit_exceeded_behavior

Configures the behavior when trying to publish additional messages while the flow controller is full. The available options are block (the default), ignore (disable flow control limits), and signal_error (publish results will return an error).

Type: string

Default: "block"

Options: ignore, block, signal_error.
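
As a sketch of failing fast rather than blocking when the internal buffer fills up, with limits that are assumptions rather than recommendations:

output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: my-topic # hypothetical topic
    flow_control:
      max_outstanding_messages: 5000
      max_outstanding_bytes: 104857600 # roughly 100MiB
      limit_exceeded_behavior: signal_error # publish attempts return an error instead of blocking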

batching

Configures a batching policy on this output. While the Pub/Sub client maintains its own internal buffering mechanism, preparing larger batches of messages can further trade off some latency for throughput.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

A number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int

Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array