From 0ec2efe9fc40ccbcb95b2e55ed4c3cee151e0ba2 Mon Sep 17 00:00:00 2001 From: Armando Alexis Sepulveda Cruz Date: Fri, 27 Mar 2026 20:09:40 -0600 Subject: [PATCH] fix: route server-initiated requests via GET SSE in responseStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client sends a POST request to initiate a tool/handler, the server uses ServerResponse.sse() (WebMVC) which calls request.startAsync(), putting Tomcat's OutputBuffer into 'suspended' async mode. While the request handler is executing (e.g. waiting for a sampling/createMessage response), any writes to the POST SSE transport's OutputBuffer are buffered and never flushed to the network — because Tomcat only flushes the buffer once the async context dispatches (i.e. when the consumer lambda returns). This causes a deadlock for server-initiated requests such as MCP Sampling (sampling/createMessage) and Elicitation: the handler blocks waiting for the client's response, but the client never receives the request because it is stuck in the suspended buffer. The GET SSE stream (listeningStreamRef) does not have this problem: its consumer lambda exits immediately, so Tomcat releases the OutputBuffer and writes from any thread reach the socket immediately. Fix: In responseStream(), construct the McpAsyncServerExchange using McpStreamableServerSession.this (the outer session) as the session instead of the inner McpStreamableServerSessionStream tied to the POST SSE transport. McpStreamableServerSession.sendRequest() already delegates to listeningStreamRef (GET SSE), so server-initiated requests are now correctly sent through the GET SSE channel. Existing integration tests in AbstractMcpClientServerIntegrationTests (testCreateMessageSuccess, testCreateMessageWithRequestTimeoutSuccess) cover this scenario for HttpServletStreamableIntegrationTests. --- .../spec/McpStreamableServerSession.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java index 9ec2117bb..3dfe3e93a 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java @@ -182,7 +182,6 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr return Mono.deferContextual(ctx -> { McpTransportContext transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); - McpStreamableServerSessionStream stream = new McpStreamableServerSessionStream(transport); McpRequestHandler requestHandler = McpStreamableServerSession.this.requestHandlers .get(jsonrpcRequest.method()); // TODO: delegate to stream, which upon successful response should close @@ -195,9 +194,19 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND, error.message(), error.data()))); } + // Use McpStreamableServerSession.this as the session for the exchange so that + // server-initiated requests (e.g. sampling/createMessage, elicitation) are + // routed via the GET SSE listening stream (listeningStreamRef) instead of + // the POST SSE response stream. The POST SSE response stream's underlying + // Tomcat OutputBuffer is in "suspended" async mode while the request handler + // is running, so writes to it are buffered and never flushed to the network + // until the handler completes — causing a deadlock when the handler blocks + // waiting for the client's sampling response. The GET SSE stream's consumer + // has already exited by the time requests arrive, so its buffer is not + // suspended and writes are immediately visible to the client. return requestHandler - .handle(new McpAsyncServerExchange(this.id, stream, clientCapabilities.get(), clientInfo.get(), - transportContext), jsonrpcRequest.params()) + .handle(new McpAsyncServerExchange(this.id, McpStreamableServerSession.this, + clientCapabilities.get(), clientInfo.get(), transportContext), jsonrpcRequest.params()) .map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), result, null)) .onErrorResume(e -> {