This is a development version of the documentation. Content may change without notice.
Voke Documentation
ESM Partners

Per-org AMQP Queues

Queue layout, routing key conventions, RabbitMQ auth delegation, lifecycle, durability, and connection endpoint.

Overview

Voke models each integrating organisation as an isolated AMQP tenant. A single shared RabbitMQ topic exchange (vcp) carries traffic in the legacy/default vhost; new partner keys may also receive a dedicated vhost named partner-{apiKeyId}. Isolation is enforced through queue prefixes plus the RabbitMQ HTTP auth backend, which validates every connect, queue access, and publish operation against the authenticated org and, for per-key vhosts, the concrete API key.

Operator prerequisite — RabbitMQ federation. Per-key vhost partners (partner-{apiKeyId}) consume from queues in their own vhost, but Voke's hot publish path writes to the vcp exchange in the central / vhost. Without RabbitMQ exchange federation enabled, a partner authenticates cleanly but never receives a single message. Voke's production deployment runs the rabbitmq_federation plugin with an upstream + policy that mirrors vcp.* events from / into every per-key vhost. If you are running your own broker, see docs/deployment/per-key-vhost-federation.md in the repo before onboarding partners — this is not optional.

Queue layout

All queues are prefixed with vcp.{orgSlug}. Queues are durable: true and survive broker restarts.

QueueDirectionRouting key bindingDescription
vcp.{slug}.commandInbound (partner → Voke){slug}.command.#Dispatched commands: setpoints, mode changes, emergency stops
vcp.{slug}.configInbound (partner → Voke){slug}.config.#Site configuration requests (constraints, topology, fallback)
vcp.{slug}.scheduleInbound (partner → Voke){slug}.schedule.*Scheduled dispatch plans (create, cancel)
vcp.{slug}.event.telemetryOutbound (Voke → partner){slug}.event.telemetry.realtime, {slug}.event.telemetry.meterReal-time telemetry and 1-minute meter readings
vcp.{slug}.event.statusOutbound (Voke → partner){slug}.event.command.*, {slug}.event.mode.*, {slug}.event.schedule.*Command ACK/NACK, mode change events, schedule status
vcp.{slug}.event.alarmOutbound (Voke → partner){slug}.event.alarm.#PLC-originated alarms forwarded upstream
vcp.{slug}.event.executionOutbound (Voke → partner){slug}.event.execution.#Execution status updates for dispatched commands

Inbound = you publish to the exchange; Voke's consumer picks it up. Outbound = Voke publishes; you consume.

Routing key conventions

Inbound

Commands, config, and schedules use a {slug}.{family}.{subtype} pattern:

{slug}.command.site-setpoint    — site-level power / energy setpoint
{slug}.command.device           — per-device command (charge/discharge/stop)
{slug}.command.emergency        — emergency stop or hold

{slug}.config.site-constraints  — operating constraints (max power, SOC limits, etc.)
{slug}.config.site-topology     — asset declaration and topology update
{slug}.config.fallback-config   — fallback behaviour on loss of connectivity

{slug}.schedule.create          — create or replace a dispatch schedule
{slug}.schedule.cancel          — cancel an active schedule

The command.# binding on vcp.{slug}.command and config.# binding on vcp.{slug}.config use multi-word wildcards, so additional command or config types can be added without modifying the binding. The schedule.* binding uses a single-word wildcard — schedule subtypes are always one token.

Outbound

Voke publishes outbound events using the following routing keys (verified from VcpAmqpService):

Routing keyQueue received byPublished by
{slug}.event.telemetry.realtimevcp.{slug}.event.telemetrypublishTelemetry
{slug}.event.telemetry.metervcp.{slug}.event.telemetrypublishMeterReading
{slug}.event.alarmvcp.{slug}.event.alarmpublishAlarm
{slug}.event.command.ackvcp.{slug}.event.statuspublishCommandAck
{slug}.event.executionvcp.{slug}.event.executionpublishCommandStatus

