Skip to content

chore: remove migrations from lifespan tasks #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 14 additions & 150 deletions src/backend/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
"""

import os
import asyncio
import subprocess
import time
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator
from urllib.parse import quote_plus as urlquote
from pathlib import Path

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import CreateSchema
from fastapi import Depends
from alembic.config import Config

from .models import Base, SCHEMA_NAME

Expand All @@ -40,153 +35,22 @@
engine, class_=AsyncSession, expire_on_commit=False
)

async def run_migrations_with_lock(redis_client=None, lock_timeout: int = 120, max_wait_time: int = 300) -> bool:
    """
    Run database migrations using Alembic under a Redis distributed lock.

    Exactly one instance acquires the lock and runs the migration; every
    other instance polls a shared status key and waits for completion, so
    all workers proceed only after the schema is up to date.

    Args:
        redis_client: Redis client instance; resolved lazily when None.
        lock_timeout: Expiry of the lock and in-progress status (seconds).
        max_wait_time: Maximum total time to wait for migrations (seconds).

    Returns:
        bool: True if migrations succeeded (here or in another instance),
        False on failure or timeout.
    """
    if redis_client is None:
        # Import here to avoid circular imports.
        from config import get_redis_client
        redis_client = get_redis_client()

    # Keys for Redis coordination.
    # NOTE(review): the string comparisons below assume the client was
    # created with decode_responses=True; a bytes-returning client would
    # never match "completed"/"failed" — confirm against config.
    lock_name = "alembic_migration_lock"
    status_key = "alembic_migration_status"
    lock_value = f"instance_{time.time()}"

    # One overall deadline for this call. The original implementation
    # recursed with a fresh max_wait_time after a stale lock, so the total
    # wait could exceed the caller's bound; a loop with a fixed deadline
    # keeps the contract honest.
    deadline = time.time() + max_wait_time

    while True:
        # Fast path: some instance already finished the migration.
        if redis_client.get(status_key) == "completed":
            print("Migrations already completed - continuing startup")
            return True

        # Try to acquire the lock - non-blocking.
        lock_acquired = redis_client.set(
            lock_name,
            lock_value,
            nx=True,          # only set if key doesn't exist
            ex=lock_timeout,  # expiry in seconds, so a crash can't wedge startup
        )

        if lock_acquired:
            print("This instance will run migrations")
            try:
                # Advertise in-progress so waiters don't mistake us for stale.
                redis_client.set(status_key, "in_progress", ex=lock_timeout)

                success = await run_migrations_subprocess()

                if success:
                    # Completed status outlives the lock (1 hour) so
                    # late-starting workers skip migration entirely.
                    redis_client.set(status_key, "completed", ex=3600)
                    print("Migration completed successfully - signaling other instances")
                    return True

                redis_client.set(status_key, "failed", ex=3600)
                print("Migration failed - signaling other instances")
                return False
            finally:
                # Release the lock only if we still own it; it may have
                # expired and been re-acquired by another instance.
                if redis_client.get(lock_name) == lock_value:
                    redis_client.delete(lock_name)

        print("Another instance is running migrations - waiting for completion")

        # Poll for the other instance's outcome until the deadline.
        while time.time() < deadline:
            status = redis_client.get(status_key)

            if status == "completed":
                print("Migrations completed by another instance - continuing startup")
                return True
            if status == "failed":
                print("Migrations failed in another instance - continuing startup with caution")
                return False
            if status is None and not redis_client.exists(lock_name):
                # Lock gone but no status: stale/crashed holder. Retry
                # acquisition ourselves, keeping the remaining time budget.
                print("No active migration lock - attempting to acquire")
                break

            # Wait before checking again.
            await asyncio.sleep(1)
        else:
            # Deadline exhausted without a definitive status.
            print(f"Timeout waiting for migrations after {max_wait_time} seconds")
            return False

async def run_migrations_subprocess() -> bool:
    """
    Run Alembic migrations ('alembic upgrade head') in a subprocess.

    The subprocess runs with this package's directory as cwd so Alembic
    picks up the local alembic.ini / migration environment.

    Returns:
        bool: True if migrations succeeded, False otherwise.
    """
    try:
        # Run alembic from the database directory (where this file lives).
        db_dir = Path(__file__).parent

        process = await asyncio.create_subprocess_exec(
            'alembic', 'upgrade', 'head',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(db_dir),
        )

        # Wait for the process to complete with a timeout.
        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60)
        except asyncio.TimeoutError:
            print("Migration timed out after 60 seconds")
            # Terminate AND reap the child. The original only called
            # terminate(), leaving a zombie process and an asyncio
            # "transport was not closed" warning behind.
            process.terminate()
            try:
                await asyncio.wait_for(process.wait(), timeout=5)
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
            return False

        if process.returncode == 0:
            print("Database migrations completed successfully")
            if stdout:
                print(stdout.decode())
            return True

        print(f"Migration failed with error code {process.returncode}")
        if stderr:
            print(stderr.decode())
        return False

    except Exception as e:
        # Covers FileNotFoundError (alembic not on PATH) and any other
        # unexpected subprocess failure; caller treats False as "failed".
        print(f"Migration failed: {str(e)}")
        return False

async def run_migrations() -> None:
    """
    Backward-compatible entry point: run migrations directly, without the
    Redis distributed lock. Kept so existing callers keep working; new code
    should prefer run_migrations_with_lock().
    """
    await run_migrations_subprocess()

async def init_db() -> None:
    """
    Initialize the database: create the schema (if missing) and all ORM tables.

    The visible span fuses the pre- and post-change bodies of this function
    from the diff; this is the post-change behavior, with schema and table
    creation merged into one transaction so a failure leaves nothing
    half-created.

    Raises:
        Exception: re-raised after logging so startup fails loudly when the
        database cannot be initialized.
    """
    try:
        async with engine.begin() as conn:
            # Create schema if it doesn't exist, then the tables inside it.
            await conn.execute(CreateSchema(SCHEMA_NAME, if_not_exists=True))
            await conn.run_sync(Base.metadata.create_all)
    except Exception as e:
        print(f"Error initializing database: {str(e)}")
        raise


async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""Get a database session"""
Expand Down
26 changes: 3 additions & 23 deletions src/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from routers.template_pad_router import template_pad_router
from routers.app_router import app_router
from database.service import TemplatePadService
from database.database import async_session, run_migrations_with_lock
from database.database import async_session

# Initialize PostHog if API key is available
if POSTHOG_API_KEY:
Expand Down Expand Up @@ -81,28 +81,8 @@ async def lifespan(_: FastAPI):
await init_db()
print("Database connection established successfully")

# Run database migrations with Redis lock
# All workers will wait for the migration to complete before proceeding
try:
migration_success = await run_migrations_with_lock(
redis_client=redis_client,
lock_timeout=120, # 2 minutes timeout for the lock
max_wait_time=300 # 5 minutes maximum wait time
)

if migration_success:
print("Database migrations completed successfully or already done")
else:
print("Warning: Migrations failed or timed out - proceeding with caution")
except Exception as e:
print(f"Warning: Failed to run migrations: {str(e)}")

# Check Redis connection
try:
redis_client.ping()
print("Redis connection established successfully")
except Exception as e:
print(f"Warning: Redis connection failed: {str(e)}")
redis_client.ping()
print("Redis connection established successfully")

# Load all templates from the templates directory
await load_templates()
Expand Down