misc.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from pathlib import Path
  2. import hashlib
  3. import json
  4. import re
  5. from datetime import timedelta
  6. from typing import Optional, List, Tuple
  7. def get_last_user_message(messages: List[dict]) -> str:
  8. for message in reversed(messages):
  9. if message["role"] == "user":
  10. if isinstance(message["content"], list):
  11. for item in message["content"]:
  12. if item["type"] == "text":
  13. return item["text"]
  14. return message["content"]
  15. return None
  16. def get_last_assistant_message(messages: List[dict]) -> str:
  17. for message in reversed(messages):
  18. if message["role"] == "assistant":
  19. if isinstance(message["content"], list):
  20. for item in message["content"]:
  21. if item["type"] == "text":
  22. return item["text"]
  23. return message["content"]
  24. return None
  25. def get_system_message(messages: List[dict]) -> dict:
  26. for message in messages:
  27. if message["role"] == "system":
  28. return message
  29. return None
  30. def remove_system_message(messages: List[dict]) -> List[dict]:
  31. return [message for message in messages if message["role"] != "system"]
  32. def pop_system_message(messages: List[dict]) -> Tuple[dict, List[dict]]:
  33. return get_system_message(messages), remove_system_message(messages)
  34. def add_or_update_system_message(content: str, messages: List[dict]):
  35. """
  36. Adds a new system message at the beginning of the messages list
  37. or updates the existing system message at the beginning.
  38. :param msg: The message to be added or appended.
  39. :param messages: The list of message dictionaries.
  40. :return: The updated list of message dictionaries.
  41. """
  42. if messages and messages[0].get("role") == "system":
  43. messages[0]["content"] += f"{content}\n{messages[0]['content']}"
  44. else:
  45. # Insert at the beginning
  46. messages.insert(0, {"role": "system", "content": content})
  47. return messages
  48. def get_gravatar_url(email):
  49. # Trim leading and trailing whitespace from
  50. # an email address and force all characters
  51. # to lower case
  52. address = str(email).strip().lower()
  53. # Create a SHA256 hash of the final string
  54. hash_object = hashlib.sha256(address.encode())
  55. hash_hex = hash_object.hexdigest()
  56. # Grab the actual image URL
  57. return f"https://www.gravatar.com/avatar/{hash_hex}?d=mp"
  58. def calculate_sha256(file):
  59. sha256 = hashlib.sha256()
  60. # Read the file in chunks to efficiently handle large files
  61. for chunk in iter(lambda: file.read(8192), b""):
  62. sha256.update(chunk)
  63. return sha256.hexdigest()
  64. def calculate_sha256_string(string):
  65. # Create a new SHA-256 hash object
  66. sha256_hash = hashlib.sha256()
  67. # Update the hash object with the bytes of the input string
  68. sha256_hash.update(string.encode("utf-8"))
  69. # Get the hexadecimal representation of the hash
  70. hashed_string = sha256_hash.hexdigest()
  71. return hashed_string
  72. def validate_email_format(email: str) -> bool:
  73. if email.endswith("@localhost"):
  74. return True
  75. return bool(re.match(r"[^@]+@[^@]+\.[^@]+", email))
  76. def sanitize_filename(file_name):
  77. # Convert to lowercase
  78. lower_case_file_name = file_name.lower()
  79. # Remove special characters using regular expression
  80. sanitized_file_name = re.sub(r"[^\w\s]", "", lower_case_file_name)
  81. # Replace spaces with dashes
  82. final_file_name = re.sub(r"\s+", "-", sanitized_file_name)
  83. return final_file_name
  84. def extract_folders_after_data_docs(path):
  85. # Convert the path to a Path object if it's not already
  86. path = Path(path)
  87. # Extract parts of the path
  88. parts = path.parts
  89. # Find the index of '/data/docs' in the path
  90. try:
  91. index_data_docs = parts.index("data") + 1
  92. index_docs = parts.index("docs", index_data_docs) + 1
  93. except ValueError:
  94. return []
  95. # Exclude the filename and accumulate folder names
  96. tags = []
  97. folders = parts[index_docs:-1]
  98. for idx, part in enumerate(folders):
  99. tags.append("/".join(folders[: idx + 1]))
  100. return tags
  101. def parse_duration(duration: str) -> Optional[timedelta]:
  102. if duration == "-1" or duration == "0":
  103. return None
  104. # Regular expression to find number and unit pairs
  105. pattern = r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)"
  106. matches = re.findall(pattern, duration)
  107. if not matches:
  108. raise ValueError("Invalid duration string")
  109. total_duration = timedelta()
  110. for number, _, unit in matches:
  111. number = float(number)
  112. if unit == "ms":
  113. total_duration += timedelta(milliseconds=number)
  114. elif unit == "s":
  115. total_duration += timedelta(seconds=number)
  116. elif unit == "m":
  117. total_duration += timedelta(minutes=number)
  118. elif unit == "h":
  119. total_duration += timedelta(hours=number)
  120. elif unit == "d":
  121. total_duration += timedelta(days=number)
  122. elif unit == "w":
  123. total_duration += timedelta(weeks=number)
  124. return total_duration
  125. def parse_ollama_modelfile(model_text):
  126. parameters_meta = {
  127. "mirostat": int,
  128. "mirostat_eta": float,
  129. "mirostat_tau": float,
  130. "num_ctx": int,
  131. "repeat_last_n": int,
  132. "repeat_penalty": float,
  133. "temperature": float,
  134. "seed": int,
  135. "tfs_z": float,
  136. "num_predict": int,
  137. "top_k": int,
  138. "top_p": float,
  139. "num_keep": int,
  140. "typical_p": float,
  141. "presence_penalty": float,
  142. "frequency_penalty": float,
  143. "penalize_newline": bool,
  144. "numa": bool,
  145. "num_batch": int,
  146. "num_gpu": int,
  147. "main_gpu": int,
  148. "low_vram": bool,
  149. "f16_kv": bool,
  150. "vocab_only": bool,
  151. "use_mmap": bool,
  152. "use_mlock": bool,
  153. "num_thread": int,
  154. }
  155. data = {"base_model_id": None, "params": {}}
  156. # Parse base model
  157. base_model_match = re.search(
  158. r"^FROM\s+(\w+)", model_text, re.MULTILINE | re.IGNORECASE
  159. )
  160. if base_model_match:
  161. data["base_model_id"] = base_model_match.group(1)
  162. # Parse template
  163. template_match = re.search(
  164. r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
  165. )
  166. if template_match:
  167. data["params"] = {"template": template_match.group(1).strip()}
  168. # Parse stops
  169. stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE)
  170. if stops:
  171. data["params"]["stop"] = stops
  172. # Parse other parameters from the provided list
  173. for param, param_type in parameters_meta.items():
  174. param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE)
  175. if param_match:
  176. value = param_match.group(1)
  177. try:
  178. if param_type == int:
  179. value = int(value)
  180. elif param_type == float:
  181. value = float(value)
  182. elif param_type == bool:
  183. value = value.lower() == "true"
  184. except Exception as e:
  185. print(e)
  186. continue
  187. data["params"][param] = value
  188. # Parse adapter
  189. adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE)
  190. if adapter_match:
  191. data["params"]["adapter"] = adapter_match.group(1)
  192. # Parse system description
  193. system_desc_match = re.search(
  194. r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
  195. )
  196. system_desc_match_single = re.search(
  197. r"SYSTEM\s+([^\n]+)", model_text, re.IGNORECASE
  198. )
  199. if system_desc_match:
  200. data["params"]["system"] = system_desc_match.group(1).strip()
  201. elif system_desc_match_single:
  202. data["params"]["system"] = system_desc_match_single.group(1).strip()
  203. # Parse messages
  204. messages = []
  205. message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE)
  206. for role, content in message_matches:
  207. messages.append({"role": role, "content": content})
  208. if messages:
  209. data["params"]["messages"] = messages
  210. return data