Comunicacion en Tiempo Real

Tabla de Contenidos


Vision general

La plataforma ofrece funcionalidades en tiempo real a traves de los micro frontends, incluyendo:

  • Actualizaciones en vivo -- metricas de dashboard, elementos del feed e indicadores de estado que reflejan cambios del servidor en milisegundos.
  • Edicion colaborativa -- varios usuarios pueden ver y modificar el mismo documento o configuracion de forma simultanea, con transformaciones operacionales o resolucion de conflictos basada en CRDT.
  • Notificaciones -- alertas push entregadas instantaneamente a los clientes conectados sin necesidad de polling.
  • Presencia -- visibilidad de quien esta en linea, que pagina esta viendo y si esta escribiendo activamente o inactivo.

Arquitectura

El flujo de datos para todas las funcionalidades en tiempo real sigue un camino consistente:

Flujo de datos en tiempo real: Navegador a Worker a Durable Object a Hibernatable WebSocket Navegador App React Cliente WebSocket WSS Worker Enrutamiento en el edge Autenticacion JWT stub.fetch Durable Object Coordinacion con estado Consistencia single-writer Hibernatable WebSocket Bidireccional persistente Lado del cliente Cloudflare Edge (sin infraestructura que gestionar)
  1. El navegador abre una conexion WebSocket hacia un endpoint de Cloudflare Worker.
  2. El Worker autentica la solicitud, resuelve el ID del Durable Object correcto y reenvía la solicitud de upgrade.
  3. El Durable Object acepta el WebSocket usando la Hibernatable WebSocket API, adjunta metadatos y gestiona todo el enrutamiento de mensajes.
  4. El Hibernatable WebSocket mantiene la conexion activa incluso cuando el Durable Object no tiene trabajo pendiente, reduciendo drasticamente el coste.

Por que Durable Objects en lugar de servidores WebSocket tradicionales

AspectoServidor tradicionalDurable Objects
DespliegueProvisionar y gestionar VMs o contenedoresNativo en el edge, sin infraestructura
EscaladoEscalado horizontal manual + sticky sessionsEscalado automatico por objeto a traves de la red de Cloudflare
Gestion del servidorParcheo de SO, balanceadores de carga, health checksTotalmente gestionado por Cloudflare
Coste en inactividadPagar por servidores siempre encendidosLa hibernacion elimina los cargos de CPU en periodos de inactividad
Latencia globalRegion unica o replicacion multi-regionSe ejecuta en el edge mas cercano al usuario coordinador
Consistencia de estadoBases de datos externas + invalidacion de cacheSingle-threaded por objeto con almacenamiento co-ubicado

Los Durable Objects eliminan la carga operativa de mantener infraestructura WebSocket, proporcionando garantías solidas de consistencia single-writer por limite de coordinacion.


Hibernatable WebSocket API

Ciclo de vida de hibernacion del WebSocket: conectado, hibernando, despertar por mensaje, procesar, hibernar de nuevo Ciclo de vida de la hibernacion Conectado Procesamiento activo idle Hibernando Sin cargos de CPU message Despertar Isolate restaurado Procesar webSocketMessage() todas las conexiones inactivas de nuevo mas msgs Los attachments sobreviven Tags + metadatos serializados Estado en memoria perdido Usar ctx.storage para persistir

Que es la hibernacion

Los Durable Objects soportan un mecanismo de hibernacion que cambia fundamentalmente el modelo de costes para conexiones WebSocket de larga duracion:

  • Durante el procesamiento activo, el Durable Object funciona normalmente, ejecuta JavaScript y se factura por tiempo de CPU.
  • Cuando todas las conexiones WebSocket estan inactivas (sin mensajes entrantes, sin alarmas pendientes), el Durable Object puede entrar en hibernacion.
  • Durante la hibernacion:
    • No se generan cargos de CPU.
    • Todas las conexiones WebSocket permanecen abiertas y funcionales desde la perspectiva del cliente.
    • El estado en memoria se descarta. Cualquier estado que deba sobrevivir a la hibernacion necesita almacenarse via this.ctx.storage o serializarse en los attachments del WebSocket.
    • El isolate JavaScript del Durable Object se desaloja de memoria.
  • El Durable Object despierta cuando:
    • Llega un mensaje WebSocket entrante (dispara webSocketMessage()).
    • Se activa una alarma previamente programada (dispara alarm()).
    • Llega una nueva solicitud HTTP (dispara fetch()).

Facturacion durante la hibernacion: Mientras el Durable Object esta hibernando, no se generan cargos de CPU ni de duracion. Solo se factura por almacenamiento y por mensajes WebSocket entrantes que despierten el DO. Esto hace economicamente viable mantener miles de conexiones WebSocket inactivas para canales de presencia y notificaciones.

Patron de la API

La Hibernatable WebSocket API usa event handlers en la clase del Durable Object en lugar de adjuntar listeners a objetos WebSocket individuales. Esto es lo que permite a Cloudflare desalojar el isolate de memoria mientras mantiene las conexiones abiertas.

import { DurableObject } from 'cloudflare:workers';

interface Env {
  CHAT_ROOMS: DurableObjectNamespace<ChatRoom>;
}

interface WebSocketAttachment {
  userId: string;
  displayName: string;
  role: 'admin' | 'member' | 'viewer';
  joinedAt: number;
}

interface IncomingMessage {
  type: 'chat' | 'typing' | 'presence' | 'ping';
  payload: Record<string, unknown>;
}

interface OutgoingMessage {
  type: string;
  payload: Record<string, unknown>;
  sender: string;
  timestamp: number;
}

export class ChatRoom extends DurableObject<Env> {
  private messageHistory: OutgoingMessage[] = [];

  /**
   * Handles the initial HTTP request to upgrade to a WebSocket connection.
   * The Worker forwards the upgrade request here after authentication.
   */
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    // Verify this is a WebSocket upgrade request
    if (request.headers.get('Upgrade') !== 'websocket') {
      return new Response('Expected WebSocket upgrade', { status: 426 });
    }

    // Extract authenticated user info (set by the Worker after JWT validation)
    const userId = url.searchParams.get('userId');
    const displayName = url.searchParams.get('displayName') ?? 'Anonymous';
    const role = (url.searchParams.get('role') as WebSocketAttachment['role']) ?? 'member';

    if (!userId) {
      return new Response('Missing userId', { status: 400 });
    }

