macp-sdk-typescriptGuides
Streaming
Session Streaming
MacpStream provides a bidirectional gRPC stream for real-time session participation. Use it when you want to receive accepted envelopes as they arrive rather than polling.
const stream = client.openStream({ auth: Auth.devAgent('observer') });
// Send an envelope through the stream
await stream.send(envelope);
// Consume accepted envelopes as an async iterator
for await (const received of stream.responses()) {
console.log(received.messageType, received.sender);
if (received.messageType === 'Commitment') {
break; // session resolved
}
}
// Always close when done
stream.close();Stream Lifecycle
openStream()opens a duplex gRPC streamsend()writes envelopes to the streamresponses()yields accepted envelopes from the serverclose()terminates the stream
Errors on the stream surface as thrown exceptions from the async iterator:
try {
for await (const envelope of stream.responses()) {
// process
}
} catch (err) {
if (err instanceof MacpTransportError) {
console.log('stream disconnected:', err.message);
}
}Important Notes
StreamSessionis a server-advertised capability (sessions.stream). CheckInitializeresponse capabilities before using.- The stream delivers envelopes in authoritative acceptance order, matching the runtime's ordering.
- Stream does not support late-attach — you only receive envelopes accepted after connecting.
- For durable observation, use
getSession()to get current state + stream for live updates.
Mode Registry Watcher
ModeRegistryWatcher monitors changes to the runtime's mode registry (registrations, unregistrations, promotions):
import { ModeRegistryWatcher } from 'macp-sdk-typescript';
const watcher = new ModeRegistryWatcher(client, { auth });Async Iterator with AbortSignal
const controller = new AbortController();
// Cancel after 30 seconds
setTimeout(() => controller.abort(), 30_000);
for await (const change of watcher.changes(controller.signal)) {
console.log('mode registry changed at', change.observedAtUnixMs);
// Refresh mode list
const modes = await client.listModes();
const extModes = await client.listExtModes();
}Callback-Based
// Blocks until the stream ends or errors
await watcher.watch(async (change) => {
console.log('change detected:', change.observedAtUnixMs);
});One-Shot
Wait for a single change event:
const change = await watcher.nextChange();
console.log('first change at', change.observedAtUnixMs);Roots Watcher
RootsWatcher monitors changes to coordination roots/boundaries:
import { RootsWatcher } from 'macp-sdk-typescript';
const watcher = new RootsWatcher(client, { auth });
for await (const change of watcher.changes()) {
console.log('roots changed at', change.observedAtUnixMs);
// Refresh root list
const roots = await client.listRoots();
console.log('current roots:', roots.roots);
}The API is identical to ModeRegistryWatcher — changes(signal?), watch(handler), and nextChange().
Combining Streams
Run multiple watchers concurrently:
const controller = new AbortController();
// Watch modes and roots in parallel
await Promise.all([
(async () => {
const watcher = new ModeRegistryWatcher(client, { auth });
for await (const c of watcher.changes(controller.signal)) {
console.log('mode registry:', c.observedAtUnixMs);
}
})(),
(async () => {
const watcher = new RootsWatcher(client, { auth });
for await (const c of watcher.changes(controller.signal)) {
console.log('roots:', c.observedAtUnixMs);
}
})(),
]);