Setting Up Redpanda Connect (Benthos) as an MQTT/AMQP Stream Processor with Docker
Redpanda Connect is a lightweight stream processor. It reads messages from an input, processes or transforms them, then writes them to an output. Each workflow is defined in YAML.
A simple pipeline looks like this:
RabbitMQ -> Redpanda Connect -> PostgreSQL
Why Use Redpanda Connect?
Redpanda Connect is useful when you need a reliable data pipeline without building a full custom application.
It is good for:
- Moving data from queues into databases
- Cleaning or standardizing JSON payloads
- Splitting one message into multiple rows
- Adding retries when a database is temporarily unavailable
- Rate limiting writes so PostgreSQL does not get overloaded
- Running multiple independent streams in one Docker container
In my case, I used it to replace several n8n workflows that consumed RabbitMQ telemetry data and inserted it into PostgreSQL.
Basic Concepts
A Redpanda Connect stream usually has three main parts:
```yaml
input:
  # where data comes from

pipeline:
  processors:
    # how data is parsed, cleaned, mapped, filtered, or split

output:
  # where data goes
```
For example:
input = RabbitMQ
processor = JSON mapping
output = PostgreSQL insert
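To make the three-part model concrete, here is a toy Python sketch of how a message moves through it. It is not Redpanda Connect code. It only assumes what the model above describes: each processor can return zero messages (the message is dropped), one (it is transformed), or several (it is split). The function names are hypothetical.

```python
import json
from typing import Callable, Iterable

# A processor takes one message and returns zero, one, or many messages.
Processor = Callable[[dict], list[dict]]


def parse(raw: bytes) -> dict:
    # "input": decode the raw bytes received from the broker
    return json.loads(raw)


def drop_if_missing_value(msg: dict) -> list[dict]:
    # returning [] models a filtered (deleted) message
    return [msg] if msg.get("value") is not None else []


def rename(msg: dict) -> list[dict]:
    # returning exactly one message models a mapping
    return [{"sensor_id": msg["sensor_id"], "reading_value": msg["value"]}]


def run_pipeline(raw_messages: Iterable[bytes], processors: list[Processor]) -> list[dict]:
    out: list[dict] = []  # "output": a plain list standing in for the database
    for raw in raw_messages:
        batch = [parse(raw)]
        for proc in processors:
            # each processor runs on every message still in flight;
            # a splitter would simply return more than one message here
            batch = [result for msg in batch for result in proc(msg)]
        out.extend(batch)
    return out


rows = run_pipeline(
    [b'{"sensor_id": "mvps4b", "value": 2813}', b'{"sensor_id": "x", "value": null}'],
    [drop_if_missing_value, rename],
)
print(rows)  # [{'sensor_id': 'mvps4b', 'reading_value': 2813}]
```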
Docker Compose Setup
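Here is a simple Docker Compose setup for Redpanda Connect in streams mode. In streams mode, a single Redpanda Connect process runs several independent pipelines, one per stream file.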
```yaml
services:
  redpanda-connect:
    image: docker.redpanda.com/redpandadata/connect:latest
    restart: always
    ports:
      - "4195:4195"   # Redpanda Connect's built-in HTTP server
    volumes:
      - ./streams:/streams
      - ./resources:/resources
    command:
      - streams
      - -r
      - /resources/*.yaml
      - /streams/*.yaml
```
This expects two folders:
```text
.
├── docker-compose.yml
├── streams
│   └── 01_sensor_readings.yaml
└── resources
    └── rate_limits.yaml
```
The /streams folder contains the stream definitions, one pipeline per file.
The /resources folder contains shared resources, such as rate limits, that any stream can reference.
Adding a Rate Limit
Rate limiting matters most when the pipeline is catching up after downtime. If messages have piled up in RabbitMQ, Redpanda Connect will otherwise drain the backlog as fast as it can and may overload PostgreSQL.
Create:
resources/rate_limits.yaml
```yaml
rate_limit_resources:
  - label: postgres_insert_limit
    local:
      count: 50
      interval: 1s
```
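This allows up to 50 messages per second through the limiter. Resources loaded with -r are shared, so every stream that references postgres_insert_limit draws from the same budget of 50 per second rather than getting 50 each.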
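To get a feel for what count: 50 and interval: 1s mean for a backlog, here is a simplified fixed-window model in Python. It assumes the limiter hands out count tokens per interval and that a message waits until a token is free; Redpanda Connect's internal algorithm may differ in detail. FixedWindowLimiter, simulate, and approx_drain_seconds are hypothetical names for this sketch.

```python
class FixedWindowLimiter:
    """Toy model of a local limit: at most `count` accesses per `interval` seconds."""

    def __init__(self, count: int, interval: float) -> None:
        self.count = count
        self.interval = interval
        self.window_start = 0.0
        self.used = 0

    def wait_time(self, now: float) -> float:
        """Take a token and return 0.0, or return the seconds until the window resets."""
        if now - self.window_start >= self.interval:
            self.window_start = now  # a new window begins, so the budget resets
            self.used = 0
        if self.used < self.count:
            self.used += 1
            return 0.0
        return self.window_start + self.interval - now


def simulate(n_messages: int, limiter: FixedWindowLimiter) -> float:
    """Return the simulated time at which the last of n_messages gets through."""
    now = 0.0
    for _ in range(n_messages):
        wait = limiter.wait_time(now)
        while wait > 0:
            now += wait  # the message blocks until the limiter allows it
            wait = limiter.wait_time(now)
    return now


def approx_drain_seconds(backlog: int, count: int, interval: float) -> float:
    """Back-of-envelope time to drain a backlog at `count` per `interval`."""
    return backlog / count * interval


print(simulate(120, FixedWindowLimiter(50, 1.0)))  # 2.0: 50 at t=0, 50 at t=1, 20 at t=2
print(approx_drain_seconds(100_000, 50, 1.0))      # 2000.0 seconds, about 33 minutes
```

The second number is the useful one for capacity planning: a limit that protects PostgreSQL also sets how long recovery from an outage takes.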
Example PostgreSQL Table
For a simple example, imagine we want to store sensor readings.
```sql
CREATE TABLE sensor_readings (
    id BIGSERIAL PRIMARY KEY,
    sensor_id TEXT NOT NULL,
    reading_value NUMERIC(12,4) NOT NULL,
    reading_timestamp TIMESTAMPTZ NOT NULL
);
```
Example RabbitMQ message:
```json
{
  "sensor_id": "mvps4b",
  "value": 2813,
  "timestamp": "2026-05-14T03:45:40.776Z"
}
```
Example Stream: RabbitMQ to PostgreSQL
Create:
streams/01_sensor_readings.yaml
```yaml
input:
  amqp_0_9:
    urls:
      - amqp://user:password@rabbitmq-host:5672/
    queue: sensor_readings_queue
    queue_declare:
      enabled: true
      durable: true
    bindings_declare:
      - exchange: amq.topic
        key: sensors.readings.*
    prefetch_count: 1

pipeline:
  processors:
    # Some publishers double-encode JSON; unwrap it if the payload is a JSON string
    - mapping: |
        root = if this.type() == "string" {
          this.parse_json().catch(this)
        } else {
          this
        }

    # Rename fields to match the table; a missing or non-numeric value becomes null
    - mapping: |
        root.sensor_id = this.sensor_id
        root.reading_value = this.value.number().catch(null)
        root.reading_timestamp = this.timestamp

    # Drop incomplete records instead of sending them to PostgreSQL
    - mapping: |
        root = if this.sensor_id == null || this.reading_value == null || this.reading_timestamp == null {
          deleted()
        } else {
          this
        }

    - rate_limit:
        resource: postgres_insert_limit

output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 5s
      max_interval: 30s
    output:
      sql_insert:
        driver: postgres
        dsn: postgres://postgres:password@postgres-host:5432/my_database?sslmode=disable
        table: sensor_readings
        columns:
          - sensor_id
          - reading_value
          - reading_timestamp
        args_mapping: |
          root = [
            this.sensor_id,
            this.reading_value,
            this.reading_timestamp
          ]
```
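The three mapping processors are compact but easy to misread. Here is a rough Python equivalent, for illustration only: it mirrors their intent (unwrap double-encoded JSON, rename fields, coerce the value to a number, drop incomplete records), not Bloblang's exact semantics. map_reading and to_number are hypothetical helper names.

```python
import json
from typing import Any, Optional


def to_number(value: Any) -> Optional[float]:
    # Rough stand-in for .number() with a null fallback: numbers and numeric strings only
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            return None
    return None


def map_reading(payload: Any) -> Optional[dict]:
    # Mapping 1: a payload that is a string holding JSON gets parsed again
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except ValueError:
            pass  # keep the original string, like .catch(this)
    if not isinstance(payload, dict):
        # simplification: treated as dropped here (in Bloblang the field access would error)
        return None
    # Mapping 2: rename fields and coerce the value
    row = {
        "sensor_id": payload.get("sensor_id"),
        "reading_value": to_number(payload.get("value")),
        "reading_timestamp": payload.get("timestamp"),
    }
    # Mapping 3: drop the record (deleted()) if any field is missing
    if any(v is None for v in row.values()):
        return None
    return row


msg = {"sensor_id": "mvps4b", "value": 2813, "timestamp": "2026-05-14T03:45:40.776Z"}
print(map_reading(msg))
# {'sensor_id': 'mvps4b', 'reading_value': 2813.0, 'reading_timestamp': '2026-05-14T03:45:40.776Z'}
print(map_reading(json.dumps(msg)) == map_reading(msg))  # True: double-encoded input
print(map_reading({**msg, "value": None}))  # None: the record is dropped
```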
Starting the Pipeline
Run:
```shell
docker compose up -d
```
Watch logs:
```shell
docker compose logs -f --tail=50 redpanda-connect
```
If everything is working, you should see messages like:
```text
Input type amqp_0_9 is now active
Output type sql_insert is now active
Launching Redpanda Connect in streams mode
```
Important Lessons Learned
1. Do Not Put Resources Inside Stream Files
In streams mode, this should not go inside /streams/*.yaml:
```yaml
rate_limit_resources:
```
Put it in /resources/*.yaml, then load it with:
```yaml
command:
  - streams
  - -r
  - /resources/*.yaml
  - /streams/*.yaml
```
2. One RabbitMQ Queue Is Not Broadcast
If two workflows consume the same RabbitMQ queue, they compete for messages. One message goes to one consumer, not both.
If two pipelines need the same data, create two separate queues and bind both queues to the same exchange/routing key.
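A toy Python model of the difference, under simplified assumptions about RabbitMQ: a topic exchange copies each message to every queue whose binding key matches ('*' matches exactly one dot-separated word, '#' matches zero or more), while consumers sharing one queue receive messages in turn. Real dispatch also depends on prefetch and consumer speed. The second queue name, sensor_archive_queue, is hypothetical.

```python
from itertools import cycle


def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' = exactly one word, '#' = zero or more words."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        if p[0] == "#":
            # '#' may match zero words, or consume one word and try again
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


def route(bindings: dict[str, str], routing_key: str) -> list[str]:
    """Every queue whose binding matches gets its own copy of the message."""
    return [queue for queue, pattern in bindings.items() if topic_matches(pattern, routing_key)]


def dispatch(messages: list[str], consumers: list[str]) -> dict[str, list[str]]:
    """Consumers on ONE queue compete: each message goes to exactly one of them."""
    received: dict[str, list[str]] = {c: [] for c in consumers}
    for msg, consumer in zip(messages, cycle(consumers)):
        received[consumer].append(msg)
    return received


bindings = {
    "sensor_readings_queue": "sensors.readings.*",
    "sensor_archive_queue": "sensors.readings.*",  # hypothetical second queue
}
print(route(bindings, "sensors.readings.mvps4b"))
# ['sensor_readings_queue', 'sensor_archive_queue']
print(dispatch(["m1", "m2", "m3", "m4"], ["stream_a", "stream_b"]))
# {'stream_a': ['m1', 'm3'], 'stream_b': ['m2', 'm4']}
```

Two streams on one queue each see half the messages; two queues bound to the same key each see all of them.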
3. Match SQL Columns and Arguments Exactly
This must match:
```yaml
columns:
  - sensor_id
  - reading_value
  - reading_timestamp
```
with:
```yaml
args_mapping: |
  root = [
    this.sensor_id,
    this.reading_value,
    this.reading_timestamp
  ]
```
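The values in args_mapping are matched to columns by position, not by name: the first value goes into the first column, the second into the second, and so on. If the counts differ, the insert fails; if the order differs, values are written to the wrong columns or fail type conversion.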
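A hypothetical Python helper shows why the two lists must line up: the values are bound positionally to $1, $2, $3 placeholders in a parameterised PostgreSQL INSERT. This only mimics the general shape of such a statement; it is not how Redpanda Connect builds its query internally.

```python
def build_insert(table: str, columns: list[str], args: list[object]) -> tuple[str, list[object]]:
    """Build a parameterised PostgreSQL INSERT, refusing mismatched columns/args.

    Table and column names are interpolated directly, so they must come from
    trusted configuration, never from message data.
    """
    if len(columns) != len(args):
        raise ValueError(f"{len(columns)} columns but {len(args)} values")
    placeholders = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, args


columns = ["sensor_id", "reading_value", "reading_timestamp"]
sql, params = build_insert("sensor_readings", columns, ["mvps4b", 2813.0, "2026-05-14T03:45:40.776Z"])
print(sql)
# INSERT INTO sensor_readings (sensor_id, reading_value, reading_timestamp) VALUES ($1, $2, $3)

try:
    build_insert("sensor_readings", columns, ["mvps4b", 2813.0])
except ValueError as err:
    print(err)  # 3 columns but 2 values
```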
4. Rate Limit After Splitting Messages
If one RabbitMQ message contains many records, split it first, then rate limit.
```yaml
- unarchive:
    format: json_array
- rate_limit:
    resource: postgres_insert_limit
```
That way the limit applies to actual PostgreSQL rows, not just RabbitMQ messages.
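A rough Python model of why the order matters. Here unarchive with format: json_array is modelled as turning one message holding a JSON array into one message per element, and the sketch counts how many rate-limit tokens each ordering consumes. The helper names and the 120-record payload are made up for illustration.

```python
import json


def unarchive_json_array(raw: bytes) -> list[bytes]:
    # One message containing a JSON array becomes one message per element
    return [json.dumps(item).encode() for item in json.loads(raw)]


def tokens_used(messages: list[bytes], split_first: bool) -> int:
    """Count rate-limit tokens consumed: one token per message reaching rate_limit."""
    total = 0
    for raw in messages:
        if split_first:
            total += len(unarchive_json_array(raw))  # one token per future row
        else:
            total += 1  # the limiter sees the whole array as a single message
    return total


payload = json.dumps([{"sensor_id": f"s{i}", "value": i} for i in range(120)]).encode()
print(tokens_used([payload], split_first=False))  # 1: all 120 rows pass on one token
print(tokens_used([payload], split_first=True))   # 120: the 50/s budget now applies per row
```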
5. Watch for Dotted JSON Keys
If your JSON keys contain dots, such as:
```json
{
  "mvps4d.smb12": {
    "value": 123
  }
}
```
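avoid plain path-style access such as this.mvps4d.smb12. Each dot is treated as a path separator, so Bloblang looks for a smb12 field nested inside an mvps4d object. If the key is known in advance, quote it as a single path segment (this."mvps4d.smb12".value). If the keys vary, iterate over them with key_values() instead.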
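A Python sketch of the pitfall, assuming a path-style accessor that splits on dots the way Bloblang paths do: the lookup walks into a nested object that does not exist instead of reading the literal key. Iterating over key/value pairs, which is what key_values() provides in Bloblang (an array of objects with key and value fields), avoids parsing the key at all. get_path and key_values here are hypothetical Python stand-ins.

```python
from typing import Any


def get_path(obj: Any, path: str) -> Any:
    """Path-style lookup: 'a.b' means obj['a']['b'], so dots act as separators."""
    for part in path.split("."):
        if not isinstance(obj, dict) or part not in obj:
            return None
        obj = obj[part]
    return obj


def key_values(obj: dict) -> list[dict]:
    """Python stand-in for Bloblang's key_values(): one {key, value} object per entry."""
    return [{"key": k, "value": v} for k, v in obj.items()]


payload = {"mvps4d.smb12": {"value": 123}}
print(get_path(payload, "mvps4d.smb12.value"))  # None: looks for payload['mvps4d']['smb12']
print(payload["mvps4d.smb12"]["value"])          # 123: the literal key works
for entry in key_values(payload):
    print(entry["key"], entry["value"]["value"])  # mvps4d.smb12 123
```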
Final Thoughts
Redpanda Connect is a good middle ground between a visual automation tool and a full custom service. You still get readable workflows, but with better control over throughput, retries, parsing, and database writes. For high-volume data pipelines, especially RabbitMQ to PostgreSQL, it is much better suited than n8n.