1package store
2
3import (
4 "encoding/hex"
5 "encoding/json"
6 "fmt"
7 "os"
8 "path/filepath"
9 "sort"
10 "time"
11
12 "arche/internal/object"
13)
14
// GCStats summarizes the work performed by a single garbage-collection run.
type GCStats struct {
	ObjectsDeleted     int   // rows removed from the objects table
	PackEntriesDeleted int   // pack_index entries dropped because their blob was dead
	PackFilesRebuilt   int   // distinct pack files written during the repack phase
	BytesFreed         int64 // total on-disk size of the old pack files that were deleted
}

// GCProgress receives phase updates during a GC run. done/total may both be
// zero for phases whose total is unknown up front (e.g. "roots", "sweep").
type GCProgress func(phase string, done, total int)

// GCer is implemented by stores that support garbage collection.
type GCer interface {
	// GC removes unreachable data. A retentionDays value <= 0 selects the
	// implementation's default retention window (90 days for SQLiteStore).
	// progress may be nil.
	GC(retentionDays int, progress GCProgress) (*GCStats, error)
}
27
28func (s *SQLiteStore) GC(retentionDays int, progress GCProgress) (*GCStats, error) {
29 if progress == nil {
30 progress = func(string, int, int) {}
31 }
32 if retentionDays <= 0 {
33 retentionDays = 90
34 }
35 retentionCutoff := time.Now().AddDate(0, 0, -retentionDays).Unix()
36
37 live := make(map[[32]byte]struct{})
38
39 progress("roots", 0, 0)
40 roots, err := s.gcCollectRoots()
41 if err != nil {
42 return nil, fmt.Errorf("gc: collect roots: %w", err)
43 }
44 for i, root := range roots {
45 progress("mark", i+1, len(roots))
46 if err := s.gcMark(root, live); err != nil {
47 return nil, fmt.Errorf("gc: mark: %w", err)
48 }
49 }
50 if err := s.gcMarkObsolete(live, retentionCutoff); err != nil {
51 return nil, fmt.Errorf("gc: mark obsolete: %w", err)
52 }
53
54 progress("sweep", 0, 0)
55 stats := &GCStats{}
56
57 deleted, err := s.gcSweepObjects(live)
58 if err != nil {
59 return nil, fmt.Errorf("gc: sweep objects: %w", err)
60 }
61 stats.ObjectsDeleted = deleted
62
63 packStats, err := s.gcRepackPacks(live, progress)
64 if err != nil {
65 return nil, fmt.Errorf("gc: repack: %w", err)
66 }
67 stats.PackEntriesDeleted = packStats.PackEntriesDeleted
68 stats.PackFilesRebuilt = packStats.PackFilesRebuilt
69 stats.BytesFreed = packStats.BytesFreed
70
71 return stats, nil
72}
73
74func (s *SQLiteStore) gcCollectRoots() ([][32]byte, error) {
75 seen := make(map[[32]byte]struct{})
76 var roots [][32]byte
77
78 add := func(raw []byte) {
79 if len(raw) != 32 {
80 return
81 }
82 var id [32]byte
83 copy(id[:], raw)
84 if _, ok := seen[id]; !ok {
85 seen[id] = struct{}{}
86 roots = append(roots, id)
87 }
88 }
89 addHex := func(h string) {
90 b, err := hex.DecodeString(h)
91 if err == nil && len(b) == 32 {
92 add(b)
93 }
94 }
95
96 rows, err := s.db.Query("SELECT commit_id FROM bookmarks")
97 if err != nil {
98 return nil, err
99 }
100 for rows.Next() {
101 var raw []byte
102 if scanErr := rows.Scan(&raw); scanErr != nil {
103 rows.Close()
104 return nil, scanErr
105 }
106 add(raw)
107 }
108 rows.Close()
109 if err := rows.Err(); err != nil {
110 return nil, err
111 }
112
113 rows, err = s.db.Query("SELECT commit_id FROM changes WHERE commit_id IS NOT NULL")
114 if err != nil {
115 return nil, err
116 }
117 for rows.Next() {
118 var raw []byte
119 if scanErr := rows.Scan(&raw); scanErr != nil {
120 rows.Close()
121 return nil, scanErr
122 }
123 add(raw)
124 }
125 rows.Close()
126 if err := rows.Err(); err != nil {
127 return nil, err
128 }
129
130 rows, err = s.db.Query("SELECT before, after FROM operations")
131 if err != nil {
132 return nil, err
133 }
134 for rows.Next() {
135 var before, after string
136 if scanErr := rows.Scan(&before, &after); scanErr != nil {
137 rows.Close()
138 return nil, scanErr
139 }
140 for _, snap := range []string{before, after} {
141 var rs struct {
142 Tip string `json:"tip"`
143 Bookmarks map[string]string `json:"bookmarks"`
144 }
145 if json.Unmarshal([]byte(snap), &rs) == nil {
146 addHex(rs.Tip)
147 for _, v := range rs.Bookmarks {
148 addHex(v)
149 }
150 }
151 }
152 }
153 rows.Close()
154 if err := rows.Err(); err != nil {
155 return nil, err
156 }
157
158 return roots, nil
159}
160
161func (s *SQLiteStore) gcMark(startID [32]byte, live map[[32]byte]struct{}) error {
162 queue := [][32]byte{startID}
163 for len(queue) > 0 {
164 id := queue[len(queue)-1]
165 queue = queue[:len(queue)-1]
166
167 if _, ok := live[id]; ok {
168 continue
169 }
170
171 kind, raw, err := s.ReadObject(id)
172 if err != nil {
173 continue
174 }
175 live[id] = struct{}{}
176
177 switch object.Kind(kind) {
178 case object.KindCommit:
179 c, err := object.DecodeCommit(raw)
180 if err != nil {
181 return err
182 }
183 queue = append(queue, c.TreeID)
184 queue = append(queue, c.Parents...)
185
186 case object.KindTree:
187 t, err := object.DecodeTree(raw)
188 if err != nil {
189 return err
190 }
191 for _, e := range t.Entries {
192 queue = append(queue, e.ObjectID)
193 }
194
195 case object.KindConflict:
196 c, err := object.DecodeConflict(raw)
197 if err != nil {
198 return err
199 }
200 if c.Base != nil && c.Base.BlobID != object.ZeroID {
201 live[c.Base.BlobID] = struct{}{}
202 }
203 if c.Ours.BlobID != object.ZeroID {
204 live[c.Ours.BlobID] = struct{}{}
205 }
206 if c.Theirs.BlobID != object.ZeroID {
207 live[c.Theirs.BlobID] = struct{}{}
208 }
209
210 case object.KindBlob:
211 // Already marked above; no children to follow.
212
213 case object.KindObsolete:
214 // Handled separately by gcMarkObsolete; the normal DAG traversal
215 // from roots never reaches obsolete markers.
216 }
217 }
218 return nil
219}
220
221func (s *SQLiteStore) gcMarkObsolete(live map[[32]byte]struct{}, retentionCutoff int64) error {
222 rows, err := s.db.Query("SELECT id, data FROM objects WHERE kind = ?", string(object.KindObsolete))
223 if err != nil {
224 return err
225 }
226 defer rows.Close()
227
228 for rows.Next() {
229 var idRaw, compressed []byte
230 if err := rows.Scan(&idRaw, &compressed); err != nil {
231 return err
232 }
233 raw, err := s.codec.Decompress(compressed)
234 if err != nil {
235 continue
236 }
237 o, err := object.DecodeObsolete(raw)
238 if err != nil {
239 continue
240 }
241 var id [32]byte
242 copy(id[:], idRaw)
243
244 if _, ok := live[o.Predecessor]; ok {
245 live[id] = struct{}{}
246 continue
247 }
248 if o.Timestamp > retentionCutoff {
249 live[id] = struct{}{}
250 }
251 }
252 return rows.Err()
253}
254
255func (s *SQLiteStore) gcSweepObjects(live map[[32]byte]struct{}) (int, error) {
256 rows, err := s.db.Query("SELECT id FROM objects")
257 if err != nil {
258 return 0, err
259 }
260 var dead [][]byte
261 for rows.Next() {
262 var id []byte
263 if err := rows.Scan(&id); err != nil {
264 rows.Close()
265 return 0, err
266 }
267 var key [32]byte
268 copy(key[:], id)
269 if _, ok := live[key]; !ok {
270 cp := make([]byte, len(id))
271 copy(cp, id)
272 dead = append(dead, cp)
273 }
274 }
275 rows.Close()
276 if err := rows.Err(); err != nil {
277 return 0, err
278 }
279 if len(dead) == 0 {
280 return 0, nil
281 }
282
283 tx, err := s.db.Begin()
284 if err != nil {
285 return 0, err
286 }
287 stmt, err := tx.Prepare("DELETE FROM objects WHERE id = ?")
288 if err != nil {
289 tx.Rollback() //nolint:errcheck
290 return 0, err
291 }
292 for _, id := range dead {
293 if _, err := stmt.Exec(id); err != nil {
294 stmt.Close()
295 tx.Rollback() //nolint:errcheck
296 return 0, err
297 }
298 }
299 stmt.Close()
300 if err := tx.Commit(); err != nil {
301 return 0, err
302 }
303 return len(dead), nil
304}
305
306func (s *SQLiteStore) gcRepackPacks(live map[[32]byte]struct{}, progress GCProgress) (*GCStats, error) {
307 type packRec struct {
308 blobID [32]byte
309 packFile string
310 offset int64
311 rawSize int64
312 }
313
314 rows, err := s.db.Query("SELECT blob_id, pack_file, offset, raw_size FROM pack_index")
315 if err != nil {
316 return nil, err
317 }
318 var liveEntries []packRec
319 var deadCount int
320 for rows.Next() {
321 var raw []byte
322 var e packRec
323 if err := rows.Scan(&raw, &e.packFile, &e.offset, &e.rawSize); err != nil {
324 rows.Close()
325 return nil, err
326 }
327 copy(e.blobID[:], raw)
328 if _, ok := live[e.blobID]; ok {
329 liveEntries = append(liveEntries, e)
330 } else {
331 deadCount++
332 }
333 }
334 rows.Close()
335 if err := rows.Err(); err != nil {
336 return nil, err
337 }
338
339 stats := &GCStats{PackEntriesDeleted: deadCount}
340 if deadCount == 0 {
341 return stats, nil
342 }
343
344 oldPackFiles := make(map[string]int64)
345 pRows, err := s.db.Query("SELECT DISTINCT pack_file FROM pack_index")
346 if err != nil {
347 return nil, err
348 }
349 for pRows.Next() {
350 var pf string
351 if err := pRows.Scan(&pf); err != nil {
352 pRows.Close()
353 return nil, err
354 }
355 info, statErr := os.Stat(filepath.Join(s.packDir, pf))
356 if statErr == nil {
357 oldPackFiles[pf] = info.Size()
358 } else {
359 oldPackFiles[pf] = 0
360 }
361 }
362 pRows.Close()
363
364 s.pack.mu.Lock()
365 if s.pack.cur != nil {
366 s.pack.cur.Sync() //nolint:errcheck
367 s.pack.cur.Close()
368 s.pack.cur = nil
369 }
370 s.pack.mu.Unlock()
371
372 if len(liveEntries) == 0 {
373 if _, err := s.db.Exec("DELETE FROM pack_index"); err != nil {
374 return nil, err
375 }
376 for pf, size := range oldPackFiles {
377 os.Remove(filepath.Join(s.packDir, pf)) //nolint:errcheck
378 stats.BytesFreed += size
379 }
380 return stats, nil
381 }
382
383 newPM, err := newPackManager(s.packDir, 0)
384 if err != nil {
385 return nil, err
386 }
387
388 type newEntry struct {
389 blobID [32]byte
390 packFile string
391 offset int64
392 rawSize int64
393 deltaBaseID [32]byte
394 deltaDepth int
395 }
396
397 sort.Slice(liveEntries, func(i, j int) bool {
398 return liveEntries[i].rawSize > liveEntries[j].rawSize
399 })
400
401 type rawBlob struct {
402 blobID [32]byte
403 rawSize int64
404 content []byte
405 }
406 rawBlobs := make([]rawBlob, 0, len(liveEntries))
407 for _, e := range liveEntries {
408 _, content, err := s.ReadObject(e.blobID)
409 if err != nil {
410 newPM.close()
411 return nil, fmt.Errorf("gc: read blob %s for repack: %w",
412 hex.EncodeToString(e.blobID[:])[:8], err)
413 }
414 rawBlobs = append(rawBlobs, rawBlob{blobID: e.blobID, rawSize: e.rawSize, content: content})
415 }
416
417 var newEntries []newEntry
418 depthOf := make(map[[32]byte]int, len(rawBlobs))
419
420 for i, rb := range rawBlobs {
421 progress("repack", i+1, len(rawBlobs))
422
423 var bestBase *rawBlob
424 if i > 0 {
425 prev := &rawBlobs[i-1]
426 if prev.rawSize > 0 && rb.rawSize > 0 {
427 lo, hi := prev.rawSize, rb.rawSize
428 if lo > hi {
429 lo, hi = hi, lo
430 }
431 if hi <= lo*6/5 && depthOf[prev.blobID] < deltaMaxDepth {
432 bestBase = prev
433 }
434 }
435 }
436
437 if bestBase != nil {
438 deltaBytes := ComputeDelta(bestBase.content, rb.content)
439 compDelta := s.codec.Compress(deltaBytes)
440 compFull := s.codec.Compress(rb.content)
441 if len(compDelta) < len(compFull)*4/5 {
442 pe, writeErr := newPM.write(compDelta, rb.rawSize)
443 if writeErr != nil {
444 newPM.close()
445 return nil, fmt.Errorf("gc: write delta blob: %w", writeErr)
446 }
447 d := depthOf[bestBase.blobID] + 1
448 depthOf[rb.blobID] = d
449 newEntries = append(newEntries, newEntry{
450 blobID: rb.blobID,
451 packFile: pe.packFile,
452 offset: pe.offset,
453 rawSize: rb.rawSize,
454 deltaBaseID: bestBase.blobID,
455 deltaDepth: d,
456 })
457 continue
458 }
459 }
460
461 compFull := s.codec.Compress(rb.content)
462 pe, err := newPM.write(compFull, rb.rawSize)
463 if err != nil {
464 newPM.close()
465 return nil, fmt.Errorf("gc: write compacted blob: %w", err)
466 }
467 depthOf[rb.blobID] = 0
468 newEntries = append(newEntries, newEntry{
469 blobID: rb.blobID,
470 packFile: pe.packFile,
471 offset: pe.offset,
472 rawSize: rb.rawSize,
473 })
474 }
475 newPM.close()
476
477 tx, err := s.db.Begin()
478 if err != nil {
479 return nil, err
480 }
481 if _, err := tx.Exec("DELETE FROM pack_index"); err != nil {
482 tx.Rollback() //nolint:errcheck
483 return nil, err
484 }
485 stmt, err := tx.Prepare(
486 "INSERT INTO pack_index (blob_id, pack_file, offset, raw_size, delta_base_id, delta_depth) VALUES (?, ?, ?, ?, ?, ?)",
487 )
488 if err != nil {
489 tx.Rollback() //nolint:errcheck
490 return nil, err
491 }
492 for _, ne := range newEntries {
493 var deltaBaseID interface{}
494 if ne.deltaBaseID != ([32]byte{}) {
495 deltaBaseID = ne.deltaBaseID[:]
496 }
497 if _, err := stmt.Exec(ne.blobID[:], ne.packFile, ne.offset, ne.rawSize, deltaBaseID, ne.deltaDepth); err != nil {
498 stmt.Close()
499 tx.Rollback() //nolint:errcheck
500 return nil, err
501 }
502 }
503 stmt.Close()
504 if err := tx.Commit(); err != nil {
505 return nil, err
506 }
507
508 newPackFiles := make(map[string]struct{})
509 for _, ne := range newEntries {
510 newPackFiles[ne.packFile] = struct{}{}
511 }
512 stats.PackFilesRebuilt = len(newPackFiles)
513
514 for pf, size := range oldPackFiles {
515 if _, isNew := newPackFiles[pf]; !isNew {
516 os.Remove(filepath.Join(s.packDir, pf)) //nolint:errcheck
517 stats.BytesFreed += size
518 }
519 }
520
521 return stats, nil
522}