Skip to content

Commit 328b925

Browse files
felixweinbergerjingx8885surya-prakash-susarla
committed
Fix child process cleanup in stdio termination
When terminating MCP servers, child processes were being orphaned because only the parent process was killed. This caused resource leaks and prevented proper cleanup, especially with tools like npx that spawn child processes for the actual server implementation. This happened on both POSIX and Windows systems; however, because of implementation details, resolving it is non-trivial and requires platform-specific handling of process trees: process groups (`os.killpg`) on POSIX and Job Objects, via the new pywin32 dependency, on Windows. This addresses critical issues where MCP servers using process-spawning tools would leave orphaned child processes running after client shutdown.
Resolves #850. Resolves #729.
Co-authored-by: jingx8885 <jingxu8885@qq.com>
Co-authored-by: Surya Prakash Susarla <susarla.surya.prakash.1998@gmail.com>
1 parent 7af9e65 commit 328b925

File tree

5 files changed

+535
-13
lines changed

5 files changed

+535
-13
lines changed

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"pydantic-settings>=2.5.2",
3333
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
3434
"jsonschema>=4.20.0",
35+
"pywin32>=310; sys_platform == 'win32'",
3536
]
3637

3738
[project.optional-dependencies]
@@ -124,5 +125,7 @@ filterwarnings = [
124125
# This should be fixed on Uvicorn's side.
125126
"ignore::DeprecationWarning:websockets",
126127
"ignore:websockets.server.WebSocketServerProtocol is deprecated:DeprecationWarning",
127-
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel"
128+
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel",
129+
# pywin32 internal deprecation warning
130+
"ignore:getargs.*The 'u' format is deprecated:DeprecationWarning"
128131
]

src/mcp/client/stdio/__init__.py

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import os
2+
import signal
23
import sys
34
from contextlib import asynccontextmanager
45
from pathlib import Path
56
from typing import Literal, TextIO
67

78
import anyio
89
import anyio.lowlevel
10+
from anyio.abc import Process
911
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1012
from anyio.streams.text import TextReceiveStream
1113
from pydantic import BaseModel, Field
@@ -14,8 +16,10 @@
1416
from mcp.shared.message import SessionMessage
1517

1618
from .win32 import (
19+
FallbackProcess,
1720
create_windows_process,
1821
get_windows_executable_command,
22+
terminate_windows_process_tree,
1923
)
2024

2125
# Environment variables to inherit by default
@@ -184,7 +188,7 @@ async def stdin_writer():
184188
await process.wait()
185189
except TimeoutError:
186190
# If process doesn't terminate in time, force kill it
187-
process.kill()
191+
await _terminate_process_tree(process)
188192
except ProcessLookupError:
189193
# Process already exited, which is fine
190194
pass
@@ -219,11 +223,80 @@ async def _create_platform_compatible_process(
219223
):
220224
"""
221225
Creates a subprocess in a platform-compatible way.
222-
Returns a process handle.
226+
227+
Unix: Creates process in a new session/process group for killpg support
228+
Windows: Creates process in a Job Object for reliable child termination
223229
"""
224230
if sys.platform == "win32":
231+
# Windows: Use Job Objects for proper process tree management
225232
process = await create_windows_process(command, args, env, errlog, cwd)
226233
else:
227-
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
234+
# Unix: Create process in new session for process group termination
235+
process = await anyio.open_process(
236+
[command, *args],
237+
env=env,
238+
stderr=errlog,
239+
cwd=cwd,
240+
start_new_session=True,
241+
)
228242

