arche / internal/syncpkg/server.go

commit 154431fd
  1package syncpkg
  2
import (
	"bytes"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"

	"arche/internal/issuedb"
	"arche/internal/issues"
	"arche/internal/object"
	"arche/internal/repo"
	"arche/internal/store"
)
 19
// Server serves a repository over the /arche/v1 HTTP endpoints registered by
// Handler.
type Server struct {
	// repo is the repository whose objects, bookmarks and issues are served.
	repo     *repo.Repo
	// token is the bearer token every request must present; when empty the
	// handler is returned without the bearer check.
	token    string
	// canWrite gates the mutating endpoints (push, update-bookmark, POST
	// issues); when false they answer 403.
	canWrite bool

	// OnBookmarkUpdated, if non-nil, is started in its own goroutine after a
	// bookmark update has been committed. oldHex is "" when the bookmark did
	// not exist before the update.
	OnBookmarkUpdated func(name, oldHex, newHex string)

	// PreUpdateHook, if non-nil, runs before a bookmark update is written;
	// returning a non-nil error rejects the update and the error text is
	// reported to the client.
	PreUpdateHook func(name, oldHex, newHex string) error
}
 29
 30func NewServer(r *repo.Repo, token string) *Server {
 31	return &Server{repo: r, token: token, canWrite: true}
 32}
 33
 34func NewServerAuth(r *repo.Repo, canWrite bool) *Server {
 35	return &Server{repo: r, canWrite: canWrite}
 36}
 37
 38func (s *Server) Handler() http.Handler {
 39	mux := http.NewServeMux()
 40	mux.HandleFunc("/arche/v1/info", s.handleInfo)
 41	mux.HandleFunc("/arche/v1/bloom", s.handleBloom)
 42	mux.HandleFunc("/arche/v1/fetch", s.handleFetch)
 43	mux.HandleFunc("/arche/v1/push", s.handlePush)
 44	mux.HandleFunc("/arche/v1/update-bookmark", s.handleUpdateBookmark)
 45	mux.HandleFunc("/arche/v1/issues", s.handleIssues)
 46	return s.authMiddleware(mux)
 47}
 48
 49func (s *Server) authMiddleware(next http.Handler) http.Handler {
 50	if s.token == "" {
 51		return next
 52	}
 53	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 54		auth := r.Header.Get("Authorization")
 55		expected := "Bearer " + s.token
 56		if auth != expected {
 57			http.Error(w, "Unauthorized", http.StatusUnauthorized)
 58			return
 59		}
 60		next.ServeHTTP(w, r)
 61	})
 62}
 63
 64func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
 65	if r.Method != http.MethodGet {
 66		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
 67		return
 68	}
 69
 70	headCID, _ := s.repo.Head()
 71	bare := object.StripChangeIDPrefix(headCID)
 72	commitID, _ := s.repo.Store.GetChangeCommit(bare)
 73
 74	count, _ := countCommits(s.repo)
 75
 76	bms, _ := s.repo.Store.ListBookmarks()
 77	bmMap := make(map[string]string, len(bms))
 78	for _, bm := range bms {
 79		bmMap[bm.Name] = hex.EncodeToString(bm.CommitID[:])
 80	}
 81	_ = commitID
 82
 83	resp := InfoResponse{
 84		HeadChangeID: headCID,
 85		CommitCount:  count,
 86		Bookmarks:    bmMap,
 87	}
 88	w.Header().Set("Content-Type", "application/json")
 89	json.NewEncoder(w).Encode(resp)
 90}
 91
 92func (s *Server) handleBloom(w http.ResponseWriter, r *http.Request) {
 93	if r.Method != http.MethodPost {
 94		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
 95		return
 96	}
 97
 98	body, err := io.ReadAll(io.LimitReader(r.Body, 16<<20))
 99	if err != nil {
100		http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
101		return
102	}
103
104	var filter *BloomFilter
105	if len(body) > 0 {
106		filter, err = BloomFilterFrom(body)
107		if err != nil {
108			http.Error(w, "parse bloom filter: "+err.Error(), http.StatusBadRequest)
109			return
110		}
111	}
112
113	missing, err := collectMissingObjects(s.repo, filter)
114	if err != nil {
115		http.Error(w, "collect missing: "+err.Error(), http.StatusInternalServerError)
116		return
117	}
118
119	w.Header().Set("Content-Type", "application/octet-stream")
120	if err := writeObjectPack(w, s.repo, missing); err != nil {
121		return
122	}
123}
124
125func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) {
126	if r.Method != http.MethodPost {
127		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
128		return
129	}
130	if !s.canWrite {
131		http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
132		return
133	}
134
135	entries, err := ReadPack(r.Body)
136	if err != nil {
137		http.Error(w, "read pack: "+err.Error(), http.StatusBadRequest)
138		return
139	}
140
141	if err := storePackEntries(s.repo, entries); err != nil {
142		http.Error(w, "store objects: "+err.Error(), http.StatusInternalServerError)
143		return
144	}
145
146	w.Header().Set("Content-Type", "application/json")
147	json.NewEncoder(w).Encode(map[string]bool{"ok": true})
148}
149
150func (s *Server) handleUpdateBookmark(w http.ResponseWriter, r *http.Request) {
151	if r.Method != http.MethodPost {
152		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
153		return
154	}
155	if !s.canWrite {
156		http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
157		return
158	}
159
160	var req RefUpdateRequest
161	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
162		http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
163		return
164	}
165
166	w.Header().Set("Content-Type", "application/json")
167
168	newIDBytes, err := hex.DecodeString(req.NewID)
169	if err != nil || len(newIDBytes) != 32 {
170		http.Error(w, "invalid new_id", http.StatusBadRequest)
171		return
172	}
173	var newID [32]byte
174	copy(newID[:], newIDBytes)
175
176	existing, _ := s.repo.Store.GetBookmark(req.Name)
177	currentHex := ""
178	var currentID [32]byte
179	if existing != nil {
180		currentHex = hex.EncodeToString(existing.CommitID[:])
181		currentID = existing.CommitID
182	}
183
184	if !req.Force && !req.ForcePublic && req.ExpectedID != currentHex {
185		json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
186			OK:      false,
187			Current: currentHex,
188			Err: fmt.Sprintf(
189				"bookmark %q diverged: remote tip is %s, expected %s — pull and merge before pushing",
190				req.Name, currentHex, req.ExpectedID),
191		})
192		return
193	}
194
195	if (req.Force || req.ForcePublic) && existing != nil {
196		phase, _ := s.repo.Store.GetPhase(currentID)
197		if phase == object.PhasePublic && !req.ForcePublic {
198			json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
199				OK:  false,
200				Err: fmt.Sprintf("bookmark %q points to a public commit; use --force-public to overwrite", req.Name),
201			})
202			return
203		}
204	}
205
206	if s.PreUpdateHook != nil {
207		if hookErr := s.PreUpdateHook(req.Name, currentHex, req.NewID); hookErr != nil {
208			json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
209				OK:  false,
210				Err: "pre-update hook rejected: " + hookErr.Error(),
211			})
212			return
213		}
214	}
215
216	tx, err := s.repo.Store.Begin()
217	if err != nil {
218		http.Error(w, "begin tx: "+err.Error(), http.StatusInternalServerError)
219		return
220	}
221
222	if req.ForcePublic && existing != nil {
223		marker := &object.ObsoleteMarker{
224			Predecessor: currentID,
225			Successors:  [][32]byte{newID},
226			Reason:      "force-push",
227			Timestamp:   0,
228		}
229		if _, err := repo.WriteObsoleteTx(s.repo.Store, tx, marker); err != nil {
230			s.repo.Store.Rollback(tx)
231			http.Error(w, "write obsolete marker: "+err.Error(), http.StatusInternalServerError)
232			return
233		}
234		slog.Warn("force-pushing public bookmark",
235			"bookmark", req.Name,
236			"old_tip", currentHex[:8],
237			"new_tip", req.NewID[:8],
238		)
239	}
240
241	if err := s.repo.Store.SetBookmark(tx, store.Bookmark{Name: req.Name, CommitID: newID}); err != nil {
242		s.repo.Store.Rollback(tx)
243		http.Error(w, "set bookmark: "+err.Error(), http.StatusInternalServerError)
244		return
245	}
246	if err := s.repo.Store.Commit(tx); err != nil {
247		http.Error(w, "commit: "+err.Error(), http.StatusInternalServerError)
248		return
249	}
250
251	if s.OnBookmarkUpdated != nil {
252		go s.OnBookmarkUpdated(req.Name, currentHex, req.NewID)
253	}
254
255	json.NewEncoder(w).Encode(RefUpdateResponse{OK: true}) //nolint:errcheck
256}
257
258func collectMissingObjects(r *repo.Repo, filter *BloomFilter) ([][32]byte, error) {
259	allIDs, err := listAllObjectIDs(r)
260	if err != nil {
261		return nil, err
262	}
263	if filter == nil {
264		return allIDs, nil
265	}
266	var missing [][32]byte
267	for _, id := range allIDs {
268		if !filter.Test(id) {
269			missing = append(missing, id)
270		}
271	}
272	return missing, nil
273}
274
275func listAllObjectIDs(r *repo.Repo) ([][32]byte, error) {
276	seen := make(map[[32]byte]bool)
277	var queue [][32]byte
278
279	if headCID, err := r.Head(); err == nil {
280		bare := object.StripChangeIDPrefix(headCID)
281		if id, err := r.Store.GetChangeCommit(bare); err == nil {
282			if !seen[id] {
283				seen[id] = true
284				queue = append(queue, id)
285			}
286		}
287	}
288	bms, _ := r.Store.ListBookmarks()
289	for _, bm := range bms {
290		if !seen[bm.CommitID] {
291			seen[bm.CommitID] = true
292			queue = append(queue, bm.CommitID)
293		}
294	}
295
296	var result [][32]byte
297	for len(queue) > 0 {
298		id := queue[0]
299		queue = queue[1:]
300		result = append(result, id)
301
302		_, raw, err := r.Store.ReadObject(id)
303		if err != nil {
304			continue
305		}
306		if strings.HasPrefix(string(raw), "arche-commit") {
307			c, err := object.DecodeCommit(raw)
308			if err != nil {
309				continue
310			}
311			if !seen[c.TreeID] {
312				seen[c.TreeID] = true
313				queue = append(queue, c.TreeID)
314			}
315			for _, p := range c.Parents {
316				if !seen[p] {
317					seen[p] = true
318					queue = append(queue, p)
319				}
320			}
321		} else if strings.HasPrefix(string(raw), "arche-tree") {
322			t, err := object.DecodeTree(raw)
323			if err != nil {
324				continue
325			}
326			for _, e := range t.Entries {
327				if !seen[e.ObjectID] {
328					seen[e.ObjectID] = true
329					queue = append(queue, e.ObjectID)
330				}
331			}
332		}
333	}
334
335	issueIDs, _ := r.Store.ListObjectsByKind(string(object.KindIssueEvent))
336	for _, id := range issueIDs {
337		if !seen[id] {
338			seen[id] = true
339			result = append(result, id)
340		}
341	}
342
343	return result, nil
344}
345
346func listPushableObjectIDs(r *repo.Repo) ([][32]byte, error) {
347	seen := make(map[[32]byte]bool)
348	var queue [][32]byte
349
350	addIfNonSecret := func(id [32]byte) {
351		phase, _ := r.Store.GetPhase(id)
352		if phase == object.PhaseSecret {
353			return
354		}
355		if !seen[id] {
356			seen[id] = true
357			queue = append(queue, id)
358		}
359	}
360
361	if headCID, err := r.Head(); err == nil {
362		bare := object.StripChangeIDPrefix(headCID)
363		if id, err := r.Store.GetChangeCommit(bare); err == nil {
364			addIfNonSecret(id)
365		}
366	}
367	bms, _ := r.Store.ListBookmarks()
368	for _, bm := range bms {
369		addIfNonSecret(bm.CommitID)
370	}
371
372	var result [][32]byte
373	for len(queue) > 0 {
374		id := queue[0]
375		queue = queue[1:]
376		result = append(result, id)
377
378		_, raw, err := r.Store.ReadObject(id)
379		if err != nil {
380			continue
381		}
382		if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
383			c, err := object.DecodeCommit(raw)
384			if err != nil {
385				continue
386			}
387			if !seen[c.TreeID] {
388				seen[c.TreeID] = true
389				queue = append(queue, c.TreeID)
390			}
391			for _, p := range c.Parents {
392				phase, _ := r.Store.GetPhase(p)
393				if phase == object.PhaseSecret {
394					continue
395				}
396				if !seen[p] {
397					seen[p] = true
398					queue = append(queue, p)
399				}
400			}
401		} else if bytes.HasPrefix(raw, []byte("arche-tree\x00")) {
402			t, err := object.DecodeTree(raw)
403			if err != nil {
404				continue
405			}
406			for _, e := range t.Entries {
407				if !seen[e.ObjectID] {
408					seen[e.ObjectID] = true
409					queue = append(queue, e.ObjectID)
410				}
411			}
412		}
413	}
414
415	issueIDs, _ := r.Store.ListObjectsByKind(string(object.KindIssueEvent))
416	for _, id := range issueIDs {
417		if !seen[id] {
418			seen[id] = true
419			result = append(result, id)
420		}
421	}
422
423	return result, nil
424}
425
426func (s *Server) handleFetch(w http.ResponseWriter, r *http.Request) {
427	if r.Method != http.MethodPost {
428		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
429		return
430	}
431
432	body, err := io.ReadAll(io.LimitReader(r.Body, 4<<20))
433	if err != nil {
434		http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
435		return
436	}
437	var hexIDs []string
438	if err := json.Unmarshal(body, &hexIDs); err != nil {
439		http.Error(w, "decode: "+err.Error(), http.StatusBadRequest)
440		return
441	}
442	ids := make([][32]byte, 0, len(hexIDs))
443	for _, h := range hexIDs {
444		b, err := hex.DecodeString(h)
445		if err != nil || len(b) != 32 {
446			continue
447		}
448		var id [32]byte
449		copy(id[:], b)
450		ids = append(ids, id)
451	}
452
453	w.Header().Set("Content-Type", "application/octet-stream")
454	writeObjectPack(w, s.repo, ids) //nolint:errcheck
455}
456
457func writeObjectPack(w io.Writer, r *repo.Repo, ids [][32]byte) error {
458	for _, id := range ids {
459		kind, data, err := r.Store.ReadObject(id)
460		if err != nil {
461			continue
462		}
463
464		entry := PackEntry{ID: id, Kind: kind, Data: data}
465		if err := writePackEntry(w, entry); err != nil {
466			return err
467		}
468	}
469
470	return writePackEnd(w)
471}
472
473func storePackEntries(r *repo.Repo, entries []PackEntry) error {
474	if len(entries) == 0 {
475		return nil
476	}
477	tx, err := r.Store.Begin()
478	if err != nil {
479		return err
480	}
481
482	for _, e := range entries {
483		if err := r.Store.WriteObject(tx, e.ID, e.Kind, e.Data); err != nil {
484			r.Store.Rollback(tx)
485			return fmt.Errorf("store object %x: %w", e.ID[:6], err)
486		}
487		if e.Kind == string(object.KindCommit) {
488			if err := r.Store.SetPhase(tx, e.ID, object.PhasePublic); err != nil {
489				r.Store.Rollback(tx)
490				return fmt.Errorf("set phase %x: %w", e.ID[:6], err)
491			}
492		}
493	}
494
495	if err := r.Store.Commit(tx); err != nil {
496		return err
497	}
498
499	applyReceivedIssueEvents(r, entries)
500
501	return nil
502}
503
504func applyReceivedIssueEvents(r *repo.Repo, entries []PackEntry) {
505	var issueEvs []issues.IssueEvent
506	for _, e := range entries {
507		if e.Kind != string(object.KindIssueEvent) {
508			continue
509		}
510		obj, err := object.DecodeIssueEvent(e.Data)
511		if err != nil {
512			continue
513		}
514		issueEvs = append(issueEvs, issues.IssueEvent{
515			EventID: fmt.Sprintf("%x", e.ID),
516			IssueID: obj.IssueID,
517			HLCMS:   obj.HLCMS,
518			HLCSeq:  obj.HLCSeq,
519			Kind:    obj.Kind,
520			Payload: obj.Payload,
521			Author:  obj.Author,
522			Created: 0,
523		})
524	}
525	if len(issueEvs) == 0 {
526		return
527	}
528	idb, err := issuedb.Open(r.ArcheDir())
529	if err != nil {
530		return
531	}
532	defer idb.Close()
533	_ = idb.Issues.MergeEvents(issueEvs)
534}
535
536func countCommits(r *repo.Repo) (int64, error) {
537	ids, err := listAllObjectIDs(r)
538	if err != nil {
539		return 0, err
540	}
541	var n int64
542	for _, id := range ids {
543		_, raw, err := r.Store.ReadObject(id)
544		if err != nil {
545			continue
546		}
547		if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
548			n++
549		}
550	}
551	return n, nil
552}
553
554func (s *Server) handleIssues(w http.ResponseWriter, r *http.Request) {
555	idb, err := issuedb.Open(s.repo.ArcheDir())
556	if err != nil {
557		http.Error(w, "open issuedb: "+err.Error(), http.StatusInternalServerError)
558		return
559	}
560	defer idb.Close()
561
562	switch r.Method {
563	case http.MethodGet:
564		evs, err := idb.Issues.AllEvents()
565		if err != nil {
566			http.Error(w, "list events: "+err.Error(), http.StatusInternalServerError)
567			return
568		}
569		w.Header().Set("Content-Type", "application/json")
570		json.NewEncoder(w).Encode(evs)
571
572	case http.MethodPost:
573		if !s.canWrite {
574			http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
575			return
576		}
577		body, err := io.ReadAll(io.LimitReader(r.Body, 32<<20))
578		if err != nil {
579			http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
580			return
581		}
582		var evs []issues.IssueEvent
583		if err := json.Unmarshal(body, &evs); err != nil {
584			http.Error(w, "decode events: "+err.Error(), http.StatusBadRequest)
585			return
586		}
587		if err := idb.Issues.MergeEvents(evs); err != nil {
588			http.Error(w, "merge events: "+err.Error(), http.StatusInternalServerError)
589			return
590		}
591		w.Header().Set("Content-Type", "application/json")
592		json.NewEncoder(w).Encode(map[string]int{"merged": len(evs)})
593
594	default:
595		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
596	}
597}