azure_cosmosdb

Creates or updates messages as JSON documents in Azure CosmosDB.

# Common config fields, showing default values
label: ""
azure_cosmosdb:
  endpoint: https://localhost:8081 # No default (optional)
  account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
  connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
  database: testdb # No default (required)
  container: testcontainer # No default (required)
  partition_keys_map: root = "blobfish" # No default (required)
  operation: Create
  item_id: ${! json("id") } # No default (optional)

# All config fields, showing default values
label: ""
azure_cosmosdb:
  endpoint: https://localhost:8081 # No default (optional)
  account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
  connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
  database: testdb # No default (required)
  container: testcontainer # No default (required)
  partition_keys_map: root = "blobfish" # No default (required)
  operation: Create
  patch_operations: [] # No default (optional)
  patch_condition: from c where not is_defined(c.blobfish) # No default (optional)
  auto_id: true
  item_id: ${! json("id") } # No default (optional)
  enable_content_response_on_write: true

When creating documents, each message must have the id property (case-sensitive) set, unless auto_id: true is used. The id is the unique name that identifies the document: no two documents can share the same id within a logical partition. The id field must not exceed 255 characters. See details.
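
A minimal sketch of a Create configuration that relies on auto_id for the id requirement described above. The endpoint, database and container values are the placeholders used on this page; COSMOS_ACCOUNT_KEY is a hypothetical environment variable, and the habitat field is an assumed property of the incoming messages.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        # Assumes every message carries a habitat field
        partition_keys_map: root = json("habitat")
        operation: Create
        # Messages without an id get a random UUID v4; an existing id is kept
        auto_id: true
```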

The partition_keys_map field must resolve to the same value(s) across the entire message batch.
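
One possible way to satisfy this constraint is to regroup each batch by the partition key value before it reaches the processor. The sketch below assumes the group_by_value processor is available in your build and that the partition key lives in a habitat field; COSMOS_ACCOUNT_KEY is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    # Break each batch into smaller batches whose messages share one habitat value
    - group_by_value:
        value: ${! json("habitat") }
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        # Resolves to a single value per batch thanks to the grouping step
        partition_keys_map: root = json("habitat")
        operation: Create
```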

Credentials

You can use one of the following authentication mechanisms:

  • Set the endpoint field and the account_key field

  • Set only the endpoint field to use DefaultAzureCredential

  • Set the connection_string field
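
The three mechanisms above might look as follows. This is only a sketch: the environment variable names and the account URL are hypothetical, and the other required fields (database, container, partition_keys_map) are omitted for brevity.

```yaml
# Option 1: endpoint plus account key
azure_cosmosdb:
  endpoint: https://localhost:8081
  account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
---
# Option 2: endpoint only, credentials resolved through DefaultAzureCredential
azure_cosmosdb:
  endpoint: https://example-account.documents.azure.com:443/ # hypothetical account URL
---
# Option 3: connection string only
azure_cosmosdb:
  connection_string: ${COSMOS_CONNECTION_STRING} # hypothetical environment variable
```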

Metadata

This component adds the following metadata fields to each message:

- activity_id
- request_charge

You can access these metadata fields using function interpolation.
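
As an illustration, the metadata can be read by a later processor. The sketch below assumes a log processor placed after azure_cosmosdb; the log message wording is arbitrary and COSMOS_ACCOUNT_KEY is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = "blobfish"
        operation: Upsert
    # Report the activity id and request charge set by the processor above
    - log:
        level: INFO
        message: 'CosmosDB activity ${! meta("activity_id") } charged ${! meta("request_charge") }'
```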

Batching

CosmosDB limits the maximum batch size to 100 messages and the payload must not exceed 2MB (details here).
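
One way to stay within these limits is to cap the batch before it reaches the processor. The sketch below assumes the split processor is available and treats 2MB as 2,000,000 bytes of message content, which is a conservative guess rather than a documented figure; the actual request payload may be measured differently. COSMOS_ACCOUNT_KEY is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    # Break large batches into chunks of at most 100 messages or ~2MB of content
    - split:
        size: 100
        byte_size: 2000000 # assumed byte equivalent of the 2MB limit
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = "blobfish"
        operation: Create
```

Every resulting chunk still has to resolve to a single partition key value.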

Examples

  • Patch documents

Query documents from a container and patch them.

input:
  azure_cosmosdb:
    endpoint: http://localhost:8080
    account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
    database: blobbase
    container: blobfish
    partition_keys_map: root = "AbyssalPlain"
    query: SELECT * FROM blobfish

  processors:
    - mapping: |
        root = ""
        meta habitat = json("habitat")
        meta id = this.id
    - azure_cosmosdb:
        endpoint: http://localhost:8080
        account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
        database: testdb
        container: blobfish
        partition_keys_map: root = json("habitat")
        item_id: ${! meta("id") }
        operation: Patch
        patch_operations:
          # Add a new /diet field
          - operation: Add
            path: /diet
            value_map: root = json("diet")
          # Remove the first location from the /locations array field
          - operation: Remove
            path: /locations/0
          # Add new location at the end of the /locations array field
          - operation: Add
            path: /locations/-
            value_map: root = "Challenger Deep"
        # Return the updated document
        enable_content_response_on_write: true

Fields

endpoint

CosmosDB endpoint.

Type: string

# Examples

endpoint: https://localhost:8081

account_key

Account key.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

# Examples

account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==

connection_string

Connection string.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

# Examples

connection_string: AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;

database

Database.

Type: string

# Examples

database: testdb

container

Container.

Type: string

# Examples

container: testcontainer

partition_keys_map

A Bloblang mapping which should evaluate to a single partition key value or an array of partition key values of type string, integer or boolean. Currently, hierarchical partition keys are not supported, so only one value may be provided.

Type: string

# Examples

partition_keys_map: root = "blobfish"

partition_keys_map: root = 41

partition_keys_map: root = true

partition_keys_map: root = null

partition_keys_map: root = json("blobfish").depth

operation

Operation.

Type: string

Default: "Create"

Option Summary

Create

Create operation.

Delete

Delete operation.

Patch

Patch operation.

Read

Read operation.

Replace

Replace operation.

Upsert

Upsert operation.
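
For instance, a Delete might be configured as in the sketch below. It assumes each message carries the id of the document to delete and its partition key in a habitat field; COSMOS_CONNECTION_STRING is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        connection_string: ${COSMOS_CONNECTION_STRING} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = json("habitat") # assumed message field
        operation: Delete
        # The document to delete is identified by the id found in the message
        item_id: ${! json("id") }
```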

patch_operations

Patch operations to be performed when operation: Patch.

Type: array

patch_operations[].operation

Operation.

Type: string

Default: "Add"

Option Summary

Add

Add patch operation.

Increment

Increment patch operation.

Remove

Remove patch operation.

Replace

Replace patch operation.

Set

Set patch operation.

patch_operations[].path

Path.

Type: string

# Examples

path: /foo/bar/baz

patch_operations[].value_map

A Bloblang mapping which should evaluate to a value of any type that is supported by CosmosDB.

Type: string

# Examples

value_map: root = "blobfish"

value_map: root = 41

value_map: root = true

value_map: root = json("blobfish").depth

value_map: root = [1, 2, 3]
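
Putting the three patch_operations fields together, a sketch that uses the Increment and Set options. The /sightings and /last_seen paths and the observed_at and habitat message fields are hypothetical, as is the COSMOS_ACCOUNT_KEY environment variable.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = json("habitat")
        item_id: ${! json("id") }
        operation: Patch
        patch_operations:
          # Add 1 to the numeric /sightings field
          - operation: Increment
            path: /sightings
            value_map: root = 1
          # Set /last_seen to a value taken from the message
          - operation: Set
            path: /last_seen
            value_map: root = json("observed_at")
```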

patch_condition

Patch operation condition. This field supports interpolation functions.

Type: string

# Examples

patch_condition: from c where not is_defined(c.blobfish)
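
A sketch of a conditional patch in the same style as the example above: the /diet field is meant to be added only to documents that do not define it yet. The diet and habitat message fields are assumptions, and COSMOS_ACCOUNT_KEY is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        endpoint: https://localhost:8081
        account_key: ${COSMOS_ACCOUNT_KEY} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = json("habitat")
        item_id: ${! json("id") }
        operation: Patch
        # Only patch documents that have no diet field yet
        patch_condition: from c where not is_defined(c.diet)
        patch_operations:
          - operation: Add
            path: /diet
            value_map: root = json("diet")
```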

auto_id

Automatically set the item id field to a random UUID v4. If the id field is already set, then it will not be overwritten. Setting this to false can improve performance, since the messages will not have to be parsed.

Type: bool

Default: true

item_id

ID of item to replace or delete. Only used by the Replace and Delete operations. This field supports interpolation functions.

Type: string

# Examples

item_id: ${! json("id") }

enable_content_response_on_write

Enable content response on write operations. To save some bandwidth, set this to false if you don’t need to receive the updated message(s) from the server, in which case the processor will not modify the content of the messages which are fed into it. Applies to every operation except Read.

Type: bool

Default: true
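
A sketch of an Upsert that leaves the message content untouched by turning the content response off. The placeholder values are the ones used on this page, and COSMOS_CONNECTION_STRING is a hypothetical environment variable.

```yaml
pipeline:
  processors:
    - azure_cosmosdb:
        connection_string: ${COSMOS_CONNECTION_STRING} # hypothetical environment variable
        database: testdb
        container: testcontainer
        partition_keys_map: root = "blobfish"
        operation: Upsert
        # Skip the server copy of the document; messages pass through unmodified
        enable_content_response_on_write: false
```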

CosmosDB emulator

If you wish to run the CosmosDB emulator that is referenced in the documentation here, the following Docker command should do the trick:

> docker run --rm -it -p 8081:8081 --name=cosmosdb -e AZURE_COSMOS_EMULATOR_PARTITION_COUNT=10 -e AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE=false mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator

Note: AZURE_COSMOS_EMULATOR_PARTITION_COUNT controls the number of partitions that will be supported by the emulator. The bigger the value, the longer it takes for the container to start up.

Additionally, instead of installing the container's self-signed certificate, which is exposed via https://localhost:8081/_explorer/emulator.pem, you can run mitmproxy like so:

> mitmproxy -k --mode "reverse:https://localhost:8081"

Then you can access the CosmosDB UI via http://localhost:8080/_explorer/index.html and use http://localhost:8080 as the CosmosDB endpoint.