Custom Metrics

You can’t build cool graphs without metrics, and Redpanda Connect emits many. However, occasionally you might want to also emit custom metrics that track data extracted from messages being processed. In this cookbook we’ll explore how to achieve this by configuring Redpanda Connect to pull download stats from Github, DockerHub and Homebrew and emit them as gauges.

The basics

Firstly, we need to target an API so let’s start with the nice and simple Homebrew API, which we’ll poll every 60 seconds.

We can either do it with an http_client input and a rate limit that restricts us to one request per 60 seconds, or we can use a generate input to generate a message every 60 seconds that triggers an http processor:

  • Processor

  • Input

input:
  generate:
    interval: 60s
    mapping: root = ""

pipeline:
  processors:
    - http:
        url: https://formulae.brew.sh/api/formula/benthos.json
        verb: GET
input:
  http_client:
    url: https://formulae.brew.sh/api/formula/benthos.json
    verb: GET
    rate_limit: brewlimit

rate_limit_resources:
  - label: brewlimit
    local:
      count: 1
      interval: 60s

For this cookbook we’ll continue with the processor option as it makes it easier to deploy it as a scheduled lambda function later on, which is how I’m currently doing it in real life.

The homebrew formula API gives us a JSON blob that looks like this (removing fields we’re not interested in, and with numbers inflated relative to my ego):

{
    "name":"benthos",
    "desc":"Stream processor for mundane tasks written in Go",
    "analytics":{"install":{"30d":{"benthos":78978979},"90d":{"benthos":253339124},"365d":{"benthos":681356871}}}
}

This format makes it fairly easy to emit the value of analytics.install.30d.benthos as a gauge with the metric processor:

http:
  address: 0.0.0.0:4195

input:
  generate:
    interval: 60s
    mapping: root = ""

pipeline:
  processors:
    - http:
        url: https://formulae.brew.sh/api/formula/benthos.json
        verb: GET

    - metric:
        type: gauge
        name: downloads
        labels:
          source: homebrew
        value: ${! json("analytics.install.30d.benthos") }

    - mapping: root = deleted()

metrics:
  mapping: if this != "downloads" { deleted() }
  prometheus: {}

With the above config we have selected the prometheus metrics type, which allows us to use Prometheus to scrape metrics from Redpanda Connect by polling its HTTP API at the url http://localhost:4195/stats.

We have also specified a [path_mapping][metrics.prometheus.path_mapping] that deletes any internal metrics usually emitted by Redpanda Connect by filtering on our custom metric name.

Finally, there’s also a mapping processor added to the end of our pipeline that deletes all messages since we’re not interested in sending the raw data anywhere after this point anyway.

While running this config you can verify that our custom metric is emitted with curl:

curl -s http://localhost:4195/stats | grep downloads

Giving something like:

# HELP benthos_downloads {page-component-title} Gauge metric
# TYPE benthos_downloads gauge
benthos_downloads{source="homebrew"} 78978979

Easy! The DockerHub API is also pretty simple, and adding it to our pipeline is just:

  • Diff

  • Full Config

           source: homebrew
         value: ${! json("analytics.install.30d.benthos") }

+    - mapping: root = ""
+
+    - http:
+        url: http://docker.redpanda.com/redpandadata/connect
+        verb: GET
+        headers:
+          Content-Type: application/json
+
+    - metric:
+        type: gauge
+        name: downloads
+        labels:
+          source: dockerhub
+        value: ${! json("pull_count") }
+
     - mapping: root = deleted()
http:
  address: 0.0.0.0:4195

input:
  generate:
    interval: 60s
    mapping: root = ""

pipeline:
  processors:
    - http:
        url: https://formulae.brew.sh/api/formula/benthos.json
        verb: GET

    - metric:
        type: gauge
        name: downloads
        labels:
          source: homebrew
        value: ${! json("analytics.install.30d.benthos") }

    - mapping: root = ""

    - http:
        url: http://docker.redpanda.com/redpandadata/connect
        verb: GET
        headers:
          Content-Type: application/json

    - metric:
        type: gauge
        name: downloads
        labels:
          source: dockerhub
        value: ${! json("pull_count") }

    - mapping: root = deleted()

metrics:
  mapping: if this != "downloads" { deleted() }
  prometheus: {}

Advanced custom metrics example

So that’s the basics covered. Next, we’re going to target the Github releases API which gives a slightly more complex payload that looks something like this:

