diff --git a/jetstream.ts b/jetstream.ts index 6731760..999ddfe 100644 --- a/jetstream.ts +++ b/jetstream.ts @@ -2,9 +2,10 @@ import { Jetstream } from "@skyware/jetstream"; import { deleteNote, getCursor, - getWebhooksByDid, + getWebhooksByDidAndVerb, saveCursor, upsertNote, + type WebhookVerb, } from "./src/data/db.ts"; import { Note } from "./src/data/note.ts"; import { log } from "./src/log.ts"; @@ -17,11 +18,17 @@ globalThis.addEventListener("error", (e) => { log("[jetstream] uncaught error:", e.error); }); -const fireWebhooks = async ( - did: string, +type WebhookTarget = { + method: string; + url: string; + token?: string; +}; + +const dispatchAll = async ( + webhooks: WebhookTarget[], payload: Record, + label: string, ): Promise => { - const webhooks = getWebhooksByDid(did); if (webhooks.length === 0) return; const results = await Promise.allSettled( webhooks.map(({ method, url, token }) => { @@ -38,18 +45,73 @@ const fireWebhooks = async ( ); for (const result of results) { if (result.status === "rejected") { - log(`[jetstream] webhook error for ${did}:`, result.reason); + log(`[jetstream] ${label} webhook error:`, result.reason); } } }; +const fireWebhooks = async ( + did: string, + verb: WebhookVerb, + payload: Record, +): Promise => { + const webhooks = getWebhooksByDidAndVerb(did, verb); + await dispatchAll(webhooks, payload, `${verb} ${did}`); +}; + +const BULK_CREATE_DEBOUNCE_MS = 400; + +type BulkBuffer = { + records: Record[]; + timer: number; +}; +const bulkBuffers = new Map(); + +const flushBulkCreate = async (did: string): Promise => { + const buffer = bulkBuffers.get(did); + if (!buffer) return; + bulkBuffers.delete(did); + const webhooks = getWebhooksByDidAndVerb(did, "bulk-create"); + await dispatchAll( + webhooks, + { event: "bulk-create", did, records: buffer.records }, + `bulk-create ${did}`, + ); +}; + +// Buffered records are not persisted: if jetstream restarts mid-window, those +// `bulk-create` notifications are lost. Subscribers reconcile on cold start +// because the underlying notes are already saved to the `note` table. +const queueBulkCreate = ( + did: string, + record: Record, +): void => { + const existing = bulkBuffers.get(did); + if (existing) { + clearTimeout(existing.timer); + existing.records.push(record); + existing.timer = setTimeout( + () => flushBulkCreate(did), + BULK_CREATE_DEBOUNCE_MS, + ); + return; + } + bulkBuffers.set(did, { + records: [record], + timer: setTimeout( + () => flushBulkCreate(did), + BULK_CREATE_DEBOUNCE_MS, + ), + }); +}; + const cursor = getCursor(); log(`[jetstream] starting with cursor: ${cursor ?? "none"}`); const jetstream = new Jetstream({ wantedCollections: ["space.remanso.note"], cursor: cursor ? Number(cursor) : undefined, - endpoint: "https://jetstream2.fr.hose.cam/subscribe" + endpoint: "https://jetstream2.fr.hose.cam/subscribe", }); jetstream.onCreate("space.remanso.note", async (event) => { @@ -59,7 +121,8 @@ jetstream.onCreate("space.remanso.note", async (event) => { const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] create ${did}/${rkey}: ${note.title}`); - await fireWebhooks(did, { event: "create", did, rkey, ...note }); + await fireWebhooks(did, "create", { event: "create", did, rkey, ...note }); + queueBulkCreate(did, { rkey, ...note }); } catch (error) { log(`[jetstream] error on create:`, error); } @@ -72,7 +135,9 @@ jetstream.onUpdate("space.remanso.note", async (event) => { const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] update ${did}/${rkey}: ${note.title}`); - await fireWebhooks(did, { event: "update", did, rkey, ...note }); + // Updates fold into the `create` verb — subscribers reconcile by (did, rkey). + await fireWebhooks(did, "create", { event: "create", did, rkey, ...note }); + queueBulkCreate(did, { rkey, ...note }); } catch (error) { log(`[jetstream] error on update:`, error); } @@ -84,7 +149,7 @@ jetstream.onDelete("space.remanso.note", async (event) => { log(`[jetstream] deleting ${did}/${rkey}...`); deleteNote({ did, rkey }); log(`[jetstream] delete ${did}/${rkey}`); - await fireWebhooks(did, { event: "delete", did, rkey }); + await fireWebhooks(did, "delete", { event: "delete", did, rkey }); } catch (error) { log(`[jetstream] error on delete:`, error); } diff --git a/server.ts b/server.ts index a3d3f4e..7bcae54 100644 --- a/server.ts +++ b/server.ts @@ -5,6 +5,7 @@ import { getNotes, getNotesByDid, getNotesByDids, + type WebhookVerb, } from "./src/data/db.ts"; import { log } from "./src/log.ts"; @@ -84,18 +85,30 @@ router.post("/notes/feed", async (ctx) => { ctx.response.body = getNotesByDids(dids, cursor, Number(limit) || PAGINATION); }); +const ALLOWED_VERBS = ["create", "delete", "bulk-create"] as const; + router.post("/:did/webhooks", async (ctx) => { const { did } = ctx.params; const body = await ctx.request.body.json(); - const { method, url, token } = body ?? {}; + const { method, url, token, verb } = body ?? {}; if (!method || !url) { ctx.response.status = 400; ctx.response.body = { error: "method and url are required" }; return; } - const subscription = addWebhookSubscription({ did, method, url, token }); + if (verb !== undefined && !ALLOWED_VERBS.includes(verb)) { + ctx.response.status = 400; + ctx.response.body = { + error: `verb must be one of ${ALLOWED_VERBS.join(", ")}`, + }; + return; + } + const verbsToInsert: WebhookVerb[] = verb ? [verb] : ["create", "delete"]; + const subscriptions = verbsToInsert.map((v) => + addWebhookSubscription({ did, method, url, token, verb: v }) + ); ctx.response.status = 201; - ctx.response.body = subscription; + ctx.response.body = subscriptions; }); router.delete("/:did/webhooks", (ctx) => { diff --git a/src/data/db.ts b/src/data/db.ts index bcb2505..db0674b 100644 --- a/src/data/db.ts +++ b/src/data/db.ts @@ -87,26 +87,30 @@ export const saveCursor = (cursor: number) => { ); }; +export type WebhookVerb = "create" | "delete" | "bulk-create"; + type WebhookSubscriptionRow = { id: number; did: string; method: string; url: string; token?: string; + verb: WebhookVerb; }; export const addWebhookSubscription = ( - { did, method, url, token }: Omit, + { did, method, url, token, verb }: Omit, ): WebhookSubscriptionRow => { db.exec( - "INSERT INTO webhook_subscription (did, method, url, token) VALUES (?, ?, ?, ?)", + "INSERT INTO webhook_subscription (did, method, url, token, verb) VALUES (?, ?, ?, ?, ?)", did, method, url, token ?? null, + verb, ); return db.prepare( - "SELECT id, did, method, url FROM webhook_subscription WHERE id = last_insert_rowid()", + "SELECT id, did, method, url, verb FROM webhook_subscription WHERE id = last_insert_rowid()", ).get()!; // Note: token is intentionally excluded from the SELECT (write-only) }; @@ -115,10 +119,13 @@ export const deleteWebhooksByDid = (did: string): void => { db.exec("DELETE FROM webhook_subscription WHERE did = ?", did); }; -export const getWebhooksByDid = (did: string): WebhookSubscriptionRow[] => { +export const getWebhooksByDidAndVerb = ( + did: string, + verb: WebhookVerb, +): WebhookSubscriptionRow[] => { return db.prepare( - "SELECT id, did, method, url, token FROM webhook_subscription WHERE did = ? ORDER BY id DESC LIMIT 10", - ).all(did); + "SELECT id, did, method, url, token, verb FROM webhook_subscription WHERE did = ? AND verb = ? ORDER BY id DESC LIMIT 10", + ).all(did, verb); }; export const upsertNote = (note: Note) => { diff --git a/src/migrations/init.ts b/src/migrations/init.ts index 2b0cf9c..21a540d 100644 --- a/src/migrations/init.ts +++ b/src/migrations/init.ts @@ -61,4 +61,23 @@ try { // Column already exists — no-op } +try { + db.exec( + `ALTER TABLE webhook_subscription ADD COLUMN verb TEXT NOT NULL DEFAULT 'create';`, + ); + db.exec(` + INSERT INTO webhook_subscription (did, method, url, token, verb) + SELECT did, method, url, token, 'delete' + FROM webhook_subscription + WHERE verb = 'create'; + `); +} catch { + // Column already exists — backfill already happened on a previous run. +} + +db.exec(` + CREATE INDEX IF NOT EXISTS idx_webhook_subscription_did_verb + ON webhook_subscription(did, verb); +`); + db.close();