Quick Start: Real-Time Market Data
This guide shows how to use the Cadenza WebSocket API to stream real-time market data (order books, trades).
Workflow Overview
Prerequisites
pip install cadenza-client centrifuge-python supabase
Complete Example
import asyncio
from centrifuge import Client, ClientEventHandler, SubscriptionEventHandler
from supabase import create_client
from cadenza_client.models.rpc_method import RpcMethod
from cadenza_client.models.rpc_create_subscription_request import RpcCreateSubscriptionRequest
from cadenza_client.models.rpc_get_order_book_params import RpcGetOrderBookParams
from cadenza_client.models.rpc_order_book import RpcOrderBook
from cadenza_client.models.subscription_type import SubscriptionType
from cadenza_client.models.venue import Venue
# Configuration
# Supabase project URL and key; both are passed to create_client() when logging in.
SUPABASE_URL = "https://your-project.supabase.co"
SUPABASE_KEY = "your-anon-key"
# WebSocket endpoint the centrifuge Client connects to.
# NOTE(review): the hostname suggests a UAT environment - confirm before production use.
WS_URL = "wss://cadenza-ws-uat.algo724.com/connection/websocket"
# Step 1: Authenticate with Supabase
def get_access_token(email: str, password: str) -> str:
    """Sign in to Supabase with email/password and return the session's access token.

    Args:
        email: Account email used for the password sign-in.
        password: Account password.

    Returns:
        The ``access_token`` of the session produced by the sign-in call.
    """
    auth_api = create_client(SUPABASE_URL, SUPABASE_KEY).auth
    credentials = {"email": email, "password": password}
    sign_in_result = auth_api.sign_in_with_password(credentials)
    session = sign_in_result.session
    return session.access_token
# Step 2: Connection event handlers
class ConnectionHandler(ClientEventHandler):
    """Print connection lifecycle events of the WebSocket client.

    The callbacks are coroutines: centrifuge-python awaits its event
    handlers, so plain ``def`` methods would fail when invoked.
    """

    async def on_connected(self, ctx):
        """Report the client ID once the connection is established."""
        print(f"Connected! Client ID: {ctx.client}")

    async def on_disconnected(self, ctx):
        """Report the disconnect reason/code and flag the token-expiry case."""
        print(f"Disconnected: {ctx.reason} (code: {ctx.code})")
        # Handle token expiry
        # NOTE(review): upstream Centrifugo documents disconnect code 3500 as
        # "invalid token" - confirm what the Cadenza server means by it.
        if ctx.code == 3500:
            print("Token expired - refresh and reconnect")

    async def on_error(self, ctx):
        """Report a client-level error."""
        print(f"Error: {ctx.error}")
# Step 3: Subscription event handler for order book updates
class OrderBookHandler(SubscriptionEventHandler):
    """Print every order book update published on the subscribed channel.

    The callbacks are coroutines: centrifuge-python awaits its event
    handlers, so plain ``def`` methods would fail when invoked.
    """

    async def on_subscribed(self, ctx):
        """Report that the channel subscription is active."""
        print("Subscribed to channel")

    async def on_publication(self, ctx):
        """Handle incoming order book data.

        Parses the publication payload into ``RpcOrderBook`` and prints up to
        five price levels, bids on the left and asks on the right.
        """
        # Parse raw dict to SDK model. centrifuge-python wraps the payload in
        # a Publication object, so the data lives on ctx.pub, not on ctx.
        orderbook = RpcOrderBook.from_dict(ctx.pub.data)
        print(f"\n--- {orderbook.venue}:{orderbook.symbol} ---")
        print(f"Timestamp: {orderbook.timestamp}")

        # Either side may be missing or shorter than the other.
        bids = orderbook.bids or []
        asks = orderbook.asks or []

        # Display top 5 bids and asks
        print("BIDS ASKS")
        # Row count follows the LONGER side (capped at 5) so a one-sided or
        # uneven book is still shown; the shorter side is padded with blanks.
        rows = min(5, max(len(bids), len(asks)))
        for i in range(rows):
            bid = bids[i] if i < len(bids) else None
            ask = asks[i] if i < len(asks) else None
            bid_str = f"{bid.price:>10} x {bid.quantity:<8}" if bid else " " * 20
            ask_str = f"{ask.price:>10} x {ask.quantity:<8}" if ask else ""
            print(f"{bid_str} | {ask_str}")
