arche / internal/issues/issues.go

commit 154431fd
  1package issues
  2
  3import (
  4	"bytes"
  5	"database/sql"
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9	"sync"
 10	"time"
 11
 12	"arche/internal/object"
 13
 14	"github.com/google/uuid"
 15)
 16
 17type HLC struct {
 18	MS  int64
 19	Seq int
 20}
 21
 22func (h HLC) Less(other HLC) bool {
 23	if h.MS != other.MS {
 24		return h.MS < other.MS
 25	}
 26	return h.Seq < other.Seq
 27}
 28
// ObjectSink receives encoded issue-event objects. Store.insertEvent calls
// WriteIssueEventObject after the event row has been inserted, passing the
// object's hash as id and its encoded bytes as raw.
type ObjectSink interface {
	WriteIssueEventObject(id [32]byte, raw []byte) error
}
 32
// Store persists issue events in the issue_events table of a SQL database
// and, when an ObjectSink is configured, mirrors locally created events to it.
type Store struct {
	db       *sql.DB             // database holding the issue_events table
	mu       sync.Mutex          // held while hlc, sink and lastHash are written
	hlc      HLC                 // most recent clock value issued or observed
	sink     ObjectSink          // optional; nil until SetObjectSink is called
	lastHash map[string][32]byte // issue ID -> hash of the last object recorded for it
}
 40
// Issue is the materialized state of one issue, as produced by Reduce from
// that issue's event log.
type Issue struct {
	ID           string        // taken from the "create" event payload (GetIssue/ListIssues overwrite it)
	Status       string        // winning "status" event value; Reduce defaults it to "open"
	Title        string        // winning "title" event value
	Body         string        // winning "body" event value
	BodyConflict *BodyConflict // set when a "body_conflict" event is present, otherwise nil
	Comments     []Comment     // one entry per "comment" event, in the order events were supplied
	Labels       []string      // labels with at least one un-removed add token, sorted
	Refs         []string      // non-empty "ref" event payloads, in the order events were supplied
}
 51
// BodyConflict is the payload of a "body_conflict" event. As filled in by
// Store.detectBodyConflict, it describes the two most recent body events of
// an issue (ordered by HLC, newest first).
type BodyConflict struct {
	BaseEventID string // event ID of the second-newest body event
	OurEdit     string // text of the newest body event
	TheirEdit   string // text of the second-newest body event
}
 57
// Comment is a single comment on an issue, built by Reduce from a "comment"
// event.
type Comment struct {
	EventID string // ID of the originating event
	Text    string // decoded JSON string payload of the event
	Author  string // author recorded on the event
	HLC     HLC    // clock stamp of the event
}
 64
