arche / internal/store/sqlite.go

commit 154431fd
  1package store
  2
  3import (
  4	"database/sql"
  5	"encoding/hex"
  6	"errors"
  7	"fmt"
  8	"strings"
  9	"time"
 10
 11	"arche/internal/object"
 12	"arche/internal/store/migrate"
 13
 14	"github.com/klauspost/compress/zstd"
 15	_ "github.com/mattn/go-sqlite3"
 16)
 17
 18const defaultPackThreshold = 128 * 1024
 19
 20type SQLiteStore struct {
 21	db              *sql.DB
 22	pack            *packManager
 23	codec           codec
 24	packDir         string
 25	packThreshold   int
 26	compressionName string
 27}
 28
 29func OpenSQLiteStore(dbPath, packDir string, packThreshold, packSealSize int, compression string) (*SQLiteStore, error) {
 30	if packThreshold <= 0 {
 31		packThreshold = defaultPackThreshold
 32	}
 33	db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000")
 34	if err != nil {
 35		return nil, fmt.Errorf("store open %s: %w", dbPath, err)
 36	}
 37	db.SetMaxOpenConns(1)
 38
 39	if _, err := db.Exec("PRAGMA journal_mode = WAL; PRAGMA foreign_keys = ON;"); err != nil {
 40		db.Close()
 41		return nil, fmt.Errorf("store pragma: %w", err)
 42	}
 43
 44	if err := migrate.Run(db); err != nil {
 45		db.Close()
 46		return nil, fmt.Errorf("store migrate: %w", err)
 47	}
 48
 49	pm, err := newPackManager(packDir, packSealSize)
 50	if err != nil {
 51		db.Close()
 52		return nil, err
 53	}
 54
 55	var dictData []byte
 56	_ = db.QueryRow("SELECT dict FROM zstd_dicts ORDER BY id DESC LIMIT 1").Scan(&dictData)
 57
 58	cd, err := newCodec(compression, dictData)
 59	if err != nil {
 60		db.Close()
 61		pm.close()
 62		return nil, err
 63	}
 64
 65	return &SQLiteStore{
 66		db:              db,
 67		pack:            pm,
 68		codec:           cd,
 69		packDir:         packDir,
 70		packThreshold:   packThreshold,
 71		compressionName: compression,
 72	}, nil
 73}
 74
 75func (s *SQLiteStore) Begin() (*Tx, error) {
 76	sqlTx, err := s.db.Begin()
 77	if err != nil {
 78		return nil, fmt.Errorf("begin tx: %w", err)
 79	}
 80	return &Tx{sqlTx: sqlTx}, nil
 81}
 82
 83func (s *SQLiteStore) Commit(tx *Tx) error {
 84	return tx.sqlTx.Commit()
 85}
 86
 87func (s *SQLiteStore) Rollback(tx *Tx) error {
 88	return tx.sqlTx.Rollback()
 89}
 90
 91func (s *SQLiteStore) Close() error {
 92	s.pack.close()
 93	s.codec.Close()
 94	return s.db.Close()
 95}
 96
 97func (s *SQLiteStore) AddConflict(tx *Tx, path string) error {
 98	_, err := tx.sqlTx.Exec("INSERT OR IGNORE INTO conflicts (path) VALUES (?)", path)
 99	return err
100}
101
102func (s *SQLiteStore) ClearConflict(tx *Tx, path string) error {
103	_, err := tx.sqlTx.Exec("DELETE FROM conflicts WHERE path = ?", path)
104	return err
105}
106
107func (s *SQLiteStore) ClearAllConflicts(tx *Tx) error {
108	_, err := tx.sqlTx.Exec("DELETE FROM conflicts")
109	return err
110}
111
112func (s *SQLiteStore) ListConflicts() ([]string, error) {
113	rows, err := s.db.Query("SELECT path FROM conflicts ORDER BY path")
114	if err != nil {
115		return nil, err
116	}
117	defer rows.Close()
118	var out []string
119	for rows.Next() {
120		var p string
121		if err := rows.Scan(&p); err != nil {
122			return nil, err
123		}
124		out = append(out, p)
125	}
126	return out, rows.Err()
127}
128
129func (s *SQLiteStore) HasObject(id [32]byte) (bool, error) {
130	var count int
131	err := s.db.QueryRow("SELECT COUNT(*) FROM objects WHERE id = ?", id[:]).Scan(&count)
132	if err != nil {
133		return false, err
134	}
135	if count > 0 {
136		return true, nil
137	}
138
139	err = s.db.QueryRow("SELECT COUNT(*) FROM pack_index WHERE blob_id = ?", id[:]).Scan(&count)
140	if err != nil {
141		return false, err
142	}
143	return count > 0, nil
144}
145
146func (s *SQLiteStore) ReadObject(id [32]byte) (kind string, raw []byte, err error) {
147	var compressed []byte
148	rowErr := s.db.QueryRow("SELECT kind, data FROM objects WHERE id = ?", id[:]).Scan(&kind, &compressed)
149	if rowErr == nil {
150		raw, err = s.codec.Decompress(compressed)
151		if err != nil {
152			return "", nil, fmt.Errorf("decompress object %s: %w", hex.EncodeToString(id[:])[:12], err)
153		}
154		return kind, raw, nil
155	}
156	if !errors.Is(rowErr, sql.ErrNoRows) {
157		return "", nil, rowErr
158	}
159
160	var packFile string
161	var offset, rawSize int64
162	var deltaBaseIDRaw []byte
163	var deltaDepth int
164	rowErr = s.db.QueryRow(
165		"SELECT pack_file, offset, raw_size, delta_base_id, delta_depth FROM pack_index WHERE blob_id = ?", id[:],
166	).Scan(&packFile, &offset, &rawSize, &deltaBaseIDRaw, &deltaDepth)
167	if errors.Is(rowErr, sql.ErrNoRows) {
168		return "", nil, fmt.Errorf("object %s not found", hex.EncodeToString(id[:])[:12])
169	}
170	if rowErr != nil {
171		return "", nil, rowErr
172	}
173
174	compressed, err = s.pack.read(packFile, offset)
175	if err != nil {
176		return "", nil, err
177	}
178
179	if len(deltaBaseIDRaw) > 0 {
180		if deltaDepth > deltaMaxDepth {
181			return "", nil, fmt.Errorf("pack object %s: delta chain depth %d exceeds limit %d",
182				hex.EncodeToString(id[:])[:12], deltaDepth, deltaMaxDepth)
183		}
184		deltaBytes, decErr := s.codec.Decompress(compressed)
185		if decErr != nil {
186			return "", nil, fmt.Errorf("decompress delta %s: %w", hex.EncodeToString(id[:])[:12], decErr)
187		}
188		var baseID [32]byte
189		copy(baseID[:], deltaBaseIDRaw)
190		_, baseRaw, baseErr := s.ReadObject(baseID)
191		if baseErr != nil {
192			return "", nil, fmt.Errorf("read delta base for %s: %w", hex.EncodeToString(id[:])[:12], baseErr)
193		}
194		raw, err = ApplyDelta(baseRaw, deltaBytes)
195		if err != nil {
196			return "", nil, fmt.Errorf("apply delta %s: %w", hex.EncodeToString(id[:])[:12], err)
197		}
198		return string(object.KindBlob), raw, nil
199	}
200
201	raw, err = s.codec.Decompress(compressed)
202	if err != nil {
203		return "", nil, fmt.Errorf("decompress pack object %s: %w", hex.EncodeToString(id[:])[:12], err)
204	}
205	return string(object.KindBlob), raw, nil
206}
207
208func (s *SQLiteStore) WriteObject(tx *Tx, id [32]byte, kind string, raw []byte) error {
209	compressed := s.codec.Compress(raw)
210
211	if len(raw) > s.packThreshold && kind == string(object.KindBlob) {
212		entry, err := s.pack.write(compressed, int64(len(raw)))
213		if err != nil {
214			return err
215		}
216		_, err = tx.sqlTx.Exec(
217			"INSERT OR IGNORE INTO pack_index (blob_id, pack_file, offset, raw_size) VALUES (?, ?, ?, ?)",
218			id[:], entry.packFile, entry.offset, entry.rawSize,
219		)
220		return err
221	}
222
223	_, err := tx.sqlTx.Exec(
224		"INSERT OR IGNORE INTO objects (id, kind, data) VALUES (?, ?, ?)",
225		id[:], kind, compressed,
226	)
227	return err
228}
229
230func (s *SQLiteStore) ListObjectsByKind(kind string) ([][32]byte, error) {
231	rows, err := s.db.Query("SELECT id FROM objects WHERE kind = ?", kind)
232	if err != nil {
233		return nil, err
234	}
235	defer rows.Close()
236	var ids [][32]byte
237	for rows.Next() {
238		var raw []byte
239		if err := rows.Scan(&raw); err != nil {
240			return nil, err
241		}
242		var id [32]byte
243		copy(id[:], raw)
244		ids = append(ids, id)
245	}
246	return ids, rows.Err()
247}
248
249func (s *SQLiteStore) GetBookmark(name string) (*Bookmark, error) {
250	var cid []byte
251	var remote sql.NullString
252	err := s.db.QueryRow("SELECT commit_id, remote FROM bookmarks WHERE name = ?", name).Scan(&cid, &remote)
253	if errors.Is(err, sql.ErrNoRows) {
254		return nil, nil
255	}
256	if err != nil {
257		return nil, err
258	}
259	b := &Bookmark{Name: name, Remote: remote.String}
260	copy(b.CommitID[:], cid)
261	return b, nil
262}
263
264func (s *SQLiteStore) SetBookmark(tx *Tx, b Bookmark) error {
265	var remote interface{}
266	if b.Remote != "" {
267		remote = b.Remote
268	}
269	_, err := tx.sqlTx.Exec(
270		"INSERT OR REPLACE INTO bookmarks (name, commit_id, remote) VALUES (?, ?, ?)",
271		b.Name, b.CommitID[:], remote,
272	)
273	return err
274}
275
276func (s *SQLiteStore) DeleteBookmark(tx *Tx, name string) error {
277	_, err := tx.sqlTx.Exec("DELETE FROM bookmarks WHERE name = ?", name)
278	return err
279}
280
281func (s *SQLiteStore) ListBookmarks() ([]Bookmark, error) {
282	rows, err := s.db.Query("SELECT name, commit_id, remote FROM bookmarks ORDER BY name")
283	if err != nil {
284		return nil, err
285	}
286	defer rows.Close()
287
288	var out []Bookmark
289	for rows.Next() {
290		var b Bookmark
291		var cid []byte
292		var remote sql.NullString
293		if err := rows.Scan(&b.Name, &cid, &remote); err != nil {
294			return nil, err
295		}
296		copy(b.CommitID[:], cid)
297		b.Remote = remote.String
298		out = append(out, b)
299	}
300	return out, rows.Err()
301}
302
303func (s *SQLiteStore) GetPhase(commitID [32]byte) (object.Phase, error) {
304	var phase int
305	err := s.db.QueryRow("SELECT phase FROM phases WHERE commit_id = ?", commitID[:]).Scan(&phase)
306	if errors.Is(err, sql.ErrNoRows) {
307		return object.PhaseDraft, nil
308	}
309
310	if err != nil {
311		return 0, err
312	}
313
314	return object.Phase(phase), nil
315}
316
317func (s *SQLiteStore) SetPhase(tx *Tx, commitID [32]byte, phase object.Phase) error {
318	_, err := tx.sqlTx.Exec(
319		"INSERT OR REPLACE INTO phases (commit_id, phase) VALUES (?, ?)",
320		commitID[:], int(phase),
321	)
322	return err
323}
324
325func (s *SQLiteStore) ListPublicCommitIDs() ([][32]byte, error) {
326	rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhasePublic))
327	if err != nil {
328		return nil, err
329	}
330	defer rows.Close()
331	var out [][32]byte
332	for rows.Next() {
333		var raw []byte
334		if err := rows.Scan(&raw); err != nil {
335			return nil, err
336		}
337		var id [32]byte
338		copy(id[:], raw)
339		out = append(out, id)
340	}
341	return out, rows.Err()
342}
343
344func (s *SQLiteStore) ListSecretCommitIDs() ([][32]byte, error) {
345	rows, err := s.db.Query("SELECT commit_id FROM phases WHERE phase = ?", int(object.PhaseSecret))
346	if err != nil {
347		return nil, err
348	}
349	defer rows.Close()
350	var out [][32]byte
351	for rows.Next() {
352		var raw []byte
353		if err := rows.Scan(&raw); err != nil {
354			return nil, err
355		}
356		var id [32]byte
357		copy(id[:], raw)
358		out = append(out, id)
359	}
360	return out, rows.Err()
361}
362
363func (s *SQLiteStore) AllocChangeID(tx *Tx) (string, error) {
364	for length := 8; length <= 32; length += 2 {
365		id := object.NewChangeID(length)
366		_, err := tx.sqlTx.Exec("INSERT INTO changes (change_id, commit_id) VALUES (?, NULL)", id)
367		if err == nil {
368			return id, nil
369		}
370		if !isSQLiteConstraintError(err) {
371			return "", fmt.Errorf("alloc change ID: %w", err)
372		}
373	}
374	return "", errors.New("change ID allocation failed after max retries")
375}
376
377func (s *SQLiteStore) GetChangeCommit(changeID string) ([32]byte, error) {
378	var rows *sql.Rows
379	var err error
380	if len(changeID) < 8 {
381		rows, err = s.db.Query(
382			"SELECT change_id, commit_id FROM changes WHERE change_id LIKE ? AND commit_id IS NOT NULL",
383			changeID+"%",
384		)
385	} else {
386		rows, err = s.db.Query(
387			"SELECT change_id, commit_id FROM changes WHERE change_id = ? AND commit_id IS NOT NULL",
388			changeID,
389		)
390	}
391
392	if err != nil {
393		return object.ZeroID, err
394	}
395	defer rows.Close()
396
397	var found [32]byte
398	var count int
399	for rows.Next() {
400		var cid []byte
401		var chid string
402		if err := rows.Scan(&chid, &cid); err != nil {
403			return object.ZeroID, err
404		}
405		copy(found[:], cid)
406		count++
407	}
408
409	if err := rows.Err(); err != nil {
410		return object.ZeroID, err
411	}
412
413	if count == 0 {
414		return object.ZeroID, sql.ErrNoRows
415	}
416
417	if count > 1 {
418		return object.ZeroID, fmt.Errorf("ambiguous change ID prefix %q matches %d changes", changeID, count)
419	}
420
421	return found, nil
422}
423
424func (s *SQLiteStore) SetChangeCommit(tx *Tx, changeID string, commitID [32]byte) error {
425	_, err := tx.sqlTx.Exec(
426		"UPDATE changes SET commit_id = ? WHERE change_id = ?",
427		commitID[:], changeID,
428	)
429	return err
430}
431
432func (s *SQLiteStore) ListChanges() ([]Bookmark, error) {
433	rows, err := s.db.Query("SELECT change_id, commit_id FROM changes WHERE commit_id IS NOT NULL")
434	if err != nil {
435		return nil, err
436	}
437	defer rows.Close()
438	var out []Bookmark
439	for rows.Next() {
440		var name string
441		var commitID []byte
442		if err := rows.Scan(&name, &commitID); err != nil {
443			return nil, err
444		}
445		var id [32]byte
446		copy(id[:], commitID)
447		out = append(out, Bookmark{Name: name, CommitID: id})
448	}
449	return out, rows.Err()
450}
451
452func (s *SQLiteStore) GetWCacheEntry(path string) (*WCacheEntry, error) {
453	var e WCacheEntry
454	var blobID []byte
455	var dirty int
456	err := s.db.QueryRow(
457		"SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE path = ?", path,
458	).Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty)
459	if errors.Is(err, sql.ErrNoRows) {
460		return nil, nil
461	}
462	if err != nil {
463		return nil, err
464	}
465	copy(e.BlobID[:], blobID)
466	e.Dirty = dirty != 0
467	return &e, nil
468}
469
470func (s *SQLiteStore) SetWCacheEntry(tx *Tx, e WCacheEntry) error {
471	dirty := 0
472	if e.Dirty {
473		dirty = 1
474	}
475	_, err := tx.sqlTx.Exec(
476		"INSERT OR REPLACE INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty) VALUES (?, ?, ?, ?, ?, ?, ?)",
477		e.Path, e.Inode, e.MtimeNs, e.Size, e.BlobID[:], e.Mode, dirty,
478	)
479	return err
480}
481
482func (s *SQLiteStore) DeleteWCacheEntry(tx *Tx, path string) error {
483	_, err := tx.sqlTx.Exec("DELETE FROM wcache WHERE path = ?", path)
484	return err
485}
486
487func (s *SQLiteStore) ListWCacheEntries() ([]WCacheEntry, error) {
488	rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache ORDER BY path")
489	if err != nil {
490		return nil, err
491	}
492	defer rows.Close()
493
494	var out []WCacheEntry
495	for rows.Next() {
496		var e WCacheEntry
497		var blobID []byte
498		var dirty int
499		if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil {
500			return nil, err
501		}
502		copy(e.BlobID[:], blobID)
503		e.Dirty = dirty != 0
504		out = append(out, e)
505	}
506	return out, rows.Err()
507}
508
509func (s *SQLiteStore) ClearWCache(tx *Tx) error {
510	_, err := tx.sqlTx.Exec("DELETE FROM wcache")
511	return err
512}
513
514func (s *SQLiteStore) MarkWCacheDirty(path string) error {
515	_, err := s.db.Exec(
516		`INSERT INTO wcache (path, inode, mtime_ns, size, blob_id, mode, dirty)
517		 VALUES (?, 0, 0, 0, zeroblob(32), 0, 1)
518		 ON CONFLICT(path) DO UPDATE SET dirty = 1`,
519		path,
520	)
521	return err
522}
523
524func (s *SQLiteStore) ListDirtyWCacheEntries() ([]WCacheEntry, error) {
525	rows, err := s.db.Query("SELECT path, inode, mtime_ns, size, blob_id, mode, dirty FROM wcache WHERE dirty = 1 ORDER BY path")
526	if err != nil {
527		return nil, err
528	}
529	defer rows.Close()
530
531	var out []WCacheEntry
532	for rows.Next() {
533		var e WCacheEntry
534		var blobID []byte
535		var dirty int
536		if err := rows.Scan(&e.Path, &e.Inode, &e.MtimeNs, &e.Size, &blobID, &e.Mode, &dirty); err != nil {
537			return nil, err
538		}
539		copy(e.BlobID[:], blobID)
540		e.Dirty = dirty != 0
541		out = append(out, e)
542	}
543	return out, rows.Err()
544}
545
546func (s *SQLiteStore) ClearWCacheDirtyFlags(tx *Tx) error {
547	_, err := tx.sqlTx.Exec("UPDATE wcache SET dirty = 0 WHERE dirty = 1")
548	return err
549}
550
551func (s *SQLiteStore) InsertOperation(tx *Tx, op Operation) (int64, error) {
552	if op.Timestamp == 0 {
553		op.Timestamp = time.Now().Unix()
554	}
555	result, err := tx.sqlTx.Exec(
556		"INSERT INTO operations (kind, timestamp, before, after, metadata) VALUES (?, ?, ?, ?, ?)",
557		op.Kind, op.Timestamp, op.Before, op.After, nullableString(op.Metadata),
558	)
559	if err != nil {
560		return 0, err
561	}
562	return result.LastInsertId()
563}
564
565func (s *SQLiteStore) ListOperations(n int) ([]Operation, error) {
566	query := "SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC"
567	if n > 0 {
568		query += fmt.Sprintf(" LIMIT %d", n)
569	}
570	rows, err := s.db.Query(query)
571	if err != nil {
572		return nil, err
573	}
574	defer rows.Close()
575
576	var out []Operation
577	for rows.Next() {
578		var op Operation
579		var meta sql.NullString
580		if err := rows.Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta); err != nil {
581			return nil, err
582		}
583		op.Metadata = meta.String
584		out = append(out, op)
585	}
586	return out, rows.Err()
587}
588
589func (s *SQLiteStore) GetOperation(seq int64) (*Operation, error) {
590	var op Operation
591	var meta sql.NullString
592	err := s.db.QueryRow(
593		"SELECT seq, kind, timestamp, before, after, metadata FROM operations WHERE seq = ?", seq,
594	).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta)
595	if errors.Is(err, sql.ErrNoRows) {
596		return nil, nil
597	}
598	if err != nil {
599		return nil, err
600	}
601	op.Metadata = meta.String
602	return &op, nil
603}
604
605func (s *SQLiteStore) GetLastOperation() (*Operation, error) {
606	var op Operation
607	var meta sql.NullString
608	err := s.db.QueryRow(
609		"SELECT seq, kind, timestamp, before, after, metadata FROM operations ORDER BY seq DESC LIMIT 1",
610	).Scan(&op.Seq, &op.Kind, &op.Timestamp, &op.Before, &op.After, &meta)
611	if errors.Is(err, sql.ErrNoRows) {
612		return nil, nil
613	}
614	if err != nil {
615		return nil, err
616	}
617	op.Metadata = meta.String
618	return &op, nil
619}
620
// isSQLiteConstraintError reports whether err's message contains
// "constraint failed". This matches SQLite UNIQUE violations as well as
// NOT NULL, CHECK and FOREIGN KEY failures. A nil err is never a constraint
// error.
func isSQLiteConstraintError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "constraint failed")
}

