Per-org AMQP Queues
Queue layout, routing key conventions, RabbitMQ auth delegation, lifecycle, durability, and connection endpoint.
Overview
Voke models each integrating organisation as an isolated AMQP tenant. A single shared RabbitMQ topic exchange (vcp) carries traffic in the legacy/default vhost; new partner keys may also receive a dedicated vhost named partner-{apiKeyId}. Isolation is enforced through queue prefixes plus the RabbitMQ HTTP auth backend, which validates every connect, queue access, and publish operation against the authenticated org and, for per-key vhosts, the concrete API key.
Operator prerequisite — RabbitMQ federation. Per-key vhost partners (partner-{apiKeyId})
consume from queues in their own vhost, but Voke's hot publish path writes to the vcp exchange
in the central / vhost. Without RabbitMQ exchange federation enabled, a partner authenticates
cleanly but never receives a single message. Voke's production deployment runs the
rabbitmq_federation plugin with an upstream + policy that mirrors vcp.* events from / into
every per-key vhost. If you are running your own broker, see
docs/deployment/per-key-vhost-federation.md in the repo before onboarding partners — this is
not optional.
Queue layout
All queues are prefixed with vcp.{orgSlug}. Queues are durable: true and survive broker restarts.
| Queue | Direction | Routing key binding | Description |
|---|---|---|---|
| vcp.{slug}.command | Inbound (partner → Voke) | {slug}.command.# | Dispatched commands: setpoints, mode changes, emergency stops |
| vcp.{slug}.config | Inbound (partner → Voke) | {slug}.config.# | Site configuration requests (constraints, topology, fallback) |
| vcp.{slug}.schedule | Inbound (partner → Voke) | {slug}.schedule.* | Scheduled dispatch plans (create, cancel) |
| vcp.{slug}.event.telemetry | Outbound (Voke → partner) | {slug}.event.telemetry.realtime, {slug}.event.telemetry.meter | Real-time telemetry and 1-minute meter readings |
| vcp.{slug}.event.status | Outbound (Voke → partner) | {slug}.event.command.*, {slug}.event.mode.*, {slug}.event.schedule.* | Command ACK/NACK, mode change events, schedule status |
| vcp.{slug}.event.alarm | Outbound (Voke → partner) | {slug}.event.alarm.# | PLC-originated alarms forwarded upstream |
| vcp.{slug}.event.execution | Outbound (Voke → partner) | {slug}.event.execution.# | Execution status updates for dispatched commands |
Inbound = you publish to the exchange; Voke's consumer picks it up. Outbound = Voke publishes; you consume.
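As a quick reference, the table above can be expressed as a small helper that derives every queue name and binding from an org slug. This is only a sketch: the names queueLayout and QueueSpec are illustrative and not part of Voke's codebase.

```typescript
// Direction is from the partner's point of view, as in the table above.
type Direction = "inbound" | "outbound";

interface QueueSpec {
  queue: string;
  direction: Direction;
  bindings: string[];
}

// Hypothetical helper: builds the seven queue names and their routing-key
// bindings for one org slug.
function queueLayout(slug: string): QueueSpec[] {
  const q = (suffix: string): string => `vcp.${slug}.${suffix}`;
  const k = (pattern: string): string => `${slug}.${pattern}`;
  return [
    { queue: q("command"), direction: "inbound", bindings: [k("command.#")] },
    { queue: q("config"), direction: "inbound", bindings: [k("config.#")] },
    { queue: q("schedule"), direction: "inbound", bindings: [k("schedule.*")] },
    {
      queue: q("event.telemetry"),
      direction: "outbound",
      bindings: [k("event.telemetry.realtime"), k("event.telemetry.meter")],
    },
    {
      queue: q("event.status"),
      direction: "outbound",
      bindings: [k("event.command.*"), k("event.mode.*"), k("event.schedule.*")],
    },
    { queue: q("event.alarm"), direction: "outbound", bindings: [k("event.alarm.#")] },
    { queue: q("event.execution"), direction: "outbound", bindings: [k("event.execution.#")] },
  ];
}
```

Calling queueLayout("acme") yields names such as vcp.acme.command and vcp.acme.event.status, which a partner client could use to decide which queues to consume from.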
Routing key conventions
Inbound
Commands, config, and schedules use a {slug}.{family}.{subtype} pattern:
{slug}.command.site-setpoint — site-level power / energy setpoint
{slug}.command.device — per-device command (charge/discharge/stop)
{slug}.command.emergency — emergency stop or hold
{slug}.config.site-constraints — operating constraints (max power, SOC limits, etc.)
{slug}.config.site-topology — asset declaration and topology update
{slug}.config.fallback-config — fallback behaviour on loss of connectivity
{slug}.schedule.create — create or replace a dispatch schedule
{slug}.schedule.cancel — cancel an active schedule
The command.# binding on vcp.{slug}.command and config.# binding on vcp.{slug}.config use multi-word wildcards, so additional command or config types can be added without modifying the binding. The schedule.* binding uses a single-word wildcard because schedule subtypes are always one token.
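To make the difference between the two wildcards concrete, here is a minimal sketch of AMQP topic matching, where * stands for exactly one dot-separated word and # for zero or more. The function topicMatches is written for illustration only; the broker performs this matching itself, and the multi-word keys used below are hypothetical.

```typescript
// Illustrative re-implementation of topic-exchange matching.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" may absorb any number of words, including none.
      for (let next = j; next <= k.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}
```

With this definition, acme.command.# accepts both acme.command.device and a longer key such as acme.command.device.stop, while acme.schedule.* accepts acme.schedule.create but rejects anything with an extra token after it.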
Outbound
Voke publishes outbound events using the following routing keys (verified from VcpAmqpService):
| Routing key | Queue received by | Published by |
|---|---|---|
| {slug}.event.telemetry.realtime | vcp.{slug}.event.telemetry | publishTelemetry |
| {slug}.event.telemetry.meter | vcp.{slug}.event.telemetry | publishMeterReading |
| {slug}.event.alarm | vcp.{slug}.event.alarm | publishAlarm |
| {slug}.event.command.ack | vcp.{slug}.event.status | publishCommandAck |
| {slug}.event.execution | vcp.{slug}.event.execution | publishCommandStatus |
The alarm and execution bindings use the multi-word wildcard (#) so the current bare routes and any future subtype routes both land in the same queue. Status uses single-word subtype bindings (event.command.*, event.mode.*, event.schedule.*).
AMQP auth flow
When a partner's AMQP client connects to RabbitMQ, the broker has no local user database. Instead, it delegates every auth decision to Voke via the rabbitmq_auth_backend_http plugin. Voke exposes four endpoints under /api/v1/internal/amqp-auth/. These endpoints are @Public() (no JWT required) and intentionally carry no application-layer header secret — the rabbitmq_auth_backend_http plugin (v3.13.7) does not support sending custom headers, so an earlier X-Internal-Secret design was dropped. Protection comes from two layers below the app: only the broker container can reach the API on the internal Docker network, and the nginx reverse proxy returns 404 for any /api/v1/internal/* request from the public internet.
Endpoint details
POST /api/v1/internal/amqp-auth/user
Called once on connect. Voke:
- Hashes the password using HMAC-SHA256 (with pepper) and looks up the API key.
- Checks that the key is active and not expired.
- Validates that the API key's organisation slug matches the supplied username.
- Checks that the key has vcp:connect (or legacy trading:connect) scope.
Returns "allow" on success, "deny" on any failure.
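The four checks above might look roughly like the following. This is a sketch under stated assumptions, not Voke's implementation: the ApiKeyRecord shape, the in-memory Map lookup, the hex digest, and the choice of the pepper as the HMAC key are all illustrative.

```typescript
import { createHmac } from "node:crypto";

// Hypothetical shape of a stored API key; field names are assumptions.
interface ApiKeyRecord {
  orgSlug: string;
  active: boolean;
  expiresAt: Date | null;
  scopes: string[];
}

// Assumption: the pepper is the HMAC key and the secret is the message.
function hashSecret(secret: string, pepper: string): string {
  return createHmac("sha256", pepper).update(secret).digest("hex");
}

function authorizeUser(
  username: string,
  password: string,
  pepper: string,
  keysByHash: Map<string, ApiKeyRecord>,
  now: Date = new Date(),
): "allow" | "deny" {
  // 1. Hash the password and look up the API key.
  const key = keysByHash.get(hashSecret(password, pepper));
  if (!key) return "deny";
  // 2. The key must be active and not expired.
  if (!key.active) return "deny";
  if (key.expiresAt !== null && key.expiresAt.getTime() <= now.getTime()) return "deny";
  // 3. The key's organisation slug must equal the AMQP username.
  if (key.orgSlug !== username) return "deny";
  // 4. The key must carry the connect scope (current or legacy name).
  const canConnect =
    key.scopes.includes("vcp:connect") || key.scopes.includes("trading:connect");
  return canConnect ? "allow" : "deny";
}
```

Every failure path returns the same "deny" value, so a caller cannot tell from the response which check failed.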
POST /api/v1/internal/amqp-auth/vhost
Called after user auth to check vhost access.
Allowed vhosts:
- / — legacy/default vhost.
- partner-{apiKeyId} — dedicated per-key vhost when the key was provisioned with a vhost value.
Any other vhost is denied. For per-key vhosts, the API key must be active, its stored apiKey.vhost must match the requested vhost, and the key's org slug must match the AMQP username.
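A minimal sketch of the vhost decision follows. It assumes the key is found by stripping the partner- prefix from the requested vhost and looking the remainder up by id; how Voke actually resolves the key is not described here, and the VhostKey shape is hypothetical.

```typescript
// Hypothetical subset of an API key record relevant to the vhost check.
interface VhostKey {
  id: string;
  orgSlug: string;
  active: boolean;
  vhost: string | null;
}

function authorizeVhost(
  username: string,
  vhost: string,
  keysById: Map<string, VhostKey>,
): "allow" | "deny" {
  // The legacy/default vhost is always an allowed target.
  if (vhost === "/") return "allow";

  // Anything that is not a per-key vhost is denied.
  const prefix = "partner-";
  if (!vhost.startsWith(prefix)) return "deny";

  // Assumption: the suffix after "partner-" is the API key id.
  const key = keysById.get(vhost.slice(prefix.length));
  if (!key || !key.active) return "deny";
  // The stored vhost must match the requested one exactly.
  if (key.vhost !== vhost) return "deny";
  // The key must belong to the org named by the AMQP username.
  return key.orgSlug === username ? "allow" : "deny";
}
```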
POST /api/v1/internal/amqp-auth/resource
Called per operation (queue declare, bind, publish, consume). Voke enforces:
- Exchange access: vcp exchange → allowed. Legacy trading exchange → allowed (backward compat). Default exchange ("") → allowed (needed for RPC-style direct replies).
- Queue access: only queues whose name starts with vcp.{orgSlug}. are allowed. Any attempt to touch another org's queue returns "deny".
- All other resource types → "deny".
POST /api/v1/internal/amqp-auth/topic
Called per publish/consume on topic exchanges. Voke enforces that the routing key starts with the org's slug prefix ({slug}.), mirroring the vcp.{slug}. prefix used for queue names. A partner for org acme cannot publish to beta.command.site-setpoint or consume from beta.event.telemetry.realtime.
Write permission is also scope-gated:
| Routing key family | Required publish scope |
|---|---|
| {slug}.command.site-setpoint | vcp:write:setpoint |
| {slug}.command.device / {slug}.command.device.* | vcp:write:device-command |
| {slug}.command.mode | vcp:write:mode |
| {slug}.schedule.* | vcp:write:schedule |
| {slug}.config.* | vcp:write:config |
Unknown publish routing-key families fail closed. Read/consume operations remain prefix-gated.
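The resource and topic rules can be sketched together as two small decision functions. The code below is an illustration under assumptions: function names are made up, exchanges other than the three listed are assumed to be denied, and * in the scope table is read as exactly one word.

```typescript
type Decision = "allow" | "deny";

// Resource check: exchanges by name, queues by org prefix, the rest denied.
function authorizeResource(slug: string, resource: string, name: string): Decision {
  if (resource === "exchange") {
    // "" is the default exchange, needed for direct replies.
    return name === "vcp" || name === "trading" || name === "" ? "allow" : "deny";
  }
  if (resource === "queue") {
    return name.startsWith(`vcp.${slug}.`) ? "allow" : "deny";
  }
  return "deny";
}

// Maps a publish routing key to the scope it needs, or null if no row matches.
function requiredPublishScope(slug: string, routingKey: string): string | null {
  const prefix = `${slug}.`;
  if (!routingKey.startsWith(prefix)) return null;
  const words = routingKey.slice(prefix.length).split(".");
  if (words.some((w) => w.length === 0)) return null;
  const [family, subtype] = words;
  if (family === "command") {
    if (subtype === "site-setpoint" && words.length === 2) return "vcp:write:setpoint";
    // Covers both {slug}.command.device and {slug}.command.device.*
    if (subtype === "device" && words.length <= 3) return "vcp:write:device-command";
    if (subtype === "mode" && words.length === 2) return "vcp:write:mode";
    return null;
  }
  if (family === "schedule" && words.length === 2) return "vcp:write:schedule";
  if (family === "config" && words.length === 2) return "vcp:write:config";
  return null;
}

// Topic check: prefix-gated for reads, prefix- and scope-gated for writes.
function authorizeTopic(
  slug: string,
  routingKey: string,
  permission: "read" | "write",
  scopes: readonly string[],
): Decision {
  if (!routingKey.startsWith(`${slug}.`)) return "deny";
  if (permission === "read") return "allow";
  const scope = requiredPublishScope(slug, routingKey);
  // Unknown families have no scope and therefore fail closed.
  return scope !== null && scopes.includes(scope) ? "allow" : "deny";
}
```

Note that the trailing dot in the prefix matters: an org with slug acme must not gain access to queues or keys belonging to a hypothetical org named acmecorp.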
Queue lifecycle
Queues are managed dynamically — Voke never requires a manual broker setup.
On enable: When an org's trading status transitions to ENABLED, TradingPartnerConfigService.enableTrading() calls VcpAmqpService.addOrgConsumer(orgId, slug). OrgVcpConsumer.setup() then asserts all seven queues (durable: true) and binds them to the vcp exchange. If AMQP is not yet connected, the consumer is queued and set up as soon as the connection is established.
On disable: disableTrading() calls VcpAmqpService.removeOrgConsumer(orgId), which calls OrgVcpConsumer.teardown(deleteQueues = false). By default, queues are not deleted — they are unbound from consumers and their contents preserved. Pass deleteQueues = true explicitly to delete them (e.g. during integration teardown).
On API boot: VcpAmqpService.onModuleInit() connects to AMQP and calls setupAllOrgConsumers(), which queries all TradingPartnerConfig rows with status = ENABLED and re-asserts queues for each. PerVhostConsumerService also opens one consumer connection for each active API key with a non-null vhost. Queues survive API restarts without any operator action.
On AMQP disconnect: The service clears all in-process OrgVcpConsumer instances and schedules a reconnect with exponential backoff (initial 1 s, max 30 s). On reconnect all org consumers are re-established automatically.
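For illustration, a reconnect delay that starts at 1 s and is capped at 30 s could be computed as below. The doubling factor and the absence of jitter are assumptions; only the initial and maximum delays come from the description above.

```typescript
// Sketch of a capped exponential backoff. `attempt` is zero-based.
// Assumption: the delay doubles on each attempt.
function reconnectDelayMs(
  attempt: number,
  initialMs: number = 1_000,
  maxMs: number = 30_000,
): number {
  const n = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, initialMs * 2 ** n);
}
```

Under these assumptions the first attempts wait 1 s, 2 s, 4 s, 8 s, and 16 s, after which every further attempt waits 30 s.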
Durability and ack guarantees
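All queues are declared durable: true, and messages published by Voke use persistent: true (delivery mode 2). Because both are set, messages Voke publishes survive a RabbitMQ broker restart; messages you publish survive only if you also mark them persistent, as described below.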
For inbound queues (you publish):
- Publish with persistent: true (delivery mode 2) to ensure messages survive a broker restart before Voke picks them up.
- Wrap each publish in a channel confirm (channel.waitForConfirms()) if you need delivery guarantees.
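The two recommendations above can be combined in one helper. The sketch below is written against a small structural interface rather than a specific client library; it is shaped so that a confirm channel from a Node AMQP client such as amqplib should satisfy it. The helper name, the JSON encoding of the payload, and the contentType option are assumptions.

```typescript
// Minimal structural view of a confirm channel (assumed, not a library type).
interface ConfirmChannelLike {
  publish(
    exchange: string,
    routingKey: string,
    content: Buffer,
    options?: { persistent?: boolean; contentType?: string },
  ): boolean;
  waitForConfirms(): Promise<void>;
}

// Hypothetical helper: publishes one command persistently and waits for the
// broker to confirm it before resolving.
async function publishCommand(
  channel: ConfirmChannelLike,
  slug: string,
  subtype: string,
  payload: unknown,
): Promise<void> {
  // Assumption: payloads are JSON-encoded.
  const body = Buffer.from(JSON.stringify(payload));
  channel.publish("vcp", `${slug}.command.${subtype}`, body, {
    persistent: true, // delivery mode 2
    contentType: "application/json",
  });
  // Rejects if the broker nacks any outstanding publish on this channel.
  await channel.waitForConfirms();
}
```

Waiting for a confirm after every message is the simplest approach but limits throughput; batching several publishes before one waitForConfirms() call is a common trade-off.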
For outbound queues (you consume):
- Acknowledge (ack) after successful processing.
- On malformed payloads (schema validation failure), nack with requeue = false so the message routes to a dead-letter queue rather than looping.
- Voke's own inbound consumers follow this pattern: parse and validate with Zod; ack on success; nack(requeue=false) on parse failure.
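A consumer following the pattern above might be structured like this. It is a sketch only: the interface and function names are hypothetical, payloads are assumed to be JSON, and the parse callback stands in for a schema validator (with Zod it could be a schema's parse method, which throws on invalid input).

```typescript
// Minimal structural view of a channel's ack/nack methods (assumed).
interface AckChannelLike<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Parses, validates, and handles one delivery.
// Returns "acked" or "rejected" so callers can log or count outcomes.
function handleDelivery<M extends { content: Buffer }, T>(
  channel: AckChannelLike<M>,
  message: M,
  parse: (raw: unknown) => T,
  handle: (value: T) => void,
): "acked" | "rejected" {
  let value: T;
  try {
    value = parse(JSON.parse(message.content.toString("utf8")));
  } catch {
    // Malformed payload: reject without requeue so it does not loop.
    channel.nack(message, false, false);
    return "rejected";
  }
  // Errors thrown by `handle` propagate and leave the message unacknowledged;
  // how to treat processing failures is left to the caller.
  handle(value);
  channel.ack(message);
  return "acked";
}
```

Acknowledging only after handle returns means a crash mid-processing leaves the message unacknowledged, so the broker can redeliver it.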
Connection endpoint
Partner clients connect to the public AMQPS listener — amqps:// over TLS, never plaintext.
| Environment | Host | Port | Protocol |
|---|---|---|---|
| Production | mqtt.voke.lovinka.com | 5671 | amqps (TLS) |
| Local dev | localhost | 5671 | amqps (TLS) |
The host and port that Voke embeds in the partner connection bundle are configurable via PUBLIC_AMQPS_HOST and PUBLIC_AMQPS_PORT on the Voke API. The defaults above apply when these env vars are not set. (The separately-named AMQP_PUBLIC_HOST / AMQP_PUBLIC_PORT env vars control the API's own internal connection to the broker on the Docker network and are not the partner-facing surface.)
Connection string pattern:
// username = org slug, password = plaintext API key secret (vcp:connect scope)
// vhost = apiKey.vhost (per-key) or '/' (legacy)
const vhostPath = apiKey.vhost ? `/${encodeURIComponent(apiKey.vhost)}` : '/';
const url = `amqps://${encodeURIComponent(orgSlug)}:${encodeURIComponent(apiKeySecret)}@${host}:${port}${vhostPath}`;
The Connect Partner wizard's reveal screen hands you the fully-assembled amqps:// URI from ConnectionBundleBuilder; paste that directly rather than reconstructing it. See API keys & auth for how to create a key and which scope to assign.