From 62f981dd936e33f35c9d223b71750fd988e95131 Mon Sep 17 00:00:00 2001 From: Julien Calixte Date: Wed, 25 Feb 2026 22:51:48 +0100 Subject: [PATCH] feat: fire webhooks on jetstream events, cap at 10 per DID - db.ts: getWebhooksByDid returns the 10 most recent subscriptions (ORDER BY id DESC LIMIT 10) - jetstream.ts: fireWebhooks fans out to registered URLs via Promise.allSettled after each create/update/delete event --- jetstream.ts | 32 +++++++++++++++++++++++++++++--- src/data/db.ts | 6 ++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/jetstream.ts b/jetstream.ts index 79926e9..4b39b4d 100644 --- a/jetstream.ts +++ b/jetstream.ts @@ -2,6 +2,7 @@ import { Jetstream } from "@skyware/jetstream"; import { deleteNote, getCursor, + getWebhooksByDid, saveCursor, upsertNote, } from "./src/data/db.ts"; @@ -16,6 +17,28 @@ globalThis.addEventListener("error", (e) => { log("[jetstream] uncaught error:", e.error); }); +const fireWebhooks = async ( + did: string, + payload: Record, +): Promise => { + const webhooks = getWebhooksByDid(did); + if (webhooks.length === 0) return; + const results = await Promise.allSettled( + webhooks.map(({ method, url }) => + fetch(url, { + method, + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + }) + ), + ); + for (const result of results) { + if (result.status === "rejected") { + log(`[jetstream] webhook error for ${did}:`, result.reason); + } + } +}; + const cursor = getCursor(); log(`[jetstream] starting with cursor: ${cursor ?? "none"}`); @@ -25,36 +48,39 @@ const jetstream = new Jetstream({ endpoint: "https://jetstream2.fr.hose.cam/subscribe" }); -jetstream.onCreate("space.remanso.note", (event) => { +jetstream.onCreate("space.remanso.note", async (event) => { try { const { did, commit: { rkey, record } } = event; log(`[jetstream] creating ${did}/${rkey}...`); const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] create ${did}/${rkey}: ${note.title}`); + await fireWebhooks(did, { event: "create", did, rkey, ...note }); } catch (error) { log(`[jetstream] error on create:`, error); } }); -jetstream.onUpdate("space.remanso.note", (event) => { +jetstream.onUpdate("space.remanso.note", async (event) => { try { const { did, commit: { rkey, record } } = event; log(`[jetstream] updating ${did}/${rkey}...`); const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] update ${did}/${rkey}: ${note.title}`); + await fireWebhooks(did, { event: "update", did, rkey, ...note }); } catch (error) { log(`[jetstream] error on update:`, error); } }); -jetstream.onDelete("space.remanso.note", (event) => { +jetstream.onDelete("space.remanso.note", async (event) => { try { const { did, commit: { rkey } } = event; log(`[jetstream] deleting ${did}/${rkey}...`); deleteNote({ did, rkey }); log(`[jetstream] delete ${did}/${rkey}`); + await fireWebhooks(did, { event: "delete", did, rkey }); } catch (error) { log(`[jetstream] error on delete:`, error); } diff --git a/src/data/db.ts b/src/data/db.ts index 80dc0d9..a55ece5 100644 --- a/src/data/db.ts +++ b/src/data/db.ts @@ -92,6 +92,12 @@ export const deleteWebhooksByDid = (did: string): void => { db.exec("DELETE FROM webhook_subscription WHERE did = ?", did); }; +export const getWebhooksByDid = (did: string): WebhookSubscriptionRow[] => { + return db.prepare( + "SELECT id, did, method, url FROM webhook_subscription WHERE did = ? ORDER BY id DESC LIMIT 10", + ).all(did); +}; + export const upsertNote = (note: Note) => { const now = new Date().toISOString(); db.exec(