1package archesrv
2
3import (
4 "bytes"
5 "crypto/hmac"
6 "crypto/sha256"
7 "encoding/hex"
8 "encoding/json"
9 "fmt"
10 "log/slog"
11 "net/http"
12 "time"
13
14 "github.com/google/uuid"
15)
16
// WebhookRecord is a webhook registration as stored in the webhooks table.
type WebhookRecord struct {
	ID        int64     // row identifier (webhooks.id)
	RepoID    int64     // repository the hook is attached to (webhooks.repo_id)
	URL       string    // endpoint that deliveries are POSTed to
	Secret    string    // key for the HMAC-SHA256 signature sent with each delivery
	Events    string    // stored and returned verbatim; its format is not interpreted in this file — TODO confirm with callers
	Active    bool      // true when the active column is 1; FirePushWebhooks only selects active hooks
	CreatedAt time.Time // creation time; persisted as Unix seconds
}
26
// WebhookDelivery is one row of the webhook_deliveries table. A row is
// written once per delivery run: after the first successful attempt, or after
// the final failed attempt.
type WebhookDelivery struct {
	ID           int64     // row identifier (webhook_deliveries.id)
	WebhookID    int64     // webhook the delivery was sent for
	Event        string    // event name sent in the X-Arche-Event header, e.g. "push"
	Payload      string    // request body that was sent
	ResponseCode int       // HTTP status of the last response received; 0 when the column is NULL
	ResponseBody string    // body of the last response received; "" when the column is NULL
	Error        string    // failure description; "" when the column is NULL or the delivery succeeded
	DeliveredAt  time.Time // time the row was written; persisted as Unix seconds
}
37
// PushPayload is the JSON document sent to webhook endpoints for a push.
// It is built by FirePushWebhooks.
type PushPayload struct {
	Event     string      `json:"event"`      // always "push" when built by FirePushWebhooks
	Repo      string      `json:"repo"`       // repository name
	Pusher    string      `json:"pusher"`     // identity of whoever pushed, as supplied by the caller
	PushID    string      `json:"push_id"`    // unique identifier for this push, generated by newPushID
	Bookmark  string      `json:"bookmark"`   // bookmark that was updated, as supplied by the caller
	OldCommit string      `json:"old_commit"` // commit before the push, as supplied by the caller
	NewCommit string      `json:"new_commit"` // commit after the push, as supplied by the caller
	Commits   []CommitRef `json:"commits"`    // FirePushWebhooks replaces nil with an empty slice, so this encodes as [] rather than null
}
48
49func newPushID() string {
50 return uuid.Must(uuid.NewV7()).String()
51}
52
// CommitRef describes a single commit inside PushPayload.Commits. All fields
// are passed through from the caller of FirePushWebhooks without
// interpretation.
type CommitRef struct {
	ID       string `json:"id"`        // commit identifier
	ChangeID string `json:"change_id"` // change identifier — NOTE(review): exact semantics are not visible in this file
	Message  string `json:"message"`   // commit message
	Author   string `json:"author"`    // commit author
}
59
60func (d *DB) CreateWebhook(repoID int64, url, secret, events string) (*WebhookRecord, error) {
61 res, err := d.db.Exec(
62 "INSERT INTO webhooks(repo_id,url,secret,events,active,created_at) VALUES(?,?,?,?,1,?)",
63 repoID, url, secret, events, time.Now().Unix(),
64 )
65 if err != nil {
66 return nil, err
67 }
68 id, _ := res.LastInsertId()
69 return &WebhookRecord{ID: id, RepoID: repoID, URL: url, Secret: secret, Events: events, Active: true}, nil
70}
71
72func (d *DB) ListWebhooks(repoID int64) ([]WebhookRecord, error) {
73 rows, err := d.db.Query(
74 "SELECT id, repo_id, url, secret, events, active, created_at FROM webhooks WHERE repo_id=? ORDER BY id",
75 repoID,
76 )
77 if err != nil {
78 return nil, err
79 }
80 defer rows.Close()
81 var out []WebhookRecord
82 for rows.Next() {
83 var w WebhookRecord
84 var active int
85 var ts int64
86 if err := rows.Scan(&w.ID, &w.RepoID, &w.URL, &w.Secret, &w.Events, &active, &ts); err != nil {
87 return nil, err
88 }
89 w.Active = active == 1
90 w.CreatedAt = time.Unix(ts, 0)
91 out = append(out, w)
92 }
93 return out, rows.Err()
94}
95
96func (d *DB) GetWebhook(id int64) (*WebhookRecord, error) {
97 var w WebhookRecord
98 var active int
99 var ts int64
100 err := d.db.QueryRow(
101 "SELECT id, repo_id, url, secret, events, active, created_at FROM webhooks WHERE id=?", id,
102 ).Scan(&w.ID, &w.RepoID, &w.URL, &w.Secret, &w.Events, &active, &ts)
103 if err != nil {
104 return nil, err
105 }
106 w.Active = active == 1
107 w.CreatedAt = time.Unix(ts, 0)
108 return &w, nil
109}
110
111func (d *DB) DeleteWebhook(id int64) error {
112 _, err := d.db.Exec("DELETE FROM webhooks WHERE id=?", id)
113 return err
114}
115
116func (d *DB) ListDeliveries(webhookID int64) ([]WebhookDelivery, error) {
117 rows, err := d.db.Query(
118 `SELECT id, webhook_id, event, payload, COALESCE(response_code,0),
119 COALESCE(response_body,''), COALESCE(error,''), delivered_at
120 FROM webhook_deliveries WHERE webhook_id=? ORDER BY id DESC LIMIT 50`,
121 webhookID,
122 )
123 if err != nil {
124 return nil, err
125 }
126 defer rows.Close()
127 var out []WebhookDelivery
128 for rows.Next() {
129 var d WebhookDelivery
130 var ts int64
131 if err := rows.Scan(&d.ID, &d.WebhookID, &d.Event, &d.Payload,
132 &d.ResponseCode, &d.ResponseBody, &d.Error, &ts); err != nil {
133 return nil, err
134 }
135 d.DeliveredAt = time.Unix(ts, 0)
136 out = append(out, d)
137 }
138 return out, rows.Err()
139}
140
141func (d *DB) recordDelivery(webhookID int64, event, payload string, code int, body, errStr string) {
142 d.db.Exec( //nolint:errcheck
143 `INSERT INTO webhook_deliveries(webhook_id,event,payload,response_code,response_body,error,delivered_at)
144 VALUES(?,?,?,?,?,?,?)`,
145 webhookID, event, payload, code, body, errStr, time.Now().Unix(),
146 )
147}
148
149func (d *DB) FirePushWebhooks(repoName, pusher, bookmark, oldCommit, newCommit string, commits []CommitRef) {
150 hooks, err := d.db.Query(
151 "SELECT id, url, secret FROM webhooks WHERE repo_id=(SELECT id FROM repos WHERE name=?) AND active=1",
152 repoName,
153 )
154 if err != nil {
155 return
156 }
157 type hook struct {
158 id int64
159 url string
160 secret string
161 }
162 var hs []hook
163 for hooks.Next() {
164 var h hook
165 hooks.Scan(&h.id, &h.url, &h.secret) //nolint:errcheck
166 hs = append(hs, h)
167 }
168 hooks.Close()
169
170 if len(hs) == 0 {
171 return
172 }
173
174 if commits == nil {
175 commits = []CommitRef{}
176 }
177
178 payload := PushPayload{
179 Event: "push",
180 Repo: repoName,
181 Pusher: pusher,
182 PushID: newPushID(),
183 Bookmark: bookmark,
184 OldCommit: oldCommit,
185 NewCommit: newCommit,
186 Commits: commits,
187 }
188 payloadBytes, _ := json.Marshal(payload)
189
190 for _, h := range hs {
191 h := h
192 go d.deliverWebhook(h.id, h.url, h.secret, "push", payloadBytes)
193 }
194}
195
196func (d *DB) deliverWebhook(webhookID int64, url, secret, event string, payload []byte) {
197 sig := computeHMAC(secret, payload)
198
199 var lastCode int
200 var lastBody string
201 var lastErr string
202
203 for attempt := 0; attempt < 5; attempt++ {
204 if attempt > 0 {
205 time.Sleep(time.Duration(1<<uint(attempt-1)) * time.Second)
206 }
207
208 req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
209 if err != nil {
210 lastErr = err.Error()
211 continue
212 }
213 req.Header.Set("Content-Type", "application/json")
214 req.Header.Set("X-Arche-Event", event)
215 req.Header.Set("X-Arche-Signature", "sha256="+sig)
216
217 client := &http.Client{Timeout: 10 * time.Second}
218 resp, err := client.Do(req)
219 if err != nil {
220 lastErr = err.Error()
221 continue
222 }
223
224 buf := new(bytes.Buffer)
225 buf.ReadFrom(resp.Body) //nolint:errcheck
226 resp.Body.Close()
227
228 lastCode = resp.StatusCode
229 lastBody = buf.String()
230 lastErr = ""
231
232 if resp.StatusCode >= 200 && resp.StatusCode < 300 {
233 d.recordDelivery(webhookID, event, string(payload), lastCode, lastBody, "")
234 return
235 }
236 }
237 d.recordDelivery(webhookID, event, string(payload), lastCode, lastBody, fmt.Sprintf("failed after 5 attempts: %s", lastErr))
238 slog.Warn("webhook delivery failed", "webhook_id", webhookID, "url", url, "event", event, "last_err", lastErr, "last_code", lastCode)
239}
240
// computeHMAC returns the lowercase hex encoding of the HMAC-SHA256 of
// payload, keyed with secret. An empty secret and an empty payload are both
// accepted.
func computeHMAC(secret string, payload []byte) string {
	hasher := hmac.New(sha256.New, []byte(secret))
	hasher.Write(payload)
	digest := hasher.Sum(make([]byte, 0, sha256.Size))
	return hex.EncodeToString(digest)
}
246
247func (d *DB) ReplayDelivery(deliveryID int64) error {
248 var webhookID int64
249 var event, payload string
250 err := d.db.QueryRow(
251 `SELECT webhook_id, event, payload FROM webhook_deliveries WHERE id=?`,
252 deliveryID,
253 ).Scan(&webhookID, &event, &payload)
254 if err != nil {
255 return fmt.Errorf("delivery not found: %w", err)
256 }
257 hook, err := d.GetWebhook(webhookID)
258 if err != nil {
259 return fmt.Errorf("webhook not found: %w", err)
260 }
261 go d.deliverWebhook(hook.ID, hook.URL, hook.Secret, event, []byte(payload))
262 return nil
263}