arche / internal/syncpkg/client.go

commit 154431fd
  1package syncpkg
  2
import (
	"bytes"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"time"

	"arche/internal/issuedb"
	"arche/internal/issues"
	"arche/internal/object"
	"arche/internal/repo"
	"arche/internal/store"
)
 21
 22type Client struct {
 23	url    string
 24	token  string
 25	repo   *repo.Repo
 26	client *http.Client
 27}
 28
 29func NewClient(r *repo.Repo, rawURL, token string) *Client {
 30	httpURL, httpClient := resolveClientTransport(rawURL, token)
 31	return &Client{
 32		url:    strings.TrimRight(httpURL, "/"),
 33		token:  token,
 34		repo:   r,
 35		client: httpClient,
 36	}
 37}
 38
 39func resolveClientTransport(rawURL, token string) (string, *http.Client) {
 40	u, err := url.Parse(rawURL)
 41	if err != nil {
 42		return rawURL, &http.Client{}
 43	}
 44
 45	switch u.Scheme {
 46	case "arche+ssh":
 47		host := u.Hostname()
 48		port := u.Port()
 49		if port == "" {
 50			port = "22"
 51		}
 52		sshHost := host + ":" + port
 53		repoName := strings.TrimPrefix(u.Path, "/")
 54		sshClient, err := NewSSHClient(sshHost, repoName, "")
 55		if err != nil {
 56			return rawURL, &http.Client{}
 57		}
 58		_ = token
 59		return "http://localhost", sshClient
 60
 61	case "arche+mtls":
 62		host := u.Hostname()
 63		port := u.Port()
 64		if port == "" {
 65			port = "8443"
 66		}
 67		certFile := u.Query().Get("cert")
 68		keyFile := u.Query().Get("key")
 69		mtlsClient, err := NewMTLSClient(certFile, keyFile)
 70		if err != nil {
 71			return rawURL, &http.Client{}
 72		}
 73		_ = token
 74		baseURL := "https://" + host + ":" + port
 75		return baseURL, mtlsClient
 76
 77	default:
 78		return rawURL, &http.Client{}
 79	}
 80}
 81
 82func (c *Client) Pull() error {
 83	info, err := c.getInfo()
 84	if err != nil {
 85		return fmt.Errorf("pull: info: %w", err)
 86	}
 87
 88	allLocalIDs, err := listAllObjectIDs(c.repo)
 89	if err != nil {
 90		return fmt.Errorf("pull: enumerate local: %w", err)
 91	}
 92	filter := NewBloom(max(len(allLocalIDs), 1))
 93	for _, id := range allLocalIDs {
 94		filter.Add(id)
 95	}
 96
 97	pack, err := c.fetchBloom(filter.Bytes())
 98	if err != nil {
 99		return fmt.Errorf("pull: bloom/fetch: %w", err)
100	}
101
102	if err := storePackEntries(c.repo, pack); err != nil {
103		return fmt.Errorf("pull: store pack: %w", err)
104	}
105
106	if err := ensureChangeInStore(c.repo, pack); err != nil {
107		return fmt.Errorf("pull: register change IDs: %w", err)
108	}
109
110	for round := 0; round < 3; round++ {
111		missing, err := c.checkMissingObjects(info.Bookmarks)
112		if err != nil || len(missing) == 0 {
113			break
114		}
115		pack2, err := c.fetchExact(missing)
116		if err != nil {
117			return fmt.Errorf("pull: convergence round %d: %w", round+1, err)
118		}
119		if err := storePackEntries(c.repo, pack2); err != nil {
120			return fmt.Errorf("pull: store convergence pack: %w", err)
121		}
122		if err := ensureChangeInStore(c.repo, pack2); err != nil {
123			return fmt.Errorf("pull: register change IDs (round %d): %w", round+1, err)
124		}
125	}
126
127	for name, hexID := range info.Bookmarks {
128		idBytes, err := hex.DecodeString(hexID)
129		if err != nil || len(idBytes) != 32 {
130			continue
131		}
132		var id [32]byte
133		copy(id[:], idBytes)
134
135		tx, err := c.repo.Store.Begin()
136		if err != nil {
137			return fmt.Errorf("pull: begin tx: %w", err)
138		}
139		if err := c.repo.Store.SetBookmark(tx, store.Bookmark{Name: name, CommitID: id, Remote: c.url}); err != nil {
140			c.repo.Store.Rollback(tx)
141			return fmt.Errorf("pull: set bookmark %q: %w", name, err)
142		}
143		if err := c.repo.Store.Commit(tx); err != nil {
144			return fmt.Errorf("pull: commit bookmark: %w", err)
145		}
146	}
147
148	if err := c.pullIssues(); err != nil {
149		return fmt.Errorf("pull: issues: %w", err)
150	}
151
152	return nil
153}
154
// PushOptions carries the flags a push forwards to the server with every
// bookmark update. Both fields are copied verbatim into the update request;
// how they are interpreted is up to the server.
type PushOptions struct {
	// Force is sent as the request's Force flag.
	Force bool
	// ForcePublic is sent as the request's ForcePublic flag.
	ForcePublic bool
}
159
160func (c *Client) Push() error { return c.PushWith(PushOptions{}) }
161
162func (c *Client) PushWith(opts PushOptions) error {
163	info, err := c.getInfo()
164	if err != nil {
165		return fmt.Errorf("push: info: %w", err)
166	}
167
168	allLocalIDs, err := listPushableObjectIDs(c.repo)
169	if err != nil {
170		return fmt.Errorf("push: enumerate local: %w", err)
171	}
172
173	var toSend [][32]byte
174	if info.CommitCount == 0 {
175		toSend = allLocalIDs
176	} else {
177		toSend, err = c.objectsToSend(info)
178		if err != nil {
179			return fmt.Errorf("push: diff objects: %w", err)
180		}
181	}
182
183	if len(toSend) > 0 {
184		if err := c.pushObjects(toSend); err != nil {
185			return fmt.Errorf("push: upload pack: %w", err)
186		}
187	}
188
189	before, _ := c.repo.CaptureRefState()
190
191	localBMs, err := c.repo.Store.ListBookmarks()
192	if err != nil {
193		return fmt.Errorf("push: list bookmarks: %w", err)
194	}
195
196	var pushedNames []string
197	var allToPromote [][32]byte
198	for _, bm := range localBMs {
199		if phase, _ := c.repo.Store.GetPhase(bm.CommitID); phase == object.PhaseSecret {
200			continue
201		}
202
203		remoteBM := info.Bookmarks[bm.Name]
204		newHex := hex.EncodeToString(bm.CommitID[:])
205		resp, err := c.updateRemoteBookmark(bm.Name, remoteBM, newHex, opts)
206		if err != nil {
207			return fmt.Errorf("push: update bookmark %q: %w", bm.Name, err)
208		}
209		if !resp.OK {
210			return fmt.Errorf("push rejected for bookmark %q: %s", bm.Name, resp.Err)
211		}
212
213		promotable, err := collectPromotable(c.repo, bm.CommitID)
214		if err != nil {
215			return fmt.Errorf("push: collect promotable for %q: %w", bm.Name, err)
216		}
217		allToPromote = append(allToPromote, promotable...)
218		pushedNames = append(pushedNames, bm.Name)
219	}
220
221	if len(allToPromote) > 0 || len(pushedNames) > 0 {
222		seen := make(map[[32]byte]bool)
223		deduped := allToPromote[:0]
224		for _, id := range allToPromote {
225			if !seen[id] {
226				seen[id] = true
227				deduped = append(deduped, id)
228			}
229		}
230		meta := fmt.Sprintf("pushed [%s] to %s", strings.Join(pushedNames, ", "), c.url)
231		tx, err := c.repo.Store.Begin()
232		if err != nil {
233			return fmt.Errorf("push: begin tx: %w", err)
234		}
235		for _, id := range deduped {
236			if err := c.repo.Store.SetPhase(tx, id, object.PhasePublic); err != nil {
237				c.repo.Store.Rollback(tx)
238				return fmt.Errorf("push: promote phase: %w", err)
239			}
240		}
241		op := store.Operation{
242			Kind: "push", Timestamp: time.Now().Unix(),
243			Before: before, After: before,
244			Metadata: meta,
245		}
246		if _, err := c.repo.Store.InsertOperation(tx, op); err != nil {
247			c.repo.Store.Rollback(tx)
248			return fmt.Errorf("push: insert oplog: %w", err)
249		}
250		if err := c.repo.Store.Commit(tx); err != nil {
251			return fmt.Errorf("push: commit tx: %w", err)
252		}
253	}
254
255	if err := c.pushIssues(); err != nil {
256		return fmt.Errorf("push: issues: %w", err)
257	}
258	return nil
259}
260
261func (c *Client) getInfo() (*InfoResponse, error) {
262	req, _ := http.NewRequest(http.MethodGet, c.url+"/arche/v1/info", nil)
263	c.setAuth(req)
264	resp, err := c.client.Do(req)
265	if err != nil {
266		return nil, err
267	}
268	defer resp.Body.Close()
269	if resp.StatusCode != http.StatusOK {
270		return nil, fmt.Errorf("info: HTTP %d", resp.StatusCode)
271	}
272	var info InfoResponse
273	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
274		return nil, fmt.Errorf("info decode: %w", err)
275	}
276	return &info, nil
277}
278
279func (c *Client) fetchBloom(filterBytes []byte) ([]PackEntry, error) {
280	req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/bloom",
281		bytes.NewReader(filterBytes))
282	req.Header.Set("Content-Type", "application/octet-stream")
283	c.setAuth(req)
284	resp, err := c.client.Do(req)
285	if err != nil {
286		return nil, err
287	}
288	defer resp.Body.Close()
289	if resp.StatusCode != http.StatusOK {
290		body, _ := io.ReadAll(resp.Body)
291		return nil, fmt.Errorf("bloom: HTTP %d: %s", resp.StatusCode, body)
292	}
293	return ReadPack(resp.Body)
294}
295
296func (c *Client) pushObjects(ids [][32]byte) error {
297	pr, pw := io.Pipe()
298	errCh := make(chan error, 1)
299	go func() {
300		entries := make([]PackEntry, 0, len(ids))
301		for _, id := range ids {
302			kind, data, err := c.repo.Store.ReadObject(id)
303			if err != nil {
304				continue
305			}
306			entries = append(entries, PackEntry{ID: id, Kind: kind, Data: data})
307		}
308		errCh <- WritePack(pw, entries)
309		pw.Close()
310	}()
311
312	req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/push", pr)
313	req.Header.Set("Content-Type", "application/octet-stream")
314	c.setAuth(req)
315	resp, err := c.client.Do(req)
316	if packErr := <-errCh; packErr != nil && err == nil {
317		err = packErr
318	}
319	if err != nil {
320		return err
321	}
322	defer resp.Body.Close()
323	if resp.StatusCode != http.StatusOK {
324		body, _ := io.ReadAll(resp.Body)
325		return fmt.Errorf("push objects: HTTP %d: %s", resp.StatusCode, body)
326	}
327	return nil
328}
329
330func (c *Client) updateRemoteBookmark(name, expectedHex, newHex string, opts PushOptions) (*RefUpdateResponse, error) {
331	body, _ := json.Marshal(RefUpdateRequest{
332		Name:        name,
333		ExpectedID:  expectedHex,
334		NewID:       newHex,
335		Force:       opts.Force,
336		ForcePublic: opts.ForcePublic,
337	})
338
339	req, _ := http.NewRequest(http.MethodPost, c.url+"/arche/v1/update-bookmark",
340		bytes.NewReader(body))
341	req.Header.Set("Content-Type", "application/json")
342	c.setAuth(req)
343	resp, err := c.client.Do(req)
344	if err != nil {
345		return nil, err
346	}
347
348	defer resp.Body.Close()
349	var result RefUpdateResponse
350	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
351		return nil, fmt.Errorf("decode update-bookmark response: %w", err)
352	}
353
354	return &result, nil
355}
356
357func (c *Client) setAuth(req *http.Request) {
358	if c.token != "" {
359		req.Header.Set("Authorization", "Bearer "+c.token)
360	}
361}
362
363func (c *Client) objectsToSend(info *InfoResponse) ([][32]byte, error) {
364	allLocal, err := listPushableObjectIDs(c.repo)
365	if err != nil {
366		return nil, err
367	}
368
369	remoteKnown := make(map[[32]byte]bool)
370	for _, hexID := range info.Bookmarks {
371		b, err := hex.DecodeString(hexID)
372		if err != nil || len(b) != 32 {
373			continue
374		}
375		var id [32]byte
376		copy(id[:], b)
377		c.walkAncestors(id, remoteKnown)
378	}
379
380	var toSend [][32]byte
381	for _, id := range allLocal {
382		if !remoteKnown[id] {
383			toSend = append(toSend, id)
384		}
385	}
386	return toSend, nil
387}
388
389func (c *Client) walkAncestors(id [32]byte, known map[[32]byte]bool) {
390	if known[id] {
391		return
392	}
393	known[id] = true
394
395	_, raw, err := c.repo.Store.ReadObject(id)
396	if err != nil {
397		return
398	}
399	if bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
400		commit, err := object.DecodeCommit(raw)
401		if err != nil {
402			return
403		}
404		c.walkAncestors(commit.TreeID, known)
405		for _, p := range commit.Parents {
406			c.walkAncestors(p, known)
407		}
408	} else if bytes.HasPrefix(raw, []byte("arche-tree\x00")) {
409		t, err := object.DecodeTree(raw)
410		if err != nil {
411			return
412		}
413		for _, e := range t.Entries {
414			c.walkAncestors(e.ObjectID, known)
415		}
416	}
417}
418
419func ensureChangeInStore(r *repo.Repo, entries []PackEntry) error {
420	if len(entries) == 0 {
421		return nil
422	}
423	tx, err := r.Store.Begin()
424	if err != nil {
425		return err
426	}
427
428	for _, e := range entries {
429		if !strings.HasPrefix(e.Kind, "commit") {
430			continue
431		}
432		c, err := object.DecodeCommit(e.Data)
433		if err != nil {
434			continue
435		}
436		if c.ChangeID == "" {
437			continue
438		}
439		r.Store.SetChangeCommit(tx, c.ChangeID, e.ID)
440	}
441	return r.Store.Commit(tx)
442}
443
444func (c *Client) pullIssues() error {
445	req, err := http.NewRequest(http.MethodGet, c.url+"/arche/v1/issues", nil)
446	if err != nil {
447		return err
448	}
449	c.setAuth(req)
450	resp, err := c.client.Do(req)
451	if err != nil {
452		return err
453	}
454	defer resp.Body.Close()
455	if resp.StatusCode != http.StatusOK {
456		return fmt.Errorf("server returned %d", resp.StatusCode)
457	}
458	var evs []issues.IssueEvent
459	if err := json.NewDecoder(resp.Body).Decode(&evs); err != nil {
460		return fmt.Errorf("decode: %w", err)
461	}
462	if len(evs) == 0 {
463		return nil
464	}
465	idb, err := issuedb.Open(c.repo.ArcheDir())
466	if err != nil {
467		return fmt.Errorf("open issuedb: %w", err)
468	}
469	defer idb.Close()
470	return idb.Issues.MergeEvents(evs)
471}
472
473func (c *Client) pushIssues() error {
474	idb, err := issuedb.Open(c.repo.ArcheDir())
475	if err != nil {
476		return fmt.Errorf("open issuedb: %w", err)
477	}
478	defer idb.Close()
479
480	all, err := idb.Issues.AllEvents()
481	if err != nil {
482		return fmt.Errorf("list events: %w", err)
483	}
484	if len(all) == 0 {
485		return nil
486	}
487
488	secretIDs, err := c.repo.Store.ListSecretCommitIDs()
489	if err != nil {
490		return fmt.Errorf("list secret commits: %w", err)
491	}
492	secretHexSet := make(map[string]bool, len(secretIDs))
493	for _, id := range secretIDs {
494		secretHexSet[fmt.Sprintf("%x", id)] = true
495	}
496
497	filtered := all[:0]
498	for _, ev := range all {
499		if ev.Kind == "ref" {
500			var ref string
501			json.Unmarshal(ev.Payload, &ref) //nolint:errcheck
502			if c.isSecretCommitRef(ref) {
503				continue
504			}
505		}
506		if ev.Kind == "comment" && len(secretHexSet) > 0 {
507			ev.Payload = redactSecretHashesInPayload(ev.Payload, secretHexSet)
508		}
509		filtered = append(filtered, ev)
510	}
511
512	body, err := json.Marshal(filtered)
513	if err != nil {
514		return err
515	}
516	req, err := http.NewRequest(http.MethodPost, c.url+"/arche/v1/issues", bytes.NewReader(body))
517	if err != nil {
518		return err
519	}
520	req.Header.Set("Content-Type", "application/json")
521	c.setAuth(req)
522	resp, err := c.client.Do(req)
523	if err != nil {
524		return err
525	}
526	defer resp.Body.Close()
527	if resp.StatusCode != http.StatusOK {
528		return fmt.Errorf("server returned %d", resp.StatusCode)
529	}
530	return nil
531}
532
533func (c *Client) isSecretCommitRef(ref string) bool {
534	if len(ref) < 8 {
535		return false
536	}
537	b, err := hex.DecodeString(ref)
538	if err != nil || len(b) != 32 {
539		return false
540	}
541	var id [32]byte
542	copy(id[:], b)
543	phase, err := c.repo.Store.GetPhase(id)
544	if err != nil {
545		return false
546	}
547	return phase == object.PhaseSecret
548}
549
// hexHashRe matches a standalone run of 8 to 64 hexadecimal digits.
var hexHashRe = regexp.MustCompile(`\b[0-9a-fA-F]{8,64}\b`)

