123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- """Update chat table
- Revision ID: 242a2047eae0
- Revises: 6a39f3d8e55c
- Create Date: 2024-10-09 21:02:35.241684
- """
- from alembic import op
- import sqlalchemy as sa
- from sqlalchemy.sql import table, select, update
- import json
# Revision identifiers, used by Alembic to order this migration in the chain.
revision = "242a2047eae0"
down_revision = "6a39f3d8e55c"
branch_labels = None
depends_on = None
def upgrade():
    """Convert the ``chat`` column of the ``chat`` table from Text to JSON.

    When the reflected ``chat`` column is a Text type, it is renamed to
    ``old_chat`` (dropping any ``old_chat`` column that already exists) and a
    new nullable JSON ``chat`` column is added. Each row's ``old_chat`` text
    is then parsed with ``json.loads`` and written to ``chat``; rows whose
    text is NULL or not valid JSON get ``None``. Finally ``old_chat`` is
    dropped.

    If no ``old_chat`` column exists once the conversion step is done (for
    example because ``chat`` was not a Text column and nothing was renamed),
    there is nothing to copy and the function returns without issuing the
    data migration or the final drop.
    """
    conn = op.get_bind()
    inspector = sa.inspect(conn)

    column_dict = {col["name"]: col for col in inspector.get_columns("chat")}
    chat_column = column_dict.get("chat")
    old_chat_exists = "old_chat" in column_dict

    if chat_column and isinstance(chat_column["type"], sa.Text):
        print("Converting 'chat' column to JSON")

        if old_chat_exists:
            # A pre-existing 'old_chat' would collide with the rename below.
            print("Dropping old 'old_chat' column")
            op.drop_column("chat", "old_chat")

        # Step 1: Rename current 'chat' column to 'old_chat'
        print("Renaming 'chat' column to 'old_chat'")
        op.alter_column(
            "chat", "chat", new_column_name="old_chat", existing_type=sa.Text()
        )

        # Step 2: Add new 'chat' column of type JSON
        print("Adding new 'chat' column of type JSON")
        op.add_column("chat", sa.Column("chat", sa.JSON(), nullable=True))

        old_chat_exists = True

    if not old_chat_exists:
        # Nothing was renamed and no 'old_chat' is present, so selecting from
        # or dropping that column would fail; there is no data to migrate.
        return

    # Step 3: Migrate data from 'old_chat' to 'chat'
    chat_table = table(
        "chat",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("old_chat", sa.Text()),
        sa.Column("chat", sa.JSON()),
    )

    results = conn.execute(select(chat_table.c.id, chat_table.c.old_chat))
    for row in results:
        try:
            # Convert text JSON to an actual JSON object, assuming the text
            # is in JSON format.
            json_data = json.loads(row.old_chat)
        except (TypeError, ValueError):
            # TypeError: 'old_chat' is NULL (json.loads rejects None).
            # ValueError: covers json.JSONDecodeError for malformed text.
            json_data = None

        conn.execute(
            sa.update(chat_table)
            .where(chat_table.c.id == row.id)
            .values(chat=json_data)
        )

    # Step 4: Drop 'old_chat' column
    print("Dropping 'old_chat' column")
    op.drop_column("chat", "old_chat")
def downgrade():
    """Revert the ``chat`` column of the ``chat`` table from JSON to Text.

    A temporary nullable Text column ``old_chat`` is added and filled with
    the ``json.dumps`` encoding of each row's ``chat`` value (``None`` values
    are carried over as ``None``). The JSON ``chat`` column is then dropped
    and ``old_chat`` is renamed to ``chat``.
    """
    # Step 1: Add 'old_chat' column back as Text
    op.add_column("chat", sa.Column("old_chat", sa.Text(), nullable=True))

    # Step 2: Convert 'chat' JSON data back to text and store in 'old_chat'
    chats = table(
        "chat",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("chat", sa.JSON()),
        sa.Column("old_chat", sa.Text()),
    )
    bind = op.get_bind()

    for record in bind.execute(select(chats.c.id, chats.c.chat)):
        if record.chat is None:
            serialized = None
        else:
            serialized = json.dumps(record.chat)

        write_back = (
            sa.update(chats)
            .where(chats.c.id == record.id)
            .values(old_chat=serialized)
        )
        bind.execute(write_back)

    # Step 3: Remove the new 'chat' JSON column
    op.drop_column("chat", "chat")

    # Step 4: Rename 'old_chat' back to 'chat'
    op.alter_column(
        "chat",
        "old_chat",
        new_column_name="chat",
        existing_type=sa.Text(),
    )
|