Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [Circuit Breaker](community-plugins/circuit-breaker.md)
* [SendRemoteFile](community-plugins/sendremotefile.md)
* [RFC 7234 Cache](community-plugins/cache.md)
* [AMQP1](community-plugins/amqp1.md)

## 📡 App Server

Expand Down
283 changes: 283 additions & 0 deletions community-plugins/amqp1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
# Jobs — AMQP 1.0 Driver

AMQP 1.0 driver for RoadRunner provides unified support for both **Azure Service Bus** and **RabbitMQ** using the pure `github.com/Azure/go-amqp` library. It implements the standardized AMQP 1.0 protocol for better interoperability and broker-agnostic messaging.

{% hint style="warning" %}
This is a third-party plugin and isn't included by default. See the "Building RoadRunner with AMQP1" section for more information.
{% endhint %}

## Features

* **Pure AMQP 1.0 Implementation**: Uses `github.com/Azure/go-amqp` v1.4.0 for standardized protocol support
* **Dual Broker Support**: Works with both Azure Service Bus and RabbitMQ (with AMQP 1.0 plugin)
* **Automatic Broker Detection**: Identifies Azure Service Bus vs RabbitMQ automatically
* **Unified Configuration**: Same configuration format works with both brokers
* **TLS/SSL Support**: Automatic encryption for Azure Service Bus, configurable for RabbitMQ
* **Connection Resilience**: Built-in retry mechanisms and connection management
* **Distributed Tracing**: OpenTelemetry integration for observability
* **Container-based Architecture**: AMQP 1.0 sessions and links for efficient messaging
* **Event-driven Design**: Asynchronous message processing capabilities

## Building RoadRunner with AMQP1

To include the AMQP1 driver in your RoadRunner build, you need to configure it in your velox.toml build configuration.

{% code title="velox.toml" %}

```toml
[roadrunner]
ref = "master"

[debug]
enabled = false

[log]
level = "debug"
mode = "production"

# optional, needed only to download RR once (per version)
[github.token]
token = "${GITHUB_TOKEN}"

# Include the AMQP1 plugin
[plugins.amqp1]
tag = "master"
module_name = "github.com/ammadfa/amqp1"

# Include jobs plugin (required dependency)
[plugins.jobs]
tag = "latest"
module_name = "github.com/roadrunner-server/jobs/v5"

# Other common plugins
[plugins.server]
tag = "latest"
module_name = "github.com/roadrunner-server/server/v5"

[plugins.http]
tag = "latest"
module_name = "github.com/roadrunner-server/http/v5"
```

{% endcode %}

More info about customizing RR with your own plugins: [link](../customization/plugin.md)

## Configuration

### Azure Service Bus Configuration

For Azure Service Bus, use the `amqps://` protocol with your connection string:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqps://RootManageSharedAccessKey:YOUR_ACCESS_KEY@YOUR_NAMESPACE.servicebus.windows.net:5671/"
container_id: "roadrunner-jobs-azure"

jobs:
consume: ["azure-queue"]

pipelines:
azure-queue:
driver: amqp1
config:
queue: "test-queue" # Must exist in Azure Service Bus
prefetch: 10
priority: 1
durable: false
exclusive: false
```

{% endcode %}

**Azure Service Bus Requirements:**
- Queue must be pre-created in Azure portal or via Azure CLI
- Uses Shared Access Key authentication
- TLS is automatically enabled with `amqps://` protocol
- Routing occurs directly to queue (no exchanges)

### RabbitMQ Configuration

For RabbitMQ, ensure the AMQP 1.0 plugin is enabled:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqp://username:password@rabbitmq:5672/"
container_id: "roadrunner-jobs-rabbitmq"

jobs:
consume: ["rabbit-queue"]

pipelines:
rabbit-queue:
driver: amqp1
config:
queue: "test-queue"
routing_key: "test-queue"
exchange: "test-queue" # Use default exchange
exchange_type: "direct" # informational; configure server-side
prefetch: 10
priority: 10
durable: true
exclusive: false
```

{% endcode %}

**RabbitMQ Requirements:**
- Enable AMQP 1.0 plugin: `rabbitmq-plugins enable rabbitmq_amqp1_0`
- Queues and exchanges must be created ahead of time
- Supports exchange-based routing; ensure bindings are configured server-side
- For delayed messages, enable: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`

### TLS Configuration

For secure connections, configure TLS settings:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqps://guest:[email protected]:5671/"
container_id: "roadrunner-secure"
tls:
cert: "/path/to/cert.pem"
key: "/path/to/key.pem"
root_ca: "/path/to/ca.pem"
insecure_skip_verify: false
```

{% endcode %}

### Advanced Pipeline Configuration

{% code title=".rr.yaml" %}

```yaml
jobs:
pipelines:
# Azure Service Bus optimized pipeline
priority-orders:
driver: amqp1
config:
queue: "priority-orders"
prefetch: 50
priority: 5

# RabbitMQ with topic exchange
events-pipeline:
driver: amqp1
config:
queue: "events-queue"
exchange: "events-exchange"
exchange_type: "topic"
routing_key: "events.#"
prefetch: 25
priority: 3
durable: true
exclusive: false
```
Comment on lines +161 to +183
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

YAML validity: fields look plausible; please lint to be safe.

Run markdownlint and a YAML linter on the added snippets per repo guidelines.


🏁 Script executed:

#!/bin/bash
# Lint Markdown (markdownlint config assumed in repo)
npx markdownlint-cli2 '**/*.md'

# Quick YAML syntax checks (prints offending snippet/line on error)
python - <<'PY'
from ruamel.yaml import YAML
import sys, re
yaml = YAML(typ='safe')
doc, buf = None, []
capture=False
import pathlib
for p in pathlib.Path('community-plugins').glob('amqp1.md'):
    for ln in p.read_text().splitlines():
        if ln.strip().startswith('```yaml'):
            capture=True; buf=[]; continue
        if capture and ln.strip().startswith('```'):
            try:
                yaml.load('\n'.join(buf))
            except Exception as e:
                print(f'YAML block failed in {p}: {e}')
            capture=False; continue
        if capture: buf.append(re.sub(r'^\s{0,3}', '', ln))