The alarm and execution bindings use the multi-word wildcard (#) so the current bare routes and any future subtype routes both land in the same queue. Status uses single-word subtype bindings (event.command.*, event.mode.*, event.schedule.*).

AMQP auth flow

When a partner's AMQP client connects to RabbitMQ, the broker has no local user database. Instead, it delegates every auth decision to Voke via the rabbitmq_auth_backend_http plugin. Voke exposes four endpoints under /api/v1/internal/amqp-auth/. These endpoints are @Public() (no JWT required) and intentionally carry no application-layer header secret — the rabbitmq_auth_backend_http plugin (v3.13.7) does not support sending custom headers, so an earlier X-Internal-Secret design was dropped. Protection comes from two layers below the app: only the broker container can reach the API on the internal Docker network, and the nginx reverse proxy returns 404 for any /api/v1/internal/* request from the public internet.

Loading diagram...

Endpoint details

POST /api/v1/internal/amqp-auth/user

Called once on connect. Voke:

  1. Hashes the password using HMAC-SHA256 (with pepper) and looks up the API key.
  2. Checks that the key is active and not expired.
  3. Validates that the API key's organisation slug matches the supplied username.
  4. Checks that the key has vcp:connect (or legacy trading:connect) scope.

Returns "allow" on success, "deny" on any failure.

POST /api/v1/internal/amqp-auth/vhost

Called after user auth to check vhost access.

Allowed vhosts:

  • / — legacy/default vhost.
  • partner-{apiKeyId} — dedicated per-key vhost when the key was provisioned with a vhost value.

Any other vhost is denied. For per-key vhosts, the API key must be active, its stored apiKey.vhost must match the requested vhost, and the key's org slug must match the AMQP username.

POST /api/v1/internal/amqp-auth/resource

Called per operation (queue declare, bind, publish, consume). Voke enforces:

  • Exchange access: vcp exchange → allowed. Legacy trading exchange → allowed (backward compat). Default exchange ("") → allowed (needed for RPC-style direct replies).
  • Queue access: only queues whose name starts with vcp.{orgSlug}. are allowed. Any attempt to touch another org's queue returns "deny".
  • All other resource types → "deny".

POST /api/v1/internal/amqp-auth/topic

Called per publish/consume on topic exchanges. Voke enforces that the routing key starts with the org's queue prefix. A partner for org acme cannot publish to beta.command.site-setpoint or consume from beta.event.telemetry.realtime.

Write permission is also scope-gated:

Routing key familyRequired publish scope
{slug}.command.site-setpointvcp:write:setpoint
{slug}.command.device / {slug}.command.device.*vcp:write:device-command
{slug}.command.modevcp:write:mode
{slug}.schedule.*vcp:write:schedule
{slug}.config.*vcp:write:config

Unknown publish routing-key families fail closed. Read/consume operations remain prefix-gated.

Queue lifecycle

Queues are managed dynamically — Voke never requires a manual broker setup.

On enable: When an org's trading status transitions to ENABLED, TradingPartnerConfigService.enableTrading() calls VcpAmqpService.addOrgConsumer(orgId, slug). OrgVcpConsumer.setup() then asserts all seven queues (durable: true) and binds them to the vcp exchange. If AMQP is not yet connected, the consumer is queued and set up as soon as the connection is established.

On disable: disableTrading() calls VcpAmqpService.removeOrgConsumer(orgId), which calls OrgVcpConsumer.teardown(deleteQueues = false). By default, queues are not deleted — they are unbound from consumers and their contents preserved. Pass deleteQueues = true explicitly to delete them (e.g. during integration teardown).

On API boot: VcpAmqpService.onModuleInit() connects to AMQP and calls setupAllOrgConsumers(), which queries all TradingPartnerConfig rows with status = ENABLED and re-asserts queues for each. PerVhostConsumerService also opens one consumer connection for each active API key with a non-null vhost. Queues survive API restarts without any operator action.

On AMQP disconnect: The service clears all in-process OrgVcpConsumer instances and schedules a reconnect with exponential backoff (initial 1 s, max 30 s). On reconnect all org consumers are re-established automatically.

Durability and ack guarantees

All queues are declared durable: true. Messages published by Voke use persistent: true (delivery mode 2). Messages survive a RabbitMQ broker restart for all queues.

For inbound queues (you publish):

  • Publish with persistent: true (delivery mode 2) to ensure messages survive a broker restart before Voke picks them up.
  • Wrap each publish in a channel confirm (channel.waitForConfirms()) if you need delivery guarantees.

For outbound queues (you consume):

  • Acknowledge (ack) after successful processing.
  • On malformed payloads (schema validation failure), nack with requeue = false so the message routes to a dead-letter queue rather than looping.
  • Voke's own inbound consumers follow this pattern: parse and validate with Zod; ack on success; nack(requeue=false) on parse failure.

Connection endpoint

Partner clients connect to the public AMQPS listener — amqps:// over TLS, never plaintext.

EnvironmentHostPortProtocol
Productionmqtt.voke.lovinka.com5671amqps (TLS)
Local devlocalhost5671amqps (TLS)

The host and port that Voke embeds in the partner connection bundle are configurable via PUBLIC_AMQPS_HOST and PUBLIC_AMQPS_PORT on the Voke API. The defaults above apply when these env vars are not set. (The separately-named AMQP_PUBLIC_HOST / AMQP_PUBLIC_PORT env vars control the API's own internal connection to the broker on the Docker network and are not the partner-facing surface.)

Connection string pattern:

// username = org slug, password = plaintext API key secret (vcp:connect scope)
// vhost = apiKey.vhost (per-key) or '/' (legacy)
const vhostPath = apiKey.vhost ? `/${encodeURIComponent(apiKey.vhost)}` : '/';
const url = `amqps://${encodeURIComponent(orgSlug)}:${encodeURIComponent(apiKeySecret)}@${host}:${port}${vhostPath}`;

The Connect Partner wizard's reveal screen hands you the fully-assembled amqps:// URI from ConnectionBundleBuilder — paste that directly rather than reconstructing it. See API keys & auth for how to create a key and which scope to assign.

On this page