Tutorial 3

Streaming

Demonstrates how a tool can stream multiple content fragments back to the client before returning the final result.

How It Works

MCP supports streaming tool responses — a tool can send incremental results while it’s still running, then return a final result when done. This is useful for:

  • Long-running operations with progress updates
  • Search results arriving incrementally
  • Data processing pipelines with intermediate output

The tool accesses the stream via finemcp.StreamFromCtx(ctx):

tool, _ := finemcp.NewTool("long-task",
    func(ctx context.Context, input []byte) ([]byte, error) {
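        // StreamFromCtx returns nil when the transport cannot stream
        stream := finemcp.StreamFromCtx(ctx)
        if stream != nil {
            // Send intermediate results; each one is delivered as a notification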
            for i := 1; i <= 5; i++ {
                stream.SendText(fmt.Sprintf("Step %d/5 done", i))
                time.Sleep(200 * time.Millisecond)
            }
            // Send structured content
            stream.Send(finemcp.TextContent{Text: "Final structured content"})
        }
        // Return the final result
        return []byte("Task completed!"), nil
    },
)

Stream API

Method                       Description
finemcp.StreamFromCtx(ctx)   Get the *ToolStream from context (nil if transport doesn’t support streaming)
stream.SendText(text)        Send a text fragment as a notification
stream.Send(content)         Send structured content (e.g. TextContent, ImageContent)

Important: Transport Requirements

Not all transports support streaming. The stream is only available when the transport sets a NotificationSender in the context:

Transport         StreamFromCtx         Streaming works?
StartHTTP         Returns nil           No: request/response only
StartStreamable   Returns *ToolStream   Yes: via GET SSE connection
StartSSE          Returns *ToolStream   Yes: via SSE events
StartWebSocket    Returns *ToolStream   Yes: via WebSocket messages
ServeStdio        Returns *ToolStream   Yes: via stdout notifications

This example uses StartStreamable because it’s the modern recommended transport for streaming.
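
The nil-or-stream behavior in the table follows a common Go pattern: the transport stores a value in the request context, and the handler looks it up. The sketch below illustrates that pattern only. The names sender, withSender, senderFromCtx, recorder, runTool, and invoke are hypothetical stand-ins written for this example and are not part of finemcp; how finemcp wires its NotificationSender internally is not shown in this tutorial.

```go
package main

import (
	"context"
	"fmt"
)

// sender is a hypothetical stand-in for whatever a transport uses to push
// notifications. It is NOT finemcp's NotificationSender.
type sender interface {
	Notify(text string) error
}

// ctxKey is an unexported key type so other packages cannot collide with it.
type ctxKey struct{}

// withSender is what a streaming-capable transport would do before
// invoking the tool handler.
func withSender(ctx context.Context, s sender) context.Context {
	return context.WithValue(ctx, ctxKey{}, s)
}

// senderFromCtx returns nil when no transport stored a sender.
func senderFromCtx(ctx context.Context) sender {
	s, _ := ctx.Value(ctxKey{}).(sender)
	return s
}

// recorder keeps notifications in memory so the example can inspect them.
type recorder struct{ got []string }

func (r *recorder) Notify(text string) error {
	r.got = append(r.got, text)
	return nil
}

// runTool mimics a handler: it streams only if a sender is present.
func runTool(ctx context.Context) string {
	if s := senderFromCtx(ctx); s != nil {
		_ = s.Notify("working")
	}
	return "done"
}

// invoke runs the handler once, with or without a sender attached, and
// returns the final result plus any recorded notifications.
func invoke(streaming bool) (string, []string) {
	ctx := context.Background()
	rec := &recorder{}
	if streaming {
		ctx = withSender(ctx, rec)
	}
	return runTool(ctx), rec.got
}

func main() {
	fmt.Println(invoke(false)) // request/response style: no notifications
	fmt.Println(invoke(true))  // streaming style: one notification recorded
}
```

The handler code is identical in both cases; only what the transport placed in the context differs.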

Server Option

finemcp.WithStreamBufferSize(64)  // Channel buffer size for queued stream chunks
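
To give a feel for what a channel buffer size means, here is a generic Go sketch of a buffered channel used as a chunk queue. It is not finemcp's implementation: enqueue and countAccepted are hypothetical helpers, and the tutorial does not say whether finemcp blocks, drops, or returns an error when its buffer is full. The sketch only shows that a buffer of size N accepts N chunks before a consumer has to drain it.

```go
package main

import "fmt"

// enqueue tries to queue a chunk without blocking and reports whether it fit.
func enqueue(queue chan string, chunk string) bool {
	select {
	case queue <- chunk:
		return true
	default:
		return false // buffer is full and nobody is receiving
	}
}

// countAccepted offers `chunks` items to a queue with the given buffer size
// while no consumer is draining it, and returns how many were accepted.
func countAccepted(bufferSize, chunks int) int {
	queue := make(chan string, bufferSize)
	accepted := 0
	for i := 0; i < chunks; i++ {
		if enqueue(queue, fmt.Sprintf("chunk %d", i)) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(countAccepted(2, 5))  // 2: the small buffer fills up
	fmt.Println(countAccepted(64, 5)) // 5: everything fits
}
```

A larger buffer lets a tool produce bursts of fragments faster than the transport delivers them, at the cost of holding more chunks in memory.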

Architecture: Streamable HTTP

With StartStreamable, notifications (streaming chunks) are delivered via a separate GET SSE connection, not in the POST response:

Terminal 1 (GET SSE — receives notifications):
  curl -N http://localhost:8080 -H "Accept: text/event-stream" -H "Mcp-Session-Id: <id>"
  ← data: {"jsonrpc":"2.0","method":"notifications/message","params":{"content":{"type":"text","text":"Step 1/5 done"}}}
  ← data: {"jsonrpc":"2.0","method":"notifications/message","params":{"content":{"type":"text","text":"Step 2/5 done"}}}
  ← ...

Terminal 2 (POST — sends request, gets final result):
  curl -X POST http://localhost:8080 -H "Mcp-Session-Id: <id>" -d '{...tools/call...}'
  ← {"jsonrpc":"2.0","id":3,"result":{"content":[{"type":"text","text":"Task completed!"}]}}

In real MCP clients (Claude Desktop, Cursor, etc.), both connections are managed internally — you never see this split.
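
For readers writing their own client, the following sketch shows one way to pull the text out of the GET SSE stream. It assumes the notification shape shown in the Terminal 1 output above and models only those fields; the notification type and the textsFromSSE function are hypothetical names for this example. A real client would read from the HTTP response body instead of a string.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// sample mimics two events as they appear on the SSE connection.
const sample = `data: {"jsonrpc":"2.0","method":"notifications/message","params":{"content":{"type":"text","text":"Step 1/5 done"}}}

data: {"jsonrpc":"2.0","method":"notifications/message","params":{"content":{"type":"text","text":"Step 2/5 done"}}}

`

// notification mirrors only the fields visible in the sample output.
type notification struct {
	Method string `json:"method"`
	Params struct {
		Content struct {
			Type string `json:"type"`
			Text string `json:"text"`
		} `json:"content"`
	} `json:"params"`
}

// textsFromSSE scans an SSE body line by line and returns the text of every
// "data:" line that decodes as a notifications/message payload.
func textsFromSSE(body string) ([]string, error) {
	var texts []string
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "data:") {
			continue // skip blank separators, comments, and other SSE fields
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		var n notification
		if err := json.Unmarshal([]byte(payload), &n); err != nil {
			return nil, fmt.Errorf("decode event: %w", err)
		}
		if n.Method == "notifications/message" {
			texts = append(texts, n.Params.Content.Text)
		}
	}
	return texts, sc.Err()
}

func main() {
	texts, err := textsFromSSE(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, t := range texts {
		fmt.Println(t)
	}
}
```

The sketch treats each data: line as one complete JSON document, which matches the sample output; events whose data spans several lines would need the lines joined first.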

Testing with curl

# Start the server
go run ./03-streaming

# Step 1: Initialize and note the Mcp-Session-Id from response headers
curl -v -X POST http://localhost:8080 -H "Content-Type: application/json" -d '{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-03-26",
    "clientInfo": { "name": "curl-client", "version": "1.0.0" },
    "capabilities": {}
  }
}'

# Step 2 (Terminal 1): Open SSE stream to receive notifications
curl -N http://localhost:8080 \
  -H "Accept: text/event-stream" \
  -H "Mcp-Session-Id: <session-id>"

# Step 3 (Terminal 2): Call the tool
curl -X POST http://localhost:8080 -H "Content-Type: application/json" \
  -H "Mcp-Session-Id: <session-id>" -d '{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/call",
  "params": { "name": "long-task" }
}'

Terminal 1 shows the five progress steps in real time, followed by the structured content fragment sent with stream.Send. Terminal 2 shows only the final result.
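
The Step 3 request can also be built from Go using only the standard library. This is a minimal sketch that mirrors the curl command above; rpcRequest and newToolCall are hypothetical names for this example, and the URL, tool name, and request id are the ones used in this tutorial. The session id must be the value returned by the initialize call.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// rpcRequest models the JSON-RPC envelope used in the curl examples.
type rpcRequest struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int    `json:"id"`
	Method  string `json:"method"`
	Params  any    `json:"params"`
}

// newToolCall builds the POST request for a tools/call invocation,
// with the same headers as the Step 3 curl command.
func newToolCall(url, sessionID, tool string, id int) (*http.Request, error) {
	body, err := json.Marshal(rpcRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "tools/call",
		Params:  map[string]string{"name": tool},
	})
	if err != nil {
		return nil, fmt.Errorf("encode request: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Mcp-Session-Id", sessionID)
	return req, nil
}

func main() {
	req, err := newToolCall("http://localhost:8080", "<session-id>", "long-task", 3)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("Mcp-Session-Id"))
	// To send it against a running server: resp, err := http.DefaultClient.Do(req)
}
```

As with curl, the response to this POST carries only the final result; the progress fragments arrive on the separate SSE connection.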

More Examples

Streaming with error handling

stream := finemcp.StreamFromCtx(ctx)
if stream != nil {
    for i, item := range items {
        if err := stream.SendText(fmt.Sprintf("Processing %d/%d: %s", i+1, len(items), item)); err != nil {
            return nil, fmt.Errorf("stream interrupted: %w", err)
        }
    }
}

Graceful fallback for non-streaming transports

stream := finemcp.StreamFromCtx(ctx)
if stream != nil {
    stream.SendText("Starting...")
}
result := doWork()
if stream != nil {
    stream.SendText("Done!")
}
return []byte(result), nil
// Works with all transports — streaming or not