Skip to content

Commit 542db44

Browse files
Make pgroll pull and pgroll migrate commands able to handle incompatible migration file formats (#812)
Ensure that the `pgroll pull` and `pgroll migrate` commands are able to operate safely when either:

* The local migration directory contains migrations that cannot be deserialized by the current version of `pgroll`.
* The migration history in the target database contains migrations that cannot be deserialized by the current version of `pgroll`.

This ensures that these commands work in the presence of older migration formats, either locally or in the remote schema history.

This is done by adding a new `RawMigration` type to the `migrations` package, which does not deserialize the `Operations` in the migration but leaves them as a `json.RawMessage`. The following methods in the `roll` and `state` packages are then updated to work with and return `RawMigrations`:

* `SchemaHistory` - used to return the schema history from the target database.
* `MissingMigrations` - used to find remote migrations that don't exist in the local history.
* `UnappliedMigrations` - used to find local migrations that don't exist in the remote history.

By updating these methods and the `pull` and `migrate` commands that use them, these commands can work safely even with local or remote migrations that cannot be deserialized by the current version of `pgroll`.

Tests are added for both the `MissingMigrations` and `UnappliedMigrations` methods to ensure that they work with un-deserializable migrations.

Part of #770
1 parent 94a5e7f commit 542db44

File tree

10 files changed

+246
-50
lines changed

10 files changed

+246
-50
lines changed

cmd/migrate.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
package cmd
44

55
import (
6+
"context"
67
"fmt"
78
"os"
89
"time"
910

1011
"github.com/spf13/cobra"
1112

1213
"github.com/xataio/pgroll/pkg/backfill"
14+
"github.com/xataio/pgroll/pkg/migrations"
15+
"github.com/xataio/pgroll/pkg/roll"
1316
)
1417

1518
func migrateCmd() *cobra.Command {
@@ -82,13 +85,13 @@ func migrateCmd() *cobra.Command {
8285
// Run all migrations after the latest version up to the final migration,
8386
// completing each one.
8487
for _, mig := range migs[:len(migs)-1] {
85-
if err := runMigration(ctx, m, mig, true, backfillConfig); err != nil {
88+
if err := runRawMigration(ctx, m, mig, true, backfillConfig); err != nil {
8689
return fmt.Errorf("failed to run migration file %q: %w", mig.Name, err)
8790
}
8891
}
8992

9093
// Run the final migration, completing it only if requested.
91-
return runMigration(ctx, m, migs[len(migs)-1], complete, backfillConfig)
94+
return runRawMigration(ctx, m, migs[len(migs)-1], complete, backfillConfig)
9295
},
9396
}
9497

@@ -98,3 +101,12 @@ func migrateCmd() *cobra.Command {
98101

99102
return migrateCmd
100103
}
104+
105+
func runRawMigration(ctx context.Context, m *roll.Roll, migration *migrations.RawMigration, complete bool, c *backfill.Config) error {
106+
parsedMigration, err := migrations.ParseMigration(migration)
107+
if err != nil {
108+
return fmt.Errorf("failed to parse migration %q: %w", migration.Name, err)
109+
}
110+
111+
return runMigration(ctx, m, parsedMigration, complete, c)
112+
}

cmd/pull.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func pullCmd() *cobra.Command {
8787
// writeMigrationToFile writes the migration to a file in `targetDir`, prefixing the
8888
// filename with `prefix`. The output format defaults to YAML, but can
8989
// be changed to JSON by setting `useJSON` to true.
90-
func writeMigrationToFile(m *migrations.Migration, targetDir, prefix string, useJSON bool) error {
90+
func writeMigrationToFile(m *migrations.RawMigration, targetDir, prefix string, useJSON bool) error {
9191
err := os.MkdirAll(targetDir, 0o755)
9292
if err != nil {
9393
return err

pkg/migrations/migrations.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ import (
66
"context"
77
"encoding/json"
88
"fmt"
9-
"io"
109

1110
_ "github.com/lib/pq"
12-
"sigs.k8s.io/yaml"
1311

1412
"github.com/xataio/pgroll/pkg/db"
1513
"github.com/xataio/pgroll/pkg/schema"
@@ -54,10 +52,13 @@ type RequiresSchemaRefreshOperation interface {
5452
type (
5553
Operations []Operation
5654
Migration struct {
57-
Name string `json:"name,omitempty"`
58-
55+
Name string `json:"name,omitempty"`
5956
Operations Operations `json:"operations"`
6057
}
58+
RawMigration struct {
59+
Name string `json:"name"`
60+
Operations json.RawMessage `json:"operations"`
61+
}
6162
)
6263

6364
// Validate will check that the migration can be applied to the given schema
@@ -105,22 +106,3 @@ func (m *Migration) ContainsRawSQLOperation() bool {
105106
}
106107
return false
107108
}
108-
109-
// WriteAsJSON writes the migration to the given writer in JSON format
110-
func (m *Migration) WriteAsJSON(w io.Writer) error {
111-
encoder := json.NewEncoder(w)
112-
encoder.SetIndent("", " ")
113-
114-
return encoder.Encode(m)
115-
}
116-
117-
// WriteAsYAML writes the migration to the given writer in YAML format
118-
func (m *Migration) WriteAsYAML(w io.Writer) error {
119-
yml, err := yaml.Marshal(m)
120-
if err != nil {
121-
return err
122-
}
123-
124-
_, err = w.Write(yml)
125-
return err
126-
}

pkg/migrations/op_common.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ func CollectFilesFromDir(dir fs.FS) ([]string, error) {
7272
return migrationFiles, nil
7373
}
7474

75-
// ReadMigration opens the migration file and reads the migration.
76-
func ReadMigration(dir fs.FS, filename string) (*Migration, error) {
75+
// ReadRawMigration opens the migration file and reads the migration as a
76+
// RawMigration.
77+
func ReadRawMigration(dir fs.FS, filename string) (*RawMigration, error) {
7778
file, err := dir.Open(filename)
7879
if err != nil {
7980
return nil, fmt.Errorf("opening migration file: %w", err)
@@ -85,7 +86,7 @@ func ReadMigration(dir fs.FS, filename string) (*Migration, error) {
8586
return nil, err
8687
}
8788

88-
mig := Migration{}
89+
mig := RawMigration{}
8990
switch filepath.Ext(filename) {
9091
case ".json":
9192
dec := json.NewDecoder(bytes.NewReader(byteValue))
@@ -106,6 +107,29 @@ func ReadMigration(dir fs.FS, filename string) (*Migration, error) {
106107
return &mig, nil
107108
}
108109

110+
// ParseMigration converts a RawMigration to a fully parsed Migration
111+
func ParseMigration(raw *RawMigration) (*Migration, error) {
112+
var ops Operations
113+
if err := json.Unmarshal(raw.Operations, &ops); err != nil {
114+
return nil, fmt.Errorf("parsing operations: %w", err)
115+
}
116+
117+
return &Migration{
118+
Name: raw.Name,
119+
Operations: ops,
120+
}, nil
121+
}
122+
123+
// ReadMigration reads and parses a migration file
124+
func ReadMigration(dir fs.FS, filename string) (*Migration, error) {
125+
raw, err := ReadRawMigration(dir, filename)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
return ParseMigration(raw)
131+
}
132+
109133
// UnmarshalJSON deserializes the list of operations from a JSON array.
110134
func (v *Operations) UnmarshalJSON(data []byte) error {
111135
var tmp []map[string]json.RawMessage
@@ -279,3 +303,22 @@ func operationFromName(name OpName) (Operation, error) {
279303
}
280304
return nil, fmt.Errorf("unknown migration type: %v", name)
281305
}
306+
307+
// WriteAsJSON writes the migration to the given writer in JSON format
308+
func (m *RawMigration) WriteAsJSON(w io.Writer) error {
309+
encoder := json.NewEncoder(w)
310+
encoder.SetIndent("", " ")
311+
312+
return encoder.Encode(m)
313+
}
314+
315+
// WriteAsYAML writes the migration to the given writer in YAML format
316+
func (m *RawMigration) WriteAsYAML(w io.Writer) error {
317+
yml, err := yaml.Marshal(m)
318+
if err != nil {
319+
return err
320+
}
321+
322+
_, err = w.Write(yml)
323+
return err
324+
}

pkg/roll/missing.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// MissingMigrations returns the slice of migrations that have been applied to
1414
// the target database but are missing from the local migrations directory
1515
// `dir`.
16-
func (m *Roll) MissingMigrations(ctx context.Context, dir fs.FS) ([]*migrations.Migration, error) {
16+
func (m *Roll) MissingMigrations(ctx context.Context, dir fs.FS) ([]*migrations.RawMigration, error) {
1717
// Determine the latest version of the database
1818
latestVersion, err := m.State().LatestVersion(ctx, m.Schema())
1919
if err != nil {
@@ -34,7 +34,7 @@ func (m *Roll) MissingMigrations(ctx context.Context, dir fs.FS) ([]*migrations.
3434
// Create a set of local migration names for fast lookup
3535
localMigNames := make(map[string]struct{}, len(files))
3636
for _, file := range files {
37-
mig, err := migrations.ReadMigration(dir, file)
37+
mig, err := migrations.ReadRawMigration(dir, file)
3838
if err != nil {
3939
return nil, fmt.Errorf("reading migration file %s: %w", file, err)
4040
}
@@ -49,7 +49,7 @@ func (m *Roll) MissingMigrations(ctx context.Context, dir fs.FS) ([]*migrations.
4949

5050
// Find all migrations that have been applied to the database but are missing
5151
// from the local directory
52-
migs := make([]*migrations.Migration, 0, len(history))
52+
migs := make([]*migrations.RawMigration, 0, len(history))
5353
for _, h := range history {
5454
if _, ok := localMigNames[h.Migration.Name]; ok {
5555
continue

pkg/roll/missing_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,74 @@ func TestMissingMigrations(t *testing.T) {
195195
})
196196
}
197197

198+
func TestMissingMigrationsWithOldMigrationFormats(t *testing.T) {
199+
t.Parallel()
200+
201+
t.Run("local directory contains an un-deserializable migration", func(t *testing.T) {
202+
fs := fstest.MapFS{
203+
"01_migration_1.json": &fstest.MapFile{Data: unDeserializableMigration(t, "01_migration_1")},
204+
}
205+
206+
testutils.WithMigratorAndConnectionToContainer(t, func(roll *roll.Roll, _ *sql.DB) {
207+
ctx := context.Background()
208+
209+
// Apply migrations to the target database
210+
for _, migration := range []*migrations.Migration{
211+
exampleMig(t, "02_migration_2"),
212+
exampleMig(t, "03_migration_3"),
213+
} {
214+
err := roll.Start(ctx, migration, backfill.NewConfig())
215+
require.NoError(t, err)
216+
err = roll.Complete(ctx)
217+
require.NoError(t, err)
218+
}
219+
220+
// Get missing migrations
221+
migs, err := roll.MissingMigrations(ctx, fs)
222+
require.NoError(t, err)
223+
224+
// Assert that migrations 2 and 3 are missing in the local directory
225+
require.Len(t, migs, 2)
226+
require.Equal(t, "02_migration_2", migs[0].Name)
227+
require.Equal(t, "03_migration_3", migs[1].Name)
228+
})
229+
})
230+
231+
t.Run("remote migration history contains an un-deserializable migration", func(t *testing.T) {
232+
fs := fstest.MapFS{}
233+
234+
testutils.WithMigratorAndConnectionToContainer(t, func(roll *roll.Roll, db *sql.DB) {
235+
ctx := context.Background()
236+
237+
// Apply migrations to the target database
238+
for _, migration := range []*migrations.Migration{
239+
exampleMig(t, "01_migration_1"),
240+
} {
241+
err := roll.Start(ctx, migration, backfill.NewConfig())
242+
require.NoError(t, err)
243+
err = roll.Complete(ctx)
244+
require.NoError(t, err)
245+
}
246+
247+
// Modify the first migration in the schema history to be un-deserializable; in
248+
// practice this could happen if the migration was applied with an older
249+
// version of pgroll that had a different migration format
250+
_, err := db.ExecContext(ctx, `UPDATE pgroll.migrations
251+
SET migration = REPLACE(migration::text, '"up"', '"upxxx"')::jsonb
252+
WHERE name = '01_migration_1'`)
253+
require.NoError(t, err)
254+
255+
// Get missing migrations
256+
migs, err := roll.MissingMigrations(ctx, fs)
257+
require.NoError(t, err)
258+
259+
// Assert that the un-deserializable migration is missing in the local directory
260+
require.Len(t, migs, 1)
261+
require.Equal(t, "01_migration_1", migs[0].Name)
262+
})
263+
})
264+
}
265+
198266
func exampleMig(t *testing.T, name string) *migrations.Migration {
199267
t.Helper()
200268

pkg/roll/unapplied.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
//
1717
// If the local order of migrations does not match the order of migrations in
1818
// the schema history, an `ErrMismatchedMigration` error is returned.
19-
func (m *Roll) UnappliedMigrations(ctx context.Context, dir fs.FS) ([]*migrations.Migration, error) {
19+
func (m *Roll) UnappliedMigrations(ctx context.Context, dir fs.FS) ([]*migrations.RawMigration, error) {
2020
latestVersion, err := m.State().LatestVersion(ctx, m.Schema())
2121
if err != nil {
2222
return nil, fmt.Errorf("determining latest version: %w", err)
@@ -36,7 +36,7 @@ func (m *Roll) UnappliedMigrations(ctx context.Context, dir fs.FS) ([]*migration
3636
var idx int
3737
if latestVersion != nil {
3838
for _, file := range files {
39-
migration, err := migrations.ReadMigration(dir, file)
39+
migration, err := migrations.ReadRawMigration(dir, file)
4040
if err != nil {
4141
return nil, fmt.Errorf("reading migration file %q: %w", file, err)
4242
}
@@ -54,9 +54,9 @@ func (m *Roll) UnappliedMigrations(ctx context.Context, dir fs.FS) ([]*migration
5454
}
5555

5656
// Return all unapplied migrations
57-
migs := make([]*migrations.Migration, 0, len(files))
57+
migs := make([]*migrations.RawMigration, 0, len(files))
5858
for _, file := range files[idx:] {
59-
migration, err := migrations.ReadMigration(dir, file)
59+
migration, err := migrations.ReadRawMigration(dir, file)
6060
if err != nil {
6161
return nil, fmt.Errorf("reading migration file %q: %w", file, err)
6262
}

0 commit comments

Comments
 (0)