rpk topic consume

Consume records from topics.

Consuming records reads from any amount of input topics, formats each record according to --format, and prints them to STDOUT. The output formatter understands a wide variety of formats.

The default output format --format json is a special format that outputs each record as JSON. There may be more single-word-no-escapes formats added later. Outside of these special formats, formatting follows the rules described below.

Format output

Formatting output is based on percent escapes and modifiers. Slashes can be
used for common escapes:

    \t \n \r \\ \xNN

prints tabs, newlines, carriage returns, slashes, or hex encoded characters.p

Percent encoding prints record fields, fetch partition fields, or extra values:

    %t    topic
    %T    topic length
    %k    key
    %K    key length
    %v    value
    %V    value length
    %h    begin the header specification
    %H    number of headers
    %p    partition
    %o    offset
    %e    leader epoch
    %d    timestamp (formatting described below)
    %a    record attributes (formatting described below)
    %x    producer id
    %y    producer epoch

    %[    partition log start offset
    %|    partition last stable offset
    %]    partition high watermark

    %%    percent sign
    %{    left brace
    %}    right brace

    %i    the number of records formatted

MODIFIERS

Text and numbers can be formatted in many different ways, and the default
format can be changed within brace modifiers. %v prints a value, while %v{hex}
prints the value hex encoded. %T prints the length of a topic in ascii, while
%T{big8} prints the length of the topic as an eight byte big endian.

All modifiers go within braces following a percent-escape.

NUMBERS

Formatting number values can have the following modifiers:

     ascii       print the number as ascii (default)

     hex64       sixteen hex characters
     hex32       eight hex characters
     hex16       four hex characters
     hex8        two hex characters
     hex4        one hex character

     big64       eight byte big endian number
     big32       four byte big endian number
     big16       two byte big endian number
     big8        alias for byte

     little64    eight byte little endian number
     little32    four byte little endian number
     little16    two byte little endian number
     little8     alias for byte

     byte        one byte number
     bool        "true" if the number is non-zero, "false" if the number is zero

All numbers are truncated as necessary per the modifier. Printing %V{byte} for
a length 256 value will print a single null, whereas printing %V{big8} would
print the bytes 1 and 0.

When writing number sizes, the size corresponds to the size of the raw values,
not the size of encoded values. "%T% t{hex}" for the topic "foo" will print
"3 666f6f", not "6 666f6f".

TIMESTAMPS

By default, the timestamp field is printed as a millisecond number value. In
addition to the number modifiers above, timestamps can be printed with either
Go formatting or strftime formatting:

    %d{go[2006-01-02T15:04:05Z07:00]}
    %d{strftime[%F]}

