timeplus

Beta

Sends messages to a data stream on Timeplus Enterprise (Cloud or Self-Hosted) using the Ingest API, or directly to the timeplusd component in Timeplus Enterprise.

  • Common

  • Advanced

# Common configuration fields, showing default values
output:
  label: ""
  timeplus:
    target: timeplus
    url: https://us-west-2.timeplus.cloud
    workspace: "" # No default (optional)
    stream: "" # No default (required)
    apikey: "" # No default (optional)
    username: "" # No default (optional)
    password: "" # No default (optional)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
# All configuration fields, showing default values
output:
  label: ""
  timeplus:
    target: timeplus
    url: https://us-west-2.timeplus.cloud
    workspace: "" # No default (optional)
    stream: "" # No default (required)
    apikey: "" # No default (optional)
    username: "" # No default (optional)
    password: "" # No default (optional)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

This output only accepts structured messages. All messages must:

  • Contain the same keys.

  • Use a structure that matches the schema of the destination data stream.

If your upstream data source or pipeline returns unstructured messages, such as strings, you can configure an output processor to transform the messages. See the Unstructured messages section for examples.

Examples

  • Timeplus Enterprise (Cloud)

  • Timeplus Enterprise (Self-Hosted)

  • timeplusd

You must generate an API key using the web console of Timeplus Enterprise (Cloud).

output:
  timeplus:
    workspace: <workspace-id>
    stream: <stream-name>
    apikey: <api-key>

Replace the following placeholders with your own values:

  • <workspace-id>: The ID of the workspace you want to send messages to.

  • <stream-name>: The name of the destination data stream.

  • <api-key>: The API key for the Ingest API.

You must specify the username, password, and URL of the application server.

output:
  timeplus:
    url: http://localhost:8000
    workspace: <workspace-id>
    stream: <stream-name>
    username: <timeplus-username>
    password: <timeplus-password>

Replace the following placeholders with your own values:

  • <workspace-id>: The ID of the workspace you want to send messages to.

  • <stream-name>: The name of the destination data stream.

  • <timeplus-username>: The username for the Timeplus application server.

  • <timeplus-password>: The password for the Timeplus application server.

You must specify the HTTP port for timeplusd.

output:
  timeplus:
    url: http://localhost:3218
    stream: <stream-name>
    username: <timeplus-username>
    password: <timeplus-password>

Replace the following placeholders with your own values:

  • <stream-name>: The name of the destination data stream.

  • <timeplus-username>: The username for the Timeplus application server.

  • <timeplus-password>: The password for the Timeplus application server.

Unstructured messages

If your upstream data source or pipeline returns unstructured messages, such as strings, you can configure an output processor to transform them into structured messages and then pass them to the output.

In the following example, the mapping processor creates a field called raw, and uses the functions content().string() to store the original string content into it, thereby creating structured messages. If you use this example, you must also add the raw field name to the destination data stream, so that your message structure matches the schema of your destination data stream.

output:
  timeplus:
    workspace: <workspace-id>
    stream: <stream-name>
    apikey: <api-key>
	processors:
    - mapping: |
        root = {}
        root.raw = content().string()

Fields

target

The destination platform. For Timeplus Enterprise (Cloud or Self-Hosted), enter timeplus, or timeplusd for the timeplusd component.

Type: string

Default: timeplus

Options: timeplus, timeplusd

url

The URL of your Timeplus instance, which should always include the schema and host.

Type: string

# Examples
url: http://localhost:8000
url: http://127.0.0.1:3218

workspace

The ID of the workspace you want to send messages to. This field is required if the target field is set to timeplus.

Type: string

stream

The name of the destination data stream. Make sure the schema of the data stream matches this output.

Type: string

apikey

The API key for the Ingest API. You need to generate this in the web console of Timeplus Enterprise (Cloud). This field is required if you are sending message to Timeplus Enterprise (Cloud).

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

username

The username for the Timeplus application server. This field is required if you are sending messages to Timeplus Enterprise (Self-Hosted) or timeplusd.

Type: string

password

The password for the Timeplus application server. This field is required if you are sending messages to Timeplus Enterprise (Self-Hosted) or timeplusd.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this number to improve throughput.

Type: int

Default: 64

batching

Configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The amount of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples
processors:
  - archive:
      format: concatenate
processors:
  - archive:
      format: lines
processors:
  - archive:
      format: json_array