Channels
Cadenza uses channels for real-time push updates. Subscribe to channels to receive live data streams.
Channel Namespaces
| Namespace | Access | Description |
|---|---|---|
| Market Data | Public | Order books, trades, and other market data |
| Trading Data | Private | Account updates, orders, portfolio, balances |
Channel Naming Convention
Channels follow the pattern: {namespace}:{resource}:{identifiers}
| Pattern | Example | Description |
|---|---|---|
market:orderbook:{venue}:{symbol} | market:orderbook:BINANCE:BTC/USDT | Order book updates |
market:trade:{venue}:{symbol} | market:trade:BINANCE:BTC/USDT | Trade updates |
trading:userData:{accountId} | trading:userData:uuid-here | Account updates |
Subscription Workflow
Subscribing to real-time data requires three steps:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Client │ │ Centrifugo │ │ Cadenza │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ 1. RPC: Create Subscription │
│──────────────────────────────────────────────▶│
│ │ │
│ │ 2. Start streaming │
│ │◀──────────────────────│
│ │ │
│ 3. Subscribe to channel │
│──────────────────────▶│ │
│ │ │
│ 4. Receive publications │
│◀──────────────────────│◀──────────────────────│
Step 1: Create server-side subscription (RPC) (Admin only)
- Tells Cadenza what data to stream
- Uses
command.subscription.createRPC method
Step 2: List available subscriptions (RPC)
- Query active subscriptions before subscribing
- Uses
query.subscription.listRPC method
Step 3: Subscribe to Centrifugo channel
- Connects client to receive the data
- Uses Centrifugo SDK subscription API
Managing Subscriptions
Track Active Subscriptions
1. Store subscription references in a dict: { channel: subscription }
2. Check before subscribing to avoid duplicates
3. Remove from dict when unsubscribing
Unsubscribe
await sub.unsubscribe()
client.remove_subscription(sub)
Reconnection and Recovery
Centrifugo supports automatic reconnection and message recovery:
| Context Field | Description |
|---|---|
recoverable | Whether missed messages can be recovered |
offset | Last message offset (for recovery) |
epoch | Stream epoch identifier |
Recovery Flow:
- Track
offsetfrom each publication - On reconnect, Centrifugo requests messages from last offset
- Missed messages are replayed automatically
Subscription Events
Implement SubscriptionEventHandler:
| Event | Description |
|---|---|
on_subscribing | Subscription attempt started |
on_subscribed | Successfully subscribed |
on_unsubscribed | Unsubscribed from channel |
on_publication | Received data update |
on_error | Subscription error |
Best Practices
-
Always create server-side subscription first
- The RPC call tells Cadenza to start streaming data
- Without this, the channel will have no data
-
Use SDK models for parsing
- Parse publication data into typed models
- Example:
RpcOrderBook.from_dict(data)
-
Handle reconnection
- Implement
on_disconnectedhandler - Track offsets for message recovery
- Implement
-
Unsubscribe when done
- Clean up subscriptions to save resources
- Call both
unsubscribe()andremove_subscription()
-
One subscription per channel
- Check if already subscribed before creating new subscription
- Reuse existing subscription objects