    // Create the WebSocket pair
    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);

    // Accept the server-side WebSocket using the Hibernatable API.
    // Tags enable efficient filtered broadcasting later.
    this.ctx.acceptWebSocket(server, [`user:${userId}`, `role:${role}`]);

    // Attach metadata that survives hibernation
    const attachment: WebSocketAttachment = {
      userId,
      displayName,
      role,
      joinedAt: Date.now(),
    };
    server.serializeAttachment(attachment);

    // Notify existing participants of the new connection
    this.broadcast({
      type: 'user-joined',
      payload: { userId, displayName },
      sender: 'system',
      timestamp: Date.now(),
    }, server);

    // Send recent message history to the new client
    const recentMessages = await this.getRecentMessages(50);
    server.send(JSON.stringify({
      type: 'history',
      payload: { messages: recentMessages },
      sender: 'system',
      timestamp: Date.now(),
    }));

    // Return the client-side WebSocket to the Worker, which returns it to the browser
    return new Response(null, { status: 101, webSocket: client });
  }

  /**
   * Called when any connected WebSocket sends a message.
   * This is the core handler that wakes the DO from hibernation.
   */
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const attachment = ws.deserializeAttachment() as WebSocketAttachment;

    // Handle binary messages if needed
    if (message instanceof ArrayBuffer) {
      console.warn(`Binary message from ${attachment.userId}, ignoring`);
      return;
    }

    let data: IncomingMessage;
    try {
      data = JSON.parse(message);
    } catch {
      ws.send(JSON.stringify({ type: 'error', payload: { message: 'Invalid JSON' } }));
      return;
    }

    switch (data.type) {
      case 'chat': {
        const outgoing: OutgoingMessage = {
          type: 'chat',
          payload: {
            ...data.payload,
            messageId: crypto.randomUUID(),
          },
          sender: attachment.userId,
          timestamp: Date.now(),
        };

        // Persist the message
        await this.storeMessage(outgoing);

        // Broadcast to all connected clients including the sender (for confirmation)
        this.broadcast(outgoing);
        break;
      }

      case 'typing': {
        // Typing indicators are transient -- broadcast but do not persist
        this.broadcast({
          type: 'typing',
          payload: {
            userId: attachment.userId,
            displayName: attachment.displayName,
            isTyping: data.payload.isTyping ?? false,
          },
          sender: attachment.userId,
          timestamp: Date.now(),
        }, ws); // Exclude the sender
        break;
      }

      case 'presence': {
        // Update presence state in storage
        await this.ctx.storage.put(`presence:${attachment.userId}`, {
          status: data.payload.status,
          lastSeen: Date.now(),
        });

        this.broadcast({
          type: 'presence',
          payload: {
            userId: attachment.userId,
            status: data.payload.status,
          },
          sender: 'system',
          timestamp: Date.now(),
        });
        break;
      }

      case 'ping': {
        ws.send(JSON.stringify({ type: 'pong', payload: {}, sender: 'system', timestamp: Date.now() }));
        break;
      }

      default: {
        ws.send(JSON.stringify({
          type: 'error',
          payload: { message: `Unknown message type: ${data.type}` },
          sender: 'system',
          timestamp: Date.now(),
        }));
      }
    }
  }

  /**
   * Called when a WebSocket connection closes cleanly or abnormally.
   */
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const attachment = ws.deserializeAttachment() as WebSocketAttachment;

    // Close the server side of the connection
    ws.close(code, reason);

    // Remove presence data
    await this.ctx.storage.delete(`presence:${attachment.userId}`);

    // Notify remaining participants
    this.broadcast({
      type: 'user-left',
      payload: {
        userId: attachment.userId,
        displayName: attachment.displayName,
      },
      sender: 'system',
      timestamp: Date.now(),
    });

    // If no connections remain, schedule cleanup
    if (this.ctx.getWebSockets().length === 0) {
      await this.ctx.storage.setAlarm(Date.now() + 60_000); // 1 minute
    }
  }

  /**
   * Called when a WebSocket connection encounters an error.
   */
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    const attachment = ws.deserializeAttachment() as WebSocketAttachment | null;
    const userId = attachment?.userId ?? 'unknown';
    console.error(`WebSocket error for user ${userId}:`, error);

    // Attempt to close the errored connection
    try {
      ws.close(1011, 'Internal error');
    } catch {
      // Connection may already be closed
    }
  }

  /**
   * Alarm handler for scheduled tasks (cleanup, state compaction, etc.).
   */
  async alarm(): Promise<void> {
    const connections = this.ctx.getWebSockets();

    if (connections.length === 0) {
      // No connections for the full cleanup window -- purge old messages
      const keys = await this.ctx.storage.list({ prefix: 'msg:' });
      const cutoff = Date.now() - 24 * 60 * 60 * 1000; // 24 hours

      const toDelete: string[] = [];
      for (const [key, value] of keys) {
        const msg = value as OutgoingMessage;
        if (msg.timestamp < cutoff) {
          toDelete.push(key);
        }
      }

      if (toDelete.length > 0) {
        await this.ctx.storage.delete(toDelete);
      }
    }
  }

  // --- Private helpers ---

  private broadcast(message: OutgoingMessage, exclude?: WebSocket): void {
    const raw = JSON.stringify(message);
    for (const ws of this.ctx.getWebSockets()) {
      if (ws !== exclude) {
        try {
          ws.send(raw);
        } catch {
          // Connection may be closing; webSocketClose will handle cleanup
        }
      }
    }
  }

  private async storeMessage(message: OutgoingMessage): Promise<void> {
    const key = `msg:${message.timestamp}:${(message.payload as { messageId: string }).messageId}`;
    await this.ctx.storage.put(key, message);

    // Also keep in memory for quick access (reset on hibernation)
    this.messageHistory.push(message);
    if (this.messageHistory.length > 200) {
      this.messageHistory = this.messageHistory.slice(-100);
    }
  }

  private async getRecentMessages(count: number): Promise<OutgoingMessage[]> {
    // Try in-memory cache first (empty after hibernation)
    if (this.messageHistory.length > 0) {
      return this.messageHistory.slice(-count);
    }

    // Fall back to storage
    const entries = await this.ctx.storage.list<OutgoingMessage>({
      prefix: 'msg:',
      reverse: true,
      limit: count,
    });

    const messages = [...entries.values()].reverse();
    this.messageHistory = messages;
    return messages;
  }
}

El Worker correspondiente que enruta las solicitudes al Durable Object:

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Example: /ws/room/:roomId
    const match = url.pathname.match(/^\/ws\/room\/(.+)$/);
    if (!match) {
      return new Response('Not found', { status: 404 });
    }

    const roomId = match[1];

    // Authenticate the request (JWT, cookie, etc.)
    const auth = await authenticateRequest(request);
    if (!auth) {
      return new Response('Unauthorized', { status: 401 });
    }

    // Resolve the Durable Object by a deterministic name
    const id = env.CHAT_ROOMS.idFromName(`room:${roomId}`);
    const stub = env.CHAT_ROOMS.get(id);

    // Forward the upgrade request with auth context
    const doUrl = new URL(request.url);
    doUrl.searchParams.set('userId', auth.userId);
    doUrl.searchParams.set('displayName', auth.displayName);
    doUrl.searchParams.set('role', auth.role);

    return stub.fetch(new Request(doUrl.toString(), request));
  },
} satisfies ExportedHandler<Env>;

Estructura del Durable Object

Un DO por limite de coordinacion

Cada instancia de Durable Object representa un unico limite de coordinacion. Todos los participantes dentro de ese limite se conectan a la misma instancia de DO, que proporciona consistencia single-threaded:

Caso de usoLimite del DOID de ejemplo
Chat roomUn DO por salaroom:engineering-general
Edicion de documentosUn DO por documentodoc:invoice-template-42
Seguimiento de presenciaUn DO por organizacionpresence:org-acme-corp
Notificaciones de usuarioUn DO por usuarionotifications:user-abc123
Actualizaciones de dashboardUn DO por dashboarddashboard:sales-overview

Convencion de nombres

Usa un esquema de nombres basado en prefijos consistente para los IDs de Durable Object:

