# misc.py

from pathlib import Path
import hashlib
import json
import re
from datetime import timedelta
from typing import Optional


def get_gravatar_url(email):
    # Trim leading and trailing whitespace from
    # an email address and force all characters
    # to lower case
    address = str(email).strip().lower()

    # Create a SHA256 hash of the final string
    hash_object = hashlib.sha256(address.encode())
    hash_hex = hash_object.hexdigest()

    # Grab the actual image URL
    return f"https://www.gravatar.com/avatar/{hash_hex}?d=mp"
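
# Illustrative note (not from the original file): because the address is stripped
# and lowercased before hashing, "  User@Example.COM " and "user@example.com"
# resolve to the same URL; "?d=mp" requests Gravatar's "mystery person" fallback image.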


def calculate_sha256(file):
    sha256 = hashlib.sha256()
    # Read the file in chunks to efficiently handle large files
    for chunk in iter(lambda: file.read(8192), b""):
        sha256.update(chunk)
    return sha256.hexdigest()
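
# Illustrative usage (assumed caller, not from the original file): the file object
# must be opened in binary mode so read() returns bytes and the b"" sentinel stops
# the loop, e.g.:
#     with open("archive.bin", "rb") as f:
#         digest = calculate_sha256(f)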


def calculate_sha256_string(string):
    # Create a new SHA-256 hash object
    sha256_hash = hashlib.sha256()
    # Update the hash object with the bytes of the input string
    sha256_hash.update(string.encode("utf-8"))
    # Get the hexadecimal representation of the hash
    hashed_string = sha256_hash.hexdigest()
    return hashed_string
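
# Illustrative note (not from the original file): returns the 64-character
# hexadecimal SHA-256 digest of the UTF-8 encoded input, e.g.
#     calculate_sha256_string("hello")  # -> 64-character hex string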


def validate_email_format(email: str) -> bool:
    if email.endswith("@localhost"):
        return True

    return bool(re.match(r"[^@]+@[^@]+\.[^@]+", email))
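
# Illustrative examples (assumed inputs, not from the original file):
#     validate_email_format("jane@example.com")  -> True
#     validate_email_format("admin@localhost")   -> True  (local addresses allowed)
#     validate_email_format("not-an-email")      -> False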


def sanitize_filename(file_name):
    # Convert to lowercase
    lower_case_file_name = file_name.lower()
    # Remove special characters using regular expression
    sanitized_file_name = re.sub(r"[^\w\s]", "", lower_case_file_name)
    # Replace spaces with dashes
    final_file_name = re.sub(r"\s+", "-", sanitized_file_name)
    return final_file_name
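
# Illustrative example (assumed input, not from the original file):
#     sanitize_filename("Weekly Report 2024")  -> "weekly-report-2024"
# Note that dots also match [^\w\s], so file extensions are stripped as well.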


def extract_folders_after_data_docs(path):
    # Convert the path to a Path object if it's not already
    path = Path(path)

    # Extract parts of the path
    parts = path.parts

    # Find the index of '/data/docs' in the path
    try:
        index_data_docs = parts.index("data") + 1
        index_docs = parts.index("docs", index_data_docs) + 1
    except ValueError:
        return []

    # Exclude the filename and accumulate folder names
    tags = []

    folders = parts[index_docs:-1]
    for idx, part in enumerate(folders):
        tags.append("/".join(folders[: idx + 1]))

    return tags
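
# Illustrative example (assumed path, not from the original file): the filename is
# dropped and cumulative folder tags after ".../data/docs/" are returned, e.g.
#     extract_folders_after_data_docs("/data/docs/projects/alpha/readme.md")
#     -> ["projects", "projects/alpha"]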


def parse_duration(duration: str) -> Optional[timedelta]:
    if duration == "-1" or duration == "0":
        return None

    # Regular expression to find number and unit pairs
    pattern = r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)"
    matches = re.findall(pattern, duration)

    if not matches:
        raise ValueError("Invalid duration string")

    total_duration = timedelta()

    for number, _, unit in matches:
        number = float(number)
        if unit == "ms":
            total_duration += timedelta(milliseconds=number)
        elif unit == "s":
            total_duration += timedelta(seconds=number)
        elif unit == "m":
            total_duration += timedelta(minutes=number)
        elif unit == "h":
            total_duration += timedelta(hours=number)
        elif unit == "d":
            total_duration += timedelta(days=number)
        elif unit == "w":
            total_duration += timedelta(weeks=number)

    return total_duration
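
# Illustrative examples (assumed inputs, not from the original file):
#     parse_duration("1h30m")  -> timedelta(hours=1, minutes=30)
#     parse_duration("300ms")  -> timedelta(milliseconds=300)
#     parse_duration("-1")     -> None  ("-1" and "0" mean no duration)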


def parse_ollama_modelfile(model_text):
    parameters_meta = {
        "mirostat": int,
        "mirostat_eta": float,
        "mirostat_tau": float,
        "num_ctx": int,
        "repeat_last_n": int,
        "repeat_penalty": float,
        "temperature": float,
        "seed": int,
        "tfs_z": float,
        "num_predict": int,
        "top_k": int,
        "top_p": float,
        "num_keep": int,
        "typical_p": float,
        "presence_penalty": float,
        "frequency_penalty": float,
        "penalize_newline": bool,
        "numa": bool,
        "num_batch": int,
        "num_gpu": int,
        "main_gpu": int,
        "low_vram": bool,
        "f16_kv": bool,
        "vocab_only": bool,
        "use_mmap": bool,
        "use_mlock": bool,
        "num_thread": int,
    }

    data = {"base_model_id": None, "params": {}}

    # Parse base model
    base_model_match = re.search(
        r"^FROM\s+(\w+)", model_text, re.MULTILINE | re.IGNORECASE
    )
    if base_model_match:
        data["base_model_id"] = base_model_match.group(1)

    # Parse template
    template_match = re.search(
        r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
    )
    if template_match:
        data["params"] = {"template": template_match.group(1).strip()}

    # Parse stops
    stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE)
    if stops:
        data["params"]["stop"] = stops

    # Parse other parameters from the provided list
    for param, param_type in parameters_meta.items():
        param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE)
        if param_match:
            value = param_match.group(1)

            try:
                if param_type == int:
                    value = int(value)
                elif param_type == float:
                    value = float(value)
                elif param_type == bool:
                    value = value.lower() == "true"
            except Exception as e:
                print(e)
                continue

            data["params"][param] = value

    # Parse adapter
    adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE)
    if adapter_match:
        data["params"]["adapter"] = adapter_match.group(1)

    # Parse system description
    system_desc_match = re.search(
        r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
    )
    if system_desc_match:
        data["params"]["system"] = system_desc_match.group(1).strip()

    # Parse messages
    messages = []
    message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE)
    for role, content in message_matches:
        messages.append({"role": role, "content": content})

    if messages:
        data["params"]["messages"] = messages

    return data
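

# Minimal usage sketch (not part of the original module): a guarded demo with an
# assumed sample Modelfile, showing the shape of the values returned by
# parse_duration and parse_ollama_modelfile when the file is run directly.
if __name__ == "__main__":
    print(parse_duration("1h30m"))  # 1:30:00
    print(parse_duration("-1"))  # None

    sample_modelfile = '''
FROM llama3
PARAMETER temperature 0.7
PARAMETER stop "</s>"
SYSTEM """You are a helpful assistant."""
'''
    print(json.dumps(parse_ollama_modelfile(sample_modelfile), indent=2))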