1package syncpkg
2
3import (
4 "bytes"
5 "encoding/hex"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "net/url"
11 "regexp"
12 "strings"
13 "time"
14
15 "arche/internal/issuedb"
16 "arche/internal/issues"
17 "arche/internal/object"
18 "arche/internal/repo"
19 "arche/internal/store"
20)
21
22type Client struct {
23 url string
24 token string
25 repo *repo.Repo
26 client *http.Client
27}
28
29func NewClient(r *repo.Repo, rawURL, token string) *Client {
30 httpURL, httpClient := resolveClientTransport(rawURL, token)
31 return &Client{
32 url: strings.TrimRight(httpURL, "/"),
33 token: token,
34 repo: r,
35 client: httpClient,
36 }
37}
38
39func resolveClientTransport(rawURL, token string) (string, *http.Client) {
40 u, err := url.Parse(rawURL)
41 if err != nil {
42 return rawURL, &http.Client{}
43 }
44
45 switch u.Scheme {
46 case "arche+ssh":
47 host := u.Hostname()
48 port := u.Port()
49 if port == "" {
50 port = "22"
51 }
52 sshHost := host + ":" + port
53 repoName := strings.TrimPrefix(u.Path, "/")
54 sshClient, err := NewSSHClient(sshHost, repoName, "")
55 if err != nil {
56 return rawURL, &http.Client{}
57 }
58 _ = token
59 return "http://localhost", sshClient
60
61 case "arche+mtls":
62 host := u.Hostname()
63 port := u.Port()
64 if port == "" {
65 port = "8443"
66 }
67 certFile := u.Query().Get("cert")
68 keyFile := u.Query().Get("key")
69 mtlsClient, err := NewMTLSClient(certFile, keyFile)
70 if err != nil {
71 return rawURL, &http.Client{}
72 }
73 _ = token
74 baseURL := "https://" + host + ":" + port
75 return baseURL, mtlsClient
76
77 default:
78 return rawURL, &http.Client{}
79 }
80}
81
82func (c *Client) Pull() error {
83 info, err := c.getInfo()
84 if err != nil {
85 return fmt.Errorf("pull: info: %w", err)
86 }
87
88 allLocalIDs, err := listAllObjectIDs(c.repo)
89 if err != nil {
90 return fmt.Errorf("pull: enumerate local: %w", err)
91 }
92 filter := NewBloom(max(len(allLocalIDs), 1))
93 for _, id := range allLocalIDs {
94 filter.Add(id)
95 }
96
97 pack, err := c.fetchBloom(filter.Bytes())
98 if err != nil {
99 return fmt.Errorf("pull: bloom/fetch: %w", err)
100 }
101
102 if err := storePackEntries(c.repo, pack); err != nil {
103 return fmt.Errorf("pull: store pack: %w", err)
104 }
105
106 if err := ensureChangeInStore(c.repo, pack); err != nil {
107 return fmt.Errorf("pull: register change IDs: %w", err)
108 }
109
110 for round := 0; round < 3; round++ {
111 missing, err := c.checkMissingObjects(info.Bookmarks)
112 if err != nil || len(missing) == 0 {
113 break
114 }
115 pack2, err := c.fetchExact(missing)
116 if err != nil {
117 return fmt.Errorf("pull: convergence round %d: %w", round+1, err)
118 }
119 if err := storePackEntries(c.repo, pack2); err != nil {
120 return fmt.Errorf("pull: store convergence pack: %w", err)
121 }
122 if err := ensureChangeInStore(c.repo, pack2); err != nil {
123 return fmt.Errorf("pull: register change IDs (round %d): %w", round+1, err)
124 }
125 }
126
127 for name, hexID := range info.Bookmarks {
128 idBytes, err := hex.DecodeString(hexID)
129 if err != nil || len(idBytes) != 32 {
130 continue
131 }
132 var id [32]byte
133 copy(id[:], idBytes)
134
135 tx, err := c.repo.Store.Begin()
136 if err != nil {
137 return fmt.Errorf("pull: begin tx: %w", err)
138 }
139 if err := c.repo.Store.SetBookmark(tx, store.Bookmark{Name: name, CommitID: id, Remote: c.url}); err != nil {
140 c.repo.Store.Rollback(tx)
141 return fmt.Errorf("pull: set bookmark %q: %w", name, err)
142 }
143 if err := c.repo.Store.Commit(tx); err != nil {
144 return fmt.Errorf("pull: commit bookmark: %w", err)
145 }
146 }
147
148 if err := c.pullIssues(); err != nil {
149 return fmt.Errorf("pull: issues: %w", err)
150 }
151
152 return nil
153}
154
155type PushOptions struct {
156 Force bool
157 ForcePublic bool
158}
159
160func (c *Client) Push() error { return c.PushWith(PushOptions{}) }
161
162func (c *Client) PushWith(opts PushOptions) error {
163 info, err := c.getInfo()
164 if err != nil {
165 return fmt.Errorf("push: info: %w", err)
166 }
167
168 allLocalIDs, err := listPushableObjectIDs(c.repo)
169 if err != nil {
170 return fmt.Errorf("push: enumerate local: %w", err)
171 }
172
173 var toSend [][32]byte
174 if info.CommitCount == 0 {
175 toSend = allLocalIDs
176 } else {
177 toSend, err = c.objectsToSend(info)
178 if err != nil {
179 return fmt.Errorf("push: diff objects: %w", err)
180 }
181 }
182
183 if len(toSend) > 0 {
184 if err := c.pushObjects(toSend); err != nil {
185 return fmt.Errorf("push: upload pack: %w", err)
186 }
187 }
188
189 before, _ := c.repo.CaptureRefState()
190
191 localBMs, err := c.repo.Store.ListBookmarks()
192 if err != nil {
193 return fmt.Errorf("push: list bookmarks: %w", err)
194 }
195
196 var pushedNames []string
197 var allToPromote [][32]byte
198 for _, bm := range localBMs {
199 if phase, _ := c.repo.Store.GetPhase(bm.CommitID); phase == object.PhaseSecret {
200 continue
201 }
202
203 remoteBM := info.Bookmarks[bm.Name]
204 newHex := hex.EncodeToString(bm.CommitID[:])
205 resp, err := c.updateRemoteBookmark(bm.Name, remoteBM, newHex, opts)
206 if err != nil {
207 return fmt.Errorf("push: update bookmark %q: %w", bm.Name, err)
208 }
209 if !resp.OK {
210 return fmt.Errorf("push rejected for bookmark %q: %s", bm.Name, resp.Err)
211 }
212
213 promotable, err := collectPromotable(c.repo, bm.CommitID)
214 if err != nil {
215 return fmt.Errorf("push: collect promotable for %q: %w", bm.Name, err)
216 }
217 allToPromote = append(allToPromote, promotable...)
218 pushedNames = append(pushedNames, bm.Name)
219 }
220
221 if len(allToPromote) > 0 || len(pushedNames) > 0 {
222 seen := make(map[[32]byte]bool)
223 deduped := allToPromote[:0]
224 for _, id := range allToPromote {
225 if !seen[id] {
226 seen[id] = true
227 deduped = append(deduped, id)
228 }
229 }
230 meta := fmt.Sprintf("pushed [%s] to %s", strings.Join(pushedNames, ", "), c.url)
231 tx, err := c.repo.Store.Begin()
232 if err != nil {
233 return fmt.Errorf("push: begin tx: %w", err)
234 }
235 for _, id := range deduped {
236 if err := c.repo.Store.SetPhase(tx, id, object.PhasePublic); err != nil {
237 c.repo.Store.Rollback(tx)
238 return fmt.Errorf("push: promote phase: %w", err)
239 }
240 }
241 op := store.Operation{
242 Kind: "push", Timestamp: time.Now().Unix(),
243 Before: before, After: before,
244 Metadata: meta,
245 }
246 if _, err := c.repo.Store.InsertOperation(tx, op); err != nil {
247 c.repo.Store.Rollback(tx)
248 return fmt.Errorf("push: insert oplog: %w", err)
249 }
250 if err := c.repo.Store.Commit(tx); err != nil {
251 return fmt.Errorf("push: commit tx: %w", err)
252 }
253 }
254
255 if err := c.pushIssues(); err != nil {
256 return fmt.Errorf("push: issues: %w", err)
257 }
258 return nil
259}
260
261func (c *Client) getInfo() (*InfoResponse, error) {
262 req, _ := http.NewRequest(http.MethodGet, c.url+"/arche/v1/info", nil)
263 c.setAuth(req)
264 resp, err := c.client.Do(req)
265 if err != nil {
266 return nil, err
267 }
268 defer resp.Body.Close()
269 if resp.StatusCode != http.StatusOK {
270 return nil, fmt.Errorf("info: HTTP %d", resp.StatusCode)
271 }
272 var info InfoResponse
273 if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
274 return nil, fmt.Errorf("info decode: %w", err)
275 }
276 return &info, nil
277}
278
279func (c *Client) fetchBloom(filterBytes []byte) ([]PackEntry, error) {
280 req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/bloom",
281 bytes.NewReader(filterBytes))
282 req.Header.Set("Content-Type", "application/octet-stream")
283 c.setAuth(req)
284 resp, err := c.client.Do(req)
285 if err != nil {
286 return nil, err
287 }
288 defer resp.Body.Close()
289 if resp.StatusCode != http.StatusOK {
290 body, _ := io.ReadAll(resp.Body)
291 return nil, fmt.Errorf("bloom: HTTP %d: %s", resp.StatusCode, body)
292 }
293 return ReadPack(resp.Body)
294}
295
296func (c *Client) pushObjects(ids [][32]byte) error {
297 pr, pw := io.Pipe()
298 errCh := make(chan error, 1)
299 go func() {
300 entries := make([]PackEntry, 0, len(ids))
301 for _, id := range ids {
302 kind, data, err := c.repo.Store.ReadObject(id)
303 if err != nil {
304 continue
305 }
306 entries = append(entries, PackEntry{ID: id, Kind: kind, Data: data})
307 }
308 errCh <- WritePack(pw, entries)
309 pw.Close()
310 }()
311
312 req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/push", pr)
313 req.Header.Set("Content-Type", "application/octet-stream")
314 c.setAuth(req)
315 resp, err := c.client.Do(req)
316 if packErr := <-errCh; packErr != nil && err == nil {
317 err = packErr
318 }
319 if err != nil {
320 return err
321 }
322 defer resp.Body.Close()
323 if resp.StatusCode != http.StatusOK {
324 body, _ := io.ReadAll(resp.Body)
325 return fmt.Errorf("push objects: HTTP %d: %s", resp.StatusCode, body)
326 }
327 return nil
328}
329
330func (c *Client) updateRemoteBookmark(name, expectedHex, newHex string, opts PushOptions) (*RefUpdateResponse, error) {
331 body, _ := json.Marshal(RefUpdateRequest{
332 Name: name,
333 ExpectedID: expectedHex,
334 NewID: newHex,
335 Force: opts.Force,
336 ForcePublic: opts.ForcePublic,
337 })
338
339 req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/update-bookmark",
340 bytes.NewReader(body))
341 req.Header.Set("Content-Type", "application/json")
342 c.setAuth(req)
343 resp, err := c.client.Do(req)
344 if err != nil {
345 return nil, err
346 }
347
348 defer resp.Body.Close()
349 var result RefUpdateResponse
350 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
351 return nil, fmt.Errorf("decode update-bookmark response: %w", err)
352 }
353
354 return &result, nil
355}
356
357func (c *Client) setAuth(req *http.Request) {
358 if c.token != "" {
359 req.Header.Set("Authorization", "Bearer "+c.token)
360 }
361}
362
363func (c *Client) objectsToSend(info *InfoResponse) ([][32]byte, error) {
364 allLocal, err := listPushableObjectIDs(c.repo)
365 if err != nil {
366 return nil, err
367 }
368
369 remoteKnown := make(map[[32]byte]bool)
370 for _, hexID := range info.Bookmarks {
371 b, err := hex.DecodeString(hexID)
372 if err != nil || len(b) != 32 {
373 continue
374 }
375 var id [32]byte
376 copy(id[:], b)
377 c.walkAncestors(id, remoteKnown)
378 }
379
380 var toSend [][32]byte
381 for _, id := range allLocal {
382 if !remoteKnown[id] {
383 toSend = append(toSend, id)
384 }
385 }
386 return toSend, nil
387}
388
389func (c *Client) walkAncestors(id [32]byte, known map[[32]byte]bool) {
390 if known[id] {
391 return
392 }
393 known[id] = true
394
395 _, raw, err := c.repo.Store.ReadObject(id)
396 if err != nil {
397 return
398 }
399 if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
400 commit, err := object.DecodeCommit(raw)
401 if err != nil {
402 return
403 }
404 c.walkAncestors(commit.TreeID, known)
405 for _, p := range commit.Parents {
406 c.walkAncestors(p, known)
407 }
408 } else if bytes.HasPrefix(raw, []byte("arche-tree\x00")) {
409 t, err := object.DecodeTree(raw)
410 if err != nil {
411 return
412 }
413 for _, e := range t.Entries {
414 c.walkAncestors(e.ObjectID, known)
415 }
416 }
417}
418
419func ensureChangeInStore(r *repo.Repo, entries []PackEntry) error {
420 if len(entries) == 0 {
421 return nil
422 }
423 tx, err := r.Store.Begin()
424 if err != nil {
425 return err
426 }
427
428 for _, e := range entries {
429 if !strings.HasPrefix(e.Kind, "commit") {
430 continue
431 }
432 c, err := object.DecodeCommit(e.Data)
433 if err != nil {
434 continue
435 }
436 if c.ChangeID == "" {
437 continue
438 }
439 r.Store.SetChangeCommit(tx, c.ChangeID, e.ID)
440 }
441 return r.Store.Commit(tx)
442}
443
444func (c *Client) pullIssues() error {
445 req, err := http.NewRequest(http.MethodGet, c.url+"/arche/v1/issues", nil)
446 if err != nil {
447 return err
448 }
449 c.setAuth(req)
450 resp, err := c.client.Do(req)
451 if err != nil {
452 return err
453 }
454 defer resp.Body.Close()
455 if resp.StatusCode != http.StatusOK {
456 return fmt.Errorf("server returned %d", resp.StatusCode)
457 }
458 var evs []issues.IssueEvent
459 if err := json.NewDecoder(resp.Body).Decode(&evs); err != nil {
460 return fmt.Errorf("decode: %w", err)
461 }
462 if len(evs) == 0 {
463 return nil
464 }
465 idb, err := issuedb.Open(c.repo.ArcheDir())
466 if err != nil {
467 return fmt.Errorf("open issuedb: %w", err)
468 }
469 defer idb.Close()
470 return idb.Issues.MergeEvents(evs)
471}
472
473func (c *Client) pushIssues() error {
474 idb, err := issuedb.Open(c.repo.ArcheDir())
475 if err != nil {
476 return fmt.Errorf("open issuedb: %w", err)
477 }
478 defer idb.Close()
479
480 all, err := idb.Issues.AllEvents()
481 if err != nil {
482 return fmt.Errorf("list events: %w", err)
483 }
484 if len(all) == 0 {
485 return nil
486 }
487
488 secretIDs, err := c.repo.Store.ListSecretCommitIDs()
489 if err != nil {
490 return fmt.Errorf("list secret commits: %w", err)
491 }
492 secretHexSet := make(map[string]bool, len(secretIDs))
493 for _, id := range secretIDs {
494 secretHexSet[fmt.Sprintf("%x", id)] = true
495 }
496
497 filtered := all[:0]
498 for _, ev := range all {
499 if ev.Kind == "ref" {
500 var ref string
501 json.Unmarshal(ev.Payload, &ref) //nolint:errcheck
502 if c.isSecretCommitRef(ref) {
503 continue
504 }
505 }
506 if ev.Kind == "comment" && len(secretHexSet) > 0 {
507 ev.Payload = redactSecretHashesInPayload(ev.Payload, secretHexSet)
508 }
509 filtered = append(filtered, ev)
510 }
511
512 body, err := json.Marshal(filtered)
513 if err != nil {
514 return err
515 }
516 req, err := http.NewRequest(http.MethodPost, c.url+"/arche/v1/issues", bytes.NewReader(body))
517 if err != nil {
518 return err
519 }
520 req.Header.Set("Content-Type", "application/json")
521 c.setAuth(req)
522 resp, err := c.client.Do(req)
523 if err != nil {
524 return err
525 }
526 defer resp.Body.Close()
527 if resp.StatusCode != http.StatusOK {
528 return fmt.Errorf("server returned %d", resp.StatusCode)
529 }
530 return nil
531}
532
533func (c *Client) isSecretCommitRef(ref string) bool {
534 if len(ref) < 8 {
535 return false
536 }
537 b, err := hex.DecodeString(ref)
538 if err != nil || len(b) != 32 {
539 return false
540 }
541 var id [32]byte
542 copy(id[:], b)
543 phase, err := c.repo.Store.GetPhase(id)
544 if err != nil {
545 return false
546 }
547 return phase == object.PhaseSecret
548}
549
550var hexHashRe = regexp.MustCompile(`\b[0-9a-fA-F]{8,64}\b`)
551
552func redactSecretHashesInPayload(payload []byte, secretHexSet map[string]bool) []byte {
553 var text string
554 if err := json.Unmarshal(payload, &text); err != nil {
555 return payload
556 }
557 redacted := hexHashRe.ReplaceAllStringFunc(text, func(match string) string {
558 lower := strings.ToLower(match)
559 for h := range secretHexSet {
560 if strings.HasPrefix(h, lower) {
561 return "[redacted]"
562 }
563 }
564 return match
565 })
566 if redacted == text {
567 return payload
568 }
569 out, err := json.Marshal(redacted)
570 if err != nil {
571 return payload
572 }
573 return out
574}
575
576func (c *Client) checkMissingObjects(serverBookmarks map[string]string) ([][32]byte, error) {
577 seen := make(map[[32]byte]bool)
578 var missing [][32]byte
579
580 var walk func(id [32]byte)
581 walk = func(id [32]byte) {
582 if seen[id] {
583 return
584 }
585 seen[id] = true
586 kind, data, err := c.repo.Store.ReadObject(id)
587 if err != nil {
588 missing = append(missing, id)
589 return
590 }
591 if strings.HasPrefix(kind, "commit") {
592 commit, err := object.DecodeCommit(data)
593 if err != nil {
594 return
595 }
596 walk(commit.TreeID)
597 for _, p := range commit.Parents {
598 walk(p)
599 }
600 } else if kind == "tree" {
601 tree, err := object.DecodeTree(data)
602 if err != nil {
603 return
604 }
605 for _, e := range tree.Entries {
606 walk(e.ObjectID)
607 }
608 }
609 }
610
611 for _, hexID := range serverBookmarks {
612 b, err := hex.DecodeString(hexID)
613 if err != nil || len(b) != 32 {
614 continue
615 }
616 var id [32]byte
617 copy(id[:], b)
618 walk(id)
619 }
620 return missing, nil
621}
622
623func (c *Client) fetchExact(ids [][32]byte) ([]PackEntry, error) {
624 hexIDs := make([]string, len(ids))
625 for i, id := range ids {
626 hexIDs[i] = hex.EncodeToString(id[:])
627 }
628 body, err := json.Marshal(hexIDs)
629 if err != nil {
630 return nil, err
631 }
632 req, err := http.NewRequest(http.MethodPost, c.url+"/arche/v1/fetch", bytes.NewReader(body))
633 if err != nil {
634 return nil, err
635 }
636 req.Header.Set("Content-Type", "application/json")
637 c.setAuth(req)
638 resp, err := c.client.Do(req)
639 if err != nil {
640 return nil, err
641 }
642 defer resp.Body.Close()
643 if resp.StatusCode != http.StatusOK {
644 b, _ := io.ReadAll(resp.Body)
645 return nil, fmt.Errorf("fetch: HTTP %d: %s", resp.StatusCode, b)
646 }
647 return ReadPack(resp.Body)
648}
649
650func collectPromotable(r *repo.Repo, tipID [32]byte) ([][32]byte, error) {
651 seen := make(map[[32]byte]bool)
652 queue := [][32]byte{tipID}
653 var toPromote [][32]byte
654
655 for len(queue) > 0 {
656 id := queue[0]
657 queue = queue[1:]
658 if seen[id] {
659 continue
660 }
661 seen[id] = true
662
663 phase, _ := r.Store.GetPhase(id)
664 switch phase {
665 case object.PhasePublic, object.PhaseSecret:
666 continue
667 }
668 toPromote = append(toPromote, id)
669
670 _, raw, err := r.Store.ReadObject(id)
671 if err != nil || !bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
672 continue
673 }
674 cm, err := object.DecodeCommit(raw)
675 if err != nil {
676 continue
677 }
678 for _, p := range cm.Parents {
679 if !seen[p] {
680 queue = append(queue, p)
681 }
682 }
683 }
684 return toPromote, nil
685}