229243
return process
244+
245+
246+
async def _terminate_process_tree(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
247+
"""
248+
Terminate a process and all its children using platform-specific methods.
249+
250+
Unix: Uses os.killpg() for atomic process group termination
251+
Windows: Uses Job Objects via pywin32 for reliable child process cleanup
252+
"""
253+
if sys.platform == "win32":
254+
# Windows: Use Job Object termination
255+
await terminate_windows_process_tree(process)
256+
else:
257+
# Unix: Use process groups for atomic termination
258+
pid = getattr(process, "pid", None)
259+
if pid is None:
260+
popen = getattr(process, "popen", None)
261+
if popen:
262+
pid = getattr(popen, "pid", None)
263+
264+
if not pid:
265+
return
266+
267+
try:
268+
# Get process group ID (we use start_new_session=True)
269+
pgid = os.getpgid(pid)
270+
271+
# Send SIGTERM to entire process group atomically
272+
os.killpg(pgid, signal.SIGTERM)
273+
274+
# Wait for graceful termination
275+
deadline = anyio.current_time() + timeout
276+
while anyio.current_time() < deadline:
277+
try:
278+
# Check if process group still exists (signal 0 = check only)
279+
os.killpg(pgid, 0)
280+
await anyio.sleep(0.1)
281+
except ProcessLookupError:
282+
# Process group terminated successfully
283+
return
284+
285+
# Force kill if still alive after timeout
286+
try:
287+
os.killpg(pgid, signal.SIGKILL)
288+
except ProcessLookupError:
289+
# Already dead
290+
pass
291+
292+
except (ProcessLookupError, PermissionError, OSError):
293+
# Fall back to simple terminate if process group approach fails
294+
try:
295+
process.terminate()
296+
with anyio.fail_after(timeout):
297+
await process.wait()
298+
except Exception:
299+
try:
300+
process.kill()
301+
except Exception:
302+
pass

src/mcp/client/stdio/win32.py

Lines changed: 128 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@
1313
from anyio.abc import Process
1414
from anyio.streams.file import FileReadStream, FileWriteStream
1515

16+
# Windows-specific imports for Job Objects
17+
if sys.platform == "win32":
18+
import pywintypes
19+
import win32api
20+
import win32con
21+
import win32job
22+
else:
23+
# Placeholders so these names exist (as None) on non-Windows platforms
24+
win32api = None
25+
win32con = None
26+
win32job = None
27+
pywintypes = None
28+
29+
JobHandle = int
30+
1631

1732
def get_windows_executable_command(command: str) -> str:
1833
"""
@@ -103,6 +118,11 @@ def kill(self) -> None:
103118
"""Kill the subprocess immediately (alias for terminate)."""
104119
self.terminate()
105120

121+
@property
def pid(self) -> int:
    """Process ID reported by the wrapped ``popen`` object."""
    wrapped = self.popen
    return wrapped.pid
125+
106126

107127
# ------------------------
108128
# Updated function
@@ -117,13 +137,16 @@ async def create_windows_process(
117137
cwd: Path | str | None = None,
118138
) -> Process | FallbackProcess:
119139
"""
120-
Creates a subprocess in a Windows-compatible way.
140+
Creates a subprocess in a Windows-compatible way with Job Object support.
121141
122142
Attempt to use anyio's open_process for async subprocess creation.
123143
In some cases this will throw NotImplementedError on Windows, e.g.
124144
when using the SelectorEventLoop which does not support async subprocesses.
125145
In that case, we fall back to using subprocess.Popen.
126146
147+
The process is automatically added to a Job Object to ensure all child
148+
processes are terminated when the parent is terminated.
149+
127150
Args:
128151
command (str): The executable to run
129152
args (list[str]): List of command line arguments
@@ -132,8 +155,11 @@ async def create_windows_process(
132155
cwd (Path | str | None): Working directory for the subprocess
133156
134157
Returns:
135-
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
158+
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
136159
"""
160+
job = _create_job_object()
161+
process = None
162+
137163
try:
138164
# First try using anyio with Windows-specific flags to hide console window
139165
process = await anyio.open_process(
@@ -146,10 +172,9 @@ async def create_windows_process(
146172
stderr=errlog,
147173
cwd=cwd,
148174
)
149-
return process
150175
except NotImplementedError:
151-
# Windows often doesn't support async subprocess creation, use fallback
152-
return await _create_windows_fallback_process(command, args, env, errlog, cwd)
176+
# If Windows doesn't support async subprocess creation, use fallback
177+
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
153178
except Exception:
154179
# Try again without creation flags
155180
process = await anyio.open_process(
@@ -158,7 +183,9 @@ async def create_windows_process(
158183
stderr=errlog,
159184
cwd=cwd,
160185
)
161-
return process
186+
187+
_maybe_assign_process_to_job(process, job)
188+
return process
162189

163190

164191
async def _create_windows_fallback_process(
@@ -185,8 +212,6 @@ async def _create_windows_fallback_process(
185212
bufsize=0, # Unbuffered output
186213
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
187214
)
188-
return FallbackProcess(popen_obj)
189-
190215
except Exception:
191216
# If creationflags failed, fallback without them
192217
popen_obj = subprocess.Popen(
@@ -198,4 +223,98 @@ async def _create_windows_fallback_process(
198223
cwd=cwd,
199224
bufsize=0,
200225
)
201-
return FallbackProcess(popen_obj)
226+
process = FallbackProcess(popen_obj)
227+
return process
228+
229+
230+
def _create_job_object() -> int | None:
231+
"""
232+
Create a Windows Job Object configured to terminate all processes when closed.
233+
234+
Returns:
235+
The job object handle, or None if creation failed.
236+
"""
237+
if sys.platform != "win32" or not win32job:
238+
return None
239+
240+
try:
241+
job = win32job.CreateJobObject(None, "")
242+
extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation)
243+
244+
# Set the job to terminate all processes when the handle is closed
245+
extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
246+
win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info)
247+
return job
248+
except Exception:
249+
# If job creation fails, return None
250+
return None
251+
252+
253+
def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None:
    """
    Attach ``process`` to the Job Object ``job`` on a best-effort basis.

    On success the job handle is stored on the process as ``_job_object``.
    If opening the process or assigning it to the job fails for any reason,
    the job handle is closed instead.

    Args:
        process: The process to place in the job
        job: The job object handle; a falsy value makes this a no-op
    """
    if not job:
        return

    if sys.platform != "win32" or not (win32api and win32con and win32job):
        return

    try:
        access_rights = win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE
        handle = win32api.OpenProcess(access_rights, False, process.pid)
        if not handle:
            raise Exception("Failed to open process handle")

        try:
            win32job.AssignProcessToJobObject(job, handle)
            # Remember the job on the process so it can be terminated later
            process._job_object = job
        finally:
            # The process handle is only needed for the assignment itself
            win32api.CloseHandle(handle)
    except Exception:
        # Assignment did not succeed: release the job handle
        if win32api:
            win32api.CloseHandle(job)
286+
287+
288+
async def terminate_windows_process_tree(process: Process | FallbackProcess) -> None:
    """
    Stop a Windows process together with any children it spawned.

    When the process carries a job object handle in ``_job_object``, that job
    is terminated and its handle closed. Afterwards ``process.terminate()`` is
    always attempted as well. Errors from any of these steps are ignored.
    Does nothing on non-Windows platforms.

    Args:
        process: The process to terminate
    """
    if sys.platform != "win32":
        return

    # Job handle attached to the process, if any
    job_handle = getattr(process, "_job_object", None)
    if job_handle and win32job:
        try:
            win32job.TerminateJobObject(job_handle, 1)
        except Exception:
            # Job might already be terminated
            pass
        finally:
            if win32api:
                try:
                    win32api.CloseHandle(job_handle)
                except Exception:
                    pass

    # Terminate the process itself too, whether or not a job was present
    try:
        process.terminate()
    except Exception:
        pass

0 commit comments

Comments
 (0)