From 017fd8eef79c9cc77e0dd80cf32c7e310f2403ba Mon Sep 17 00:00:00 2001 From: David Anyatonwu Date: Sat, 25 Jan 2025 09:10:59 +0100 Subject: [PATCH 1/4] feat: implement chunked database dumps with progress tracking and R2 storage --- README.md | 62 +++++++++ src/do.ts | 144 ++++++++++++++------ src/export/chunked-dump.ts | 271 +++++++++++++++++++++++++++++++++++++ src/handler.ts | 35 +++++ src/types.ts | 17 ++- wrangler.toml | 6 + 6 files changed, 490 insertions(+), 45 deletions(-) create mode 100644 src/export/chunked-dump.ts diff --git a/README.md b/README.md index 1bbf96d..bc7f5bb 100644 --- a/README.md +++ b/README.md @@ -230,6 +230,8 @@ window.onload = connectWebSocket

SQL Dump

 You can request a `database_dump.sql` file that exports your database schema and data into a single file.
+For small databases (< 100MB), you can use the direct download endpoint:
+
 ```bash
 curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
@@ -238,6 +240,66 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
 --header 'Authorization: Bearer ABC123' \
 --output database_dump.sql
 ```
 
+For large databases, use the chunked dump endpoint which processes the dump in the background:
+
+1. Start the dump:
+
+```bash
+curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/chunked' \
+--header 'Authorization: Bearer ABC123'
+```
+
+Response:
+
+```json
+{
+    "message": "Database dump started",
+    "dumpId": "123e4567-e89b-12d3-a456-426614174000",
+    "status": "in_progress",
+    "downloadUrl": "https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/123e4567-e89b-12d3-a456-426614174000"
+}
+```
+
+2. Check dump status:
+
+```bash
+curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/123e4567-e89b-12d3-a456-426614174000/status' \
+--header 'Authorization: Bearer ABC123'
+```
+
+Response:
+
+```json
+{
+    "status": "in_progress",
+    "progress": {
+        "currentTable": "users",
+        "processedTables": 2,
+        "totalTables": 5
+    }
+}
+```
+
+3. Download the completed dump:
+
+```bash
+curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/123e4567-e89b-12d3-a456-426614174000' \
+--header 'Authorization: Bearer ABC123' \
+--output database_dump.sql
+```
+
+The chunked dump endpoint:
+
+- Processes large databases in chunks to avoid memory issues
+- Stores the dump file in R2 storage
+- Takes "breathing intervals" to prevent database locking
+- Supports databases up to 10GB in size
+- Provides progress tracking
+- Returns a download URL when complete
+
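As a rough end-to-end illustration of the three steps documented above, a small client could start a dump, poll its status, and then download the file. This is a sketch only: it assumes the endpoints and response shapes shown above, plus a placeholder host and token.

```typescript
// Sketch (not part of the patch): drive the chunked dump API end to end.
const BASE = 'https://starbasedb.YOUR-ID-HERE.workers.dev'
const HEADERS = { Authorization: 'Bearer ABC123' }

async function downloadChunkedDump(): Promise<string> {
    // 1. Start the dump.
    const started = await fetch(`${BASE}/export/dump/chunked`, {
        method: 'POST',
        headers: HEADERS,
    })
    const { dumpId } = (await started.json()) as { dumpId: string }

    // 2. Poll the status endpoint until the dump completes or fails.
    for (;;) {
        const res = await fetch(`${BASE}/export/dump/${dumpId}/status`, {
            headers: HEADERS,
        })
        const { status } = (await res.json()) as { status: string }
        if (status === 'completed') break
        if (status === 'failed') throw new Error(`dump ${dumpId} failed`)
        await new Promise((resolve) => setTimeout(resolve, 5_000))
    }

    // 3. Download the finished SQL file.
    const file = await fetch(`${BASE}/export/dump/${dumpId}`, {
        headers: HEADERS,
    })
    return file.text()
}
```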

JSON Data Export

 
diff --git a/src/do.ts b/src/do.ts
index d23939e..b88c7ff 100644
--- a/src/do.ts
+++ b/src/do.ts
@@ -1,4 +1,7 @@
 import { DurableObject } from 'cloudflare:workers'
+import { processDumpChunk } from './export/chunked-dump'
+import { StarbaseDBConfiguration } from './handler'
+import { DataSource } from './types'
 
 export class StarbaseDBDurableObject extends DurableObject {
     // Durable storage for the SQL database
@@ -6,6 +9,10 @@ export class StarbaseDBDurableObject extends DurableObject {
     public storage: DurableObjectStorage
     // Map of WebSocket connections to their corresponding session IDs
     public connections = new Map()
+    // Configuration for the database instance
+    private config: StarbaseDBConfiguration
+    // Environment variables
+    protected env: Env
 
     /**
      * The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
@@ -14,56 +21,32 @@ export class StarbaseDBDurableObject extends DurableObject {
      * @param ctx - The interface for interacting with Durable Object state
      * @param env - The interface to reference bindings declared in wrangler.toml
      */
-    constructor(ctx: DurableObjectState, env: Env) {
-        super(ctx, env)
-        this.sql = ctx.storage.sql
-        this.storage = ctx.storage
-
-        // Install default necessary `tmp_` tables for various features here.
-        const cacheStatement = `
-        CREATE TABLE IF NOT EXISTS tmp_cache (
-            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
-            "timestamp" REAL NOT NULL,
-            "ttl" INTEGER NOT NULL,
-            "query" TEXT UNIQUE NOT NULL,
-            "results" TEXT
-        );`
-
-        const allowlistStatement = `
-        CREATE TABLE IF NOT EXISTS tmp_allowlist_queries (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            sql_statement TEXT NOT NULL,
-            source TEXT DEFAULT 'external'
-        )`
-        const allowlistRejectedStatement = `
-        CREATE TABLE IF NOT EXISTS tmp_allowlist_rejections (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            sql_statement TEXT NOT NULL,
-            source TEXT DEFAULT 'external',
-            created_at TEXT DEFAULT (datetime('now'))
-        )`
-
-        const rlsStatement = `
-        CREATE TABLE IF NOT EXISTS tmp_rls_policies (
-            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
-            "actions" TEXT NOT NULL CHECK(actions IN ('SELECT', 'UPDATE', 'INSERT', 'DELETE')),
-            "schema" TEXT,
-            "table" TEXT NOT NULL,
-            "column" TEXT NOT NULL,
-            "value" TEXT NOT NULL,
-            "value_type" TEXT NOT NULL DEFAULT 'string',
-            "operator" TEXT DEFAULT '='
-        )`
+    constructor(state: DurableObjectState, env: Env) {
+        super(state, env)
+        this.storage = state.storage
+        this.sql = state.storage.sql
+        this.env = env
+
+        // Initialize configuration
+        this.config = {
+            role: 'admin',
+            features: {
+                import: true,
+                export: true,
+                allowlist: Boolean(env.ENABLE_ALLOWLIST),
+                rls: Boolean(env.ENABLE_RLS),
+            },
+        }
 
-        this.executeQuery({ sql: cacheStatement })
-        this.executeQuery({ sql: allowlistStatement })
-        this.executeQuery({ sql: allowlistRejectedStatement })
-        this.executeQuery({ sql: rlsStatement })
+        // Initialize tables
+        this.initializeTables()
     }
 
     init() {
         return {
             executeQuery: this.executeQuery.bind(this),
+            storage: this.storage,
+            setAlarm: (timestamp: number) => this.storage.setAlarm(timestamp),
         }
     }
 
@@ -219,4 +202,77 @@ export class StarbaseDBDurableObject extends DurableObject {
             throw error
         }
     }
+
+    private convertToStubArrayBuffer(value: ArrayBuffer): {
+        byteLength: number
+        slice: (begin: number, end?: number) => Promise<ArrayBuffer>
+        [Symbol.toStringTag]: string
+    } {
+        return {
+            byteLength: value.byteLength,
+            slice: async (begin: number, end?: number) =>
+                value.slice(begin, end),
+            [Symbol.toStringTag]: 'ArrayBuffer',
+        }
+    }
+
+    async alarm(): Promise<void> {
+        // Check if this is a dump processing alarm
+        const dumpProgress = await this.storage.get('dump_progress')
+        if (dumpProgress) {
+            const dataSource: DataSource = {
+                rpc: {
+                    executeQuery: this.executeQuery.bind(this),
+                    storage: this.storage,
+                    setAlarm: (timestamp: number) =>
+                        this.storage.setAlarm(timestamp),
+                },
+                source: 'internal',
+            }
+            await processDumpChunk(dataSource, this.config, this.env)
+        }
+    }
+
+    private async initializeTables() {
+        // Install default necessary `tmp_` tables for various features here.
+        const cacheStatement = `
+        CREATE TABLE IF NOT EXISTS tmp_cache (
+            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+            "timestamp" REAL NOT NULL,
+            "ttl" INTEGER NOT NULL,
+            "query" TEXT UNIQUE NOT NULL,
+            "results" TEXT
+        );`
+
+        const allowlistStatement = `
+        CREATE TABLE IF NOT EXISTS tmp_allowlist_queries (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            sql_statement TEXT NOT NULL,
+            source TEXT DEFAULT 'external'
+        )`
+        const allowlistRejectedStatement = `
+        CREATE TABLE IF NOT EXISTS tmp_allowlist_rejections (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            sql_statement TEXT NOT NULL,
+            source TEXT DEFAULT 'external',
+            created_at TEXT DEFAULT (datetime('now'))
+        )`
+
+        const rlsStatement = `
+        CREATE TABLE IF NOT EXISTS tmp_rls_policies (
+            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+            "actions" TEXT NOT NULL CHECK(actions IN ('SELECT', 'UPDATE', 'INSERT', 'DELETE')),
+            "schema" TEXT,
+            "table" TEXT NOT NULL,
+            "column" TEXT NOT NULL,
+            "value" TEXT NOT NULL,
+            "value_type" TEXT NOT NULL DEFAULT 'string',
+            "operator" TEXT DEFAULT '='
+        )`
+
+        await this.executeQuery({ sql: cacheStatement })
+        await this.executeQuery({ sql: allowlistStatement })
+        await this.executeQuery({ sql: allowlistRejectedStatement })
+        await this.executeQuery({ sql: rlsStatement })
+    }
 }
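For context, the `alarm()` handler added above relies on the standard Durable Object alarm API: `storage.setAlarm(timestamp)` schedules a single pending wake-up for the object, and the runtime later invokes `alarm()`, which can re-arm itself to keep working in the background. A minimal sketch of that pattern, separate from the dump logic in this patch:

```typescript
import { DurableObject } from 'cloudflare:workers'

// Sketch (illustration only): a Durable Object that re-schedules its own alarm.
export class TickingObject extends DurableObject {
    async startTicking(): Promise<void> {
        // Each object holds at most one pending alarm; this arms it.
        await this.ctx.storage.setAlarm(Date.now() + 1_000)
    }

    async alarm(): Promise<void> {
        // Do one unit of work, then re-arm to continue later.
        console.log('tick at', new Date().toISOString())
        await this.ctx.storage.setAlarm(Date.now() + 5_000)
    }
}
```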
diff --git a/src/export/chunked-dump.ts b/src/export/chunked-dump.ts
new file mode 100644
index 0000000..8aec031
--- /dev/null
+++ b/src/export/chunked-dump.ts
@@ -0,0 +1,271 @@
+import { executeOperation } from '.'
+import { StarbaseDBConfiguration } from '../handler'
+import { DataSource } from '../types'
+import { createResponse } from '../utils'
+
+interface DumpProgress {
+    id: string
+    status: 'in_progress' | 'completed' | 'failed'
+    currentTable: string
+    totalTables: number
+    processedTables: number
+    error?: string
+    r2Key?: string
+}
+
+interface StoredDumpData {
+    progress: DumpProgress
+    tables: string[]
+    currentTableIndex: number
+    currentOffset: number
+}
+
+const CHUNK_SIZE = 1000 // Number of rows to process at a time
+const PROCESSING_TIMEOUT = 5000 // 5 seconds of processing before taking a break
+const BREATHING_INTERVAL = 5000 // 5 seconds break between processing chunks
+
+export async function startChunkedDumpRoute(
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration,
+    env: any
+): Promise<Response> {
+    try {
+        // Generate a unique ID for this dump operation
+        const dumpId = crypto.randomUUID()
+        const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
+        const r2Key = `dump_${timestamp}.sql`
+
+        // Initialize progress tracking
+        const progress: DumpProgress = {
+            id: dumpId,
+            status: 'in_progress',
+            currentTable: '',
+            totalTables: 0,
+            processedTables: 0,
+            r2Key,
+        }
+
+        // Get all table names
+        const tablesResult = await executeOperation(
+            [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+            dataSource,
+            config
+        )
+
+        const tables = tablesResult.map((row: any) => row.name)
+        progress.totalTables = tables.length
+
+        // Create initial file in R2 with SQLite header
+        await env.DATABASE_DUMPS.put(r2Key, 'SQLite format 3\0\n', {
+            httpMetadata: {
+                contentType: 'application/x-sqlite3',
+            },
+        })
+
+        // We use a DO alarm to continue processing after the initial request
+        if (!dataSource.rpc.setAlarm || !dataSource.rpc.storage) {
+            throw new Error(
+                'setAlarm and storage capabilities required for chunked dumps'
+            )
+        }
+        const alarm = await dataSource.rpc.setAlarm(Date.now() + 1000)
+
+        // Store progress in DO storage for the alarm to pick up
+        await dataSource.rpc.storage.put('dump_progress', {
+            progress,
+            tables,
+            currentTableIndex: 0,
+            currentOffset: 0,
+        })
+
+        return createResponse(
+            {
+                message: 'Database dump started',
+                dumpId,
+                status: 'in_progress',
+                downloadUrl: `https://${env.WORKER_DOMAIN}/export/dump/${dumpId}`,
+            },
+            undefined,
+            200
+        )
+    } catch (error: any) {
+        console.error('Chunked Database Dump Error:', error)
+        return createResponse(undefined, 'Failed to start database dump', 500)
+    }
+}
+
+export async function processDumpChunk(
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration,
+    env: any
+): Promise<void> {
+    const stored = (await dataSource.rpc.storage.get(
+        'dump_progress'
+    )) as StoredDumpData
+    if (!stored) return
+
+    const { progress, tables, currentTableIndex, currentOffset } = stored
+    const startTime = Date.now()
+
+    try {
+        const table = tables[currentTableIndex]
+        progress.currentTable = table
+
+        // Get table schema if this is the first chunk of the table
+        if (currentOffset === 0) {
+            const schemaResult = await executeOperation(
+                [
+                    {
+                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
+                    },
+                ],
+                dataSource,
+                config
+            )
+
+            if (schemaResult.length) {
+                const schema = schemaResult[0].sql
+                const schemaContent = `\n-- Table: ${table}\n${schema};\n\n`
+
+                // Append schema to R2 file
+                const existingContent = await env.DATABASE_DUMPS.get(
+                    progress.r2Key
+                ).text()
+                await env.DATABASE_DUMPS.put(
+                    progress.r2Key,
+                    existingContent + schemaContent
+                )
+            }
+        }
+
+        // Get chunk of data
+        const dataResult = await executeOperation(
+            [
+                {
+                    sql: `SELECT * FROM ${table} LIMIT ${CHUNK_SIZE} OFFSET ${currentOffset};`,
+                },
+            ],
+            dataSource,
+            config
+        )
+
+        // Process the chunk
+        let insertStatements = ''
+        for (const row of dataResult) {
+            const values = Object.values(row).map((value) =>
+                typeof value === 'string'
+                    ? `'${value.replace(/'/g, "''")}'`
+                    : value
+            )
+            insertStatements += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
+        }
+
+        // Append to R2 file
+        if (insertStatements) {
+            const existingContent = await env.DATABASE_DUMPS.get(
+                progress.r2Key
+            ).text()
+            await env.DATABASE_DUMPS.put(
+                progress.r2Key,
+                existingContent + insertStatements
+            )
+        }
+
+        // Update progress
+        if (dataResult.length < CHUNK_SIZE) {
+            // Move to next table
+            progress.processedTables++
+            if (currentTableIndex + 1 < tables.length) {
+                await dataSource.rpc.storage.put('dump_progress', {
+                    ...stored,
+                    currentTableIndex: currentTableIndex + 1,
+                    currentOffset: 0,
+                })
+            } else {
+                // All done
+                progress.status = 'completed'
+                await dataSource.rpc.storage.delete('dump_progress')
+            }
+        } else {
+            // Continue with next chunk of current table
+            await dataSource.rpc.storage.put('dump_progress', {
+                ...stored,
+                currentOffset: currentOffset + CHUNK_SIZE,
+            })
+        }
+
+        // Check if we need to take a break
+        const elapsedTime = Date.now() - startTime
+        if (
+            elapsedTime >= PROCESSING_TIMEOUT &&
+            progress.status !== 'completed'
+        ) {
+            // Schedule next chunk after breathing interval
+            await dataSource.rpc.setAlarm(Date.now() + BREATHING_INTERVAL)
+        } else if (progress.status !== 'completed') {
+            // Continue immediately with next chunk
+            await dataSource.rpc.setAlarm(Date.now() + 1000)
+        }
+    } catch (error: any) {
+        console.error('Chunk Processing Error:', error)
+        progress.status = 'failed'
+        progress.error = error.message
+        await dataSource.rpc.storage.delete('dump_progress')
+    }
+}
+
+export async function getDumpStatusRoute(
+    dumpId: string,
+    dataSource: DataSource
+): Promise<Response> {
+    const stored = (await dataSource.rpc.storage.get(
+        'dump_progress'
+    )) as StoredDumpData
+    if (!stored || stored.progress.id !== dumpId) {
+        return createResponse(undefined, 'Dump not found', 404)
+    }
+
+    return createResponse(
+        {
+            status: stored.progress.status,
+            progress: {
+                currentTable: stored.progress.currentTable,
+                processedTables: stored.progress.processedTables,
+                totalTables: stored.progress.totalTables,
+                error: stored.progress.error,
+            },
+        },
+        undefined,
+        200
+    )
+}
+
+export async function getDumpFileRoute(
+    dumpId: string,
+    dataSource: DataSource,
+    env: any
+): Promise<Response> {
+    const stored = (await dataSource.rpc.storage.get(
+        'dump_progress'
+    )) as StoredDumpData
+
+    if (!stored || stored.progress.id !== dumpId) {
+        return createResponse(undefined, 'Dump not found', 404)
+    }
+
+    if (stored.progress.status !== 'completed') {
+        return createResponse(undefined, 'Dump is still in progress', 400)
+    }
+
+    const r2Object = await env.DATABASE_DUMPS.get(stored.progress.r2Key)
+    if (!r2Object) {
+        return createResponse(undefined, 'Dump file not found', 404)
+    }
+
+    const headers = new Headers({
+        'Content-Type': 'application/x-sqlite3',
+        'Content-Disposition': `attachment; filename="database_dump_${dumpId}.sql"`,
+    })
+
+    return new Response(r2Object.body, { headers })
+}
diff --git a/src/handler.ts b/src/handler.ts
index fd459a9..d8ee0ce 100644
--- a/src/handler.ts
+++ b/src/handler.ts
@@ -15,6 +15,11 @@ import { importTableFromCsvRoute } from './import/csv'
 import { corsPreflight } from './cors'
 import { handleApiRequest } from './api'
 import { StarbasePlugin, StarbasePluginRegistry } from './plugin'
+import {
+    startChunkedDumpRoute,
+    getDumpStatusRoute,
+    getDumpFileRoute,
+} from './export/chunked-dump'
 
 export interface StarbaseDBConfiguration {
     outerbaseApiKey?: string
@@ -107,6 +112,36 @@ export class StarbaseDB {
         }
 
         if (this.getFeature('export')) {
+            this.app.post(
+                '/export/dump/chunked',
+                this.isInternalSource,
+                async (c) => {
+                    return startChunkedDumpRoute(
+                        this.dataSource,
+                        this.config,
+                        c.env
+                    )
+                }
+            )
+
+            this.app.get(
+                '/export/dump/:dumpId/status',
+                this.isInternalSource,
+                async (c) => {
+                    const dumpId = c.req.param('dumpId')
+                    return getDumpStatusRoute(dumpId, this.dataSource)
+                }
+            )
+
+            this.app.get(
+                '/export/dump/:dumpId',
+                this.isInternalSource,
+                async (c) => {
+                    const dumpId = c.req.param('dumpId')
+                    return getDumpFileRoute(dumpId, this.dataSource, c.env)
+                }
+            )
+
             this.app.get('/export/dump', this.isInternalSource, async () => {
                 return dumpDatabaseRoute(this.dataSource, this.config)
             })
diff --git a/src/types.ts b/src/types.ts
index 64f24dd..c4db811 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -50,7 +50,22 @@ export type ExternalDatabaseSource =
     | TursoDBSource
 
 export type DataSource = {
-    rpc: Awaited<ReturnType<DurableObjectStub<StarbaseDBDurableObject>['init']>>
+    rpc: {
+        executeQuery: (opts: {
+            sql: string
+            params?: unknown[]
+            isRaw?: boolean
+        }) => Promise<
+            | QueryResult[]
+            | {
+                  columns: string[]
+                  rows: any[][]
+                  meta: { rows_read: number; rows_written: number }
+              }
+        >
+        storage: DurableObjectStorage
+        setAlarm: (timestamp: number) => Promise<void>
+    }
     source: 'internal' | 'external'
     external?: ExternalDatabaseSource
     context?: Record<string, unknown>
diff --git a/wrangler.toml b/wrangler.toml
index 9ae5af3..cd00e42 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -14,6 +14,12 @@ compatibility_flags = ["nodejs_compat_v2"]
 [observability]
 enabled = true
 
+# R2 bucket for storing database dumps
+[[r2_buckets]]
+binding = "DATABASE_DUMPS"
+bucket_name = "starbasedb-dumps"
+preview_bucket_name = "starbasedb-dumps-dev"
+
 # Bind a Durable Object. Durable objects are a scale-to-zero compute primitive based on the actor model.
 # Durable Objects can live for as long as needed. Use these when you need a long-running "server", such as in realtime apps.
 # Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#durable-objects
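A note on bindings: the new code takes `env: any`, so the environment shape it expects stays implicit. Pulling together the names referenced in this patch (`DATABASE_DUMPS` from the R2 binding above, `WORKER_DOMAIN` for the download URL in patch 1, and the `ENABLE_ALLOWLIST` / `ENABLE_RLS` flags read in the Durable Object constructor), the assumed environment looks roughly like the sketch below; this is not the project's actual `Env` declaration.

```typescript
// Sketch of the bindings the dump code appears to expect (names taken from the patch).
interface Env {
    DATABASE_DURABLE_OBJECT: DurableObjectNamespace // existing Durable Object binding
    DATABASE_DUMPS: R2Bucket // R2 bucket added in wrangler.toml for dump files
    WORKER_DOMAIN?: string // used to build the downloadUrl in patch 1 (patch 2 derives the origin from the request instead)
    ENABLE_ALLOWLIST?: boolean // feature flags read in the DO constructor
    ENABLE_RLS?: boolean
}
```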

From 4e717e397ce51f460af9ecbf0fae34a58bc05afa Mon Sep 17 00:00:00 2001
From: David Anyatonwu 
Date: Sat, 25 Jan 2025 13:13:16 +0100
Subject: [PATCH 2/4] feat: implement chunked database dump

---
 src/do.ts                  |  50 +++-
 src/export/chunked-dump.ts | 584 ++++++++++++++++++++++++++++---------
 src/handler.ts             |   3 +-
 src/index.ts               |   2 +-
 src/types.ts               |  40 ++-
 5 files changed, 513 insertions(+), 166 deletions(-)

diff --git a/src/do.ts b/src/do.ts
index b88c7ff..a1cf4ed 100644
--- a/src/do.ts
+++ b/src/do.ts
@@ -45,7 +45,11 @@ export class StarbaseDBDurableObject extends DurableObject {
     init() {
         return {
             executeQuery: this.executeQuery.bind(this),
-            storage: this.storage,
+            storage: {
+                get: this.storage.get.bind(this.storage),
+                put: this.storage.put.bind(this.storage),
+                delete: this.storage.delete.bind(this.storage),
+            },
             setAlarm: (timestamp: number) => this.storage.setAlarm(timestamp),
         }
     }
@@ -217,19 +221,39 @@ export class StarbaseDBDurableObject extends DurableObject {
     }
 
     async alarm(): Promise<void> {
-        // Check if this is a dump processing alarm
-        const dumpProgress = await this.storage.get('dump_progress')
-        if (dumpProgress) {
-            const dataSource: DataSource = {
-                rpc: {
-                    executeQuery: this.executeQuery.bind(this),
-                    storage: this.storage,
-                    setAlarm: (timestamp: number) =>
-                        this.storage.setAlarm(timestamp),
-                },
-                source: 'internal',
+        try {
+            console.log('Alarm triggered')
+            // List all dump progress keys
+            const allKeys = await this.storage.list({
+                prefix: 'dump_progress_',
+            })
+            console.log('Found dump progress keys:', allKeys)
+
+            // Process each active dump
+            for (const key of allKeys.keys()) {
+                const dumpProgress = await this.storage.get(key)
+                console.log(
+                    'Processing dump progress for key:',
+                    key,
+                    dumpProgress
+                )
+
+                if (dumpProgress) {
+                    const dataSource: DataSource = {
+                        rpc: {
+                            executeQuery: this.executeQuery.bind(this),
+                            storage: this.storage,
+                            setAlarm: (timestamp: number) =>
+                                this.storage.setAlarm(timestamp),
+                        },
+                        source: 'internal',
+                    }
+                    await processDumpChunk(dataSource, this.config, this.env)
+                }
             }
-            await processDumpChunk(dataSource, this.config, this.env)
+        } catch (error: any) {
+            console.error('Error in alarm handler:', error)
+            console.error('Error stack:', error.stack)
         }
     }
 
diff --git a/src/export/chunked-dump.ts b/src/export/chunked-dump.ts
index 8aec031..8cd1c5b 100644
--- a/src/export/chunked-dump.ts
+++ b/src/export/chunked-dump.ts
@@ -11,6 +11,8 @@ interface DumpProgress {
     processedTables: number
     error?: string
     r2Key?: string
+    callbackUrl?: string
+    estimatedSize?: number
 }
 
 interface StoredDumpData {
@@ -18,23 +20,100 @@ interface StoredDumpData {
     tables: string[]
     currentTableIndex: number
     currentOffset: number
+    useR2: boolean
+    chunkSize: number
 }
 
-const CHUNK_SIZE = 1000 // Number of rows to process at a time
+const DEFAULT_CHUNK_SIZE = 1000 // Default number of rows to process at a time
+const LARGE_CHUNK_SIZE = 5000 // Chunk size for small tables
+const SMALL_CHUNK_SIZE = 500 // Chunk size for large tables
+const SIZE_THRESHOLD_FOR_R2 = 100 * 1024 * 1024 // 100MB threshold for using R2
 const PROCESSING_TIMEOUT = 5000 // 5 seconds of processing before taking a break
 const BREATHING_INTERVAL = 5000 // 5 seconds break between processing chunks
 
+async function estimateDatabaseSize(
+    dataSource: DataSource,
+    tables: string[]
+): Promise<number> {
+    let totalSize = 0
+    for (const table of tables) {
+        // Get row count
+        const countResult = (await dataSource.rpc.executeQuery({
+            sql: `SELECT COUNT(*) as count FROM ${table};`,
+        })) as Record<string, number>[]
+        const rowCount = countResult[0]?.count || 0
+
+        // Get average row size from a sample
+        const sampleResult = (await dataSource.rpc.executeQuery({
+            sql: `SELECT * FROM ${table} LIMIT 100;`,
+        })) as Record<string, unknown>[]
+        let avgRowSize = 0
+        if (Array.isArray(sampleResult) && sampleResult.length > 0) {
+            const totalSize = sampleResult.reduce((size, row) => {
+                return size + JSON.stringify(row).length
+            }, 0)
+            avgRowSize = totalSize / sampleResult.length
+        }
+
+        totalSize += rowCount * avgRowSize
+    }
+    return totalSize
+}
+
+function determineChunkSize(tableRowCount: number): number {
+    if (tableRowCount < 10000) {
+        return LARGE_CHUNK_SIZE // Larger chunks for small tables
+    } else if (tableRowCount > 100000) {
+        return SMALL_CHUNK_SIZE // Smaller chunks for large tables
+    }
+    return DEFAULT_CHUNK_SIZE
+}
+
+async function notifyCallback(
+    callbackUrl: string,
+    dumpId: string,
+    status: string
+) {
+    try {
+        await fetch(callbackUrl, {
+            method: 'POST',
+            headers: {
+                'Content-Type': 'application/json',
+            },
+            body: JSON.stringify({
+                dumpId,
+                status,
+                timestamp: new Date().toISOString(),
+            }),
+        })
+    } catch (error) {
+        console.error('Error notifying callback:', error)
+    }
+}
+
 export async function startChunkedDumpRoute(
     dataSource: DataSource,
     config: StarbaseDBConfiguration,
-    env: any
+    env: any,
+    request?: Request
 ): Promise<Response> {
     try {
         // Generate a unique ID for this dump operation
         const dumpId = crypto.randomUUID()
-        const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
+        const now = new Date()
+        const timestamp =
+            now.getUTCFullYear().toString() +
+            String(now.getUTCMonth() + 1).padStart(2, '0') +
+            String(now.getUTCDate()).padStart(2, '0') +
+            '-' +
+            String(now.getUTCHours()).padStart(2, '0') +
+            String(now.getUTCMinutes()).padStart(2, '0') +
+            String(now.getUTCSeconds()).padStart(2, '0')
         const r2Key = `dump_${timestamp}.sql`
 
+        // Get callback URL from request if provided
+        const callbackUrl = request?.headers.get('X-Callback-URL') || undefined
+
         // Initialize progress tracking
         const progress: DumpProgress = {
             id: dumpId,
@@ -43,11 +122,16 @@ export async function startChunkedDumpRoute(
             totalTables: 0,
             processedTables: 0,
             r2Key,
+            callbackUrl,
         }
 
         // Get all table names
         const tablesResult = await executeOperation(
-            [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+            [
+                {
+                    sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_cf_%' AND name NOT LIKE 'sqlite_%';",
+                },
+            ],
             dataSource,
             config
         )
@@ -55,12 +139,30 @@ export async function startChunkedDumpRoute(
         const tables = tablesResult.map((row: any) => row.name)
         progress.totalTables = tables.length
 
-        // Create initial file in R2 with SQLite header
-        await env.DATABASE_DUMPS.put(r2Key, 'SQLite format 3\0\n', {
-            httpMetadata: {
-                contentType: 'application/x-sqlite3',
-            },
-        })
+        // Estimate database size
+        const estimatedSize = await estimateDatabaseSize(dataSource, tables)
+        progress.estimatedSize = estimatedSize
+        console.log('Estimated database size:', estimatedSize, 'bytes')
+
+        // Determine storage type based on size
+        const shouldUseR2 =
+            env?.DATABASE_DUMPS && estimatedSize > SIZE_THRESHOLD_FOR_R2
+        if (shouldUseR2) {
+            console.log('Using R2 storage due to large estimated size')
+        } else {
+            console.log('Using DO storage due to small estimated size')
+        }
+
+        // Store initial content
+        if (shouldUseR2) {
+            await env.DATABASE_DUMPS.put(r2Key, 'SQLite format 3\0\n', {
+                httpMetadata: {
+                    contentType: 'application/x-sqlite3',
+                },
+            })
+        } else {
+            await dataSource.rpc.storage.put(r2Key, 'SQLite format 3\0\n')
+        }
 
         // We use a DO alarm to continue processing after the initial request
         if (!dataSource.rpc.setAlarm || !dataSource.rpc.storage) {
@@ -71,26 +173,45 @@ export async function startChunkedDumpRoute(
         const alarm = await dataSource.rpc.setAlarm(Date.now() + 1000)
 
         // Store progress in DO storage for the alarm to pick up
-        await dataSource.rpc.storage.put('dump_progress', {
+        const progressKey = `dump_progress_${dumpId}`
+        await dataSource.rpc.storage.put(progressKey, {
             progress,
             tables,
             currentTableIndex: 0,
             currentOffset: 0,
+            useR2: shouldUseR2,
+            chunkSize: DEFAULT_CHUNK_SIZE,
         })
 
+        // Get base URL from request or fallback to localhost
+        const baseUrl = request
+            ? new URL(request.url).origin
+            : 'http://localhost:8787'
+
         return createResponse(
             {
                 message: 'Database dump started',
                 dumpId,
                 status: 'in_progress',
-                downloadUrl: `https://${env.WORKER_DOMAIN}/export/dump/${dumpId}`,
+                downloadUrl: `${baseUrl}/export/dump/${dumpId}`,
+                estimatedSize,
             },
             undefined,
             200
         )
     } catch (error: any) {
         console.error('Chunked Database Dump Error:', error)
-        return createResponse(undefined, 'Failed to start database dump', 500)
+        console.error('Error stack:', error.stack)
+        console.error('Error details:', {
+            message: error.message,
+            name: error.name,
+            cause: error.cause,
+        })
+        return createResponse(
+            undefined,
+            `Failed to start database dump: ${error.message}`,
+            500
+        )
     }
 }
 
@@ -99,118 +220,237 @@ export async function processDumpChunk(
     config: StarbaseDBConfiguration,
     env: any
 ): Promise<void> {
-    const stored = (await dataSource.rpc.storage.get(
-        'dump_progress'
-    )) as StoredDumpData
-    if (!stored) return
-
-    const { progress, tables, currentTableIndex, currentOffset } = stored
-    const startTime = Date.now()
-
     try {
-        const table = tables[currentTableIndex]
-        progress.currentTable = table
-
-        // Get table schema if this is the first chunk of the table
-        if (currentOffset === 0) {
-            const schemaResult = await executeOperation(
-                [
-                    {
-                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
-                    },
-                ],
-                dataSource,
-                config
-            )
+        console.log('Starting processDumpChunk')
+        // Get all dump progress keys
+        const allKeys = await dataSource.rpc.storage.list({
+            prefix: 'dump_progress_',
+        })
+        console.log('Found dump progress keys:', allKeys)
+
+        // Process each active dump
+        for (const progressKey of allKeys.keys()) {
+            console.log('Processing dump with key:', progressKey)
+            const stored = (await dataSource.rpc.storage.get(
+                progressKey
+            )) as StoredDumpData & { useR2: boolean }
+            console.log('Stored dump progress:', stored)
 
-            if (schemaResult.length) {
-                const schema = schemaResult[0].sql
-                const schemaContent = `\n-- Table: ${table}\n${schema};\n\n`
-
-                // Append schema to R2 file
-                const existingContent = await env.DATABASE_DUMPS.get(
-                    progress.r2Key
-                ).text()
-                await env.DATABASE_DUMPS.put(
-                    progress.r2Key,
-                    existingContent + schemaContent
+            if (!stored) {
+                console.log(
+                    'No stored dump progress found for key:',
+                    progressKey
                 )
+                continue
             }
-        }
 
-        // Get chunk of data
-        const dataResult = await executeOperation(
-            [
-                {
-                    sql: `SELECT * FROM ${table} LIMIT ${CHUNK_SIZE} OFFSET ${currentOffset};`,
-                },
-            ],
-            dataSource,
-            config
-        )
+            const {
+                progress,
+                tables,
+                currentTableIndex,
+                currentOffset,
+                useR2,
+                chunkSize,
+            } = stored
+            console.log('Processing table:', {
+                currentTable: tables[currentTableIndex],
+                currentTableIndex,
+                totalTables: tables.length,
+                currentOffset,
+            })
 
-        // Process the chunk
-        let insertStatements = ''
-        for (const row of dataResult) {
-            const values = Object.values(row).map((value) =>
-                typeof value === 'string'
-                    ? `'${value.replace(/'/g, "''")}'`
-                    : value
-            )
-            insertStatements += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
-        }
+            const startTime = Date.now()
 
-        // Append to R2 file
-        if (insertStatements) {
-            const existingContent = await env.DATABASE_DUMPS.get(
-                progress.r2Key
-            ).text()
-            await env.DATABASE_DUMPS.put(
-                progress.r2Key,
-                existingContent + insertStatements
-            )
-        }
+            try {
+                const table = tables[currentTableIndex]
+                progress.currentTable = table
+
+                // Get table schema if this is the first chunk of the table
+                if (currentOffset === 0) {
+                    console.log('Getting schema for table:', table)
+                    const schemaResult = await dataSource.rpc.executeQuery({
+                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
+                        params: [table],
+                    })
+                    console.log('Schema result:', schemaResult)
+
+                    if (
+                        Array.isArray(schemaResult) &&
+                        schemaResult.length > 0
+                    ) {
+                        const schema = schemaResult[0].sql
+                        const schemaContent = `\n-- Table: ${table}\n${schema};\n\n`
+
+                        // Append schema to file
+                        if (useR2 && progress.r2Key) {
+                            const existingContent =
+                                await env.DATABASE_DUMPS.get(
+                                    progress.r2Key
+                                ).text()
+                            await env.DATABASE_DUMPS.put(
+                                progress.r2Key,
+                                existingContent + schemaContent
+                            )
+                        } else if (progress.r2Key) {
+                            const existingContent =
+                                ((await dataSource.rpc.storage.get(
+                                    progress.r2Key
+                                )) as string) || ''
+                            await dataSource.rpc.storage.put(
+                                progress.r2Key,
+                                existingContent + schemaContent
+                            )
+                        }
+
+                        // Determine chunk size based on table size
+                        const countResult = await dataSource.rpc.executeQuery({
+                            sql: `SELECT COUNT(*) as count FROM ${table};`,
+                        })
+                        if (
+                            Array.isArray(countResult) &&
+                            countResult.length > 0
+                        ) {
+                            const rowCount = countResult[0].count as number
+                            stored.chunkSize = determineChunkSize(rowCount)
+                            console.log(
+                                `Adjusted chunk size for table ${table}:`,
+                                stored.chunkSize
+                            )
+                        }
+                    }
+                }
+
+                // Get chunk of data
+                console.log('Getting data chunk for table:', table)
+                const dataResult = await dataSource.rpc.executeQuery({
+                    sql: `SELECT * FROM ${table} LIMIT ? OFFSET ?;`,
+                    params: [stored.chunkSize, currentOffset],
+                })
+                console.log('Data result:', dataResult)
+
+                // Process the chunk
+                let insertStatements = ''
+                if (Array.isArray(dataResult)) {
+                    for (const row of dataResult) {
+                        const values = Object.values(row).map((value) =>
+                            typeof value === 'string'
+                                ? `'${value.replace(/'/g, "''")}'`
+                                : value === null
+                                  ? 'NULL'
+                                  : value
+                        )
+                        insertStatements += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
+                    }
+                } else {
+                    console.warn(
+                        'Data result is not an array:',
+                        typeof dataResult
+                    )
+                }
+
+                // Append to file
+                if (insertStatements && progress.r2Key) {
+                    console.log('Appending insert statements to file')
+                    if (useR2) {
+                        const existingContent = await env.DATABASE_DUMPS.get(
+                            progress.r2Key
+                        ).text()
+                        await env.DATABASE_DUMPS.put(
+                            progress.r2Key,
+                            existingContent + insertStatements
+                        )
+                    } else {
+                        const existingContent =
+                            ((await dataSource.rpc.storage.get(
+                                progress.r2Key
+                            )) as string) || ''
+                        await dataSource.rpc.storage.put(
+                            progress.r2Key,
+                            existingContent + insertStatements
+                        )
+                    }
+                }
+
+                // Update progress
+                if (
+                    !Array.isArray(dataResult) ||
+                    dataResult.length < stored.chunkSize
+                ) {
+                    // Move to next table
+                    console.log('Moving to next table')
+                    progress.processedTables++
+                    if (currentTableIndex + 1 < tables.length) {
+                        await dataSource.rpc.storage.put(progressKey, {
+                            ...stored,
+                            progress,
+                            currentTableIndex: currentTableIndex + 1,
+                            currentOffset: 0,
+                        })
+                    } else {
+                        // All done
+                        console.log('Dump completed')
+                        progress.status = 'completed'
+                        // Update progress instead of deleting it
+                        await dataSource.rpc.storage.put(progressKey, {
+                            ...stored,
+                            progress,
+                        })
+
+                        // Send callback if configured
+                        if (progress.callbackUrl) {
+                            await notifyCallback(
+                                progress.callbackUrl,
+                                progress.id,
+                                'completed'
+                            )
+                        }
+                        continue // Move to next dump if any
+                    }
+                } else {
+                    // Continue with next chunk of current table
+                    console.log('Moving to next chunk')
+                    await dataSource.rpc.storage.put(progressKey, {
+                        ...stored,
+                        progress,
+                        currentOffset: currentOffset + stored.chunkSize,
+                    })
+                }
 
-        // Update progress
-        if (dataResult.length < CHUNK_SIZE) {
-            // Move to next table
-            progress.processedTables++
-            if (currentTableIndex + 1 < tables.length) {
-                await dataSource.rpc.storage.put('dump_progress', {
+                // Check if we need to take a break
+                if (Date.now() - startTime >= PROCESSING_TIMEOUT) {
+                    console.log('Taking a break from processing')
+                    await dataSource.rpc.setAlarm(
+                        Date.now() + BREATHING_INTERVAL
+                    )
+                    return
+                }
+            } catch (error: any) {
+                console.error('Error processing chunk:', error)
+                progress.status = 'failed'
+                progress.error = error.message
+                await dataSource.rpc.storage.put(progressKey, {
                     ...stored,
-                    currentTableIndex: currentTableIndex + 1,
-                    currentOffset: 0,
+                    progress,
                 })
-            } else {
-                // All done
-                progress.status = 'completed'
-                await dataSource.rpc.storage.delete('dump_progress')
+
+                // Send callback if configured
+                if (progress.callbackUrl) {
+                    await notifyCallback(
+                        progress.callbackUrl,
+                        progress.id,
+                        'failed'
+                    )
+                }
             }
-        } else {
-            // Continue with next chunk of current table
-            await dataSource.rpc.storage.put('dump_progress', {
-                ...stored,
-                currentOffset: currentOffset + CHUNK_SIZE,
-            })
         }
 
-        // Check if we need to take a break
-        const elapsedTime = Date.now() - startTime
-        if (
-            elapsedTime >= PROCESSING_TIMEOUT &&
-            progress.status !== 'completed'
-        ) {
-            // Schedule next chunk after breathing interval
+        // Schedule next processing if there are active dumps
+        if (allKeys.size > 0) {
             await dataSource.rpc.setAlarm(Date.now() + BREATHING_INTERVAL)
-        } else if (progress.status !== 'completed') {
-            // Continue immediately with next chunk
-            await dataSource.rpc.setAlarm(Date.now() + 1000)
         }
     } catch (error: any) {
-        console.error('Chunk Processing Error:', error)
-        progress.status = 'failed'
-        progress.error = error.message
-        await dataSource.rpc.storage.delete('dump_progress')
+        console.error('Error in processDumpChunk:', error)
+        console.error('Error stack:', error.stack)
     }
 }
 
@@ -218,26 +458,40 @@ export async function getDumpStatusRoute(
     dumpId: string,
     dataSource: DataSource
 ): Promise<Response> {
-    const stored = (await dataSource.rpc.storage.get(
-        'dump_progress'
-    )) as StoredDumpData
-    if (!stored || stored.progress.id !== dumpId) {
-        return createResponse(undefined, 'Dump not found', 404)
-    }
+    try {
+        console.log('Checking dump status for ID:', dumpId)
+        const progressKey = `dump_progress_${dumpId}`
+        const stored = (await dataSource.rpc.storage.get(
+            progressKey
+        )) as StoredDumpData & { useR2: boolean }
+        console.log('Stored dump progress:', stored)
+
+        if (!stored) {
+            return createResponse(undefined, 'Dump not found', 404)
+        }
 
-    return createResponse(
-        {
-            status: stored.progress.status,
-            progress: {
-                currentTable: stored.progress.currentTable,
-                processedTables: stored.progress.processedTables,
-                totalTables: stored.progress.totalTables,
-                error: stored.progress.error,
+        return createResponse(
+            {
+                status: stored.progress.status,
+                progress: {
+                    currentTable: stored.progress.currentTable,
+                    processedTables: stored.progress.processedTables,
+                    totalTables: stored.progress.totalTables,
+                    error: stored.progress.error,
+                },
             },
-        },
-        undefined,
-        200
-    )
+            undefined,
+            200
+        )
+    } catch (error: any) {
+        console.error('Error checking dump status:', error)
+        console.error('Error stack:', error.stack)
+        return createResponse(
+            undefined,
+            `Error checking dump status: ${error.message}`,
+            500
+        )
+    }
 }
 
 export async function getDumpFileRoute(
@@ -245,27 +499,67 @@ export async function getDumpFileRoute(
     dataSource: DataSource,
     env: any
 ): Promise<Response> {
-    const stored = (await dataSource.rpc.storage.get(
-        'dump_progress'
-    )) as StoredDumpData
+    try {
+        console.log('Getting dump file for ID:', dumpId)
+        const progressKey = `dump_progress_${dumpId}`
+        const stored = (await dataSource.rpc.storage.get(
+            progressKey
+        )) as StoredDumpData & { useR2: boolean }
+        console.log('Stored dump progress:', stored)
 
-    if (!stored || stored.progress.id !== dumpId) {
-        return createResponse(undefined, 'Dump not found', 404)
-    }
+        if (!stored) {
+            return createResponse(undefined, 'Dump not found', 404)
+        }
 
-    if (stored.progress.status !== 'completed') {
-        return createResponse(undefined, 'Dump is still in progress', 400)
-    }
+        if (stored.progress.status !== 'completed') {
+            return createResponse(undefined, 'Dump is still in progress', 400)
+        }
 
-    const r2Object = await env.DATABASE_DUMPS.get(stored.progress.r2Key)
-    if (!r2Object) {
-        return createResponse(undefined, 'Dump file not found', 404)
-    }
+        if (!stored.progress.r2Key) {
+            return createResponse(undefined, 'Dump file key not found', 404)
+        }
+
+        let content: string | ReadableStream
+        if (stored.useR2) {
+            const r2Object = await env.DATABASE_DUMPS.get(stored.progress.r2Key)
+            if (!r2Object) {
+                return createResponse(
+                    undefined,
+                    'Dump file not found in R2',
+                    404
+                )
+            }
+            content = r2Object.body
+        } else {
+            content =
+                ((await dataSource.rpc.storage.get(
+                    stored.progress.r2Key
+                )) as string) || ''
+            if (!content) {
+                return createResponse(
+                    undefined,
+                    'Dump file not found in storage',
+                    404
+                )
+            }
+        }
 
-    const headers = new Headers({
-        'Content-Type': 'application/x-sqlite3',
-        'Content-Disposition': `attachment; filename="database_dump_${dumpId}.sql"`,
-    })
+        const headers = new Headers({
+            'Content-Type': 'application/x-sqlite3',
+            'Content-Disposition': `attachment; filename="database_dump_${dumpId}.sql"`,
+        })
 
-    return new Response(r2Object.body, { headers })
+        // Delete the progress after successful download
+        await dataSource.rpc.storage.delete(progressKey)
+
+        return new Response(content, { headers })
+    } catch (error: any) {
+        console.error('Error getting dump file:', error)
+        console.error('Error stack:', error.stack)
+        return createResponse(
+            undefined,
+            `Error getting dump file: ${error.message}`,
+            500
+        )
+    }
 }
diff --git a/src/handler.ts b/src/handler.ts
index d8ee0ce..597ad4b 100644
--- a/src/handler.ts
+++ b/src/handler.ts
@@ -119,7 +119,8 @@ export class StarbaseDB {
                     return startChunkedDumpRoute(
                         this.dataSource,
                         this.config,
-                        c.env
+                        c.env,
+                        c.req.raw
                     )
                 }
             )
diff --git a/src/index.ts b/src/index.ts
index 85ad2af..97f7be7 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -95,7 +95,7 @@ export default {
                     : env.DATABASE_DURABLE_OBJECT.get(id)
 
             // Create a new RPC Session on the Durable Object.
-            const rpc = await stub.init()
+            const rpc = (await stub.init()) as unknown as DataSource['rpc']
 
             // Get the source type from headers/query params.
             const source =
diff --git a/src/types.ts b/src/types.ts
index c4db811..5bc1498 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -1,6 +1,22 @@
 import { StarbaseDBDurableObject } from './do'
 import { StarbasePlugin, StarbasePluginRegistry } from './plugin'
 
+type Stub<T> = T & { dispose?: () => void }
+
+export type StubArrayBuffer = {
+    readonly byteLength: number
+    slice: (begin: number, end?: number) => Promise<ArrayBuffer>
+    readonly [Symbol.toStringTag]: string
+} & { dispose?: () => void }
+
+export type SqlStorageValue =
+    | string
+    | number
+    | boolean
+    | null
+    | ArrayBuffer
+    | StubArrayBuffer
+
 export type QueryResult = Record<string, SqlStorageValue>
 
 export type RemoteSource = {
@@ -56,15 +72,27 @@ export type DataSource = {
             params?: unknown[]
             isRaw?: boolean
         }) => Promise<
-            | QueryResult[]
+            | Record<string, SqlStorageValue>[]
             | {
                   columns: string[]
-                  rows: any[][]
-                  meta: { rows_read: number; rows_written: number }
+                  rows: SqlStorageValue[][]
+                  meta: {
+                      rows_read: number
+                      rows_written: number
+                  }
+              }
+        > & { dispose?: () => void }
+        storage:
+            | DurableObjectStorage
+            | {
+                  get: DurableObjectStorage['get']
+                  put: DurableObjectStorage['put']
+                  delete: DurableObjectStorage['delete']
+                  list: DurableObjectStorage['list']
               }
-        >
-        storage: DurableObjectStorage
-        setAlarm: (timestamp: number) => Promise<void>
+        setAlarm: ((timestamp: number) => Promise<void>) & {
+            dispose?: () => void
+        }
     }
     source: 'internal' | 'external'
     external?: ExternalDatabaseSource
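Worth spelling out the size heuristic introduced in this patch (and refined in patch 4): the estimate is simply row count times the average size of up to 100 sampled rows, summed over all tables, and the result is compared against the 100 MB `SIZE_THRESHOLD_FOR_R2` to pick between R2 and Durable Object storage. A toy calculation with made-up numbers:

```typescript
// Toy numbers, for illustration only.
const rowCount = 250_000 // from SELECT COUNT(*)
const sampledRows = 100 // up to 100 rows are sampled per table
const sampledBytes = 18_500 // total length of the 100 serialized rows
const avgRowSize = sampledBytes / sampledRows // 185 bytes per row
const tableEstimate = rowCount * avgRowSize // ~46 MB, under the 100 MB R2 threshold
```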

From 1d430dce9d2db7beff089f19fb8529cd51c33604 Mon Sep 17 00:00:00 2001
From: David Anyatonwu 
Date: Tue, 11 Feb 2025 08:35:00 +0100
Subject: [PATCH 3/4] fix: remove duplicate setAlarm property in DO init method

---
 src/do.ts | 38 --------------------------------------
 1 file changed, 38 deletions(-)

diff --git a/src/do.ts b/src/do.ts
index 5aa61af..c73b46d 100644
--- a/src/do.ts
+++ b/src/do.ts
@@ -78,7 +78,6 @@ export class StarbaseDBDurableObject extends DurableObject {
                 put: this.storage.put.bind(this.storage),
                 delete: this.storage.delete.bind(this.storage),
             },
-            setAlarm: (timestamp: number) => this.storage.setAlarm(timestamp),
         }
     }
 
@@ -345,43 +344,6 @@ export class StarbaseDBDurableObject extends DurableObject {
         }
     }
 
-    async alarm(): Promise<void> {
-        try {
-            console.log('Alarm triggered')
-            // List all dump progress keys
-            const allKeys = await this.storage.list({
-                prefix: 'dump_progress_',
-            })
-            console.log('Found dump progress keys:', allKeys)
-
-            // Process each active dump
-            for (const key of allKeys.keys()) {
-                const dumpProgress = await this.storage.get(key)
-                console.log(
-                    'Processing dump progress for key:',
-                    key,
-                    dumpProgress
-                )
-
-                if (dumpProgress) {
-                    const dataSource: DataSource = {
-                        rpc: {
-                            executeQuery: this.executeQuery.bind(this),
-                            storage: this.storage,
-                            setAlarm: (timestamp: number) =>
-                                this.storage.setAlarm(timestamp),
-                        },
-                        source: 'internal',
-                    }
-                    await processDumpChunk(dataSource, this.config, this.env)
-                }
-            }
-        } catch (error: any) {
-            console.error('Error in alarm handler:', error)
-            console.error('Error stack:', error.stack)
-        }
-    }
-
     private async initializeTables() {
         // Install default necessary `tmp_` tables for various features here.
         const cacheStatement = `

From acf3b462396c0b6f286e5a7e243a6d834f046ca7 Mon Sep 17 00:00:00 2001
From: David Anyatonwu 
Date: Tue, 11 Feb 2025 20:06:10 +0100
Subject: [PATCH 4/4] refactor: improve database size estimation with dynamic
 sampling

---
 src/export/chunked-dump.ts | 279 ++++++++++++++++++++++++++-----------
 1 file changed, 197 insertions(+), 82 deletions(-)

diff --git a/src/export/chunked-dump.ts b/src/export/chunked-dump.ts
index 8cd1c5b..65f2e19 100644
--- a/src/export/chunked-dump.ts
+++ b/src/export/chunked-dump.ts
@@ -37,26 +37,60 @@ async function estimateDatabaseSize(
 ): Promise<number> {
     let totalSize = 0
     for (const table of tables) {
+        const quotedTable = `"${table.replace(/"/g, '""')}"` // Properly escape quotes in table names
+
         // Get row count
         const countResult = (await dataSource.rpc.executeQuery({
-            sql: `SELECT COUNT(*) as count FROM ${table};`,
+            sql: `SELECT COUNT(*) as count FROM ${quotedTable};`,
         })) as Record<string, any>[]
         const rowCount = countResult[0]?.count || 0
 
-        // Get average row size from a sample
-        const sampleResult = (await dataSource.rpc.executeQuery({
-            sql: `SELECT * FROM ${table} LIMIT 100;`,
-        })) as Record<string, any>[]
-        let avgRowSize = 0
-        if (Array.isArray(sampleResult) && sampleResult.length > 0) {
-            const totalSize = sampleResult.reduce((size, row) => {
-                return size + JSON.stringify(row).length
-            }, 0)
-            avgRowSize = totalSize / sampleResult.length
+        // Get table schema to understand column types
+        const schemaResult = (await dataSource.rpc.executeQuery({
+            sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
+            params: [table],
+        })) as Record<string, any>[]
+
+        // Sample some rows to get average row size
+        const sampleSize = Math.min(100, rowCount) // Sample up to 100 rows
+        if (sampleSize > 0) {
+            const sampleResult = (await dataSource.rpc.executeQuery({
+                sql: `SELECT * FROM ${quotedTable} LIMIT ?;`,
+                params: [sampleSize],
+            })) as Record<string, any>[]
+
+            // Calculate average row size from sample
+            if (sampleResult.length > 0) {
+                const totalSampleSize = sampleResult.reduce((size, row) => {
+                    // Convert row to SQL insert statement to estimate actual dump size
+                    const values = Object.values(row).map((value) =>
+                        typeof value === 'string'
+                            ? `'${value.replace(/'/g, "''")}'`
+                            : value === null
+                              ? 'NULL'
+                              : String(value)
+                    )
+                    const insertStmt = `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
+                    return size + insertStmt.length
+                }, 0)
+
+                const avgRowSize = Math.ceil(
+                    totalSampleSize / sampleResult.length
+                )
+                totalSize += rowCount * avgRowSize
+            }
         }
 
-        totalSize += rowCount * avgRowSize
+        // Add size for table schema
+        if (schemaResult[0]?.sql) {
+            totalSize += schemaResult[0].sql.length + 20 // Add some padding for formatting
+        }
     }
+
+    // Add some overhead for SQLite header and formatting
+    totalSize += 100 // SQLite header
+    totalSize = Math.ceil(totalSize * 1.1) // Add 10% overhead for safety
+
     return totalSize
 }
 
@@ -145,12 +179,24 @@ export async function startChunkedDumpRoute(
         console.log('Estimated database size:', estimatedSize, 'bytes')
 
         // Determine storage type based on size
-        const shouldUseR2 =
+        const shouldUseR2 = Boolean(
             env?.DATABASE_DUMPS && estimatedSize > SIZE_THRESHOLD_FOR_R2
+        )
         if (shouldUseR2) {
-            console.log('Using R2 storage due to large estimated size')
-        } else {
-            console.log('Using DO storage due to small estimated size')
+            if (!env?.DATABASE_DUMPS) {
+                throw new Error(
+                    'R2 storage requested but R2 binding not available'
+                )
+            }
+            // Test R2 access
+            try {
+                await env.DATABASE_DUMPS.head(r2Key)
+            } catch (error) {
+                console.error('R2 access test failed:', error)
+                throw new Error(
+                    'R2 storage is not accessible. Please check your R2 bucket configuration.'
+                )
+            }
         }
 
         // Store initial content
@@ -228,22 +274,26 @@ export async function processDumpChunk(
         })
         console.log('Found dump progress keys:', allKeys)
 
+        let hasActiveDumps = false
+
         // Process each active dump
         for (const progressKey of allKeys.keys()) {
-            console.log('Processing dump with key:', progressKey)
             const stored = (await dataSource.rpc.storage.get(
                 progressKey
             )) as StoredDumpData & { useR2: boolean }
-            console.log('Stored dump progress:', stored)
 
-            if (!stored) {
-                console.log(
-                    'No stored dump progress found for key:',
-                    progressKey
-                )
+            if (
+                !stored ||
+                stored.progress.status === 'completed' ||
+                stored.progress.status === 'failed'
+            ) {
+                // Clean up completed or failed dumps that weren't properly cleaned
+                await dataSource.rpc.storage.delete(progressKey)
                 continue
             }
 
+            hasActiveDumps = true
+            console.log('Processing dump with key:', progressKey)
             const {
                 progress,
                 tables,
@@ -268,29 +318,48 @@ export async function processDumpChunk(
                 // Get table schema if this is the first chunk of the table
                 if (currentOffset === 0) {
                     console.log('Getting schema for table:', table)
-                    const schemaResult = await dataSource.rpc.executeQuery({
+                    const schemaResult = (await dataSource.rpc.executeQuery({
                         sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
                         params: [table],
-                    })
+                    })) as Record<string, any>[]
                     console.log('Schema result:', schemaResult)
 
-                    if (
-                        Array.isArray(schemaResult) &&
-                        schemaResult.length > 0
-                    ) {
-                        const schema = schemaResult[0].sql
+                    if (schemaResult && schemaResult[0]?.sql) {
+                        const schema = schemaResult[0]?.sql
                         const schemaContent = `\n-- Table: ${table}\n${schema};\n\n`
 
                         // Append schema to file
                         if (useR2 && progress.r2Key) {
-                            const existingContent =
-                                await env.DATABASE_DUMPS.get(
-                                    progress.r2Key
-                                ).text()
-                            await env.DATABASE_DUMPS.put(
-                                progress.r2Key,
-                                existingContent + schemaContent
+                            const r2Object = await env.DATABASE_DUMPS.get(
+                                progress.r2Key
                             )
+                            if (!r2Object) {
+                                const existingContent = ''
+                                await env.DATABASE_DUMPS.put(
+                                    progress.r2Key,
+                                    existingContent + schemaContent,
+                                    {
+                                        httpMetadata: {
+                                            contentType: 'application/sql',
+                                        },
+                                    }
+                                )
+                            } else {
+                                const existingContent = await r2Object
+                                    .arrayBuffer()
+                                    .then((buf: ArrayBuffer) =>
+                                        new TextDecoder().decode(buf)
+                                    )
+                                await env.DATABASE_DUMPS.put(
+                                    progress.r2Key,
+                                    existingContent + schemaContent,
+                                    {
+                                        httpMetadata: {
+                                            contentType: 'application/sql',
+                                        },
+                                    }
+                                )
+                            }
                         } else if (progress.r2Key) {
                             const existingContent =
                                 ((await dataSource.rpc.storage.get(
@@ -303,14 +372,11 @@ export async function processDumpChunk(
                         }
 
                         // Determine chunk size based on table size
-                        const countResult = await dataSource.rpc.executeQuery({
-                            sql: `SELECT COUNT(*) as count FROM ${table};`,
-                        })
-                        if (
-                            Array.isArray(countResult) &&
-                            countResult.length > 0
-                        ) {
-                            const rowCount = countResult[0].count as number
+                        const countResult = (await dataSource.rpc.executeQuery({
+                            sql: `SELECT COUNT(*) as count FROM "${table.replace(/"/g, '""')}";`,
+                        })) as Record<string, any>[]
+                        if (countResult && countResult.length > 0) {
+                            const rowCount = countResult[0].count as number
                             stored.chunkSize = determineChunkSize(rowCount)
                             console.log(
                                 `Adjusted chunk size for table ${table}:`,
@@ -322,10 +388,10 @@ export async function processDumpChunk(
 
                 // Get chunk of data
                 console.log('Getting data chunk for table:', table)
-                const dataResult = await dataSource.rpc.executeQuery({
-                    sql: `SELECT * FROM ${table} LIMIT ? OFFSET ?;`,
+                const dataResult = (await dataSource.rpc.executeQuery({
+                    sql: `SELECT * FROM "${table.replace(/"/g, '""')}" LIMIT ? OFFSET ?;`,
                     params: [stored.chunkSize, currentOffset],
-                })
+                })) as Record<string, any>[]
                 console.log('Data result:', dataResult)
 
                 // Process the chunk
@@ -352,13 +418,36 @@ export async function processDumpChunk(
                 if (insertStatements && progress.r2Key) {
                     console.log('Appending insert statements to file')
                     if (useR2) {
-                        const existingContent = await env.DATABASE_DUMPS.get(
+                        const r2Object = await env.DATABASE_DUMPS.get(
                             progress.r2Key
-                        ).text()
-                        await env.DATABASE_DUMPS.put(
-                            progress.r2Key,
-                            existingContent + insertStatements
                         )
+                        if (!r2Object) {
+                            const existingContent = ''
+                            await env.DATABASE_DUMPS.put(
+                                progress.r2Key,
+                                existingContent + insertStatements,
+                                {
+                                    httpMetadata: {
+                                        contentType: 'application/sql',
+                                    },
+                                }
+                            )
+                        } else {
+                            const existingContent = await r2Object
+                                .arrayBuffer()
+                                .then((buf: ArrayBuffer) =>
+                                    new TextDecoder().decode(buf)
+                                )
+                            await env.DATABASE_DUMPS.put(
+                                progress.r2Key,
+                                existingContent + insertStatements,
+                                {
+                                    httpMetadata: {
+                                        contentType: 'application/sql',
+                                    },
+                                }
+                            )
+                        }
                     } else {
                         const existingContent =
                             ((await dataSource.rpc.storage.get(
@@ -444,8 +533,8 @@ export async function processDumpChunk(
             }
         }
 
-        // Schedule next processing if there are active dumps
-        if (allKeys.size > 0) {
+        // Only schedule next processing if there are active dumps in progress
+        if (hasActiveDumps) {
             await dataSource.rpc.setAlarm(Date.now() + BREATHING_INTERVAL)
         }
     } catch (error: any) {
@@ -520,39 +609,65 @@ export async function getDumpFileRoute(
         }
 
         let content: string | ReadableStream
-        if (stored.useR2) {
-            const r2Object = await env.DATABASE_DUMPS.get(stored.progress.r2Key)
-            if (!r2Object) {
-                return createResponse(
-                    undefined,
-                    'Dump file not found in R2',
-                    404
-                )
-            }
-            content = r2Object.body
-        } else {
-            content =
-                ((await dataSource.rpc.storage.get(
+        let headers = new Headers({
+            'Content-Type': 'application/sql',
+            'Content-Disposition': `attachment; filename="database_dump_${dumpId}.sql"`,
+        })
+
+        try {
+            if (stored.useR2) {
+                const r2Object = await env.DATABASE_DUMPS.get(
                     stored.progress.r2Key
-                )) as string) || ''
-            if (!content) {
-                return createResponse(
-                    undefined,
-                    'Dump file not found in storage',
-                    404
                 )
+                if (!r2Object) {
+                    return createResponse(
+                        undefined,
+                        'Dump file not found in R2',
+                        404
+                    )
+                }
+                content = r2Object.body
+            } else {
+                content =
+                    ((await dataSource.rpc.storage.get(
+                        stored.progress.r2Key
+                    )) as string) || ''
+                if (!content) {
+                    return createResponse(
+                        undefined,
+                        'Dump file not found in storage',
+                        404
+                    )
+                }
             }
-        }
 
-        const headers = new Headers({
-            'Content-Type': 'application/x-sqlite3',
-            'Content-Disposition': `attachment; filename="database_dump_${dumpId}.sql"`,
-        })
+            // Create response before cleanup
+            const response = new Response(content, { headers })
 
-        // Delete the progress after successful download
-        await dataSource.rpc.storage.delete(progressKey)
+            // Clean up after successful retrieval
+            try {
+                // Delete progress data
+                await dataSource.rpc.storage.delete(progressKey)
+
+                // Delete the dump file if using DO storage
+                if (!stored.useR2) {
+                    await dataSource.rpc.storage.delete(stored.progress.r2Key)
+                }
 
-        return new Response(content, { headers })
+                // Delete from R2 if using R2 storage
+                if (stored.useR2 && env?.DATABASE_DUMPS) {
+                    await env.DATABASE_DUMPS.delete(stored.progress.r2Key)
+                }
+            } catch (cleanupError) {
+                console.error('Error during cleanup:', cleanupError)
+                // Continue with response even if cleanup fails
+            }
+
+            return response
+        } catch (error) {
+            console.error('Error retrieving dump file:', error)
+            return createResponse(undefined, 'Error retrieving dump file', 500)
+        }
     } catch (error: any) {
         console.error('Error getting dump file:', error)
         console.error('Error stack:', error.stack)