sequence

Reads messages from a sequence of child inputs, starting with the first; once that input gracefully terminates, consumption moves on to the next, and so on.

# Common config fields, showing default values
input:
  label: ""
  sequence:
    inputs: [] # No default (required)

# All config fields, showing default values
input:
  label: ""
  sequence:
    sharded_join:
      type: none
      id_path: ""
      iterations: 1
      merge_strategy: array
    inputs: [] # No default (required)

This input is useful for consuming from inputs that have an explicit end but must not be consumed in parallel.

Examples

End of Stream Message

A common use case for sequence is to generate a message at the end of our main input. With the following config, once the records within ./dataset.csv are exhausted, our final payload {"status":"finished"} is routed through the pipeline.

input:
  sequence:
    inputs:
      - file:
          paths: [ ./dataset.csv ]
          scanner:
            csv: {}
      - generate:
          count: 1
          mapping: 'root = {"status":"finished"}'
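
Downstream components can then branch on that sentinel message. As a minimal sketch (not part of the original example; the stdout and file destinations are placeholders), a switch output could route the final payload separately from the dataset records:

output:
  switch:
    cases:
      # Route the end-of-stream sentinel generated above.
      - check: this.status == "finished"
        output:
          stdout: {}
      # Route the CSV records themselves to a hypothetical destination.
      - output:
          file:
            path: ./records.ndjson
            codec: lines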

Joining Data (Simple)

Redpanda Connect can be used to join unordered data from fragmented datasets in memory by specifying a common identifier field and a number of sharded iterations. For example, given two CSV files, the first called "main.csv", which contains rows of user data:

uuid,name,age
AAA,Melanie,34
BBB,Emma,28
CCC,Geri,45

And the second called "hobbies.csv" that, for each user, contains zero or more rows of hobbies:

uuid,hobbies
CCC,pokemon go
AAA,rowing
AAA,golf

We can parse and join this data into a single dataset:

{"uuid":"AAA","name":"Melanie","age":34,"hobbies":["rowing","golf"]}
{"uuid":"BBB","name":"Emma","age":28}
{"uuid":"CCC","name":"Geri","age":45,"hobbies":["pokemon go"]}

With the following config:

input:
  sequence:
    sharded_join:
      type: full-outer
      id_path: uuid
      merge_strategy: array
    inputs:
      - file:
          paths:
            - ./hobbies.csv
            - ./main.csv
          scanner:
            csv: {}
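
Note that because hobbies.csv contains multiple rows sharing the same uuid, it is the array merge strategy that collects the individual hobby values into a single array in the joined result, and the full-outer join ensures that users with no hobby rows at all, such as BBB, are still emitted.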

Joining Data (Advanced)

In this example we join unordered and fragmented data from a combination of CSV files and newline-delimited JSON documents by specifying multiple sequence inputs, each with its own processors for extracting the structured data.

The first file "main.csv" contains straightforward CSV data:

uuid,name,age
AAA,Melanie,34
BBB,Emma,28
CCC,Geri,45

And the second file called "hobbies.ndjson" contains JSON documents, one per line, that associate an identifier with an array of hobbies. However, these data objects are in a nested format:

{"document":{"uuid":"CCC","hobbies":[{"type":"pokemon go"}]}}
{"document":{"uuid":"AAA","hobbies":[{"type":"rowing"},{"type":"golf"}]}}

We therefore want to map these into a flattened structure before the join, after which we end up with a single dataset that looks like this:

{"uuid":"AAA","name":"Melanie","age":34,"hobbies":["rowing","golf"]}
{"uuid":"BBB","name":"Emma","age":28}
{"uuid":"CCC","name":"Geri","age":45,"hobbies":["pokemon go"]}

With the following config:

input:
  sequence:
    sharded_join:
      type: full-outer
      id_path: uuid
      iterations: 10
      merge_strategy: array
    inputs:
      - file:
          paths: [ ./main.csv ]
          scanner:
            csv: {}
      - file:
          paths: [ ./hobbies.ndjson ]
          scanner:
            lines: {}
        processors:
          - mapping: |
              root.uuid = this.document.uuid
              root.hobbies = this.document.hobbies.map_each(this.type)

Fields

sharded_join

EXPERIMENTAL: Provides a way to perform outer joins of arbitrarily structured and unordered data resulting from the input sequence, even when the overall size of the data surpasses the memory available on the machine.

When configured, the sequence of inputs is consumed one or more times according to the number of iterations, and when more than one iteration is specified each iteration processes an entirely different set of messages by sharding them by the ID field. Increasing the number of iterations reduces the memory consumption at the cost of needing to fully parse the data each time.

Each message must be structured (as JSON, or otherwise processed into a structured form) and its fields are aggregated with those of other messages sharing the same ID. At the end of each iteration the joined messages are flushed downstream before the next iteration begins, keeping memory usage limited.

Type: object

Requires version 3.40.0 or newer

sharded_join.type

The type of join to perform. A full-outer join ensures that all identifiers seen in any of the input sequences are sent, and is performed by consuming all input sequences before flushing the joined results. An outer join also consumes all input sequences but only writes data joined from the last input in the sequence, similar to a left or right outer join. With an outer join, if an identifier appears multiple times within the final sequence input it is flushed each time it appears. The options full-outter and outter are deprecated in favour of full-outer and outer.

Type: string

Default: "none"

Options: none, full-outer, outer, full-outter, outter.
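
As a sketch of the outer variant (the file names and fields here are hypothetical, not from the examples above), reference data can be placed first in the sequence and a stream of events last, so that only the events, enriched with the reference fields, are written downstream:

input:
  sequence:
    sharded_join:
      type: outer
      id_path: uuid
      merge_strategy: replace
    inputs:
      # Reference data, consumed and held first.
      - file:
          paths: [ ./users.csv ]
          scanner:
            csv: {}
      # The event stream; only identifiers seen here are flushed.
      - file:
          paths: [ ./events.ndjson ]
          scanner:
            lines: {}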

sharded_join.id_path

A dot path that points to a common field within messages of each fragmented data set and can be used to join them. Messages that are not structured or are missing this field will be dropped. This field must be set in order to enable joins.

Type: string

Default: ""

sharded_join.iterations

The total number of iterations (shards). Increasing this number increases the overall time taken to process the data but reduces the memory used in the process. The real memory usage required is significantly higher than the raw size of the data, and therefore the number of iterations should be at least an order of magnitude higher than the overall size of the dataset divided by the available memory.

Type: int

Default: 1
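
As a rough worked example (the figures are illustrative): joining a dataset of around 50GB on disk on a machine with 8GB of memory available gives a ratio of roughly 6, so something in the region of 60 or more iterations would be a reasonable starting point.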

sharded_join.merge_strategy

The chosen strategy to use when a data join would otherwise result in a collision of field values. The strategy array means non-array colliding values are placed into an array and colliding arrays are merged. The strategy replace replaces old values with new values. The strategy keep keeps the old value.

Type: string

Default: "array"

Options: array, replace, keep.
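
For example (an illustrative collision, not taken from the examples above), joining the fragments {"uuid":"AAA","hobby":"rowing"} and {"uuid":"AAA","hobby":"golf"} on uuid, with the golf fragment consumed second, would produce:

{"uuid":"AAA","hobby":["rowing","golf"]} with array
{"uuid":"AAA","hobby":"golf"} with replace
{"uuid":"AAA","hobby":"rowing"} with keep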

inputs

An array of inputs to read from sequentially.

Type: array