// In the Worker, resolve the DO ID by name
function getDurableObjectId(
  namespace: DurableObjectNamespace,
  type: string,
  identifier: string,
): DurableObjectId {
  return namespace.idFromName(`${type}:${identifier}`);
}

// Examples:
const roomId = getDurableObjectId(env.CHAT_ROOMS, 'room', roomSlug);
const presenceId = getDurableObjectId(env.PRESENCE, 'presence', orgId);
const notifId = getDurableObjectId(env.NOTIFICATIONS, 'notifications', userId);

Enrutamiento de mensajes

Dentro del Durable Object, enruta los mensajes entrantes a handlers especializados segun el tipo de mensaje. Esto mantiene limpio el handler webSocketMessage y facilita la adicion de nuevos tipos de mensaje:

type MessageHandler = (ws: WebSocket, payload: Record<string, unknown>, attachment: WebSocketAttachment) => Promise<void>;

private handlers: Record<string, MessageHandler> = {
  'chat': this.handleChat.bind(this),
  'typing': this.handleTyping.bind(this),
  'presence': this.handlePresence.bind(this),
  'cursor-move': this.handleCursorMove.bind(this),
  'selection-change': this.handleSelectionChange.bind(this),
  'ping': this.handlePing.bind(this),
};

async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
  const attachment = ws.deserializeAttachment() as WebSocketAttachment;
  const data = JSON.parse(message as string) as IncomingMessage;

  const handler = this.handlers[data.type];
  if (handler) {
    await handler(ws, data.payload, attachment);
  } else {
    ws.send(JSON.stringify({ type: 'error', payload: { message: `Unknown type: ${data.type}` } }));
  }
}

Gestion de estado

Los Durable Objects ofrecen dos niveles de estado:

Estado persistente via this.ctx.storage -- sobrevive a la hibernacion, reinicios y redespliegues:

// Key-value storage operations
await this.ctx.storage.put('room:metadata', { name: 'General', createdAt: Date.now() });
const metadata = await this.ctx.storage.get<RoomMetadata>('room:metadata');

// Batch operations for efficiency (counts as a single storage operation)
await this.ctx.storage.put({
  'user:alice:presence': { status: 'online', lastSeen: Date.now() },
  'user:bob:presence': { status: 'away', lastSeen: Date.now() },
});

// List with prefix scan
const allPresence = await this.ctx.storage.list({ prefix: 'user:', limit: 100 });

Estado transitorio en memoria via propiedades de clase -- rapido pero se pierde en la hibernacion:

export class ChatRoom extends DurableObject<Env> {
  // In-memory caches (rebuilt on wake from hibernation)
  private messageCache: OutgoingMessage[] = [];
  private typingUsers: Map<string, number> = new Map();
  private connectionCount = 0;

  // These reset to their initial values after hibernation.
  // Rebuild them in the first handler call after waking up.
}

Usa almacenamiento persistente para datos que no deben perderse (mensajes, estado de presencia, configuracion). Usa estado en memoria para caches y datos transitorios que cambian frecuentemente (indicadores de escritura, posiciones de cursor) que pueden derivarse del estado persistente o reconstruirse a partir de los clientes conectados.


Gestion de conexiones

serializeAttachment / deserializeAttachment

Cada WebSocket aceptado via la Hibernatable API puede llevar un attachment -- un objeto serializable a JSON que persiste entre ciclos de hibernacion. Este es el mecanismo principal para identificar conexiones.

interface WebSocketAttachment {
  userId: string;
  displayName: string;
  role: 'admin' | 'member' | 'viewer';
  orgId: string;
  joinedAt: number;
  lastMessageId?: string;
}

// On connection accept:
async fetch(request: Request): Promise<Response> {
  const pair = new WebSocketPair();
  const [client, server] = Object.values(pair);

  this.ctx.acceptWebSocket(server);

  // Serialize metadata onto the WebSocket
  const attachment: WebSocketAttachment = {
    userId: 'user-abc',
    displayName: 'Alice',
    role: 'admin',
    orgId: 'org-acme',
    joinedAt: Date.now(),
  };
  server.serializeAttachment(attachment);

  return new Response(null, { status: 101, webSocket: client });
}

// In any message handler -- even after hibernation:
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
  const attachment = ws.deserializeAttachment() as WebSocketAttachment;

  console.log(`Message from ${attachment.displayName} (${attachment.userId})`);

  // Update the attachment if needed
  attachment.lastMessageId = 'msg-xyz';
  ws.serializeAttachment(attachment);
}

Propiedades clave de los attachments:

  • Sobreviven a la hibernacion: el attachment se serializa a disco cuando el DO hiberna y se restaura al despertar.
  • Por conexion: cada WebSocket tiene su propio attachment independiente.
  • Mutables: llama a serializeAttachment() de nuevo para actualizar los datos almacenados.
  • Limite de tamano: manten los attachments pequenos (por debajo de unos pocos KB) ya que se serializan en cada ciclo de hibernacion.

Ciclo de vida de la conexion

El ciclo de vida completo de una conexion WebSocket a traves del sistema:

1. Client initiates connection
   Browser: new WebSocket('wss://api.example.com/ws/room/general')

2. Worker authenticates and routes
   Worker: validate JWT → resolve DO ID → forward upgrade request

3. Durable Object accepts connection
   DO: acceptWebSocket(server, tags) → serializeAttachment(metadata)
       → broadcast user-joined → send message history

4. Bidirectional communication
   Client → webSocketMessage() → process → broadcast/respond
   DO → broadcast/targeted send → Client

5. Hibernation (when idle)
   All connections quiet → isolate evicted → connections stay open
   Incoming message → isolate restored → webSocketMessage() fires
   Attachments and tags are preserved

6. Client disconnects
   Browser closes tab / network drops / explicit close
   → webSocketClose(ws, code, reason) fires
   → cleanup presence → broadcast user-left
   → if no connections remain, schedule alarm for cleanup

7. Scheduled cleanup
   alarm() fires → purge stale data → compact storage

Patrones de broadcasting

Patrones de broadcasting: broadcast a todos (fan-out) versus broadcast con tags (filtrado) Broadcast a todos (fan-out) Durable Object Cliente A Cliente B Cliente C Cliente D getWebSockets() Todos reciben msg Broadcast con tags (filtrado) Durable Object Cliente A role:admin Cliente B role:member Cliente C role:viewer Cliente D role:admin getWebSockets ('role:admin') Solo admins

Broadcast a todos

El patron mas comun: enviar un mensaje a cada WebSocket conectado en el Durable Object.

/**
 * Broadcast a message to all connected WebSockets.
 * Optionally exclude a specific connection (e.g., the sender).
 */
private broadcast(
  message: OutgoingMessage,
  options?: { exclude?: WebSocket; onlyTo?: string[] },
): void {
  const raw = JSON.stringify(message);
  const sockets = this.ctx.getWebSockets();

  for (const ws of sockets) {
    if (options?.exclude && ws === options.exclude) {
      continue;
    }

    if (options?.onlyTo) {
      const attachment = ws.deserializeAttachment() as WebSocketAttachment;
      if (!options.onlyTo.includes(attachment.userId)) {
        continue;
      }
    }

    try {
      ws.send(raw);
    } catch {
      // The connection may be in a closing state.
      // webSocketClose() will handle cleanup.
    }
  }
}

