1package issuedb
2
3import (
4 "database/sql"
5 _ "embed"
6 "encoding/hex"
7 "fmt"
8 "path/filepath"
9 "strconv"
10 "time"
11
12 "arche/internal/issues"
13 "arche/internal/object"
14 "arche/internal/store"
15 "arche/internal/wiki"
16
17 _ "github.com/mattn/go-sqlite3"
18)
19
20//go:embed sql/001_initial.sql
21var sql001 string
22
23//go:embed sql/002_meta_hwm.sql
24var sql002 string
25
26type DB struct {
27 Issues *issues.Store
28 Wiki *wiki.Store
29 db *sql.DB
30}
31
32func Open(repoDir string) (*DB, error) {
33 return openDB(repoDir, nil)
34}
35
36func NewWithStore(repoDir string, st store.Store) (*DB, error) {
37 return openDB(repoDir, st)
38}
39
40type storeObjectSink struct {
41 st store.Store
42}
43
44func (s *storeObjectSink) WriteIssueEventObject(id [32]byte, raw []byte) error {
45 if ok, _ := s.st.HasObject(id); ok {
46 return nil
47 }
48 tx, err := s.st.Begin()
49 if err != nil {
50 return err
51 }
52 if err := s.st.WriteObject(tx, id, "issue-event", raw); err != nil {
53 s.st.Rollback(tx) //nolint:errcheck
54 return err
55 }
56 return s.st.Commit(tx)
57}
58
59func openDB(repoDir string, st store.Store) (*DB, error) {
60 path := filepath.Join(repoDir, "issues.db")
61
62 db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000")
63 if err != nil {
64 return nil, err
65 }
66
67 db.SetMaxOpenConns(1)
68
69 if err := runMigrations(db); err != nil {
70 db.Close()
71 return nil, err
72 }
73
74 issueStore := issues.New(db)
75 if st != nil {
76 sink := &storeObjectSink{st: st}
77 issueStore.SetObjectSink(sink)
78 if err := seedParentHashes(db, issueStore, st); err != nil {
79 db.Close()
80 return nil, fmt.Errorf("issuedb: seed parent hashes: %w", err)
81 }
82 if err := recoverFromStore(db, st); err != nil {
83 db.Close()
84 return nil, fmt.Errorf("issuedb: recovery pass: %w", err)
85 }
86 }
87
88 return &DB{
89 Issues: issueStore,
90 Wiki: wiki.New(db),
91 db: db,
92 }, nil
93}
94
95func seedParentHashes(idb *sql.DB, st *issues.Store, objStore store.Store) error {
96 rows, err := idb.Query(
97 `SELECT issue_id, hlc_ms, hlc_seq, kind, payload, author
98 FROM issue_events
99 ORDER BY hlc_ms ASC, hlc_seq ASC`,
100 )
101 if err != nil {
102 return err
103 }
104 defer rows.Close()
105
106 type evt struct {
107 issueID string
108 hlcMS int64
109 hlcSeq int
110 kind string
111 payload []byte
112 author string
113 }
114 var evts []evt
115 for rows.Next() {
116 var e evt
117 var payload string
118 if err := rows.Scan(&e.issueID, &e.hlcMS, &e.hlcSeq, &e.kind, &payload, &e.author); err != nil {
119 return err
120 }
121 e.payload = []byte(payload)
122 evts = append(evts, e)
123 }
124 if err := rows.Err(); err != nil {
125 return err
126 }
127
128 last := map[string][32]byte{}
129 for _, e := range evts {
130 obj := &object.IssueEventObject{
131 IssueID: e.issueID,
132 Kind: e.kind,
133 Payload: e.payload,
134 Author: e.author,
135 HLCMS: e.hlcMS,
136 HLCSeq: e.hlcSeq,
137 }
138 if prev, ok := last[e.issueID]; ok {
139 obj.Parents = [][32]byte{prev}
140 }
141 id := object.HashIssueEvent(obj)
142 if ok, _ := objStore.HasObject(id); ok {
143 last[e.issueID] = id
144 }
145 }
146
147 for issueID, h := range last {
148 st.SetParentHash(issueID, h)
149 }
150 return nil
151}
152
153func recoverFromStore(idb *sql.DB, st store.Store) error {
154 var hwmMS int64
155 var hwmSeq int
156 row := idb.QueryRow(`SELECT value FROM meta WHERE key='hwm_ms'`)
157 var hwmMSStr string
158 if err := row.Scan(&hwmMSStr); err == nil {
159 hwmMS, _ = strconv.ParseInt(hwmMSStr, 10, 64)
160 }
161 row = idb.QueryRow(`SELECT value FROM meta WHERE key='hwm_seq'`)
162 var hwmSeqStr string
163 if err := row.Scan(&hwmSeqStr); err == nil {
164 hwmSeqVal, _ := strconv.ParseInt(hwmSeqStr, 10, 64)
165 hwmSeq = int(hwmSeqVal)
166 }
167
168 allIDs, err := st.ListObjectsByKind(string(object.KindIssueEvent))
169 if err != nil {
170 return err
171 }
172
173 var newEvs []issues.IssueEvent
174 var newHWM struct {
175 ms int64
176 seq int
177 }
178 newHWM.ms = hwmMS
179 newHWM.seq = hwmSeq
180
181 for _, id := range allIDs {
182 _, raw, err := st.ReadObject(id)
183 if err != nil {
184 continue
185 }
186 obj, err := object.DecodeIssueEvent(raw)
187 if err != nil {
188 continue
189 }
190 if obj.HLCMS < hwmMS || (obj.HLCMS == hwmMS && obj.HLCSeq <= hwmSeq) {
191 continue
192 }
193 eventID := hex.EncodeToString(id[:])
194 var exists int
195 err = idb.QueryRow(`SELECT COUNT(*) FROM issue_events WHERE event_id=?`, eventID).Scan(&exists)
196 if err != nil || exists > 0 {
197 continue
198 }
199 newEvs = append(newEvs, issues.IssueEvent{
200 EventID: eventID,
201 IssueID: obj.IssueID,
202 HLCMS: obj.HLCMS,
203 HLCSeq: obj.HLCSeq,
204 Kind: obj.Kind,
205 Payload: obj.Payload,
206 Author: obj.Author,
207 Created: 0,
208 })
209 if obj.HLCMS > newHWM.ms || (obj.HLCMS == newHWM.ms && obj.HLCSeq > newHWM.seq) {
210 newHWM.ms = obj.HLCMS
211 newHWM.seq = obj.HLCSeq
212 }
213 }
214
215 if len(newEvs) == 0 {
216 return nil
217 }
218
219 tx, err := idb.Begin()
220 if err != nil {
221 return err
222 }
223
224 for _, ev := range newEvs {
225 _, err := tx.Exec(
226 `INSERT OR IGNORE INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
227 ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
228 )
229 if err != nil {
230 tx.Rollback() //nolint:errcheck
231 return fmt.Errorf("recovery: insert event %s: %w", ev.EventID, err)
232 }
233 }
234
235 if _, err := tx.Exec(`INSERT OR REPLACE INTO meta (key,value) VALUES ('hwm_ms',?)`, strconv.FormatInt(newHWM.ms, 10)); err != nil {
236 tx.Rollback() //nolint:errcheck
237 return err
238 }
239
240 if _, err := tx.Exec(`INSERT OR REPLACE INTO meta (key,value) VALUES ('hwm_seq',?)`, strconv.Itoa(newHWM.seq)); err != nil {
241 tx.Rollback() //nolint:errcheck
242 return err
243 }
244
245 return tx.Commit()
246}
247
248func (d *DB) Close() error {
249 return d.db.Close()
250}
251
// migration pairs a schema version number with the SQL text that applies it.
type migration struct {
	version int    // number recorded in schema_migrations once applied
	sql     string // SQL executed to apply this version
}
256
257var all = []migration{
258 {1, sql001},
259 {2, sql002},
260}
261
262func runMigrations(db *sql.DB) error {
263 if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations (
264 version INTEGER PRIMARY KEY,
265 applied_at INTEGER NOT NULL
266 )`); err != nil {
267 return err
268 }
269
270 rows, err := db.Query("SELECT version FROM schema_migrations")
271 if err != nil {
272 return err
273 }
274 defer rows.Close()
275
276 applied := map[int]bool{}
277 for rows.Next() {
278 var v int
279 if err := rows.Scan(&v); err != nil {
280 return err
281 }
282 applied[v] = true
283 }
284
285 for _, m := range all {
286 if applied[m.version] {
287 continue
288 }
289
290 tx, err := db.Begin()
291 if err != nil {
292 return err
293 }
294
295 if _, err := tx.Exec(m.sql); err != nil {
296 tx.Rollback()
297 return fmt.Errorf("issuedb migrate v%d: %w", m.version, err)
298 }
299
300 if _, err := tx.Exec("INSERT INTO schema_migrations (version, applied_at) VALUES (?,?)", m.version, time.Now().Unix()); err != nil {
301 tx.Rollback()
302 return fmt.Errorf("issuedb migrate v%d record: %w", m.version, err)
303 }
304
305 if err := tx.Commit(); err != nil {
306 return err
307 }
308 }
309
310 return nil
311}