Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,19 @@ async def _handle_reconnection(
) -> None:
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
# Bail if max retries exceeded
if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
if attempt >= MAX_RECONNECTION_ATTEMPTS:
logger.warning(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
error_data = ErrorData(
code=INTERNAL_ERROR,
message=(
f"SSE stream disconnected and max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded"
),
)
error_msg = SessionMessage(
JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.id, error=error_data)
)
await ctx.read_stream_writer.send(error_msg)
return

# Always wait - use server value or default
Expand All @@ -404,6 +415,7 @@ async def _handle_reconnection(
# Track for potential further reconnection
reconnect_last_event_id: str = last_event_id
reconnect_retry_ms = retry_interval_ms
received_data = False

async for sse in event_source.aiter_sse():
if sse.id: # pragma: no branch
Expand All @@ -421,9 +433,15 @@ async def _handle_reconnection(
await event_source.response.aclose()
return

# Stream ended again without response - reconnect again (reset attempt counter)
if sse.data:
received_data = True

# Stream ended without response - reset counter only if we received
# actual message data (not just priming events), otherwise increment
# to prevent infinite reconnection loops when the server always drops.
next_attempt = 0 if received_data else attempt + 1
logger.info("SSE stream disconnected, reconnecting...")
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt)
except Exception as e: # pragma: no cover
logger.debug(f"Reconnection failed: {e}")
# Try to reconnect again if we still have an event ID
Expand Down
45 changes: 40 additions & 5 deletions tests/shared/test_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ async def _handle_list_tools( # pragma: no cover
description="Tool that closes standalone GET stream mid-operation",
input_schema={"type": "object", "properties": {}},
),
Tool(
name="tool_with_perpetual_stream_close",
description="Tool that always closes the stream without sending a response",
input_schema={"type": "object", "properties": {}},
),
]
)

Expand Down Expand Up @@ -380,6 +385,16 @@ async def _handle_call_tool( # pragma: no cover

return CallToolResult(content=[TextContent(type="text", text="Standalone stream close test done")])

elif name == "tool_with_perpetual_stream_close":
# Repeatedly close the stream without ever sending a response.
# Used to verify that _handle_reconnection gives up after MAX_RECONNECTION_ATTEMPTS.
for _ in range(10):
if ctx.close_sse_stream:
await ctx.close_sse_stream()
await anyio.sleep(0.3)
# This response should never be reached by the client because reconnection gives up
return CallToolResult(content=[TextContent(type="text", text="Should not reach")])

return CallToolResult(content=[TextContent(type="text", text=f"Called {name}")])


Expand Down Expand Up @@ -1086,7 +1101,7 @@ async def test_streamable_http_client_tool_invocation(initialized_client_session
"""Test client tool invocation."""
# First list tools
tools = await initialized_client_session.list_tools()
assert len(tools.tools) == 10
assert len(tools.tools) == 11
assert tools.tools[0].name == "test_tool"

# Call the tool
Expand Down Expand Up @@ -1116,7 +1131,7 @@ async def test_streamable_http_client_session_persistence(basic_server: None, ba

# Make multiple requests to verify session persistence
tools = await session.list_tools()
assert len(tools.tools) == 10
assert len(tools.tools) == 11

# Read a resource
resource = await session.read_resource(uri="foobar://test-persist")
Expand All @@ -1138,7 +1153,7 @@ async def test_streamable_http_client_json_response(json_response_server: None,

# Check tool listing
tools = await session.list_tools()
assert len(tools.tools) == 10
assert len(tools.tools) == 11

# Call a tool and verify JSON response handling
result = await session.call_tool("test_tool", {})
Expand Down Expand Up @@ -1220,7 +1235,7 @@ async def test_streamable_http_client_session_termination(basic_server: None, ba

# Make a request to confirm session is working
tools = await session.list_tools()
assert len(tools.tools) == 10
assert len(tools.tools) == 11

async with create_mcp_http_client(headers=headers) as httpx_client2:
async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as (
Expand Down Expand Up @@ -1281,7 +1296,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt

# Make a request to confirm session is working
tools = await session.list_tools()
assert len(tools.tools) == 10
assert len(tools.tools) == 11

async with create_mcp_http_client(headers=headers) as httpx_client2:
async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as (
Expand Down Expand Up @@ -2318,3 +2333,23 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(

assert "content-type" in headers_data
assert headers_data["content-type"] == "application/json"


@pytest.mark.anyio
async def test_reconnection_gives_up_after_max_attempts(
event_server: tuple[SimpleEventStore, str],
) -> None:
"""Client should stop reconnecting after MAX_RECONNECTION_ATTEMPTS and return an error.

Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/2393:
_handle_reconnection used to reset the attempt counter to 0 when the stream ended
without a response, causing an infinite retry loop.
"""
_, server_url = event_server

async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

with pytest.raises(MCPError), anyio.fail_after(30): # pragma: no branch
await session.call_tool("tool_with_perpetual_stream_close", {})
Loading