// Usage examples:

// Broadcast to everyone
this.broadcast({ type: 'chat', payload: { text: 'Hello' }, sender: 'alice', timestamp: Date.now() });

// Broadcast to everyone except the sender
this.broadcast(message, { exclude: senderWs });

// Send to specific users only
this.broadcast(notification, { onlyTo: ['user-bob', 'user-carol'] });

Broadcast con tags

Los tags proporcionan una forma mas eficiente de filtrar conexiones WebSocket sin deserializar cada attachment. Los tags se establecen al aceptar el WebSocket y pueden usarse para obtener un subconjunto filtrado.

// Accept a WebSocket with tags
async fetch(request: Request): Promise<Response> {
  const pair = new WebSocketPair();
  const [client, server] = Object.values(pair);

  const userId = 'user-abc';
  const role = 'admin';
  const teamId = 'team-frontend';

  // Tags allow efficient filtering via getWebSockets(tag)
  this.ctx.acceptWebSocket(server, [
    `user:${userId}`,
    `role:${role}`,
    `team:${teamId}`,
  ]);

  server.serializeAttachment({ userId, role, teamId });

  return new Response(null, { status: 101, webSocket: client });
}

// Broadcast only to admins
private broadcastToAdmins(message: OutgoingMessage): void {
  const raw = JSON.stringify(message);
  const adminSockets = this.ctx.getWebSockets('role:admin');

  for (const ws of adminSockets) {
    try {
      ws.send(raw);
    } catch {
      // handled by webSocketClose
    }
  }
}

// Send a direct message to a specific user
private sendToUser(userId: string, message: OutgoingMessage): void {
  const raw = JSON.stringify(message);
  const userSockets = this.ctx.getWebSockets(`user:${userId}`);

  for (const ws of userSockets) {
    try {
      ws.send(raw);
    } catch {
      // handled by webSocketClose
    }
  }
}

// Broadcast to a team channel
private broadcastToTeam(teamId: string, message: OutgoingMessage): void {
  const raw = JSON.stringify(message);
  const teamSockets = this.ctx.getWebSockets(`team:${teamId}`);

  for (const ws of teamSockets) {
    try {
      ws.send(raw);
    } catch {
      // handled by webSocketClose
    }
  }
}

Limites y directrices para tags:

  • Cada WebSocket puede tener hasta 10 tags.
  • Cada tag puede tener hasta 256 bytes.
  • Los tags son inmutables despues de acceptWebSocket() -- para cambiar tags, el cliente debe reconectarse.
  • Usa tags para agrupaciones gruesas y estables (rol, equipo, sala). Usa metadatos del attachment para filtrado detallado y mutable.

Batching de mensajes para actualizaciones de alta frecuencia

Problema

Funcionalidades como seguimiento colaborativo de cursores, indicadores de escritura en vivo o metricas de dashboard en tiempo real pueden generar decenas de actualizaciones por segundo por usuario. Enviar cada actualizacion como un mensaje WebSocket individual genera overhead innecesario:

  • Cada llamada a send() incurre en coste de serializacion.
  • Los clientes receptores procesan muchos mensajes pequenos en lugar de menos actualizaciones agrupadas.
  • Overhead de red por el framing de muchos mensajes pequenos.

Solucion

Acumula los mensajes entrantes de alta frecuencia y envialos en bloque a intervalos cortos usando alarmas del Durable Object:

import { DurableObject } from 'cloudflare:workers';

interface CursorUpdate {
  userId: string;
  x: number;
  y: number;
  timestamp: number;
}

interface BatchedMessage {
  type: 'cursor-batch';
  payload: {
    cursors: CursorUpdate[];
  };
  timestamp: number;
}

export class CollaborativeDocument extends DurableObject<Env> {
  /**
   * Buffer for high-frequency updates.
   * Maps userId to their latest cursor position (only the latest matters).
   */
  private cursorBuffer: Map<string, CursorUpdate> = new Map();

  /**
   * Whether a flush alarm is already scheduled.
   */
  private flushScheduled = false;

  /**
   * Flush interval in milliseconds.
   * 50ms gives a good balance between latency and batching efficiency.
   */
  private static readonly FLUSH_INTERVAL_MS = 50;

  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const data = JSON.parse(message as string);

    switch (data.type) {
      case 'cursor-move': {
        const attachment = ws.deserializeAttachment() as WebSocketAttachment;

        // Buffer the cursor update (overwrite previous position for this user)
        this.cursorBuffer.set(attachment.userId, {
          userId: attachment.userId,
          x: data.payload.x,
          y: data.payload.y,
          timestamp: Date.now(),
        });

        // Schedule a flush if one is not already pending
        await this.scheduleFlush();
        break;
      }

      case 'document-edit': {
        // Critical updates bypass batching and are sent immediately
        this.broadcast({
          type: 'document-edit',
          payload: data.payload,
          sender: (ws.deserializeAttachment() as WebSocketAttachment).userId,
          timestamp: Date.now(),
        }, ws);
        break;
      }
    }
  }

  async alarm(): Promise<void> {
    // Flush the cursor buffer
    if (this.cursorBuffer.size > 0) {
      const cursors = Array.from(this.cursorBuffer.values());
      this.cursorBuffer.clear();
      this.flushScheduled = false;

      const batchedMessage: BatchedMessage = {
        type: 'cursor-batch',
        payload: { cursors },
        timestamp: Date.now(),
      };

      const raw = JSON.stringify(batchedMessage);
      for (const ws of this.ctx.getWebSockets()) {
        const attachment = ws.deserializeAttachment() as WebSocketAttachment;
        // Filter out the user's own cursor and skip if nothing remains
        const relevantCursors = cursors.filter((c) => c.userId !== attachment.userId);
        if (relevantCursors.length > 0) {
          const filtered = JSON.stringify({
            ...batchedMessage,
            payload: { cursors: relevantCursors },
          });
          try {
            ws.send(filtered);
          } catch {
            // handled by webSocketClose
          }
        }
      }
    }

    // Reschedule if new data arrived during the flush
    if (this.cursorBuffer.size > 0) {
      await this.scheduleFlush();
    }
  }

  private async scheduleFlush(): Promise<void> {
    if (!this.flushScheduled) {
      this.flushScheduled = true;

      const currentAlarm = await this.ctx.storage.getAlarm();
      if (currentAlarm === null) {
        await this.ctx.storage.setAlarm(
          Date.now() + CollaborativeDocument.FLUSH_INTERVAL_MS,
        );
      }
    }
  }

  private broadcast(message: OutgoingMessage, exclude?: WebSocket): void {
    const raw = JSON.stringify(message);
    for (const ws of this.ctx.getWebSockets()) {
      if (ws !== exclude) {
        try { ws.send(raw); } catch { /* handled by webSocketClose */ }
      }
    }
  }
}

Directrices de batching

Tipo de actualizacionIntervalo recomendadoEstrategia
Posiciones de cursor50msConservar solo la ultima por usuario
Indicadores de escritura100msDeduplicar por usuario
Metricas de dashboard200-500msAgregar/promediar valores
Heartbeats de presencia5.000-10.000msAgrupar por usuario
Mensajes de chatSin batchingEnviar inmediatamente
Ediciones de documentoSin batchingEnviar inmediatamente (el orden importa)

