tasks.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # tasks.py
  2. import asyncio
  3. from typing import Dict
  4. from uuid import uuid4
  5. # A dictionary to keep track of active tasks
  6. tasks: Dict[str, asyncio.Task] = {}
  7. def cleanup_task(task_id: str):
  8. """
  9. Remove a completed or canceled task from the global `tasks` dictionary.
  10. """
  11. tasks.pop(task_id, None) # Remove the task if it exists
  12. def create_task(coroutine):
  13. """
  14. Create a new asyncio task and add it to the global task dictionary.
  15. """
  16. task_id = str(uuid4()) # Generate a unique ID for the task
  17. task = asyncio.create_task(coroutine) # Create the task
  18. # Add a done callback for cleanup
  19. task.add_done_callback(lambda t: cleanup_task(task_id))
  20. tasks[task_id] = task
  21. return task_id, task
  22. def get_task(task_id: str):
  23. """
  24. Retrieve a task by its task ID.
  25. """
  26. return tasks.get(task_id)
  27. def list_tasks():
  28. """
  29. List all currently active task IDs.
  30. """
  31. return list(tasks.keys())
  32. async def stop_task(task_id: str):
  33. """
  34. Cancel a running task and remove it from the global task list.
  35. """
  36. task = tasks.get(task_id)
  37. if not task:
  38. raise ValueError(f"Task with ID {task_id} not found.")
  39. task.cancel() # Request task cancellation
  40. try:
  41. await task # Wait for the task to handle the cancellation
  42. except asyncio.CancelledError:
  43. # Task successfully canceled
  44. tasks.pop(task_id, None) # Remove it from the dictionary
  45. return {"status": True, "message": f"Task {task_id} successfully stopped."}
  46. return {"status": False, "message": f"Failed to stop task {task_id}."}