// nullableString maps "" to a nil interface value (bound as SQL NULL by
// database/sql) and returns any other string unchanged.
func nullableString(s string) interface{} {
	if s != "" {
		return s
	}
	return nil
}
635
636func (s *SQLiteStore) AcquireLock(tx *Tx, path, owner, comment string) error {
637	var existingOwner string
638	err := tx.sqlTx.QueryRow("SELECT owner FROM file_locks WHERE path = ?", path).Scan(&existingOwner)
639	if err == nil && existingOwner != owner {
640		return fmt.Errorf("file %q is locked by %q", path, existingOwner)
641	}
642	_, err = tx.sqlTx.Exec(`
643		INSERT INTO file_locks (path, owner, acquired_at, comment) VALUES (?, ?, ?, ?)
644		ON CONFLICT(path) DO UPDATE SET acquired_at = excluded.acquired_at, comment = excluded.comment`,
645		path, owner, time.Now().Unix(), comment,
646	)
647	return err
648}
649
650func (s *SQLiteStore) ReleaseLock(tx *Tx, path, owner string) error {
651	res, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ? AND owner = ?", path, owner)
652	if err != nil {
653		return err
654	}
655	n, _ := res.RowsAffected()
656	if n == 0 {
657		return fmt.Errorf("lock on %q is not held by %q", path, owner)
658	}
659	return nil
660}
661
662func (s *SQLiteStore) ReleaseLockAdmin(tx *Tx, path string) error {
663	_, err := tx.sqlTx.Exec("DELETE FROM file_locks WHERE path = ?", path)
664	return err
665}
666
667func (s *SQLiteStore) GetLock(path string) (*FileLock, error) {
668	var l FileLock
669	var comment sql.NullString
670	err := s.db.QueryRow(
671		"SELECT path, owner, acquired_at, comment FROM file_locks WHERE path = ?", path,
672	).Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment)
673	if errors.Is(err, sql.ErrNoRows) {
674		return nil, nil
675	}
676	if err != nil {
677		return nil, err
678	}
679	l.Comment = comment.String
680	return &l, nil
681}
682
683func (s *SQLiteStore) ListLocks() ([]FileLock, error) {
684	rows, err := s.db.Query(
685		"SELECT path, owner, acquired_at, comment FROM file_locks ORDER BY acquired_at DESC",
686	)
687	if err != nil {
688		return nil, err
689	}
690	defer rows.Close()
691	var out []FileLock
692	for rows.Next() {
693		var l FileLock
694		var comment sql.NullString
695		if err := rows.Scan(&l.Path, &l.Owner, &l.AcquiredAt, &comment); err != nil {
696			return nil, err
697		}
698		l.Comment = comment.String
699		out = append(out, l)
700	}
701	return out, rows.Err()
702}
703
704func (s *SQLiteStore) TrainAndSaveDict() error {
705	rows, err := s.db.Query(
706		"SELECT data FROM objects WHERE kind = 'blob' ORDER BY RANDOM() LIMIT 200",
707	)
708	if err != nil {
709		return fmt.Errorf("sample blobs: %w", err)
710	}
711	defer rows.Close()
712
713	var samples [][]byte
714	for rows.Next() {
715		var compressed []byte
716		if scanErr := rows.Scan(&compressed); scanErr != nil {
717			return scanErr
718		}
719		raw, decErr := s.codec.Decompress(compressed)
720		if decErr == nil && len(raw) > 0 {
721			samples = append(samples, raw)
722		}
723	}
724	if err := rows.Err(); err != nil {
725		return err
726	}
727	if len(samples) < 5 {
728		return fmt.Errorf("not enough blobs to train dictionary (need at least 5, found %d)", len(samples))
729	}
730
731	dict, err := zstd.BuildDict(zstd.BuildDictOptions{
732		Contents: samples,
733	})
734	if err != nil {
735		return fmt.Errorf("build dict: %w", err)
736	}
737
738	if _, err := s.db.Exec(
739		"INSERT INTO zstd_dicts (created_at, dict) VALUES (?, ?)",
740		time.Now().Unix(), dict,
741	); err != nil {
742		return fmt.Errorf("save dict: %w", err)
743	}
744
745	s.codec.Close()
746	newCD, err := newCodec(s.compressionName, dict)
747	if err != nil {
748		return fmt.Errorf("reload codec: %w", err)
749	}
750	s.codec = newCD
751	return nil
752}