PY

Length of output: 56502


Fix markdownlint violations in community-plugins/amqp1.md; YAML fences validate.

markdownlint reported MD004, MD032, MD022, MD059 and MD047 in community-plugins/amqp1.md (unordered list style, missing blank lines around lists/headings, non‑descriptive link text, missing single trailing newline). The YAML fenced blocks parsed successfully with ruamel.yaml. Fix the markdownlint issues and re-run markdownlint-cli2 (and the YAML check).

🤖 Prompt for AI Agents
In community-plugins/amqp1.md around lines 161 to 183, fix the markdownlint
violations: normalize unordered list markers (use a single style, e.g.,
hyphens), add required blank lines above and below lists and headings, replace
non‑descriptive link text with meaningful text, and ensure the file ends with a
single trailing newline; keep the existing YAML fenced blocks intact (they
validate with ruamel.yaml). After making these edits, run markdownlint-cli2 and
the YAML checker to confirm MD004, MD032, MD022, MD059 and MD047 are resolved.


{% endcode %}

## Migration Benefits

The AMQP1 driver provides significant advantages over RabbitMQ-specific implementations:

### From RabbitMQ-specific to Universal
- **Previous**: `github.com/rabbitmq/rabbitmq-amqp-go-client` (RabbitMQ-only)
- **Current**: `github.com/Azure/go-amqp` (Works with any AMQP 1.0 broker)
- **Benefit**: Single codebase supports multiple message brokers

### Protocol Standardization
- **AMQP 1.0 Compliance**: Standardized protocol ensures better interoperability
- **Container Model**: Modern connection architecture with sessions and links
- **Message Format**: Structured AMQP 1.0 messages with application properties

### Cloud-Native Ready
- **Azure Service Bus**: Native support for Microsoft's cloud messaging service
- **Hybrid Deployments**: Use RabbitMQ on-premises and Azure Service Bus in cloud
- **Consistent API**: Same job queue interface regardless of broker

## Implementation Details

### Driver Components

1. **Plugin** (`plugin.go`): Main plugin interface and registration
2. **Driver** (`amqp1jobs/driver.go`): Core driver with pure AMQP 1.0 support
3. **Config** (`amqp1jobs/config.go`): Configuration structure and validation
4. **Item** (`amqp1jobs/item.go`): Message/job item handling and serialization

### Connection Management

The driver uses the AMQP 1.0 container model:

```go
// Create AMQP 1.0 connection
conn, err := amqp.Dial(ctx, addr, &amqp.ConnOptions{
ContainerID: conf.ContainerID,
TLSConfig: tlsConfig,
})

// Session for logical grouping
session, err := conn.NewSession(ctx, nil)

// Receivers and senders for message flow
receiver, err := session.NewReceiver(ctx, queueName, options)
sender, err := session.NewSender(ctx, target, options)
```

### Broker Detection

The driver automatically detects the broker type and adapts message routing:

- **Azure Service Bus**: Direct queue addressing
- **RabbitMQ**: Exchange-based routing with AMQP v2 addressing

### Message Processing

Unified message processing pattern for both brokers:

```go
// Receive messages
msg, err := receiver.Receive(ctx, nil)

// Process job
jobItem := convertFromAMQP1Message(msg)

// Acknowledge based on result
if success {
receiver.AcceptMessage(ctx, msg)
} else {
receiver.RejectMessage(ctx, msg, nil)
}
```

## Troubleshooting

### Common Issues

1. **RabbitMQ AMQP 1.0 Plugin**: Ensure `rabbitmq_amqp1_0` plugin is enabled
2. **Azure Service Bus**: Verify access keys and queue existence
3. **TLS Configuration**: Check certificate paths and validity
4. **Queue Declaration**: Queues must be pre-created for AMQP 1.0

### Debugging

Enable debug logging to troubleshoot connection and message issues:

```yaml
log:
level: "debug"
mode: "development"
```

### Performance Tuning

- **Prefetch**: Adjust based on message processing speed
- **Priority**: Set appropriate job priority levels
- **Connection Pooling**: Use appropriate container IDs for load distribution
1 change: 1 addition & 0 deletions community-plugins/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ Here is a list of community plugins that are available:
| SendRemoteFile | [link](https://github.com/roadrunner-server/sendremotefile)| Used to send a file from a remote endpoint into the response | [docs](./sendremotefile.md) |
| CircuitBreaker | [link](https://github.com/roadrunner-server/circuit-breaker)| Circuit breaker pattern implementation for RoadRunner | [docs](./circuit-breaker.md) |
| RFC 7234 Cache | [link](https://github.com/darkweak/souin/tree/master/plugins/roadrunner)| RFC 7234 Cache implementation for RoadRunner | [docs](https://github.com/darkweak/souin?tab=readme-ov-file#roadrunner-middleware) |
| AMQP1 | [link](https://github.com/ammadfa/amqp1)| AMQP 1.0 driver for Azure Service Bus and RabbitMQ with pure go-amqp library | [docs](./amqp1.md) |

Feel free to make a PR to add your plugin to the list.