185 lines
5.0 KiB
TypeScript
185 lines
5.0 KiB
TypeScript
import { Jetstream } from "@skyware/jetstream";
|
|
import {
|
|
deleteNote,
|
|
getCursor,
|
|
getWebhooksByDidAndVerb,
|
|
saveCursor,
|
|
upsertNote,
|
|
type WebhookVerb,
|
|
} from "./src/data/db.ts";
|
|
import { Note } from "./src/data/note.ts";
|
|
import { log } from "./src/log.ts";
|
|
|
|
globalThis.addEventListener("unhandledrejection", (e) => {
|
|
log("[jetstream] unhandled rejection:", e.reason);
|
|
});
|
|
|
|
globalThis.addEventListener("error", (e) => {
|
|
log("[jetstream] uncaught error:", e.error);
|
|
});
|
|
|
|
type WebhookTarget = {
|
|
method: string;
|
|
url: string;
|
|
token?: string;
|
|
};
|
|
|
|
const dispatchAll = async (
|
|
webhooks: WebhookTarget[],
|
|
payload: Record<string, unknown>,
|
|
label: string,
|
|
): Promise<void> => {
|
|
if (webhooks.length === 0) return;
|
|
const results = await Promise.allSettled(
|
|
webhooks.map(({ method, url, token }) => {
|
|
const hasBody = method !== "GET" && method !== "HEAD";
|
|
return fetch(url, {
|
|
method,
|
|
headers: {
|
|
...(hasBody ? { "Content-Type": "application/json" } : {}),
|
|
...(token ? { Authorization: `Bearer ${token}` } : {}),
|
|
},
|
|
body: hasBody ? JSON.stringify(payload) : undefined,
|
|
});
|
|
}),
|
|
);
|
|
for (const result of results) {
|
|
if (result.status === "rejected") {
|
|
log(`[jetstream] ${label} webhook error:`, result.reason);
|
|
}
|
|
}
|
|
};
|
|
|
|
const fireWebhooks = async (
|
|
did: string,
|
|
verb: WebhookVerb,
|
|
payload: Record<string, unknown>,
|
|
): Promise<void> => {
|
|
const webhooks = getWebhooksByDidAndVerb(did, verb);
|
|
await dispatchAll(webhooks, payload, `${verb} ${did}`);
|
|
};
|
|
|
|
const BULK_CREATE_DEBOUNCE_MS = 15000;
|
|
|
|
type BulkBuffer = {
|
|
records: Record<string, unknown>[];
|
|
timer: number;
|
|
};
|
|
const bulkBuffers = new Map<string, BulkBuffer>();
|
|
|
|
const flushBulkCreate = async (did: string): Promise<void> => {
|
|
const buffer = bulkBuffers.get(did);
|
|
if (!buffer) return;
|
|
bulkBuffers.delete(did);
|
|
const webhooks = getWebhooksByDidAndVerb(did, "bulk-create");
|
|
log(`[jetstream] bulk-create ${did}: ${buffer.records.length} record(s)`);
|
|
await dispatchAll(
|
|
webhooks,
|
|
{ event: "bulk-create", did, records: buffer.records },
|
|
`bulk-create ${did}`,
|
|
);
|
|
};
|
|
|
|
// Buffered records are not persisted: if jetstream restarts mid-window, those
|
|
// `bulk-create` notifications are lost. Subscribers reconcile on cold start
|
|
// because the underlying notes are already saved to the `note` table.
|
|
const queueBulkCreate = (
|
|
did: string,
|
|
record: Record<string, unknown>,
|
|
): void => {
|
|
const existing = bulkBuffers.get(did);
|
|
if (existing) {
|
|
clearTimeout(existing.timer);
|
|
existing.records.push(record);
|
|
existing.timer = setTimeout(
|
|
() => flushBulkCreate(did),
|
|
BULK_CREATE_DEBOUNCE_MS,
|
|
);
|
|
return;
|
|
}
|
|
bulkBuffers.set(did, {
|
|
records: [record],
|
|
timer: setTimeout(() => flushBulkCreate(did), BULK_CREATE_DEBOUNCE_MS),
|
|
});
|
|
};
|
|
|
|
const cursor = getCursor();
|
|
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
|
|
|
|
const jetstream = new Jetstream({
|
|
wantedCollections: ["space.remanso.note"],
|
|
cursor: cursor ? Number(cursor) : undefined,
|
|
endpoint: "https://jetstream2.fr.hose.cam/subscribe",
|
|
});
|
|
|
|
jetstream.onCreate("space.remanso.note", async (event) => {
|
|
try {
|
|
const {
|
|
did,
|
|
commit: { rkey, record },
|
|
} = event;
|
|
log(`[jetstream] creating ${did}/${rkey}...`);
|
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
|
upsertNote({ did, rkey, ...note });
|
|
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
|
|
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
|
|
queueBulkCreate(did, { rkey, ...note });
|
|
} catch (error) {
|
|
log(`[jetstream] error on create:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.onUpdate("space.remanso.note", async (event) => {
|
|
try {
|
|
const {
|
|
did,
|
|
commit: { rkey, record },
|
|
} = event;
|
|
log(`[jetstream] updating ${did}/${rkey}...`);
|
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
|
upsertNote({ did, rkey, ...note });
|
|
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
|
|
// Updates fold into the `create` verb — subscribers reconcile by (did, rkey).
|
|
await fireWebhooks(did, "create", { event: "create", did, rkey, ...note });
|
|
queueBulkCreate(did, { rkey, ...note });
|
|
} catch (error) {
|
|
log(`[jetstream] error on update:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.onDelete("space.remanso.note", async (event) => {
|
|
try {
|
|
const {
|
|
did,
|
|
commit: { rkey },
|
|
} = event;
|
|
log(`[jetstream] deleting ${did}/${rkey}...`);
|
|
deleteNote({ did, rkey });
|
|
log(`[jetstream] delete ${did}/${rkey}`);
|
|
await fireWebhooks(did, "delete", { event: "delete", did, rkey });
|
|
} catch (error) {
|
|
log(`[jetstream] error on delete:`, error);
|
|
}
|
|
});
|
|
|
|
jetstream.on("open", () => {
|
|
log("[jetstream] connected");
|
|
});
|
|
|
|
jetstream.on("close", () => {
|
|
log("[jetstream] connection closed");
|
|
});
|
|
|
|
jetstream.on("error", (error) => {
|
|
log("[jetstream] connection closed with error", error);
|
|
});
|
|
|
|
log("[jetstream] launching");
|
|
jetstream.start();
|
|
|
|
setInterval(() => {
|
|
if (jetstream.cursor) {
|
|
saveCursor(jetstream.cursor);
|
|
}
|
|
}, 5000);
|