import { Jetstream } from "@skyware/jetstream";
import {
  deleteNote,
  getCursor,
  getWebhooksByDidAndVerb,
  saveCursor,
  upsertNote,
  type WebhookVerb,
} from "./src/data/db.ts";
import { Note } from "./src/data/note.ts";
import { log } from "./src/log.ts";

// Last-resort diagnostics: log anything that escapes the per-handler
// try/catch blocks.
// NOTE(review): these listeners only log. If this runs under Deno, the runtime
// presumably still terminates unless `e.preventDefault()` is called — confirm
// that log-then-exit is the intended behaviour.
globalThis.addEventListener("unhandledrejection", (e) => {
  log("[jetstream] unhandled rejection:", e.reason);
});

globalThis.addEventListener("error", (e) => {
  log("[jetstream] uncaught error:", e.error);
});

/** A single webhook endpoint to call, as returned by `getWebhooksByDidAndVerb`. */
type WebhookTarget = {
  /** HTTP method used for the request. */
  method: string;
  /** Absolute URL of the endpoint. */
  url: string;
  /** Optional bearer token sent in the `Authorization` header. */
  token?: string;
};

/**
 * Sends `payload` to every webhook concurrently and logs each failure.
 *
 * Delivery failures never reject the returned promise: every request is
 * settled independently and problems are only reported through `log`. A
 * response with a non-2xx status is reported the same way as a network error.
 *
 * @param webhooks Endpoints to notify; an empty list is a no-op.
 * @param payload JSON-serializable body, sent for every method except GET/HEAD.
 * @param label Context included in the log line of each failure.
 */
const dispatchAll = async (
  webhooks: WebhookTarget[],
  payload: Record<string, unknown>,
  label: string,
): Promise<void> => {
  if (webhooks.length === 0) return;

  // GET and HEAD requests must not carry a body; compare case-insensitively so
  // a lower-case method stored in the database does not get a body attached.
  const carriesBody = (method: string): boolean => {
    const normalized = method.toUpperCase();
    return normalized !== "GET" && normalized !== "HEAD";
  };

  // Serialize once for all targets, and only when some target needs a body.
  const serialized = webhooks.some(({ method }) => carriesBody(method))
    ? JSON.stringify(payload)
    : undefined;

  const results = await Promise.allSettled(
    webhooks.map(async ({ method, url, token }) => {
      const hasBody = carriesBody(method);

      const headers = new Headers();
      if (hasBody) headers.set("Content-Type", "application/json");
      if (token) headers.set("Authorization", `Bearer ${token}`);

      const response = await fetch(url, {
        method,
        headers,
        body: hasBody ? serialized : undefined,
      });

      // The response body is never read; cancel it so the underlying
      // connection/resources are released instead of lingering.
      await response.body?.cancel();

      if (!response.ok) {
        throw new Error(
          `${method} ${url} responded with HTTP ${response.status}`,
        );
      }
    }),
  );

  for (const result of results) {
    if (result.status === "rejected") {
      log(`[jetstream] ${label} webhook error:`, result.reason);
    }
  }
};

/**
 * Looks up the webhooks registered for `did` and `verb` and delivers `payload`
 * to all of them.
 *
 * @param did Repository DID the event belongs to.
 * @param verb Webhook verb used for the lookup.
 * @param payload JSON-serializable event body.
 */
const fireWebhooks = async (
  did: string,
  verb: WebhookVerb,
  payload: Record<string, unknown>,
): Promise<void> => {
  const webhooks = getWebhooksByDidAndVerb(did, verb);
  await dispatchAll(webhooks, payload, `${verb} ${did}`);
};

/** Quiet period (ms) after the last queued record before a bulk flush fires. */
const BULK_CREATE_DEBOUNCE_MS = 400;

/** Records accumulated for one DID plus the handle of its pending flush timer. */
type BulkBuffer = {
  records: Record<string, unknown>[];
  // Derived from `setTimeout` so the type is right on any runtime
  // (a number in Deno/browsers, a Timeout object in Node).
  timer: ReturnType<typeof setTimeout>;
};

/** Pending `bulk-create` buffers, keyed by DID. */
const bulkBuffers = new Map<string, BulkBuffer>();

/**
 * Delivers the buffered records of `did` as a single `bulk-create` webhook
 * call and drops the buffer. Does nothing when no buffer exists for `did`.
 *
 * @param did Repository DID whose buffer should be flushed.
 */