// redactSecretHashesInPayload replaces, inside a JSON-string payload, every
// hex run that is a case-insensitive prefix of a key in secretHexSet with
// "[redacted]".
//
// The original payload is returned unchanged when it is not a JSON string,
// when nothing was replaced, or when re-encoding fails. Otherwise the result
// is the re-encoded JSON string.
func redactSecretHashesInPayload(payload []byte, secretHexSet map[string]bool) []byte {
	var text string
	if json.Unmarshal(payload, &text) != nil {
		return payload
	}

	// isSecretPrefix reports whether candidate begins any key of the set.
	isSecretPrefix := func(candidate string) bool {
		for full := range secretHexSet {
			if strings.HasPrefix(full, candidate) {
				return true
			}
		}
		return false
	}

	scrubbed := hexHashRe.ReplaceAllStringFunc(text, func(token string) string {
		if isSecretPrefix(strings.ToLower(token)) {
			return "[redacted]"
		}
		return token
	})
	if scrubbed == text {
		return payload
	}

	encoded, err := json.Marshal(scrubbed)
	if err != nil {
		return payload
	}
	return encoded
}
575
576func (c *Client) checkMissingObjects(serverBookmarks map[string]string) ([][32]byte, error) {
577	seen := make(map[[32]byte]bool)
578	var missing [][32]byte
579
580	var walk func(id [32]byte)
581	walk = func(id [32]byte) {
582		if seen[id] {
583			return
584		}
585		seen[id] = true
586		kind, data, err := c.repo.Store.ReadObject(id)
587		if err != nil {
588			missing = append(missing, id)
589			return
590		}
591		if strings.HasPrefix(kind, "commit") {
592			commit, err := object.DecodeCommit(data)
593			if err != nil {
594				return
595			}
596			walk(commit.TreeID)
597			for _, p := range commit.Parents {
598				walk(p)
599			}
600		} else if kind == "tree" {
601			tree, err := object.DecodeTree(data)
602			if err != nil {
603				return
604			}
605			for _, e := range tree.Entries {
606				walk(e.ObjectID)
607			}
608		}
609	}
610
611	for _, hexID := range serverBookmarks {
612		b, err := hex.DecodeString(hexID)
613		if err != nil || len(b) != 32 {
614			continue
615		}
616		var id [32]byte
617		copy(id[:], b)
618		walk(id)
619	}
620	return missing, nil
621}
622
623func (c *Client) fetchExact(ids [][32]byte) ([]PackEntry, error) {
624	hexIDs := make([]string, len(ids))
625	for i, id := range ids {
626		hexIDs[i] = hex.EncodeToString(id[:])
627	}
628	body, err := json.Marshal(hexIDs)
629	if err != nil {
630		return nil, err
631	}
632	req, err := http.NewRequest(http.MethodPost, c.url+"/arche/v1/fetch", bytes.NewReader(body))
633	if err != nil {
634		return nil, err
635	}
636	req.Header.Set("Content-Type", "application/json")
637	c.setAuth(req)
638	resp, err := c.client.Do(req)
639	if err != nil {
640		return nil, err
641	}
642	defer resp.Body.Close()
643	if resp.StatusCode != http.StatusOK {
644		b, _ := io.ReadAll(resp.Body)
645		return nil, fmt.Errorf("fetch: HTTP %d: %s", resp.StatusCode, b)
646	}
647	return ReadPack(resp.Body)
648}
649
650func collectPromotable(r *repo.Repo, tipID [32]byte) ([][32]byte, error) {
651	seen := make(map[[32]byte]bool)
652	queue := [][32]byte{tipID}
653	var toPromote [][32]byte
654
655	for len(queue) > 0 {
656		id := queue[0]
657		queue = queue[1:]
658		if seen[id] {
659			continue
660		}
661		seen[id] = true
662
663		phase, _ := r.Store.GetPhase(id)
664		switch phase {
665		case object.PhasePublic, object.PhaseSecret:
666			continue
667		}
668		toPromote = append(toPromote, id)
669
670		_, raw, err := r.Store.ReadObject(id)
671		if err != nil || !bytes.HasPrefix(raw, []byte("arche-commit\x00")) {
672			continue
673		}
674		cm, err := object.DecodeCommit(raw)
675		if err != nil {
676			continue
677		}
678		for _, p := range cm.Parents {
679			if !seen[p] {
680				queue = append(queue, p)
681			}
682		}
683	}
684	return toPromote, nil
685}