arche / internal/archesrv/webhooks.go

commit 154431fd
  1package archesrv
  2
import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"time"

	"github.com/google/uuid"
)
 16
 17type WebhookRecord struct {
 18	ID        int64
 19	RepoID    int64
 20	URL       string
 21	Secret    string
 22	Events    string
 23	Active    bool
 24	CreatedAt time.Time
 25}
 26
 27type WebhookDelivery struct {
 28	ID           int64
 29	WebhookID    int64
 30	Event        string
 31	Payload      string
 32	ResponseCode int
 33	ResponseBody string
 34	Error        string
 35	DeliveredAt  time.Time
 36}
 37
// PushPayload is the JSON document sent as the request body of a "push"
// webhook delivery. The JSON field names are part of the wire format seen by
// webhook receivers.
type PushPayload struct {
	// Event is the event name; FirePushWebhooks sets it to "push".
	Event     string      `json:"event"`
	// Repo is the repository name the push was made to.
	Repo      string      `json:"repo"`
	// Pusher identifies who pushed, as supplied by the caller.
	Pusher    string      `json:"pusher"`
	// PushID is generated once per FirePushWebhooks call (see newPushID), so
	// every webhook notified about the same push receives the same value.
	PushID    string      `json:"push_id"`
	// Bookmark is the name of the bookmark that moved.
	Bookmark  string      `json:"bookmark"`
	// OldCommit and NewCommit are the bookmark's commit before and after the
	// push, as supplied by the caller.
	OldCommit string      `json:"old_commit"`
	NewCommit string      `json:"new_commit"`
	// Commits lists the pushed commits. FirePushWebhooks replaces a nil slice
	// with an empty one so this encodes as [] rather than null.
	Commits   []CommitRef `json:"commits"`
}
 48
 49func newPushID() string {
 50	return uuid.Must(uuid.NewV7()).String()
 51}
 52
