Setting Up Redpanda Connect (Benthos) as an MQTT/AMQP Stream Processor with Docker

Photo by Bram Gerinckx / Unsplash

Redpanda Connect is a lightweight stream processor. It reads messages from an input, processes or transforms them, then writes them to an output. The whole workflow is written as YAML.

A simple pipeline looks like this:

RabbitMQ -> Redpanda Connect -> PostgreSQL

Why Use Redpanda Connect?

Redpanda Connect is useful when you need a reliable data pipeline without building a full custom application.

It is good for:

  • Moving data from queues into databases
  • Cleaning or standardizing JSON payloads
  • Splitting one message into multiple rows
  • Adding retries when a database is temporarily unavailable
  • Rate limiting writes so PostgreSQL does not get overloaded
  • Running multiple independent streams in one Docker container

In my case, I used it to replace several n8n workflows that consumed RabbitMQ telemetry data and inserted it into PostgreSQL.

Basic Concepts

A Redpanda Connect stream usually has three main parts:

input:
  # where data comes from

pipeline:
  processors:
    # how data is parsed, cleaned, mapped, filtered, or split

output:
  # where data goes

For example:

input     = RabbitMQ
processor = JSON mapping
output    = PostgreSQL insert
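To see the three parts working before RabbitMQ or PostgreSQL are involved, a throwaway stream can use the built-in generate input and print to stdout. This is a minimal sketch for local experimentation; the sensor name demo_sensor and the fixed value 42 are placeholders chosen to mirror the sensor example used later.

```yaml
# Minimal smoke-test stream: no broker or database required.
input:
  generate:
    interval: 1s              # emit one message per second, forever
    mapping: |
      root = {
        "sensor_id": "demo_sensor",
        "value": 42,
        "timestamp": now()
      }

pipeline:
  processors:
    # Rename fields into the shape the PostgreSQL example uses later
    - mapping: |
        root.sensor_id = this.sensor_id
        root.reading_value = this.value
        root.reading_timestamp = this.timestamp

output:
  stdout: {}                  # print each message as a line
```

Saved as an extra file in the streams folder described in the next section, it should print one JSON line per second into the container logs, which confirms the input, pipeline, and output are wired up before any real infrastructure is added.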

Docker Compose Setup

Here is a simple Docker Compose setup for Redpanda Connect in streams mode.

services:
  redpanda-connect:
    image: docker.redpanda.com/redpandadata/connect:latest
    restart: always
    ports:
      - "4195:4195"
    volumes:
      - ./streams:/streams
      - ./resources:/resources
    command:
      - streams
      - -r
      - /resources/*.yaml
      - /streams/*.yaml

This expects two folders:

.
├── docker-compose.yml
├── streams
│   └── 01_sensor_readings.yaml
└── resources
    └── rate_limits.yaml

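The /streams folder contains your actual workflows. Each file becomes a separate stream, named after the file without its extension (for example, 01_sensor_readings).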

The /resources folder contains shared resources, such as rate limits.
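If RabbitMQ and PostgreSQL are not already running elsewhere, they can be added to the same Compose file for local testing. The sketch below rests on assumptions: the image tags, credentials, and database name are placeholders picked to match the AMQP URL and DSN used in the stream later, and the service names rabbitmq and postgres would replace rabbitmq-host and postgres-host in those URLs.

```yaml
services:
  redpanda-connect:
    # ... same image, volumes, and command as above ...
    depends_on:
      - rabbitmq
      - postgres

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: user        # matches amqp://user:password@...
      RABBITMQ_DEFAULT_PASS: password
    ports:
      - "5672:5672"                      # AMQP
      - "15672:15672"                    # management UI

  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: password        # default user is "postgres"
      POSTGRES_DB: my_database
    ports:
      - "5432:5432"
```

Note that depends_on only controls start order, not readiness, so the first connection attempts may still fail while RabbitMQ and PostgreSQL are booting.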

Adding a Rate Limit

Rate limiting is important when PostgreSQL is catching up after downtime. Without it, Redpanda Connect may drain RabbitMQ too quickly and overload the database.

Create resources/rate_limits.yaml:

rate_limit_resources:
  - label: postgres_insert_limit
    local:
      count: 50
      interval: 1s

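This allows up to 50 messages per second in total. The limit is shared: if several streams reference postgres_insert_limit, they split those 50 messages per second between them rather than getting 50 each.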
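The number 50 is a starting point rather than a rule. One way to sanity-check it is to estimate how long a backlog takes to drain. With B the number of queued messages, r_limit the limiter rate, and r_in the rate at which new readings keep arriving, the backlog shrinks at r_limit minus r_in. Taking a hypothetical outage that left 90,000 messages queued while new readings keep arriving at 20 per second:

$$
t_{\text{drain}} = \frac{B}{r_{\text{limit}} - r_{\text{in}}} = \frac{90{,}000}{(50 - 20)\ \text{s}^{-1}} = 3000\ \text{s} = 50\ \text{min}
$$

If r_in is at or above r_limit, the backlog never drains, so the limit has to sit comfortably above the normal arrival rate. The limiter is also only a ceiling: with prefetch_count: 1 and one insert per message, database round-trip time can hold actual throughput below it.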

Example PostgreSQL Table

For a simple example, imagine we want to store sensor readings.

CREATE TABLE sensor_readings (
  id BIGSERIAL PRIMARY KEY,
  sensor_id TEXT NOT NULL,
  reading_value NUMERIC(12,4) NOT NULL,
  reading_timestamp TIMESTAMPTZ NOT NULL
);

Example RabbitMQ message:

{
  "sensor_id": "mvps4b",
  "value": 2813,
  "timestamp": "2026-05-14T03:45:40.776Z"
}

Example Stream: RabbitMQ to PostgreSQL

Create streams/01_sensor_readings.yaml:

input:
  amqp_0_9:
    urls:
      - amqp://user:password@rabbitmq-host:5672/
    queue: sensor_readings_queue
    queue_declare:
      enabled: true
      durable: true
    bindings_declare:
      - exchange: amq.topic
        key: sensors.readings.*
    prefetch_count: 1

pipeline:
  processors:
    - mapping: |
        root = if this.type() == "string" {
          this.parse_json().catch(this)
        } else {
          this
        }

    - mapping: |
        root.sensor_id = this.sensor_id
        root.reading_value = this.value.number()
        root.reading_timestamp = this.timestamp

    - mapping: |
        root = if this.sensor_id == null || this.reading_value == null || this.reading_timestamp == null {
          deleted()
        } else {
          this
        }

    - rate_limit:
        resource: postgres_insert_limit

output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 5s
      max_interval: 30s
    output:
      sql_insert:
        driver: postgres
        dsn: postgres://postgres:password@postgres-host:5432/my_database?sslmode=disable
        table: sensor_readings
        columns:
          - sensor_id
          - reading_value
          - reading_timestamp
        args_mapping: |
          root = [
            this.sensor_id,
            this.reading_value,
            this.reading_timestamp
          ]
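With max_retries: 3, the retry output eventually gives up and the message is reported as failed back to the input; for amqp_0_9 that normally means it is nacked and requeued in RabbitMQ. If you would rather park rows that keep failing, one option is a fallback output, which tries each child output in order. This is a sketch under assumptions: the path /data/failed_readings.jsonl is a placeholder and would need a matching volume in docker-compose.yml.

```yaml
output:
  fallback:
    # 1st choice: insert into PostgreSQL, retrying a few times
    - retry:
        max_retries: 3
        backoff:
          initial_interval: 5s
          max_interval: 30s
        output:
          sql_insert:
            driver: postgres
            dsn: postgres://postgres:password@postgres-host:5432/my_database?sslmode=disable
            table: sensor_readings
            columns: [ sensor_id, reading_value, reading_timestamp ]
            args_mapping: |
              root = [ this.sensor_id, this.reading_value, this.reading_timestamp ]
    # 2nd choice: append the failed row to a local file for later replay
    - file:
        path: /data/failed_readings.jsonl   # hypothetical path
        codec: lines
```

The trade-off is that during a long PostgreSQL outage every message ends up in the file instead of waiting in RabbitMQ, so this pattern suits individual bad rows better than extended downtime.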

Starting the Pipeline

Run:

docker compose up -d

Watch logs:

docker compose logs -f --tail=50 redpanda-connect

If everything is working, you should see log lines similar to:

Launching Redpanda Connect in streams mode
Input type amqp_0_9 is now active
Output type sql_insert is now active

Important Lessons Learned

1. Do Not Put Resources Inside Stream Files

In streams mode, this should not go inside /streams/*.yaml:

rate_limit_resources:

Put it in /resources/*.yaml, then load it with:

command:
  - streams
  - -r
  - /resources/*.yaml
  - /streams/*.yaml

2. One RabbitMQ Queue Is Not Broadcast

If two workflows consume the same RabbitMQ queue, they compete for messages. One message goes to one consumer, not both.

If two pipelines need the same data, create two separate queues and bind both queues to the same exchange/routing key.
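A sketch of that layout, assuming a hypothetical second stream file named 02_sensor_archive.yaml and a hypothetical queue named sensor_archive_queue. Only the queue name differs from the first stream; the exchange and routing key stay the same, so RabbitMQ delivers a copy of every matching message to each queue.

```yaml
# streams/02_sensor_archive.yaml (hypothetical second stream)
input:
  amqp_0_9:
    urls:
      - amqp://user:password@rabbitmq-host:5672/
    queue: sensor_archive_queue     # its own queue, NOT sensor_readings_queue
    queue_declare:
      enabled: true
      durable: true
    bindings_declare:
      - exchange: amq.topic         # same exchange as 01_sensor_readings.yaml
        key: sensors.readings.*     # same routing key
    prefetch_count: 1

# ... pipeline and output for the archive stream go here ...
```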

3. Match SQL Columns and Arguments Exactly

This must match:

columns:
  - sensor_id
  - reading_value
  - reading_timestamp

with:

args_mapping: |
  root = [
    this.sensor_id,
    this.reading_value,
    this.reading_timestamp
  ]

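The values are matched to the columns by position, so both the count and the order must line up. If the counts differ, the insert fails; if the order is wrong, values land in the wrong columns or are rejected by PostgreSQL's type checks.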

4. Rate Limit After Splitting Messages

If one RabbitMQ message contains many records, split it first, then rate limit.

    - unarchive:
        format: json_array

    - rate_limit:
        resource: postgres_insert_limit

That way the limit applies to actual PostgreSQL rows, not just RabbitMQ messages.
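For example, assume (hypothetically) that a device batches its readings into one RabbitMQ message shaped like {"readings": [ ... ]}. A mapping pulls out the array, unarchive turns each element into its own message, and only then does the limiter count them:

```yaml
pipeline:
  processors:
    # Hypothetical payload:
    # {"readings": [{"sensor_id": "...", "value": 1, "timestamp": "..."}, ...]}
    - mapping: |
        root = this.readings

    # Split the JSON array: one message per element
    - unarchive:
        format: json_array

    # Each message is now one future PostgreSQL row
    - mapping: |
        root.sensor_id = this.sensor_id
        root.reading_value = this.value.number()
        root.reading_timestamp = this.timestamp

    # Rate limit per row, not per RabbitMQ message
    - rate_limit:
        resource: postgres_insert_limit
```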

5. Watch for Dotted JSON Keys

If your JSON keys contain dots, such as:

{
  "mvps4d.smb12": {
    "value": 123
  }
}

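avoid using path-style access. Bloblang treats every dot in a path as a separator, so this.mvps4d.smb12.value looks for a field named mvps4d containing smb12, which does not exist. Use key_values() instead, which exposes each key as a plain string.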
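A sketch of the key_values() approach, assuming each top-level dotted key identifies a sensor and that the payload carries no timestamp of its own, so now() stands in for reading_timestamp. key_values() turns the object into an array of key/value pairs, which can then be split into one row per sensor.

```yaml
pipeline:
  processors:
    # Input example: {"mvps4d.smb12": {"value": 123}}
    # key_values() -> [{"key": "mvps4d.smb12", "value": {"value": 123}}]
    - mapping: |
        root = this.key_values().map_each(kv -> {
          "sensor_id": kv.key,
          "reading_value": kv.value.value.number(),
          "reading_timestamp": now()
        })

    # One message per dotted key
    - unarchive:
        format: json_array
```

Because the dotted string is only ever read as kv.key, Bloblang never tries to interpret it as a path.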

Final Thoughts

Redpanda Connect is a good middle ground between a visual automation tool and a full custom service. You still get readable workflows, but with better control over throughput, retries, parsing, and database writes. For high-volume data pipelines, especially RabbitMQ to PostgreSQL, Redpanda Connect is much more suitable than the n8n workflows it replaced.
