feat(webhooks): add per-verb subscriptions and bulk-create debounce

Subscriptions now declare a `verb` (create | delete | bulk-create).
POST /:did/webhooks defaults to inserting both create and delete rows
when no verb is given, preserving existing all-events behavior. Update
events fold into the create verb. The new bulk-create verb debounces
creates per DID over 400 ms and delivers a `records` array.

Migration adds the verb column with default 'create' and clones every
existing row for the delete verb so legacy subscriptions keep firing
on both events.
This commit is contained in:
Julien Calixte
2026-05-05 12:25:54 +02:00
parent 8e967ba43c
commit 7b53909c52
4 changed files with 122 additions and 18 deletions

View File

@@ -2,9 +2,10 @@ import { Jetstream } from "@skyware/jetstream";
import {
deleteNote,
getCursor,
getWebhooksByDid,
getWebhooksByDidAndVerb,
saveCursor,
upsertNote,
type WebhookVerb,
} from "./src/data/db.ts";
import { Note } from "./src/data/note.ts";
import { log } from "./src/log.ts";
@@ -17,11 +18,17 @@ globalThis.addEventListener("error", (e) => {
log("[jetstream] uncaught error:", e.error);
});
const fireWebhooks = async (
did: string,
type WebhookTarget = {
method: string;
url: string;
token?: string;
};
const dispatchAll = async (
webhooks: WebhookTarget[],
payload: Record<string, unknown>,
label: string,
): Promise<void> => {
const webhooks = getWebhooksByDid(did);
if (webhooks.length === 0) return;
const results = await Promise.allSettled(
webhooks.map(({ method, url, token }) => {
@@ -38,18 +45,73 @@ const fireWebhooks = async (
);
for (const result of results) {
if (result.status === "rejected") {
log(`[jetstream] webhook error for ${did}:`, result.reason);
log(`[jetstream] ${label} webhook error:`, result.reason);
}
}
};
const fireWebhooks = async (
did: string,
verb: WebhookVerb,
payload: Record<string, unknown>,
): Promise<void> => {
const webhooks = getWebhooksByDidAndVerb(did, verb);
await dispatchAll(webhooks, payload, `${verb} ${did}`);
};
const BULK_CREATE_DEBOUNCE_MS = 400;
type BulkBuffer = {
records: Record<string, unknown>[];
timer: number;
};
const bulkBuffers = new Map<string, BulkBuffer>();
const flushBulkCreate = async (did: string): Promise<void> => {
const buffer = bulkBuffers.get(did);
if (!buffer) return;
bulkBuffers.delete(did);
const webhooks = getWebhooksByDidAndVerb(did, "bulk-create");
await dispatchAll(
webhooks,
{ event: "bulk-create", did, records: buffer.records },
`bulk-create ${did}`,
);
};
// Buffered records are not persisted: if jetstream restarts mid-window, those
// `bulk-create` notifications are lost. Subscribers reconcile on cold start
// because the underlying notes are already saved to the `note` table.
const queueBulkCreate = (
did: string,
record: Record<string, unknown>,
): void => {
const existing = bulkBuffers.get(did);
if (existing) {
clearTimeout(existing.timer);
existing.records.push(record);
existing.timer = setTimeout(
() => flushBulkCreate(did),
BULK_CREATE_DEBOUNCE_MS,
);
return;
}
bulkBuffers.set(did, {
records: [record],
timer: setTimeout(
() => flushBulkCreate(did),
BULK_CREATE_DEBOUNCE_MS,
),
});
};
const cursor = getCursor();
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
const jetstream = new Jetstream({
wantedCollections: ["space.remanso.note"],
cursor: cursor ? Number(cursor) : undefined,
endpoint: "https://jetstream2.fr.hose.cam/subscribe"
endpoint: "https://jetstream2.fr.hose.cam/subscribe",
});
jetstream.onCreate("space.remanso.note", async (event) => {
@@ -59,7 +121,8 @@ jetstream.onCreate("space.remanso.note", async (event) => {
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note });
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
await fireWebhooks(did, { event: "create", did, rkey, ...note });
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
queueBulkCreate(did, { rkey, ...note });
} catch (error) {
log(`[jetstream] error on create:`, error);
}
@@ -72,7 +135,9 @@ jetstream.onUpdate("space.remanso.note", async (event) => {
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note });
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
await fireWebhooks(did, { event: "update", did, rkey, ...note });
// Updates fold into the `create` verb — subscribers reconcile by (did, rkey).
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
queueBulkCreate(did, { rkey, ...note });
} catch (error) {
log(`[jetstream] error on update:`, error);
}
@@ -84,7 +149,7 @@ jetstream.onDelete("space.remanso.note", async (event) => {
log(`[jetstream] deleting ${did}/${rkey}...`);
deleteNote({ did, rkey });
log(`[jetstream] delete ${did}/${rkey}`);
await fireWebhooks(did, { event: "delete", did, rkey });
await fireWebhooks(did, "delete", { event: "delete", did, rkey });
} catch (error) {
log(`[jetstream] error on delete:`, error);
}