arche / internal/issuedb/db.go

commit 154431fd
  1package issuedb
  2
  3import (
  4	"database/sql"
  5	_ "embed"
  6	"encoding/hex"
  7	"fmt"
  8	"path/filepath"
  9	"strconv"
 10	"time"
 11
 12	"arche/internal/issues"
 13	"arche/internal/object"
 14	"arche/internal/store"
 15	"arche/internal/wiki"
 16
 17	_ "github.com/mattn/go-sqlite3"
 18)
 19
 20//go:embed sql/001_initial.sql
 21var sql001 string
 22
 23//go:embed sql/002_meta_hwm.sql
 24var sql002 string
 25
 26type DB struct {
 27	Issues *issues.Store
 28	Wiki   *wiki.Store
 29	db     *sql.DB
 30}
 31
 32func Open(repoDir string) (*DB, error) {
 33	return openDB(repoDir, nil)
 34}
 35
 36func NewWithStore(repoDir string, st store.Store) (*DB, error) {
 37	return openDB(repoDir, st)
 38}
 39
 40type storeObjectSink struct {
 41	st store.Store
 42}
 43
 44func (s *storeObjectSink) WriteIssueEventObject(id [32]byte, raw []byte) error {
 45	if ok, _ := s.st.HasObject(id); ok {
 46		return nil
 47	}
 48	tx, err := s.st.Begin()
 49	if err != nil {
 50		return err
 51	}
 52	if err := s.st.WriteObject(tx, id, "issue-event", raw); err != nil {
 53		s.st.Rollback(tx) //nolint:errcheck
 54		return err
 55	}
 56	return s.st.Commit(tx)
 57}
 58
 59func openDB(repoDir string, st store.Store) (*DB, error) {
 60	path := filepath.Join(repoDir, "issues.db")
 61
 62	db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000")
 63	if err != nil {
 64		return nil, err
 65	}
 66
 67	db.SetMaxOpenConns(1)
 68
 69	if err := runMigrations(db); err != nil {
 70		db.Close()
 71		return nil, err
 72	}
 73
 74	issueStore := issues.New(db)
 75	if st != nil {
 76		sink := &storeObjectSink{st: st}
 77		issueStore.SetObjectSink(sink)
 78		if err := seedParentHashes(db, issueStore, st); err != nil {
 79			db.Close()
 80			return nil, fmt.Errorf("issuedb: seed parent hashes: %w", err)
 81		}
 82		if err := recoverFromStore(db, st); err != nil {
 83			db.Close()
 84			return nil, fmt.Errorf("issuedb: recovery pass: %w", err)
 85		}
 86	}
 87
 88	return &DB{
 89		Issues: issueStore,
 90		Wiki:   wiki.New(db),
 91		db:     db,
 92	}, nil
 93}
 94
 95func seedParentHashes(idb *sql.DB, st *issues.Store, objStore store.Store) error {
 96	rows, err := idb.Query(
 97		`SELECT issue_id, hlc_ms, hlc_seq, kind, payload, author
 98		   FROM issue_events
 99		  ORDER BY hlc_ms ASC, hlc_seq ASC`,
100	)
101	if err != nil {
102		return err
103	}
104	defer rows.Close()
105
106	type evt struct {
107		issueID string
108		hlcMS   int64
109		hlcSeq  int
110		kind    string
111		payload []byte
112		author  string
113	}
114	var evts []evt
115	for rows.Next() {
116		var e evt
117		var payload string
118		if err := rows.Scan(&e.issueID, &e.hlcMS, &e.hlcSeq, &e.kind, &payload, &e.author); err != nil {
119			return err
120		}
121		e.payload = []byte(payload)
122		evts = append(evts, e)
123	}
124	if err := rows.Err(); err != nil {
125		return err
126	}
127
128	last := map[string][32]byte{}
129	for _, e := range evts {
130		obj := &object.IssueEventObject{
131			IssueID: e.issueID,
132			Kind:    e.kind,
133			Payload: e.payload,
134			Author:  e.author,
135			HLCMS:   e.hlcMS,
136			HLCSeq:  e.hlcSeq,
137		}
138		if prev, ok := last[e.issueID]; ok {
139			obj.Parents = [][32]byte{prev}
140		}
141		id := object.HashIssueEvent(obj)
142		if ok, _ := objStore.HasObject(id); ok {
143			last[e.issueID] = id
144		}
145	}
146
147	for issueID, h := range last {
148		st.SetParentHash(issueID, h)
149	}
150	return nil
151}
152
153func recoverFromStore(idb *sql.DB, st store.Store) error {
154	var hwmMS int64
155	var hwmSeq int
156	row := idb.QueryRow(`SELECT value FROM meta WHERE key='hwm_ms'`)
157	var hwmMSStr string
158	if err := row.Scan(&hwmMSStr); err == nil {
159		hwmMS, _ = strconv.ParseInt(hwmMSStr, 10, 64)
160	}
161	row = idb.QueryRow(`SELECT value FROM meta WHERE key='hwm_seq'`)
162	var hwmSeqStr string
163	if err := row.Scan(&hwmSeqStr); err == nil {
164		hwmSeqVal, _ := strconv.ParseInt(hwmSeqStr, 10, 64)
165		hwmSeq = int(hwmSeqVal)
166	}
167
168	allIDs, err := st.ListObjectsByKind(string(object.KindIssueEvent))
169	if err != nil {
170		return err
171	}
172
173	var newEvs []issues.IssueEvent
174	var newHWM struct {
175		ms  int64
176		seq int
177	}
178	newHWM.ms = hwmMS
179	newHWM.seq = hwmSeq
180
181	for _, id := range allIDs {
182		_, raw, err := st.ReadObject(id)
183		if err != nil {
184			continue
185		}
186		obj, err := object.DecodeIssueEvent(raw)
187		if err != nil {
188			continue
189		}
190		if obj.HLCMS < hwmMS || (obj.HLCMS == hwmMS && obj.HLCSeq <= hwmSeq) {
191			continue
192		}
193		eventID := hex.EncodeToString(id[:])
194		var exists int
195		err = idb.QueryRow(`SELECT COUNT(*) FROM issue_events WHERE event_id=?`, eventID).Scan(&exists)
196		if err != nil || exists > 0 {
197			continue
198		}
199		newEvs = append(newEvs, issues.IssueEvent{
200			EventID: eventID,
201			IssueID: obj.IssueID,
202			HLCMS:   obj.HLCMS,
203			HLCSeq:  obj.HLCSeq,
204			Kind:    obj.Kind,
205			Payload: obj.Payload,
206			Author:  obj.Author,
207			Created: 0,
208		})
209		if obj.HLCMS > newHWM.ms || (obj.HLCMS == newHWM.ms && obj.HLCSeq > newHWM.seq) {
210			newHWM.ms = obj.HLCMS
211			newHWM.seq = obj.HLCSeq
212		}
213	}
214
215	if len(newEvs) == 0 {
216		return nil
217	}
218
219	tx, err := idb.Begin()
220	if err != nil {
221		return err
222	}
223
224	for _, ev := range newEvs {
225		_, err := tx.Exec(
226			`INSERT OR IGNORE INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
227			ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
228		)
229		if err != nil {
230			tx.Rollback() //nolint:errcheck
231			return fmt.Errorf("recovery: insert event %s: %w", ev.EventID, err)
232		}
233	}
234
235	if _, err := tx.Exec(`INSERT OR REPLACE INTO meta (key,value) VALUES ('hwm_ms',?)`, strconv.FormatInt(newHWM.ms, 10)); err != nil {
236		tx.Rollback() //nolint:errcheck
237		return err
238	}
239
240	if _, err := tx.Exec(`INSERT OR REPLACE INTO meta (key,value) VALUES ('hwm_seq',?)`, strconv.Itoa(newHWM.seq)); err != nil {
241		tx.Rollback() //nolint:errcheck
242		return err
243	}
244
245	return tx.Commit()
246}
247
248func (d *DB) Close() error {
249	return d.db.Close()
250}
251
252type migration struct {
253	version int
254	sql     string
255}
256
257var all = []migration{
258	{1, sql001},
259	{2, sql002},
260}
261
262func runMigrations(db *sql.DB) error {
263	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations (
264		version    INTEGER PRIMARY KEY,
265		applied_at INTEGER NOT NULL
266	)`); err != nil {
267		return err
268	}
269
270	rows, err := db.Query("SELECT version FROM schema_migrations")
271	if err != nil {
272		return err
273	}
274	defer rows.Close()
275
276	applied := map[int]bool{}
277	for rows.Next() {
278		var v int
279		if err := rows.Scan(&v); err != nil {
280			return err
281		}
282		applied[v] = true
283	}
284
285	for _, m := range all {
286		if applied[m.version] {
287			continue
288		}
289
290		tx, err := db.Begin()
291		if err != nil {
292			return err
293		}
294
295		if _, err := tx.Exec(m.sql); err != nil {
296			tx.Rollback()
297			return fmt.Errorf("issuedb migrate v%d: %w", m.version, err)
298		}
299
300		if _, err := tx.Exec("INSERT INTO schema_migrations (version, applied_at) VALUES (?,?)", m.version, time.Now().Unix()); err != nil {
301			tx.Rollback()
302			return fmt.Errorf("issuedb migrate v%d record: %w", m.version, err)
303		}
304
305		if err := tx.Commit(); err != nil {
306			return err
307		}
308	}
309
310	return nil
311}