El principio clave: agrupa datos observacionales, envia datos operacionales de inmediato. Los usuarios toleran 50-100ms de latencia en posiciones de cursor, pero los mensajes de chat y las ediciones de documento deben entregarse lo mas rapido posible.


Estrategia de reconexion

Estrategia de reconexion: desconexion, backoff exponencial a 1s, 2s, 4s, 8s, luego reconexion y restauracion de estado Desconexion evento close Backoff exponencial + Jitter 0.5s intento 1 1s intento 2 2s intento 3 4s intento 4 ... Reconexion WebSocket abierto Restaurar estado reenviar msgs perdidos Tope en maxDelay (30s) | Full jitter evita thundering herd | Maximo 20 intentos antes de abandonar

Por que la reconexion es obligatoria

Las conexiones WebSocket se van a caer. No hay forma de evitarlo. Cada cliente debe implementar logica de reconexion porque:

  1. Despliegues de Cloudflare: Cuando se despliega una nueva version del Worker o del Durable Object, Cloudflare termina todas las conexiones WebSocket existentes hacia ese DO. Esto sucede en cada wrangler deploy.
  2. Interrupciones de red: Cambios de WiFi, traspasos celulares, reconexiones de VPN y cortes del ISP.
  3. Backgrounding en movil: iOS y Android cierran agresivamente las conexiones WebSocket cuando las apps pasan a segundo plano.
  4. Timeouts por inactividad: Aunque los Hibernatable WebSockets mantienen vivo el lado del servidor, los clientes o proxies intermediarios pueden cerrar conexiones inactivas.
  5. Desalojo del DO: En casos raros, Cloudflare puede necesitar migrar un Durable Object a otra maquina.

Tratar la reconexion como un caso de error excepcional en lugar de una condicion de operacion normal es la fuente mas comun de bugs en funcionalidades de tiempo real.

Reconexion en el cliente

Implementa backoff exponencial con jitter para evitar tormentas de reconexion tipo thundering herd:

interface ReconnectConfig {
  /** Initial delay before first reconnection attempt (ms) */
  initialDelay: number;
  /** Maximum delay between reconnection attempts (ms) */
  maxDelay: number;
  /** Multiplier applied to delay after each failed attempt */
  backoffMultiplier: number;
  /** Maximum number of reconnection attempts before giving up */
  maxAttempts: number;
  /** Maximum number of messages to queue while disconnected. When exceeded, oldest messages are dropped. */
  maxQueueSize: number;
}

const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = {
  initialDelay: 500,
  maxDelay: 30_000,
  backoffMultiplier: 2,
  maxAttempts: 20,
  maxQueueSize: 100,
};

class ReconnectingWebSocket {
  private ws: WebSocket | null = null;
  private attempt = 0;
  private lastMessageId: string | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private messageQueue: unknown[] = [];
  private config: ReconnectConfig;

  constructor(
    private url: string,
    config?: Partial<ReconnectConfig>,
  ) {
    this.config = { ...DEFAULT_RECONNECT_CONFIG, ...config };
    this.connect();
  }

  private connect(): void {
    // Append lastMessageId for server-side replay on reconnect
    const connectUrl = this.lastMessageId
      ? `${this.url}${this.url.includes('?') ? '&' : '?'}lastMessageId=${this.lastMessageId}`
      : this.url;

    this.ws = new WebSocket(connectUrl);

    this.ws.addEventListener('open', () => {
      console.log('WebSocket connected');
      this.attempt = 0; // Reset backoff on successful connection
      this.onStatusChange?.('connected');
      this.flushQueue();
    });

    this.ws.addEventListener('message', (event) => {
      const data = JSON.parse(event.data);

      // Track the latest message ID for reconnection replay
      if (data.payload?.messageId) {
        this.lastMessageId = data.payload.messageId;
      }

      this.onMessage?.(data);
    });

    this.ws.addEventListener('close', (event) => {
      console.log(`WebSocket closed: ${event.code} ${event.reason}`);
      this.ws = null;
      this.scheduleReconnect();
    });

    this.ws.addEventListener('error', () => {
      // The close event always follows an error event; reconnection is handled there.
      console.error('WebSocket error');
    });
  }

