main.py 5.7 KB


  1. import re
  2. import requests
  3. from fastapi import (
  4. FastAPI,
  5. Request,
  6. Depends,
  7. HTTPException,
  8. status,
  9. UploadFile,
  10. File,
  11. Form,
  12. )
  13. from fastapi.middleware.cors import CORSMiddleware
  14. from faster_whisper import WhisperModel
  15. from constants import ERROR_MESSAGES
  16. from utils.utils import (
  17. get_current_user,
  18. get_admin_user,
  19. )
  20. from utils.misc import calculate_sha256
  21. from typing import Optional
  22. from pydantic import BaseModel
  23. from config import AUTOMATIC1111_BASE_URL
  24. app = FastAPI()
  25. app.add_middleware(
  26. CORSMiddleware,
  27. allow_origins=["*"],
  28. allow_credentials=True,
  29. allow_methods=["*"],
  30. allow_headers=["*"],
  31. )
  32. app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
  33. app.state.ENABLED = app.state.AUTOMATIC1111_BASE_URL != ""
  34. app.state.IMAGE_SIZE = "512x512"
  35. app.state.IMAGE_STEPS = 50
  36. @app.get("/enabled", response_model=bool)
  37. async def get_enable_status(request: Request, user=Depends(get_admin_user)):
  38. return app.state.ENABLED
  39. @app.get("/enabled/toggle", response_model=bool)
  40. async def toggle_enabled(request: Request, user=Depends(get_admin_user)):
  41. try:
  42. r = requests.head(app.state.AUTOMATIC1111_BASE_URL)
  43. app.state.ENABLED = not app.state.ENABLED
  44. return app.state.ENABLED
  45. except Exception as e:
  46. raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
  47. class UrlUpdateForm(BaseModel):
  48. url: str
  49. @app.get("/url")
  50. async def get_openai_url(user=Depends(get_admin_user)):
  51. return {"AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL}
  52. @app.post("/url/update")
  53. async def update_openai_url(form_data: UrlUpdateForm, user=Depends(get_admin_user)):
  54. if form_data.url == "":
  55. app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
  56. else:
  57. app.state.AUTOMATIC1111_BASE_URL = form_data.url.strip("/")
  58. return {
  59. "AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL,
  60. "status": True,
  61. }
  62. class ImageSizeUpdateForm(BaseModel):
  63. size: str
  64. @app.get("/size")
  65. async def get_image_size(user=Depends(get_admin_user)):
  66. return {"IMAGE_SIZE": app.state.IMAGE_SIZE}
  67. @app.post("/size/update")
  68. async def update_image_size(
  69. form_data: ImageSizeUpdateForm, user=Depends(get_admin_user)
  70. ):
  71. pattern = r"^\d+x\d+$" # Regular expression pattern
  72. if re.match(pattern, form_data.size):
  73. app.state.IMAGE_SIZE = form_data.size
  74. return {
  75. "IMAGE_SIZE": app.state.IMAGE_SIZE,
  76. "status": True,
  77. }
  78. else:
  79. raise HTTPException(
  80. status_code=400,
  81. detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 512x512)."),
  82. )
  83. class ImageStepsUpdateForm(BaseModel):
  84. steps: int
  85. @app.get("/steps")
  86. async def get_image_size(user=Depends(get_admin_user)):
  87. return {"IMAGE_STEPS": app.state.IMAGE_STEPS}
  88. @app.post("/steps/update")
  89. async def update_image_size(
  90. form_data: ImageStepsUpdateForm, user=Depends(get_admin_user)
  91. ):
  92. if form_data.steps >= 0:
  93. app.state.IMAGE_STEPS = form_data.steps
  94. return {
  95. "IMAGE_STEPS": app.state.IMAGE_STEPS,
  96. "status": True,
  97. }
  98. else:
  99. raise HTTPException(
  100. status_code=400,
  101. detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 50)."),
  102. )
  103. @app.get("/models")
  104. def get_models(user=Depends(get_current_user)):
  105. try:
  106. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/sd-models")
  107. models = r.json()
  108. return models
  109. except Exception as e:
  110. app.state.ENABLED = False
  111. raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
  112. @app.get("/models/default")
  113. async def get_default_model(user=Depends(get_admin_user)):
  114. try:
  115. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
  116. options = r.json()
  117. return {"model": options["sd_model_checkpoint"]}
  118. except Exception as e:
  119. app.state.ENABLED = False
  120. raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
  121. class UpdateModelForm(BaseModel):
  122. model: str
  123. def set_model_handler(model: str):
  124. r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
  125. options = r.json()
  126. if model != options["sd_model_checkpoint"]:
  127. options["sd_model_checkpoint"] = model
  128. r = requests.post(
  129. url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options", json=options
  130. )
  131. return options
  132. @app.post("/models/default/update")
  133. def update_default_model(
  134. form_data: UpdateModelForm,
  135. user=Depends(get_current_user),
  136. ):
  137. return set_model_handler(form_data.model)
  138. class GenerateImageForm(BaseModel):
  139. model: Optional[str] = None
  140. prompt: str
  141. n: int = 1
  142. size: str = "512x512"
  143. negative_prompt: Optional[str] = None
  144. @app.post("/generations")
  145. def generate_image(
  146. form_data: GenerateImageForm,
  147. user=Depends(get_current_user),
  148. ):
  149. print(form_data)
  150. try:
  151. if form_data.model:
  152. set_model_handler(form_data.model)
  153. width, height = tuple(map(int, app.state.IMAGE_SIZE.split("x")))
  154. data = {
  155. "prompt": form_data.prompt,
  156. "batch_size": form_data.n,
  157. "width": width,
  158. "height": height,
  159. }
  160. if app.state.IMAGE_STEPS != None:
  161. data["steps"] = app.state.IMAGE_STEPS
  162. if form_data.negative_prompt != None:
  163. data["negative_prompt"] = form_data.negative_prompt
  164. print(data)
  165. r = requests.post(
  166. url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/txt2img",
  167. json=data,
  168. )
  169. return r.json()
  170. except Exception as e:
  171. print(e)
  172. raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))