// IssueEvent is one row of the issue_events table: a single change applied
// to an issue.
type IssueEvent struct {
	EventID string // unique event identifier
	IssueID string // issue the event belongs to
	HLCMS   int64  // millisecond component of the event's HLC stamp
	HLCSeq  int    // sequence component of the event's HLC stamp
	Kind    string // event type, e.g. "create", "title", "status", "body", "comment"
	Payload []byte // JSON-encoded event data; shape depends on Kind
	Author  string // author supplied by the caller
	Created int64  // Unix time in seconds at which the event was created locally
}
 75
 76func New(db *sql.DB) *Store { return &Store{db: db} }
 77
 78func (s *Store) SetObjectSink(sink ObjectSink) {
 79	s.mu.Lock()
 80	defer s.mu.Unlock()
 81	s.sink = sink
 82	s.lastHash = make(map[string][32]byte)
 83}
 84
 85func (s *Store) SetParentHash(issueID string, h [32]byte) {
 86	s.mu.Lock()
 87	defer s.mu.Unlock()
 88	if s.lastHash != nil {
 89		s.lastHash[issueID] = h
 90	}
 91}
 92
 93func (s *Store) tickHLC(remote *HLC) HLC {
 94	now := time.Now().UnixMilli()
 95	s.mu.Lock()
 96	defer s.mu.Unlock()
 97	wall := now
 98	if s.hlc.MS > wall {
 99		wall = s.hlc.MS
100	}
101	if remote != nil && remote.MS > wall {
102		wall = remote.MS
103	}
104	if wall == s.hlc.MS {
105		s.hlc.Seq++
106	} else {
107		s.hlc.MS = wall
108		s.hlc.Seq = 0
109	}
110	return s.hlc
111}
112
113func newID() string {
114	return uuid.Must(uuid.NewV7()).String()
115}
116
117func (s *Store) insertEvent(issueID, kind string, payload any, author string) (IssueEvent, error) {
118	hlc := s.tickHLC(nil)
119	p, err := json.Marshal(payload)
120	if err != nil {
121		return IssueEvent{}, err
122	}
123	ev := IssueEvent{
124		EventID: newID(),
125		IssueID: issueID,
126		HLCMS:   hlc.MS,
127		HLCSeq:  hlc.Seq,
128		Kind:    kind,
129		Payload: p,
130		Author:  author,
131		Created: time.Now().Unix(),
132	}
133	_, err = s.db.Exec(
134		`INSERT INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
135		ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
136	)
137	if err != nil {
138		return IssueEvent{}, err
139	}
140
141	if s.sink != nil {
142		s.mu.Lock()
143		var parents [][32]byte
144		if prev, ok := s.lastHash[issueID]; ok {
145			parents = [][32]byte{prev}
146		}
147		s.mu.Unlock()
148
149		obj := &object.IssueEventObject{
150			IssueID: issueID,
151			Kind:    kind,
152			Payload: p,
153			Author:  author,
154			HLCMS:   hlc.MS,
155			HLCSeq:  hlc.Seq,
156			Parents: parents,
157		}
158		var buf bytes.Buffer
159		object.EncodeIssueEvent(&buf, obj)
160		raw := buf.Bytes()
161		id := object.HashIssueEvent(obj)
162
163		if writeErr := s.sink.WriteIssueEventObject(id, raw); writeErr == nil {
164			s.mu.Lock()
165			s.lastHash[issueID] = id
166			s.mu.Unlock()
167		}
168	}
169
170	return ev, nil
171}
172
173func (s *Store) CreateIssue(title, body, author string) (string, error) {
174	id := newID()
175	if _, err := s.insertEvent(id, "create", map[string]string{"id": id}, author); err != nil {
176		return "", err
177	}
178	if _, err := s.insertEvent(id, "title", title, author); err != nil {
179		return "", err
180	}
181	if _, err := s.insertEvent(id, "status", "open", author); err != nil {
182		return "", err
183	}
184	if body != "" {
185		if _, err := s.insertEvent(id, "body", body, author); err != nil {
186			return "", err
187		}
188	}
189	return id, nil
190}
191
192func (s *Store) SetStatus(issueID, status, author string) error {
193	_, err := s.insertEvent(issueID, "status", status, author)
194	return err
195}
196
197func (s *Store) SetTitle(issueID, title, author string) error {
198	_, err := s.insertEvent(issueID, "title", title, author)
199	return err
200}
201
202func (s *Store) SetBody(issueID, body, author string) error {
203	_, err := s.insertEvent(issueID, "body", body, author)
204	return err
205}
206
207func (s *Store) AddComment(issueID, text, author string) error {
208	_, err := s.insertEvent(issueID, "comment", text, author)
209	return err
210}
211
212func (s *Store) AddLabel(issueID, label, author string) error {
213	token := newID()
214	_, err := s.insertEvent(issueID, "label_add", map[string]string{"label": label, "token": token}, author)
215	return err
216}
217
218func (s *Store) RemoveLabel(issueID, token, author string) error {
219	_, err := s.insertEvent(issueID, "label_rm", token, author)
220	return err
221}
222
223func (s *Store) AddRef(issueID, commitRef, author string) error {
224	_, err := s.insertEvent(issueID, "ref", commitRef, author)
225	return err
226}
227
228func (s *Store) loadEvents(issueID string) ([]IssueEvent, error) {
229	rows, err := s.db.Query(
230		`SELECT event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created FROM issue_events WHERE issue_id=? ORDER BY hlc_ms,hlc_seq`,
231		issueID,
232	)
233	if err != nil {
234		return nil, err
235	}
236	defer rows.Close()
237	var evs []IssueEvent
238	for rows.Next() {
239		var ev IssueEvent
240		var payload string
241		if err := rows.Scan(&ev.EventID, &ev.IssueID, &ev.HLCMS, &ev.HLCSeq, &ev.Kind, &payload, &ev.Author, &ev.Created); err != nil {
242			return nil, err
243		}
244		ev.Payload = []byte(payload)
245		evs = append(evs, ev)
246	}
247	return evs, rows.Err()
248}
249
250func Reduce(evs []IssueEvent) Issue {
251	var iss Issue
252
253	type lwwEntry struct {
254		hlcMS  int64
255		hlcSeq int
256		value  string
257		evID   string
258	}
259	var statusLWW, titleLWW, bodyLWW lwwEntry
260
261	type labelAdd struct{ label, token string }
262	var labelAdds []labelAdd
263	removedTokens := map[string]bool{}
264
265	for _, ev := range evs {
266		hlcMS, hlcSeq := ev.HLCMS, ev.HLCSeq
267		beats := func(cur lwwEntry) bool {
268			if hlcMS != cur.hlcMS {
269				return hlcMS > cur.hlcMS
270			}
271			return hlcSeq > cur.hlcSeq
272		}
273		switch ev.Kind {
274		case "create":
275			var m map[string]string
276			json.Unmarshal(ev.Payload, &m) //nolint:errcheck
277			iss.ID = m["id"]
278		case "status":
279			var v string
280			json.Unmarshal(ev.Payload, &v) //nolint:errcheck
281			if beats(statusLWW) {
282				statusLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
283			}
284		case "title":
285			var v string
286			json.Unmarshal(ev.Payload, &v) //nolint:errcheck
287			if beats(titleLWW) {
288				titleLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
289			}
290		case "body":
291			var v string
292			json.Unmarshal(ev.Payload, &v) //nolint:errcheck
293			if beats(bodyLWW) {
294				bodyLWW = lwwEntry{hlcMS, hlcSeq, v, ev.EventID}
295			}
296		case "body_conflict":
297			var bc BodyConflict
298			json.Unmarshal(ev.Payload, &bc) //nolint:errcheck
299			iss.BodyConflict = &bc
300		case "comment":
301			var text string
302			json.Unmarshal(ev.Payload, &text) //nolint:errcheck
303			iss.Comments = append(iss.Comments, Comment{
304				EventID: ev.EventID,
305				Text:    text,
306				Author:  ev.Author,
307				HLC:     HLC{ev.HLCMS, ev.HLCSeq},
308			})
309		case "label_add":
310			var m map[string]string
311			json.Unmarshal(ev.Payload, &m) //nolint:errcheck
312			if m["label"] != "" && m["token"] != "" {
313				labelAdds = append(labelAdds, labelAdd{m["label"], m["token"]})
314			}
315		case "label_rm":
316			var token string
317			json.Unmarshal(ev.Payload, &token) //nolint:errcheck
318			removedTokens[token] = true
319		case "ref":
320			var ref string
321			json.Unmarshal(ev.Payload, &ref) //nolint:errcheck
322			if ref != "" {
323				iss.Refs = append(iss.Refs, ref)
324			}
325		}
326	}
327
328	iss.Status = statusLWW.value
329	if iss.Status == "" {
330		iss.Status = "open"
331	}
332	iss.Title = titleLWW.value
333	iss.Body = bodyLWW.value
334
335	labelPresent := map[string]bool{}
336	for _, la := range labelAdds {
337		if !removedTokens[la.token] {
338			labelPresent[la.label] = true
339		}
340	}
341	for l := range labelPresent {
342		iss.Labels = append(iss.Labels, l)
343	}
344	sort.Strings(iss.Labels)
345
346	return iss
347}
348
349func (s *Store) GetIssue(issueID string) (Issue, error) {
350	evs, err := s.loadEvents(issueID)
351	if err != nil {
352		return Issue{}, err
353	}
354	if len(evs) == 0 {
355		return Issue{}, fmt.Errorf("issue not found: %s", issueID)
356	}
357	iss := Reduce(evs)
358	iss.ID = issueID
359	return iss, nil
360}
361
// IssueStub is the summary of an issue returned by Store.ListIssues: its ID
// together with the reduced status and title.
type IssueStub struct {
	ID     string
	Status string
	Title  string
}
367
368func (s *Store) ListIssues() ([]IssueStub, error) {
369	rows, err := s.db.Query(
370		`SELECT DISTINCT issue_id FROM issue_events WHERE kind='create' ORDER BY rowid DESC`,
371	)
372	if err != nil {
373		return nil, err
374	}
375	defer rows.Close()
376	var ids []string
377	for rows.Next() {
378		var id string
379		rows.Scan(&id) //nolint:errcheck
380		ids = append(ids, id)
381	}
382	if err := rows.Err(); err != nil {
383		return nil, err
384	}
385
386	var stubs []IssueStub
387	for _, id := range ids {
388		evs, err := s.loadEvents(id)
389		if err != nil {
390			return nil, err
391		}
392		iss := Reduce(evs)
393		iss.ID = id
394		stubs = append(stubs, IssueStub{
395			ID:     iss.ID,
396			Status: iss.Status,
397			Title:  iss.Title,
398		})
399	}
400	return stubs, nil
401}
402
403func (s *Store) MergeEvents(evs []IssueEvent) error {
404	bodyIssues := map[string]bool{}
405
406	for _, ev := range evs {
407		res, err := s.db.Exec(
408			`INSERT OR IGNORE INTO issue_events (event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created) VALUES (?,?,?,?,?,?,?,?)`,
409			ev.EventID, ev.IssueID, ev.HLCMS, ev.HLCSeq, ev.Kind, string(ev.Payload), ev.Author, ev.Created,
410		)
411		if err != nil {
412			return err
413		}
414		s.mu.Lock()
415		remote := HLC{ev.HLCMS, ev.HLCSeq}
416		if !remote.Less(s.hlc) {
417			s.hlc = remote
418			s.hlc.Seq++
419		}
420		s.mu.Unlock()
421
422		if ev.Kind == "body" {
423			if n, _ := res.RowsAffected(); n > 0 {
424				bodyIssues[ev.IssueID] = true
425			}
426		}
427	}
428
429	for issueID := range bodyIssues {
430		if err := s.detectBodyConflict(issueID); err != nil {
431			return err
432		}
433	}
434
435	return nil
436}
437
438func (s *Store) detectBodyConflict(issueID string) error {
439	rows, err := s.db.Query(
440		`SELECT event_id, hlc_ms, hlc_seq, payload FROM issue_events
441		 WHERE issue_id=? AND kind='body' ORDER BY hlc_ms DESC, hlc_seq DESC`,
442		issueID,
443	)
444	if err != nil {
445		return err
446	}
447	type bodyEv struct {
448		eventID string
449		hlcMS   int64
450		hlcSeq  int
451		text    string
452	}
453	var bodies []bodyEv
454	for rows.Next() {
455		var b bodyEv
456		var raw string
457		if err := rows.Scan(&b.eventID, &b.hlcMS, &b.hlcSeq, &raw); err != nil {
458			rows.Close()
459			return err
460		}
461		json.Unmarshal([]byte(raw), &b.text) //nolint:errcheck
462		bodies = append(bodies, b)
463	}
464	rows.Close()
465	if err := rows.Err(); err != nil {
466		return err
467	}
468	if len(bodies) < 2 {
469		return nil
470	}
471
472	var existing int
473	_ = s.db.QueryRow(
474		`SELECT COUNT(*) FROM issue_events WHERE issue_id=? AND kind='body_conflict'`,
475		issueID,
476	).Scan(&existing)
477	if existing > 0 {
478		return nil
479	}
480
481	bc := BodyConflict{
482		BaseEventID: bodies[1].eventID,
483		OurEdit:     bodies[0].text,
484		TheirEdit:   bodies[1].text,
485	}
486	_, err = s.insertEvent(issueID, "body_conflict", bc, "")
487	return err
488}
489
490func (s *Store) AllEvents() ([]IssueEvent, error) {
491	rows, err := s.db.Query(
492		`SELECT event_id,issue_id,hlc_ms,hlc_seq,kind,payload,author,created FROM issue_events ORDER BY hlc_ms,hlc_seq`,
493	)
494	if err != nil {
495		return nil, err
496	}
497	defer rows.Close()
498	var evs []IssueEvent
499	for rows.Next() {
500		var ev IssueEvent
501		var payload string
502		rows.Scan(&ev.EventID, &ev.IssueID, &ev.HLCMS, &ev.HLCSeq, &ev.Kind, &payload, &ev.Author, &ev.Created) //nolint:errcheck
503		ev.Payload = []byte(payload)
504		evs = append(evs, ev)
505	}
506	return evs, rows.Err()
507}