misc.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. import hashlib
  2. import re
  3. import time
  4. import uuid
  5. from datetime import timedelta
  6. from pathlib import Path
  7. from typing import Callable, Optional
  def get_messages_content(messages: list[dict]) -> str:
      """Render a conversation as newline-separated ``ROLE: content`` lines.

      Each message contributes one line: its ``role`` upper-cased, a colon,
      and the value ``get_content_from_message`` returns for that message.
      An empty list yields an empty string.
      """
      rendered = []
      for entry in messages:
          speaker = entry["role"].upper()
          rendered.append(f"{speaker}: {get_content_from_message(entry)}")
      return "\n".join(rendered)
  def get_last_user_message_item(messages: list[dict]) -> Optional[dict]:
      """Return the latest message whose role is ``"user"``.

      The list is scanned from the end; ``None`` is returned when no user
      message exists.
      """
      user_entries = (
          entry for entry in reversed(messages) if entry["role"] == "user"
      )
      return next(user_entries, None)
  def get_content_from_message(message: dict) -> Optional[str]:
      """Extract the textual content of a single message.

      When ``content`` is a list of parts, the ``text`` of the first part
      whose ``type`` is ``"text"`` is returned, or ``None`` if there is no
      such part. Any non-list ``content`` is returned unchanged.
      """
      body = message["content"]
      if not isinstance(body, list):
          return body
      return next((part["text"] for part in body if part["type"] == "text"), None)
  def get_last_user_message(messages: list[dict]) -> Optional[str]:
      """Return the content of the latest user message, or ``None`` if absent.

      The message is located with ``get_last_user_message_item`` and its
      content extracted with ``get_content_from_message``.
      """
      latest = get_last_user_message_item(messages)
      return get_content_from_message(latest) if latest is not None else None
  def get_last_assistant_message(messages: list[dict]) -> Optional[str]:
      """Return the content of the latest assistant message, or ``None``.

      The list is scanned from the end; the first message whose role is
      ``"assistant"`` is passed to ``get_content_from_message``.
      """
      replies = (
          entry for entry in reversed(messages) if entry["role"] == "assistant"
      )
      latest = next(replies, None)
      if latest is None:
          return None
      return get_content_from_message(latest)
  def get_system_message(messages: list[dict]) -> Optional[dict]:
      """Return the first message whose role is ``"system"``, or ``None``."""
      system_entries = (entry for entry in messages if entry["role"] == "system")
      return next(system_entries, None)
  def remove_system_message(messages: list[dict]) -> list[dict]:
      """Return a new list holding every message whose role is not ``"system"``.

      The input list is left untouched; the message dicts themselves are
      shared with it, not copied.
      """
      kept = []
      for entry in messages:
          if entry["role"] == "system":
              continue
          kept.append(entry)
      return kept
  def pop_system_message(messages: list[dict]) -> tuple[Optional[dict], list[dict]]:
      """Split a conversation into its system message and everything else.

      :return: A pair of the value from ``get_system_message`` and the list
          from ``remove_system_message``, both computed on ``messages``.
      """
      system_entry = get_system_message(messages)
      remaining = remove_system_message(messages)
      return system_entry, remaining
  def prepend_to_first_user_message_content(
      content: str, messages: list[dict]
  ) -> list[dict]:
      """Prefix the first user message with ``content`` followed by a newline.

      Only the first message whose role is ``"user"`` is modified, in place.
      If its content is a list of parts, every part of type ``"text"`` gets
      the prefix; otherwise the content itself is prefixed.

      :return: The same ``messages`` list that was passed in.
      """
      target = next((entry for entry in messages if entry["role"] == "user"), None)
      if target is None:
          return messages

      if isinstance(target["content"], list):
          for part in target["content"]:
              if part["type"] == "text":
                  part["text"] = f"{content}\n{part['text']}"
      else:
          target["content"] = f"{content}\n{target['content']}"
      return messages
  def add_or_update_system_message(content: str, messages: list[dict]):
      """
      Ensure the conversation starts with a system message carrying ``content``.

      If the first message already has the ``system`` role, ``content`` and a
      newline are prepended to its existing content. Otherwise a new system
      message is inserted at index 0. The list is modified in place.

      :param content: The text to add to the system message.
      :param messages: The list of message dictionaries.
      :return: The same, updated list of message dictionaries.
      """
      head = messages[0] if messages else None
      if head is None or head.get("role") != "system":
          # No leading system message yet: create one at the front.
          messages.insert(0, {"role": "system", "content": content})
          return messages

      head["content"] = f"{content}\n{head['content']}"
      return messages
  def openai_chat_message_template(model: str):
      """Build the skeleton shared by OpenAI-style chat responses.

      The result carries an ``id`` of the form ``<model>-<uuid4>``, the
      current Unix time in whole seconds as ``created``, the ``model`` name,
      and one choice with ``index`` 0 and both ``logprobs`` and
      ``finish_reason`` set to ``None``.
      """
      first_choice = {"index": 0, "logprobs": None, "finish_reason": None}
      unique_suffix = str(uuid.uuid4())
      return {
          "id": f"{model}-{unique_suffix}",
          "created": int(time.time()),
          "model": model,
          "choices": [first_choice],
      }
  def openai_chat_chunk_message_template(
      model: str, message: Optional[str] = None, usage: Optional[dict] = None
  ) -> dict:
      """Build one ``chat.completion.chunk`` payload for a streamed response.

      A truthy ``message`` is placed in the first choice's ``delta``. A falsy
      one (``None`` or an empty string) instead sets that choice's
      ``finish_reason`` to ``"stop"``. A truthy ``usage`` is attached under
      ``"usage"``.
      """
      chunk = openai_chat_message_template(model)
      chunk["object"] = "chat.completion.chunk"

      choice = chunk["choices"][0]
      if not message:
          choice["finish_reason"] = "stop"
      else:
          choice["delta"] = {"content": message}

      if usage:
          chunk["usage"] = usage
      return chunk
  def openai_chat_completion_message_template(
      model: str, message: Optional[str] = None, usage: Optional[dict] = None
  ) -> dict:
      """Build a complete (non-streamed) ``chat.completion`` payload.

      When ``message`` is not ``None`` it becomes the assistant message of the
      first choice. That choice's ``finish_reason`` is always ``"stop"``. A
      truthy ``usage`` is attached under ``"usage"``.
      """
      completion = openai_chat_message_template(model)
      completion["object"] = "chat.completion"

      choice = completion["choices"][0]
      if message is not None:
          choice["message"] = {"content": message, "role": "assistant"}
      choice["finish_reason"] = "stop"

      if usage:
          completion["usage"] = usage
      return completion
  def get_gravatar_url(email):
      """Return the Gravatar avatar URL for ``email``.

      The address is converted to ``str``, stripped of surrounding whitespace
      and lower-cased, then hashed with SHA-256. The hex digest forms the URL
      path, with ``d=mp`` as the query string.
      """
      normalized = str(email).strip().lower()
      digest = hashlib.sha256(normalized.encode()).hexdigest()
      return f"https://www.gravatar.com/avatar/{digest}?d=mp"
  def calculate_sha256(file):
      """Return the SHA-256 hex digest of a binary file-like object.

      ``file.read(8192)`` is called repeatedly, so large inputs are never held
      in memory at once. Reading stops at the first chunk equal to ``b""``.
      """
      digest = hashlib.sha256()
      while True:
          block = file.read(8192)
          if block == b"":
              break
          digest.update(block)
      return digest.hexdigest()
  def calculate_sha256_string(string):
      """Return the SHA-256 hex digest of ``string`` encoded as UTF-8."""
      encoded = string.encode("utf-8")
      return hashlib.sha256(encoded).hexdigest()
  def validate_email_format(email: str) -> bool:
      """Check whether ``email`` looks like ``local@domain``.

      The whole string must match: a non-empty local part, exactly one ``@``,
      and a domain that is either the literal ``localhost`` or contains at
      least one dot with text on both sides. This is a shape check only, not
      full address validation.

      :param email: The address to check.
      :return: ``True`` if the address has the expected shape, else ``False``.
      """
      # fullmatch, not match: a prefix-only match would accept trailing junk
      # such as "user@example.com@attacker".
      return re.fullmatch(r"[^@]+@(?:localhost|[^@]+\.[^@]+)", email) is not None
  def sanitize_filename(file_name):
      """Normalise a file name to lower-case, dash-separated word characters.

      The name is lower-cased, every character that is neither a word
      character nor whitespace is removed, and each run of whitespace is
      replaced by a single dash.
      """
      stripped = re.sub(r"[^\w\s]", "", file_name.lower())
      return re.sub(r"\s+", "-", stripped)
  def extract_folders_after_data_docs(path):
      """List the cumulative folder paths that follow ``data`` ... ``docs``.

      The path is split into components. The first ``data`` component is
      located, then the first ``docs`` component after it. Every folder between
      that ``docs`` and the final component (taken to be the file name)
      contributes one ``/``-joined prefix, shortest first, e.g.
      ``data/docs/a/b/f.txt`` gives ``["a", "a/b"]``.

      :return: The list of prefixes, or ``[]`` when ``data`` or a following
          ``docs`` is not present.
      """
      parts = Path(path).parts
      try:
          after_data = parts.index("data") + 1
          after_docs = parts.index("docs", after_data) + 1
      except ValueError:
          return []

      # Drop the last component (the file name) and build each prefix.
      folders = parts[after_docs:-1]
      return ["/".join(folders[:depth]) for depth in range(1, len(folders) + 1)]
  def parse_duration(duration: str) -> Optional[timedelta]:
      """Convert a duration string such as ``"1h30m"`` into a ``timedelta``.

      The string is searched for ``<number><unit>`` pairs, where the number
      may be negative or fractional and the unit is one of ``ms``, ``s``,
      ``m``, ``h``, ``d`` or ``w``. All pairs found are summed.

      :param duration: The duration text; ``"-1"`` and ``"0"`` are special.
      :return: The summed ``timedelta``, or ``None`` for ``"-1"`` or ``"0"``.
      :raises ValueError: If no ``<number><unit>`` pair is found.
      """
      if duration in ("-1", "0"):
          return None

      # Maps each unit suffix to the timedelta keyword argument it denotes.
      unit_keywords = {
          "ms": "milliseconds",
          "s": "seconds",
          "m": "minutes",
          "h": "hours",
          "d": "days",
          "w": "weeks",
      }

      found = re.findall(r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)", duration)
      if not found:
          raise ValueError("Invalid duration string")

      total = timedelta()
      for amount, _fraction, unit in found:
          total += timedelta(**{unit_keywords[unit]: float(amount)})
      return total
  def parse_ollama_modelfile(model_text):
      """Parse the text of an Ollama Modelfile into a model description.

      Directives are matched case-insensitively and only at the start of a
      line (leading spaces or tabs allowed). This stops text inside other
      directives, such as ``MESSAGE system ...`` or ``{{ .System }}`` in a
      template, from being mistaken for a ``SYSTEM`` directive.

      Recognised directives:

      * ``FROM <model>`` -> ``base_model_id`` (the full token, e.g.
        ``llama3:8b``).
      * ``TEMPLATE \"\"\"...\"\"\"`` -> ``params["template"]``.
      * ``PARAMETER stop "<text>"`` -> ``params["stop"]``, a list with one
        entry per occurrence.
      * ``PARAMETER <name> <value>`` for each name in ``parameters_meta`` ->
        ``params[name]`` converted to int, float or bool. Only the first
        occurrence of each name is used. A value that cannot be converted is
        printed and skipped.
      * ``ADAPTER <path>`` -> ``params["adapter"]``.
      * ``SYSTEM`` (triple-quoted block preferred, else rest of the line) ->
        ``params["system"]``.
      * ``MESSAGE <role> <content>`` -> ``params["messages"]``, a list of
        ``{"role", "content"}`` dicts.

      :param model_text: The full Modelfile contents as a string.
      :return: ``{"base_model_id": <str or None>, "params": <dict>}``. Keys
          appear in ``params`` only when the matching directive is present.
      """
      parameters_meta = {
          "mirostat": int,
          "mirostat_eta": float,
          "mirostat_tau": float,
          "num_ctx": int,
          "repeat_last_n": int,
          "repeat_penalty": float,
          "temperature": float,
          "seed": int,
          "tfs_z": float,
          "num_predict": int,
          "top_k": int,
          "top_p": float,
          "num_keep": int,
          "typical_p": float,
          "presence_penalty": float,
          "frequency_penalty": float,
          "penalize_newline": bool,
          "numa": bool,
          "num_batch": int,
          "num_gpu": int,
          "main_gpu": int,
          "low_vram": bool,
          "f16_kv": bool,
          "vocab_only": bool,
          "use_mmap": bool,
          "use_mlock": bool,
          "num_thread": int,
      }

      # Flags shared by every directive: anchor per line, ignore case.
      line_flags = re.MULTILINE | re.IGNORECASE
      block_flags = line_flags | re.DOTALL

      data = {"base_model_id": None, "params": {}}
      params = data["params"]

      # Parse base model. \S+ keeps tags and separators such as ":" or "-"
      # that \w+ would cut off.
      base_model_match = re.search(r"^[ \t]*FROM\s+(\S+)", model_text, line_flags)
      if base_model_match:
          data["base_model_id"] = base_model_match.group(1)

      # Parse template
      template_match = re.search(
          r'^[ \t]*TEMPLATE\s+"""(.+?)"""', model_text, block_flags
      )
      if template_match:
          params["template"] = template_match.group(1).strip()

      # Parse stops
      stops = re.findall(
          r'^[ \t]*PARAMETER[ \t]+stop[ \t]+"(.*?)"', model_text, line_flags
      )
      if stops:
          params["stop"] = stops

      # Parse other parameters from the provided list
      for param, param_type in parameters_meta.items():
          param_match = re.search(
              rf"^[ \t]*PARAMETER[ \t]+{param}[ \t]+(.+)", model_text, line_flags
          )
          if not param_match:
              continue

          # Strip so trailing spaces or a "\r" from CRLF files do not break
          # the comparison against "true".
          value = param_match.group(1).strip()
          try:
              if param_type is int:
                  value = int(value)
              elif param_type is float:
                  value = float(value)
              elif param_type is bool:
                  value = value.lower() == "true"
          except ValueError as e:
              # Best effort: report the bad value and leave the parameter out.
              print(e)
              continue

          params[param] = value

      # Parse adapter
      adapter_match = re.search(r"^[ \t]*ADAPTER[ \t]+(.+)", model_text, line_flags)
      if adapter_match:
          params["adapter"] = adapter_match.group(1).strip()

      # Parse system description: a triple-quoted block wins over a single line.
      system_desc_match = re.search(
          r'^[ \t]*SYSTEM\s+"""(.+?)"""', model_text, block_flags
      )
      system_desc_match_single = re.search(
          r"^[ \t]*SYSTEM\s+([^\n]+)", model_text, line_flags
      )
      if system_desc_match:
          params["system"] = system_desc_match.group(1).strip()
      elif system_desc_match_single:
          params["system"] = system_desc_match_single.group(1).strip()

      # Parse messages
      message_matches = re.findall(
          r"^[ \t]*MESSAGE[ \t]+(\w+)[ \t]+(.+)", model_text, line_flags
      )
      messages = [
          {"role": role, "content": content.strip()}
          for role, content in message_matches
      ]
      if messages:
          params["messages"] = messages

      return data