arche / internal/store/migrate/migrate.go

commit 154431fd
  1package migrate
  2
  3import (
  4	"database/sql"
  5	_ "embed"
  6	"fmt"
  7	"time"
  8)
  9
// SQL scripts for the schema migrations, one per version. Each go:embed
// directive compiles the named file under sql/ into the string variable
// declared directly below it; the numeric suffix of the variable matches the
// numeric prefix of the file name.

//go:embed sql/001_initial.sql
var sql001 string

//go:embed sql/002_conflicts.sql
var sql002 string

//go:embed sql/003_wcache_mode.sql
var sql003 string

//go:embed sql/004_pack_delta.sql
var sql004 string

//go:embed sql/005_zstd_dict.sql
var sql005 string

//go:embed sql/006_file_locks.sql
var sql006 string

//go:embed sql/007_wcache_dirty.sql
var sql007 string
 30
// migration pairs a schema version number with the SQL script that is run
// to reach that version.
type migration struct {
	// version is the value stored in schema_migrations.version once the
	// script has been applied.
	version int
	// sql is the script text handed to the database in a single Exec call.
	sql string
}
 35
 36var all = []migration{
 37	{1, sql001},
 38	{2, sql002},
 39	{3, sql003},
 40	{4, sql004},
 41	{5, sql005},
 42	{6, sql006},
 43	{7, sql007},
 44}
 45
 46func Run(db *sql.DB) error {
 47	if err := ensureMigrationsTable(db); err != nil {
 48		return err
 49	}
 50
 51	applied, err := appliedVersions(db)
 52	if err != nil {
 53		return err
 54	}
 55
 56	for _, m := range all {
 57		if applied[m.version] {
 58			continue
 59		}
 60		if err := applyMigration(db, m); err != nil {
 61			return fmt.Errorf("migrate v%d: %w", m.version, err)
 62		}
 63	}
 64	return nil
 65}
 66
 67func ensureMigrationsTable(db *sql.DB) error {
 68	_, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations (
 69		version    INTEGER PRIMARY KEY,
 70		applied_at INTEGER NOT NULL
 71	)`)
 72	return err
 73}
 74
 75func appliedVersions(db *sql.DB) (map[int]bool, error) {
 76	rows, err := db.Query("SELECT version FROM schema_migrations")
 77	if err != nil {
 78		return nil, err
 79	}
 80	defer rows.Close()
 81
 82	m := make(map[int]bool)
 83	for rows.Next() {
 84		var v int
 85		if err := rows.Scan(&v); err != nil {
 86			return nil, err
 87		}
 88		m[v] = true
 89	}
 90	return m, rows.Err()
 91}
 92
 93func applyMigration(db *sql.DB, m migration) error {
 94	tx, err := db.Begin()
 95	if err != nil {
 96		return err
 97	}
 98
 99	if _, err := tx.Exec(m.sql); err != nil {
100		tx.Rollback()
101		return fmt.Errorf("exec: %w", err)
102	}
103
104	if _, err := tx.Exec(
105		"INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)",
106		m.version, time.Now().Unix(),
107	); err != nil {
108		tx.Rollback()
109		return fmt.Errorf("record: %w", err)
110	}
111
112	return tx.Commit()
113}