An arbitrary amount of brackets (or braces, or # symbols) can wrap your date
formatting:

    %d{strftime### [%F] ###}

The above will print " [YYYY-MM-DD] ", while the surrounding three # on each
side are used to wrap the formatting. Further details on Go time formatting can
be found at https://pkg.go.dev/time, while further details on strftime
formatting can be read by checking "man strftime".

ATTRIBUTES

Each record (or batch of records) has a set of possible attributes. Internally,
these are packed into bit flags. Printing an attribute requires first selecting
which attribute you want to print, and then optionally specifying how you want
it to be printed:

     %a{compression}
     %a{compression;number}
     %a{compression;big64}
     %a{compression;hex8}

Compression is by default printed as text ("none", "gzip", ...). Compression
can be printed as a number with ";number", where number is any number
formatting option described above. No compression is 0, gzip is 1, etc.

     %a{timestamp-type}
     %a{timestamp-type;big64}

The record's timestamp type is printed as -1 for very old records (before
timestamps existed), 0 for client generated timestamps, and 1 for broker
generated timestamps. Number formatting can be controlled with ";number".

     %a{transactional-bit}
     %a{transactional-bit;bool}

Prints 1 if the record a part of a transaction or 0 if it is not.
Number formatting can be controlled with ";number".

     %a{control-bit}
     %a{control-bit;bool}

Prints 1 if the record is a commit marker or 0 if it is not.
Number formatting can be controlled with ";number".

TEXT

Text fields without modifiers default to writing the raw bytes. Alternatively,
there are the following modifiers:

    %t{hex}
    %k{base64}
    %v{base64raw}
    %v{unpack[<bBhH>iIqQc.$]}

The hex modifier hex encodes the text, the base64 modifier base64 encodes the
text with standard encoding, and the base64raw modifier encodes the text with
raw standard encoding. The unpack modifier has a further internal
specification, similar to timestamps above:

    x    pad character (does not parse input)
    <    switch what follows to little endian
    >    switch what follows to big endian

    b    signed byte
    B    unsigned byte
    h    int16  ("half word")
    H    uint16 ("half word")
    i    int32
    I    uint32
    q    int64  ("quad word")
    Q    uint64 ("quad word")

    c    any character
    .    alias for c
    s    consume the rest of the input as a string
    $    match the end of the line (append error string if anything remains)

Unpacking text can allow translating binary input into readable output. If a
value is a big-endian uint32, %v will print the raw four bytes, while
%v{unpack[>I]} will print the number in as ascii. If unpacking exhausts the
input before something is unpacked fully, an error message is appended to the
output.

HEADERS

Headers are formatted with percent encoding inside of the modifier:

    %h{ %k=%v{hex} }

will print all headers with a space before the key and after the value, an
equals sign between the key and value, and with the value hex encoded. Header
formatting actually just parses the internal format as a record format, so all
of the above rules about %K, %V, text, and numbers apply.

EXAMPLES

A key and value, separated by a space and ending in newline:
-f '%k %v\n'
A key length as four big endian bytes, and the key as hex:
-f '%K{big32}%k{hex}'
A little endian uint32 and a string unpacked from a value:
-f '%v{unpack[is$]}'

OFFSETS

The --offset flag allows for specifying where to begin consuming, and
optionally, where to stop consuming. The literal words "start" and "end"
specify consuming from the start and the end.

    start     consume from the beginning
    end       consume from the end
    :end      consume until the current end
    +oo       consume oo after the current start offset
    -oo       consume oo before the current end offset
    oo        consume after an exact offset
    oo:       alias for oo
    :oo       consume until an exact offset
    o1:o2     consume from exact offset o1 until exact offset o2
    @t        consume starting from a given timestamp
    @t:       alias for @t
    @:t       consume until a given timestamp
    @t1:t2    consume from timestamp t1 until timestamp t2

There are a few options for timestamps, with each option being evaluated
until one succeeds:

    13 digits             parsed as a unix millisecond
    9 digits              parsed as a unix second
    YYYY-MM-DD            parsed as a day, UTC
    YYYY-MM-DDTHH:MM:SSZ  parsed as RFC3339, UTC; fractional seconds optional (.MMM)
    -dur                  duration ago; from now (as t1) or from t1 (as t2)
    dur                   for t2 in @t1:t2, relative duration from t1
    end                   for t2 in @t1:t2, the current end of the partition

Durations are parsed simply:

    3ms    three milliseconds
    10s    ten seconds
    9m     nine minutes
    1h     one hour
    1m3ms  one minute and three milliseconds

For example,

    -o @2022-02-14:1h   consume 1h of time on Valentine's Day 2022
    -o @-48h:-24h       consume from 2 days ago to 1 day ago
    -o @-1m:end         consume from 1m ago until now
    -o @:-1hr           consume from the start until an hour ago

Usage

rpk topic consume TOPICS... [flags]

Flags

Value Type Description

-b, --balancer

string

Group balancer to use if group consuming (range, roundrobin, sticky, cooperative-sticky) (default "cooperative-sticky").

--fetch-max-bytes

int32

Maximum amount of bytes per fetch request per broker (default 1048576).

--fetch-max-wait

duration

Maximum amount of time to wait when fetching from a broker before the broker replies (default 5s).

-f, --format

string

Output format (see --help for details) (default "json").

-g, --group

string

Group to use for consuming (incompatible with -p).

-h, --help

-

Help for consume.

--meta-only

-

Print all record info except the record value (for -f json).

-n, --num

int

Quit after consuming this number of records (0 is unbounded).

-o, --offset

string

Offset to consume from / to (start, end, 47, +2, -3) (default "start").

-p, --partitions

int32

int32Slice Comma delimited list of specific partitions to consume (default []).

--pretty-print

-

Pretty print each record over multiple lines (for -f json) (default true)

--print-control-records

-

Opt in to printing control records.

--read-committed

-

Opt in to reading only committed offsets.

-r, --regex

-

Parse topics as regex; consume any topic that matches any expression.

--brokers

strings

Comma-separated list of broker <ip>:<port> pairs (for example, ` --brokers '192.168.78.34:9092,192.168.78.35:9092,192.179.23.54:9092' ` ). Alternatively, you may set the REDPANDA_BROKERS environment variable with the comma-separated list of broker addresses.

--config

string

Redpanda config file, if not set the file will be searched for in the default locations.

--password

string

SASL password to be used for authentication.

--sasl-mechanism

string

The authentication mechanism to use. Supported values: SCRAM-SHA-256, SCRAM-SHA-512.

--tls-cert

string

The certificate to be used for TLS authentication with the broker.

--tls-enabled

-

Enable TLS for the Kafka API (not necessary if specifying custom certs).

--tls-key

string

The certificate key to be used for TLS authentication with the broker.

--tls-truststore

string

The truststore to be used for TLS communication with the broker.

--user

string

SASL user to be used for authentication.

-v, --verbose

-

Enable verbose logging (default false).