1package store
2
3import (
4 "database/sql"
5 "encoding/hex"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "arche/internal/object"
12 "arche/internal/store/migrate"
13
14 "github.com/klauspost/compress/zstd"
15 _ "github.com/mattn/go-sqlite3"
16)
17
// defaultPackThreshold is the raw object size, in bytes, used when the caller
// passes a non-positive threshold: blobs strictly larger than this go to pack
// files instead of the objects table (128 KiB).
const defaultPackThreshold = 128 << 10
19
// SQLiteStore keeps objects and repository metadata in a SQLite database.
// Objects are stored compressed: blobs larger than packThreshold are appended
// to pack files (located through pack_index), everything else lives in the
// objects table. Bookmarks, phases, changes, the working-copy cache,
// operations, conflicts and file locks are kept in ordinary tables.
type SQLiteStore struct {
	db              *sql.DB      // database handle; OpenSQLiteStore limits it to one open connection
	pack            *packManager // reads and appends pack-file entries for large blobs
	codec           codec        // compresses/decompresses stored object data
	packDir         string       // directory handed to newPackManager
	packThreshold   int          // raw byte size above which blobs are packed
	compressionName string       // codec name, reused when TrainAndSaveDict rebuilds the codec
}
28
29func OpenSQLiteStore(dbPath, packDir string, packThreshold, packSealSize int, compression string) (*SQLiteStore, error) {
30 if packThreshold <= 0 {
31 packThreshold = defaultPackThreshold
32 }
33 db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000")
34 if err != nil {
35 return nil, fmt.Errorf("store open %s: %w", dbPath, err)
36 }
37 db.SetMaxOpenConns(1)
38
39 if _, err := db.Exec("PRAGMA journal_mode = WAL; PRAGMA foreign_keys = ON;"); err != nil {
40 db.Close()
41 return nil, fmt.Errorf("store pragma: %w", err)
42 }
43
44 if err := migrate.Run(db); err != nil {
45 db.Close()
46 return nil, fmt.Errorf("store migrate: %w", err)
47 }
48
49 pm, err := newPackManager(packDir, packSealSize)
50 if err != nil {
51 db.Close()
52 return nil, err
53 }
54
55 var dictData []byte
56 _ = db.QueryRow("SELECT dict FROM zstd_dicts ORDER BY id DESC LIMIT 1").Scan(&dictData)
57
58 cd, err := newCodec(compression, dictData)
59 if err != nil {
60 db.Close()
61 pm.close()
62 return nil, err
63 }
64
65 return &SQLiteStore{
66 db: db,
67 pack: pm,
68 codec: cd,
69 packDir: packDir,
70 packThreshold: packThreshold,
71 compressionName: compression,
72 }, nil
73}
74
75func (s *SQLiteStore) Begin() (*Tx, error) {
76 sqlTx, err := s.db.Begin()
77 if err != nil {
78 return nil, fmt.Errorf("begin tx: %w", err)
79 }
80 return &Tx{sqlTx: sqlTx}, nil
81}
82
83func (s *SQLiteStore) Commit(tx *Tx) error {
84 return tx.sqlTx.Commit()
85}
86
87func (s *SQLiteStore) Rollback(tx *Tx) error {
88 return tx.sqlTx.Rollback()
89}
90
91func (s *SQLiteStore) Close() error {
92 s.pack.close()
93 s.codec.Close()
94 return s.db.Close()
95}
96
97func (s *SQLiteStore) AddConflict(tx *Tx, path string) error {
98 _, err := tx.sqlTx.Exec("INSERT OR IGNORE INTO conflicts (path) VALUES (?)", path)
99 return err
100}
101
102func (s *SQLiteStore) ClearConflict(tx *Tx, path string) error {
103 _, err := tx.sqlTx.Exec("DELETE FROM conflicts WHERE path = ?", path)
104 return err
105}
106
107func (s *SQLiteStore) ClearAllConflicts(tx *Tx) error {
108 _, err := tx.sqlTx.Exec("DELETE FROM conflicts")
109 return err
110}
111
112func (s *SQLiteStore) ListConflicts() ([]string, error) {
113 rows, err := s.db.Query("SELECT path FROM conflicts ORDER BY path")
114 if err != nil {
115 return nil, err
116 }
117 defer rows.Close()
118 var out []string
119 for rows.Next() {
120 var p string
121 if err := rows.Scan(&p); err != nil {
122 return nil, err
123 }
124 out = append(out, p)
125 }
126 return out, rows.Err()
127}
128
129func (s *SQLiteStore) HasObject(id [32]byte) (bool, error) {
130 var count int
131 err := s.db.QueryRow("SELECT COUNT(*) FROM objects WHERE id = ?", id[:]).Scan(&count)
132 if err != nil {
133 return false, err
134 }
135 if count > 0 {
136 return true, nil
137 }
138
139 err = s.db.QueryRow("SELECT COUNT(*) FROM pack_index WHERE blob_id = ?", id[:]).Scan(&count)
140 if err != nil {
141 return false, err
142 }
143 return count > 0, nil
144}
145
146func (s *SQLiteStore) ReadObject(id [32]byte) (kind string, raw []byte, err error) {
147 var compressed []byte
148 rowErr := s.db.QueryRow("SELECT kind, data FROM objects WHERE id = ?", id[:]).Scan(&kind, &compressed)
149 if rowErr == nil {
150 raw, err = s.codec.Decompress(compressed)
151 if err != nil {
152 return "", nil, fmt.Errorf("decompress object %s: %w", hex.EncodeToString(id[:])[:12], err)
153 }
154 return kind, raw, nil
155 }
156 if !errors.Is(rowErr, sql.ErrNoRows) {
157 return "", nil, rowErr
158 }
159
160 var packFile string
161 var offset, rawSize int64
162 var deltaBaseIDRaw []byte
163 var deltaDepth int
164 rowErr = s.db.QueryRow(
165 "SELECT pack_file, offset, raw_size, delta_base_id, delta_depth FROM pack_index WHERE blob_id = ?", id[:],
166 ).Scan(&packFile, &offset, &rawSize, &deltaBaseIDRaw, &deltaDepth)
167 if errors.Is(rowErr, sql.ErrNoRows) {
168 return "", nil, fmt.Errorf("object %s not found", hex.EncodeToString(id[:])[:12])
169 }
170 if rowErr != nil {
171 return "", nil, rowErr
172 }
173
174 compressed, err = s.pack.read(packFile, offset)
175 if err != nil {
176 return "", nil, err
177 }
178
179 if len(deltaBaseIDRaw) > 0 {
180 if deltaDepth > deltaMaxDepth {
181 return "", nil, fmt.Errorf("pack object %s: delta chain depth %d exceeds limit %d",
182 hex.EncodeToString(id[:])[:12], deltaDepth, deltaMaxDepth)
183 }
184 deltaBytes, decErr := s.codec.Decompress(compressed)
185 if decErr != nil {
186 return "", nil, fmt.Errorf("decompress delta %s: %w", hex.EncodeToString(id[:])[:12], decErr)
187 }
188 var baseID [32]byte
189 copy(baseID[:], deltaBaseIDRaw)
190 _, baseRaw, baseErr := s.ReadObject(baseID)
191 if baseErr != nil {
192 return "", nil, fmt.Errorf("read delta base for %s: %w", hex.EncodeToString(id[:])[:12], baseErr)
193 }
194 raw, err = ApplyDelta(baseRaw, deltaBytes)
195 if err != nil {
196 return "", nil, fmt.Errorf("apply delta %s: %w", hex.EncodeToString(id[:])[:12], err)
197 }
198 return string(object.KindBlob), raw, nil
199 }
200
201 raw, err = s.codec.Decompress(compressed)
202 if err != nil {
203 return "", nil, fmt.Errorf("decompress pack object %s: %w", hex.EncodeToString(id[:])[:12], err)
204 }
205 return string(object.KindBlob), raw, nil
206}
207
208func (s *SQLiteStore) WriteObject(tx *Tx, id [32]byte, kind string, raw []byte) error {
209 compressed := s.codec.Compress(raw)
210
211 if len(raw) > s.packThreshold && kind == string(object.KindBlob) {
212 entry, err := s.pack.write(compressed, int64(len(raw)))
213 if err != nil {
214 return err
215 }
216 _, err = tx.sqlTx.Exec(
217 "INSERT OR IGNORE INTO pack_index (blob_id, pack_file, offset, raw_size) VALUES (?, ?, ?, ?)",
218 id[:], entry.packFile, entry.offset, entry.rawSize,
219 )
220 return err
221 }
222
223 _, err := tx.sqlTx.Exec(
224 "INSERT OR IGNORE INTO objects (id, kind, data) VALUES (?, ?, ?)",
225 id[:], kind, compressed,
226 )
227 return err
228}
229
230func (s *SQLiteStore) ListObjectsByKind(kind string) ([][32]byte, error) {
231 rows, err := s.db.Query("SELECT id FROM objects WHERE kind = ?", kind)
232 if err != nil {
233 return nil, err
234 }
235 defer rows.Close()
236 var ids [][32]byte
237 for rows.Next() {
238 var raw []byte
239 if err := rows.Scan(&raw); err != nil {
240 return nil, err
241 }
242 var id [32]byte
243 copy(id[:], raw)
244 ids = append(ids, id)
245 }
246 return ids, rows.Err()
247}
248
249func (s *SQLiteStore) GetBookmark(name string) (*Bookmark, error) {
250 var cid []byte
251 var remote sql.NullString
252 err := s.db.QueryRow("SELECT commit_id, remote FROM bookmarks WHERE name = ?", name).Scan(&cid, &remote)
253 if errors.Is(err, sql.ErrNoRows) {
254 return nil, nil
255 }
256 if err != nil {
257 return nil, err
258 }
259 b := &Bookmark{Name: name, Remote: remote.String}
260 copy(b.CommitID[:], cid)
261 return b, nil
262}
263
264func (s *SQLiteStore) SetBookmark(tx *Tx, b Bookmark) error {
265 var remote interface{}
266 if b.Remote != "" {
267 remote = b.Remote
268 }
269 _, err := tx.sqlTx.Exec(
270 "INSERT OR REPLACE INTO bookmarks (name, commit_id, remote) VALUES (?, ?, ?)",
271 b.Name, b.CommitID[:], remote,
272 )
273 return err
274}
275
276func (s *SQLiteStore) DeleteBookmark(tx *Tx, name string) error {
277 _, err := tx.sqlTx.Exec("DELETE FROM bookmarks WHERE name = ?", name)
278 return err
279}
280
281func (s *SQLiteStore) ListBookmarks() ([]Bookmark, error) {
282 rows, err := s.db.Query("SELECT name, commit_id, remote FROM bookmarks ORDER BY name")
283 if err != nil {
284 return nil, err
285 }
286 defer rows.Close()
287
288 var out []Bookmark
289 for rows.Next() {
290 var b Bookmark
291 var cid []byte
292 var remote sql.NullString
293 if err := rows.Scan(&b.Name, &cid, &remote); err != nil {
294 return nil, err
295 }
296 copy(b.CommitID[:], cid)
297 b.Remote = remote.String
298 out = append(out, b)
299 }
300 return out, rows.Err()
301}
302
303func (s *SQLiteStore) GetPhase(commitID [32]byte) (object.Phase, error) {
304 var phase int
305 err := s.db.QueryRow("SELECT phase FROM phases WHERE commit_id = ?", commitID[:]).Scan(&phase)
306 if errors.Is(err, sql.ErrNoRows) {
307 return object.PhaseDraft, nil
308 }
309
310 if err != nil {
311 return 0, err
312 }
313
314 return object.Phase(phase), nil
315}
316
317func (s *SQLiteStore) SetPhase(tx *Tx, commitID [32]byte, phase object.Phase) error {
318 _, err := tx.sqlTx.Exec(
319 "INSERT OR REPLACE INTO phases (commit_id, phase) VALUES (?, ?)",
320 commitID[:], int(phase),
321 )
322 return err
323}
324
325func (s *SQLiteStore) ListPublicCommitIDs() ([][32]byte, error) {
326 rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhasePublic))
327 if err != nil {
328 return nil, err
329 }
330 defer rows.Close()
331 var out [][32]byte
332 for rows.Next() {
333 var raw []byte
334 if err := rows.Scan(&raw); err != nil {
335 return nil, err
336 }
337 var id [32]byte
338 copy(id[:], raw)
339 out = append(out, id)
340 }
341 return out, rows.Err()
342}
343
344func (s *SQLiteStore) ListSecretCommitIDs() ([][32]byte, error) {
345 rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhaseSecret))
346 if err != nil {
347 return nil, err
348 }
349 defer rows.Close()
350 var out [][32]byte
351 for rows.Next() {
352 var raw []byte
353 if err := rows.Scan(&raw); err != nil {
354 return nil, err
355 }
356 var id [32]byte
357 copy(id[:], raw)
358 out = append(out, id)
359 }
360 return out, rows.Err()
361}
362
363func (s *SQLiteStore) AllocChangeID(tx *Tx) (string, error) {
364 for length := 8; length <= 32; length += 2 {
365 id := object.NewChangeID(length)
366 _, err := tx.sqlTx.Exec("INSERT INTO changes (change_id, commit_id) VALUES (?, NULL)", id)
367 if err == nil {
368 return id, nil
369 }
370 if !isSQLiteConstraintError(err) {
371 return "", fmt.Errorf("alloc change ID: %w", err)
372 }
373 }
374 return "", errors.New("change ID allocation failed after max retries")
375}
376
377func (s *SQLiteStore) GetChangeCommit(changeID string) ([32]byte, error) {
378 var rows *sql.Rows
379 var err error
380 if len(changeID) < 8 {
381 rows, err = s.db.Query(
382 "SELECT change_id, commit_id FROM changes WHERE change_id LIKE ? AND commit_id IS NOT NULL",
383 changeID+"%",
384 )
385 } else {
386 rows, err = s.db.Query(
387 "SELECT change_id, commit_id FROM changes WHERE change_id = ? AND commit_id IS NOT NULL",
388 changeID,
389 )
390 }
391
392 if err != nil {
393 return object.ZeroID, err
394 }
395 defer rows.Close()
396
397 var found [32]byte
398 var count int
399 for rows.Next() {
400 var cid []byte
401 var chid string
402 if err := rows.Scan(&chid, &cid); err != nil {
403 return object.ZeroID, err
404 }
405 copy(found[:], cid)
406 count++
407 }
408
409 if err := rows.Err(); err != nil {
410 return object.ZeroID, err
411 }
412
413 if count == 0 {
414 return object.ZeroID, sql.ErrNoRows
415 }
416
417 if count > 1 {
418 return object.ZeroID, fmt.Errorf("ambiguous change ID prefix %q matches %d changes", changeID, count)
419 }
420
421 return found, nil
422}
423
424func (s *SQLiteStore) SetChangeCommit(tx *Tx, changeID string, commitID [32]byte) error {
425 _, err := tx.sqlTx.Exec(
426 "UPDATE changes SET commit_id = ? WHERE change_id = ?",
427 commitID[:], changeID,
428 )
429 return err
430}
431
432func (s *SQLiteStore) ListChanges() ([]Bookmark, error) {
433 rows, err := s.db.Query("SELECT change_id, commit_id FROM changes WHERE commit_id IS NOT NULL")
434 if err != nil {
435 return nil, err
436 }
437 defer rows.Close()
438 var out []Bookmark
439 for rows.Next() {
440 var name string
441 var commitID []byte
442 if err := rows.Scan(&name, &commitID); err != nil {
443 return nil, err
444 }
445 var id [32]byte
446 copy(id[:], commitID)
447 out = append(out, Bookmark{Name: name, CommitID: id})
448 }
449 return out, rows.Err()
450}
451
452func (s *SQLiteStore) GetWCacheEntry(path string) (*WCacheEntry, error) {
453 var e WCacheEntry
454 var blobID []byte
455 var dirty int
456 err := s.db.QueryRow(
457 "SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE path = ?", path,
458 ).Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty)
459 if errors.Is(err, sql.ErrNoRows) {
460 return nil, nil
461 }
462 if err != nil {
463 return nil, err
464 }
465 copy(e.BlobID[:], blobID)
466 e.Dirty = dirty != 0
467 return &e, nil
468}
469
470func (s *SQLiteStore) SetWCacheEntry(tx *Tx, e WCacheEntry) error {
471 dirty := 0
472 if e.Dirty {
473 dirty = 1
474 }
475 _, err := tx.sqlTx.Exec(
476 "INSERT OR REPLACE INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty) VALUES (?, ?, ?, ?, ?, ?, ?)",
477 e.Path, e.Inode, e.MtimeNs, e.Size, e.BlobID[:], e.Mode, dirty,
478 )
479 return err
480}
481
482func (s *SQLiteStore) DeleteWCacheEntry(tx *Tx, path string) error {
483 _, err := tx.sqlTx.Exec("DELETE FROM wcache WHERE path = ?", path)
484 return err
485}
486
487func (s *SQLiteStore) ListWCacheEntries() ([]WCacheEntry, error) {
488 rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache ORDER BY path")
489 if err != nil {
490 return nil, err
491 }
492 defer rows.Close()
493
494 var out []WCacheEntry
495 for rows.Next() {
496 var e WCacheEntry
497 var blobID []byte
498 var dirty int
499 if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil {
500 return nil, err
501 }
502 copy(e.BlobID[:], blobID)
503 e.Dirty = dirty != 0
504 out = append(out, e)
505 }
506 return out, rows.Err()
507}
508
509func (s *SQLiteStore) ClearWCache(tx *Tx) error {
510 _, err := tx.sqlTx.Exec("DELETE FROM wcache")
511 return err
512}
513
514func (s *SQLiteStore) MarkWCacheDirty(path string) error {
515 _, err := s.db.Exec(
516 `INSERT INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty)
517 VALUES (?, 0, 0, 0, zeroblob(32), 0, 1)
518 ON CONFLICT(path) DO UPDATE SET dirty = 1`,
519 path,
520 )
521 return err
522}
523
524func (s *SQLiteStore) ListDirtyWCacheEntries() ([]WCacheEntry, error) {
525 rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE dirty = 1 ORDER BY path")
526 if err != nil {
527 return nil, err
528 }
529 defer rows.Close()
530
531 var out []WCacheEntry
532 for rows.Next() {
533 var e WCacheEntry
534 var blobID []byte
535 var dirty int
536 if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil {
537 return nil, err
538 }
539 copy(e.BlobID[:], blobID)
540 e.Dirty = dirty != 0
541 out = append(out, e)
542 }
543 return out, rows.Err()
544}
545
546func (s *SQLiteStore) ClearWCacheDirtyFlags(tx *Tx) error {
547 _, err := tx.sqlTx.Exec("UPDATE wcache SET dirty = 0 WHERE dirty = 1")
548 return err
549}
550
551func (s *SQLiteStore) InsertOperation(tx *Tx, op Operation) (int64, error) {
552 if op.Timestamp == 0 {
553 op.Timestamp = time.Now().Unix()
554 }
555 result, err := tx.sqlTx.Exec(
556 "INSERT INTO operations (kind, timestamp, before, after, metadata) VALUES (?, ?, ?, ?, ?)",
557 op.Kind, op.Timestamp, op.Before, op.After, nullableString(op.Metadata),
558 )
559 if err != nil {
560 return 0, err
561 }
562 return result.LastInsertId()
563}
564
565func (s *SQLiteStore) ListOperations(n int) ([]Operation, error) {
566 query := "SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC"
567 if n > 0 {
568 query += fmt.Sprintf(" LIMIT %d", n)
569 }
570 rows, err := s.db.Query(query)
571 if err != nil {
572 return nil, err
573 }
574 defer rows.Close()
575
576 var out []Operation
577 for rows.Next() {
578 var op Operation
579 var meta sql.NullString
580 if err := rows.Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta); err != nil {
581 return nil, err
582 }
583 op.Metadata = meta.String
584 out = append(out, op)
585 }
586 return out, rows.Err()
587}
588
589func (s *SQLiteStore) GetOperation(seq int64) (*Operation, error) {
590 var op Operation
591 var meta sql.NullString
592 err := s.db.QueryRow(
593 "SELECT seq, kind, timestamp, before, after, metadata FROM operations WHERE seq = ?", seq,
594 ).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta)
595 if errors.Is(err, sql.ErrNoRows) {
596 return nil, nil
597 }
598 if err != nil {
599 return nil, err
600 }
601 op.Metadata = meta.String
602 return &op, nil
603}
604
605func (s *SQLiteStore) GetLastOperation() (*Operation, error) {
606 var op Operation
607 var meta sql.NullString
608 err := s.db.QueryRow(
609 "SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC LIMIT 1",
610 ).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta)
611 if errors.Is(err, sql.ErrNoRows) {
612 return nil, nil
613 }
614 if err != nil {
615 return nil, err
616 }
617 op.Metadata = meta.String
618 return &op, nil
619}
620
// isSQLiteConstraintError reports whether err's message says a SQLite
// constraint was violated, e.g. "UNIQUE constraint failed: ..." or
// "NOT NULL constraint failed: ...". A nil error never qualifies.
func isSQLiteConstraintError(err error) bool {
	// Every SQLite constraint message, including the UNIQUE one, contains
	// "constraint failed", so one substring test covers them all.
	return err != nil && strings.Contains(err.Error(), "constraint failed")
}
628
// nullableString converts "" to a nil interface (bound as SQL NULL) and
// returns any other string unchanged.
func nullableString(s string) interface{} {
	if len(s) > 0 {
		return s
	}
	return nil
}
635
636func (s *SQLiteStore) AcquireLock(tx *Tx, path, owner, comment string) error {
637 var existingOwner string
638 err := tx.sqlTx.QueryRow("SELECT owner FROM file_locks WHERE path = ?", path).Scan(&existingOwner)
639 if err == nil && existingOwner != owner {
640 return fmt.Errorf("file %q is locked by %q", path, existingOwner)
641 }
642 _, err = tx.sqlTx.Exec(`
643 INSERT INTO file_locks (path, owner, acquired_at, comment) VALUES (?, ?, ?, ?)
644 ON CONFLICT(path) DO UPDATE SET acquired_at = excluded.acquired_at, comment = excluded.comment`,
645 path, owner, time.Now().Unix(), comment,
646 )
647 return err
648}
649
650func (s *SQLiteStore) ReleaseLock(tx *Tx, path, owner string) error {
651 res, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ? AND owner = ?", path, owner)
652 if err != nil {
653 return err
654 }
655 n, _ := res.RowsAffected()
656 if n == 0 {
657 return fmt.Errorf("lock on %q is not held by %q", path, owner)
658 }
659 return nil
660}
661
662func (s *SQLiteStore) ReleaseLockAdmin(tx *Tx, path string) error {
663 _, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ?", path)
664 return err
665}
666
667func (s *SQLiteStore) GetLock(path string) (*FileLock, error) {
668 var l FileLock
669 var comment sql.NullString
670 err := s.db.QueryRow(
671 "SELECT path, owner, acquired_at, comment FROM file_locks WHERE path = ?", path,
672 ).Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment)
673 if errors.Is(err, sql.ErrNoRows) {
674 return nil, nil
675 }
676 if err != nil {
677 return nil, err
678 }
679 l.Comment = comment.String
680 return &l, nil
681}
682
683func (s *SQLiteStore) ListLocks() ([]FileLock, error) {
684 rows, err := s.db.Query(
685 "SELECT path, owner, acquired_at, comment FROM file_locks ORDER BY acquired_at DESC",
686 )
687 if err != nil {
688 return nil, err
689 }
690 defer rows.Close()
691 var out []FileLock
692 for rows.Next() {
693 var l FileLock
694 var comment sql.NullString
695 if err := rows.Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment); err != nil {
696 return nil, err
697 }
698 l.Comment = comment.String
699 out = append(out, l)
700 }
701 return out, rows.Err()
702}
703
704func (s *SQLiteStore) TrainAndSaveDict() error {
705 rows, err := s.db.Query(
706 "SELECT data FROM objects WHERE kind = 'blob' ORDER BY RANDOM() LIMIT 200",
707 )
708 if err != nil {
709 return fmt.Errorf("sample blobs: %w", err)
710 }
711 defer rows.Close()
712
713 var samples [][]byte
714 for rows.Next() {
715 var compressed []byte
716 if scanErr := rows.Scan(&compressed); scanErr != nil {
717 return scanErr
718 }
719 raw, decErr := s.codec.Decompress(compressed)
720 if decErr == nil && len(raw) > 0 {
721 samples = append(samples, raw)
722 }
723 }
724 if err := rows.Err(); err != nil {
725 return err
726 }
727 if len(samples) < 5 {
728 return fmt.Errorf("not enough blobs to train dictionary (need at least 5, found %d)", len(samples))
729 }
730
731 dict, err := zstd.BuildDict(zstd.BuildDictOptions{
732 Contents: samples,
733 })
734 if err != nil {
735 return fmt.Errorf("build dict: %w", err)
736 }
737
738 if _, err := s.db.Exec(
739 "INSERT INTO zstd_dicts (created_at, dict) VALUES (?, ?)",
740 time.Now().Unix(), dict,
741 ); err != nil {
742 return fmt.Errorf("save dict: %w", err)
743 }
744
745 s.codec.Close()
746 newCD, err := newCodec(s.compressionName, dict)
747 if err != nil {
748 return fmt.Errorf("reload codec: %w", err)
749 }
750 s.codec = newCD
751 return nil
752}