1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
import json
import os
import shutil
import subprocess
import sys

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("PowerShell Toolkit")

# Executable names looked up on PATH, in order of preference.
_POWERSHELL_CANDIDATES = ("pwsh", "powershell")


def _get_powershell_executable() -> str:
    """Return the name of the first PowerShell executable found on PATH.

    The lookup uses ``shutil.which`` instead of launching each candidate:
    it is cheap enough to run on every tool call and cannot hang on a
    candidate that waits for input.

    Returns:
        str: ``'pwsh'`` or ``'powershell'`` when one is on PATH; otherwise
        the platform fallback (``'powershell'`` on Windows, ``'pwsh'``
        elsewhere) so the caller's own ``FileNotFoundError`` handling
        reports the missing executable.
    """
    for candidate in _POWERSHELL_CANDIDATES:
        if shutil.which(candidate):
            return candidate
    return 'powershell' if sys.platform == "win32" else 'pwsh'


@mcp.tool()
def powershell_command(command: str, timeout: int = 30, encoding: str = 'utf-8') -> str:
    """Execute a PowerShell command and return a human-readable result.

    Args:
        command: PowerShell command text passed to ``-Command``.
        timeout: Seconds to wait before giving up on the command (default: 30).
        encoding: Codec used to decode the process output (default: utf-8).
            On Windows the console output encoding is forced to UTF-8, so
            any other codec may mis-decode non-ASCII output there.

    Returns:
        str: A status-prefixed message. On exit code 0 it carries the
        stripped stdout (or a "no output" note); on a non-zero exit code it
        carries the exit code and stderr, falling back to stdout and then
        to "Unknown error". Timeouts, a missing executable, and any other
        exception are reported as messages rather than raised.
    """
    powershell_executable = _get_powershell_executable()

    if sys.platform == "win32":
        powershell_cmd = [
            powershell_executable,
            '-NoProfile',
            '-OutputFormat', 'Text',
            '-Command',
            # Make PowerShell emit UTF-8 before the user command runs.
            f'[Console]::OutputEncoding = [System.Text.Encoding]::UTF8; {command}',
        ]
    else:
        powershell_cmd = [
            powershell_executable,
            '-NoProfile',
            '-Command',
            command,
        ]

    env = os.environ.copy()
    if sys.platform == "win32":
        # Inherited by anything the command launches; presumably intended for
        # Python child processes, since PowerShell itself does not read it.
        env['PYTHONIOENCODING'] = 'utf-8'

    try:
        result = subprocess.run(
            powershell_cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            encoding=encoding,
            errors='replace',
            env=env,
        )
    except subprocess.TimeoutExpired:
        return f"⏰ Command timed out after {timeout} seconds"
    except FileNotFoundError:
        return (
            "❌ PowerShell not found. Please ensure PowerShell is installed "
            f"and in PATH.\nTried: {powershell_executable}"
        )
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return f"❌ Unexpected error: {e}"

    stdout = (result.stdout or "").strip()
    stderr = (result.stderr or "").strip()

    if result.returncode == 0:
        if stdout:
            return f"✅ Command executed successfully:\n\n{stdout}"
        return "✅ Command executed successfully (no output)"

    # Some failures print their diagnostics to stdout only, so use it
    # before giving up with a generic message.
    error_msg = stderr or stdout or "Unknown error"
    return f"❌ Command failed (Exit Code: {result.returncode}):\n\n{error_msg}"


@mcp.tool()
def get_powershell_info() -> str:
    """Report the contents of PowerShell's ``$PSVersionTable``.

    Runs ``$PSVersionTable | ConvertTo-Json`` with a 10 second timeout.

    Returns:
        str: A status-prefixed message containing the version table
        re-serialised as indented JSON, the raw output when it is not valid
        JSON, or a failure description when the command exits non-zero or
        cannot be run.
    """
    powershell_executable = _get_powershell_executable()
    version_cmd = [
        powershell_executable,
        '-NoProfile',
        '-Command',
        '$PSVersionTable | ConvertTo-Json',
    ]

    try:
        result = subprocess.run(
            version_cmd,
            capture_output=True,
            text=True,
            timeout=10,
            encoding='utf-8',
            errors='replace',
        )
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return f"❌ Error getting PowerShell info: {e}"

    if result.returncode != 0:
        return f"❌ Failed to get PowerShell version: {result.stderr}"

    try:
        version_info = json.loads(result.stdout)
    except json.JSONDecodeError:
        return f"✅ PowerShell Version (raw):\n\n{result.stdout}"

    formatted = json.dumps(version_info, indent=2, ensure_ascii=False)
    return f"✅ PowerShell Information:\n\n{formatted}"


if __name__ == "__main__":
    mcp.run()
|