// CommitRef is the per-commit summary embedded in PushPayload.Commits. All
// values are supplied by the caller of FirePushWebhooks and serialized as-is.
type CommitRef struct {
	// ID is the commit identifier.
	ID       string `json:"id"`
	// ChangeID is the commit's change identifier.
	ChangeID string `json:"change_id"`
	// Message is the commit message.
	Message  string `json:"message"`
	// Author is the commit author.
	Author   string `json:"author"`
}
 59
 60func (d *DB) CreateWebhook(repoID int64, url, secret, events string) (*WebhookRecord, error) {
 61	res, err := d.db.Exec(
 62		"INSERT INTO webhooks(repo_id,url,secret,events,active,created_at) VALUES(?,?,?,?,1,?)",
 63		repoID, url, secret, events, time.Now().Unix(),
 64	)
 65	if err != nil {
 66		return nil, err
 67	}
 68	id, _ := res.LastInsertId()
 69	return &WebhookRecord{ID: id, RepoID: repoID, URL: url, Secret: secret, Events: events, Active: true}, nil
 70}
 71
 72func (d *DB) ListWebhooks(repoID int64) ([]WebhookRecord, error) {
 73	rows, err := d.db.Query(
 74		"SELECT id, repo_id, url, secret, events, active, created_at FROM webhooks WHERE repo_id=? ORDER BY id",
 75		repoID,
 76	)
 77	if err != nil {
 78		return nil, err
 79	}
 80	defer rows.Close()
 81	var out []WebhookRecord
 82	for rows.Next() {
 83		var w WebhookRecord
 84		var active int
 85		var ts int64
 86		if err := rows.Scan(&w.ID, &w.RepoID, &w.URL, &w.Secret, &w.Events, &active, &ts); err != nil {
 87			return nil, err
 88		}
 89		w.Active = active == 1
 90		w.CreatedAt = time.Unix(ts, 0)
 91		out = append(out, w)
 92	}
 93	return out, rows.Err()
 94}
 95
 96func (d *DB) GetWebhook(id int64) (*WebhookRecord, error) {
 97	var w WebhookRecord
 98	var active int
 99	var ts int64
100	err := d.db.QueryRow(
101		"SELECT id, repo_id, url, secret, events, active, created_at FROM webhooks WHERE id=?", id,
102	).Scan(&w.ID, &w.RepoID, &w.URL, &w.Secret, &w.Events, &active, &ts)
103	if err != nil {
104		return nil, err
105	}
106	w.Active = active == 1
107	w.CreatedAt = time.Unix(ts, 0)
108	return &w, nil
109}
110
111func (d *DB) DeleteWebhook(id int64) error {
112	_, err := d.db.Exec("DELETE FROM webhooks WHERE id=?", id)
113	return err
114}
115
116func (d *DB) ListDeliveries(webhookID int64) ([]WebhookDelivery, error) {
117	rows, err := d.db.Query(
118		`SELECT id, webhook_id, event, payload, COALESCE(response_code,0),
119		 COALESCE(response_body,''), COALESCE(error,''), delivered_at
120		 FROM webhook_deliveries WHERE webhook_id=? ORDER BY id DESC LIMIT 50`,
121		webhookID,
122	)
123	if err != nil {
124		return nil, err
125	}
126	defer rows.Close()
127	var out []WebhookDelivery
128	for rows.Next() {
129		var d WebhookDelivery
130		var ts int64
131		if err := rows.Scan(&d.ID, &d.WebhookID, &d.Event, &d.Payload,
132			&d.ResponseCode, &d.ResponseBody, &d.Error, &ts); err != nil {
133			return nil, err
134		}
135		d.DeliveredAt = time.Unix(ts, 0)
136		out = append(out, d)
137	}
138	return out, rows.Err()
139}
140
141func (d *DB) recordDelivery(webhookID int64, event, payload string, code int, body, errStr string) {
142	d.db.Exec( //nolint:errcheck
143		`INSERT INTO webhook_deliveries(webhook_id,event,payload,response_code,response_body,error,delivered_at)
144		 VALUES(?,?,?,?,?,?,?)`,
145		webhookID, event, payload, code, body, errStr, time.Now().Unix(),
146	)
147}
148
149func (d *DB) FirePushWebhooks(repoName, pusher, bookmark, oldCommit, newCommit string, commits []CommitRef) {
150	hooks, err := d.db.Query(
151		"SELECT id, url, secret FROM webhooks WHERE repo_id=(SELECT id FROM repos WHERE name=?) AND active=1",
152		repoName,
153	)
154	if err != nil {
155		return
156	}
157	type hook struct {
158		id     int64
159		url    string
160		secret string
161	}
162	var hs []hook
163	for hooks.Next() {
164		var h hook
165		hooks.Scan(&h.id, &h.url, &h.secret) //nolint:errcheck
166		hs = append(hs, h)
167	}
168	hooks.Close()
169
170	if len(hs) == 0 {
171		return
172	}
173
174	if commits == nil {
175		commits = []CommitRef{}
176	}
177
178	payload := PushPayload{
179		Event:     "push",
180		Repo:      repoName,
181		Pusher:    pusher,
182		PushID:    newPushID(),
183		Bookmark:  bookmark,
184		OldCommit: oldCommit,
185		NewCommit: newCommit,
186		Commits:   commits,
187	}
188	payloadBytes, _ := json.Marshal(payload)
189
190	for _, h := range hs {
191		h := h
192		go d.deliverWebhook(h.id, h.url, h.secret, "push", payloadBytes)
193	}
194}
195
196func (d *DB) deliverWebhook(webhookID int64, url, secret, event string, payload []byte) {
197	sig := computeHMAC(secret, payload)
198
199	var lastCode int
200	var lastBody string
201	var lastErr string
202
203	for attempt := 0; attempt < 5; attempt++ {
204		if attempt > 0 {
205			time.Sleep(time.Duration(1<<uint(attempt-1)) * time.Second)
206		}
207
208		req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
209		if err != nil {
210			lastErr = err.Error()
211			continue
212		}
213		req.Header.Set("Content-Type", "application/json")
214		req.Header.Set("X-Arche-Event", event)
215		req.Header.Set("X-Arche-Signature", "sha256="+sig)
216
217		client := &http.Client{Timeout: 10 * time.Second}
218		resp, err := client.Do(req)
219		if err != nil {
220			lastErr = err.Error()
221			continue
222		}
223
224		buf := new(bytes.Buffer)
225		buf.ReadFrom(resp.Body) //nolint:errcheck
226		resp.Body.Close()
227
228		lastCode = resp.StatusCode
229		lastBody = buf.String()
230		lastErr = ""
231
232		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
233			d.recordDelivery(webhookID, event, string(payload), lastCode, lastBody, "")
234			return
235		}
236	}
237	d.recordDelivery(webhookID, event, string(payload), lastCode, lastBody, fmt.Sprintf("failed after 5 attempts: %s", lastErr))
238	slog.Warn("webhook delivery failed", "webhook_id", webhookID, "url", url, "event", event, "last_err", lastErr, "last_code", lastCode)
239}
240
// computeHMAC returns the lowercase hex encoding of the HMAC-SHA256 of payload
// keyed with secret.
func computeHMAC(secret string, payload []byte) string {
	signer := hmac.New(sha256.New, []byte(secret))
	signer.Write(payload)

	digest := make([]byte, 0, sha256.Size)
	digest = signer.Sum(digest)
	return hex.EncodeToString(digest)
}
246
247func (d *DB) ReplayDelivery(deliveryID int64) error {
248	var webhookID int64
249	var event, payload string
250	err := d.db.QueryRow(
251		`SELECT webhook_id, event, payload FROM webhook_deliveries WHERE id=?`,
252		deliveryID,
253	).Scan(&webhookID, &event, &payload)
254	if err != nil {
255		return fmt.Errorf("delivery not found: %w", err)
256	}
257	hook, err := d.GetWebhook(webhookID)
258	if err != nil {
259		return fmt.Errorf("webhook not found: %w", err)
260	}
261	go d.deliverWebhook(hook.ID, hook.URL, hook.Secret, event, []byte(payload))
262	return nil
263}