import { Jetstream } from "@skyware/jetstream"; import { deleteNote, getCursor, getWebhooksByDid, saveCursor, upsertNote, } from "./src/data/db.ts"; import { Note } from "./src/data/note.ts"; import { log } from "./src/log.ts"; globalThis.addEventListener("unhandledrejection", (e) => { log("[jetstream] unhandled rejection:", e.reason); }); globalThis.addEventListener("error", (e) => { log("[jetstream] uncaught error:", e.error); }); const fireWebhooks = async ( did: string, payload: Record, ): Promise => { const webhooks = getWebhooksByDid(did); if (webhooks.length === 0) return; const results = await Promise.allSettled( webhooks.map(({ method, url, token }) => { const hasBody = method !== "GET" && method !== "HEAD"; return fetch(url, { method, headers: { ...(hasBody ? { "Content-Type": "application/json" } : {}), ...(token ? { Authorization: `Bearer ${token}` } : {}), }, body: hasBody ? JSON.stringify(payload) : undefined, }); }), ); for (const result of results) { if (result.status === "rejected") { log(`[jetstream] webhook error for ${did}:`, result.reason); } } }; const cursor = getCursor(); log(`[jetstream] starting with cursor: ${cursor ?? "none"}`); const jetstream = new Jetstream({ wantedCollections: ["space.remanso.note"], cursor: cursor ? Number(cursor) : undefined, endpoint: "https://jetstream2.fr.hose.cam/subscribe" }); jetstream.onCreate("space.remanso.note", async (event) => { try { const { did, commit: { rkey, record } } = event; log(`[jetstream] creating ${did}/${rkey}...`); const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] create ${did}/${rkey}: ${note.title}`); await fireWebhooks(did, { event: "create", did, rkey, ...note }); } catch (error) { log(`[jetstream] error on create:`, error); } }); jetstream.onUpdate("space.remanso.note", async (event) => { try { const { did, commit: { rkey, record } } = event; log(`[jetstream] updating ${did}/${rkey}...`); const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] update ${did}/${rkey}: ${note.title}`); await fireWebhooks(did, { event: "update", did, rkey, ...note }); } catch (error) { log(`[jetstream] error on update:`, error); } }); jetstream.onDelete("space.remanso.note", async (event) => { try { const { did, commit: { rkey } } = event; log(`[jetstream] deleting ${did}/${rkey}...`); deleteNote({ did, rkey }); log(`[jetstream] delete ${did}/${rkey}`); await fireWebhooks(did, { event: "delete", did, rkey }); } catch (error) { log(`[jetstream] error on delete:`, error); } }); jetstream.on("open", () => { log("[jetstream] connected"); }); jetstream.on("close", () => { log("[jetstream] connection closed"); }); jetstream.on("error", (error) => { log("[jetstream] connection closed with error", error); }); log("[jetstream] launching"); jetstream.start(); setInterval(() => { if (jetstream.cursor) { saveCursor(jetstream.cursor); } }, 5000);