const flushBulkCreate = async (did: string): Promise<void> => {
  const buffer = bulkBuffers.get(did);
  if (!buffer) return;

  // Remove the buffer before awaiting so records queued while the delivery is
  // in flight start a fresh buffer instead of being lost.
  bulkBuffers.delete(did);

  const webhooks = getWebhooksByDidAndVerb(did, "bulk-create");
  await dispatchAll(
    webhooks,
    { event: "bulk-create", did, records: buffer.records },
    `bulk-create ${did}`,
  );
};

// Buffered records are not persisted: if jetstream restarts mid-window, those
// `bulk-create` notifications are lost.
// Subscribers reconcile on cold start because the underlying notes are already
// saved to the `note` table.
//
// Adds `record` to the pending `bulk-create` buffer of `did` and (re)starts the
// debounce timer, so a burst of events for one DID results in a single flush
// BULK_CREATE_DEBOUNCE_MS after the last record was queued.
const queueBulkCreate = (
  did: string,
  record: Record<string, unknown>,
): void => {
  // Arms the flush timer. The flush is asynchronous and nothing awaits it, so
  // its failure is logged here rather than surfacing as an unhandled rejection.
  const armFlushTimer = () =>
    setTimeout(() => {
      flushBulkCreate(did).catch((error: unknown) => {
        log(`[jetstream] error on bulk-create flush:`, error);
      });
    }, BULK_CREATE_DEBOUNCE_MS);

  const existing = bulkBuffers.get(did);
  if (existing) {
    // Debounce: push the deadline back every time another record arrives.
    clearTimeout(existing.timer);
    existing.records.push(record);
    existing.timer = armFlushTimer();
    return;
  }

  bulkBuffers.set(did, { records: [record], timer: armFlushTimer() });
};

// Resume from the persisted cursor when one exists.
const cursor = getCursor();
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);

const jetstream = new Jetstream({
  wantedCollections: ["space.remanso.note"],
  cursor: cursor ? Number(cursor) : undefined,
  endpoint: "https://jetstream2.fr.hose.cam/subscribe",
});

// Create: store the note, notify `create` subscribers, then queue the record
// for the debounced `bulk-create` notification.
jetstream.onCreate("space.remanso.note", async (event) => {
  try {
    const { did, commit: { rkey, record } } = event;
    log(`[jetstream] creating ${did}/${rkey}...`);
    // NOTE(review): the type arguments of `Omit` were missing from this file;
    // `Note` minus the `did`/`rkey` keys supplied separately is assumed —
    // confirm against ./src/data/note.ts.
    const note = record as unknown as Omit<Note, "did" | "rkey">;
    upsertNote({ did, rkey, ...note });
    log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
    await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
    queueBulkCreate(did, { rkey, ...note });
  } catch (error) {
    log(`[jetstream] error on create:`, error);
  }
});

// Update: same pipeline as create.
jetstream.onUpdate("space.remanso.note", async (event) => {
  try {
    const { did, commit: { rkey, record } } = event;
    log(`[jetstream] updating ${did}/${rkey}...`);
    // NOTE(review): `Omit` type arguments reconstructed — confirm against
    // ./src/data/note.ts.
    const note = record as unknown as Omit<Note, "did" | "rkey">;
    upsertNote({ did, rkey, ...note });
    log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
    // Updates fold into the `create` verb — subscribers reconcile by (did, rkey).
    await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
    queueBulkCreate(did, { rkey, ...note });
  } catch (error) {
    log(`[jetstream] error on update:`, error);
  }
});

// Delete: remove the stored note and notify `delete` subscribers.
jetstream.onDelete("space.remanso.note", async (event) => {
  try {
    const { did, commit: { rkey } } = event;
    log(`[jetstream] deleting ${did}/${rkey}...`);
    deleteNote({ did, rkey });
    log(`[jetstream] delete ${did}/${rkey}`);
    await fireWebhooks(did, "delete", { event: "delete", did, rkey });
  } catch (error) {
    log(`[jetstream] error on delete:`, error);
  }
});

jetstream.on("open", () => {
  log("[jetstream] connected");
});

jetstream.on("close", () => {
  log("[jetstream] connection closed");
});

jetstream.on("error", (error) => {
  log("[jetstream] connection closed with error", error);
});

log("[jetstream] launching");
jetstream.start();

// Persist the stream position every 5 seconds so a restart can resume from it.
setInterval(() => {
  if (jetstream.cursor) {
    saveCursor(jetstream.cursor);
  }
}, 5000);