  private scheduleReconnect(): void {
    if (this.attempt >= this.config.maxAttempts) {
      console.error(`Giving up after ${this.attempt} reconnection attempts`);
      this.onStatusChange?.('failed');
      return;
    }

    this.onStatusChange?.('reconnecting');

    // Exponential backoff with ceiling and full jitter to prevent thundering herd.
    // The ceiling (maxDelay) caps the exponential growth.
    // Full jitter randomizes the actual delay between [0, capped_delay] so that
    // clients that reconnect simultaneously spread their retries over time.
    const capped = Math.min(
      this.config.initialDelay * Math.pow(this.config.backoffMultiplier, this.attempt),
      this.config.maxDelay,
    );
    const delay = Math.random() * capped;

    console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.attempt + 1}/${this.config.maxAttempts})`);

    this.reconnectTimer = setTimeout(() => {
      this.attempt++;
      this.connect();
    }, delay);
  }

  send(data: unknown): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
    } else {
      // Enforce max queue size by dropping oldest messages when the limit is reached
      if (this.messageQueue.length >= this.config.maxQueueSize) {
        const dropped = this.messageQueue.shift();
        console.warn(
          `Message queue full (${this.config.maxQueueSize}). Dropping oldest message:`,
          dropped,
        );
      }
      this.messageQueue.push(data);
    }
  }

  close(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }
    this.ws?.close(1000, 'Client closing');
  }

  // Callbacks
  onMessage?: (data: unknown) => void;
  onStatusChange?: (status: 'connecting' | 'connected' | 'reconnecting' | 'failed') => void;

  private flushQueue(): void {
    while (this.messageQueue.length > 0 && this.ws?.readyState === WebSocket.OPEN) {
      const msg = this.messageQueue.shift();
      this.ws.send(JSON.stringify(msg));
    }
  }
}

Soporte en el servidor

El Durable Object debe soportar la reconexion almacenando mensajes recientes y reenviando los perdidos:

export class ChatRoom extends DurableObject<Env> {
  /**
   * Handle WebSocket upgrade with optional reconnection replay.
   */
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    const lastMessageId = url.searchParams.get('lastMessageId');

    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);

    this.ctx.acceptWebSocket(server);
    server.serializeAttachment({
      userId: url.searchParams.get('userId'),
      joinedAt: Date.now(),
    });

    // If reconnecting, replay missed messages
    if (lastMessageId) {
      const missed = await this.getMessagesSince(lastMessageId);
      if (missed.length > 0) {
        server.send(JSON.stringify({
          type: 'replay',
          payload: {
            messages: missed,
            fromMessageId: lastMessageId,
          },
          sender: 'system',
          timestamp: Date.now(),
        }));
      }
    } else {
      // New connection -- send recent history
      const recent = await this.getRecentMessages(50);
      server.send(JSON.stringify({
        type: 'history',
        payload: { messages: recent },
        sender: 'system',
        timestamp: Date.now(),
      }));
    }

    return new Response(null, { status: 101, webSocket: client });
  }

  /**
   * Retrieve all messages stored after the given message ID.
   * Messages are stored with keys like `msg:<timestamp>:<messageId>`
   * so a prefix scan starting after the last known message returns
   * everything the client missed.
   */
  private async getMessagesSince(lastMessageId: string): Promise<OutgoingMessage[]> {
    // Find the key for the last known message
    const allMessages = await this.ctx.storage.list<OutgoingMessage>({
      prefix: 'msg:',
    });

    let found = false;
    const missed: OutgoingMessage[] = [];

    for (const [key, message] of allMessages) {
      if (found) {
        missed.push(message);
      }
      if ((message.payload as { messageId?: string }).messageId === lastMessageId) {
        found = true;
      }
    }

    // If the message was not found (pruned), send recent history instead
    if (!found) {
      return this.getRecentMessages(50);
    }

    return missed;
  }

  private async getRecentMessages(count: number): Promise<OutgoingMessage[]> {
    const entries = await this.ctx.storage.list<OutgoingMessage>({
      prefix: 'msg:',
      reverse: true,
      limit: count,
    });
    return [...entries.values()].reverse();
  }
}

Hook React para WebSocket

Un hook React personalizado de calidad produccion para gestionar conexiones WebSocket dentro de micro frontends. Este hook reside en el paquete shared-utils y es consumido por cada MFE que necesita comunicacion en tiempo real.

import { useCallback, useEffect, useRef, useState } from 'react';

// --- Types ---

type WebSocketStatus = 'connecting' | 'connected' | 'disconnected' | 'reconnecting' | 'failed';

interface WebSocketOptions<TIncoming = unknown, TOutgoing = unknown> {
  /** Called when a parsed message is received */
  onMessage?: (message: TIncoming) => void;
  /** Called when the connection status changes */
  onStatusChange?: (status: WebSocketStatus) => void;
  /** Whether to connect automatically on mount. Defaults to true. */
  autoConnect?: boolean;
  /** Reconnection configuration */
  reconnect?: {
    enabled?: boolean;
    initialDelay?: number;
    maxDelay?: number;
    backoffMultiplier?: number;
    maxAttempts?: number;
    /** Maximum number of messages to queue while disconnected. Oldest messages are dropped when exceeded. Defaults to 100. */
    maxQueueSize?: number;
  };
  /** Protocols to request during the WebSocket handshake */
  protocols?: string | string[];
  /** Custom message serializer. Defaults to JSON.stringify. */
  serialize?: (message: TOutgoing) => string;
  /** Custom message deserializer. Defaults to JSON.parse. */
  deserialize?: (data: string) => TIncoming;
}

interface WebSocketReturn<TOutgoing = unknown> {
  /** Current connection status */
  status: WebSocketStatus;
  /** Send a typed message. Queues if not connected. */
  send: (message: TOutgoing) => void;
  /** The most recently received message */
  lastMessage: unknown | null;
  /** Manually trigger a reconnection */
  reconnect: () => void;
  /** Manually disconnect (disables auto-reconnect) */
  disconnect: () => void;
}

// --- Hook Implementation ---

export function useWebSocket<
  TIncoming = unknown,
  TOutgoing = unknown,
>(
  url: string | null,
  options: WebSocketOptions<TIncoming, TOutgoing> = {},
): WebSocketReturn<TOutgoing> {
  const {
    onMessage,
    onStatusChange,
    autoConnect = true,
    reconnect: reconnectConfig = {},
    protocols,
    serialize = JSON.stringify,
    deserialize = (data: string) => JSON.parse(data) as TIncoming,
  } = options;

  const {
    enabled: reconnectEnabled = true,
    initialDelay = 500,
    maxDelay = 30_000,
    backoffMultiplier = 2,
    maxAttempts = 20,
    maxQueueSize = 100,
  } = reconnectConfig;

  // --- State ---
  const [status, setStatus] = useState<WebSocketStatus>('disconnected');
  const [lastMessage, setLastMessage] = useState<unknown | null>(null);

  // --- Refs (stable across renders) ---
  const wsRef = useRef<WebSocket | null>(null);
  const attemptRef = useRef(0);
  const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const messageQueueRef = useRef<TOutgoing[]>([]);
  const intentionalCloseRef = useRef(false);
  const mountedRef = useRef(true);

  // Keep callback refs fresh without triggering reconnects
  const onMessageRef = useRef(onMessage);
  onMessageRef.current = onMessage;
  const onStatusChangeRef = useRef(onStatusChange);
  onStatusChangeRef.current = onStatusChange;

  // --- Status updater ---
  const updateStatus = useCallback((newStatus: WebSocketStatus) => {
    if (!mountedRef.current) return;
    setStatus(newStatus);
    onStatusChangeRef.current?.(newStatus);
  }, []);

  // --- Flush queued messages ---
  const flushQueue = useCallback(() => {
    const ws = wsRef.current;
    if (!ws || ws.readyState !== WebSocket.OPEN) return;

    while (messageQueueRef.current.length > 0) {
      const msg = messageQueueRef.current.shift()!;
      try {
        ws.send(serialize(msg));
      } catch (err) {
        console.error('Failed to send queued message:', err);
        // Put it back at the front
        messageQueueRef.current.unshift(msg);
        break;
      }
    }
  }, [serialize]);

  // --- Connect ---
  const connect = useCallback(() => {
    if (!url) return;

    // Clean up any existing connection
    if (wsRef.current) {
      wsRef.current.close(1000, 'Reconnecting');
      wsRef.current = null;
    }

    updateStatus('connecting');
    intentionalCloseRef.current = false;

    const ws = new WebSocket(url, protocols);
    wsRef.current = ws;

    ws.addEventListener('open', () => {
      if (!mountedRef.current) {
        ws.close();
        return;
      }

      attemptRef.current = 0;
      updateStatus('connected');
      flushQueue();
    });

    ws.addEventListener('message', (event: MessageEvent) => {
      if (!mountedRef.current) return;

      try {
        const parsed = deserialize(event.data as string);
        setLastMessage(parsed);
        onMessageRef.current?.(parsed);
      } catch (err) {
        console.error('Failed to parse WebSocket message:', err);
      }
    });

    ws.addEventListener('close', (event: CloseEvent) => {
      wsRef.current = null;

      if (!mountedRef.current) return;

      if (intentionalCloseRef.current) {
        updateStatus('disconnected');
        return;
      }

      // Attempt reconnection
      if (reconnectEnabled && attemptRef.current < maxAttempts) {
        updateStatus('reconnecting');

        // Exponential backoff with ceiling and full jitter to prevent thundering herd
        const capped = Math.min(
          initialDelay * Math.pow(backoffMultiplier, attemptRef.current),
          maxDelay,
        );
        const delay = Math.random() * capped;

        console.log(
          `WebSocket closed (${event.code}). Reconnecting in ${Math.round(delay)}ms ` +
          `(attempt ${attemptRef.current + 1}/${maxAttempts})`,
        );

        reconnectTimerRef.current = setTimeout(() => {
          if (mountedRef.current) {
            attemptRef.current++;
            connect();
          }
        }, delay);
      } else if (attemptRef.current >= maxAttempts) {
        console.error(`WebSocket reconnection failed after ${maxAttempts} attempts`);
        updateStatus('failed');
      } else {
        updateStatus('disconnected');
      }
    });

    ws.addEventListener('error', () => {
      // The close event will fire after this; reconnection is handled there.
      console.error('WebSocket error');
    });
  }, [
    url, protocols, reconnectEnabled, initialDelay, maxDelay,
    backoffMultiplier, maxAttempts, updateStatus, flushQueue, deserialize,
  ]);

  // --- Send ---
  const send = useCallback((message: TOutgoing) => {
    const ws = wsRef.current;
    if (ws && ws.readyState === WebSocket.OPEN) {
      try {
        ws.send(serialize(message));
      } catch (err) {
        console.error('Failed to send message:', err);
        enqueueMessage(message);
      }
    } else {
      // Queue the message for delivery after reconnection
      enqueueMessage(message);
    }

    function enqueueMessage(msg: TOutgoing): void {
      if (messageQueueRef.current.length >= maxQueueSize) {
        const dropped = messageQueueRef.current.shift();
        console.warn(`Message queue full (${maxQueueSize}). Dropping oldest message:`, dropped);
      }
      messageQueueRef.current.push(msg);
    }
  }, [serialize, maxQueueSize]);

  // --- Manual reconnect ---
  const manualReconnect = useCallback(() => {
    if (reconnectTimerRef.current) {
      clearTimeout(reconnectTimerRef.current);
      reconnectTimerRef.current = null;
    }
    attemptRef.current = 0;
    connect();
  }, [connect]);

  // --- Disconnect ---
  const disconnect = useCallback(() => {
    intentionalCloseRef.current = true;

    if (reconnectTimerRef.current) {
      clearTimeout(reconnectTimerRef.current);
      reconnectTimerRef.current = null;
    }

    if (wsRef.current) {
      wsRef.current.close(1000, 'Client disconnecting');
      wsRef.current = null;
    }

    updateStatus('disconnected');
  }, [updateStatus]);

  // --- Auto-connect on mount ---
  useEffect(() => {
    mountedRef.current = true;

    if (autoConnect && url) {
      connect();
    }

    return () => {
      mountedRef.current = false;

      if (reconnectTimerRef.current) {
        clearTimeout(reconnectTimerRef.current);
      }

      if (wsRef.current) {
        intentionalCloseRef.current = true;
        wsRef.current.close(1000, 'Component unmounting');
        wsRef.current = null;
      }
    };
  }, [url, autoConnect, connect]);

  return {
    status,
    send,
    lastMessage,
    reconnect: manualReconnect,
    disconnect,
  };
}

Ejemplo de uso

import { useWebSocket } from '@platform/shared-utils';

// Define typed message contracts
interface ChatIncoming {
  type: 'chat' | 'typing' | 'user-joined' | 'user-left' | 'history' | 'replay';
  payload: Record<string, unknown>;
  sender: string;
  timestamp: number;
}

interface ChatOutgoing {
  type: 'chat' | 'typing' | 'ping';
  payload: Record<string, unknown>;
}

function ChatPanel({ roomId }: { roomId: string }) {
  const [messages, setMessages] = useState<ChatIncoming[]>([]);

  const { status, send, reconnect } = useWebSocket<ChatIncoming, ChatOutgoing>(
    `wss://api.example.com/ws/room/${roomId}`,
    {
      onMessage: (msg) => {
        switch (msg.type) {
          case 'chat':
            setMessages((prev) => [...prev, msg]);
            break;
          case 'history':
          case 'replay':
            setMessages((prev) => [
              ...prev,
              ...(msg.payload.messages as ChatIncoming[]),
            ]);
            break;
        }
      },
      onStatusChange: (newStatus) => {
        console.log(`Chat connection: ${newStatus}`);
      },
      reconnect: {
        enabled: true,
        maxAttempts: 30,
      },
    },
  );

  const sendMessage = (text: string) => {
    send({ type: 'chat', payload: { text } });
  };

  return (
    <div>
      <ConnectionBadge status={status} onReconnect={reconnect} />
      <MessageList messages={messages} />
      <MessageInput onSend={sendMessage} disabled={status !== 'connected'} />
    </div>
  );
}

