1package issues
2
3import (
4 "bytes"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "sort"
9 "sync"
10 "time"
11
12 "arche/internal/object"
13
14 "github.com/google/uuid"
15)
16
// HLC is a hybrid-logical-clock timestamp. Timestamps are ordered by MS
// first and by Seq among timestamps that share the same MS.
type HLC struct {
	MS  int64 // physical component; the store fills it from Unix milliseconds
	Seq int   // logical counter separating timestamps with an equal MS
}

// Less reports whether h sorts strictly before other. Two equal timestamps
// are not less than each other.
func (h HLC) Less(other HLC) bool {
	switch {
	case h.MS < other.MS:
		return true
	case h.MS > other.MS:
		return false
	}
	return h.Seq < other.Seq
}
28
// ObjectSink receives the encoded form of issue events. The store calls it
// once per locally inserted event when a sink has been configured.
type ObjectSink interface {
	// WriteIssueEventObject stores raw, the encoded event object, under its
	// 32-byte hash id. A non-nil error reports that the object was not
	// written.
	WriteIssueEventObject(id [32]byte, raw []byte) error
}
32
33type Store struct {
34 db *sql.DB
35 mu sync.Mutex
36 hlc HLC
37 sink ObjectSink
38 lastHash map[string][32]byte
39}
40
41type Issue struct {
42 ID string
43 Status string
44 Title string
45 Body string
46 BodyConflict *BodyConflict
47 Comments []Comment
48 Labels []string
49 Refs []string
50}
51
// BodyConflict describes two competing body edits of an issue. It is the
// payload of a body_conflict event and is encoded with its default JSON
// field names.
type BodyConflict struct {
	BaseEventID string // event ID the conflict is anchored to
	OurEdit     string // body text of one side of the conflict
	TheirEdit   string // body text of the other side of the conflict
}
57
58type Comment struct {
59 EventID string
60 Text string
61 Author string
62 HLC HLC
63}
64
// IssueEvent is one row of the issue_events table: a single change to an
// issue, stamped with a hybrid-logical-clock timestamp.
type IssueEvent struct {
	EventID string // unique ID of the event
	IssueID string // issue the event belongs to
	HLCMS   int64  // millisecond component of the event timestamp
	HLCSeq  int    // sequence component of the event timestamp
	Kind    string // event type, e.g. "create", "title", "status", "body", "comment"
	Payload []byte // JSON-encoded value whose shape depends on Kind
	Author  string // author recorded for the event
	Created int64  // creation time; locally created events use Unix seconds
}
75
76func New(db *sql.DB) *Store { return &Store{db: db} }
77
78func (s *Store) SetObjectSink(sink ObjectSink) {
79 s.mu.Lock()
80 defer s.mu.Unlock()
81 s.sink = sink
82 s.lastHash = make(map[string][32]byte)
83}
84
85func (s *Store) SetParentHash(issueID string, h [32]byte) {
86 s.mu.Lock()
87 defer s.mu.Unlock()
88 if s.lastHash != nil {
89 s.lastHash[issueID] = h
90 }
91}
92
93func (s *Store) tickHLC(remote *HLC) HLC {
94 now := time.Now().UnixMilli()
95 s.mu.Lock()
96 defer s.mu.Unlock()
97 wall := now
98 if s.hlc.MS > wall {
99 wall = s.hlc.MS
100 }
101 if remote != nil && remote.MS > wall {
102 wall = remote.MS
103 }
104 if wall == s.hlc.MS {
105 s.hlc.Seq++
106 } else {
107 s.hlc.MS = wall
108 s.hlc.Seq = 0
109 }
110 return s.hlc
111}
112
113func newID() string {
114 return uuid.Must(uuid.NewV7()).String()
115}
116
117func (s *Store) insertEvent(issueID, kind string, payload any, author string) (IssueEvent, error) {
118 hlc := s.tickHLC(nil)
119 p, err := json.Marshal(payload)
120 if err != nil {
121 return IssueEvent{}, err
122 }
123 ev := IssueEvent{
124 EventID: newID(),
125 IssueID: issueID,
126 HLCMS: hlc.MS,
127 HLCSeq: hlc.Seq,
128 Kind: kind,
129 Payload: p,
130 Author: author,
131 Created: time.Now().Unix(),
132 }
133 _, err = s.db.Exec(
134 `INSERT INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
135 ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
136 )
137 if err != nil {
138 return IssueEvent{}, err
139 }
140
141 if s.sink != nil {
142 s.mu.Lock()
143 var parents [][32]byte
144 if prev, ok := s.lastHash[issueID]; ok {
145 parents = [][32]byte{prev}
146 }
147 s.mu.Unlock()
148
149 obj := &object.IssueEventObject{
150 IssueID: issueID,
151 Kind: kind,
152 Payload: p,
153 Author: author,
154 HLCMS: hlc.MS,
155 HLCSeq: hlc.Seq,
156 Parents: parents,
157 }
158 var buf bytes.Buffer
159 object.EncodeIssueEvent(&buf, obj)
160 raw := buf.Bytes()
161 id := object.HashIssueEvent(obj)
162
163 if writeErr := s.sink.WriteIssueEventObject(id, raw); writeErr == nil {
164 s.mu.Lock()
165 s.lastHash[issueID] = id
166 s.mu.Unlock()
167 }
168 }
169
170 return ev, nil
171}
172
173func (s *Store) CreateIssue(title, body, author string) (string, error) {
174 id := newID()
175 if _, err := s.insertEvent(id, "create", map[string]string{"id": id}, author); err != nil {
176 return "", err
177 }
178 if _, err := s.insertEvent(id, "title", title, author); err != nil {
179 return "", err
180 }
181 if _, err := s.insertEvent(id, "status", "open", author); err != nil {
182 return "", err
183 }
184 if body != "" {
185 if _, err := s.insertEvent(id, "body", body, author); err != nil {
186 return "", err
187 }
188 }
189 return id, nil
190}
191
192func (s *Store) SetStatus(issueID, status, author string) error {
193 _, err := s.insertEvent(issueID, "status", status, author)
194 return err
195}
196
197func (s *Store) SetTitle(issueID, title, author string) error {
198 _, err := s.insertEvent(issueID, "title", title, author)
199 return err
200}
201
202func (s *Store) SetBody(issueID, body, author string) error {
203 _, err := s.insertEvent(issueID, "body", body, author)
204 return err
205}
206
207func (s *Store) AddComment(issueID, text, author string) error {
208 _, err := s.insertEvent(issueID, "comment", text, author)
209 return err
210}
211
212func (s *Store) AddLabel(issueID, label, author string) error {
213 token := newID()
214 _, err := s.insertEvent(issueID, "label_add", map[string]string{"label": label, "token": token}, author)
215 return err
216}
217
218func (s *Store) RemoveLabel(issueID, token, author string) error {
219 _, err := s.insertEvent(issueID, "label_rm", token, author)
220 return err
221}
222
223func (s *Store) AddRef(issueID, commitRef, author string) error {
224 _, err := s.insertEvent(issueID, "ref", commitRef, author)
225 return err
226}
227
228func (s *Store) loadEvents(issueID string) ([]IssueEvent, error) {
229 rows, err := s.db.Query(
230 `SELECT event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created FROM issue_events WHERE issue_id=? ORDER BY hlc_ms,hlc_seq`,
231 issueID,
232 )
233 if err != nil {
234 return nil, err
235 }
236 defer rows.Close()
237 var evs []IssueEvent
238 for rows.Next() {
239 var ev IssueEvent
240 var payload string
241 if err := rows.Scan(&ev.EventID, &ev.IssueID, &ev.HLCMS, &ev.HLCSeq, &ev.Kind, &payload, &ev.Author, &ev.Created); err != nil {
242 return nil, err
243 }
244 ev.Payload = []byte(payload)
245 evs = append(evs, ev)
246 }
247 return evs, rows.Err()
248}
249
250func Reduce(evs []IssueEvent) Issue {
251 var iss Issue
252
253 type lwwEntry struct {
254 hlcMS int64
255 hlcSeq int
256 value string
257 evID string
258 }
259 var statusLWW, titleLWW, bodyLWW lwwEntry
260
261 type labelAdd struct{ label, token string }
262 var labelAdds []labelAdd
263 removedTokens := map[string]bool{}
264
265 for _, ev := range evs {
266 hlcMS, hlcSeq := ev.HLCMS, ev.HLCSeq
267 beats := func(cur lwwEntry) bool {
268 if hlcMS != cur.hlcMS {
269 return hlcMS > cur.hlcMS
270 }
271 return hlcSeq > cur.hlcSeq
272 }
273 switch ev.Kind {
274 case "create":
275 var m map[string]string
276 json.Unmarshal(ev.Payload, &m) //nolint:errcheck
277 iss.ID = m["id"]
278 case "status":
279 var v string
280 json.Unmarshal(ev.Payload, &v) //nolint:errcheck
281 if beats(statusLWW) {
282 statusLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
283 }
284 case "title":
285 var v string
286 json.Unmarshal(ev.Payload, &v) //nolint:errcheck
287 if beats(titleLWW) {
288 titleLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
289 }
290 case "body":
291 var v string
292 json.Unmarshal(ev.Payload, &v) //nolint:errcheck
293 if beats(bodyLWW) {
294 bodyLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
295 }
296 case "body_conflict":
297 var bc BodyConflict
298 json.Unmarshal(ev.Payload, &bc) //nolint:errcheck
299 iss.BodyConflict = &bc
300 case "comment":
301 var text string
302 json.Unmarshal(ev.Payload, &text) //nolint:errcheck
303 iss.Comments = append(iss.Comments, Comment{
304 EventID: ev.EventID,
305 Text: text,
306 Author: ev.Author,
307 HLC: HLC{ev.HLCMS, ev.HLCSeq},
308 })
309 case "label_add":
310 var m map[string]string
311 json.Unmarshal(ev.Payload, &m) //nolint:errcheck
312 if m["label"] != "" && m["token"] != "" {
313 labelAdds = append(labelAdds, labelAdd{m["label"], m["token"]})
314 }
315 case "label_rm":
316 var token string
317 json.Unmarshal(ev.Payload, &token) //nolint:errcheck
318 removedTokens[token] = true
319 case "ref":
320 var ref string
321 json.Unmarshal(ev.Payload, &ref) //nolint:errcheck
322 if ref != "" {
323 iss.Refs = append(iss.Refs, ref)
324 }
325 }
326 }
327
328 iss.Status = statusLWW.value
329 if iss.Status == "" {
330 iss.Status = "open"
331 }
332 iss.Title = titleLWW.value
333 iss.Body = bodyLWW.value
334
335 labelPresent := map[string]bool{}
336 for _, la := range labelAdds {
337 if !removedTokens[la.token] {
338 labelPresent[la.label] = true
339 }
340 }
341 for l := range labelPresent {
342 iss.Labels = append(iss.Labels, l)
343 }
344 sort.Strings(iss.Labels)
345
346 return iss
347}
348
349func (s *Store) GetIssue(issueID string) (Issue, error) {
350 evs, err := s.loadEvents(issueID)
351 if err != nil {
352 return Issue{}, err
353 }
354 if len(evs) == 0 {
355 return Issue{}, fmt.Errorf("issue not found: %s", issueID)
356 }
357 iss := Reduce(evs)
358 iss.ID = issueID
359 return iss, nil
360}
361
// IssueStub is the summary of an issue returned by ListIssues: just enough
// to show it in a listing.
type IssueStub struct {
	ID     string // issue identifier
	Status string // current status of the issue
	Title  string // current title of the issue
}
367
368func (s *Store) ListIssues() ([]IssueStub, error) {
369 rows, err := s.db.Query(
370 `SELECT DISTINCT issue_id FROM issue_events WHERE kind='create' ORDER BY rowid DESC`,
371 )
372 if err != nil {
373 return nil, err
374 }
375 defer rows.Close()
376 var ids []string
377 for rows.Next() {
378 var id string
379 rows.Scan(&id) //nolint:errcheck
380 ids = append(ids, id)
381 }
382 if err := rows.Err(); err != nil {
383 return nil, err
384 }
385
386 var stubs []IssueStub
387 for _, id := range ids {
388 evs, err := s.loadEvents(id)
389 if err != nil {
390 return nil, err
391 }
392 iss := Reduce(evs)
393 iss.ID = id
394 stubs = append(stubs, IssueStub{
395 ID: iss.ID,
396 Status: iss.Status,
397 Title: iss.Title,
398 })
399 }
400 return stubs, nil
401}
402
403func (s *Store) MergeEvents(evs []IssueEvent) error {
404 bodyIssues := map[string]bool{}
405
406 for _, ev := range evs {
407 res, err := s.db.Exec(
408 `INSERT OR IGNORE INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
409 ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
410 )
411 if err != nil {
412 return err
413 }
414 s.mu.Lock()
415 remote := HLC{ev.HLCMS, ev.HLCSeq}
416 if !remote.Less(s.hlc) {
417 s.hlc = remote
418 s.hlc.Seq++
419 }
420 s.mu.Unlock()
421
422 if ev.Kind == "body" {
423 if n, _ := res.RowsAffected(); n > 0 {
424 bodyIssues[ev.IssueID] = true
425 }
426 }
427 }
428
429 for issueID := range bodyIssues {
430 if err := s.detectBodyConflict(issueID); err != nil {
431 return err
432 }
433 }
434
435 return nil
436}
437
438func (s *Store) detectBodyConflict(issueID string) error {
439 rows, err := s.db.Query(
440 `SELECT event_id, hlc_ms, hlc_seq, payload FROM issue_events
441 WHERE issue_id=? AND kind='body' ORDER BY hlc_ms DESC, hlc_seq DESC`,
442 issueID,
443 )
444 if err != nil {
445 return err
446 }
447 type bodyEv struct {
448 eventID string
449 hlcMS int64
450 hlcSeq int
451 text string
452 }
453 var bodies []bodyEv
454 for rows.Next() {
455 var b bodyEv
456 var raw string
457 if err := rows.Scan(&b.eventID, &b.hlcMS, &b.hlcSeq, &raw); err != nil {
458 rows.Close()
459 return err
460 }
461 json.Unmarshal([]byte(raw), &b.text) //nolint:errcheck
462 bodies = append(bodies, b)
463 }
464 rows.Close()
465 if err := rows.Err(); err != nil {
466 return err
467 }
468 if len(bodies) < 2 {
469 return nil
470 }
471
472 var existing int
473 _ = s.db.QueryRow(
474 `SELECT COUNT(*) FROM issue_events WHERE issue_id=? AND kind='body_conflict'`,
475 issueID,
476 ).Scan(&existing)
477 if existing > 0 {
478 return nil
479 }
480
481 bc := BodyConflict{
482 BaseEventID: bodies[1].eventID,
483 OurEdit: bodies[0].text,
484 TheirEdit: bodies[1].text,
485 }
486 _, err = s.insertEvent(issueID, "body_conflict", bc, "")
487 return err
488}
489
490func (s *Store) AllEvents() ([]IssueEvent, error) {
491 rows, err := s.db.Query(
492 `SELECT event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created FROM issue_events ORDER BY hlc_ms,hlc_seq`,
493 )
494 if err != nil {
495 return nil, err
496 }
497 defer rows.Close()
498 var evs []IssueEvent
499 for rows.Next() {
500 var ev IssueEvent
501 var payload string
502 rows.Scan(&ev.EventID, &ev.IssueID, &ev.HLCMS, &ev.HLCSeq, &ev.Kind, &payload, &ev.Author, &ev.Created) //nolint:errcheck
503 ev.Payload = []byte(payload)
504 evs = append(evs, ev)
505 }
506 return evs, rows.Err()
507}