Psycopg2

Psycopg2 is a PostgreSQL adapter for Python. You can use it to execute SQL queries against Redpanda SQL from Python. Psycopg2 is designed for multi-threaded applications that create and destroy many cursors or run a high number of concurrent INSERT or UPDATE operations.

Prerequisites

Before you begin, make sure you have the following installed:

  • Python (preferably with a virtual environment)

  • The Psycopg2 library

Establish a connection

To connect to Redpanda SQL using Psycopg2, use the psycopg2.connect() function. It creates a new database session and returns a connection object.

  • Using keyword arguments:

    conn = psycopg2.connect(dbname="test", user="postgres", password="secret")

The following parameters can be used with the psycopg2.connect() function:

  • dbname: Name of the database to connect to.

  • user: Username to authenticate with.

  • password: Password to authenticate with.

  • host: Hostname or IP address of the Redpanda SQL server, for example localhost or 127.0.0.1.

  • port: Default for Redpanda SQL is 5432.
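
As a minimal sketch of how these parameters might be assembled, the helper below reads them from environment variables and falls back to localhost and port 5432 when host and port are unset. The RPSQL_* variable names and the build_conn_params() and connect_from_env() helpers are hypothetical, introduced only for illustration; they are not part of Psycopg2 or Redpanda SQL.

```python
import os


def build_conn_params(env=None):
    """Collect psycopg2.connect() keyword arguments from a mapping.

    The RPSQL_* names are hypothetical; use whatever your deployment defines.
    """
    env = os.environ if env is None else env
    params = {
        "dbname": env.get("RPSQL_DBNAME"),
        "user": env.get("RPSQL_USER"),
        "password": env.get("RPSQL_PASSWORD"),
        "host": env.get("RPSQL_HOST", "localhost"),
        "port": int(env.get("RPSQL_PORT", "5432")),
    }
    # Drop unset values so only explicitly configured parameters are passed on
    return {key: value for key, value in params.items() if value is not None}


def connect_from_env(env=None):
    """Open a connection using the parameters gathered above."""
    import psycopg2  # imported here so build_conn_params() works without the driver

    return psycopg2.connect(**build_conn_params(env))
```

Keeping credentials out of the source file this way is a design choice of the sketch, not a requirement of psycopg2.connect().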

Example usage

The following example connects to Redpanda SQL using keyword arguments:

import logging
import psycopg2

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

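# Initialize both handles so the cleanup code can tell whether they were created
conn = None
cur = None

try: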
    conn = psycopg2.connect(
        dbname="<database>",
        user="<username>",
        password="<password>",
        host="<host>",
        port="5432"
    )
    logging.info("Connection to Redpanda SQL database established successfully!")
...

Redpanda SQL does not support multi-threaded transactions as provided by Psycopg2. Avoid sharing connections and transactions across threads.

Now, define a table, insert data into it, and query the inserted data.

...
    # SQL statements to execute
    statements = [
        # Drop the film table if it exists
        "DROP TABLE IF EXISTS film;",
        # Create the film table
        "CREATE TABLE film (title text NOT NULL, rating text, length int);",
        # Insert records into the film table
        "INSERT INTO film(title, length, rating) VALUES \
                ('ATTRACTION NEWTON', 83, 'PG-13'), \
                ('CHRISTMAS MOONSHINE', 150, 'NC-17'), \
                ('DANGEROUS UPTOWN', 121, 'PG'), \
                ('KILL BROTHERHOOD', 54, 'G'), \
                ('HALLOWEEN NUTS', 47, 'PG-13'), \
                ('HOURS RAGE', 122, 'NC-17'), \
                ('PIANIST OUTFIELD', 136, 'NC-17'), \
                ('PICKUP DRIVING', 77, 'G'), \
                ('INDEPENDENCE HOTEL', 157, 'NC-17'), \
                ('PRIVATE DROP', 106, 'PG'), \
                ('SAINTS BRIDE', 125, 'G'), \
                ('FOREVER CANDIDATE', 131, 'NC-17'), \
                ('MILLION ACE', 142, 'PG-13'), \
                ('SLEEPY JAPANESE', 137, 'PG'), \
                ('WRATH MILE', 176, 'NC-17'), \
                ('YOUTH KICK', 179, 'NC-17'), \
                ('CLOCKWORK PARADISE', 143, 'PG-13');",
        # Select all records from the film table
        "SELECT * FROM film;"
        ]

    # Create a cursor object to execute SQL queries
    cur = conn.cursor()

    # Execute each SQL statement in the list
    for statement in statements:
        try:
            cur.execute(statement)
            if statement.startswith("SELECT"):
                # Fetch and log the results of the SELECT statement
                outputs = cur.fetchall()
                for output in outputs:
                    logging.info(f"Query Result: {output}")
            else:
                conn.commit()  # Commit changes after each non-SELECT statement
        except psycopg2.Error as exec_error:
            conn.rollback()
            logging.warning(f"Transaction rolled back due to an error: {exec_error}")

except psycopg2.Error as e:
    logging.error(f"Error during database operation: {e}")
    if conn is not None:
        conn.rollback()  # Roll back the transaction in case of an error
        logging.warning("Transaction rolled back due to error.")

finally:
    # Close the cursor and connection if they were opened
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()
        logging.info("Redpanda SQL connection closed.")
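
When a query depends on a runtime value, cursor.execute() also accepts the values as a second argument, so the driver handles quoting instead of the caller. The sketch below assumes the film table from the example above; the films_by_rating() helper is hypothetical.

```python
def films_by_rating(conn, rating):
    """Return (title, length) rows for one rating using a bound parameter."""
    # %s is a psycopg2 placeholder; the value is passed separately and is
    # never formatted into the SQL string by hand
    query = "SELECT title, length FROM film WHERE rating = %s;"
    with conn.cursor() as cur:  # the cursor is closed when the block exits
        cur.execute(query, (rating,))
        return cur.fetchall()
```

With an open connection, the helper could be called as films_by_rating(conn, "PG").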

Supported features and limitations

While Redpanda SQL supports many core SQL features, some limitations exist, particularly around transaction handling and certain advanced Psycopg2 features. This section clarifies which features are fully supported, which have limited support, and which are currently unavailable.

Supported features

  • Basic connection and cursor management

    You can use psycopg2.connect() to establish connections and create cursors with conn.cursor() to execute SQL commands.

  • SQL execution

    Standard SQL commands such as SELECT are supported and can be executed via cursor.execute(). Redpanda SQL operates in read-only mode, so INSERT, UPDATE, DELETE, and CREATE TABLE are not available.

  • Transaction control

    Basic transaction commands such as conn.commit() and conn.rollback() are available, but Redpanda SQL currently has limited transaction support and does not support multi-threaded transactions.

  • Error handling

    Psycopg2 exceptions, such as psycopg2.Error and the more specific classes in the psycopg2.errors module, can be caught and handled normally.
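
As an illustration of the error-handling point, psycopg2.Error instances carry the server's SQLSTATE code in pgcode and the server message in pgerror. The describe_db_error() helper below is a hypothetical sketch that formats those attributes for logging. It reads them defensively, so it also works for errors that never reached the server, where both attributes are None.

```python
def describe_db_error(exc):
    """Build a one-line summary of a database error for logging."""
    # psycopg2.Error exposes pgcode (SQLSTATE) and pgerror (server message);
    # both are None when the error did not come from the server
    code = getattr(exc, "pgcode", None) or "no SQLSTATE"
    message = getattr(exc, "pgerror", None) or str(exc)
    return f"{type(exc).__name__} [{code}]: {message.strip()}"
```

Inside an except psycopg2.Error as e: block, the result could be passed to logging.error(describe_db_error(e)).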

Limitations and unsupported features

  • Multi-threaded transactions

    Redpanda SQL does not support multi-threaded transactions as Psycopg2 normally provides. Avoid sharing connections or cursors across threads when performing transactional operations.

  • Advanced Psycopg2 features

    Features such as server-side cursors, asynchronous communication, notifications, prepared statements and pipeline mode may not be fully supported due to Redpanda SQL’s current architecture.
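
One way to follow the threading guidance is to give each thread its own connection. The sketch below uses Python's threading.local for that. The get_connection() helper and its connect_fn argument are hypothetical names; connect_fn stands for any zero-argument callable that returns a new connection, for example a lambda wrapping psycopg2.connect().

```python
import threading

# Each thread sees its own independent attributes on this object
_thread_state = threading.local()


def get_connection(connect_fn):
    """Return the calling thread's own connection, creating it on first use."""
    conn = getattr(_thread_state, "conn", None)
    if conn is None:
        conn = connect_fn()  # opened once per thread, never shared
        _thread_state.conn = conn
    return conn
```

Because the connection is stored per thread, cursors and transactions created from it stay on the thread that opened it. Each thread remains responsible for closing its own connection.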

Redpanda SQL - Psycopg2 feature compatibility table

Psycopg2 feature                                         Redpanda SQL support  Notes
-------------------------------------------------------  --------------------  ----------------------------------------
psycopg2.connect()                                       Yes                   Standard connection parameters supported
connection.server_version                                Yes                   Returns server version
connection.protocol_version                              Yes                   Returns protocol version
connection.isolation_level                               Limited               -
connection.set_client_encoding(encoding)                 Yes                   -
connection.encoding                                      Yes                   -
connection.status                                        Yes                   -
connection.close()                                       Yes                   -
connection.closed                                        Yes                   -
async_connection.poll()                                  No                    Async connections not supported
async_connection.cancel()                                No                    Async connections not supported
async_connection.isexecuting()                           No                    Async connections not supported
cursor.execute(operation, parameters)                    Yes                   Core SQL execution supported
cursor.executemany(operation, seq_of_parameters)         Yes                   -
cursor.batch_execute(operation, parameters_list)         No                    -
cursor.copy_from(file, table, sep, NULL, size, columns)  Limited               -
cursor.copy_to(file, table, sep, NULL, columns)          Limited               -
cursor.fetchall()                                        Yes                   -
cursor.fetchmany(size)                                   Yes                   -
cursor.fetchone()                                        Yes                   -
cursor.statusmessage                                     Yes                   -
cursor.description                                       Yes                   -
cursor.rowcount                                          Yes                   -
cursor.close()                                           Yes                   -
cursor.closed                                            Yes                   -

Entries without parentheses are read-only attributes in Psycopg2 rather than methods.
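
To show how several of the supported cursor members fit together, the following sketch reads a result set in batches with fetchmany() and uses description to label each column. The iter_rows_as_dicts() helper is hypothetical, and the default batch size of 100 is an arbitrary choice.

```python
def iter_rows_as_dicts(cur, batch_size=100):
    """Yield each row of an already executed query as a {column: value} dict."""
    # description holds one entry per result column; item 0 is the column name
    columns = [col[0] for col in cur.description]
    while True:
        rows = cur.fetchmany(batch_size)
        if not rows:  # an empty list means the result set is exhausted
            break
        for row in rows:
            yield dict(zip(columns, row))
```

The helper expects cur.execute() to have been called with a SELECT statement first, since description is None before any query has run.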