Integracion con Module Federation

Paquete shared-utils

El hook useWebSocket y las utilidades relacionadas residen en el paquete shared-utils dentro del monorepo. Este paquete se expone via Module Federation para que cada micro frontend pueda importar la misma implementacion sin empaquetarla por separado.

packages/
  shared-utils/
    src/
      hooks/
        useWebSocket.ts        # The hook described above
        usePresence.ts         # Presence-specific hook built on useWebSocket
        useNotifications.ts    # Notification channel hook
      websocket/
        types.ts               # Shared message type definitions
        constants.ts           # Endpoints, reconnect defaults
      index.ts                 # Public API

Configuracion de Module Federation en el shell:

// rsbuild.config.ts (shell)
new ModuleFederationPlugin({
  name: 'shell',
  shared: {
    '@platform/shared-utils': {
      singleton: true,
      requiredVersion: '^1.0.0',
    },
    react: { singleton: true, requiredVersion: '^19.2.4' },
    'react-dom': { singleton: true, requiredVersion: '^19.2.4' },
  },
});

Independencia del MFE

Cada micro frontend importa y usa el hook WebSocket de forma independiente. No existe un gestor WebSocket centralizado a traves del cual todos los MFEs deban comunicarse:

// In the "orders" MFE
import { useWebSocket } from '@platform/shared-utils';

function OrderTracker({ orderId }: { orderId: string }) {
  const { status, lastMessage } = useWebSocket(
    `wss://api.example.com/ws/orders/${orderId}`,
  );
  // ...
}
// In the "analytics" MFE
import { useWebSocket } from '@platform/shared-utils';

function LiveDashboard({ dashboardId }: { dashboardId: string }) {
  const { status, lastMessage } = useWebSocket(
    `wss://api.example.com/ws/dashboard/${dashboardId}`,
  );
  // ...
}

Conexiones gestionadas por el shell

La aplicacion shell gestiona las conexiones WebSocket que abarcan toda la plataforma y son transversales a todos los MFEs -- especificamente presencia y notificaciones. Estas conexiones se establecen una sola vez cuando el usuario inicia sesion y se comparten con los MFEs a traves de la API CustomEvent del navegador:

// Shell app -- establishes global connections
function ShellApp() {
  const { status: presenceStatus } = useWebSocket(
    `wss://api.example.com/ws/presence/${orgId}`,
    {
      onMessage: (msg) => {
        // Dispatch presence updates as CustomEvents for MFEs to consume
        window.dispatchEvent(
          new CustomEvent('platform:presence', { detail: msg }),
        );
      },
    },
  );

  const { status: notifStatus } = useWebSocket(
    `wss://api.example.com/ws/notifications/${userId}`,
    {
      onMessage: (msg) => {
        window.dispatchEvent(
          new CustomEvent('platform:notification', { detail: msg }),
        );
      },
    },
  );

  return <MicroFrontendContainer />;
}
// Any MFE can listen for shell-managed events
function usePresenceFromShell() {
  const [presenceMap, setPresenceMap] = useState<Map<string, PresenceInfo>>(new Map());

  useEffect(() => {
    const handler = (event: CustomEvent) => {
      const { userId, status } = event.detail.payload;
      setPresenceMap((prev) => {
        const next = new Map(prev);
        next.set(userId, { status, lastSeen: Date.now() });
        return next;
      });
    };

    window.addEventListener('platform:presence', handler as EventListener);
    return () => {
      window.removeEventListener('platform:presence', handler as EventListener);
    };
  }, []);

  return presenceMap;
}

