arche / internal/store/gc.go

commit 154431fd
  1package store
  2
  3import (
  4	"encoding/hex"
  5	"encoding/json"
  6	"fmt"
  7	"os"
  8	"path/filepath"
  9	"sort"
 10	"time"
 11
 12	"arche/internal/object"
 13)
 14
// GCStats summarizes the work performed by one garbage-collection run.
type GCStats struct {
	ObjectsDeleted     int   // rows removed from the objects table
	PackEntriesDeleted int   // pack_index rows dropped because their blob was unreachable
	PackFilesRebuilt   int   // number of distinct new pack files written during repacking
	BytesFreed         int64 // on-disk bytes reclaimed by deleting obsolete pack files
}
 21
// GCProgress reports garbage-collection progress. phase names the current
// stage (e.g. "roots", "mark", "sweep", "repack"); done/total describe how far
// the stage has advanced (both are 0 when the stage has no known total).
type GCProgress func(phase string, done, total int)

// GCer is implemented by stores that support garbage collection.
type GCer interface {
	GC(retentionDays int, progress GCProgress) (*GCStats, error)
}
 27
 28func (s *SQLiteStore) GC(retentionDays int, progress GCProgress) (*GCStats, error) {
 29	if progress == nil {
 30		progress = func(string, int, int) {}
 31	}
 32	if retentionDays <= 0 {
 33		retentionDays = 90
 34	}
 35	retentionCutoff := time.Now().AddDate(0, 0, -retentionDays).Unix()
 36
 37	live := make(map[[32]byte]struct{})
 38
 39	progress("roots", 0, 0)
 40	roots, err := s.gcCollectRoots()
 41	if err != nil {
 42		return nil, fmt.Errorf("gc: collect roots: %w", err)
 43	}
 44	for i, root := range roots {
 45		progress("mark", i+1, len(roots))
 46		if err := s.gcMark(root, live); err != nil {
 47			return nil, fmt.Errorf("gc: mark: %w", err)
 48		}
 49	}
 50	if err := s.gcMarkObsolete(live, retentionCutoff); err != nil {
 51		return nil, fmt.Errorf("gc: mark obsolete: %w", err)
 52	}
 53
 54	progress("sweep", 0, 0)
 55	stats := &GCStats{}
 56
 57	deleted, err := s.gcSweepObjects(live)
 58	if err != nil {
 59		return nil, fmt.Errorf("gc: sweep objects: %w", err)
 60	}
 61	stats.ObjectsDeleted = deleted
 62
 63	packStats, err := s.gcRepackPacks(live, progress)
 64	if err != nil {
 65		return nil, fmt.Errorf("gc: repack: %w", err)
 66	}
 67	stats.PackEntriesDeleted = packStats.PackEntriesDeleted
 68	stats.PackFilesRebuilt = packStats.PackFilesRebuilt
 69	stats.BytesFreed = packStats.BytesFreed
 70
 71	return stats, nil
 72}
 73
 74func (s *SQLiteStore) gcCollectRoots() ([][32]byte, error) {
 75	seen := make(map[[32]byte]struct{})
 76	var roots [][32]byte
 77
 78	add := func(raw []byte) {
 79		if len(raw) != 32 {
 80			return
 81		}
 82		var id [32]byte
 83		copy(id[:], raw)
 84		if _, ok := seen[id]; !ok {
 85			seen[id] = struct{}{}
 86			roots = append(roots, id)
 87		}
 88	}
 89	addHex := func(h string) {
 90		b, err := hex.DecodeString(h)
 91		if err == nil && len(b) == 32 {
 92			add(b)
 93		}
 94	}
 95
 96	rows, err := s.db.Query("SELECT commit_id FROM bookmarks")
 97	if err != nil {
 98		return nil, err
 99	}
100	for rows.Next() {
101		var raw []byte
102		if scanErr := rows.Scan(&raw); scanErr != nil {
103			rows.Close()
104			return nil, scanErr
105		}
106		add(raw)
107	}
108	rows.Close()
109	if err := rows.Err(); err != nil {
110		return nil, err
111	}
112
113	rows, err = s.db.Query("SELECT commit_id FROM changes WHERE commit_id IS NOT NULL")
114	if err != nil {
115		return nil, err
116	}
117	for rows.Next() {
118		var raw []byte
119		if scanErr := rows.Scan(&raw); scanErr != nil {
120			rows.Close()
121			return nil, scanErr
122		}
123		add(raw)
124	}
125	rows.Close()
126	if err := rows.Err(); err != nil {
127		return nil, err
128	}
129
130	rows, err = s.db.Query("SELECT before, after FROM operations")
131	if err != nil {
132		return nil, err
133	}
134	for rows.Next() {
135		var before, after string
136		if scanErr := rows.Scan(&before, &after); scanErr != nil {
137			rows.Close()
138			return nil, scanErr
139		}
140		for _, snap := range []string{before, after} {
141			var rs struct {
142				Tip       string            `json:"tip"`
143				Bookmarks map[string]string `json:"bookmarks"`
144			}
145			if json.Unmarshal([]byte(snap), &rs) == nil {
146				addHex(rs.Tip)
147				for _, v := range rs.Bookmarks {
148					addHex(v)
149				}
150			}
151		}
152	}
153	rows.Close()
154	if err := rows.Err(); err != nil {
155		return nil, err
156	}
157
158	return roots, nil
159}
160
161func (s *SQLiteStore) gcMark(startID [32]byte, live map[[32]byte]struct{}) error {
162	queue := [][32]byte{startID}
163	for len(queue) > 0 {
164		id := queue[len(queue)-1]
165		queue = queue[:len(queue)-1]
166
167		if _, ok := live[id]; ok {
168			continue
169		}
170
171		kind, raw, err := s.ReadObject(id)
172		if err != nil {
173			continue
174		}
175		live[id] = struct{}{}
176
177		switch object.Kind(kind) {
178		case object.KindCommit:
179			c, err := object.DecodeCommit(raw)
180			if err != nil {
181				return err
182			}
183			queue = append(queue, c.TreeID)
184			queue = append(queue, c.Parents...)
185
186		case object.KindTree:
187			t, err := object.DecodeTree(raw)
188			if err != nil {
189				return err
190			}
191			for _, e := range t.Entries {
192				queue = append(queue, e.ObjectID)
193			}
194
195		case object.KindConflict:
196			c, err := object.DecodeConflict(raw)
197			if err != nil {
198				return err
199			}
200			if c.Base != nil && c.Base.BlobID != object.ZeroID {
201				live[c.Base.BlobID] = struct{}{}
202			}
203			if c.Ours.BlobID != object.ZeroID {
204				live[c.Ours.BlobID] = struct{}{}
205			}
206			if c.Theirs.BlobID != object.ZeroID {
207				live[c.Theirs.BlobID] = struct{}{}
208			}
209
210		case object.KindBlob:
211			// Already marked above; no children to follow.
212
213		case object.KindObsolete:
214			// Handled separately by gcMarkObsolete; the normal DAG traversal
215			// from roots never reaches obsolete markers.
216		}
217	}
218	return nil
219}
220
// gcMarkObsolete extends the live set with obsolete markers that must be
// retained: a marker survives when its predecessor is itself live, or when
// its timestamp is newer than retentionCutoff (Unix seconds). Markers that
// fail to decompress or decode are left unmarked and are swept later.
func (s *SQLiteStore) gcMarkObsolete(live map[[32]byte]struct{}, retentionCutoff int64) error {
	rows, err := s.db.Query("SELECT id, data FROM objects WHERE kind = ?", string(object.KindObsolete))
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var idRaw, compressed []byte
		if err := rows.Scan(&idRaw, &compressed); err != nil {
			return err
		}
		raw, err := s.codec.Decompress(compressed)
		if err != nil {
			// Corrupt payload: skip so the sweep removes the marker.
			continue
		}
		o, err := object.DecodeObsolete(raw)
		if err != nil {
			continue
		}
		var id [32]byte
		copy(id[:], idRaw)

		// Keep markers whose predecessor commit is still reachable.
		if _, ok := live[o.Predecessor]; ok {
			live[id] = struct{}{}
			continue
		}
		// Keep markers still inside the retention window.
		if o.Timestamp > retentionCutoff {
			live[id] = struct{}{}
		}
	}
	return rows.Err()
}
254
255func (s *SQLiteStore) gcSweepObjects(live map[[32]byte]struct{}) (int, error) {
256	rows, err := s.db.Query("SELECT id FROM objects")
257	if err != nil {
258		return 0, err
259	}
260	var dead [][]byte
261	for rows.Next() {
262		var id []byte
263		if err := rows.Scan(&id); err != nil {
264			rows.Close()
265			return 0, err
266		}
267		var key [32]byte
268		copy(key[:], id)
269		if _, ok := live[key]; !ok {
270			cp := make([]byte, len(id))
271			copy(cp, id)
272			dead = append(dead, cp)
273		}
274	}
275	rows.Close()
276	if err := rows.Err(); err != nil {
277		return 0, err
278	}
279	if len(dead) == 0 {
280		return 0, nil
281	}
282
283	tx, err := s.db.Begin()
284	if err != nil {
285		return 0, err
286	}
287	stmt, err := tx.Prepare("DELETE FROM objects WHERE id = ?")
288	if err != nil {
289		tx.Rollback() //nolint:errcheck
290		return 0, err
291	}
292	for _, id := range dead {
293		if _, err := stmt.Exec(id); err != nil {
294			stmt.Close()
295			tx.Rollback() //nolint:errcheck
296			return 0, err
297		}
298	}
299	stmt.Close()
300	if err := tx.Commit(); err != nil {
301		return 0, err
302	}
303	return len(dead), nil
304}
305
306func (s *SQLiteStore) gcRepackPacks(live map[[32]byte]struct{}, progress GCProgress) (*GCStats, error) {
307	type packRec struct {
308		blobID   [32]byte
309		packFile string
310		offset   int64
311		rawSize  int64
312	}
313
314	rows, err := s.db.Query("SELECT blob_id, pack_file, offset, raw_size FROM pack_index")
315	if err != nil {
316		return nil, err
317	}
318	var liveEntries []packRec
319	var deadCount int
320	for rows.Next() {
321		var raw []byte
322		var e packRec
323		if err := rows.Scan(&raw, &e.packFile, &e.offset, &e.rawSize); err != nil {
324			rows.Close()
325			return nil, err
326		}
327		copy(e.blobID[:], raw)
328		if _, ok := live[e.blobID]; ok {
329			liveEntries = append(liveEntries, e)
330		} else {
331			deadCount++
332		}
333	}
334	rows.Close()
335	if err := rows.Err(); err != nil {
336		return nil, err
337	}
338
339	stats := &GCStats{PackEntriesDeleted: deadCount}
340	if deadCount == 0 {
341		return stats, nil
342	}
343
344	oldPackFiles := make(map[string]int64)
345	pRows, err := s.db.Query("SELECT DISTINCT pack_file FROM pack_index")
346	if err != nil {
347		return nil, err
348	}
349	for pRows.Next() {
350		var pf string
351		if err := pRows.Scan(&pf); err != nil {
352			pRows.Close()
353			return nil, err
354		}
355		info, statErr := os.Stat(filepath.Join(s.packDir, pf))
356		if statErr == nil {
357			oldPackFiles[pf] = info.Size()
358		} else {
359			oldPackFiles[pf] = 0
360		}
361	}
362	pRows.Close()
363
364	s.pack.mu.Lock()
365	if s.pack.cur != nil {
366		s.pack.cur.Sync() //nolint:errcheck
367		s.pack.cur.Close()
368		s.pack.cur = nil
369	}
370	s.pack.mu.Unlock()
371
372	if len(liveEntries) == 0 {
373		if _, err := s.db.Exec("DELETE FROM pack_index"); err != nil {
374			return nil, err
375		}
376		for pf, size := range oldPackFiles {
377			os.Remove(filepath.Join(s.packDir, pf)) //nolint:errcheck
378			stats.BytesFreed += size
379		}
380		return stats, nil
381	}
382
383	newPM, err := newPackManager(s.packDir, 0)
384	if err != nil {
385		return nil, err
386	}
387
388	type newEntry struct {
389		blobID      [32]byte
390		packFile    string
391		offset      int64
392		rawSize     int64
393		deltaBaseID [32]byte
394		deltaDepth  int
395	}
396
397	sort.Slice(liveEntries, func(i, j int) bool {
398		return liveEntries[i].rawSize > liveEntries[j].rawSize
399	})
400
401	type rawBlob struct {
402		blobID  [32]byte
403		rawSize int64
404		content []byte
405	}
406	rawBlobs := make([]rawBlob, 0, len(liveEntries))
407	for _, e := range liveEntries {
408		_, content, err := s.ReadObject(e.blobID)
409		if err != nil {
410			newPM.close()
411			return nil, fmt.Errorf("gc: read blob %s for repack: %w",
412				hex.EncodeToString(e.blobID[:])[:8], err)
413		}
414		rawBlobs = append(rawBlobs, rawBlob{blobID: e.blobID, rawSize: e.rawSize, content: content})
415	}
416
417	var newEntries []newEntry
418	depthOf := make(map[[32]byte]int, len(rawBlobs))
419
420	for i, rb := range rawBlobs {
421		progress("repack", i+1, len(rawBlobs))
422
423		var bestBase *rawBlob
424		if i > 0 {
425			prev := &rawBlobs[i-1]
426			if prev.rawSize > 0 && rb.rawSize > 0 {
427				lo, hi := prev.rawSize, rb.rawSize
428				if lo > hi {
429					lo, hi = hi, lo
430				}
431				if hi <= lo*6/5 && depthOf[prev.blobID] < deltaMaxDepth {
432					bestBase = prev
433				}
434			}
435		}
436
437		if bestBase != nil {
438			deltaBytes := ComputeDelta(bestBase.content, rb.content)
439			compDelta := s.codec.Compress(deltaBytes)
440			compFull := s.codec.Compress(rb.content)
441			if len(compDelta) < len(compFull)*4/5 {
442				pe, writeErr := newPM.write(compDelta, rb.rawSize)
443				if writeErr != nil {
444					newPM.close()
445					return nil, fmt.Errorf("gc: write delta blob: %w", writeErr)
446				}
447				d := depthOf[bestBase.blobID] + 1
448				depthOf[rb.blobID] = d
449				newEntries = append(newEntries, newEntry{
450					blobID:      rb.blobID,
451					packFile:    pe.packFile,
452					offset:      pe.offset,
453					rawSize:     rb.rawSize,
454					deltaBaseID: bestBase.blobID,
455					deltaDepth:  d,
456				})
457				continue
458			}
459		}
460
461		compFull := s.codec.Compress(rb.content)
462		pe, err := newPM.write(compFull, rb.rawSize)
463		if err != nil {
464			newPM.close()
465			return nil, fmt.Errorf("gc: write compacted blob: %w", err)
466		}
467		depthOf[rb.blobID] = 0
468		newEntries = append(newEntries, newEntry{
469			blobID:   rb.blobID,
470			packFile: pe.packFile,
471			offset:   pe.offset,
472			rawSize:  rb.rawSize,
473		})
474	}
475	newPM.close()
476
477	tx, err := s.db.Begin()
478	if err != nil {
479		return nil, err
480	}
481	if _, err := tx.Exec("DELETE FROM pack_index"); err != nil {
482		tx.Rollback() //nolint:errcheck
483		return nil, err
484	}
485	stmt, err := tx.Prepare(
486		"INSERT INTO pack_index (blob_id, pack_file, offset, raw_size, delta_base_id, delta_depth) VALUES (?, ?, ?, ?, ?, ?)",
487	)
488	if err != nil {
489		tx.Rollback() //nolint:errcheck
490		return nil, err
491	}
492	for _, ne := range newEntries {
493		var deltaBaseID interface{}
494		if ne.deltaBaseID != ([32]byte{}) {
495			deltaBaseID = ne.deltaBaseID[:]
496		}
497		if _, err := stmt.Exec(ne.blobID[:], ne.packFile, ne.offset, ne.rawSize, deltaBaseID, ne.deltaDepth); err != nil {
498			stmt.Close()
499			tx.Rollback() //nolint:errcheck
500			return nil, err
501		}
502	}
503	stmt.Close()
504	if err := tx.Commit(); err != nil {
505		return nil, err
506	}
507
508	newPackFiles := make(map[string]struct{})
509	for _, ne := range newEntries {
510		newPackFiles[ne.packFile] = struct{}{}
511	}
512	stats.PackFilesRebuilt = len(newPackFiles)
513
514	for pf, size := range oldPackFiles {
515		if _, isNew := newPackFiles[pf]; !isNew {
516			os.Remove(filepath.Join(s.packDir, pf)) //nolint:errcheck
517			stats.BytesFreed += size
518		}
519	}
520
521	return stats, nil
522}