MACP

Streaming

Session Streaming

MacpStream provides a bidirectional gRPC stream for real-time session participation. Use it when you want to receive accepted envelopes as they arrive rather than polling.

const stream = client.openStream({ auth: Auth.devAgent('observer') });

// Send an envelope through the stream
await stream.send(envelope);

// Consume accepted envelopes as an async iterator
for await (const received of stream.responses()) {
  console.log(received.messageType, received.sender);

  if (received.messageType === 'Commitment') {
    break; // session resolved
  }
}

// Always close when done
stream.close();

Stream Lifecycle

  1. openStream() opens a duplex gRPC stream
  2. send() writes envelopes to the stream
  3. responses() yields accepted envelopes from the server
  4. close() terminates the stream

Errors on the stream surface as thrown exceptions from the async iterator:

try {
  for await (const envelope of stream.responses()) {
    // process
  }
} catch (err) {
  if (err instanceof MacpTransportError) {
    console.log('stream disconnected:', err.message);
  }
}

Important Notes

  • StreamSession is a server-advertised capability (sessions.stream). Check Initialize response capabilities before using.
  • The stream delivers envelopes in authoritative acceptance order, matching the runtime's ordering.
  • Stream does not support late-attach — you only receive envelopes accepted after connecting.
  • For durable observation, use getSession() to get current state + stream for live updates.

Mode Registry Watcher

ModeRegistryWatcher monitors changes to the runtime's mode registry (registrations, unregistrations, promotions):

import { ModeRegistryWatcher } from 'macp-sdk-typescript';

const watcher = new ModeRegistryWatcher(client, { auth });

Async Iterator with AbortSignal

const controller = new AbortController();

// Cancel after 30 seconds
setTimeout(() => controller.abort(), 30_000);

for await (const change of watcher.changes(controller.signal)) {
  console.log('mode registry changed at', change.observedAtUnixMs);

  // Refresh mode list
  const modes = await client.listModes();
  const extModes = await client.listExtModes();
}

Callback-Based

// Blocks until the stream ends or errors
await watcher.watch(async (change) => {
  console.log('change detected:', change.observedAtUnixMs);
});

One-Shot

Wait for a single change event:

const change = await watcher.nextChange();
console.log('first change at', change.observedAtUnixMs);

Roots Watcher

RootsWatcher monitors changes to coordination roots/boundaries:

import { RootsWatcher } from 'macp-sdk-typescript';

const watcher = new RootsWatcher(client, { auth });

for await (const change of watcher.changes()) {
  console.log('roots changed at', change.observedAtUnixMs);

  // Refresh root list
  const roots = await client.listRoots();
  console.log('current roots:', roots.roots);
}

The API is identical to ModeRegistryWatcherchanges(signal?), watch(handler), and nextChange().

Combining Streams

Run multiple watchers concurrently:

const controller = new AbortController();

// Watch modes and roots in parallel
await Promise.all([
  (async () => {
    const watcher = new ModeRegistryWatcher(client, { auth });
    for await (const c of watcher.changes(controller.signal)) {
      console.log('mode registry:', c.observedAtUnixMs);
    }
  })(),
  (async () => {
    const watcher = new RootsWatcher(client, { auth });
    for await (const c of watcher.changes(controller.signal)) {
      console.log('roots:', c.observedAtUnixMs);
    }
  })(),
]);