Manage Topics with the Redpanda Operator

The Redpanda Operator allows you to declaratively create and manage Kafka topics using Topic custom resources in Kubernetes. Each Topic resource maps to one topic in your Redpanda cluster. The topic controller, a component of the Redpanda Operator, keeps the corresponding Kafka topic in sync with the Topic resource. This lets you create topics as part of a Redpanda deployment.

Prerequisites

You must have the following:

  • Kubernetes cluster: Ensure you have a running Kubernetes cluster, either locally, such as with minikube or kind, or remotely.

  • Kubectl: Ensure you have the kubectl command-line tool installed and configured to communicate with your cluster.

  • Redpanda: Ensure you have the Redpanda Operator and a Redpanda resource deployed in your Kubernetes cluster.

Limitations

You cannot create access control lists (ACLs) directly in the Topic resource. To create ACLs for your topics, you must use rpk or another Kafka client. For details about ACLs, see Redpanda Authorization Mechanisms.

Create a topic

You can create a topic using a Topic resource:

apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
  name:
spec:
  kafkaApiSpec:
    brokers: []
    tls: {}
    sasl: {}
  partitions:
  replicationFactor:
  additionalConfig: {}
  metricsNamespace:
  overwriteTopicName:
  interval:
  • metadata.name (required): The name of the Topic resource. If the overwriteTopicName property is not set, the name of the Topic resource is also given to the topic in Redpanda.

    Valid names must consist of lowercase alphanumeric characters, hyphens (-), or periods (.). Names cannot start or end with a non-alphanumeric character. Underscores (_) are not allowed. For example, chat-room is a valid name, whereas chat_room is not. To use other characters such as underscores in your topic names, use the overwriteTopicName property.

  • spec.kafkaApiSpec (required): Configuration details for connecting to Redpanda brokers.

  • spec.partitions: The number of topic shards distributed across the brokers in a Redpanda cluster. This value cannot be decreased post-creation. Overrides the default cluster property default_topic_partitions.

  • spec.replicationFactor: Specifies the number of topic replicas. The value must be an odd number. Overrides the default cluster property default_topic_replications.

  • spec.additionalConfig: A map of any topic-specific configuration options. See Topic Configuration Properties.

  • spec.metricsNamespace: The fully-qualified name of the topic metrics for use in multi-operator environments. Defaults to redpanda-operator.

  • spec.overwriteTopicName: Overwrites the topic name in metadata.name.

  • spec.interval: Sets the reconciliation interval for the topic controller. Default is 3 seconds (3s).
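
The naming rules for metadata.name can be checked before you apply a manifest. The following is a minimal sketch in POSIX shell that encodes only the rules stated above. The function name is_valid_topic_resource_name is hypothetical, and Kubernetes applies further limits (such as a maximum name length) that this check does not cover.

```shell
# Return 0 if the argument satisfies the metadata.name rules described above:
# lowercase alphanumerics, hyphens, or periods, starting and ending with an
# alphanumeric character. The function name is illustrative, not part of any tool.
is_valid_topic_resource_name() {
  printf '%s\n' "$1" | LC_ALL=C grep -Eq '^[a-z0-9]([-a-z0-9.]*[a-z0-9])?$'
}
```

With this sketch, is_valid_topic_resource_name chat-room succeeds and is_valid_topic_resource_name chat_room fails. A failure signals that the topic name belongs in overwriteTopicName instead.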

The default settings are best suited to a one-broker cluster in a development environment. To learn how to modify the default values in the configuration file, see Configure Cluster Properties. Even if you set default values that work for most topics, you may still want to change some properties for a specific topic.

Example Topic resource

This example creates a topic called chat-room. The kafkaApiSpec property configures the connection to a Redpanda cluster using the defaults in the Redpanda Helm chart.

example-topic.yaml
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
  name: chat-room
spec:
  partitions: 3
  replicationFactor: 3
  additionalConfig:
    cleanup.policy: "compact"
  kafkaApiSpec:
    brokers:
      - "redpanda-0.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-1.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-2.redpanda.<namespace>.svc.cluster.local:9093"
    tls:
      caCertSecretRef:
        name: "redpanda-default-cert"
        key: "ca.crt"
kubectl apply -f example-topic.yaml --namespace <namespace>

Replace <namespace> with the namespace in which you deployed Redpanda.
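
To create a Redpanda topic named chat_room, which is not a valid resource name, the Topic resource can keep a valid metadata.name and set overwriteTopicName. The sketch below prints such a manifest. The function name render_topic_manifest and its arguments are hypothetical, and the broker address and TLS secret are copied from the example above with <namespace> left as a placeholder.

```shell
# Print a Topic manifest whose Kubernetes name and Redpanda topic name differ.
# $1: resource name (must follow the metadata.name rules)
# $2: topic name to create in Redpanda (may contain underscores)
render_topic_manifest() {
  cat <<EOF
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
  name: $1
spec:
  overwriteTopicName: $2
  partitions: 3
  replicationFactor: 3
  kafkaApiSpec:
    brokers:
      - "redpanda-0.redpanda.<namespace>.svc.cluster.local:9093"
    tls:
      caCertSecretRef:
        name: "redpanda-default-cert"
        key: "ca.crt"
EOF
}
```

After replacing <namespace> in the broker address, the output of render_topic_manifest chat-room chat_room can be piped to kubectl apply -f - --namespace <namespace>.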

Choose the number of partitions

A partition acts as a log file where topic data is written. Dividing topics into partitions allows producers to write messages in parallel and consumers to read messages in parallel. The higher the number of partitions, the greater the throughput.

As a general rule, select a number of partitions that corresponds to the maximum number of consumers in any consumer group that will consume the data.
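
The rule above amounts to taking a maximum over your consumer groups. A small sketch, assuming you already know how many consumers each group runs. The function name suggested_partitions and the sample counts are illustrative.

```shell
# Print the largest consumer count among the consumer groups that read a topic.
# Each argument is the number of consumers in one group.
suggested_partitions() {
  max=1   # a topic needs at least one partition
  for count in "$@"; do
    if [ "$count" -gt "$max" ]; then
      max=$count
    fi
  done
  printf '%s\n' "$max"
}
```

For three groups with 2, 6, and 4 consumers, suggested_partitions 2 6 4 prints 6, which would be the value to consider for spec.partitions.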

Choose the replication factor

Replicas are copies of partitions that are distributed across different brokers, so if one broker goes down, other brokers still have a copy of the data. The default replication factor in the cluster configuration is set to 1.

By choosing a replication factor greater than 1, you ensure that each partition has a copy of its data on at least one other broker. One replica acts as the leader, and the other replicas are followers.

The replication factor must be an odd number. Redpanda Data recommends a replication factor of 3 for most use cases.
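
To see what a given replication factor buys you, the sketch below computes the size of a majority and the number of replicas that can be lost while a majority remains. It assumes majority-based replication, in which a partition stays available as long as more than half of its replicas are reachable. The function name describe_replication_factor is hypothetical.

```shell
# Print how many replicas form a majority and how many can fail,
# or return 1 if the replication factor is not a positive odd number.
describe_replication_factor() {
  rf=$1
  if [ "$rf" -lt 1 ] || [ $((rf % 2)) -eq 0 ]; then
    echo "replication factor must be a positive odd number" >&2
    return 1
  fi
  majority=$((rf / 2 + 1))
  echo "majority=$majority tolerated_failures=$((rf - majority))"
}
```

For example, describe_replication_factor 3 prints majority=2 tolerated_failures=1. Under the same assumption, a factor of 4 would need a majority of 3 and still tolerate only one failure, so the extra replica adds cost without adding fault tolerance.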

Choose the cleanup policy

The cleanup policy determines how to manage the partition log files when they reach a certain size:

  • delete deletes data based on age or log size.

  • compact compacts the data by keeping only the latest value for each key.

  • compact,delete combines both methods.

If the cleanup policy is delete or compact,delete, you can delete records from the topic. See Delete records from a topic.

Configure write caching

Write caching is a relaxed mode of acks=all that provides better performance at the expense of durability. It acknowledges a message as soon as it is received and acknowledged on a majority of brokers, without waiting for it to be written to disk. This provides lower latency while still ensuring that a majority of brokers acknowledge the write.

Write caching applies to user topics. It does not apply to transactions or consumer offsets. Transactions and offset commits are always fsynced.

For clusters in development mode, write caching is enabled by default. For clusters in production mode, it is disabled by default.

Only enable write caching on workloads that can tolerate some data loss in the case of multiple, simultaneous broker failures. Leaving write caching disabled safeguards your data against complete data center or availability zone failures.

Configure at cluster level

To enable write caching by default in all user topics, set the cluster-level property write_caching_default:

Helm + Operator:

redpanda-cluster.yaml
apiVersion: cluster.redpanda.com/v1alpha1
kind: Redpanda
metadata:
  name: redpanda
spec:
  chartRef: {}
  clusterSpec:
    config:
      cluster:
        write_caching_default: true
kubectl apply -f redpanda-cluster.yaml --namespace <namespace>

Helm, using --values:

write-caching.yaml
config:
  cluster:
    write_caching_default: true
helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \
  --values write-caching.yaml --reuse-values

Helm, using --set:

helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \
  --set config.cluster.write_caching_default=true

With write_caching_default enabled at the cluster level, Redpanda fsyncs to disk according to raft_replica_max_pending_flush_bytes and raft_replica_max_flush_delay_ms, whichever is reached first.

Configure at topic level

To override the cluster-level setting at the topic level, set the topic-level property write.caching:

example-topic.yaml
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
  name: chat-room
spec:
  partitions: 3
  replicationFactor: 3
  additionalConfig:
    write.caching: "true"
  kafkaApiSpec:
    brokers:
      - "redpanda-0.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-1.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-2.redpanda.<namespace>.svc.cluster.local:9093"
    tls:
      caCertSecretRef:
        name: "redpanda-default-cert"
        key: "ca.crt"

With write.caching enabled at the topic level, Redpanda fsyncs to disk according to flush.ms and flush.bytes, whichever is reached first.

Verify a topic

After deploying a Topic resource, verify that the Redpanda Operator reconciled it.

kubectl logs -l app.kubernetes.io/name=operator -c manager --namespace <namespace>
Example output
{
  "level":"info",
  "ts":"2023-09-25T16:20:09.538Z",
  "logger":"TopicReconciler.Reconcile",
  "msg":"Starting reconcile loop",
  "controller":"topic",
  "controllerGroup":"cluster.redpanda.com",
  "controllerKind":"Topic",
  "Topic":
  {
    "name":"chat-room",
    "namespace":"<namespace>"
  },
  "namespace":"<namespace>",
  "name":"chat-room",
  "reconcileID":"c0cf9abc-a553-48b7-9b6e-2de3cdfb4432"
}
{
  "level":"info",
  "ts":"2023-09-25T16:20:09.581Z",
  "logger":"TopicReconciler.Reconcile",
  "msg":"reconciliation finished in 43.436125ms, next run in 3s",
  "controller":"topic",
  "controllerGroup":"cluster.redpanda.com",
  "controllerKind":"Topic",
  "Topic":
  {
    "name":"chat-room",
    "namespace":"<namespace>"
  },
  "namespace":"<namespace>",
  "name":"chat-room",
  "reconcileID":"c0cf9abc-a553-48b7-9b6e-2de3cdfb4432",
  "result":
  {
    "Requeue":false,
    "RequeueAfter":3000000000
  }
}

Then, verify that the topic now exists in your Redpanda cluster:

kubectl --namespace <namespace> exec -ti <pod-name> -c <container-name> -- \
  rpk topic list

Example output:

NAME          PARTITIONS  REPLICAS
chat-room     3           3
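
The Topic resource also reports a Ready condition in its status (it appears in the kubectl get topic -o yaml output in View topic configuration), so a script can wait for reconciliation instead of reading logs. A minimal sketch, assuming the resource can be addressed as topic/<name> in your cluster and that a 60-second default timeout is acceptable. The function name wait_for_topic is hypothetical.

```shell
# Block until the Topic resource reports the Ready condition, or time out.
# $1: Topic resource name, $2: namespace, $3: optional timeout (default 60s)
wait_for_topic() {
  kubectl wait --for=condition=Ready "topic/$1" \
    --namespace "$2" --timeout="${3:-60s}"
}
```

For example, wait_for_topic chat-room <namespace> returns a zero exit status once the condition is met, which makes it usable as a gate in deployment scripts.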

View topic configuration

To view the configuration for all Topic resources:

kubectl get topic -o yaml --namespace <namespace>

You should see a list of all Topic resources and their configurations.

Example output
apiVersion: v1
items:
- apiVersion: cluster.redpanda.com/v1alpha1
  kind: Topic
  metadata:
    annotations:
      kubectl.kubernetes.io/last-applied-configuration: |
        {"apiVersion":"cluster.redpanda.com/v1alpha1","kind":"Topic","metadata":{"annotations":{},"name":"chat-room","namespace":"redpanda"},"spec":{"additionalConfig":{"cleanup.policy":"compact"},"kafkaApiSpec":{"brokers":["redpanda-0.redpanda.redpanda.svc.cluster.local:9093","redpanda-1.redpanda.redpanda.svc.cluster.local:9093","redpanda-2.redpanda.redpanda.svc.cluster.local:9093"],"tls":{"caCertSecretRef":{"key":"ca.crt","name":"redpanda-default-cert"}}},"partitions":3,"replicationFactor":3}}
    creationTimestamp: "2023-10-04T10:51:20Z"
    finalizers:
    - operator.redpanda.com/finalizer
    generation: 2
    name: chat-room
    namespace: redpanda
    resourceVersion: "5262"
    uid: a26e5a79-cb04-48cb-a2d1-4be9f7f3168f
  spec:
    additionalConfig:
      cleanup.policy: compact
    interval: 3s
    kafkaApiSpec:
      brokers:
      - redpanda-0.redpanda.redpanda.svc.cluster.local:9093
      - redpanda-1.redpanda.redpanda.svc.cluster.local:9093
      - redpanda-2.redpanda.redpanda.svc.cluster.local:9093
      tls:
        caCertSecretRef:
          key: ca.crt
          name: redpanda-default-cert
    partitions: 3
    replicationFactor: 3
  status:
    conditions:
    - lastTransitionTime: "2023-10-04T10:54:05Z"
      message: Topic reconciliation succeeded
      observedGeneration: 2
      reason: Succeeded
      status: "True"
      type: Ready
    observedGeneration: 2
    topicConfiguration:
    - configType: STRING
      isDefault: false
      isSensitive: false
      name: compression.type
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: producer
    - configType: STRING
      isDefault: false
      isSensitive: false
      name: cleanup.policy
      readOnly: false
      source: DYNAMIC_TOPIC_CONFIG
      unknownTags: {}
      value: compact
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: segment.bytes
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "67108864"
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: retention.ms
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "604800000"
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: retention.bytes
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "-1"
    - configType: STRING
      isDefault: false
      isSensitive: false
      name: message.timestamp.type
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: CreateTime
    - configType: INT
      isDefault: false
      isSensitive: false
      name: max.message.bytes
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "1048576"
    - configType: BOOLEAN
      isDefault: false
      isSensitive: false
      name: redpanda.remote.read
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "false"
    - configType: BOOLEAN
      isDefault: false
      isSensitive: false
      name: redpanda.remote.write
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "false"
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: retention.local.target.bytes
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "-1"
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: retention.local.target.ms
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "86400000"
    - configSynonyms:
      - name: redpanda.remote.delete
        source: DEFAULT_CONFIG
        value: "true"
      configType: BOOLEAN
      isDefault: false
      isSensitive: false
      name: redpanda.remote.delete
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "true"
    - configType: LONG
      isDefault: false
      isSensitive: false
      name: segment.ms
      readOnly: false
      source: DEFAULT_CONFIG
      unknownTags: {}
      value: "1209600000"
kind: List
metadata:
  resourceVersion: ""

To see only the topic configuration as reported by Redpanda:

kubectl get topic --namespace <namespace> -o go-template='
{{- printf "%-30s %-30s %-30s\n" "KEY" "VALUE" "SOURCE" -}}
{{- range .items -}}
{{- range .status.topicConfiguration -}}
{{- printf "%-30s %-30s %-30s\n" .name .value .source -}}
{{- end -}}
{{- end -}}
'
Example output
KEY                            VALUE                          SOURCE
compression.type               producer                       DEFAULT_CONFIG
cleanup.policy                 compact                        DYNAMIC_TOPIC_CONFIG
segment.bytes                  67108864                       DEFAULT_CONFIG
retention.ms                   604800000                      DEFAULT_CONFIG
retention.bytes                -1                             DEFAULT_CONFIG
message.timestamp.type         CreateTime                     DEFAULT_CONFIG
max.message.bytes              1048576                        DEFAULT_CONFIG
redpanda.remote.read           false                          DEFAULT_CONFIG
redpanda.remote.write          false                          DEFAULT_CONFIG
retention.local.target.bytes   -1                             DEFAULT_CONFIG
retention.local.target.ms      86400000                       DEFAULT_CONFIG
redpanda.remote.delete         true                           DEFAULT_CONFIG
segment.ms                     1209600000                     DEFAULT_CONFIG
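
To spot which properties were set explicitly rather than inherited, you can filter this table on the SOURCE column. The sketch below assumes the three-column layout printed by the command above, values without embedded spaces, and that explicitly set properties report DYNAMIC_TOPIC_CONFIG as in the example output. The function name list_topic_overrides is hypothetical.

```shell
# Read the KEY/VALUE/SOURCE table on stdin and print key=value for every
# row whose source is DYNAMIC_TOPIC_CONFIG. The header row never matches.
list_topic_overrides() {
  awk '$3 == "DYNAMIC_TOPIC_CONFIG" { print $1 "=" $2 }'
}
```

Piping the example output above through list_topic_overrides prints cleanup.policy=compact.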

Update a topic

To update a topic, edit the Topic resource configuration, and apply the changes.

Do not use rpk or any other Kafka clients to edit topics that you created using the Topic resource. The topic controller monitors the Redpanda cluster for changes to topics. If you use a Kafka client to edit the topic, the topic controller detects those changes as a drift from the desired state and attempts to reconcile those changes by reverting them to match the Topic resource’s state.

The following example changes the cleanup policy for the chat-room topic:

example-topic.yaml
apiVersion: cluster.redpanda.com/v1alpha1
kind: Topic
metadata:
  name: chat-room
spec:
  partitions: 3
  replicationFactor: 3
  additionalConfig:
    cleanup.policy: "delete"
  kafkaApiSpec:
    brokers:
      - "redpanda-0.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-1.redpanda.<namespace>.svc.cluster.local:9093"
      - "redpanda-2.redpanda.<namespace>.svc.cluster.local:9093"
    tls:
      caCertSecretRef:
        name: "redpanda-default-cert"
        key: "ca.crt"
kubectl apply -f example-topic.yaml --namespace <namespace>

To set a property back to its default value, remove it from the Topic resource and apply the change.
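
Because spec.partitions cannot be decreased after creation, it can help to compare the desired partition count with the value on the live resource before applying an update. A minimal sketch, assuming kubectl access to the Topic resource. The function name check_partition_change is hypothetical.

```shell
# Return 0 if the desired partition count is not lower than the count
# currently declared on the Topic resource; return 1 otherwise.
# $1: Topic resource name, $2: namespace, $3: desired partition count
check_partition_change() {
  current=$(kubectl get topic "$1" --namespace "$2" \
    -o jsonpath='{.spec.partitions}') || return 1
  if [ -z "$current" ]; then
    return 0   # partitions not set on the resource, nothing to compare
  fi
  if [ "$3" -lt "$current" ]; then
    echo "cannot decrease partitions from $current to $3" >&2
    return 1
  fi
}
```

A script could call check_partition_change chat-room <namespace> 6 and run kubectl apply only when it succeeds.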

Delete a topic

To delete a topic, delete the Topic resource.

For example:

kubectl delete -f example-topic.yaml --namespace <namespace>

When a topic is deleted, its underlying data is deleted, too.

If you delete the Kafka topic directly using a client such as rpk, the topic controller recreates an empty topic with the same name.

Delete records from a topic

Redpanda lets you delete data from the beginning of a partition up to a specific offset. An offset identifies the position of an event in the partition log. Deleting records frees up disk space, which is especially helpful if your producers are pushing more data than you anticipated in your retention plan. Delete records only when you know that all consumers have read up to the given offset and the data is no longer needed.

There are different ways to delete records from a topic, including using the rpk topic trim-prefix command or using the DeleteRecords Kafka API with Kafka clients.

  • To delete records, cleanup.policy must be set to delete or compact,delete.

  • Object storage is deleted asynchronously. After messages are deleted, the partition’s start offset will have advanced, but garbage collection of deleted segments may not be complete.

  • Similar to Kafka, after deleting records, local storage and object storage may still contain data for deleted offsets. (Redpanda does not truncate segments. Instead, it bumps the start offset, then it attempts to delete as many whole segments as possible.) Data before the new start offset is not visible to clients but could be read by someone with access to the local disk of a Redpanda node.
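
As one way to do this from Kubernetes, the rpk topic trim-prefix command can be run inside a broker Pod. The sketch below assumes that the --offset and --partitions flags are available in your rpk version (check rpk topic trim-prefix --help), and it takes the Pod and container names as arguments rather than guessing them. The function name trim_topic_prefix is hypothetical.

```shell
# Delete all records before the given offset in one partition of a topic.
# $1: namespace, $2: Pod name, $3: container name,
# $4: topic, $5: partition, $6: new start offset
trim_topic_prefix() {
  kubectl --namespace "$1" exec -ti "$2" -c "$3" -- \
    rpk topic trim-prefix "$4" --partitions "$5" --offset "$6"
}
```

For example, trim_topic_prefix <namespace> <pod-name> <container-name> chat-room 0 100 requests deletion of records before offset 100 in partition 0 of the chat-room topic.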

Cluster-wide topic configurations

Topics provide a way to organize events in a data streaming platform.

When you create a topic, the default cluster-level topic configurations are applied using the cluster configuration file, unless you specify different configurations. The following table shows the default cluster-level properties and their equivalent topic-level properties:

Cluster property            Default                    Topic property
log_cleanup_policy          delete                     cleanup.policy
retention_bytes             null (no limit)            retention.bytes
log_retention_ms            604800000 ms (1 week)      retention.ms
log_segment_ms              null (no limit)            segment.ms
log_segment_size            134217728 bytes (128 MiB)  segment.bytes
log_compression_type        producer                   compression.type
log_message_timestamp_type  CreateTime                 message.timestamp.type
kafka_batch_max_bytes       1048576 bytes (1 MiB)      max.message.bytes
write_caching_default       false                      write.caching

These default settings are best suited to a one-broker cluster in a development environment. To learn how to modify the default cluster-wide configurations, see Configure Cluster Properties. Even if you set default values that work for most topics, you may still want to change some properties for a specific topic.

For details about topic properties, see Topic Configuration Properties.

Next steps

Combine SASL authentication with authorization to control which users have permissions to interact with your topics.