Skip to content

fix: remove stdout stream handling for share #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 45 additions & 93 deletions src/mcpm/commands/share.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
Share command for MCPM - Share a single MCP server through a tunnel
"""

import os
import secrets
import select
import shlex
import shutil
import signal
Expand All @@ -27,19 +25,6 @@ def find_mcp_proxy() -> Optional[str]:
return shutil.which("mcp-proxy")


def make_non_blocking(file_obj):
"""Make a file object non-blocking."""
if os.name == 'posix':
import fcntl

fd = file_obj.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# On other platforms (e.g., Windows), we rely on the behavior of select()
# and the non-blocking nature of readline() on Popen streams,
# or the existing try-except for IOError/OSError.


def wait_for_random_port(process: subprocess.Popen, timeout: int = 20) -> Optional[int]:
"""
Wait for mcp-proxy to output the random port information.
Expand Down Expand Up @@ -74,39 +59,26 @@ def wait_for_random_port(process: subprocess.Popen, timeout: int = 20) -> Option
console.print(f"[red]Error output:[/]\n{stderr_output}")
sys.exit(1)

# Use select to wait for data to be available without blocking
readable = []
if process.stdout:
readable.append(process.stdout)
if process.stderr:
readable.append(process.stderr)

if readable:
# Wait for up to 1 second for output
r, _, _ = select.select(readable, [], [], 1.0)

# Process available output
for stream in r:
try:
line = stream.readline()
if line:
print(line.rstrip())

# Check for port information
if "Uvicorn running on http://" in line:
try:
url_part = line.split("Uvicorn running on ")[1].split(" ")[0]
actual_port = int(url_part.split(":")[-1].strip())
port_found = True
console.print(
f"[cyan]mcp-proxy SSE server running on port [bold]{actual_port}[/bold][/]"
)
break
except (ValueError, IndexError):
pass
except (IOError, OSError):
# Resource temporarily unavailable - this is normal for non-blocking IO
pass
# Process available output
try:
if process.stderr:
line = process.stderr.readline()
if line:
console.print(line.rstrip())

# Check for port information
if "Uvicorn running on http://" in line:
try:
url_part = line.split("Uvicorn running on ")[1].split(" ")[0]
actual_port = int(url_part.split(":")[-1].strip())
port_found = True
console.print(f"[cyan]mcp-proxy SSE server running on port [bold]{actual_port}[/bold][/]")
break
except (ValueError, IndexError):
pass
except (IOError, OSError):
# Resource temporarily unavailable - this is normal for non-blocking IO
pass
else:
# No streams to read from, just wait a bit
time.sleep(0.5)
Expand Down Expand Up @@ -148,12 +120,6 @@ def start_mcp_proxy(command: str, port: Optional[int] = None) -> Tuple[subproces
console.print(f"[cyan]Running command: [bold]{' '.join(cmd_parts)}[/bold][/]")
process = subprocess.Popen(cmd_parts, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)

# Make stdout and stderr non-blocking
if process.stdout:
make_non_blocking(process.stdout)
if process.stderr:
make_non_blocking(process.stderr)

# If port is None, we need to parse the output to find the random port
actual_port = port
if not actual_port:
Expand Down Expand Up @@ -352,45 +318,31 @@ def signal_handler(sig, frame):
if tunnel:
tunnel.kill()
break

# Use select to check for available output without blocking
readable = []
if server_process.stdout:
readable.append(server_process.stdout)
if server_process.stderr:
readable.append(server_process.stderr)

if readable:
# Wait for up to 1 second for output
r, _, _ = select.select(readable, [], [], 1.0)

# Process available output
for stream in r:
try:
line = stream.readline()
if line:
line_str = line.rstrip()
print(line_str)
last_activity_time = time.time()

# Check for error messages
error_msg = monitor_for_errors(line_str)
if error_msg and error_msg not in error_messages:
console.print(f"[bold red]Error:[/] {error_msg}")
error_messages.append(error_msg)
server_error_detected = True

# If this is a critical error and we have retries left, restart
if "Protocol initialization error" in error_msg and retries_left > 0:
console.print(
f"[yellow]Will attempt to restart ({retries_left} retries left)[/]"
)
# Break out of the loop to trigger a restart
server_process.terminate()
break
except (IOError, OSError):
# Resource temporarily unavailable - this is normal for non-blocking IO
pass
# Process available output
try:
if server_process.stderr:
line = server_process.stderr.readline()
if line:
line_str = line.rstrip()
console.print(line_str)
last_activity_time = time.time()

# Check for error messages
error_msg = monitor_for_errors(line_str)
if error_msg and error_msg not in error_messages:
console.print(f"[bold red]Error:[/] {error_msg}")
error_messages.append(error_msg)
server_error_detected = True

# If this is a critical error and we have retries left, restart
if "Protocol initialization error" in error_msg and retries_left > 0:
console.print(f"[yellow]Will attempt to restart ({retries_left} retries left)[/]")
# Break out of the loop to trigger a restart
server_process.terminate()
break
except (IOError, OSError):
# Resource temporarily unavailable - this is normal for non-blocking IO
pass
else:
# No streams to read from, just wait a bit
time.sleep(0.5)
Expand Down
4 changes: 2 additions & 2 deletions src/mcpm/router/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
response = Response("Could not parse message", status_code=400)
await response(scope, receive, send)
try:
await writer.send(SessionMessage(message=err))
await writer.send(err)
except (BrokenPipeError, ConnectionError, OSError) as pipe_err:
logger.warning(f"Failed to send error due to pipe issue: {pipe_err}")
return
Expand All @@ -240,7 +240,7 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
logger.warning(f"Connection error when sending message to session {session_id}: {e}")
self._read_stream_writers.pop(session_id, None)
self._session_id_to_identifier.pop(session_id, None)

# Implicitly return None. The original 'return response' is removed.
return

Expand Down
18 changes: 0 additions & 18 deletions tests/test_share.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from src.mcpm.commands.share import (
find_mcp_proxy,
make_non_blocking,
monitor_for_errors,
share,
terminate_process,
Expand All @@ -33,23 +32,6 @@ def test_find_mcp_proxy_not_found(self, monkeypatch):

assert find_mcp_proxy() is None

@patch("fcntl.fcntl")
def test_make_non_blocking(self, mock_fcntl):
"""Test making a file object non-blocking"""
# Create a mock file object
mock_file = Mock()
mock_file.fileno.return_value = 42

# Set up mock fcntl return values
mock_fcntl.return_value = 0

# Call the function
make_non_blocking(mock_file)

# Verify that functions were called correctly
mock_file.fileno.assert_called_once()
assert mock_fcntl.call_count == 2

def test_monitor_for_errors_with_known_error(self):
"""Test error detection with a known error pattern"""
error_line = "Error: RuntimeError: Received request before initialization was complete"
Expand Down