| commit |
20e34e662513ebcce563ddce10ad674908be2ec595114b9a307284aae6bc9f26
|
|---|---|
| change |
aypvsqgd
|
| author | dewn <dewn5228@proton.me> |
| committer | dewn <dewn5228@proton.me> |
| date | 2026-03-12 15:35:38 |
| phase | public |
| parents |
8d9a5ff1
|
| signature | Unsigned |
watch: store watcher PID in store.db instead of pid file, clear transactionally on shutdown
--- a/internal/cli/cmd_watch.go +++ b/internal/cli/cmd_watch.go @@ -1,77 +1,77 @@ package cli import ( "context" "fmt" "os" "os/signal" "syscall" "arche/internal/repo" "arche/internal/watcher" "arche/internal/wc" "github.com/spf13/cobra" ) var watchCmd = &cobra.Command{ Use: "watch", Short: "Watch working tree for changes to accelerate snap and status", Long: `arche watch starts a filesystem event watcher that tracks which files have changed since the last snapshot. When the watcher is active, arche snap and arche status skip re-statting and re-hashing unmodified files, measurably improving performance on large working trees. The watcher seeds the dirty set with any files already modified before it starts, so the first arche snap after launch is still fully correct. Only one watcher per repository. Stop the watcher with Ctrl-C or SIGTERM.`, RunE: runWatch, } func runWatch(cmd *cobra.Command, args []string) error { wd, err := os.Getwd() if err != nil { return err } root, err := repo.FindRoot(wd) if err != nil || root == "" { return fmt.Errorf("not inside an arche repository") } r, err := repo.Open(root) if err != nil { return err } arch := r.ArcheDir() workRoot := r.Root if watcher.IsActive(arch +, r.Store ) { r.Close() return fmt.Errorf("watcher is already running for this repository") } w := wc.New(r) statuses, err := w.Status() if err != nil { r.Close() return fmt.Errorf("status: %w", err) } for _, s := range statuses { if markErr := r.Store.MarkWCacheDirty(s.Path); markErr != nil { fmt.Fprintf(os.Stderr, "arche watch: seed dirty %q: %v\n", s.Path, markErr) } } fmt.Fprintf(os.Stderr, "arche watch: watching %s (%d modified files at startup)\n", workRoot, len(statuses)) fmt.Fprintln(os.Stderr, "arche watch: tracking file events — stop with Ctrl-C") ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() defer r.Close() if err := watcher.Run(ctx, workRoot, arch, r.Store); err != nil { return err } fmt.Fprintln(os.Stderr, "arche watch: stopped") return nil }
--- a/internal/store/migrate/migrate.go +++ b/internal/store/migrate/migrate.go @@ -1,117 +1,121 @@ package migrate import ( "database/sql" _ "embed" "fmt" "time" ) //go:embed sql/001_initial.sql var sql001 string //go:embed sql/002_conflicts.sql var sql002 string //go:embed sql/003_wcache_mode.sql var sql003 string //go:embed sql/004_pack_delta.sql var sql004 string //go:embed sql/005_zstd_dict.sql var sql005 string //go:embed sql/006_file_locks.sql var sql006 string //go:embed sql/007_wcache_dirty.sql var sql007 string //go:embed sql/008_shelves.sql var sql008 string +//go:embed sql/009_watcher_pid.sql +var sql009 string + type migration struct { version int sql string } var all = []migration{ {1, sql001}, {2, sql002}, {3, sql003}, {4, sql004}, {5, sql005}, {6, sql006}, {7, sql007}, {8, sql008}, + {9, sql009}, } func Run(db *sql.DB) error { if err := ensureMigrationsTable(db); err != nil { return err } applied, err := appliedVersions(db) if err != nil { return err } for _, m := range all { if applied[m.version] { continue } if err := applyMigration(db, m); err != nil { return fmt.Errorf("migrate v%d: %w", m.version, err) } } return nil } func ensureMigrationsTable(db *sql.DB) error { _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations ( version INTEGER PRIMARY KEY, applied_at INTEGER NOT NULL )`) return err } func appliedVersions(db *sql.DB) (map[int]bool, error) { rows, err := db.Query("SELECT version FROM schema_migrations") if err != nil { return nil, err } defer rows.Close() m := make(map[int]bool) for rows.Next() { var v int if err := rows.Scan(&v); err != nil { return nil, err } m[v] = true } return m, rows.Err() } func applyMigration(db *sql.DB, m migration) error { tx, err := db.Begin() if err != nil { return err } if _, err := tx.Exec(m.sql); err != nil { tx.Rollback() return fmt.Errorf("exec: %w", err) } if _, err := tx.Exec( "INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)", m.version, time.Now().Unix(), ); err != nil { tx.Rollback() return fmt.Errorf("record: %w", err) } return tx.Commit() }
--- /dev/null +++ b/internal/store/migrate/sql/009_watcher_pid.sql @@ -1,0 +1,3 @@ +CREATE TABLE IF NOT EXISTS watcher_pid ( + pid INTEGER NOT NULL +);
--- a/internal/store/sqlite.go +++ b/internal/store/sqlite.go @@ -1,812 +1,835 @@ package store import ( "database/sql" "encoding/hex" "errors" "fmt" "strings" "time" "arche/internal/object" "arche/internal/store/migrate" "github.com/klauspost/compress/zstd" _ "github.com/mattn/go-sqlite3" ) const defaultPackThreshold = 128 * 1024 type SQLiteStore struct { db *sql.DB pack *packManager codec codec packDir string packThreshold int compressionName string } func OpenSQLiteStore(dbPath, packDir string, packThreshold, packSealSize int, compression string) (*SQLiteStore, error) { if packThreshold <= 0 { packThreshold = defaultPackThreshold } db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("store open %s: %w", dbPath, err) } db.SetMaxOpenConns(1) if _, err := db.Exec("PRAGMA journal_mode = WAL; PRAGMA foreign_keys = ON;"); err != nil { db.Close() return nil, fmt.Errorf("store pragma: %w", err) } if err := migrate.Run(db); err != nil { db.Close() return nil, fmt.Errorf("store migrate: %w", err) } pm, err := newPackManager(packDir, packSealSize) if err != nil { db.Close() return nil, err } var dictData []byte _ = db.QueryRow("SELECT dict FROM zstd_dicts ORDER BY id DESC LIMIT 1").Scan(&dictData) cd, err := newCodec(compression, dictData) if err != nil { db.Close() pm.close() return nil, err } return &SQLiteStore{ db: db, pack: pm, codec: cd, packDir: packDir, packThreshold: packThreshold, compressionName: compression, }, nil } func (s *SQLiteStore) Begin() (*Tx, error) { sqlTx, err := s.db.Begin() if err != nil { return nil, fmt.Errorf("begin tx: %w", err) } return &Tx{sqlTx: sqlTx}, nil } func (s *SQLiteStore) Commit(tx *Tx) error { return tx.sqlTx.Commit() } func (s *SQLiteStore) Rollback(tx *Tx) error { return tx.sqlTx.Rollback() } func (s *SQLiteStore) Close() error { s.pack.close() s.codec.Close() return s.db.Close() } func (s *SQLiteStore) AddConflict(tx *Tx, path string) error { _, err := tx.sqlTx.Exec("INSERT OR IGNORE INTO conflicts (path) VALUES (?)", path) return err } func (s *SQLiteStore) ClearConflict(tx *Tx, path string) error { _, err := tx.sqlTx.Exec("DELETE FROM conflicts WHERE path = ?", path) return err } func (s *SQLiteStore) ClearAllConflicts(tx *Tx) error { _, err := tx.sqlTx.Exec("DELETE FROM conflicts") return err } func (s *SQLiteStore) ListConflicts() ([]string, error) { rows, err := s.db.Query("SELECT path FROM conflicts ORDER BY path") if err != nil { return nil, err } defer rows.Close() var out []string for rows.Next() { var p string if err := rows.Scan(&p); err != nil { return nil, err } out = append(out, p) } return out, rows.Err() } func (s *SQLiteStore) HasObject(id [32]byte) (bool, error) { var count int err := s.db.QueryRow("SELECT COUNT(*) FROM objects WHERE id = ?", id[:]).Scan(&count) if err != nil { return false, err } if count > 0 { return true, nil } err = s.db.QueryRow("SELECT COUNT(*) FROM pack_index WHERE blob_id = ?", id[:]).Scan(&count) if err != nil { return false, err } return count > 0, nil } func (s *SQLiteStore) ReadObject(id [32]byte) (kind string, raw []byte, err error) { var compressed []byte rowErr := s.db.QueryRow("SELECT kind, data FROM objects WHERE id = ?", id[:]).Scan(&kind, &compressed) if rowErr == nil { raw, err = s.codec.Decompress(compressed) if err != nil { return "", nil, fmt.Errorf("decompress object %s: %w", hex.EncodeToString(id[:])[:12], err) } return kind, raw, nil } if !errors.Is(rowErr, sql.ErrNoRows) { return "", nil, rowErr } var packFile string var offset, rawSize int64 var deltaBaseIDRaw []byte var deltaDepth int rowErr = s.db.QueryRow( "SELECT pack_file, offset, raw_size, delta_base_id, delta_depth FROM pack_index WHERE blob_id = ?", id[:], ).Scan(&packFile, &offset, &rawSize, &deltaBaseIDRaw, &deltaDepth) if errors.Is(rowErr, sql.ErrNoRows) { return "", nil, fmt.Errorf("object %s not found", hex.EncodeToString(id[:])[:12]) } if rowErr != nil { return "", nil, rowErr } compressed, err = s.pack.read(packFile, offset) if err != nil { return "", nil, err } if len(deltaBaseIDRaw) > 0 { if deltaDepth > deltaMaxDepth { return "", nil, fmt.Errorf("pack object %s: delta chain depth %d exceeds limit %d", hex.EncodeToString(id[:])[:12], deltaDepth, deltaMaxDepth) } deltaBytes, decErr := s.codec.Decompress(compressed) if decErr != nil { return "", nil, fmt.Errorf("decompress delta %s: %w", hex.EncodeToString(id[:])[:12], decErr) } var baseID [32]byte copy(baseID[:], deltaBaseIDRaw) _, baseRaw, baseErr := s.ReadObject(baseID) if baseErr != nil { return "", nil, fmt.Errorf("read delta base for %s: %w", hex.EncodeToString(id[:])[:12], baseErr) } raw, err = ApplyDelta(baseRaw, deltaBytes) if err != nil { return "", nil, fmt.Errorf("apply delta %s: %w", hex.EncodeToString(id[:])[:12], err) } return string(object.KindBlob), raw, nil } raw, err = s.codec.Decompress(compressed) if err != nil { return "", nil, fmt.Errorf("decompress pack object %s: %w", hex.EncodeToString(id[:])[:12], err) } return string(object.KindBlob), raw, nil } func (s *SQLiteStore) WriteObject(tx *Tx, id [32]byte, kind string, raw []byte) error { compressed := s.codec.Compress(raw) if len(raw) > s.packThreshold && kind == string(object.KindBlob) { entry, err := s.pack.write(compressed, int64(len(raw))) if err != nil { return err } _, err = tx.sqlTx.Exec( "INSERT OR IGNORE INTO pack_index (blob_id, pack_file, offset, raw_size) VALUES (?, ?, ?, ?)", id[:], entry.packFile, entry.offset, entry.rawSize, ) return err } _, err := tx.sqlTx.Exec( "INSERT OR IGNORE INTO objects (id, kind, data) VALUES (?, ?, ?)", id[:], kind, compressed, ) return err } func (s *SQLiteStore) ListObjectsByKind(kind string) ([][32]byte, error) { rows, err := s.db.Query("SELECT id FROM objects WHERE kind = ?", kind) if err != nil { return nil, err } defer rows.Close() var ids [][32]byte for rows.Next() { var raw []byte if err := rows.Scan(&raw); err != nil { return nil, err } var id [32]byte copy(id[:], raw) ids = append(ids, id) } return ids, rows.Err() } func (s *SQLiteStore) GetBookmark(name string) (*Bookmark, error) { var cid []byte var remote sql.NullString err := s.db.QueryRow("SELECT commit_id, remote FROM bookmarks WHERE name = ?", name).Scan(&cid, &remote) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } b := &Bookmark{Name: name, Remote: remote.String} copy(b.CommitID[:], cid) return b, nil } func (s *SQLiteStore) SetBookmark(tx *Tx, b Bookmark) error { var remote interface{} if b.Remote != "" { remote = b.Remote } _, err := tx.sqlTx.Exec( "INSERT OR REPLACE INTO bookmarks (name, commit_id, remote) VALUES (?, ?, ?)", b.Name, b.CommitID[:], remote, ) return err } func (s *SQLiteStore) DeleteBookmark(tx *Tx, name string) error { _, err := tx.sqlTx.Exec("DELETE FROM bookmarks WHERE name = ?", name) return err } func (s *SQLiteStore) ListBookmarks() ([]Bookmark, error) { rows, err := s.db.Query("SELECT name, commit_id, remote FROM bookmarks ORDER BY name") if err != nil { return nil, err } defer rows.Close() var out []Bookmark for rows.Next() { var b Bookmark var cid []byte var remote sql.NullString if err := rows.Scan(&b.Name, &cid, &remote); err != nil { return nil, err } copy(b.CommitID[:], cid) b.Remote = remote.String out = append(out, b) } return out, rows.Err() } func (s *SQLiteStore) GetPhase(commitID [32]byte) (object.Phase, error) { var phase int err := s.db.QueryRow("SELECT phase FROM phases WHERE commit_id = ?", commitID[:]).Scan(&phase) if errors.Is(err, sql.ErrNoRows) { return object.PhaseDraft, nil } if err != nil { return 0, err } return object.Phase(phase), nil } func (s *SQLiteStore) SetPhase(tx *Tx, commitID [32]byte, phase object.Phase) error { _, err := tx.sqlTx.Exec( "INSERT OR REPLACE INTO phases (commit_id, phase) VALUES (?, ?)", commitID[:], int(phase), ) return err } func (s *SQLiteStore) ListPublicCommitIDs() ([][32]byte, error) { rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhasePublic)) if err != nil { return nil, err } defer rows.Close() var out [][32]byte for rows.Next() { var raw []byte if err := rows.Scan(&raw); err != nil { return nil, err } var id [32]byte copy(id[:], raw) out = append(out, id) } return out, rows.Err() } func (s *SQLiteStore) ListSecretCommitIDs() ([][32]byte, error) { rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhaseSecret)) if err != nil { return nil, err } defer rows.Close() var out [][32]byte for rows.Next() { var raw []byte if err := rows.Scan(&raw); err != nil { return nil, err } var id [32]byte copy(id[:], raw) out = append(out, id) } return out, rows.Err() } func (s *SQLiteStore) AllocChangeID(tx *Tx) (string, error) { for length := 8; length <= 32; length += 2 { id := object.NewChangeID(length) _, err := tx.sqlTx.Exec("INSERT INTO changes (change_id, commit_id) VALUES (?, NULL)", id) if err == nil { return id, nil } if !isSQLiteConstraintError(err) { return "", fmt.Errorf("alloc change ID: %w", err) } } return "", errors.New("change ID allocation failed after max retries") } func (s *SQLiteStore) GetChangeCommit(changeID string) ([32]byte, error) { var rows *sql.Rows var err error if len(changeID) < 8 { rows, err = s.db.Query( "SELECT change_id, commit_id FROM changes WHERE change_id LIKE ? AND commit_id IS NOT NULL", changeID+"%", ) } else { rows, err = s.db.Query( "SELECT change_id, commit_id FROM changes WHERE change_id = ? AND commit_id IS NOT NULL", changeID, ) } if err != nil { return object.ZeroID, err } defer rows.Close() var found [32]byte var count int for rows.Next() { var cid []byte var chid string if err := rows.Scan(&chid, &cid); err != nil { return object.ZeroID, err } copy(found[:], cid) count++ } if err := rows.Err(); err != nil { return object.ZeroID, err } if count == 0 { return object.ZeroID, sql.ErrNoRows } if count > 1 { return object.ZeroID, fmt.Errorf("ambiguous change ID prefix %q matches %d changes", changeID, count) } return found, nil } func (s *SQLiteStore) SetChangeCommit(tx *Tx, changeID string, commitID [32]byte) error { _, err := tx.sqlTx.Exec( "UPDATE changes SET commit_id = ? WHERE change_id = ?", commitID[:], changeID, ) return err } func (s *SQLiteStore) ListChanges() ([]Bookmark, error) { rows, err := s.db.Query("SELECT change_id, commit_id FROM changes WHERE commit_id IS NOT NULL") if err != nil { return nil, err } defer rows.Close() var out []Bookmark for rows.Next() { var name string var commitID []byte if err := rows.Scan(&name, &commitID); err != nil { return nil, err } var id [32]byte copy(id[:], commitID) out = append(out, Bookmark{Name: name, CommitID: id}) } return out, rows.Err() } func (s *SQLiteStore) GetWCacheEntry(path string) (*WCacheEntry, error) { var e WCacheEntry var blobID []byte var dirty int err := s.db.QueryRow( "SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE path = ?", path, ).Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } copy(e.BlobID[:], blobID) e.Dirty = dirty != 0 return &e, nil } func (s *SQLiteStore) SetWCacheEntry(tx *Tx, e WCacheEntry) error { dirty := 0 if e.Dirty { dirty = 1 } _, err := tx.sqlTx.Exec( "INSERT OR REPLACE INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty) VALUES (?, ?, ?, ?, ?, ?, ?)", e.Path, e.Inode, e.MtimeNs, e.Size, e.BlobID[:], e.Mode, dirty, ) return err } func (s *SQLiteStore) DeleteWCacheEntry(tx *Tx, path string) error { _, err := tx.sqlTx.Exec("DELETE FROM wcache WHERE path = ?", path) return err } func (s *SQLiteStore) ListWCacheEntries() ([]WCacheEntry, error) { rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache ORDER BY path") if err != nil { return nil, err } defer rows.Close() var out []WCacheEntry for rows.Next() { var e WCacheEntry var blobID []byte var dirty int if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil { return nil, err } copy(e.BlobID[:], blobID) e.Dirty = dirty != 0 out = append(out, e) } return out, rows.Err() } func (s *SQLiteStore) ClearWCache(tx *Tx) error { _, err := tx.sqlTx.Exec("DELETE FROM wcache") return err } func (s *SQLiteStore) MarkWCacheDirty(path string) error { _, err := s.db.Exec( `INSERT INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty) VALUES (?, 0, 0, 0, zeroblob(32), 0, 1) ON CONFLICT(path) DO UPDATE SET dirty = 1`, path, ) return err } func (s *SQLiteStore) ListDirtyWCacheEntries() ([]WCacheEntry, error) { rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE dirty = 1 ORDER BY path") if err != nil { return nil, err } defer rows.Close() var out []WCacheEntry for rows.Next() { var e WCacheEntry var blobID []byte var dirty int if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil { return nil, err } copy(e.BlobID[:], blobID) e.Dirty = dirty != 0 out = append(out, e) } return out, rows.Err() } func (s *SQLiteStore) ClearWCacheDirtyFlags(tx *Tx) error { _, err := tx.sqlTx.Exec("UPDATE wcache SET dirty = 0 WHERE dirty = 1") return err } func (s *SQLiteStore) InsertOperation(tx *Tx, op Operation) (int64, error) { if op.Timestamp == 0 { op.Timestamp = time.Now().Unix() } result, err := tx.sqlTx.Exec( "INSERT INTO operations (kind, timestamp, before, after, metadata) VALUES (?, ?, ?, ?, ?)", op.Kind, op.Timestamp, op.Before, op.After, nullableString(op.Metadata), ) if err != nil { return 0, err } return result.LastInsertId() } func (s *SQLiteStore) ListOperations(n int) ([]Operation, error) { query := "SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC" if n > 0 { query += fmt.Sprintf(" LIMIT %d", n) } rows, err := s.db.Query(query) if err != nil { return nil, err } defer rows.Close() var out []Operation for rows.Next() { var op Operation var meta sql.NullString if err := rows.Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta); err != nil { return nil, err } op.Metadata = meta.String out = append(out, op) } return out, rows.Err() } func (s *SQLiteStore) GetOperation(seq int64) (*Operation, error) { var op Operation var meta sql.NullString err := s.db.QueryRow( "SELECT seq, kind, timestamp, before, after, metadata FROM operations WHERE seq = ?", seq, ).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } op.Metadata = meta.String return &op, nil } func (s *SQLiteStore) GetLastOperation() (*Operation, error) { var op Operation var meta sql.NullString err := s.db.QueryRow( "SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC LIMIT 1", ).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } op.Metadata = meta.String return &op, nil } func isSQLiteConstraintError(err error) bool { if err == nil { return false } return strings.Contains(err.Error(), "UNIQUE constraint failed") || strings.Contains(err.Error(), "constraint failed") } func nullableString(s string) interface{} { if s == "" { return nil } return s } func (s *SQLiteStore) AcquireLock(tx *Tx, path, owner, comment string) error { var existingOwner string err := tx.sqlTx.QueryRow("SELECT owner FROM file_locks WHERE path = ?", path).Scan(&existingOwner) if err == nil && existingOwner != owner { return fmt.Errorf("file %q is locked by %q", path, existingOwner) } _, err = tx.sqlTx.Exec(` INSERT INTO file_locks (path, owner, acquired_at, comment) VALUES (?, ?, ?, ?) ON CONFLICT(path) DO UPDATE SET acquired_at = excluded.acquired_at, comment = excluded.comment`, path, owner, time.Now().Unix(), comment, ) return err } func (s *SQLiteStore) ReleaseLock(tx *Tx, path, owner string) error { res, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ? AND owner = ?", path, owner) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return fmt.Errorf("lock on %q is not held by %q", path, owner) } return nil } func (s *SQLiteStore) ReleaseLockAdmin(tx *Tx, path string) error { _, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ?", path) return err } func (s *SQLiteStore) GetLock(path string) (*FileLock, error) { var l FileLock var comment sql.NullString err := s.db.QueryRow( "SELECT path, owner, acquired_at, comment FROM file_locks WHERE path = ?", path, ).Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } l.Comment = comment.String return &l, nil } func (s *SQLiteStore) ListLocks() ([]FileLock, error) { rows, err := s.db.Query( "SELECT path, owner, acquired_at, comment FROM file_locks ORDER BY acquired_at DESC", ) if err != nil { return nil, err } defer rows.Close() var out []FileLock for rows.Next() { var l FileLock var comment sql.NullString if err := rows.Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment); err != nil { return nil, err } l.Comment = comment.String out = append(out, l) } return out, rows.Err() } func (s *SQLiteStore) TrainAndSaveDict() error { rows, err := s.db.Query( "SELECT data FROM objects WHERE kind = 'blob' ORDER BY RANDOM() LIMIT 200", ) if err != nil { return fmt.Errorf("sample blobs: %w", err) } defer rows.Close() var samples [][]byte for rows.Next() { var compressed []byte if scanErr := rows.Scan(&compressed); scanErr != nil { return scanErr } raw, decErr := s.codec.Decompress(compressed) if decErr == nil && len(raw) > 0 { samples = append(samples, raw) } } if err := rows.Err(); err != nil { return err } if len(samples) < 5 { return fmt.Errorf("not enough blobs to train dictionary (need at least 5, found %d)", len(samples)) } dict, err := zstd.BuildDict(zstd.BuildDictOptions{ Contents: samples, }) if err != nil { return fmt.Errorf("build dict: %w", err) } if _, err := s.db.Exec( "INSERT INTO zstd_dicts (created_at, dict) VALUES (?, ?)", time.Now().Unix(), dict, ); err != nil { return fmt.Errorf("save dict: %w", err) } s.codec.Close() newCD, err := newCodec(s.compressionName, dict) if err != nil { return fmt.Errorf("reload codec: %w", err) } s.codec = newCD return nil } func (s *SQLiteStore) CreateShelf(tx *Tx, sh Shelf) error { _, err := tx.sqlTx.Exec(` INSERT INTO shelves (name, tree_id, base_commit_id, created_at, description) VALUES (?, ?, ?, ?, ?)`, sh.Name, sh.TreeID[:], sh.BaseCommitID[:], sh.CreatedAt, sh.Description, ) return err } func (s *SQLiteStore) GetShelf(name string) (*Shelf, error) { var sh Shelf var treeID, baseID []byte err := s.db.QueryRow( "SELECT name, tree_id, base_commit_id, created_at, description FROM shelves WHERE name = ?", name, ).Scan(&sh.Name, &treeID, &baseID, &sh.CreatedAt, &sh.Description) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } copy(sh.TreeID[:], treeID) copy(sh.BaseCommitID[:], baseID) return &sh, nil } func (s *SQLiteStore) ListShelves() ([]Shelf, error) { rows, err := s.db.Query( "SELECT name, tree_id, base_commit_id, created_at, description FROM shelves ORDER BY created_at DESC", ) if err != nil { return nil, err } defer rows.Close() var out []Shelf for rows.Next() { var sh Shelf var treeID, baseID []byte if err := rows.Scan(&sh.Name, &treeID, &baseID, &sh.CreatedAt, &sh.Description); err != nil { return nil, err } copy(sh.TreeID[:], treeID) copy(sh.BaseCommitID[:], baseID) out = append(out, sh) } return out, rows.Err() } func (s *SQLiteStore) DropShelf(tx *Tx, name string) error { res, err := tx.sqlTx.Exec("DELETE FROM shelves WHERE name = ?", name) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return fmt.Errorf("shelf %q not found", name) } return nil } + +func (s *SQLiteStore) GetWatcherPID() (int, error) { + var pid int + err := s.db.QueryRow("SELECT pid FROM watcher_pid LIMIT 1").Scan(&pid) + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return pid, err +} + +func (s *SQLiteStore) SetWatcherPID(pid int) error { + _, err := s.db.Exec("DELETE FROM watcher_pid") + if err != nil { + return err + } + _, err = s.db.Exec("INSERT INTO watcher_pid (pid) VALUES (?)", pid) + return err +} + +func (s *SQLiteStore) ClearWatcherPID() error { + _, err := s.db.Exec("DELETE FROM watcher_pid") + return err +}
--- a/internal/store/store.go +++ b/internal/store/store.go @@ -1,118 +1,122 @@ package store import ( "database/sql" "arche/internal/object" ) type Bookmark struct { Name string CommitID [32]byte Remote string } type WCacheEntry struct { Path string Inode uint64 MtimeNs int64 Size int64 BlobID [32]byte Mode uint8 // object.EntryMode value (0=file,1=exec,2=symlink,3=dir) Dirty bool // set by watcher; cleared by snap/status after processing } type Operation struct { Seq int64 Kind string Timestamp int64 Before string After string Metadata string } type Tx struct { sqlTx *sql.Tx } func (t *Tx) SQLTx() *sql.Tx { return t.sqlTx } type Store interface { HasObject(id [32]byte) (bool, error) ReadObject(id [32]byte) (kind string, raw []byte, err error) WriteObject(tx *Tx, id [32]byte, kind string, raw []byte) error ListObjectsByKind(kind string) ([][32]byte, error) GetBookmark(name string) (*Bookmark, error) SetBookmark(tx *Tx, b Bookmark) error DeleteBookmark(tx *Tx, name string) error ListBookmarks() ([]Bookmark, error) GetPhase(commitID [32]byte) (object.Phase, error) SetPhase(tx *Tx, commitID [32]byte, phase object.Phase) error ListPublicCommitIDs() ([][32]byte, error) AllocChangeID(tx *Tx) (string, error) GetChangeCommit(changeID string) ([32]byte, error) SetChangeCommit(tx *Tx, changeID string, commitID [32]byte) error ListChanges() ([]Bookmark, error) GetWCacheEntry(path string) (*WCacheEntry, error) SetWCacheEntry(tx *Tx, e WCacheEntry) error DeleteWCacheEntry(tx *Tx, path string) error ListWCacheEntries() ([]WCacheEntry, error) ClearWCache(tx *Tx) error MarkWCacheDirty(path string) error ListDirtyWCacheEntries() ([]WCacheEntry, error) ClearWCacheDirtyFlags(tx *Tx) error InsertOperation(tx *Tx, op Operation) (int64, error) ListOperations(n int) ([]Operation, error) GetOperation(seq int64) (*Operation, error) GetLastOperation() (*Operation, error) AddConflict(tx *Tx, path string) error ClearConflict(tx *Tx, path string) error ClearAllConflicts(tx *Tx) error ListConflicts() ([]string, error) ListSecretCommitIDs() ([][32]byte, error) Begin() (*Tx, error) Commit(tx *Tx) error Rollback(tx *Tx) error Close() error + + GetWatcherPID() (int, error) + SetWatcherPID(pid int) error + ClearWatcherPID() error } type FileLock struct { Path string Owner string AcquiredAt int64 Comment string } type LockStore interface { AcquireLock(tx *Tx, path, owner, comment string) error ReleaseLock(tx *Tx, path, owner string) error ReleaseLockAdmin(tx *Tx, path string) error GetLock(path string) (*FileLock, error) ListLocks() ([]FileLock, error) } type Shelf struct { Name string TreeID [32]byte BaseCommitID [32]byte CreatedAt int64 Description string } type ShelfStore interface { CreateShelf(tx *Tx, s Shelf) error GetShelf(name string) (*Shelf, error) ListShelves() ([]Shelf, error) DropShelf(tx *Tx, name string) error } type DictTrainer interface { TrainAndSaveDict() error }
--- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -1,95 +1,89 @@ package watcher import ( "context" "fmt" "os" "path/filepath" "str -conv" - "str ings" "arche/internal/store" "github.com/fsnotify/fsnotify" "golang.org/x/sys/unix" ) -const PidFile = "watch.pid" - -func IsActive(archeDir string) bool { +func IsActive(archeDir string, st store.Store) bool { + pid, err := st.GetWatcherPID() + if err == nil && pid > 0 { + return unix.Kill(pid, 0) == nil + } data, +f err := os.ReadFile(filepath.Join(archeDir, -PidFile +"watch.pid" )) if +f err != nil { return false } - - pid, err := strconv.Atoi + var legacyPID int + fmt.Sscanf (strings.TrimSpace(string(data)) -) - if err != nil { - return false - } - - return +, "%d", &legacyPID) //nolint:errcheck + return legacyPID > 0 && unix.Kill( -pid +legacyPID , 0) == nil } func Run(ctx context.Context, workRoot, archeDir string, st store.Store) error { -if err +pid := os. -WriteFile( - filepath.Join(archeDir, PidFile), - []byte(strconv.Itoa(os.Getpid())), - 0o644, - +Getpid() + if err := st.SetWatcherPID(pid ); err != nil { return fmt.Errorf(" -write +store watcher pid: %w", err) } defer -os.Remove(filepath.Join(archeDir, PidFile)) +st.ClearWatcherPID() //nolint:errcheck w, err := fsnotify.NewWatcher() if err != nil { return fmt.Errorf("fsnotify: %w", err) } defer w.Close() if err := filepath.Walk(workRoot, func(p string, info os.FileInfo, walkErr error) error { if walkErr != nil || !info.IsDir() { return nil } if filepath.Base(p) == ".arche" { return filepath.SkipDir } return w.Add(p) }); err != nil { return fmt.Errorf("watch setup: %w", err) } for { select { case <-ctx.Done(): return nil case event, ok := <-w.Events: if !ok { return nil } const interesting = fsnotify.Write | fsnotify.Create | fsnotify.Rename | fsnotify.Remove | fsnotify.Chmod if event.Op&interesting == 0 { continue } rel, err := filepath.Rel(workRoot, event.Name) if err != nil || strings.HasPrefix(rel, "..") || strings.HasPrefix(rel, ".arche") { continue } rel = filepath.ToSlash(rel) if markErr := st.MarkWCacheDirty(rel); markErr != nil { fmt.Fprintf(os.Stderr, "arche watch: mark dirty %q: %v\n", rel, markErr) } if event.Op&fsnotify.Create != 0 { if info, statErr := os.Lstat(event.Name); statErr == nil && info.IsDir() { _ = w.Add(event.Name) } } case _, ok := <-w.Errors: if !ok { return nil } } } }
--- a/internal/wc/wc.go +++ b/internal/wc/wc.go @@ -1,1104 +1,1104 @@ package wc import ( "encoding/json" "fmt" "io/fs" "os" "path/filepath" "sort" "strings" "syscall" "time" "arche/internal/merge" "arche/internal/object" "arche/internal/repo" "arche/internal/store" "arche/internal/watcher" ) func dirtySet(r *repo.Repo) (map[string]bool, error) { if !watcher.IsActive(r.ArcheDir() +, r.Store ) { return nil, nil } entries, err := r.Store.ListDirtyWCacheEntries() if err != nil { return nil, err } m := make(map[string]bool, len(entries)) for _, e := range entries { m[e.Path] = true } return m, nil } type FileStatus struct { Path string Status rune } type WC struct { Repo *repo.Repo SignKey string NoAutoAdvance bool AuthorOverride *object.Signature } func New(r *repo.Repo) *WC { return &WC{Repo: r} } func (wc *WC) maybeSign(c *object.Commit) error { if wc.SignKey == "" { return nil } body := object.CommitBodyForSigning(c) sig, _, err := object.SignCommitBody(body, wc.SignKey) if err != nil { return fmt.Errorf("commit signing: %w", err) } c.CommitSig = sig return nil } func (wc *WC) snapshotIntoTx(tx *store.Tx, headCommit *object.Commit, paths []string, cacheMap map[string]store.WCacheEntry, dirty map[string]bool, message string, now time.Time) (*object.Commit, [32]byte, error) { r := wc.Repo var entries []fileEntry if err := r.Store.ClearWCache(tx); err != nil { return nil, object.ZeroID, fmt.Errorf("clear wcache: %w", err) } for _, rel := range paths { if dirty != nil && !dirty[rel] { if cached, ok := cacheMap[rel]; ok { entries = append(entries, fileEntry{ path: rel, blobID: cached.BlobID, mode: object.EntryMode(cached.Mode), }) if err := r.Store.SetWCacheEntry(tx, cached); err != nil { return nil, object.ZeroID, fmt.Errorf("set wcache: %w", err) } continue } } abs := filepath.Join(r.Root, rel) info, err := os.Lstat(abs) if err != nil { continue } var blobID [32]byte mode := fileMode(info) if cached, ok := cacheMap[rel]; ok { st := info.Sys().(*syscall.Stat_t) inode := st.Ino mtime := info.ModTime().UnixNano() size := info.Size() if cached.Inode == inode && cached.MtimeNs == mtime && cached.Size == size { blobID = cached.BlobID } } if blobID == object.ZeroID { content, err := readFileContent(abs, info) if err != nil { return nil, object.ZeroID, err } id, err := repo.WriteBlobTx(r.Store, tx, &object.Blob{Content: content}) if err != nil { return nil, object.ZeroID, err } blobID = id } st := info.Sys().(*syscall.Stat_t) if err := r.Store.SetWCacheEntry(tx, store.WCacheEntry{ Path: rel, Inode: st.Ino, MtimeNs: info.ModTime().UnixNano(), Size: info.Size(), BlobID: blobID, Mode: uint8(mode), }); err != nil { return nil, object.ZeroID, fmt.Errorf("set wcache: %w", err) } entries = append(entries, fileEntry{path: rel, blobID: blobID, mode: mode}) } tree, err := buildTree(r, tx, entries) if err != nil { return nil, object.ZeroID, err } sig := object.Signature{ Name: r.Cfg.User.Name, Email: r.Cfg.User.Email, Timestamp: now, } c := &object.Commit{ TreeID: tree, Parents: headCommit.Parents, ChangeID: headCommit.ChangeID, Author: headCommit.Author, Committer: sig, Message: message, Phase: headCommit.Phase, } if headCommit.Author.Timestamp.IsZero() { c.Author = sig } if err := wc.maybeSign(c); err != nil { return nil, object.ZeroID, err } commitID, err := repo.WriteCommitTx(r.Store, tx, c) if err != nil { return nil, object.ZeroID, err } if err := r.Store.SetChangeCommit(tx, c.ChangeID, commitID); err != nil { return nil, object.ZeroID, err } return c, commitID, nil } func (wc *WC) snapshotInput() (paths []string, cacheMap map[string]store.WCacheEntry, dirty map[string]bool, err error) { r := wc.Repo cacheEntries, err := r.Store.ListWCacheEntries() if err != nil { return nil, nil, nil, err } cacheMap = make(map[string]store.WCacheEntry, len(cacheEntries)) for _, e := range cacheEntries { cacheMap[e.Path] = e } dirty, _ = dirtySet(r) if dirty != nil { seen := make(map[string]bool, len(cacheMap)+len(dirty)) for p := range cacheMap { seen[p] = true paths = append(paths, p) } for p := range dirty { if !seen[p] { paths = append(paths, p) } } } else { paths, err = wc.trackedPaths() if err != nil { return nil, nil, nil, err } } return paths, cacheMap, dirty, nil } func (wc *WC) Snapshot(message string) (*object.Commit, [32]byte, error) { r := wc.Repo now := time.Now() head, _, err := r.HeadCommit() if err != nil { return nil, object.ZeroID, err } paths, cacheMap, dirty, err := wc.snapshotInput() if err != nil { return nil, object.ZeroID, err } tx, err := r.Store.Begin() if err != nil { return nil, object.ZeroID, err } c, commitID, err := wc.snapshotIntoTx(tx, head, paths, cacheMap, dirty, message, now) if err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if err := r.Store.Commit(tx); err != nil { return nil, object.ZeroID, err } return c, commitID, nil } func (wc *WC) Snap(message string) (*object.Commit, [32]byte, error) { r := wc.Repo now := time.Now() before, err := r.CaptureRefState() if err != nil { return nil, object.ZeroID, err } statusBefore, err := wc.Status() if err != nil { return nil, object.ZeroID, err } diffPaths := make(map[string]bool, len(statusBefore)) for _, fsEntry := range statusBefore { diffPaths[fsEntry.Path] = true } useRestrictedPaths := len(r.Cfg.Hooks.PreSnap) > 0 if useRestrictedPaths { if err := RunHooksSequential(r.Root, "pre-snap", r.Cfg.Hooks.PreSnap); err != nil { return nil, object.ZeroID, fmt.Errorf("pre-snap hook failed: %w", err) } } head, oldHeadID, err := r.HeadCommit() if err != nil { return nil, object.ZeroID, err } type snapshotFn func(tx *store.Tx) (*object.Commit, [32]byte, error) var doSnapshot snapshotFn if useRestrictedPaths { headBlobs := make(map[string][32]byte) headModes := make(map[string]object.EntryMode) if err := flattenTree(r, head.TreeID, "", headBlobs); err != nil { return nil, object.ZeroID, err } if err := flattenTreeModes(r, head.TreeID, "", headModes); err != nil { return nil, object.ZeroID, err } doSnapshot = func(tx *store.Tx) (*object.Commit, [32]byte, error) { return wc.snapshotRestrictedPathsIntoTx(tx, head, headBlobs, headModes, diffPaths, message, now) } } else { paths, cacheMap, dirty, err := wc.snapshotInput() if err != nil { return nil, object.ZeroID, err } doSnapshot = func(tx *store.Tx) (*object.Commit, [32]byte, error) { return wc.snapshotIntoTx(tx, head, paths, cacheMap, dirty, message, now) } } existingBookmarks, _ := r.Store.ListBookmarks() tx, err := r.Store.Begin() if err != nil { return nil, object.ZeroID, err } snapped, snappedID, err := doSnapshot(tx) if err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if snappedID != oldHeadID { for _, bm := range existingBookmarks { if bm.CommitID == oldHeadID { _ = r.Store.SetBookmark(tx, store.Bookmark{ Name: bm.Name, CommitID: snappedID, Remote: bm.Remote, }) } } } newChangeID, err := r.Store.AllocChangeID(tx) if err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } sig := object.Signature{Name: r.Cfg.User.Name, Email: r.Cfg.User.Email, Timestamp: now} newDraft := &object.Commit{ TreeID: snapped.TreeID, Parents: [][32]byte{snappedID}, ChangeID: newChangeID, Author: sig, Committer: sig, Message: "", Phase: object.PhaseDraft, } newDraftID, err := repo.WriteCommitTx(r.Store, tx, newDraft) if err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if err := r.Store.SetChangeCommit(tx, newChangeID, newDraftID); err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } after := buildRefState(snappedID, object.FormatChangeID(newChangeID)) op := store.Operation{ Kind: "snap", Timestamp: now.Unix(), Before: before, After: after, Metadata: "'" + firstLine(snapped.Message) + "'", } if _, err := r.Store.InsertOperation(tx, op); err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if err := r.Store.Commit(tx); err != nil { return nil, object.ZeroID, err } if err := r.WriteHead(object.FormatChangeID(newChangeID)); err != nil { return nil, object.ZeroID, err } if len(r.Cfg.Hooks.PostSnap) > 0 { if err := RunHooksSequential(r.Root, "post-snap", r.Cfg.Hooks.PostSnap); err != nil { fmt.Fprintf(os.Stderr, "arche snap: post-snap hook: %v\n", err) } } return snapped, snappedID, nil } func (wc *WC) Status() ([]FileStatus, error) { r := wc.Repo head, _, err := r.HeadCommit() if err != nil { return nil, err } headFiles := make(map[string][32]byte) if err := flattenTree(r, head.TreeID, "", headFiles); err != nil { return nil, err } wcPaths, err := wc.trackedPaths() if err != nil { return nil, err } wcSet := make(map[string]bool, len(wcPaths)) for _, p := range wcPaths { wcSet[p] = true } cacheEntries, _ := r.Store.ListWCacheEntries() cacheMap := make(map[string]store.WCacheEntry, len(cacheEntries)) for _, e := range cacheEntries { cacheMap[e.Path] = e } dirty, _ := dirtySet(r) var out []FileStatus for path, headBlobID := range headFiles { if !wcSet[path] { out = append(out, FileStatus{Path: path, Status: 'D'}) continue } if dirty != nil && !dirty[path] { if cached, ok := cacheMap[path]; ok { if cached.BlobID != headBlobID { out = append(out, FileStatus{Path: path, Status: 'M'}) } continue } } curBlobID, err := wc.blobIDForPath(path) if err != nil { continue } if curBlobID != headBlobID { out = append(out, FileStatus{Path: path, Status: 'M'}) } } ignore, _ := loadIgnore(r.Root) for _, path := range wcPaths { if _, inHead := headFiles[path]; !inHead { if ignore.Match(path) { continue } out = append(out, FileStatus{Path: path, Status: 'A'}) } } sort.Slice(out, func(i, j int) bool { return out[i].Path < out[j].Path }) return out, nil } func (wc *WC) materializeDisk(treeID [32]byte) (map[string][32]byte, map[string]object.EntryMode, error) { r := wc.Repo wantFiles := make(map[string][32]byte) wantMode := make(map[string]object.EntryMode) if err := flattenTree(r, treeID, "", wantFiles); err != nil { return nil, nil, err } if err := flattenTreeModes(r, treeID, "", wantMode); err != nil { return nil, nil, err } ignore, _ := loadIgnore(r.Root) err := filepath.WalkDir(r.Root, func(path string, d fs.DirEntry, err error) error { if err != nil { return nil } rel, _ := filepath.Rel(r.Root, path) if rel == "." { return nil } if d.IsDir() { if rel == archeDirName || strings.HasPrefix(rel, archeDirName+string(os.PathSeparator)) { return filepath.SkipDir } return nil } if ignore.Match(rel) { return nil } if _, ok := wantFiles[rel]; !ok { return os.Remove(path) } return nil }) if err != nil { return nil, nil, err } var conflictPaths []string for relPath, blobID := range wantFiles { abs := filepath.Join(r.Root, relPath) if err := os.MkdirAll(filepath.Dir(abs), 0o755); err != nil { return nil, nil, err } content, err := r.ReadBlob(blobID) if err != nil { if conf, cErr := r.ReadConflict(blobID); cErr == nil { content = renderConflictMarkers(r, conf) conflictPaths = append(conflictPaths, relPath) err = nil } } if err != nil { return nil, nil, err } perm := fs.FileMode(0o644) if wantMode[relPath] == object.ModeExec { perm = 0o755 } if err := os.WriteFile(abs, content, perm); err != nil { return nil, nil, err } } for _, p := range conflictPaths { delete(wantFiles, p) } return wantFiles, wantMode, nil } func renderConflictMarkers(r *repo.Repo, conf *object.Conflict) []byte { readStr := func(id [32]byte) string { if id == object.ZeroID { return "" } b, _ := r.ReadBlob(id) return string(b) } nl := func(s string) string { if len(s) > 0 && s[len(s)-1] != '\n' { return s + "\n" } return s } if conf.Ours.BlobID == object.ZeroID { return []byte(fmt.Sprintf("<<<<<<< ours\n(deleted)\n=======\n%s>>>>>>> theirs\n", nl(readStr(conf.Theirs.BlobID)))) } if conf.Theirs.BlobID == object.ZeroID { return []byte(fmt.Sprintf("<<<<<<< ours\n%s=======\n(deleted)\n>>>>>>> theirs\n", nl(readStr(conf.Ours.BlobID)))) } return []byte(fmt.Sprintf("<<<<<<< ours\n%s=======\n%s>>>>>>> theirs\n", nl(readStr(conf.Ours.BlobID)), nl(readStr(conf.Theirs.BlobID)))) } func (wc *WC) populateWCacheInTx(tx *store.Tx, wantFiles map[string][32]byte) error { r := wc.Repo if err := r.Store.ClearWCache(tx); err != nil { return err } for relPath, blobID := range wantFiles { abs := filepath.Join(r.Root, relPath) info, err := os.Lstat(abs) if err != nil { continue } st, ok := info.Sys().(*syscall.Stat_t) if !ok { continue } _ = r.Store.SetWCacheEntry(tx, store.WCacheEntry{ Path: relPath, Inode: st.Ino, MtimeNs: info.ModTime().UnixNano(), Size: info.Size(), BlobID: blobID, Mode: uint8(fileMode(info)), }) } return nil } func (wc *WC) MaterializeQuiet(treeID [32]byte) error { r := wc.Repo wantFiles, _, err := wc.materializeDisk(treeID) if err != nil { return err } tx, err := r.Store.Begin() if err != nil { return err } if err := wc.populateWCacheInTx(tx, wantFiles); err != nil { r.Store.Rollback(tx) return err } return r.Store.Commit(tx) } func (wc *WC) Materialize(treeID [32]byte, newChangeID string) error { r := wc.Repo before, _ := r.CaptureRefState() now := time.Now() wantFiles, _, err := wc.materializeDisk(treeID) if err != nil { return err } bare := object.StripChangeIDPrefix(newChangeID) commitID, _ := r.Store.GetChangeCommit(bare) after := buildRefState(commitID, newChangeID) tx, err := r.Store.Begin() if err != nil { return err } if err := wc.populateWCacheInTx(tx, wantFiles); err != nil { r.Store.Rollback(tx) return err } op := store.Operation{ Kind: "co", Timestamp: now.Unix(), Before: before, After: after, Metadata: "checked out " + newChangeID, } if _, err := r.Store.InsertOperation(tx, op); err != nil { r.Store.Rollback(tx) return err } return r.Store.Commit(tx) } const archeDirName = ".arche" func (wc *WC) trackedPaths() ([]string, error) { r := wc.Repo ignore, _ := loadIgnore(r.Root) var paths []string err := filepath.WalkDir(r.Root, func(path string, d fs.DirEntry, err error) error { if err != nil { return nil } rel, _ := filepath.Rel(r.Root, path) if rel == "." { return nil } if d.IsDir() { if rel == archeDirName || strings.HasPrefix(rel, archeDirName+string(os.PathSeparator)) { return filepath.SkipDir } if ignore.MatchDir(rel) { return filepath.SkipDir } return nil } if ignore.Match(rel) { return nil } paths = append(paths, filepath.ToSlash(rel)) return nil }) return paths, err } func (wc *WC) TrackedPaths() ([]string, error) { return wc.trackedPaths() } func (wc *WC) blobIDForPath(rel string) ([32]byte, error) { r := wc.Repo abs := filepath.Join(r.Root, rel) info, err := os.Lstat(abs) if err != nil { return object.ZeroID, err } st := info.Sys().(*syscall.Stat_t) if cached, _ := r.Store.GetWCacheEntry(rel); cached != nil { if cached.Inode == st.Ino && cached.MtimeNs == info.ModTime().UnixNano() && cached.Size == info.Size() { return cached.BlobID, nil } } content, err := readFileContent(abs, info) if err != nil { return object.ZeroID, err } b := &object.Blob{Content: content} return object.HashBlob(b), nil } func flattenTree(r *repo.Repo, treeID [32]byte, prefix string, out map[string][32]byte) error { if treeID == object.ZeroID { return nil } t, err := r.ReadTree(treeID) if err != nil { return err } for _, e := range t.Entries { rel := join(prefix, e.Name) switch e.Mode { case object.ModeDir: if err := flattenTree(r, e.ObjectID, rel, out); err != nil { return err } default: out[rel] = e.ObjectID } } return nil } func flattenTreeModes(r *repo.Repo, treeID [32]byte, prefix string, out map[string]object.EntryMode) error { if treeID == object.ZeroID { return nil } t, err := r.ReadTree(treeID) if err != nil { return err } for _, e := range t.Entries { rel := join(prefix, e.Name) switch e.Mode { case object.ModeDir: if err := flattenTreeModes(r, e.ObjectID, rel, out); err != nil { return err } default: out[rel] = e.Mode } } return nil } type fileEntry struct { path string blobID [32]byte mode object.EntryMode } func buildTree(r *repo.Repo, tx *store.Tx, entries []fileEntry) ([32]byte, error) { type node struct { isFile bool blobID [32]byte mode object.EntryMode children map[string]*node } root := &node{children: make(map[string]*node)} for _, e := range entries { parts := strings.Split(e.path, "/") cur := root for i, part := range parts { if i == len(parts)-1 { cur.children[part] = &node{isFile: true, blobID: e.blobID, mode: e.mode} } else { if _, ok := cur.children[part]; !ok { cur.children[part] = &node{children: make(map[string]*node)} } cur = cur.children[part] } } } var writeNode func(n *node) ([32]byte, error) writeNode = func(n *node) ([32]byte, error) { var treeEntries []object.TreeEntry for name, child := range n.children { if child.isFile { treeEntries = append(treeEntries, object.TreeEntry{ Name: name, Mode: child.mode, ObjectID: child.blobID, }) } else { subID, err := writeNode(child) if err != nil { return object.ZeroID, err } treeEntries = append(treeEntries, object.TreeEntry{ Name: name, Mode: object.ModeDir, ObjectID: subID, }) } } sort.Slice(treeEntries, func(i, j int) bool { return treeEntries[i].Name < treeEntries[j].Name }) t := &object.Tree{Entries: treeEntries} id, err := repo.WriteTreeTx(r.Store, tx, t) return id, err } return writeNode(root) } func fileMode(info os.FileInfo) object.EntryMode { if info.Mode()&0o111 != 0 { return object.ModeExec } if info.Mode()&os.ModeSymlink != 0 { return object.ModeSymlink } return object.ModeFile } func readFileContent(abs string, info os.FileInfo) ([]byte, error) { if info.Mode()&os.ModeSymlink != 0 { target, err := os.Readlink(abs) if err != nil { return nil, err } return []byte(target), nil } return os.ReadFile(abs) } func join(prefix, name string) string { if prefix == "" { return name } return prefix + "/" + name } func buildRefState(commitID [32]byte, changeID string) string { m := map[string]string{ "head": changeID, "tip": fmt.Sprintf("%x", commitID), } b, _ := json.Marshal(m) return string(b) } func firstLine(s string) string { if i := strings.IndexByte(s, '\n'); i >= 0 { return s[:i] } return s } func (wc *WC) SnapshotTree() ([32]byte, error) { r := wc.Repo paths, cacheMap, dirty, err := wc.snapshotInput() if err != nil { return object.ZeroID, err } tx, err := r.Store.Begin() if err != nil { return object.ZeroID, err } var entries []fileEntry for _, rel := range paths { if dirty != nil && !dirty[rel] { if cached, ok := cacheMap[rel]; ok { entries = append(entries, fileEntry{ path: rel, blobID: cached.BlobID, mode: object.EntryMode(cached.Mode), }) continue } } abs := filepath.Join(r.Root, rel) info, err := os.Lstat(abs) if err != nil { continue } var blobID [32]byte mode := fileMode(info) if cached, ok := cacheMap[rel]; ok { st := info.Sys().(*syscall.Stat_t) if cached.Inode == st.Ino && cached.MtimeNs == info.ModTime().UnixNano() && cached.Size == info.Size() { blobID = cached.BlobID } } if blobID == object.ZeroID { content, err := readFileContent(abs, info) if err != nil { r.Store.Rollback(tx) //nolint:errcheck return object.ZeroID, err } id, err := repo.WriteBlobTx(r.Store, tx, &object.Blob{Content: content}) if err != nil { r.Store.Rollback(tx) //nolint:errcheck return object.ZeroID, err } blobID = id } entries = append(entries, fileEntry{path: rel, blobID: blobID, mode: mode}) } treeID, err := buildTree(r, tx, entries) if err != nil { r.Store.Rollback(tx) //nolint:errcheck return object.ZeroID, err } if err := r.Store.Commit(tx); err != nil { return object.ZeroID, err } return treeID, nil } func (wc *WC) Amend(message string) (*object.Commit, [32]byte, error) { r := wc.Repo now := time.Now() head, oldHeadID, err := r.HeadCommit() if err != nil { return nil, object.ZeroID, err } if head.Phase == object.PhasePublic { return nil, object.ZeroID, fmt.Errorf("cannot amend a public commit; use --force-rewrite if you are sure") } before, err := r.CaptureRefState() if err != nil { return nil, object.ZeroID, err } if message == "" { message = head.Message } paths, cacheMap, dirty, err := wc.snapshotInput() if err != nil { return nil, object.ZeroID, err } tx, err := r.Store.Begin() if err != nil { return nil, object.ZeroID, err } amended, amendedID, err := wc.snapshotIntoTx(tx, head, paths, cacheMap, dirty, message, now) if err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if oldHeadID != amendedID { obs := &object.ObsoleteMarker{ Predecessor: oldHeadID, Successors: [][32]byte{amendedID}, Reason: "amend", Timestamp: now.Unix(), } if _, err := repo.WriteObsoleteTx(r.Store, tx, obs); err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } } after := buildRefState(amendedID, object.FormatChangeID(amended.ChangeID)) op := store.Operation{ Kind: "amend", Timestamp: now.Unix(), Before: before, After: after, Metadata: "'" + firstLine(amended.Message) + "'", } if _, err := r.Store.InsertOperation(tx, op); err != nil { r.Store.Rollback(tx) return nil, object.ZeroID, err } if err := r.Store.Commit(tx); err != nil { return nil, object.ZeroID, err } if oldHeadID != amendedID { if err := wc.autoRebaseDownstream(oldHeadID, amendedID, head.ChangeID, now); err != nil { fmt.Fprintf(os.Stderr, "arche: warning: downstream rebase failed: %v\n", err) } } return amended, amendedID, nil } func (wc *WC) autoRebaseDownstream(oldParentID, newParentID [32]byte, headChangeID string, now time.Time) error { r := wc.Repo allChanges, err := r.Store.ListChanges() if err != nil { return err } type draftEntry struct { id [32]byte changeID string commit *object.Commit } children := make(map[[32]byte][]draftEntry) for _, ch := range allChanges { if ch.CommitID == object.ZeroID { continue } c, err := r.ReadCommit(ch.CommitID) if err != nil || c == nil { continue } if c.Phase != object.PhaseDraft { continue } if c.ChangeID == headChangeID { continue } if len(c.Parents) == 0 { continue } d := draftEntry{id: ch.CommitID, changeID: ch.Name, commit: c} children[c.Parents[0]] = append(children[c.Parents[0]], d) } type rebaseTask struct { entry draftEntry newParent [32]byte } var tasks []rebaseTask queue := []struct { oldID [32]byte newID [32]byte }{{oldParentID, newParentID}} for len(queue) > 0 { cur := queue[0] queue = queue[1:] for _, child := range children[cur.oldID] { tasks = append(tasks, rebaseTask{entry: child, newParent: cur.newID}) queue = append(queue, struct{ oldID, newID [32]byte }{child.id, child.id}) } } remapped := map[[32]byte][32]byte{oldParentID: newParentID} for _, task := range tasks { oldFirst := task.entry.commit.Parents[0] newParent, ok := remapped[oldFirst] if !ok { newParent = oldFirst } var baseTreeID [32]byte if pc, err2 := r.ReadCommit(oldFirst); err2 == nil { baseTreeID = pc.TreeID } newParentCommit, err := r.ReadCommit(newParent) if err != nil { return fmt.Errorf("read new parent for %s: %w", object.FormatChangeID(task.entry.changeID), err) } result, err := merge.Trees(r, baseTreeID, task.entry.commit.TreeID, newParentCommit.TreeID) if err != nil { return fmt.Errorf("merge for %s: %w", object.FormatChangeID(task.entry.changeID), err) } newCommit := &object.Commit{ TreeID: result.TreeID, Parents: [][32]byte{newParent}, ChangeID: task.entry.changeID, Author: task.entry.commit.Author, Committer: object.Signature{Name: r.Cfg.User.Name, Email: r.Cfg.User.Email, Timestamp: now}, Message: task.entry.commit.Message, Phase: task.entry.commit.Phase, } tx, err := r.Store.Begin() if err != nil { return err } newCommitID, err := repo.WriteCommitTx(r.Store, tx, newCommit) if err != nil { r.Store.Rollback(tx) return err } if err := r.Store.SetChangeCommit(tx, task.entry.changeID, newCommitID); err != nil { r.Store.Rollback(tx) return err } obs := &object.ObsoleteMarker{ Predecessor: task.entry.id, Successors: [][32]byte{newCommitID}, Reason: "amend", Timestamp: now.Unix(), } if _, err := repo.WriteObsoleteTx(r.Store, tx, obs); err != nil { r.Store.Rollback(tx) return err } if err := r.Store.Commit(tx); err != nil { return err } remapped[task.entry.id] = newCommitID conflictNote := "" if len(result.Conflicts) > 0 { conflictNote = fmt.Sprintf(" (%d conflict(s))", len(result.Conflicts)) } fmt.Printf(" auto-rebased %s%s\n", object.FormatChangeID(task.entry.changeID), conflictNote) } return nil }