Skip to main content

Quick Start: Real-Time Market Data

This guide shows how to use the Cadenza WebSocket API to stream real-time market data (order books, trades).

Workflow Overview

Prerequisites

pip install cadenza-client centrifuge-python supabase

Complete Example

import asyncio
from centrifuge import Client, ClientEventHandler, SubscriptionEventHandler
from supabase import create_client

from cadenza_client.models.rpc_method import RpcMethod
from cadenza_client.models.rpc_create_subscription_request import RpcCreateSubscriptionRequest
from cadenza_client.models.rpc_get_order_book_params import RpcGetOrderBookParams
from cadenza_client.models.rpc_order_book import RpcOrderBook
from cadenza_client.models.subscription_type import SubscriptionType
from cadenza_client.models.venue import Venue

# Configuration
SUPABASE_URL = "https://your-project.supabase.co"
SUPABASE_KEY = "your-anon-key"
WS_URL = "wss://cadenza-ws-uat.algo724.com/connection/websocket"


# Step 1: Authenticate with Supabase
def get_access_token(email: str, password: str) -> str:
"""Login to Supabase and get JWT access token"""
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
response = supabase.auth.sign_in_with_password({
"email": email,
"password": password
})
return response.session.access_token


# Step 2: Connection event handlers
class ConnectionHandler(ClientEventHandler):
def on_connected(self, ctx):
print(f"Connected! Client ID: {ctx.client}")

def on_disconnected(self, ctx):
print(f"Disconnected: {ctx.reason} (code: {ctx.code})")
# Handle token expiry
if ctx.code == 3500:
print("Token expired - refresh and reconnect")

def on_error(self, ctx):
print(f"Error: {ctx.error}")


# Step 3: Subscription event handler for order book updates
class OrderBookHandler(SubscriptionEventHandler):
def on_subscribed(self, ctx):
print(f"Subscribed to channel")

def on_publication(self, ctx):
"""Handle incoming order book data"""
# Parse raw dict to SDK model
orderbook = RpcOrderBook.from_dict(ctx.data)

print(f"\n--- {orderbook.venue}:{orderbook.symbol} ---")
print(f"Timestamp: {orderbook.timestamp}")

# Display top 5 bids and asks
print("BIDS ASKS")
for i in range(min(5, len(orderbook.bids or []), len(orderbook.asks or []))):
bid = orderbook.bids[i] if orderbook.bids and i < len(orderbook.bids) else None
ask = orderbook.asks[i] if orderbook.asks and i < len(orderbook.asks) else None
bid_str = f"{bid.price:>10} x {bid.quantity:<8}" if bid else " " * 20
ask_str = f"{ask.price:>10} x {ask.quantity:<8}" if ask else ""
print(f"{bid_str} | {ask_str}")


async def main():
# Get authentication token
token = get_access_token("your@email.com", "your-password")

# Create WebSocket client
client = Client(WS_URL, token=token, events=ConnectionHandler())

# Connect to WebSocket
await client.connect()
print("Connecting...")

# Wait for connection
await asyncio.sleep(1)

# --- Option A: One-time order book request ---
params = RpcGetOrderBookParams(
instrument_id="BINANCE:BTC/USDT",
depth=10
)
result = await client.rpc(
RpcMethod.QUERY_DOT_ORDERBOOK_DOT_GET.value,
params.to_dict()
)
print(f"Order book snapshot: {result.data}")

# --- Option B: Subscribe to real-time updates ---
venue = "BINANCE"
symbol = "BTC/USDT"

# Step 4a: Create server-side subscription (tells Cadenza to start streaming)
request = RpcCreateSubscriptionRequest(
venue=Venue(venue),
instruments=[symbol],
subscriptionType=SubscriptionType.MARKET_DOT_SUBSCRIPTION_DOT_ORDERBOOK
)
await client.rpc(
RpcMethod.COMMAND_DOT_SUBSCRIPTION_DOT_CREATE.value,
request.to_dict()
)
print(f"Created server subscription for {venue}:{symbol}")

# Step 4b: Subscribe to Centrifugo channel (receive the data)
channel = f"market:orderbook:{venue}:{symbol}"
sub = client.new_subscription(channel, events=OrderBookHandler())
await sub.subscribe()
print(f"Subscribed to channel: {channel}")

# Keep running to receive updates
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
# Cleanup
await sub.unsubscribe()
client.remove_subscription(sub)
await client.disconnect()


if __name__ == "__main__":
asyncio.run(main())

Key Concepts

Two-Step Subscription

Subscribing to real-time data requires two steps:

  1. Create server-side subscription (RPC call) - Tells Cadenza backend to start streaming data
  2. Subscribe to Centrifugo channel - Connects your client to receive the stream
# Step 1: Server-side subscription
await client.rpc("command.subscription.create", {
"venue": "BINANCE",
"instruments": ["BTC/USDT"],
"subscriptionType": "market.subscription.orderbook"
})

# Step 2: Client-side channel subscription
channel = "market:orderbook:BINANCE:BTC/USDT"
sub = client.new_subscription(channel, events=handler)
await sub.subscribe()

Channel Naming

Channels follow the pattern: {domain}:{resource}:{venue}:{symbol}

Channel PatternExampleData Type
market:orderbook:{venue}:{symbol}market:orderbook:BINANCE:BTC/USDTRpcOrderBook
market:trade:{venue}:{symbol}market:trade:BINANCE:ETH/USDTTrade data

SDK Models

The Cadenza SDK provides type-safe models for all requests and responses:

# Request params
from cadenza_client.models.rpc_get_order_book_params import RpcGetOrderBookParams
from cadenza_client.models.rpc_create_subscription_request import RpcCreateSubscriptionRequest

# Response data
from cadenza_client.models.rpc_order_book import RpcOrderBook

# Enums
from cadenza_client.models.venue import Venue
from cadenza_client.models.subscription_type import SubscriptionType
from cadenza_client.models.rpc_method import RpcMethod

RPC Methods

Use RpcMethod enum for type-safe method names:

from cadenza_client.models.rpc_method import RpcMethod

# Query methods (read data)
RpcMethod.QUERY_DOT_ORDERBOOK_DOT_GET.value # "query.orderbook.get"
RpcMethod.QUERY_DOT_INSTRUMENT_DOT_LIST.value # "query.instrument.list"
RpcMethod.QUERY_DOT_VENUE_DOT_LIST.value # "query.venue.list"

# Command methods (create/modify)
RpcMethod.COMMAND_DOT_SUBSCRIPTION_DOT_CREATE.value # "command.subscription.create"

Error Handling

Handle connection and authentication errors:

class ConnectionHandler(ClientEventHandler):
def on_disconnected(self, ctx):
if ctx.code == 3500:
# Token expired - refresh and reconnect
new_token = refresh_token()
# Reconnect with new token
elif ctx.code == 109:
# Token invalid - re-authenticate
pass

Next Steps