Comunicacion en Tiempo Real
Tabla de Contenidos
- Vision general
- Hibernatable WebSocket API
- Estructura del Durable Object
- Gestion de conexiones
- Patrones de broadcasting
- Batching de mensajes para actualizaciones de alta frecuencia
- Estrategia de reconexion
- Hook React para WebSocket
- Integracion con Module Federation
- Monitoreo y alertas de conteo de conexiones
- Precios y limites
- Referencias
Vision general
La plataforma ofrece funcionalidades en tiempo real a traves de los micro frontends, incluyendo:
- Actualizaciones en vivo -- metricas de dashboard, elementos del feed e indicadores de estado que reflejan cambios del servidor en milisegundos.
- Edicion colaborativa -- varios usuarios pueden ver y modificar el mismo documento o configuracion de forma simultanea, con transformaciones operacionales o resolucion de conflictos basada en CRDT.
- Notificaciones -- alertas push entregadas instantaneamente a los clientes conectados sin necesidad de polling.
- Presencia -- visibilidad de quien esta en linea, que pagina esta viendo y si esta escribiendo activamente o inactivo.
Arquitectura
El flujo de datos para todas las funcionalidades en tiempo real sigue un camino consistente:
- El navegador abre una conexion WebSocket hacia un endpoint de Cloudflare Worker.
- El Worker autentica la solicitud, resuelve el ID del Durable Object correcto y reenvía la solicitud de upgrade.
- El Durable Object acepta el WebSocket usando la Hibernatable WebSocket API, adjunta metadatos y gestiona todo el enrutamiento de mensajes.
- El Hibernatable WebSocket mantiene la conexion activa incluso cuando el Durable Object no tiene trabajo pendiente, reduciendo drasticamente el coste.
Por que Durable Objects en lugar de servidores WebSocket tradicionales
| Aspecto | Servidor tradicional | Durable Objects |
|---|---|---|
| Despliegue | Provisionar y gestionar VMs o contenedores | Nativo en el edge, sin infraestructura |
| Escalado | Escalado horizontal manual + sticky sessions | Escalado automatico por objeto a traves de la red de Cloudflare |
| Gestion del servidor | Parcheo de SO, balanceadores de carga, health checks | Totalmente gestionado por Cloudflare |
| Coste en inactividad | Pagar por servidores siempre encendidos | La hibernacion elimina los cargos de CPU en periodos de inactividad |
| Latencia global | Region unica o replicacion multi-region | Se ejecuta en el edge mas cercano al usuario coordinador |
| Consistencia de estado | Bases de datos externas + invalidacion de cache | Single-threaded por objeto con almacenamiento co-ubicado |
Los Durable Objects eliminan la carga operativa de mantener infraestructura WebSocket, proporcionando garantías solidas de consistencia single-writer por limite de coordinacion.
Hibernatable WebSocket API
Que es la hibernacion
Los Durable Objects soportan un mecanismo de hibernacion que cambia fundamentalmente el modelo de costes para conexiones WebSocket de larga duracion:
- Durante el procesamiento activo, el Durable Object funciona normalmente, ejecuta JavaScript y se factura por tiempo de CPU.
- Cuando todas las conexiones WebSocket estan inactivas (sin mensajes entrantes, sin alarmas pendientes), el Durable Object puede entrar en hibernacion.
- Durante la hibernacion:
- No se generan cargos de CPU.
- Todas las conexiones WebSocket permanecen abiertas y funcionales desde la perspectiva del cliente.
- El estado en memoria se descarta. Cualquier estado que deba sobrevivir a la hibernacion necesita almacenarse via
this.ctx.storageo serializarse en los attachments del WebSocket. - El isolate JavaScript del Durable Object se desaloja de memoria.
- El Durable Object despierta cuando:
- Llega un mensaje WebSocket entrante (dispara
webSocketMessage()). - Se activa una alarma previamente programada (dispara
alarm()). - Llega una nueva solicitud HTTP (dispara
fetch()).
- Llega un mensaje WebSocket entrante (dispara
Facturacion durante la hibernacion: Mientras el Durable Object esta hibernando, no se generan cargos de CPU ni de duracion. Solo se factura por almacenamiento y por mensajes WebSocket entrantes que despierten el DO. Esto hace economicamente viable mantener miles de conexiones WebSocket inactivas para canales de presencia y notificaciones.
Patron de la API
La Hibernatable WebSocket API usa event handlers en la clase del Durable Object en lugar de adjuntar listeners a objetos WebSocket individuales. Esto es lo que permite a Cloudflare desalojar el isolate de memoria mientras mantiene las conexiones abiertas.
import { DurableObject } from 'cloudflare:workers';
interface Env {
CHAT_ROOMS: DurableObjectNamespace<ChatRoom>;
}
interface WebSocketAttachment {
userId: string;
displayName: string;
role: 'admin' | 'member' | 'viewer';
joinedAt: number;
}
interface IncomingMessage {
type: 'chat' | 'typing' | 'presence' | 'ping';
payload: Record<string, unknown>;
}
interface OutgoingMessage {
type: string;
payload: Record<string, unknown>;
sender: string;
timestamp: number;
}
export class ChatRoom extends DurableObject<Env> {
private messageHistory: OutgoingMessage[] = [];
/**
* Handles the initial HTTP request to upgrade to a WebSocket connection.
* The Worker forwards the upgrade request here after authentication.
*/
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
// Verify this is a WebSocket upgrade request
if (request.headers.get('Upgrade') !== 'websocket') {
return new Response('Expected WebSocket upgrade', { status: 426 });
}
// Extract authenticated user info (set by the Worker after JWT validation)
const userId = url.searchParams.get('userId');
const displayName = url.searchParams.get('displayName') ?? 'Anonymous';
const role = (url.searchParams.get('role') as WebSocketAttachment['role']) ?? 'member';
if (!userId) {
return new Response('Missing userId', { status: 400 });
}
// Create the WebSocket pair
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
// Accept the server-side WebSocket using the Hibernatable API.
// Tags enable efficient filtered broadcasting later.
this.ctx.acceptWebSocket(server, [`user:${userId}`, `role:${role}`]);
// Attach metadata that survives hibernation
const attachment: WebSocketAttachment = {
userId,
displayName,
role,
joinedAt: Date.now(),
};
server.serializeAttachment(attachment);
// Notify existing participants of the new connection
this.broadcast({
type: 'user-joined',
payload: { userId, displayName },
sender: 'system',
timestamp: Date.now(),
}, server);
// Send recent message history to the new client
const recentMessages = await this.getRecentMessages(50);
server.send(JSON.stringify({
type: 'history',
payload: { messages: recentMessages },
sender: 'system',
timestamp: Date.now(),
}));
// Return the client-side WebSocket to the Worker, which returns it to the browser
return new Response(null, { status: 101, webSocket: client });
}
/**
* Called when any connected WebSocket sends a message.
* This is the core handler that wakes the DO from hibernation.
*/
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
// Handle binary messages if needed
if (message instanceof ArrayBuffer) {
console.warn(`Binary message from ${attachment.userId}, ignoring`);
return;
}
let data: IncomingMessage;
try {
data = JSON.parse(message);
} catch {
ws.send(JSON.stringify({ type: 'error', payload: { message: 'Invalid JSON' } }));
return;
}
switch (data.type) {
case 'chat': {
const outgoing: OutgoingMessage = {
type: 'chat',
payload: {
...data.payload,
messageId: crypto.randomUUID(),
},
sender: attachment.userId,
timestamp: Date.now(),
};
// Persist the message
await this.storeMessage(outgoing);
// Broadcast to all connected clients including the sender (for confirmation)
this.broadcast(outgoing);
break;
}
case 'typing': {
// Typing indicators are transient -- broadcast but do not persist
this.broadcast({
type: 'typing',
payload: {
userId: attachment.userId,
displayName: attachment.displayName,
isTyping: data.payload.isTyping ?? false,
},
sender: attachment.userId,
timestamp: Date.now(),
}, ws); // Exclude the sender
break;
}
case 'presence': {
// Update presence state in storage
await this.ctx.storage.put(`presence:${attachment.userId}`, {
status: data.payload.status,
lastSeen: Date.now(),
});
this.broadcast({
type: 'presence',
payload: {
userId: attachment.userId,
status: data.payload.status,
},
sender: 'system',
timestamp: Date.now(),
});
break;
}
case 'ping': {
ws.send(JSON.stringify({ type: 'pong', payload: {}, sender: 'system', timestamp: Date.now() }));
break;
}
default: {
ws.send(JSON.stringify({
type: 'error',
payload: { message: `Unknown message type: ${data.type}` },
sender: 'system',
timestamp: Date.now(),
}));
}
}
}
/**
* Called when a WebSocket connection closes cleanly or abnormally.
*/
async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
// Close the server side of the connection
ws.close(code, reason);
// Remove presence data
await this.ctx.storage.delete(`presence:${attachment.userId}`);
// Notify remaining participants
this.broadcast({
type: 'user-left',
payload: {
userId: attachment.userId,
displayName: attachment.displayName,
},
sender: 'system',
timestamp: Date.now(),
});
// If no connections remain, schedule cleanup
if (this.ctx.getWebSockets().length === 0) {
await this.ctx.storage.setAlarm(Date.now() + 60_000); // 1 minute
}
}
/**
* Called when a WebSocket connection encounters an error.
*/
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
const attachment = ws.deserializeAttachment() as WebSocketAttachment | null;
const userId = attachment?.userId ?? 'unknown';
console.error(`WebSocket error for user ${userId}:`, error);
// Attempt to close the errored connection
try {
ws.close(1011, 'Internal error');
} catch {
// Connection may already be closed
}
}
/**
* Alarm handler for scheduled tasks (cleanup, state compaction, etc.).
*/
async alarm(): Promise<void> {
const connections = this.ctx.getWebSockets();
if (connections.length === 0) {
// No connections for the full cleanup window -- purge old messages
const keys = await this.ctx.storage.list({ prefix: 'msg:' });
const cutoff = Date.now() - 24 * 60 * 60 * 1000; // 24 hours
const toDelete: string[] = [];
for (const [key, value] of keys) {
const msg = value as OutgoingMessage;
if (msg.timestamp < cutoff) {
toDelete.push(key);
}
}
if (toDelete.length > 0) {
await this.ctx.storage.delete(toDelete);
}
}
}
// --- Private helpers ---
private broadcast(message: OutgoingMessage, exclude?: WebSocket): void {
const raw = JSON.stringify(message);
for (const ws of this.ctx.getWebSockets()) {
if (ws !== exclude) {
try {
ws.send(raw);
} catch {
// Connection may be closing; webSocketClose will handle cleanup
}
}
}
}
private async storeMessage(message: OutgoingMessage): Promise<void> {
const key = `msg:${message.timestamp}:${(message.payload as { messageId: string }).messageId}`;
await this.ctx.storage.put(key, message);
// Also keep in memory for quick access (reset on hibernation)
this.messageHistory.push(message);
if (this.messageHistory.length > 200) {
this.messageHistory = this.messageHistory.slice(-100);
}
}
private async getRecentMessages(count: number): Promise<OutgoingMessage[]> {
// Try in-memory cache first (empty after hibernation)
if (this.messageHistory.length > 0) {
return this.messageHistory.slice(-count);
}
// Fall back to storage
const entries = await this.ctx.storage.list<OutgoingMessage>({
prefix: 'msg:',
reverse: true,
limit: count,
});
const messages = [...entries.values()].reverse();
this.messageHistory = messages;
return messages;
}
}
El Worker correspondiente que enruta las solicitudes al Durable Object:
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Example: /ws/room/:roomId
const match = url.pathname.match(/^\/ws\/room\/(.+)$/);
if (!match) {
return new Response('Not found', { status: 404 });
}
const roomId = match[1];
// Authenticate the request (JWT, cookie, etc.)
const auth = await authenticateRequest(request);
if (!auth) {
return new Response('Unauthorized', { status: 401 });
}
// Resolve the Durable Object by a deterministic name
const id = env.CHAT_ROOMS.idFromName(`room:${roomId}`);
const stub = env.CHAT_ROOMS.get(id);
// Forward the upgrade request with auth context
const doUrl = new URL(request.url);
doUrl.searchParams.set('userId', auth.userId);
doUrl.searchParams.set('displayName', auth.displayName);
doUrl.searchParams.set('role', auth.role);
return stub.fetch(new Request(doUrl.toString(), request));
},
} satisfies ExportedHandler<Env>;
Estructura del Durable Object
Un DO por limite de coordinacion
Cada instancia de Durable Object representa un unico limite de coordinacion. Todos los participantes dentro de ese limite se conectan a la misma instancia de DO, que proporciona consistencia single-threaded:
| Caso de uso | Limite del DO | ID de ejemplo |
|---|---|---|
| Chat room | Un DO por sala | room:engineering-general |
| Edicion de documentos | Un DO por documento | doc:invoice-template-42 |
| Seguimiento de presencia | Un DO por organizacion | presence:org-acme-corp |
| Notificaciones de usuario | Un DO por usuario | notifications:user-abc123 |
| Actualizaciones de dashboard | Un DO por dashboard | dashboard:sales-overview |
Convencion de nombres
Usa un esquema de nombres basado en prefijos consistente para los IDs de Durable Object:
// In the Worker, resolve the DO ID by name
function getDurableObjectId(
namespace: DurableObjectNamespace,
type: string,
identifier: string,
): DurableObjectId {
return namespace.idFromName(`${type}:${identifier}`);
}
// Examples:
const roomId = getDurableObjectId(env.CHAT_ROOMS, 'room', roomSlug);
const presenceId = getDurableObjectId(env.PRESENCE, 'presence', orgId);
const notifId = getDurableObjectId(env.NOTIFICATIONS, 'notifications', userId);
Enrutamiento de mensajes
Dentro del Durable Object, enruta los mensajes entrantes a handlers especializados segun el tipo de mensaje. Esto mantiene limpio el handler webSocketMessage y facilita la adicion de nuevos tipos de mensaje:
type MessageHandler = (ws: WebSocket, payload: Record<string, unknown>, attachment: WebSocketAttachment) => Promise<void>;
private handlers: Record<string, MessageHandler> = {
'chat': this.handleChat.bind(this),
'typing': this.handleTyping.bind(this),
'presence': this.handlePresence.bind(this),
'cursor-move': this.handleCursorMove.bind(this),
'selection-change': this.handleSelectionChange.bind(this),
'ping': this.handlePing.bind(this),
};
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
const data = JSON.parse(message as string) as IncomingMessage;
const handler = this.handlers[data.type];
if (handler) {
await handler(ws, data.payload, attachment);
} else {
ws.send(JSON.stringify({ type: 'error', payload: { message: `Unknown type: ${data.type}` } }));
}
}
Gestion de estado
Los Durable Objects ofrecen dos niveles de estado:
Estado persistente via this.ctx.storage -- sobrevive a la hibernacion, reinicios y redespliegues:
// Key-value storage operations
await this.ctx.storage.put('room:metadata', { name: 'General', createdAt: Date.now() });
const metadata = await this.ctx.storage.get<RoomMetadata>('room:metadata');
// Batch operations for efficiency (counts as a single storage operation)
await this.ctx.storage.put({
'user:alice:presence': { status: 'online', lastSeen: Date.now() },
'user:bob:presence': { status: 'away', lastSeen: Date.now() },
});
// List with prefix scan
const allPresence = await this.ctx.storage.list({ prefix: 'user:', limit: 100 });
Estado transitorio en memoria via propiedades de clase -- rapido pero se pierde en la hibernacion:
export class ChatRoom extends DurableObject<Env> {
// In-memory caches (rebuilt on wake from hibernation)
private messageCache: OutgoingMessage[] = [];
private typingUsers: Map<string, number> = new Map();
private connectionCount = 0;
// These reset to their initial values after hibernation.
// Rebuild them in the first handler call after waking up.
}
Usa almacenamiento persistente para datos que no deben perderse (mensajes, estado de presencia, configuracion). Usa estado en memoria para caches y datos transitorios que cambian frecuentemente (indicadores de escritura, posiciones de cursor) que pueden derivarse del estado persistente o reconstruirse a partir de los clientes conectados.
Gestion de conexiones
serializeAttachment / deserializeAttachment
Cada WebSocket aceptado via la Hibernatable API puede llevar un attachment -- un objeto serializable a JSON que persiste entre ciclos de hibernacion. Este es el mecanismo principal para identificar conexiones.
interface WebSocketAttachment {
userId: string;
displayName: string;
role: 'admin' | 'member' | 'viewer';
orgId: string;
joinedAt: number;
lastMessageId?: string;
}
// On connection accept:
async fetch(request: Request): Promise<Response> {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
this.ctx.acceptWebSocket(server);
// Serialize metadata onto the WebSocket
const attachment: WebSocketAttachment = {
userId: 'user-abc',
displayName: 'Alice',
role: 'admin',
orgId: 'org-acme',
joinedAt: Date.now(),
};
server.serializeAttachment(attachment);
return new Response(null, { status: 101, webSocket: client });
}
// In any message handler -- even after hibernation:
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
console.log(`Message from ${attachment.displayName} (${attachment.userId})`);
// Update the attachment if needed
attachment.lastMessageId = 'msg-xyz';
ws.serializeAttachment(attachment);
}
Propiedades clave de los attachments:
- Sobreviven a la hibernacion: el attachment se serializa a disco cuando el DO hiberna y se restaura al despertar.
- Por conexion: cada WebSocket tiene su propio attachment independiente.
- Mutables: llama a
serializeAttachment()de nuevo para actualizar los datos almacenados. - Limite de tamano: manten los attachments pequenos (por debajo de unos pocos KB) ya que se serializan en cada ciclo de hibernacion.
Ciclo de vida de la conexion
El ciclo de vida completo de una conexion WebSocket a traves del sistema:
1. Client initiates connection
Browser: new WebSocket('wss://api.example.com/ws/room/general')
2. Worker authenticates and routes
Worker: validate JWT → resolve DO ID → forward upgrade request
3. Durable Object accepts connection
DO: acceptWebSocket(server, tags) → serializeAttachment(metadata)
→ broadcast user-joined → send message history
4. Bidirectional communication
Client → webSocketMessage() → process → broadcast/respond
DO → broadcast/targeted send → Client
5. Hibernation (when idle)
All connections quiet → isolate evicted → connections stay open
Incoming message → isolate restored → webSocketMessage() fires
Attachments and tags are preserved
6. Client disconnects
Browser closes tab / network drops / explicit close
→ webSocketClose(ws, code, reason) fires
→ cleanup presence → broadcast user-left
→ if no connections remain, schedule alarm for cleanup
7. Scheduled cleanup
alarm() fires → purge stale data → compact storage
Patrones de broadcasting
Broadcast a todos
El patron mas comun: enviar un mensaje a cada WebSocket conectado en el Durable Object.
/**
* Broadcast a message to all connected WebSockets.
* Optionally exclude a specific connection (e.g., the sender).
*/
private broadcast(
message: OutgoingMessage,
options?: { exclude?: WebSocket; onlyTo?: string[] },
): void {
const raw = JSON.stringify(message);
const sockets = this.ctx.getWebSockets();
for (const ws of sockets) {
if (options?.exclude && ws === options.exclude) {
continue;
}
if (options?.onlyTo) {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
if (!options.onlyTo.includes(attachment.userId)) {
continue;
}
}
try {
ws.send(raw);
} catch {
// The connection may be in a closing state.
// webSocketClose() will handle cleanup.
}
}
}
// Usage examples:
// Broadcast to everyone
this.broadcast({ type: 'chat', payload: { text: 'Hello' }, sender: 'alice', timestamp: Date.now() });
// Broadcast to everyone except the sender
this.broadcast(message, { exclude: senderWs });
// Send to specific users only
this.broadcast(notification, { onlyTo: ['user-bob', 'user-carol'] });
Broadcast con tags
Los tags proporcionan una forma mas eficiente de filtrar conexiones WebSocket sin deserializar cada attachment. Los tags se establecen al aceptar el WebSocket y pueden usarse para obtener un subconjunto filtrado.
// Accept a WebSocket with tags
async fetch(request: Request): Promise<Response> {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
const userId = 'user-abc';
const role = 'admin';
const teamId = 'team-frontend';
// Tags allow efficient filtering via getWebSockets(tag)
this.ctx.acceptWebSocket(server, [
`user:${userId}`,
`role:${role}`,
`team:${teamId}`,
]);
server.serializeAttachment({ userId, role, teamId });
return new Response(null, { status: 101, webSocket: client });
}
// Broadcast only to admins
private broadcastToAdmins(message: OutgoingMessage): void {
const raw = JSON.stringify(message);
const adminSockets = this.ctx.getWebSockets('role:admin');
for (const ws of adminSockets) {
try {
ws.send(raw);
} catch {
// handled by webSocketClose
}
}
}
// Send a direct message to a specific user
private sendToUser(userId: string, message: OutgoingMessage): void {
const raw = JSON.stringify(message);
const userSockets = this.ctx.getWebSockets(`user:${userId}`);
for (const ws of userSockets) {
try {
ws.send(raw);
} catch {
// handled by webSocketClose
}
}
}
// Broadcast to a team channel
private broadcastToTeam(teamId: string, message: OutgoingMessage): void {
const raw = JSON.stringify(message);
const teamSockets = this.ctx.getWebSockets(`team:${teamId}`);
for (const ws of teamSockets) {
try {
ws.send(raw);
} catch {
// handled by webSocketClose
}
}
}
Limites y directrices para tags:
- Cada WebSocket puede tener hasta 10 tags.
- Cada tag puede tener hasta 256 bytes.
- Los tags son inmutables despues de
acceptWebSocket()-- para cambiar tags, el cliente debe reconectarse. - Usa tags para agrupaciones gruesas y estables (rol, equipo, sala). Usa metadatos del attachment para filtrado detallado y mutable.
Batching de mensajes para actualizaciones de alta frecuencia
Problema
Funcionalidades como seguimiento colaborativo de cursores, indicadores de escritura en vivo o metricas de dashboard en tiempo real pueden generar decenas de actualizaciones por segundo por usuario. Enviar cada actualizacion como un mensaje WebSocket individual genera overhead innecesario:
- Cada llamada a
send()incurre en coste de serializacion. - Los clientes receptores procesan muchos mensajes pequenos en lugar de menos actualizaciones agrupadas.
- Overhead de red por el framing de muchos mensajes pequenos.
Solucion
Acumula los mensajes entrantes de alta frecuencia y envialos en bloque a intervalos cortos usando alarmas del Durable Object:
import { DurableObject } from 'cloudflare:workers';
interface CursorUpdate {
userId: string;
x: number;
y: number;
timestamp: number;
}
interface BatchedMessage {
type: 'cursor-batch';
payload: {
cursors: CursorUpdate[];
};
timestamp: number;
}
export class CollaborativeDocument extends DurableObject<Env> {
/**
* Buffer for high-frequency updates.
* Maps userId to their latest cursor position (only the latest matters).
*/
private cursorBuffer: Map<string, CursorUpdate> = new Map();
/**
* Whether a flush alarm is already scheduled.
*/
private flushScheduled = false;
/**
* Flush interval in milliseconds.
* 50ms gives a good balance between latency and batching efficiency.
*/
private static readonly FLUSH_INTERVAL_MS = 50;
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const data = JSON.parse(message as string);
switch (data.type) {
case 'cursor-move': {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
// Buffer the cursor update (overwrite previous position for this user)
this.cursorBuffer.set(attachment.userId, {
userId: attachment.userId,
x: data.payload.x,
y: data.payload.y,
timestamp: Date.now(),
});
// Schedule a flush if one is not already pending
await this.scheduleFlush();
break;
}
case 'document-edit': {
// Critical updates bypass batching and are sent immediately
this.broadcast({
type: 'document-edit',
payload: data.payload,
sender: (ws.deserializeAttachment() as WebSocketAttachment).userId,
timestamp: Date.now(),
}, ws);
break;
}
}
}
async alarm(): Promise<void> {
// Flush the cursor buffer
if (this.cursorBuffer.size > 0) {
const cursors = Array.from(this.cursorBuffer.values());
this.cursorBuffer.clear();
this.flushScheduled = false;
const batchedMessage: BatchedMessage = {
type: 'cursor-batch',
payload: { cursors },
timestamp: Date.now(),
};
const raw = JSON.stringify(batchedMessage);
for (const ws of this.ctx.getWebSockets()) {
const attachment = ws.deserializeAttachment() as WebSocketAttachment;
// Filter out the user's own cursor and skip if nothing remains
const relevantCursors = cursors.filter((c) => c.userId !== attachment.userId);
if (relevantCursors.length > 0) {
const filtered = JSON.stringify({
...batchedMessage,
payload: { cursors: relevantCursors },
});
try {
ws.send(filtered);
} catch {
// handled by webSocketClose
}
}
}
}
// Reschedule if new data arrived during the flush
if (this.cursorBuffer.size > 0) {
await this.scheduleFlush();
}
}
private async scheduleFlush(): Promise<void> {
if (!this.flushScheduled) {
this.flushScheduled = true;
const currentAlarm = await this.ctx.storage.getAlarm();
if (currentAlarm === null) {
await this.ctx.storage.setAlarm(
Date.now() + CollaborativeDocument.FLUSH_INTERVAL_MS,
);
}
}
}
private broadcast(message: OutgoingMessage, exclude?: WebSocket): void {
const raw = JSON.stringify(message);
for (const ws of this.ctx.getWebSockets()) {
if (ws !== exclude) {
try { ws.send(raw); } catch { /* handled by webSocketClose */ }
}
}
}
}
Directrices de batching
| Tipo de actualizacion | Intervalo recomendado | Estrategia |
|---|---|---|
| Posiciones de cursor | 50ms | Conservar solo la ultima por usuario |
| Indicadores de escritura | 100ms | Deduplicar por usuario |
| Metricas de dashboard | 200-500ms | Agregar/promediar valores |
| Heartbeats de presencia | 5.000-10.000ms | Agrupar por usuario |
| Mensajes de chat | Sin batching | Enviar inmediatamente |
| Ediciones de documento | Sin batching | Enviar inmediatamente (el orden importa) |
El principio clave: agrupa datos observacionales, envia datos operacionales de inmediato. Los usuarios toleran 50-100ms de latencia en posiciones de cursor, pero los mensajes de chat y las ediciones de documento deben entregarse lo mas rapido posible.
Estrategia de reconexion
Por que la reconexion es obligatoria
Las conexiones WebSocket se van a caer. No hay forma de evitarlo. Cada cliente debe implementar logica de reconexion porque:
- Despliegues de Cloudflare: Cuando se despliega una nueva version del Worker o del Durable Object, Cloudflare termina todas las conexiones WebSocket existentes hacia ese DO. Esto sucede en cada
wrangler deploy. - Interrupciones de red: Cambios de WiFi, traspasos celulares, reconexiones de VPN y cortes del ISP.
- Backgrounding en movil: iOS y Android cierran agresivamente las conexiones WebSocket cuando las apps pasan a segundo plano.
- Timeouts por inactividad: Aunque los Hibernatable WebSockets mantienen vivo el lado del servidor, los clientes o proxies intermediarios pueden cerrar conexiones inactivas.
- Desalojo del DO: En casos raros, Cloudflare puede necesitar migrar un Durable Object a otra maquina.
Tratar la reconexion como un caso de error excepcional en lugar de una condicion de operacion normal es la fuente mas comun de bugs en funcionalidades de tiempo real.
Reconexion en el cliente
Implementa backoff exponencial con jitter para evitar tormentas de reconexion tipo thundering herd:
interface ReconnectConfig {
/** Initial delay before first reconnection attempt (ms) */
initialDelay: number;
/** Maximum delay between reconnection attempts (ms) */
maxDelay: number;
/** Multiplier applied to delay after each failed attempt */
backoffMultiplier: number;
/** Maximum number of reconnection attempts before giving up */
maxAttempts: number;
/** Maximum number of messages to queue while disconnected. When exceeded, oldest messages are dropped. */
maxQueueSize: number;
}
const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = {
initialDelay: 500,
maxDelay: 30_000,
backoffMultiplier: 2,
maxAttempts: 20,
maxQueueSize: 100,
};
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private attempt = 0;
private lastMessageId: string | null = null;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private messageQueue: unknown[] = [];
private config: ReconnectConfig;
constructor(
private url: string,
config?: Partial<ReconnectConfig>,
) {
this.config = { ...DEFAULT_RECONNECT_CONFIG, ...config };
this.connect();
}
private connect(): void {
// Append lastMessageId for server-side replay on reconnect
const connectUrl = this.lastMessageId
? `${this.url}${this.url.includes('?') ? '&' : '?'}lastMessageId=${this.lastMessageId}`
: this.url;
this.ws = new WebSocket(connectUrl);
this.ws.addEventListener('open', () => {
console.log('WebSocket connected');
this.attempt = 0; // Reset backoff on successful connection
this.onStatusChange?.('connected');
this.flushQueue();
});
this.ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
// Track the latest message ID for reconnection replay
if (data.payload?.messageId) {
this.lastMessageId = data.payload.messageId;
}
this.onMessage?.(data);
});
this.ws.addEventListener('close', (event) => {
console.log(`WebSocket closed: ${event.code} ${event.reason}`);
this.ws = null;
this.scheduleReconnect();
});
this.ws.addEventListener('error', () => {
// The close event always follows an error event; reconnection is handled there.
console.error('WebSocket error');
});
}
private scheduleReconnect(): void {
if (this.attempt >= this.config.maxAttempts) {
console.error(`Giving up after ${this.attempt} reconnection attempts`);
this.onStatusChange?.('failed');
return;
}
this.onStatusChange?.('reconnecting');
// Exponential backoff with ceiling and full jitter to prevent thundering herd.
// The ceiling (maxDelay) caps the exponential growth.
// Full jitter randomizes the actual delay between [0, capped_delay] so that
// clients that reconnect simultaneously spread their retries over time.
const capped = Math.min(
this.config.initialDelay * Math.pow(this.config.backoffMultiplier, this.attempt),
this.config.maxDelay,
);
const delay = Math.random() * capped;
console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.attempt + 1}/${this.config.maxAttempts})`);
this.reconnectTimer = setTimeout(() => {
this.attempt++;
this.connect();
}, delay);
}
send(data: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
} else {
// Enforce max queue size by dropping oldest messages when the limit is reached
if (this.messageQueue.length >= this.config.maxQueueSize) {
const dropped = this.messageQueue.shift();
console.warn(
`Message queue full (${this.config.maxQueueSize}). Dropping oldest message:`,
dropped,
);
}
this.messageQueue.push(data);
}
}
close(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.ws?.close(1000, 'Client closing');
}
// Callbacks
onMessage?: (data: unknown) => void;
onStatusChange?: (status: 'connecting' | 'connected' | 'reconnecting' | 'failed') => void;
private flushQueue(): void {
while (this.messageQueue.length > 0 && this.ws?.readyState === WebSocket.OPEN) {
const msg = this.messageQueue.shift();
this.ws.send(JSON.stringify(msg));
}
}
}
Soporte en el servidor
El Durable Object debe soportar la reconexion almacenando mensajes recientes y reenviando los perdidos:
export class ChatRoom extends DurableObject<Env> {
/**
* Handle WebSocket upgrade with optional reconnection replay.
*/
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
const lastMessageId = url.searchParams.get('lastMessageId');
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
this.ctx.acceptWebSocket(server);
server.serializeAttachment({
userId: url.searchParams.get('userId'),
joinedAt: Date.now(),
});
// If reconnecting, replay missed messages
if (lastMessageId) {
const missed = await this.getMessagesSince(lastMessageId);
if (missed.length > 0) {
server.send(JSON.stringify({
type: 'replay',
payload: {
messages: missed,
fromMessageId: lastMessageId,
},
sender: 'system',
timestamp: Date.now(),
}));
}
} else {
// New connection -- send recent history
const recent = await this.getRecentMessages(50);
server.send(JSON.stringify({
type: 'history',
payload: { messages: recent },
sender: 'system',
timestamp: Date.now(),
}));
}
return new Response(null, { status: 101, webSocket: client });
}
/**
* Retrieve all messages stored after the given message ID.
* Messages are stored with keys like `msg:<timestamp>:<messageId>`
* so a prefix scan starting after the last known message returns
* everything the client missed.
*/
private async getMessagesSince(lastMessageId: string): Promise<OutgoingMessage[]> {
// Find the key for the last known message
const allMessages = await this.ctx.storage.list<OutgoingMessage>({
prefix: 'msg:',
});
let found = false;
const missed: OutgoingMessage[] = [];
for (const [key, message] of allMessages) {
if (found) {
missed.push(message);
}
if ((message.payload as { messageId?: string }).messageId === lastMessageId) {
found = true;
}
}
// If the message was not found (pruned), send recent history instead
if (!found) {
return this.getRecentMessages(50);
}
return missed;
}
private async getRecentMessages(count: number): Promise<OutgoingMessage[]> {
const entries = await this.ctx.storage.list<OutgoingMessage>({
prefix: 'msg:',
reverse: true,
limit: count,
});
return [...entries.values()].reverse();
}
}
Hook React para WebSocket
Un hook React personalizado de calidad produccion para gestionar conexiones WebSocket dentro de micro frontends. Este hook reside en el paquete shared-utils y es consumido por cada MFE que necesita comunicacion en tiempo real.
import { useCallback, useEffect, useRef, useState } from 'react';
// --- Types ---
type WebSocketStatus = 'connecting' | 'connected' | 'disconnected' | 'reconnecting' | 'failed';
interface WebSocketOptions<TIncoming = unknown, TOutgoing = unknown> {
/** Called when a parsed message is received */
onMessage?: (message: TIncoming) => void;
/** Called when the connection status changes */
onStatusChange?: (status: WebSocketStatus) => void;
/** Whether to connect automatically on mount. Defaults to true. */
autoConnect?: boolean;
/** Reconnection configuration */
reconnect?: {
enabled?: boolean;
initialDelay?: number;
maxDelay?: number;
backoffMultiplier?: number;
maxAttempts?: number;
/** Maximum number of messages to queue while disconnected. Oldest messages are dropped when exceeded. Defaults to 100. */
maxQueueSize?: number;
};
/** Protocols to request during the WebSocket handshake */
protocols?: string | string[];
/** Custom message serializer. Defaults to JSON.stringify. */
serialize?: (message: TOutgoing) => string;
/** Custom message deserializer. Defaults to JSON.parse. */
deserialize?: (data: string) => TIncoming;
}
interface WebSocketReturn<TOutgoing = unknown> {
/** Current connection status */
status: WebSocketStatus;
/** Send a typed message. Queues if not connected. */
send: (message: TOutgoing) => void;
/** The most recently received message */
lastMessage: unknown | null;
/** Manually trigger a reconnection */
reconnect: () => void;
/** Manually disconnect (disables auto-reconnect) */
disconnect: () => void;
}
// --- Hook Implementation ---
export function useWebSocket<
TIncoming = unknown,
TOutgoing = unknown,
>(
url: string | null,
options: WebSocketOptions<TIncoming, TOutgoing> = {},
): WebSocketReturn<TOutgoing> {
const {
onMessage,
onStatusChange,
autoConnect = true,
reconnect: reconnectConfig = {},
protocols,
serialize = JSON.stringify,
deserialize = (data: string) => JSON.parse(data) as TIncoming,
} = options;
const {
enabled: reconnectEnabled = true,
initialDelay = 500,
maxDelay = 30_000,
backoffMultiplier = 2,
maxAttempts = 20,
maxQueueSize = 100,
} = reconnectConfig;
// --- State ---
const [status, setStatus] = useState<WebSocketStatus>('disconnected');
const [lastMessage, setLastMessage] = useState<unknown | null>(null);
// --- Refs (stable across renders) ---
const wsRef = useRef<WebSocket | null>(null);
const attemptRef = useRef(0);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const messageQueueRef = useRef<TOutgoing[]>([]);
const intentionalCloseRef = useRef(false);
const mountedRef = useRef(true);
// Keep callback refs fresh without triggering reconnects
const onMessageRef = useRef(onMessage);
onMessageRef.current = onMessage;
const onStatusChangeRef = useRef(onStatusChange);
onStatusChangeRef.current = onStatusChange;
// --- Status updater ---
const updateStatus = useCallback((newStatus: WebSocketStatus) => {
if (!mountedRef.current) return;
setStatus(newStatus);
onStatusChangeRef.current?.(newStatus);
}, []);
// --- Flush queued messages ---
const flushQueue = useCallback(() => {
const ws = wsRef.current;
if (!ws || ws.readyState !== WebSocket.OPEN) return;
while (messageQueueRef.current.length > 0) {
const msg = messageQueueRef.current.shift()!;
try {
ws.send(serialize(msg));
} catch (err) {
console.error('Failed to send queued message:', err);
// Put it back at the front
messageQueueRef.current.unshift(msg);
break;
}
}
}, [serialize]);
// --- Connect ---
const connect = useCallback(() => {
if (!url) return;
// Clean up any existing connection
if (wsRef.current) {
wsRef.current.close(1000, 'Reconnecting');
wsRef.current = null;
}
updateStatus('connecting');
intentionalCloseRef.current = false;
const ws = new WebSocket(url, protocols);
wsRef.current = ws;
ws.addEventListener('open', () => {
if (!mountedRef.current) {
ws.close();
return;
}
attemptRef.current = 0;
updateStatus('connected');
flushQueue();
});
ws.addEventListener('message', (event: MessageEvent) => {
if (!mountedRef.current) return;
try {
const parsed = deserialize(event.data as string);
setLastMessage(parsed);
onMessageRef.current?.(parsed);
} catch (err) {
console.error('Failed to parse WebSocket message:', err);
}
});
ws.addEventListener('close', (event: CloseEvent) => {
wsRef.current = null;
if (!mountedRef.current) return;
if (intentionalCloseRef.current) {
updateStatus('disconnected');
return;
}
// Attempt reconnection
if (reconnectEnabled && attemptRef.current < maxAttempts) {
updateStatus('reconnecting');
// Exponential backoff with ceiling and full jitter to prevent thundering herd
const capped = Math.min(
initialDelay * Math.pow(backoffMultiplier, attemptRef.current),
maxDelay,
);
const delay = Math.random() * capped;
console.log(
`WebSocket closed (${event.code}). Reconnecting in ${Math.round(delay)}ms ` +
`(attempt ${attemptRef.current + 1}/${maxAttempts})`,
);
reconnectTimerRef.current = setTimeout(() => {
if (mountedRef.current) {
attemptRef.current++;
connect();
}
}, delay);
} else if (attemptRef.current >= maxAttempts) {
console.error(`WebSocket reconnection failed after ${maxAttempts} attempts`);
updateStatus('failed');
} else {
updateStatus('disconnected');
}
});
ws.addEventListener('error', () => {
// The close event will fire after this; reconnection is handled there.
console.error('WebSocket error');
});
}, [
url, protocols, reconnectEnabled, initialDelay, maxDelay,
backoffMultiplier, maxAttempts, updateStatus, flushQueue, deserialize,
]);
// --- Send ---
const send = useCallback((message: TOutgoing) => {
const ws = wsRef.current;
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(serialize(message));
} catch (err) {
console.error('Failed to send message:', err);
enqueueMessage(message);
}
} else {
// Queue the message for delivery after reconnection
enqueueMessage(message);
}
function enqueueMessage(msg: TOutgoing): void {
if (messageQueueRef.current.length >= maxQueueSize) {
const dropped = messageQueueRef.current.shift();
console.warn(`Message queue full (${maxQueueSize}). Dropping oldest message:`, dropped);
}
messageQueueRef.current.push(msg);
}
}, [serialize, maxQueueSize]);
// --- Manual reconnect ---
const manualReconnect = useCallback(() => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
attemptRef.current = 0;
connect();
}, [connect]);
// --- Disconnect ---
const disconnect = useCallback(() => {
intentionalCloseRef.current = true;
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
if (wsRef.current) {
wsRef.current.close(1000, 'Client disconnecting');
wsRef.current = null;
}
updateStatus('disconnected');
}, [updateStatus]);
// --- Auto-connect on mount ---
useEffect(() => {
mountedRef.current = true;
if (autoConnect && url) {
connect();
}
return () => {
mountedRef.current = false;
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
if (wsRef.current) {
intentionalCloseRef.current = true;
wsRef.current.close(1000, 'Component unmounting');
wsRef.current = null;
}
};
}, [url, autoConnect, connect]);
return {
status,
send,
lastMessage,
reconnect: manualReconnect,
disconnect,
};
}
Ejemplo de uso
import { useWebSocket } from '@platform/shared-utils';
// Define typed message contracts
interface ChatIncoming {
type: 'chat' | 'typing' | 'user-joined' | 'user-left' | 'history' | 'replay';
payload: Record<string, unknown>;
sender: string;
timestamp: number;
}
interface ChatOutgoing {
type: 'chat' | 'typing' | 'ping';
payload: Record<string, unknown>;
}
function ChatPanel({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState<ChatIncoming[]>([]);
const { status, send, reconnect } = useWebSocket<ChatIncoming, ChatOutgoing>(
`wss://api.example.com/ws/room/${roomId}`,
{
onMessage: (msg) => {
switch (msg.type) {
case 'chat':
setMessages((prev) => [...prev, msg]);
break;
case 'history':
case 'replay':
setMessages((prev) => [
...prev,
...(msg.payload.messages as ChatIncoming[]),
]);
break;
}
},
onStatusChange: (newStatus) => {
console.log(`Chat connection: ${newStatus}`);
},
reconnect: {
enabled: true,
maxAttempts: 30,
},
},
);
const sendMessage = (text: string) => {
send({ type: 'chat', payload: { text } });
};
return (
<div>
<ConnectionBadge status={status} onReconnect={reconnect} />
<MessageList messages={messages} />
<MessageInput onSend={sendMessage} disabled={status !== 'connected'} />
</div>
);
}
Integracion con Module Federation
Paquete shared-utils
El hook useWebSocket y las utilidades relacionadas residen en el paquete shared-utils dentro del monorepo. Este paquete se expone via Module Federation para que cada micro frontend pueda importar la misma implementacion sin empaquetarla por separado.
packages/
shared-utils/
src/
hooks/
useWebSocket.ts # The hook described above
usePresence.ts # Presence-specific hook built on useWebSocket
useNotifications.ts # Notification channel hook
websocket/
types.ts # Shared message type definitions
constants.ts # Endpoints, reconnect defaults
index.ts # Public API
Configuracion de Module Federation en el shell:
// rsbuild.config.ts (shell)
new ModuleFederationPlugin({
name: 'shell',
shared: {
'@platform/shared-utils': {
singleton: true,
requiredVersion: '^1.0.0',
},
react: { singleton: true, requiredVersion: '^19.2.4' },
'react-dom': { singleton: true, requiredVersion: '^19.2.4' },
},
});
Independencia del MFE
Cada micro frontend importa y usa el hook WebSocket de forma independiente. No existe un gestor WebSocket centralizado a traves del cual todos los MFEs deban comunicarse:
// In the "orders" MFE
import { useWebSocket } from '@platform/shared-utils';
function OrderTracker({ orderId }: { orderId: string }) {
const { status, lastMessage } = useWebSocket(
`wss://api.example.com/ws/orders/${orderId}`,
);
// ...
}
// In the "analytics" MFE
import { useWebSocket } from '@platform/shared-utils';
function LiveDashboard({ dashboardId }: { dashboardId: string }) {
const { status, lastMessage } = useWebSocket(
`wss://api.example.com/ws/dashboard/${dashboardId}`,
);
// ...
}
Conexiones gestionadas por el shell
La aplicacion shell gestiona las conexiones WebSocket que abarcan toda la plataforma y son transversales a todos los MFEs -- especificamente presencia y notificaciones. Estas conexiones se establecen una sola vez cuando el usuario inicia sesion y se comparten con los MFEs a traves de la API CustomEvent del navegador:
// Shell app -- establishes global connections
function ShellApp() {
const { status: presenceStatus } = useWebSocket(
`wss://api.example.com/ws/presence/${orgId}`,
{
onMessage: (msg) => {
// Dispatch presence updates as CustomEvents for MFEs to consume
window.dispatchEvent(
new CustomEvent('platform:presence', { detail: msg }),
);
},
},
);
const { status: notifStatus } = useWebSocket(
`wss://api.example.com/ws/notifications/${userId}`,
{
onMessage: (msg) => {
window.dispatchEvent(
new CustomEvent('platform:notification', { detail: msg }),
);
},
},
);
return <MicroFrontendContainer />;
}
// Any MFE can listen for shell-managed events
function usePresenceFromShell() {
const [presenceMap, setPresenceMap] = useState<Map<string, PresenceInfo>>(new Map());
useEffect(() => {
const handler = (event: CustomEvent) => {
const { userId, status } = event.detail.payload;
setPresenceMap((prev) => {
const next = new Map(prev);
next.set(userId, { status, lastSeen: Date.now() });
return next;
});
};
window.addEventListener('platform:presence', handler as EventListener);
return () => {
window.removeEventListener('platform:presence', handler as EventListener);
};
}, []);
return presenceMap;
}
Resumen de propiedad de conexiones
| Tipo de conexion | Gestionada por | Alcance | Ejemplo |
|---|---|---|---|
| Presencia | Shell | Toda la organizacion | ws/presence/:orgId |
| Notificaciones | Shell | Por usuario | ws/notifications/:userId |
| Chat rooms | MFE (messaging) | Por sala | ws/room/:roomId |
| Edicion de documentos | MFE (docs) | Por documento | ws/doc/:docId |
| Actualizaciones de dashboard | MFE (analytics) | Por dashboard | ws/dashboard/:dashboardId |
| Seguimiento de pedidos | MFE (orders) | Por pedido | ws/orders/:orderId |
Monitoreo y alertas de conteo de conexiones
Monitorear el conteo de conexiones WebSocket es critico para detectar problemas de capacidad, tormentas de reconexion anormales y presion de memoria antes de que afecten a los usuarios.
Monitoreo en el servidor
Expone metricas de conexion desde el Durable Object publicando conteos en cada evento de conexion y desconexion. Usa Cloudflare Workers Analytics Engine para almacenamiento de series temporales:
export class MonitoredChatRoom extends DurableObject<Env> {
/**
* Report the current connection count to the Analytics Engine.
* Call this on every connect, disconnect, and periodically via alarm.
*/
private reportConnectionMetrics(): void {
const connections = this.ctx.getWebSockets();
const totalConnections = connections.length;
// Write to Analytics Engine for dashboarding and alerting
this.env.WEBSOCKET_METRICS.writeDataPoint({
blobs: [this.ctx.id.toString()],
doubles: [totalConnections],
indexes: ['connection_count'],
});
// Log a warning when approaching the recommended ceiling
if (totalConnections > 4_000) {
console.warn(
`DO ${this.ctx.id.toString()} has ${totalConnections} connections — ` +
`approaching the 5,000 recommended limit. Consider sharding.`,
);
}
}
async fetch(request: Request): Promise<Response> {
// ... accept WebSocket ...
this.reportConnectionMetrics();
return new Response(null, { status: 101, webSocket: client });
}
async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
ws.close(code, reason);
this.reportConnectionMetrics();
// ... rest of cleanup ...
}
async alarm(): Promise<void> {
// Periodically report metrics even when idle, so gaps in the time series
// indicate the DO was hibernating rather than simply unreported.
this.reportConnectionMetrics();
// ... rest of alarm logic ...
}
}
Umbrales de alerta
Configura alertas sobre las siguientes condiciones usando Cloudflare Notifications o un sistema externo que consuma datos del Analytics Engine:
| Condicion | Umbral | Accion |
|---|---|---|
| Conteo de conexiones alto | > 4.000 por DO | Preparar sharding; investigar si un unico limite de coordinacion es demasiado amplio |
| Pico rapido de reconexiones | > 500 nuevas conexiones/min a un unico DO | Probable evento de despliegue o red; verificar que los clientes usen backoff con jitter |
| Conteo de conexiones cae a cero | Todos los DOs en un namespace simultaneamente | Posible incidente de despliegue o infraestructura; verificar enrutamiento del Worker |
| Tendencia de crecimiento sostenido | Incremento diario constante | Planificar capacidad para sharding antes de alcanzar el techo de 5.000 |
Monitoreo en el cliente
Registra eventos de reconexion en el cliente para exponer problemas de conectividad a backends de observabilidad:
onStatusChange: (status) => {
// Emit to your telemetry pipeline (e.g., OpenTelemetry, Datadog, custom)
telemetry.trackEvent('websocket_status_change', {
status,
url: wsUrl,
timestamp: Date.now(),
});
if (status === 'failed') {
telemetry.trackEvent('websocket_reconnection_exhausted', {
url: wsUrl,
timestamp: Date.now(),
});
}
},
Agrega metricas de reconexion del lado del cliente para detectar patrones como:
- Un pico de eventos
reconnectingen muchos clientes al mismo tiempo (indica un evento del lado del servidor). - Estado
failedpersistente para regiones o ISPs especificos (indica un problema en la ruta de red). - Alta frecuencia de reconexion para usuarios individuales (indica condiciones de red local inestables).
Precios y limites
Comprender los limites de recursos y el modelo de costes es esencial para disenar la topologia del Durable Object. Los siguientes valores reflejan el plan Paid de Cloudflare Workers.
Limites de Durable Objects
| Recurso | Limite | Notas |
|---|---|---|
| Solicitudes entrantes por DO | ~1.000/seg | Limite soft; puede elevarse en Enterprise |
| Tamano de mensaje WebSocket | 32 MiB | Tamano maximo de payload por mensaje |
| Almacenamiento del Durable Object | 10 GB por DO | Almacenamiento key-value co-ubicado con el DO |
| Operaciones de lectura de storage | 1.000/seg por DO | Las lecturas batch cuentan como una sola operacion |
| Operaciones de escritura de storage | 1.000/seg por DO | Las escrituras batch cuentan como una sola operacion |
| Limite de subrequest | 1.000 por invocacion | Llamadas fetch salientes desde el DO |
| Tiempo de CPU por solicitud | 30 segundos | Tiempo de reloj de pared de CPU por invocacion |
| Tags por WebSocket | 10 | Se establecen al aceptar, inmutables |
| Tamano del tag | 256 bytes | Por tag |
| Precision de alarma | ~30 segundos minimo | No apto para intervalos sub-segundo |
Facturacion de hibernacion
| Componente de facturacion | Activo | Hibernando |
|---|---|---|
| Cargos por tiempo de CPU | Tarifa completa | No se cobra |
| Cargos por duracion | Tarifa completa | No se cobra |
| Cargos por almacenamiento | Estandar | Estandar (aplica siempre) |
| Cargos por mensaje WebSocket | Por mensaje | Por mensaje (despierta el DO) |
Nota sobre facturacion de Durable Object SQLite: La facturacion del almacenamiento Durable Object SQLite esta activa desde enero de 2026. Si tus Durable Objects usan el backend de almacenamiento respaldado por SQLite (el predeterminado para DOs recien creados), ten en cuenta que las lecturas y escrituras se facturan por fila leida/escrita en lugar de por operacion key-value. Consulta la pagina de Durable Objects Pricing para la tarjeta de tarifas actual de SQLite.
Implicaciones de coste para la arquitectura
- Canales de presencia (mayormente inactivos, heartbeat ocasional): Hibernan mas del 95% del tiempo. Extremadamente rentables.
- Canales de notificaciones (inactivos hasta que llega una notificacion): Coste casi nulo durante periodos de inactividad.
- Chat rooms (actividad esporadica): Rentables para salas con rafagas de actividad seguidas de periodos tranquilos.
- Edicion colaborativa (actualizaciones continuas de alta frecuencia): Coste mas alto debido a mensajes frecuentes que impiden la hibernacion. Usa batching de mensajes para reducir despertares.
Directrices de escalado
- Manten cada Durable Object por debajo de 5.000 conexiones WebSocket concurrentes para un rendimiento fiable.
- Para organizaciones con mas de 5.000 usuarios concurrentes, fragmenta la presencia entre multiples DOs (por ejemplo,
presence:org-acme:shard-0,presence:org-acme:shard-1). - Usa operaciones batch de
this.ctx.storagepara mantenerte dentro del limite de 1.000 ops/seg. - Monitorea el tiempo de CPU del DO por solicitud para evitar alcanzar el limite de 30 segundos durante la limpieza basada en alarmas.