diff --git a/src/backend/database/database.py b/src/backend/database/database.py index 7f81c1a..87ca120 100644 --- a/src/backend/database/database.py +++ b/src/backend/database/database.py @@ -3,18 +3,13 @@ """ import os -import asyncio -import subprocess -import time -from typing import AsyncGenerator, Optional +from typing import AsyncGenerator from urllib.parse import quote_plus as urlquote -from pathlib import Path from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy.schema import CreateSchema from fastapi import Depends -from alembic.config import Config from .models import Base, SCHEMA_NAME @@ -40,153 +35,22 @@ engine, class_=AsyncSession, expire_on_commit=False ) -async def run_migrations_with_lock(redis_client=None, lock_timeout: int = 120, max_wait_time: int = 300) -> bool: - """ - Run database migrations using Alembic with a Redis distributed lock. - All workers will wait for the migration to complete before proceeding. - - Args: - redis_client: Redis client instance - lock_timeout: How long the lock should be held (seconds) - max_wait_time: Maximum time to wait for migrations to complete (seconds) - - Returns: - bool: True if migrations were run successfully or completed by another instance, False on timeout or error - """ - if redis_client is None: - # Import here to avoid circular imports - from config import get_redis_client - redis_client = get_redis_client() - - # Keys for Redis coordination - lock_name = "alembic_migration_lock" - status_key = "alembic_migration_status" - lock_value = f"instance_{time.time()}" - - # Check if migrations are already completed - migration_status = redis_client.get(status_key) - if migration_status == "completed": - print("Migrations already completed - continuing startup") - return True - - # Try to acquire the lock - non-blocking - lock_acquired = redis_client.set( - lock_name, - lock_value, - nx=True, # Only set if key doesn't exist - ex=lock_timeout # Expiry in seconds - ) - - if lock_acquired: - print("This instance will run migrations") - try: - # Set status to in-progress - redis_client.set(status_key, "in_progress", ex=lock_timeout) - - # Run migrations - success = await run_migrations_subprocess() - - if success: - # Set status to completed with a longer expiry (1 hour) - redis_client.set(status_key, "completed", ex=3600) - print("Migration completed successfully - signaling other instances") - return True - else: - # Set status to failed - redis_client.set(status_key, "failed", ex=3600) - print("Migration failed - signaling other instances") - return False - finally: - # Release the lock only if we're the owner - current_value = redis_client.get(lock_name) - if current_value == lock_value: - redis_client.delete(lock_name) - else: - print("Another instance is running migrations - waiting for completion") - - # Wait for the migration to complete - start_time = time.time() - while time.time() - start_time < max_wait_time: - # Check migration status - status = redis_client.get(status_key) - - if status == "completed": - print("Migrations completed by another instance - continuing startup") - return True - elif status == "failed": - print("Migrations failed in another instance - continuing startup with caution") - return False - elif status is None: - # No status yet, might be a stale lock or not started - # Check if lock exists - if not redis_client.exists(lock_name): - # Lock released but no status - try to acquire the lock ourselves - print("No active migration lock - attempting to acquire") - return await run_migrations_with_lock(redis_client, lock_timeout, max_wait_time) - - # Wait before checking again - await asyncio.sleep(1) - - # Timeout waiting for migration - print(f"Timeout waiting for migrations after {max_wait_time} seconds") - return False - -async def run_migrations_subprocess() -> bool: - """ - Run Alembic migrations using a subprocess - - Returns: - bool: True if migrations were successful, False otherwise - """ - try: - # Get the path to the database directory - db_dir = Path(__file__).parent - - # Create a subprocess to run alembic - process = await asyncio.create_subprocess_exec( - 'alembic', 'upgrade', 'head', - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=str(db_dir) # Run in the database directory - ) - - # Wait for the process to complete with a timeout - try: - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60) - - if process.returncode == 0: - print("Database migrations completed successfully") - if stdout: - print(stdout.decode()) - return True - else: - print(f"Migration failed with error code {process.returncode}") - if stderr: - print(stderr.decode()) - return False - - except asyncio.TimeoutError: - print("Migration timed out after 60 seconds") - # Try to terminate the process - process.terminate() - return False - - except Exception as e: - print(f"Migration failed: {str(e)}") - return False - -async def run_migrations() -> None: - """ - Legacy function to run migrations directly (without lock) - This is kept for backward compatibility - """ - await run_migrations_subprocess() async def init_db() -> None: """Initialize the database with required tables""" - # Only create tables, let Alembic handle schema creation - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) + try: + # Create schema if it doesn't exist + async with engine.begin() as conn: + await conn.execute(CreateSchema(SCHEMA_NAME, if_not_exists=True)) + + # Create tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + except Exception as e: + print(f"Error initializing database: {str(e)}") + raise + async def get_session() -> AsyncGenerator[AsyncSession, None]: """Get a database session""" diff --git a/src/backend/main.py b/src/backend/main.py index c9700a6..6f07ee0 100644 --- a/src/backend/main.py +++ b/src/backend/main.py @@ -19,7 +19,7 @@ from routers.template_pad_router import template_pad_router from routers.app_router import app_router from database.service import TemplatePadService -from database.database import async_session, run_migrations_with_lock +from database.database import async_session # Initialize PostHog if API key is available if POSTHOG_API_KEY: @@ -81,28 +81,8 @@ async def lifespan(_: FastAPI): await init_db() print("Database connection established successfully") - # Run database migrations with Redis lock - # All workers will wait for the migration to complete before proceeding - try: - migration_success = await run_migrations_with_lock( - redis_client=redis_client, - lock_timeout=120, # 2 minutes timeout for the lock - max_wait_time=300 # 5 minutes maximum wait time - ) - - if migration_success: - print("Database migrations completed successfully or already done") - else: - print("Warning: Migrations failed or timed out - proceeding with caution") - except Exception as e: - print(f"Warning: Failed to run migrations: {str(e)}") - - # Check Redis connection - try: - redis_client.ping() - print("Redis connection established successfully") - except Exception as e: - print(f"Warning: Redis connection failed: {str(e)}") + redis_client.ping() + print("Redis connection established successfully") # Load all templates from the templates directory await load_templates()