prompts.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.utils import decode_token
  7. from utils.misc import get_gravatar_url
  8. from apps.web.internal.db import DB
  9. import json
####################
# Prompts DB Schema
####################
  13. class Prompt(Model):
  14. command = CharField(unique=True)
  15. user_id = CharField()
  16. title = TextField()
  17. content = TextField()
  18. timestamp = BigIntegerField()
  19. class Meta:
  20. database = DB
  21. class PromptModel(BaseModel):
  22. command: str
  23. user_id: str
  24. title: str
  25. content: str
  26. timestamp: int # timestamp in epoch
####################
# Forms
####################
  30. class PromptForm(BaseModel):
  31. command: str
  32. title: str
  33. content: str
  34. class PromptsTable:
  35. def __init__(self, db):
  36. self.db = db
  37. self.db.create_tables([Prompt])
  38. def insert_new_prompt(
  39. self, user_id: str, form_data: PromptForm
  40. ) -> Optional[PromptModel]:
  41. prompt = PromptModel(
  42. **{
  43. "user_id": user_id,
  44. "command": form_data.command,
  45. "title": form_data.title,
  46. "content": form_data.content,
  47. "timestamp": int(time.time()),
  48. }
  49. )
  50. try:
  51. result = Prompt.create(**prompt.model_dump())
  52. if result:
  53. return prompt
  54. else:
  55. return None
  56. except:
  57. return None
  58. def get_prompt_by_command(self, command: str) -> Optional[PromptModel]:
  59. try:
  60. prompt = Prompt.get(Prompt.command == command)
  61. return PromptModel(**model_to_dict(prompt))
  62. except:
  63. return None
  64. def get_prompts(self) -> List[PromptModel]:
  65. return [
  66. PromptModel(**model_to_dict(prompt))
  67. for prompt in Prompt.select()
  68. # .limit(limit).offset(skip)
  69. ]
  70. def update_prompt_by_command(
  71. self, command: str, form_data: PromptForm
  72. ) -> Optional[PromptModel]:
  73. try:
  74. query = Prompt.update(
  75. title=form_data.title,
  76. content=form_data.content,
  77. timestamp=int(time.time()),
  78. ).where(Prompt.command == command)
  79. query.execute()
  80. prompt = Prompt.get(Prompt.command == command)
  81. return PromptModel(**model_to_dict(prompt))
  82. except:
  83. return None
  84. def delete_prompt_by_command(self, command: str) -> bool:
  85. try:
  86. query = Prompt.delete().where((Prompt.command == command))
  87. query.execute() # Remove the rows, return number of rows removed.
  88. return True
  89. except:
  90. return False
  91. Prompts = PromptsTable(DB)