Subscriptions now declare a `verb` (create | delete | bulk-create). POST /:did/webhooks defaults to inserting both create and delete rows when no verb is given, preserving existing all-events behavior. Update events fold into the create verb. The new bulk-create verb debounces creates per DID over 400 ms and delivers a `records` array. Migration adds the verb column with default 'create' and clones every existing row for the delete verb so legacy subscriptions keep firing on both events.
178 lines
4.9 KiB
TypeScript
178 lines
4.9 KiB
TypeScript
import { Jetstream } from "@skyware/jetstream";
|
|
import {
|
|
deleteNote,
|
|
getCursor,
|
|
getWebhooksByDidAndVerb,
|
|
saveCursor,
|
|
upsertNote,
|
|
type WebhookVerb,
|
|
} from "./src/data/db.ts";
|
|
import { Note } from "./src/data/note.ts";
|
|
import { log } from "./src/log.ts";
|
|
|
|
globalThis.addEventListener("unhandledrejection", (e) => {
|
|
log("[jetstream] unhandled rejection:", e.reason);
|
|
});
|
|
|
|
globalThis.addEventListener("error", (e) => {
|
|
log("[jetstream] uncaught error:", e.error);
|
|
});
|
|
|
|
type WebhookTarget = {
|
|
method: string;
|
|
url: string;
|
|
token?: string;
|
|
};
|
|
|
|
const dispatchAll = async (
|
|
webhooks: WebhookTarget[],
|
|
payload: Record<string, unknown>,
|
|
label: string,
|
|
): Promise<void> => {
|
|
if (webhooks.length === 0) return;
|
|
const results = await Promise.allSettled(
|
|
webhooks.map(({ method, url, token }) => {
|
|
const hasBody = method !== "GET" && method !== "HEAD";
|
|
return fetch(url, {
|
|
method,
|
|
headers: {
|
|
...(hasBody ? { "Content-Type": "application/json" } : {}),
|
|
...(token ? { Authorization: `Bearer ${token}` } : {}),
|
|
},
|
|
body: hasBody ? JSON.stringify(payload) : undefined,
|
|
});
|
|
}),
|
|
);
|
|
for (const result of results) {
|
|
if (result.status === "rejected") {
|
|
log(`[jetstream] ${label} webhook error:`, result.reason);
|
|
}
|
|
}
|
|
};
|
|
|
|
const fireWebhooks = async (
|
|
did: string,
|
|
verb: WebhookVerb,
|
|
payload: Record<string, unknown>,
|
|
): Promise<void> => {
|
|
const webhooks = getWebhooksByDidAndVerb(did, verb);
|
|
await dispatchAll(webhooks, payload, `${verb} ${did}`);
|
|
};
|
|
|
|
const BULK_CREATE_DEBOUNCE_MS = 400;
|
|
|
|
type BulkBuffer = {
|
|
records: Record<string, unknown>[];
|
|
timer: number;
|
|
};
|
|
const bulkBuffers = new Map<string, BulkBuffer>();
|
|
|
|
const flushBulkCreate = async (did: string): Promise<void> => {
|
|
const buffer = bulkBuffers.get(did);
|
|
if (!buffer) return;
|
|
bulkBuffers.delete(did);
|
|
const webhooks = getWebhooksByDidAndVerb(did, "bulk-create");
|
|
await dispatchAll(
|
|
webhooks,
|
|
{ event: "bulk-create", did, records: buffer.records },
|
|
`bulk-create ${did}`,
|
|
);
|
|
};
|
|
|
|
// Buffered records are not persisted: if jetstream restarts mid-window, those
|
|
// `bulk-create` notifications are lost. Subscribers reconcile on cold start
|
|
// because the underlying notes are already saved to the `note` table.
|
|
const queueBulkCreate = (
|
|
did: string,
|
|
record: Record<string, unknown>,
|
|
): void => {
|
|
const existing = bulkBuffers.get(did);
|
|
if (existing) {
|
|
clearTimeout(existing.timer);
|
|
existing.records.push(record);
|
|
existing.timer = setTimeout(
|
|
() => flushBulkCreate(did),
|
|
BULK_CREATE_DEBOUNCE_MS,
|
|
);
|
|
return;
|
|
}
|
|
bulkBuffers.set(did, {
|
|
records: [record],
|
|
timer: setTimeout(
|
|
() => flushBulkCreate(did),
|
|
BULK_CREATE_DEBOUNCE_MS,
|
|
),
|
|
});
|
|
};
|
|
|
|
const cursor = getCursor();
|
|
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
|
|
|
|
const jetstream = new Jetstream({
|
|
wantedCollections: ["space.remanso.note"],
|
|
cursor: cursor ? Number(cursor) : undefined,
|
|
endpoint: "https://jetstream2.fr.hose.cam/subscribe",
|
|
});
|
|
|
|
jetstream.onCreate("space.remanso.note", async (event) => {
|
|
try {
|
|
const { did, commit: { rkey, record } } = event;
|
|
log(`[jetstream] creating ${did}/${rkey}...`);
|
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
|
upsertNote({ did, rkey, ...note });
|
|
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
|
|
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
|
|
queueBulkCreate(did, { rkey, ...note });
|
|
} catch (error) {
|
|
log(`[jetstream] error on create:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.onUpdate("space.remanso.note", async (event) => {
|
|
try {
|
|
const { did, commit: { rkey, record } } = event;
|
|
log(`[jetstream] updating ${did}/${rkey}...`);
|
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
|
upsertNote({ did, rkey, ...note });
|
|
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
|
|
// Updates fold into the `create` verb — subscribers reconcile by (did, rkey).
|
|
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
|
|
queueBulkCreate(did, { rkey, ...note });
|
|
} catch (error) {
|
|
log(`[jetstream] error on update:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.onDelete("space.remanso.note", async (event) => {
|
|
try {
|
|
const { did, commit: { rkey } } = event;
|
|
log(`[jetstream] deleting ${did}/${rkey}...`);
|
|
deleteNote({ did, rkey });
|
|
log(`[jetstream] delete ${did}/${rkey}`);
|
|
await fireWebhooks(did, "delete", { event: "delete", did, rkey });
|
|
} catch (error) {
|
|
log(`[jetstream] error on delete:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.on("open", () => {
|
|
log("[jetstream] connected");
|
|
});
|
|
|
|
jetstream.on("close", () => {
|
|
log("[jetstream] connection closed");
|
|
});
|
|
|
|
jetstream.on("error", (error) => {
|
|
log("[jetstream] connection closed with error", error);
|
|
});
|
|
|
|
log("[jetstream] launching");
|
|
jetstream.start();
|
|
|
|
setInterval(() => {
|
|
if (jetstream.cursor) {
|
|
saveCursor(jetstream.cursor);
|
|
}
|
|
}, 5000);
|