Resumen de propiedad de conexiones

Tipo de conexionGestionada porAlcanceEjemplo
PresenciaShellToda la organizacionws/presence/:orgId
NotificacionesShellPor usuariows/notifications/:userId
Chat roomsMFE (messaging)Por salaws/room/:roomId
Edicion de documentosMFE (docs)Por documentows/doc/:docId
Actualizaciones de dashboardMFE (analytics)Por dashboardws/dashboard/:dashboardId
Seguimiento de pedidosMFE (orders)Por pedidows/orders/:orderId

Monitoreo y alertas de conteo de conexiones

Monitorear el conteo de conexiones WebSocket es critico para detectar problemas de capacidad, tormentas de reconexion anormales y presion de memoria antes de que afecten a los usuarios.

Monitoreo en el servidor

Expone metricas de conexion desde el Durable Object publicando conteos en cada evento de conexion y desconexion. Usa Cloudflare Workers Analytics Engine para almacenamiento de series temporales:

export class MonitoredChatRoom extends DurableObject<Env> {
  /**
   * Report the current connection count to the Analytics Engine.
   * Call this on every connect, disconnect, and periodically via alarm.
   */
  private reportConnectionMetrics(): void {
    const connections = this.ctx.getWebSockets();
    const totalConnections = connections.length;

    // Write to Analytics Engine for dashboarding and alerting
    this.env.WEBSOCKET_METRICS.writeDataPoint({
      blobs: [this.ctx.id.toString()],
      doubles: [totalConnections],
      indexes: ['connection_count'],
    });

    // Log a warning when approaching the recommended ceiling
    if (totalConnections > 4_000) {
      console.warn(
        `DO ${this.ctx.id.toString()} has ${totalConnections} connections — ` +
        `approaching the 5,000 recommended limit. Consider sharding.`,
      );
    }
  }

  async fetch(request: Request): Promise<Response> {
    // ... accept WebSocket ...
    this.reportConnectionMetrics();
    return new Response(null, { status: 101, webSocket: client });
  }

  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    ws.close(code, reason);
    this.reportConnectionMetrics();
    // ... rest of cleanup ...
  }

  async alarm(): Promise<void> {
    // Periodically report metrics even when idle, so gaps in the time series
    // indicate the DO was hibernating rather than simply unreported.
    this.reportConnectionMetrics();
    // ... rest of alarm logic ...
  }
}

Umbrales de alerta

Configura alertas sobre las siguientes condiciones usando Cloudflare Notifications o un sistema externo que consuma datos del Analytics Engine:

CondicionUmbralAccion
Conteo de conexiones alto> 4.000 por DOPreparar sharding; investigar si un unico limite de coordinacion es demasiado amplio
Pico rapido de reconexiones> 500 nuevas conexiones/min a un unico DOProbable evento de despliegue o red; verificar que los clientes usen backoff con jitter
Conteo de conexiones cae a ceroTodos los DOs en un namespace simultaneamentePosible incidente de despliegue o infraestructura; verificar enrutamiento del Worker
Tendencia de crecimiento sostenidoIncremento diario constantePlanificar capacidad para sharding antes de alcanzar el techo de 5.000

Monitoreo en el cliente

Registra eventos de reconexion en el cliente para exponer problemas de conectividad a backends de observabilidad:

onStatusChange: (status) => {
  // Emit to your telemetry pipeline (e.g., OpenTelemetry, Datadog, custom)
  telemetry.trackEvent('websocket_status_change', {
    status,
    url: wsUrl,
    timestamp: Date.now(),
  });

  if (status === 'failed') {
    telemetry.trackEvent('websocket_reconnection_exhausted', {
      url: wsUrl,
      timestamp: Date.now(),
    });
  }
},

Agrega metricas de reconexion del lado del cliente para detectar patrones como:

  • Un pico de eventos reconnecting en muchos clientes al mismo tiempo (indica un evento del lado del servidor).
  • Estado failed persistente para regiones o ISPs especificos (indica un problema en la ruta de red).
  • Alta frecuencia de reconexion para usuarios individuales (indica condiciones de red local inestables).

Precios y limites

Comprender los limites de recursos y el modelo de costes es esencial para disenar la topologia del Durable Object. Los siguientes valores reflejan el plan Paid de Cloudflare Workers.

Limites de Durable Objects

RecursoLimiteNotas
Solicitudes entrantes por DO~1.000/segLimite soft; puede elevarse en Enterprise
Tamano de mensaje WebSocket32 MiBTamano maximo de payload por mensaje
Almacenamiento del Durable Object10 GB por DOAlmacenamiento key-value co-ubicado con el DO
Operaciones de lectura de storage1.000/seg por DOLas lecturas batch cuentan como una sola operacion
Operaciones de escritura de storage1.000/seg por DOLas escrituras batch cuentan como una sola operacion
Limite de subrequest1.000 por invocacionLlamadas fetch salientes desde el DO
Tiempo de CPU por solicitud30 segundosTiempo de reloj de pared de CPU por invocacion
Tags por WebSocket10Se establecen al aceptar, inmutables
Tamano del tag256 bytesPor tag
Precision de alarma~30 segundos minimoNo apto para intervalos sub-segundo

Facturacion de hibernacion

Componente de facturacionActivoHibernando
Cargos por tiempo de CPUTarifa completaNo se cobra
Cargos por duracionTarifa completaNo se cobra
Cargos por almacenamientoEstandarEstandar (aplica siempre)
Cargos por mensaje WebSocketPor mensajePor mensaje (despierta el DO)

Nota sobre facturacion de Durable Object SQLite: La facturacion del almacenamiento Durable Object SQLite esta activa desde enero de 2026. Si tus Durable Objects usan el backend de almacenamiento respaldado por SQLite (el predeterminado para DOs recien creados), ten en cuenta que las lecturas y escrituras se facturan por fila leida/escrita en lugar de por operacion key-value. Consulta la pagina de Durable Objects Pricing para la tarjeta de tarifas actual de SQLite.

Implicaciones de coste para la arquitectura

  • Canales de presencia (mayormente inactivos, heartbeat ocasional): Hibernan mas del 95% del tiempo. Extremadamente rentables.
  • Canales de notificaciones (inactivos hasta que llega una notificacion): Coste casi nulo durante periodos de inactividad.
  • Chat rooms (actividad esporadica): Rentables para salas con rafagas de actividad seguidas de periodos tranquilos.
  • Edicion colaborativa (actualizaciones continuas de alta frecuencia): Coste mas alto debido a mensajes frecuentes que impiden la hibernacion. Usa batching de mensajes para reducir despertares.

Directrices de escalado

  • Manten cada Durable Object por debajo de 5.000 conexiones WebSocket concurrentes para un rendimiento fiable.
  • Para organizaciones con mas de 5.000 usuarios concurrentes, fragmenta la presencia entre multiples DOs (por ejemplo, presence:org-acme:shard-0, presence:org-acme:shard-1).
  • Usa operaciones batch de this.ctx.storage para mantenerte dentro del limite de 1.000 ops/seg.
  • Monitorea el tiempo de CPU del DO por solicitud para evitar alcanzar el limite de 30 segundos durante la limpieza basada en alarmas.

Referencias