prompts.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.utils import decode_token
  7. from utils.misc import get_gravatar_url
  8. from apps.web.internal.db import DB
  9. import json
  10. ####################
  11. # Prompts DB Schema
  12. ####################
  13. class Prompt(Model):
  14. command = CharField(unique=True)
  15. user_id = CharField()
  16. title = CharField()
  17. content = TextField()
  18. timestamp = DateField()
  19. class Meta:
  20. database = DB
  21. class PromptModel(BaseModel):
  22. command: str
  23. user_id: str
  24. title: str
  25. content: str
  26. timestamp: int # timestamp in epoch
  27. ####################
  28. # Forms
  29. ####################
  30. class PromptForm(BaseModel):
  31. command: str
  32. title: str
  33. content: str
  34. class PromptsTable:
  35. def __init__(self, db):
  36. self.db = db
  37. self.db.create_tables([Prompt])
  38. def insert_new_prompt(self, user_id: str,
  39. form_data: PromptForm) -> Optional[PromptModel]:
  40. prompt = PromptModel(
  41. **{
  42. "user_id": user_id,
  43. "command": form_data.command,
  44. "title": form_data.title,
  45. "content": form_data.content,
  46. "timestamp": int(time.time()),
  47. })
  48. try:
  49. result = Prompt.create(**prompt.model_dump())
  50. if result:
  51. return prompt
  52. else:
  53. return None
  54. except:
  55. return None
  56. def get_prompt_by_command(self, command: str) -> Optional[PromptModel]:
  57. try:
  58. prompt = Prompt.get(Prompt.command == command)
  59. return PromptModel(**model_to_dict(prompt))
  60. except:
  61. return None
  62. def get_prompts(self) -> List[PromptModel]:
  63. return [
  64. PromptModel(**model_to_dict(prompt)) for prompt in Prompt.select()
  65. # .limit(limit).offset(skip)
  66. ]
  67. def update_prompt_by_command(
  68. self, command: str,
  69. form_data: PromptForm) -> Optional[PromptModel]:
  70. try:
  71. query = Prompt.update(
  72. title=form_data.title,
  73. content=form_data.content,
  74. timestamp=int(time.time()),
  75. ).where(Prompt.command == command)
  76. query.execute()
  77. prompt = Prompt.get(Prompt.command == command)
  78. return PromptModel(**model_to_dict(prompt))
  79. except:
  80. return None
  81. def delete_prompt_by_command(self, command: str) -> bool:
  82. try:
  83. query = Prompt.delete().where((Prompt.command == command))
  84. query.execute() # Remove the rows, return number of rows removed.
  85. return True
  86. except:
  87. return False
  88. Prompts = PromptsTable(DB)