code_interpreter.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import asyncio
  2. import json
  3. import uuid
  4. from typing import Optional
  5. import httpx
  6. import websockets
  7. async def execute_code_jupyter(
  8. jupyter_url: str, code: str, token: str = None, password: str = None, timeout: int = 60
  9. ) -> Optional[dict]:
  10. """
  11. Executes Python code in a Jupyter kernel.
  12. Supports authentication with a token or password.
  13. :param jupyter_url: Jupyter server URL (e.g., "http://localhost:8888")
  14. :param code: Code to execute
  15. :param token: Jupyter authentication token (optional)
  16. :param password: Jupyter password (optional)
  17. :param timeout: WebSocket timeout in seconds (default: 10s)
  18. :return: Dictionary with stdout, stderr, and result
  19. - Images are prefixed with "base64:image/png," and separated by newlines if multiple.
  20. """
  21. jupyter_url = jupyter_url.rstrip("/")
  22. client = httpx.AsyncClient(base_url=jupyter_url, timeout=timeout, follow_redirects=True)
  23. headers = {}
  24. # password authentication
  25. if password and not token:
  26. try:
  27. response = await client.get("/login")
  28. response.raise_for_status()
  29. xsrf_token = response.cookies.get("_xsrf")
  30. if not xsrf_token:
  31. raise ValueError("_xsrf token not found")
  32. response = await client.post("/login", data={"_xsrf": xsrf_token, "password": password})
  33. response.raise_for_status()
  34. headers["X-XSRFToken"] = xsrf_token
  35. except Exception as e:
  36. return {"stdout": "", "stderr": f"Authentication Error: {str(e)}", "result": ""}
  37. # token authentication
  38. params = {"token": token} if token else {}
  39. kernel_id = ""
  40. try:
  41. response = await client.post(url="/api/kernels", params=params, headers=headers)
  42. response.raise_for_status()
  43. kernel_id = response.json()["id"]
  44. ws_base = jupyter_url.replace("http", "ws")
  45. websocket_url = f"{ws_base}/api/kernels/{kernel_id}/channels" + (f"?token={token}" if token else "")
  46. ws_headers = {}
  47. if password and not token:
  48. ws_headers = {
  49. "X-XSRFToken": client.cookies.get("_xsrf"),
  50. "Cookie": "; ".join([f"{name}={value}" for name, value in client.cookies.items()]),
  51. }
  52. async with websockets.connect(websocket_url, additional_headers=ws_headers) as ws:
  53. msg_id = str(uuid.uuid4())
  54. await ws.send(
  55. json.dumps(
  56. {
  57. "header": {
  58. "msg_id": msg_id,
  59. "msg_type": "execute_request",
  60. "username": "user",
  61. "session": str(uuid.uuid4()),
  62. "date": "",
  63. "version": "5.3",
  64. },
  65. "parent_header": {},
  66. "metadata": {},
  67. "content": {
  68. "code": code,
  69. "silent": False,
  70. "store_history": True,
  71. "user_expressions": {},
  72. "allow_stdin": False,
  73. "stop_on_error": True,
  74. },
  75. "channel": "shell",
  76. }
  77. )
  78. )
  79. stdout, stderr, result = "", "", []
  80. while True:
  81. try:
  82. message = await asyncio.wait_for(ws.recv(), timeout)
  83. message_data = json.loads(message)
  84. if message_data.get("parent_header", {}).get("msg_id") != msg_id:
  85. continue
  86. msg_type = message_data.get("msg_type")
  87. match msg_type:
  88. case "stream":
  89. if message_data["content"]["name"] == "stdout":
  90. stdout += message_data["content"]["text"]
  91. elif message_data["content"]["name"] == "stderr":
  92. stderr += message_data["content"]["text"]
  93. case "execute_result" | "display_data":
  94. data = message_data["content"]["data"]
  95. if "image/png" in data:
  96. result.append(f"data:image/png;base64,{data['image/png']}")
  97. elif "text/plain" in data:
  98. result.append(data["text/plain"])
  99. case "error":
  100. stderr += "\n".join(message_data["content"]["traceback"])
  101. case "status":
  102. if message_data["content"]["execution_state"] == "idle":
  103. break
  104. except asyncio.TimeoutError:
  105. stderr += "\nExecution timed out."
  106. break
  107. except Exception as e:
  108. return {"stdout": "", "stderr": f"Error: {str(e)}", "result": ""}
  109. finally:
  110. if kernel_id:
  111. await client.delete(f"/api/kernels/{kernel_id}", headers=headers, params=params)
  112. await client.aclose()
  113. return {"stdout": stdout.strip(), "stderr": stderr.strip(), "result": "\n".join(result).strip() if result else ""}