access_control.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from typing import Optional, Union, List, Dict, Any
  2. from open_webui.models.users import Users, UserModel
  3. from open_webui.models.groups import Groups
  4. from open_webui.config import DEFAULT_USER_PERMISSIONS
  5. import json
  6. def fill_missing_permissions(
  7. permissions: Dict[str, Any], default_permissions: Dict[str, Any]
  8. ) -> Dict[str, Any]:
  9. """
  10. Recursively fills in missing properties in the permissions dictionary
  11. using the default permissions as a template.
  12. """
  13. for key, value in default_permissions.items():
  14. if key not in permissions:
  15. permissions[key] = value
  16. elif isinstance(value, dict) and isinstance(
  17. permissions[key], dict
  18. ): # Both are nested dictionaries
  19. permissions[key] = fill_missing_permissions(permissions[key], value)
  20. return permissions
  21. def get_permissions(
  22. user_id: str,
  23. default_permissions: Dict[str, Any],
  24. ) -> Dict[str, Any]:
  25. """
  26. Get all permissions for a user by combining the permissions of all groups the user is a member of.
  27. If a permission is defined in multiple groups, the most permissive value is used (True > False).
  28. Permissions are nested in a dict with the permission key as the key and a boolean as the value.
  29. """
  30. def combine_permissions(
  31. permissions: Dict[str, Any], group_permissions: Dict[str, Any]
  32. ) -> Dict[str, Any]:
  33. """Combine permissions from multiple groups by taking the most permissive value."""
  34. for key, value in group_permissions.items():
  35. if isinstance(value, dict):
  36. if key not in permissions:
  37. permissions[key] = {}
  38. permissions[key] = combine_permissions(permissions[key], value)
  39. else:
  40. if key not in permissions:
  41. permissions[key] = value
  42. else:
  43. permissions[key] = (
  44. permissions[key] or value
  45. ) # Use the most permissive value (True > False)
  46. return permissions
  47. user_groups = Groups.get_groups_by_member_id(user_id)
  48. # Deep copy default permissions to avoid modifying the original dict
  49. permissions = json.loads(json.dumps(default_permissions))
  50. # Combine permissions from all user groups
  51. for group in user_groups:
  52. group_permissions = group.permissions
  53. permissions = combine_permissions(permissions, group_permissions)
  54. # Ensure all fields from default_permissions are present and filled in
  55. permissions = fill_missing_permissions(permissions, default_permissions)
  56. return permissions
  57. def has_permission(
  58. user_id: str,
  59. permission_key: str,
  60. default_permissions: Dict[str, Any] = {},
  61. ) -> bool:
  62. """
  63. Check if a user has a specific permission by checking the group permissions
  64. and fall back to default permissions if not found in any group.
  65. Permission keys can be hierarchical and separated by dots ('.').
  66. """
  67. def get_permission(permissions: Dict[str, Any], keys: List[str]) -> bool:
  68. """Traverse permissions dict using a list of keys (from dot-split permission_key)."""
  69. for key in keys:
  70. if key not in permissions:
  71. return False # If any part of the hierarchy is missing, deny access
  72. permissions = permissions[key] # Traverse one level deeper
  73. return bool(permissions) # Return the boolean at the final level
  74. permission_hierarchy = permission_key.split(".")
  75. # Retrieve user group permissions
  76. user_groups = Groups.get_groups_by_member_id(user_id)
  77. for group in user_groups:
  78. group_permissions = group.permissions
  79. if get_permission(group_permissions, permission_hierarchy):
  80. return True
  81. # Check default permissions afterward if the group permissions don't allow it
  82. default_permissions = fill_missing_permissions(
  83. default_permissions, DEFAULT_USER_PERMISSIONS
  84. )
  85. return get_permission(default_permissions, permission_hierarchy)
  86. def has_access(
  87. user_id: str,
  88. type: str = "write",
  89. access_control: Optional[dict] = None,
  90. ) -> bool:
  91. if access_control is None:
  92. return type == "read"
  93. user_groups = Groups.get_groups_by_member_id(user_id)
  94. user_group_ids = [group.id for group in user_groups]
  95. permission_access = access_control.get(type, {})
  96. permitted_group_ids = permission_access.get("group_ids", [])
  97. permitted_user_ids = permission_access.get("user_ids", [])
  98. return user_id in permitted_user_ids or any(
  99. group_id in permitted_group_ids for group_id in user_group_ids
  100. )
  101. # Get all users with access to a resource
  102. def get_users_with_access(
  103. type: str = "write", access_control: Optional[dict] = None
  104. ) -> List[UserModel]:
  105. if access_control is None:
  106. return Users.get_users()
  107. permission_access = access_control.get(type, {})
  108. permitted_group_ids = permission_access.get("group_ids", [])
  109. permitted_user_ids = permission_access.get("user_ids", [])
  110. user_ids_with_access = set(permitted_user_ids)
  111. for group_id in permitted_group_ids:
  112. group_user_ids = Groups.get_group_user_ids_by_id(group_id)
  113. if group_user_ids:
  114. user_ids_with_access.update(group_user_ids)
  115. return Users.get_users_by_user_ids(list(user_ids_with_access))