cypher

Beta

Writes a batch of messages to any graph database that supports the Neo4j and Bolt URI schemes. For each incoming message, the connector can perform operations to store or delete data within the database using the Cypher query language.

Introduced in version 4.37.0.

  • Common

  • Advanced

# Common configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64
# All configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
      realm: ""
    tls:
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64

Examples

An example of how to write data to Neo4j Aura.

output:
  cypher:
    uri: neo4j+s://example.databases.neo4j.io
    cypher: |
      MERGE (product:Product {id: $id})
        ON CREATE SET product.name = $product,
                       product.title = $title,
                       product.description = $description,
    args_mapping: |
      root = {}
      root.id = this.product.id
      root.product = this.product.summary.name
      root.title = this.product.summary.displayName
      root.description = this.product.fullDescription
    basic_auth:
      enabled: true
      username: "${NEO4J_USER}"
      password: "${NEO4J_PASSWORD}"

Fields

uri

The connection URI for your graphing database. For more information, see Neo4j’s documentation.

Type: string

# Examples
uri: neo4j://demo.neo4jlabs.com
uri: neo4j+s://aura.databases.neo4j.io
uri: neo4j+ssc://self-signed.demo.neo4jlabs.com
uri: bolt://127.0.0.1:7687
uri: bolt+s://core.db.server:7687
uri: bolt+ssc://10.0.0.43

cypher

The cypher expression to execute against the graph database.

Type: string

# Examples
cypher: 'MERGE (p:Person {name: $name})'
cypher: |-
  MATCH (o:Organization {id: $orgId})
  MATCH (p:Person {name: $name})
  MERGE (p)-[:WORKS_FOR]->(o)

database_name

Set the target database against which expressions are evaluated.

Type: string

Default: ""

args_mapping

Mappings from incoming messages to the data, which are passed into the cypher expression as parameters. All mappings must be objects. By default, this field processes the entire payload.

Type: string

# Examples
args_mapping: root.name = this.displayName
args_mapping: 'root = {"orgId": this.org.id, "name": this.user.name}'

basic_auth

Configure basic authentication for requests to your graphing database.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.username

The username of the account credentials to authenticate as.

Type: string

Default: ""

basic_auth.password

The password of the account credentials to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

basic_auth.realm

The realm or process for authentication challenges.

Type: string

Default: ""

tls

Override system defaults with custom TLS settings.

Type: object

tls.skip_cert_verify

Whether to skip server-side certificate verification.

Type: bool

Default: false

tls.enable_renegotiation

Whether to allow the remote server to request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Type: bool

Default: false

Requires version 3.45.0 or newer

tls.root_cas

Specify a certificate authority to use (optional). This is a string that represents a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.certificate.

Type: string

Default: ""

# Examples
root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate specify values for either the cert and key fields, or cert_file and key_file fields.

Type: array

Default: []

# Examples
client_certs:
  - cert: foo
    key: bar
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

The plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].key

The plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].cert_file

The path to the certificate to use.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

The plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

The pbeWithMD5AndDES-CBC algorithm does not authenticate ciphertext, and is vulnerable to padding oracle attacks, which may allow an attacker to recover the plaintext password.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples
password: foo
password: ${KEY_PASSWORD}

batching

Configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The amount of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples
processors:
  - archive:
      format: concatenate
processors:
  - archive:
      format: lines
processors:
  - archive:
      format: json_array

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this value to improve throughput.

Type: int

Default: 64