Telemetry ingress
Consume real-time telemetry and 1-minute meter readings forwarded by Voke from plant MQTT to your AMQP queue.
Overview
Telemetry flows from plant to partner: a PLC publishes snapshots on the MQTT topic cpi/{plantId}/telemetry, Voke's SubDeviceTelemetryService decodes and stores the signals using the plant's DeviceTemplate, and VCP forwarders publish partner-facing payloads to vcp.{slug}.event.telemetry.
Two distinct payload shapes arrive on the same queue:
- CanonicalTelemetry: real-time snapshot, published on routing key {slug}.event.telemetry.realtime.
- MeterReadingPayload: 1-minute meter register snapshot, published on routing key {slug}.event.telemetry.meter.
The queue is vcp.{slug}.event.telemetry. Distinguish the two shapes by routing key or by payload shape: a payload with both readingAt and meters[] is a meter reading.
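As a minimal sketch of that distinction, the helpers below classify a message by routing key and, separately, by payload shape. The names TelemetryKind, kindFromRoutingKey, and kindFromPayload are illustrative and not part of the Voke API; the suffix matching assumes routing keys follow the two patterns listed above.

```typescript
type TelemetryKind = 'realtime' | 'meter' | 'unknown';

// Classify by routing key suffix ({slug}.event.telemetry.realtime / .meter).
function kindFromRoutingKey(routingKey: string): TelemetryKind {
  if (routingKey.endsWith('.event.telemetry.realtime')) return 'realtime';
  if (routingKey.endsWith('.event.telemetry.meter')) return 'meter';
  return 'unknown';
}

// Classify by payload shape: readingAt + meters[] means a meter reading.
function kindFromPayload(payload: unknown): TelemetryKind {
  if (typeof payload !== 'object' || payload === null) return 'unknown';
  const p = payload as Record<string, unknown>;
  if (typeof p.readingAt === 'string' && Array.isArray(p.meters)) return 'meter';
  if ('gridPowerKw' in p) return 'realtime';
  return 'unknown';
}
```

Checking the routing key first avoids parsing assumptions; the shape check is a fallback for code paths where only the decoded payload is available.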
Payload shapes
CanonicalTelemetry — real-time snapshot
Published by VcpAmqpService.publishTelemetry. Fired on every verified plant telemetry event.
interface CanonicalTelemetry {
  gridPowerKw: number | null; // Net grid power; positive = import
  fvePowerKw: number | null; // Solar / FVE generation
  batteryPowerKw: number | null; // Battery power; positive = charging
  consumptionPowerKw: number | null; // Total site consumption
  socPercent: number | null; // Battery state of charge (0–100)
  availableBatteryEnergyKwh: number | null; // Available battery energy
  batteryTemperatureCelsius: number | null; // Battery temperature
  currentOperatingMode: OperatingMode; // Active control mode
  dataQuality: 'GOOD' | 'INTERPOLATED' | 'STALE' | 'MISSING';
  devices?: DeviceTelemetry[]; // Per-device breakdown (optional)
}
| Field | Description |
|---|---|
| gridPowerKw | Net grid power in kW. Positive values indicate grid import; negative values indicate export. |
| fvePowerKw | FVE / solar generation in kW. |
| batteryPowerKw | Battery power in kW. Positive = charging; negative = discharging. |
| consumptionPowerKw | Total site load in kW. |
| socPercent | Battery state of charge as a percentage. null if no battery or data unavailable. |
| availableBatteryEnergyKwh | Energy the battery can currently deliver, in kWh. |
| batteryTemperatureCelsius | Battery temperature in °C. |
| currentOperatingMode | The active control strategy; see the OperatingMode values below. |
| dataQuality | GOOD = fresh measurement; INTERPOLATED = estimated; STALE = last-known; MISSING = no data available. |
| devices | Per-asset breakdown when the plant has sub-devices. See DeviceTelemetry below. |
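The sign conventions in the table can be made explicit on the consumer side. The sketch below is illustrative only: FlowDirection, gridDirection, batteryDirection, and isLiveQuality are hypothetical names, and treating only GOOD as suitable for live decisions is an assumed consumer policy, not a Voke rule.

```typescript
type FlowDirection =
  | 'import' | 'export' | 'charging' | 'discharging' | 'idle' | 'unknown';

// Grid power: positive = import, negative = export, null = no data.
function gridDirection(gridPowerKw: number | null): FlowDirection {
  if (gridPowerKw === null) return 'unknown';
  if (gridPowerKw > 0) return 'import';
  if (gridPowerKw < 0) return 'export';
  return 'idle';
}

// Battery power: positive = charging, negative = discharging, null = no data.
function batteryDirection(batteryPowerKw: number | null): FlowDirection {
  if (batteryPowerKw === null) return 'unknown';
  if (batteryPowerKw > 0) return 'charging';
  if (batteryPowerKw < 0) return 'discharging';
  return 'idle';
}

// Assumed policy: only GOOD snapshots drive live decisions.
function isLiveQuality(
  dataQuality: 'GOOD' | 'INTERPOLATED' | 'STALE' | 'MISSING',
): boolean {
  return dataQuality === 'GOOD';
}
```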
OperatingMode values:
enum OperatingMode {
  STANDARD = 'STANDARD',
  ZERO_EXPORT = 'ZERO_EXPORT',
  MAX_EXPORT = 'MAX_EXPORT',
  PEAK_SHAVING = 'PEAK_SHAVING',
  LOCAL_OPTIMIZATION = 'LOCAL_OPTIMIZATION',
  GRID_TARGET = 'GRID_TARGET',
  LDS_SUPPORT = 'LDS_SUPPORT',
}
DeviceTelemetry — per-device breakdown
Present in CanonicalTelemetry.devices[] when the plant has sub-devices declared in its DeviceTemplate.
interface DeviceTelemetry {
  deviceId: string; // Sub-device externalId (e.g. 'B1', 'S2')
  assetType: AssetType; // Asset class
  powerKw: number; // Device power output / input in kW
  socPercent?: number; // Battery SOC (batteries only)
  temperatureCelsius?: number; // Device temperature
  availableCapacityKwh?: number; // Available energy capacity (batteries)
  regulationPercent?: number; // Regulation setpoint tracking accuracy (%)
}
deviceId maps to the sub-device's operator-assigned externalId on the Voke plant, not the Voke UUID.
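One way a consumer might use the per-device breakdown is to total power per asset class. This is a sketch under assumptions: DeviceSample and sumPowerByAssetType are hypothetical names, and assetType is typed as a plain string because the AssetType values are not listed on this page.

```typescript
interface DeviceSample {
  deviceId: string;   // sub-device externalId, e.g. 'B1'
  assetType: string;  // assumption: AssetType treated as an opaque string
  powerKw: number;
}

// Sum powerKw per assetType; an absent devices[] yields an empty map.
function sumPowerByAssetType(devices: DeviceSample[] = []): Map<string, number> {
  const totals = new Map<string, number>();
  for (const device of devices) {
    const current = totals.get(device.assetType) ?? 0;
    totals.set(device.assetType, current + device.powerKw);
  }
  return totals;
}
```

Because devices is optional on CanonicalTelemetry, the default parameter lets callers pass payload.devices directly without a separate undefined check.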
MeterReadingPayload — 1-minute meter registers
Published by VcpAmqpService.publishMeterReading once per minute for VCP-enabled plants that have meter register signals.
type MeterRole = 'GRID' | 'FVE' | 'BESS' | 'CONSUMPTION';
interface MeterEntry {
  meterId: string; // Sub-device externalId
  role: MeterRole;
  importRegisterKwh?: number;
  exportRegisterKwh?: number;
  productionRegisterKwh?: number;
  chargeRegisterKwh?: number;
  dischargeRegisterKwh?: number;
  consumptionRegisterKwh?: number;
}
interface MeterReadingPayload {
  readingAt: string; // ISO 8601 UTC
  meters: MeterEntry[];
  dataQuality: 'GOOD' | 'DEGRADED' | 'ESTIMATED';
}
| Field | Description |
|---|---|
| readingAt | Clock-aligned minute timestamp. |
| meters | Absolute meter-register values keyed by sub-device externalId. |
| dataQuality | GOOD, DEGRADED, or ESTIMATED. |
| importRegisterKwh / exportRegisterKwh | Grid import/export registers. |
| productionRegisterKwh | FVE production register. |
| chargeRegisterKwh / dischargeRegisterKwh | BESS charge/discharge registers. |
| consumptionRegisterKwh | Site consumption register. |
VCP v1.1 replaced the old 15-minute IntervalTelemetry payload with MeterReadingPayload.
Consumers should not expect intervalStart, intervalEnd, or gridEnergyKwh15min.
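Since the registers are absolute values, a consumer that still needs per-interval energy could difference two consecutive readings of the same register. The sketch below is one possible approach, not a documented Voke procedure: RegisterSample and registerDeltaKwh are hypothetical names, and because this page does not describe register reset or rollover behaviour, a decreasing register simply yields null.

```typescript
interface RegisterSample {
  readingAt: string;     // ISO 8601 UTC, as in MeterReadingPayload.readingAt
  registerKwh?: number;  // absolute register value, e.g. importRegisterKwh
}

// Energy in kWh accumulated between prev and curr, or null if not derivable.
function registerDeltaKwh(prev: RegisterSample, curr: RegisterSample): number | null {
  if (prev.registerKwh === undefined || curr.registerKwh === undefined) return null;
  const t0 = Date.parse(prev.readingAt);
  const t1 = Date.parse(curr.readingAt);
  // Rejects out-of-order, duplicate, and unparseable timestamps (NaN).
  if (!(t1 > t0)) return null;
  const delta = curr.registerKwh - prev.registerKwh;
  // A negative delta suggests a reset or rollover; left to the caller.
  return delta >= 0 ? delta : null;
}
```

For example, registers of 1500 and 1502 read one minute apart give a delta of 2 kWh for that minute.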
Forwarding and filtering
VcpTelemetryForwarder forwards every decoded telemetry event to the partner queue without signal-level filtering. There is no featured-flag or signal-selection step in the ESM forwarding path — all signals present in the canonical payload are included. Partners that need a subset of signals should filter on the consumer side.
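Consumer-side filtering can be as simple as copying the fields of interest out of each payload. The helper below is a minimal sketch; pickSignals and the TelemetryFields alias are illustrative names, not part of any Voke library.

```typescript
type TelemetryFields = Record<string, unknown>;

// Return a new object holding only the requested keys that exist on the payload.
function pickSignals(payload: TelemetryFields, keys: readonly string[]): TelemetryFields {
  const subset: TelemetryFields = {};
  for (const key of keys) {
    if (key in payload) subset[key] = payload[key];
  }
  return subset;
}
```

Calling pickSignals(payload, ['gridPowerKw', 'socPercent']) keeps those two fields and drops the rest, including null values that are present in the payload.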
The flow for each real-time event:
- PLC publishes a snapshot to cpi/{plantId}/telemetry over MQTT.
- SubDeviceTelemetryService verifies the HMAC, decodes sub-device signals via DeviceTemplate, and emits telemetry.verified.
- VcpTelemetryForwarder listens for telemetry.verified, resolves the plant's externalPlantId, and confirms the org has VCP trading enabled.
- The org's adapter normalises the raw data into CanonicalTelemetry and formats it for partner delivery.
- VcpAmqpService.publishTelemetry wraps the payload in a VCP envelope and publishes to {slug}.event.telemetry.realtime on the vcp exchange.
- MeterReadingForwarder queries meter register signals once per minute and publishes {slug}.event.telemetry.meter.
Plants without a configured externalPlantId are silently skipped; their telemetry is stored in Voke but not forwarded to the partner.
Subscribing to telemetry
The queue vcp.{slug}.event.telemetry is asserted and bound by Voke when trading is enabled. Your consumer connects with orgSlug as the AMQP username and an API key (vcp:connect scope) as the password.
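To illustrate how those credentials map onto a connection, the sketch below builds an AMQP URL and the queue name. It rests on several assumptions that this page does not confirm: the broker hostname is a placeholder supplied by the caller, TLS (amqps) on the conventional port 5671 is assumed, and the default vhost is used. VokeConnectionParams, buildAmqpUrl, and telemetryQueueName are hypothetical names.

```typescript
interface VokeConnectionParams {
  host: string;     // assumption: broker hostname, not specified on this page
  port?: number;    // assumption: 5671, the conventional AMQPS port
  orgSlug: string;  // used as the AMQP username
  apiKey: string;   // API key with vcp:connect scope, used as the password
}

// Percent-encode credentials so characters like '/' or '@' cannot break the URL.
function buildAmqpUrl({ host, port = 5671, orgSlug, apiKey }: VokeConnectionParams): string {
  const user = encodeURIComponent(orgSlug);
  const pass = encodeURIComponent(apiKey);
  return `amqps://${user}:${pass}@${host}:${port}`;
}

// Queue asserted and bound by Voke when trading is enabled.
function telemetryQueueName(orgSlug: string): string {
  return `vcp.${orgSlug}.event.telemetry`;
}
```

The resulting string is in the form accepted by common AMQP 0-9-1 clients; the connectVoke helper used in the example on this page may construct its connection differently.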
Using the consume-telemetry.ts example:
import { connectVoke, type VokeAmqpCreds } from './examples/esm/amqp-connect';
interface CanonicalTelemetryPayload {
  readingAt?: string; // present on MeterReadingPayload
  gridPowerKw: number | null;
  fvePowerKw: number | null;
  batteryPowerKw: number | null;
  consumptionPowerKw: number | null;
  socPercent: number | null;
  currentOperatingMode: string;
  dataQuality: string;
  devices?: Array<{ deviceId: string; assetType: string; powerKw: number }>;
}
async function startTelemetryConsumer(creds: VokeAmqpCreds) {
  const { ch, outbound } = await connectVoke(creds);
  // Deduplicate on messageId. Note: this Set grows without bound,
  // so a long-running consumer needs some form of eviction.
  const seen = new Set<string>();
  await ch.consume(outbound.telemetry, (msg) => {
    if (!msg) return; // null means the consumer was cancelled by the broker
    try {
      const envelope = JSON.parse(msg.content.toString()) as {
        version: '1.1';
        messageId: string;
        timestamp: string;
        siteId: string;
        payload: CanonicalTelemetryPayload;
      };
      // Already processed: acknowledge the redelivery and skip it
      if (seen.has(envelope.messageId)) {
        ch.ack(msg);
        return;
      }
      seen.add(envelope.messageId);
      const { payload } = envelope;
      // Meter readings carry readingAt + meters[]; everything else is real-time
      const isMeterReading = 'readingAt' in payload && 'meters' in payload;
      if (isMeterReading) {
        console.log(`[${envelope.siteId}] meter reading`, payload);
      } else {
        console.log(`[${envelope.siteId}] real-time`, payload);
      }
      ch.ack(msg);
    } catch {
      // Malformed: dead-letter, do not requeue
      ch.nack(msg, false, false);
    }
  });
  console.log(`Consuming telemetry from ${outbound.telemetry}`);
}
The messageId in the outer VCP envelope is a Voke-assigned UUIDv4. Use it for deduplication: on consumer reconnect, RabbitMQ may redeliver messages that were not acknowledged before the channel dropped.
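A plain Set of message IDs grows for as long as the consumer runs. One possible alternative, sketched here as an assumption rather than a Voke recommendation, is a bounded cache that evicts the oldest ID once a size limit is reached. RecentIdCache and its default limit of 10000 are hypothetical choices.

```typescript
class RecentIdCache {
  private readonly ids = new Set<string>();
  private readonly maxSize: number;

  constructor(maxSize = 10000) {
    this.maxSize = maxSize;
  }

  // Returns true if id was already recorded; otherwise records it and returns false.
  checkAndAdd(id: string): boolean {
    if (this.ids.has(id)) return true;
    this.ids.add(id);
    if (this.ids.size > this.maxSize) {
      // A Set iterates in insertion order, so the first value is the oldest.
      const oldest = this.ids.values().next().value;
      if (oldest !== undefined) this.ids.delete(oldest);
    }
    return false;
  }

  get size(): number {
    return this.ids.size;
  }
}
```

In a consumer callback, `if (cache.checkAndAdd(envelope.messageId)) { ch.ack(msg); return; }` would take the place of separate has/add calls. The limit should comfortably exceed the number of messages that can be unacknowledged at once, otherwise an evicted ID could be processed twice after a redelivery.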
Related pages
- Concepts / Signals — how plant signals are decoded from sub-device snapshots
- Concepts / Data retention — TimescaleDB aggregation windows and raw data lifetime
- VCP data model — full field reference for CanonicalTelemetry and MeterReadingPayload
- Commands — observe post-command effects by correlating siteId timestamps in the telemetry stream