Resumable streams let useChat reconnect to ongoing message generation after page reloads, network interruptions, or tab switches. Without this, users lose partial AI responses and must regenerate from scratch.
Architecture Overview
- POST /api/chat - Creates stream, saves message with activeStreamId, publishes chunks to Redis
- GET /api/chat/[id]/stream - Resumes stream from Redis or returns finalized message
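The split between a producing endpoint and a resuming endpoint can be illustrated with a small in-memory model. This is only a sketch: `ReplayableStream`, `createStream`, and `resumeStream` are hypothetical names, and a plain `Map` stands in for Redis, so it does not reflect how the `resumable-stream` package is implemented internally.

```typescript
// In-memory stand-in for the Redis-backed stream (assumption: single process only).
type ChunkListener = (chunk: string) => void;

class ReplayableStream {
  private readonly chunks: string[] = [];
  private readonly listeners = new Set<ChunkListener>();
  private done = false;

  // Producer side: the POST handler would call this as the model emits chunks.
  publish(chunk: string): void {
    if (this.done) throw new Error("stream already finished");
    this.chunks.push(chunk);
    this.listeners.forEach((listener) => listener(chunk));
  }

  // Mark the stream as complete and drop live listeners.
  finish(): void {
    this.done = true;
    this.listeners.clear();
  }

  // Consumer side: replay everything published so far, then follow live chunks.
  subscribe(listener: ChunkListener): () => void {
    this.chunks.forEach((chunk) => listener(chunk));
    if (!this.done) this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}

const activeStreams = new Map<string, ReplayableStream>();

// Roughly what POST /api/chat does: register a stream under its ID.
function createStream(streamId: string): ReplayableStream {
  const created = new ReplayableStream();
  activeStreams.set(streamId, created);
  return created;
}

// Roughly what GET /api/chat/[id]/stream does: look the stream up, or report a miss.
function resumeStream(streamId: string): ReplayableStream | null {
  return activeStreams.get(streamId) ?? null;
}
```

A client that reconnects mid-generation first receives the chunks it missed and then continues with live ones, which is the behavior the two endpoints provide together.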
How It Works
1. Stream Creation (POST /api/chat)
When you receive a new chat request:
// Generate unique IDs
const messageId = generateUUID();
const streamId = generateUUID();
// Save placeholder message with activeStreamId marker
await saveMessage({
  id: messageId,
  chatId,
  message: {
    id: messageId,
    role: "assistant",
    parts: [],
    metadata: {
      activeStreamId: streamId, // <-- Key marker for resumability
    },
  },
});
// Create resumable stream
const streamContext = getStreamContext();
return new Response(
  await streamContext.resumableStream(streamId, () =>
    stream.pipeThrough(new JsonToSseTransformStream())
  )
);
The activeStreamId in message metadata signals that the stream is in-flight and can be resumed.
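To make the role of the marker concrete, here is a minimal sketch of the placeholder message and an in-flight check. The `PlaceholderMessage` type and both helper names are assumptions for illustration; the app's real `ChatMessage` type likely carries more fields.

```typescript
// Hypothetical, trimmed-down message shape; only the fields used by the marker logic.
interface PlaceholderMessage {
  id: string;
  role: "assistant" | "user";
  parts: unknown[];
  metadata: { activeStreamId: string | null };
}

// Build the empty assistant message that is saved before streaming starts.
function createPlaceholderMessage(
  messageId: string,
  streamId: string
): PlaceholderMessage {
  return {
    id: messageId,
    role: "assistant",
    parts: [],
    metadata: { activeStreamId: streamId },
  };
}

// A message is considered in-flight while its marker is a non-empty string.
function isStreamInFlight(message: PlaceholderMessage | undefined): boolean {
  return Boolean(message?.metadata.activeStreamId);
}
```

Saving the placeholder before the first chunk means a client that reloads immediately can still find a message to resume.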
2. Stream Context Initialization
You need Redis for cross-instance pub/sub:
import { after } from "next/server";
import {
  createResumableStreamContext,
  type ResumableStreamContext,
} from "resumable-stream";
// config, redisPublisher, and redisSubscriber are defined elsewhere in the app
let globalStreamContext: ResumableStreamContext | null = null;
export function getStreamContext() {
  if (!globalStreamContext) {
    globalStreamContext = createResumableStreamContext({
      waitUntil: after, // Next.js after() for background cleanup
      keyPrefix: `${config.appPrefix}:resumable-stream`,
      publisher: redisPublisher,
      subscriber: redisSubscriber,
    });
  }
  return globalStreamContext;
}
3. Client Resume Detection
Your client detects partial messages and triggers resume:
const lastMessage = threadInitialMessages.at(-1);
const isLastMessagePartial = !!lastMessage?.metadata?.activeStreamId;
useChat<ChatMessage>({
  resume: isLastMessagePartial, // Triggers reconnection on mount
  transport: new DefaultChatTransport({
    prepareReconnectToStreamRequest({ id: chatId }) {
      const partialMessageId = lastMessage?.metadata?.activeStreamId
        ? lastMessage.id
        : null;
      return {
        api: `/api/chat/${chatId}/stream?messageId=${partialMessageId}`,
      };
    },
  }),
});
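The detection logic can also be factored into a pure helper, which makes it easy to test and avoids ever building a URL that contains `messageId=null`. This is a sketch under assumptions: `getResumeTarget` and `ResumableMessage` are hypothetical names, and only the URL shape is taken from the snippet above.

```typescript
// Hypothetical minimal message shape; metadata is optional on purpose.
interface ResumableMessage {
  id: string;
  metadata?: { activeStreamId?: string | null };
}

interface ResumeTarget {
  resume: boolean; // value to pass as useChat's `resume` option
  api: string | null; // reconnect URL, or null when there is nothing to resume
}

// Decide whether to resume based on the last message loaded for the thread.
function getResumeTarget(
  chatId: string,
  messages: ResumableMessage[]
): ResumeTarget {
  const last = messages.length > 0 ? messages[messages.length - 1] : undefined;
  if (!last || !last.metadata?.activeStreamId) {
    return { resume: false, api: null };
  }
  const api =
    `/api/chat/${encodeURIComponent(chatId)}/stream` +
    `?messageId=${encodeURIComponent(last.id)}`;
  return { resume: true, api };
}
```

The helper returns `api: null` rather than a URL with a missing ID, so the caller can skip the reconnect request entirely.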
4. Stream Resume (GET /api/chat/[id]/stream)
The resume endpoint handles three scenarios:
export async function GET(
  request: NextRequest,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id: chatId } = await params;
  const messageId = request.nextUrl.searchParams.get("messageId");
  // Nothing to resume without a message to look up
  if (!messageId) {
    return new Response(null, { status: 204 });
  }
  const messageWithParts = await getChatMessageWithPartsById({ id: messageId });
  if (!messageWithParts) {
    return new Response(null, { status: 204 });
  }
  const { message } = messageWithParts;
  // Scenario 1: Stream already finished
  if (!message.metadata?.activeStreamId) {
    return appendMessageResponse(message); // Send finalized message
  }
  // Scenario 2: Stream still active - resume it
  const streamContext = getStreamContext();
  const stream = await streamContext.resumeExistingStream(
    message.metadata.activeStreamId
  );
  if (stream) {
    return new Response(stream, { headers: UI_MESSAGE_STREAM_HEADERS });
  }
  // Scenario 3: Stream lost but message finalized (race condition)
  const refreshed = await getChatMessageWithPartsById({ id: messageId });
  if (refreshed && !refreshed.message.metadata?.activeStreamId) {
    return appendMessageResponse(refreshed.message);
  }
  // Marker still set but no stream available: nothing to send
  return new Response(null, { status: 204 });
}
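The branching in the handler can be summarized as a small decision function. The sketch below is a simplification: `decideResumeAction` and `ResumeAction` are hypothetical names, and the two callbacks stand in synchronously for the awaited Redis lookup and the second database read.

```typescript
type ResumeAction =
  | { kind: "send-final" } // respond with the finalized message
  | { kind: "resume"; streamId: string } // pipe the live stream to the client
  | { kind: "no-content" }; // respond with 204

function decideResumeAction(
  activeStreamId: string | null,
  isStreamAvailable: (streamId: string) => boolean,
  refetchActiveStreamId: () => string | null
): ResumeAction {
  // Scenario 1: marker already cleared, so the message is complete.
  if (!activeStreamId) return { kind: "send-final" };
  // Scenario 2: the stream is still available, so resume it.
  if (isStreamAvailable(activeStreamId)) {
    return { kind: "resume", streamId: activeStreamId };
  }
  // Scenario 3: the stream is gone; re-read the marker in case the
  // message was finalized between the first read and the lookup.
  if (!refetchActiveStreamId()) return { kind: "send-final" };
  // Marker still set but no stream: nothing useful to send.
  return { kind: "no-content" };
}
```

The second read only happens on a stream miss, which keeps the common paths at one database query.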
5. Stream Finalization
When streaming completes, you clear the activeStreamId:
onFinish: async ({ messages }) => {
  const assistantMessage = messages.at(-1);
  if (!assistantMessage) return; // Nothing to finalize
  await updateMessage({
    id: assistantMessage.id,
    chatId,
    message: {
      ...assistantMessage,
      metadata: {
        ...assistantMessage.metadata,
        activeStreamId: null, // <-- Clear marker
      },
    },
  });
},
On error, also clear it to prevent a stuck resumable state:
onError: (error) => {
  after(() =>
    updateMessageActiveStreamId({ id: messageId, activeStreamId: null })
  );
},
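Both the success and error paths reduce to the same operation: produce a copy of the message with the marker cleared while leaving everything else intact. A minimal sketch, assuming a hypothetical `StoredMessage` shape and helper name:

```typescript
// Hypothetical stored shape; extra metadata keys are allowed and must be preserved.
interface StoredMessage {
  id: string;
  parts: unknown[];
  metadata: { activeStreamId: string | null; [key: string]: unknown };
}

// Return a new message with the marker cleared; the input is not mutated.
function clearActiveStreamId(message: StoredMessage): StoredMessage {
  return {
    ...message,
    metadata: { ...message.metadata, activeStreamId: null },
  };
}
```

Because the helper is idempotent, calling it from both `onFinish` and `onError` is safe even if both fire for the same message.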
Redis Key Cleanup
Set TTL on stream keys to prevent Redis bloat:
after(async () => {
  const keyPattern = `${config.appPrefix}:resumable-stream:rs:sentinel:${streamId}*`;
  // KEYS walks the whole keyspace; on large Redis instances SCAN is the non-blocking alternative
  const keys = await redisPublisher.keys(keyPattern);
  await Promise.all(keys.map((key) => redisPublisher.expire(key, 300))); // 5 min TTL
});
Fallback Behavior
Without Redis, resumable streams are disabled:
if (!globalStreamContext) {
  console.log("> Resumable streams are disabled due to missing REDIS_URL");
}
Your app still works. Streams just can’t be resumed after disconnection.
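One way to structure this fallback is to let the context be `null` and branch at the call site. The sketch below is an assumption about how that could look, not the app's actual code: `StreamContextLike` is a simplified synchronous stand-in (the real `resumableStream` call shown earlier is awaited), and both function names are hypothetical.

```typescript
// Simplified, synchronous stand-in for a stream context (hypothetical shape).
interface StreamContextLike<S> {
  resumableStream(streamId: string, makeStream: () => S): S;
}

// Only build a context when a Redis URL is configured; otherwise return null.
function createContextIfConfigured<S>(
  redisUrl: string | undefined,
  create: () => StreamContextLike<S>
): StreamContextLike<S> | null {
  if (!redisUrl) return null; // resumable streams disabled
  return create();
}

// Wrap the stream when a context exists, otherwise return it unchanged.
function streamWithOptionalResume<S>(
  context: StreamContextLike<S> | null,
  streamId: string,
  makeStream: () => S
): S {
  return context
    ? context.resumableStream(streamId, makeStream)
    : makeStream();
}
```

With this shape, the request handler has a single code path, and the absence of Redis only removes the ability to reconnect.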
Message Flow
Key Implementation Details
- activeStreamId as state marker - Message metadata tracks stream state
- Placeholder message first - Save before streaming so resume can find it
- Redis pub/sub - Enables cross-instance resumption in multi-pod deployments
- appendMessage fallback - If stream finished, send the complete message as a data chunk
- Race condition handling - Double-check DB if Redis stream is missing
Stopping streams
Resumable streams stay compatible with stopping as long as you do not wire request.signal into streamText. Use a server-owned AbortController and a persisted stop flag instead.
See Stop Resumable Streams.
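As a rough illustration of the idea, the sketch below checks a stop flag between chunks and aborts a controller that the server owns. It is a simplified assumption rather than the approach from the linked page: `StopFlagStore` is a hypothetical in-memory stand-in for a persisted flag, and the loop over an array stands in for model generation. In a real handler, `controller.signal` would presumably be what gets passed to `streamText` in place of `request.signal`.

```typescript
// Hypothetical in-memory stand-in for a persisted stop flag (e.g. a DB column).
class StopFlagStore {
  private readonly stopped = new Set<string>();
  requestStop(streamId: string): void {
    this.stopped.add(streamId);
  }
  isStopRequested(streamId: string): boolean {
    return this.stopped.has(streamId);
  }
}

// Emit chunks until the stop flag is set; the controller is owned by the
// server, so a client disconnect alone never aborts generation.
function generateWithStopCheck(
  streamId: string,
  chunks: string[],
  store: StopFlagStore,
  onChunk: (chunk: string) => void
): { emitted: string[]; aborted: boolean } {
  const controller = new AbortController();
  const emitted: string[] = [];
  for (const chunk of chunks) {
    if (store.isStopRequested(streamId)) controller.abort();
    if (controller.signal.aborted) break;
    emitted.push(chunk);
    onChunk(chunk);
  }
  return { emitted, aborted: controller.signal.aborted };
}
```

Because the flag lives outside the request, a stop issued from any tab or instance can end generation, while a reload by itself leaves the stream running for later resumption.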
Further Reading