async def main():
    """Run the demo: authenticate, connect, fetch a snapshot, then stream updates.

    After connecting, everything runs inside one try/finally so the
    subscription (if it was created) and the client are always cleaned up,
    whether the script is interrupted or one of the calls fails.
    """
    # Get authentication token
    token = get_access_token("your@email.com", "your-password")

    # Create WebSocket client
    client = Client(WS_URL, token=token, events=ConnectionHandler())

    # Connect to WebSocket
    await client.connect()
    print("Connecting...")

    # Stays None until the channel subscription exists, so cleanup knows
    # whether there is anything to unsubscribe.
    sub = None
    try:
        # Wait for connection
        await asyncio.sleep(1)

        # --- Option A: One-time order book request ---
        params = RpcGetOrderBookParams(
            instrument_id="BINANCE:BTC/USDT",
            depth=10,
        )
        result = await client.rpc(
            RpcMethod.QUERY_DOT_ORDERBOOK_DOT_GET.value,
            params.to_dict(),
        )
        print(f"Order book snapshot: {result.data}")

        # --- Option B: Subscribe to real-time updates ---
        venue = "BINANCE"
        symbol = "BTC/USDT"

        # Step 4a: Create server-side subscription (tells Cadenza to start streaming)
        request = RpcCreateSubscriptionRequest(
            venue=Venue(venue),
            instruments=[symbol],
            subscriptionType=SubscriptionType.MARKET_DOT_SUBSCRIPTION_DOT_ORDERBOOK,
        )
        await client.rpc(
            RpcMethod.COMMAND_DOT_SUBSCRIPTION_DOT_CREATE.value,
            request.to_dict(),
        )
        print(f"Created server subscription for {venue}:{symbol}")

        # Step 4b: Subscribe to Centrifugo channel (receive the data)
        channel = f"market:orderbook:{venue}:{symbol}"
        sub = client.new_subscription(channel, events=OrderBookHandler())
        await sub.subscribe()
        print(f"Subscribed to channel: {channel}")

        # Keep running to receive updates
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run(), Ctrl+C normally reaches this coroutine as task
        # cancellation rather than KeyboardInterrupt, so both are handled.
        print("\nShutting down...")
    finally:
        # Cleanup
        if sub is not None:
            await sub.unsubscribe()
            client.remove_subscription(sub)
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Key Concepts
Two-Step Subscription
Subscribing to real-time data requires two steps:
1. Create server-side subscription (RPC call) - Tells the Cadenza backend to start streaming data
2. Subscribe to Centrifugo channel - Connects your client to receive the stream
# Step 1: Server-side subscription
await client.rpc("command.subscription.create", {
"venue": "BINANCE",
"instruments": ["BTC/USDT"],
"subscriptionType": "market.subscription.orderbook"
})
# Step 2: Client-side channel subscription
channel = "market:orderbook:BINANCE:BTC/USDT"
sub = client.new_subscription(channel, events=handler)
await sub.subscribe()
Channel Naming
Channels follow the pattern: {domain}:{resource}:{venue}:{symbol}
| Channel Pattern | Example | Data Type |
|---|---|---|
| market:orderbook:{venue}:{symbol} | market:orderbook:BINANCE:BTC/USDT | RpcOrderBook |
| market:trade:{venue}:{symbol} | market:trade:BINANCE:ETH/USDT | Trade data |
SDK Models
The Cadenza SDK provides type-safe models for all requests and responses:
# Request params
from cadenza_client.models.rpc_get_order_book_params import RpcGetOrderBookParams
from cadenza_client.models.rpc_create_subscription_request import RpcCreateSubscriptionRequest
# Response data
from cadenza_client.models.rpc_order_book import RpcOrderBook
# Enums
from cadenza_client.models.venue import Venue
from cadenza_client.models.subscription_type import SubscriptionType
from cadenza_client.models.rpc_method import RpcMethod
RPC Methods
Use RpcMethod enum for type-safe method names:
from cadenza_client.models.rpc_method import RpcMethod
# Each `.value` is the method-name string passed as the first argument to client.rpc().
# Query methods (read data)
RpcMethod.QUERY_DOT_ORDERBOOK_DOT_GET.value # "query.orderbook.get"
RpcMethod.QUERY_DOT_INSTRUMENT_DOT_LIST.value # "query.instrument.list"
RpcMethod.QUERY_DOT_VENUE_DOT_LIST.value # "query.venue.list"
# Command methods (create/modify)
RpcMethod.COMMAND_DOT_SUBSCRIPTION_DOT_CREATE.value # "command.subscription.create"
Error Handling
Handle connection and authentication errors:
class ConnectionHandler(ClientEventHandler):
    """Sketch of reacting to authentication-related disconnects.

    The callback is a coroutine because centrifuge-python awaits its event
    handlers.
    """

    async def on_disconnected(self, ctx):
        """Branch on the disconnect code to decide how to recover."""
        # NOTE(review): upstream Centrifugo documents 3500 as the
        # "invalid token" disconnect code and 109 as the "token expired"
        # error code - confirm the mapping used by the Cadenza server.
        if ctx.code == 3500:
            # Token expired - refresh and reconnect
            # refresh_token() is a placeholder not defined in this guide;
            # supply your own implementation.
            new_token = refresh_token()
            # Reconnect with new token
        elif ctx.code == 109:
            # Token invalid - re-authenticate
            pass
Next Steps
- Authentication - Full authentication flow details
- RPC Commands - Complete RPC method reference
- Channels - All available subscription channels