[
  {
    "tag_name": "X.XX.X",
    "assets":[
      {"name":"benthos-lambda_X.XX.X_linux_amd64.zip","download_count":543534545},
      {"name":"benthos_X.XX.X_darwin_amd64.tar.gz","download_count":43242342},
      {"name":"benthos_X.XX.X_freebsd_amd64.tar.gz","download_count":534565656},
      {"name":"benthos_X.XX.X_linux_amd64.tar.gz","download_count":743282474324}
    ]
  }
]

It’s an array of objects, one for each tagged release, with a field assets which is an array of objects representing each release asset, of which we want to emit a separate download gauge. In order to do this we’re going to use a mapping processor to remap the payload from Github into an array of objects of the following form:

[
  {"source":"github","dist":"lambda_linux_amd64","download_count":543534545,"version":"X.XX.X"},
  {"source":"github","dist":"darwin_amd64","download_count":43242342,"version":"X.XX.X"},
  {"source":"github","dist":"freebsd_amd64","download_count":534565656,"version":"X.XX.X"},
  {"source":"github","dist":"linux_amd64","download_count":743282474324,"version":"X.XX.X"}
]

Then we can use an unarchive processor with the format json_array to expand this array into N individual messages, one for each asset. Finally, we will follow up with a metric processor that dynamically sets labels following the fields source, dist and version so that we have a separate metrics series for each asset type for each tagged version.

A simple pipeline of these steps would look like this (please forgive the regexp):

http:
  address: 0.0.0.0:4195

input:
  generate:
    interval: 60s
    mapping: root = ""

pipeline:
  processors:
    - http:
        url: https://api.github.com/repos/redpanda-data/connect/releases
        verb: GET

    - mapping: |
        root = this.map_each(release -> release.assets.map_each(asset -> {
          "source":         "github",
          "dist":           asset.name.re_replace_all("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
          "download_count": asset.download_count,
          "version":        release.tag_name.trim("v"),
        }).filter(asset -> asset.dist != "checksums")).flatten()

    - unarchive:
        format: json_array

    - metric:
        type: gauge
        name: downloads
        labels:
          dist: ${! json("dist") }
          source: ${! json("source") }
        value: ${! json("download_count") }

    - mapping: root = deleted()

metrics:
  mapping: if this != "downloads" { deleted() }
  prometheus: {}

Finally, let’s combine all the custom metrics into one pipeline.

Combining into a workflow

The following config expands on the previous examples by configuring each API poll as a branch processor, which allows us to run them within a workflow processor that can execute all three branches in parallel.

The metric processors have also been combined into a single reusable resource by updating the other API calls to format their payloads into the same structure as our Github remap.

http:
  address: 0.0.0.0:4195

input:
  generate:
    interval: 60s
    mapping: root = {}

pipeline:
  processors:
    - workflow:
        meta_path: results
        order: [ [ dockerhub, github, homebrew ] ]

processor_resources:
  - label: dockerhub
    branch:
      request_map: 'root = ""'
      processors:
        - try:
          - http:
              url: http://docker.redpanda.com/redpandadata/connect
              verb: GET
              headers:
                Content-Type: application/json
          - mapping: |
              root.source = "docker"
              root.dist = "docker"
              root.download_count = this.pull_count
              root.version = "all"
          - resource: metric_gauge

  - label: github
    branch:
      request_map: 'root = ""'
      processors:
        - try:
          - http:
              url: https://api.github.com/repos/redpanda-data/connect/releases
              verb: GET
          - mapping: |
              root = this.map_each(release -> release.assets.map_each(asset -> {
                "source":         "github",
                "dist":           asset.name.re_replace_all("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
                "download_count": asset.download_count,
                "version":        release.tag_name.trim("v"),
              }).filter(asset -> asset.dist != "checksums")).flatten()
          - unarchive:
              format: json_array
          - resource: metric_gauge
          - mapping: 'root = if batch_index() != 0 { deleted() }'

  - label: homebrew
    branch:
      request_map: 'root = ""'
      processors:
        - try:
          - http:
              url: https://formulae.brew.sh/api/formula/benthos.json
              verb: GET
          - mapping: |
              root.source = "homebrew"
              root.dist = "homebrew"
              root.download_count = this.analytics.install.30d.benthos
              root.version = "all"
          - resource: metric_gauge

  - label: metric_gauge
    metric:
      type: gauge
      name: downloads
      labels:
        dist: ${! json("dist") }
        source: ${! json("source") }
        version: ${! json("version") }
      value: ${! json("download_count") }

metrics:
  mapping: if this != "downloads" { deleted() }
  prometheus: {}