# code_interpreter.py (5.7 KB)
import asyncio
import json
import uuid
from urllib.parse import quote, urljoin

import requests
import websockets
  7. async def execute_code_jupyter(
  8. jupyter_url, code, token=None, password=None, timeout=10
  9. ):
  10. """
  11. Executes Python code in a Jupyter kernel.
  12. Supports authentication with a token or password.
  13. :param jupyter_url: Jupyter server URL (e.g., "http://localhost:8888")
  14. :param code: Code to execute
  15. :param token: Jupyter authentication token (optional)
  16. :param password: Jupyter password (optional)
  17. :param timeout: WebSocket timeout in seconds (default: 10s)
  18. :return: Dictionary with stdout, stderr, and result
  19. """
  20. session = requests.Session() # Maintain cookies
  21. headers = {} # Headers for requests
  22. # Authenticate using password
  23. if password and not token:
  24. try:
  25. login_url = urljoin(jupyter_url, "/login")
  26. response = session.get(login_url)
  27. response.raise_for_status()
  28. # Retrieve `_xsrf` token
  29. xsrf_token = session.cookies.get("_xsrf")
  30. if not xsrf_token:
  31. raise ValueError("Failed to fetch _xsrf token")
  32. # Send login request
  33. login_data = {"_xsrf": xsrf_token, "password": password}
  34. login_response = session.post(
  35. login_url, data=login_data, cookies=session.cookies
  36. )
  37. login_response.raise_for_status()
  38. # Update headers with `_xsrf`
  39. headers["X-XSRFToken"] = xsrf_token
  40. except Exception as e:
  41. return {
  42. "stdout": "",
  43. "stderr": f"Authentication Error: {str(e)}",
  44. "result": "",
  45. }
  46. # Construct API URLs with authentication token if provided
  47. params = f"?token={token}" if token else ""
  48. kernel_url = urljoin(jupyter_url, f"/api/kernels{params}")
  49. try:
  50. # Include cookies if authenticating with password
  51. response = session.post(kernel_url, headers=headers, cookies=session.cookies)
  52. response.raise_for_status()
  53. kernel_id = response.json()["id"]
  54. # Construct WebSocket URL
  55. websocket_url = urljoin(
  56. jupyter_url.replace("http", "ws"),
  57. f"/api/kernels/{kernel_id}/channels{params}",
  58. )
  59. # **IMPORTANT:** Include authentication cookies for WebSockets
  60. ws_headers = {}
  61. if password and not token:
  62. ws_headers["X-XSRFToken"] = session.cookies.get("_xsrf")
  63. cookies = {name: value for name, value in session.cookies.items()}
  64. ws_headers["Cookie"] = "; ".join(
  65. [f"{name}={value}" for name, value in cookies.items()]
  66. )
  67. # Connect to the WebSocket
  68. async with websockets.connect(
  69. websocket_url, additional_headers=ws_headers
  70. ) as ws:
  71. msg_id = str(uuid.uuid4())
  72. # Send execution request
  73. execute_request = {
  74. "header": {
  75. "msg_id": msg_id,
  76. "msg_type": "execute_request",
  77. "username": "user",
  78. "session": str(uuid.uuid4()),
  79. "date": "",
  80. "version": "5.3",
  81. },
  82. "parent_header": {},
  83. "metadata": {},
  84. "content": {
  85. "code": code,
  86. "silent": False,
  87. "store_history": True,
  88. "user_expressions": {},
  89. "allow_stdin": False,
  90. "stop_on_error": True,
  91. },
  92. "channel": "shell",
  93. }
  94. await ws.send(json.dumps(execute_request))
  95. # Collect execution results
  96. stdout, stderr, result = "", "", None
  97. while True:
  98. try:
  99. message = await asyncio.wait_for(ws.recv(), timeout)
  100. message_data = json.loads(message)
  101. if message_data.get("parent_header", {}).get("msg_id") == msg_id:
  102. msg_type = message_data.get("msg_type")
  103. if msg_type == "stream":
  104. if message_data["content"]["name"] == "stdout":
  105. stdout += message_data["content"]["text"]
  106. elif message_data["content"]["name"] == "stderr":
  107. stderr += message_data["content"]["text"]
  108. elif msg_type in ("execute_result", "display_data"):
  109. result = message_data["content"]["data"].get(
  110. "text/plain", ""
  111. )
  112. elif msg_type == "error":
  113. stderr += "\n".join(message_data["content"]["traceback"])
  114. elif (
  115. msg_type == "status"
  116. and message_data["content"]["execution_state"] == "idle"
  117. ):
  118. break
  119. except asyncio.TimeoutError:
  120. stderr += "\nExecution timed out."
  121. break
  122. except Exception as e:
  123. return {"stdout": "", "stderr": f"Error: {str(e)}", "result": ""}
  124. finally:
  125. # Shutdown the kernel
  126. if kernel_id:
  127. requests.delete(
  128. f"{kernel_url}/{kernel_id}", headers=headers, cookies=session.cookies
  129. )
  130. return {
  131. "stdout": stdout.strip(),
  132. "stderr": stderr.strip(),
  133. "result": result.strip() if result else "",
  134. }
# Example Usage
# asyncio.run(execute_code_jupyter("http://localhost:8888", "print('Hello, world!')", token="your-token"))
# asyncio.run(execute_code_jupyter("http://localhost:8888", "print('Hello, world!')", password="your-password"))