1package syncpkg
2
import (
	"bytes"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"

	"arche/internal/issuedb"
	"arche/internal/issues"
	"arche/internal/object"
	"arche/internal/repo"
	"arche/internal/store"
)
19
20type Server struct {
21 repo *repo.Repo
22 token string
23 canWrite bool
24
25 OnBookmarkUpdated func(name, oldHex, newHex string)
26
27 PreUpdateHook func(name, oldHex, newHex string) error
28}
29
30func NewServer(r *repo.Repo, token string) *Server {
31 return &Server{repo: r, token: token, canWrite: true}
32}
33
34func NewServerAuth(r *repo.Repo, canWrite bool) *Server {
35 return &Server{repo: r, canWrite: canWrite}
36}
37
38func (s *Server) Handler() http.Handler {
39 mux := http.NewServeMux()
40 mux.HandleFunc("/arche/v1/info", s.handleInfo)
41 mux.HandleFunc("/arche/v1/bloom", s.handleBloom)
42 mux.HandleFunc("/arche/v1/fetch", s.handleFetch)
43 mux.HandleFunc("/arche/v1/push", s.handlePush)
44 mux.HandleFunc("/arche/v1/update-bookmark", s.handleUpdateBookmark)
45 mux.HandleFunc("/arche/v1/issues", s.handleIssues)
46 return s.authMiddleware(mux)
47}
48
49func (s *Server) authMiddleware(next http.Handler) http.Handler {
50 if s.token == "" {
51 return next
52 }
53 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
54 auth := r.Header.Get("Authorization")
55 expected := "Bearer " + s.token
56 if auth != expected {
57 http.Error(w, "Unauthorized", http.StatusUnauthorized)
58 return
59 }
60 next.ServeHTTP(w, r)
61 })
62}
63
64func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
65 if r.Method != http.MethodGet {
66 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
67 return
68 }
69
70 headCID, _ := s.repo.Head()
71 bare := object.StripChangeIDPrefix(headCID)
72 commitID, _ := s.repo.Store.GetChangeCommit(bare)
73
74 count, _ := countCommits(s.repo)
75
76 bms, _ := s.repo.Store.ListBookmarks()
77 bmMap := make(map[string]string, len(bms))
78 for _, bm := range bms {
79 bmMap[bm.Name] = hex.EncodeToString(bm.CommitID[:])
80 }
81 _ = commitID
82
83 resp := InfoResponse{
84 HeadChangeID: headCID,
85 CommitCount: count,
86 Bookmarks: bmMap,
87 }
88 w.Header().Set("Content-Type", "application/json")
89 json.NewEncoder(w).Encode(resp)
90}
91
92func (s *Server) handleBloom(w http.ResponseWriter, r *http.Request) {
93 if r.Method != http.MethodPost {
94 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
95 return
96 }
97
98 body, err := io.ReadAll(io.LimitReader(r.Body, 16<<20))
99 if err != nil {
100 http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
101 return
102 }
103
104 var filter *BloomFilter
105 if len(body) > 0 {
106 filter, err = BloomFilterFrom(body)
107 if err != nil {
108 http.Error(w, "parse bloom filter: "+err.Error(), http.StatusBadRequest)
109 return
110 }
111 }
112
113 missing, err := collectMissingObjects(s.repo, filter)
114 if err != nil {
115 http.Error(w, "collect missing: "+err.Error(), http.StatusInternalServerError)
116 return
117 }
118
119 w.Header().Set("Content-Type", "application/octet-stream")
120 if err := writeObjectPack(w, s.repo, missing); err != nil {
121 return
122 }
123}
124
125func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) {
126 if r.Method != http.MethodPost {
127 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
128 return
129 }
130 if !s.canWrite {
131 http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
132 return
133 }
134
135 entries, err := ReadPack(r.Body)
136 if err != nil {
137 http.Error(w, "read pack: "+err.Error(), http.StatusBadRequest)
138 return
139 }
140
141 if err := storePackEntries(s.repo, entries); err != nil {
142 http.Error(w, "store objects: "+err.Error(), http.StatusInternalServerError)
143 return
144 }
145
146 w.Header().Set("Content-Type", "application/json")
147 json.NewEncoder(w).Encode(map[string]bool{"ok": true})
148}
149
150func (s *Server) handleUpdateBookmark(w http.ResponseWriter, r *http.Request) {
151 if r.Method != http.MethodPost {
152 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
153 return
154 }
155 if !s.canWrite {
156 http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
157 return
158 }
159
160 var req RefUpdateRequest
161 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
162 http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
163 return
164 }
165
166 w.Header().Set("Content-Type", "application/json")
167
168 newIDBytes, err := hex.DecodeString(req.NewID)
169 if err != nil || len(newIDBytes) != 32 {
170 http.Error(w, "invalid new_id", http.StatusBadRequest)
171 return
172 }
173 var newID [32]byte
174 copy(newID[:], newIDBytes)
175
176 existing, _ := s.repo.Store.GetBookmark(req.Name)
177 currentHex := ""
178 var currentID [32]byte
179 if existing != nil {
180 currentHex = hex.EncodeToString(existing.CommitID[:])
181 currentID = existing.CommitID
182 }
183
184 if !req.Force && !req.ForcePublic && req.ExpectedID != currentHex {
185 json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
186 OK: false,
187 Current: currentHex,
188 Err: fmt.Sprintf(
189 "bookmark %q diverged: remote tip is %s, expected %s — pull and merge before pushing",
190 req.Name, currentHex, req.ExpectedID),
191 })
192 return
193 }
194
195 if (req.Force || req.ForcePublic) && existing != nil {
196 phase, _ := s.repo.Store.GetPhase(currentID)
197 if phase == object.PhasePublic && !req.ForcePublic {
198 json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
199 OK: false,
200 Err: fmt.Sprintf("bookmark %q points to a public commit; use --force-public to overwrite", req.Name),
201 })
202 return
203 }
204 }
205
206 if s.PreUpdateHook != nil {
207 if hookErr := s.PreUpdateHook(req.Name, currentHex, req.NewID); hookErr != nil {
208 json.NewEncoder(w).Encode(RefUpdateResponse{ //nolint:errcheck
209 OK: false,
210 Err: "pre-update hook rejected: " + hookErr.Error(),
211 })
212 return
213 }
214 }
215
216 tx, err := s.repo.Store.Begin()
217 if err != nil {
218 http.Error(w, "begin tx: "+err.Error(), http.StatusInternalServerError)
219 return
220 }
221
222 if req.ForcePublic && existing != nil {
223 marker := &object.ObsoleteMarker{
224 Predecessor: currentID,
225 Successors: [][32]byte{newID},
226 Reason: "force-push",
227 Timestamp: 0,
228 }
229 if _, err := repo.WriteObsoleteTx(s.repo.Store, tx, marker); err != nil {
230 s.repo.Store.Rollback(tx)
231 http.Error(w, "write obsolete marker: "+err.Error(), http.StatusInternalServerError)
232 return
233 }
234 slog.Warn("force-pushing public bookmark",
235 "bookmark", req.Name,
236 "old_tip", currentHex[:8],
237 "new_tip", req.NewID[:8],
238 )
239 }
240
241 if err := s.repo.Store.SetBookmark(tx, store.Bookmark{Name: req.Name, CommitID: newID}); err != nil {
242 s.repo.Store.Rollback(tx)
243 http.Error(w, "set bookmark: "+err.Error(), http.StatusInternalServerError)
244 return
245 }
246 if err := s.repo.Store.Commit(tx); err != nil {
247 http.Error(w, "commit: "+err.Error(), http.StatusInternalServerError)
248 return
249 }
250
251 if s.OnBookmarkUpdated != nil {
252 go s.OnBookmarkUpdated(req.Name, currentHex, req.NewID)
253 }
254
255 json.NewEncoder(w).Encode(RefUpdateResponse{OK: true}) //nolint:errcheck
256}
257
258func collectMissingObjects(r *repo.Repo, filter *BloomFilter) ([][32]byte, error) {
259 allIDs, err := listAllObjectIDs(r)
260 if err != nil {
261 return nil, err
262 }
263 if filter == nil {
264 return allIDs, nil
265 }
266 var missing [][32]byte
267 for _, id := range allIDs {
268 if !filter.Test(id) {
269 missing = append(missing, id)
270 }
271 }
272 return missing, nil
273}
274
275func listAllObjectIDs(r *repo.Repo) ([][32]byte, error) {
276 seen := make(map[[32]byte]bool)
277 var queue [][32]byte
278
279 if headCID, err := r.Head(); err == nil {
280 bare := object.StripChangeIDPrefix(headCID)
281 if id, err := r.Store.GetChangeCommit(bare); err == nil {
282 if !seen[id] {
283 seen[id] = true
284 queue = append(queue, id)
285 }
286 }
287 }
288 bms, _ := r.Store.ListBookmarks()
289 for _, bm := range bms {
290 if !seen[bm.CommitID] {
291 seen[bm.CommitID] = true
292 queue = append(queue, bm.CommitID)
293 }
294 }
295
296 var result [][32]byte
297 for len(queue) > 0 {
298 id := queue[0]
299 queue = queue[1:]
300 result = append(result, id)
301
302 _, raw, err := r.Store.ReadObject(id)
303 if err != nil {
304 continue
305 }
306 if strings.HasPrefix(string(raw), "arche-commit") {
307 c, err := object.DecodeCommit(raw)
308 if err != nil {
309 continue
310 }
311 if !seen[c.TreeID] {
312 seen[c.TreeID] = true
313 queue = append(queue, c.TreeID)
314 }
315 for _, p := range c.Parents {
316 if !seen[p] {
317 seen[p] = true
318 queue = append(queue, p)
319 }
320 }
321 } else if strings.HasPrefix(string(raw), "arche-tree") {
322 t, err := object.DecodeTree(raw)
323 if err != nil {
324 continue
325 }
326 for _, e := range t.Entries {
327 if !seen[e.ObjectID] {
328 seen[e.ObjectID] = true
329 queue = append(queue, e.ObjectID)
330 }
331 }
332 }
333 }
334
335 issueIDs, _ := r.Store.ListObjectsByKind(string(object.KindIssueEvent))
336 for _, id := range issueIDs {
337 if !seen[id] {
338 seen[id] = true
339 result = append(result, id)
340 }
341 }
342
343 return result, nil
344}
345
346func listPushableObjectIDs(r *repo.Repo) ([][32]byte, error) {
347 seen := make(map[[32]byte]bool)
348 var queue [][32]byte
349
350 addIfNonSecret := func(id [32]byte) {
351 phase, _ := r.Store.GetPhase(id)
352 if phase == object.PhaseSecret {
353 return
354 }
355 if !seen[id] {
356 seen[id] = true
357 queue = append(queue, id)
358 }
359 }
360
361 if headCID, err := r.Head(); err == nil {
362 bare := object.StripChangeIDPrefix(headCID)
363 if id, err := r.Store.GetChangeCommit(bare); err == nil {
364 addIfNonSecret(id)
365 }
366 }
367 bms, _ := r.Store.ListBookmarks()
368 for _, bm := range bms {
369 addIfNonSecret(bm.CommitID)
370 }
371
372 var result [][32]byte
373 for len(queue) > 0 {
374 id := queue[0]
375 queue = queue[1:]
376 result = append(result, id)
377
378 _, raw, err := r.Store.ReadObject(id)
379 if err != nil {
380 continue
381 }
382 if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
383 c, err := object.DecodeCommit(raw)
384 if err != nil {
385 continue
386 }
387 if !seen[c.TreeID] {
388 seen[c.TreeID] = true
389 queue = append(queue, c.TreeID)
390 }
391 for _, p := range c.Parents {
392 phase, _ := r.Store.GetPhase(p)
393 if phase == object.PhaseSecret {
394 continue
395 }
396 if !seen[p] {
397 seen[p] = true
398 queue = append(queue, p)
399 }
400 }
401 } else if bytes.HasPrefix(raw, []byte("arche-tree\x00")) {
402 t, err := object.DecodeTree(raw)
403 if err != nil {
404 continue
405 }
406 for _, e := range t.Entries {
407 if !seen[e.ObjectID] {
408 seen[e.ObjectID] = true
409 queue = append(queue, e.ObjectID)
410 }
411 }
412 }
413 }
414
415 issueIDs, _ := r.Store.ListObjectsByKind(string(object.KindIssueEvent))
416 for _, id := range issueIDs {
417 if !seen[id] {
418 seen[id] = true
419 result = append(result, id)
420 }
421 }
422
423 return result, nil
424}
425
426func (s *Server) handleFetch(w http.ResponseWriter, r *http.Request) {
427 if r.Method != http.MethodPost {
428 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
429 return
430 }
431
432 body, err := io.ReadAll(io.LimitReader(r.Body, 4<<20))
433 if err != nil {
434 http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
435 return
436 }
437 var hexIDs []string
438 if err := json.Unmarshal(body, &hexIDs); err != nil {
439 http.Error(w, "decode: "+err.Error(), http.StatusBadRequest)
440 return
441 }
442 ids := make([][32]byte, 0, len(hexIDs))
443 for _, h := range hexIDs {
444 b, err := hex.DecodeString(h)
445 if err != nil || len(b) != 32 {
446 continue
447 }
448 var id [32]byte
449 copy(id[:], b)
450 ids = append(ids, id)
451 }
452
453 w.Header().Set("Content-Type", "application/octet-stream")
454 writeObjectPack(w, s.repo, ids) //nolint:errcheck
455}
456
457func writeObjectPack(w io.Writer, r *repo.Repo, ids [][32]byte) error {
458 for _, id := range ids {
459 kind, data, err := r.Store.ReadObject(id)
460 if err != nil {
461 continue
462 }
463
464 entry := PackEntry{ID: id, Kind: kind, Data: data}
465 if err := writePackEntry(w, entry); err != nil {
466 return err
467 }
468 }
469
470 return writePackEnd(w)
471}
472
473func storePackEntries(r *repo.Repo, entries []PackEntry) error {
474 if len(entries) == 0 {
475 return nil
476 }
477 tx, err := r.Store.Begin()
478 if err != nil {
479 return err
480 }
481
482 for _, e := range entries {
483 if err := r.Store.WriteObject(tx, e.ID, e.Kind, e.Data); err != nil {
484 r.Store.Rollback(tx)
485 return fmt.Errorf("store object %x: %w", e.ID[:6], err)
486 }
487 if e.Kind == string(object.KindCommit) {
488 if err := r.Store.SetPhase(tx, e.ID, object.PhasePublic); err != nil {
489 r.Store.Rollback(tx)
490 return fmt.Errorf("set phase %x: %w", e.ID[:6], err)
491 }
492 }
493 }
494
495 if err := r.Store.Commit(tx); err != nil {
496 return err
497 }
498
499 applyReceivedIssueEvents(r, entries)
500
501 return nil
502}
503
504func applyReceivedIssueEvents(r *repo.Repo, entries []PackEntry) {
505 var issueEvs []issues.IssueEvent
506 for _, e := range entries {
507 if e.Kind != string(object.KindIssueEvent) {
508 continue
509 }
510 obj, err := object.DecodeIssueEvent(e.Data)
511 if err != nil {
512 continue
513 }
514 issueEvs = append(issueEvs, issues.IssueEvent{
515 EventID: fmt.Sprintf("%x", e.ID),
516 IssueID: obj.IssueID,
517 HLCMS: obj.HLCMS,
518 HLCSeq: obj.HLCSeq,
519 Kind: obj.Kind,
520 Payload: obj.Payload,
521 Author: obj.Author,
522 Created: 0,
523 })
524 }
525 if len(issueEvs) == 0 {
526 return
527 }
528 idb, err := issuedb.Open(r.ArcheDir())
529 if err != nil {
530 return
531 }
532 defer idb.Close()
533 _ = idb.Issues.MergeEvents(issueEvs)
534}
535
536func countCommits(r *repo.Repo) (int64, error) {
537 ids, err := listAllObjectIDs(r)
538 if err != nil {
539 return 0, err
540 }
541 var n int64
542 for _, id := range ids {
543 _, raw, err := r.Store.ReadObject(id)
544 if err != nil {
545 continue
546 }
547 if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
548 n++
549 }
550 }
551 return n, nil
552}
553
554func (s *Server) handleIssues(w http.ResponseWriter, r *http.Request) {
555 idb, err := issuedb.Open(s.repo.ArcheDir())
556 if err != nil {
557 http.Error(w, "open issuedb: "+err.Error(), http.StatusInternalServerError)
558 return
559 }
560 defer idb.Close()
561
562 switch r.Method {
563 case http.MethodGet:
564 evs, err := idb.Issues.AllEvents()
565 if err != nil {
566 http.Error(w, "list events: "+err.Error(), http.StatusInternalServerError)
567 return
568 }
569 w.Header().Set("Content-Type", "application/json")
570 json.NewEncoder(w).Encode(evs)
571
572 case http.MethodPost:
573 if !s.canWrite {
574 http.Error(w, "Forbidden: read-only access", http.StatusForbidden)
575 return
576 }
577 body, err := io.ReadAll(io.LimitReader(r.Body, 32<<20))
578 if err != nil {
579 http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
580 return
581 }
582 var evs []issues.IssueEvent
583 if err := json.Unmarshal(body, &evs); err != nil {
584 http.Error(w, "decode events: "+err.Error(), http.StatusBadRequest)
585 return
586 }
587 if err := idb.Issues.MergeEvents(evs); err != nil {
588 http.Error(w, "merge events: "+err.Error(), http.StatusInternalServerError)
589 return
590 }
591 w.Header().Set("Content-Type", "application/json")
592 json.NewEncoder(w).Encode(map[string]int{"merged": len(evs)})
593
594 default:
595 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
596 }
597}