feat: fire webhooks on jetstream events, cap at 10 per DID
- db.ts: getWebhooksByDid returns the 10 most recent subscriptions (ORDER BY id DESC LIMIT 10) - jetstream.ts: fireWebhooks fans out to registered URLs via Promise.allSettled after each create/update/delete event
This commit is contained in:
32
jetstream.ts
32
jetstream.ts
@@ -2,6 +2,7 @@ import { Jetstream } from "@skyware/jetstream";
|
|||||||
import {
|
import {
|
||||||
deleteNote,
|
deleteNote,
|
||||||
getCursor,
|
getCursor,
|
||||||
|
getWebhooksByDid,
|
||||||
saveCursor,
|
saveCursor,
|
||||||
upsertNote,
|
upsertNote,
|
||||||
} from "./src/data/db.ts";
|
} from "./src/data/db.ts";
|
||||||
@@ -16,6 +17,28 @@ globalThis.addEventListener("error", (e) => {
|
|||||||
log("[jetstream] uncaught error:", e.error);
|
log("[jetstream] uncaught error:", e.error);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const fireWebhooks = async (
|
||||||
|
did: string,
|
||||||
|
payload: Record<string, unknown>,
|
||||||
|
): Promise<void> => {
|
||||||
|
const webhooks = getWebhooksByDid(did);
|
||||||
|
if (webhooks.length === 0) return;
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
webhooks.map(({ method, url }) =>
|
||||||
|
fetch(url, {
|
||||||
|
method,
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify(payload),
|
||||||
|
})
|
||||||
|
),
|
||||||
|
);
|
||||||
|
for (const result of results) {
|
||||||
|
if (result.status === "rejected") {
|
||||||
|
log(`[jetstream] webhook error for ${did}:`, result.reason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const cursor = getCursor();
|
const cursor = getCursor();
|
||||||
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
|
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
|
||||||
|
|
||||||
@@ -25,36 +48,39 @@ const jetstream = new Jetstream({
|
|||||||
endpoint: "https://jetstream2.fr.hose.cam/subscribe"
|
endpoint: "https://jetstream2.fr.hose.cam/subscribe"
|
||||||
});
|
});
|
||||||
|
|
||||||
jetstream.onCreate("space.remanso.note", (event) => {
|
jetstream.onCreate("space.remanso.note", async (event) => {
|
||||||
try {
|
try {
|
||||||
const { did, commit: { rkey, record } } = event;
|
const { did, commit: { rkey, record } } = event;
|
||||||
log(`[jetstream] creating ${did}/${rkey}...`);
|
log(`[jetstream] creating ${did}/${rkey}...`);
|
||||||
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
||||||
upsertNote({ did, rkey, ...note });
|
upsertNote({ did, rkey, ...note });
|
||||||
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
|
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
|
||||||
|
await fireWebhooks(did, { event: "create", did, rkey, ...note });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log(`[jetstream] error on create:`, error);
|
log(`[jetstream] error on create:`, error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
jetstream.onUpdate("space.remanso.note", (event) => {
|
jetstream.onUpdate("space.remanso.note", async (event) => {
|
||||||
try {
|
try {
|
||||||
const { did, commit: { rkey, record } } = event;
|
const { did, commit: { rkey, record } } = event;
|
||||||
log(`[jetstream] updating ${did}/${rkey}...`);
|
log(`[jetstream] updating ${did}/${rkey}...`);
|
||||||
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
const note = record as unknown as Omit<Note, "did" | "rkey">;
|
||||||
upsertNote({ did, rkey, ...note });
|
upsertNote({ did, rkey, ...note });
|
||||||
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
|
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
|
||||||
|
await fireWebhooks(did, { event: "update", did, rkey, ...note });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log(`[jetstream] error on update:`, error);
|
log(`[jetstream] error on update:`, error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
jetstream.onDelete("space.remanso.note", (event) => {
|
jetstream.onDelete("space.remanso.note", async (event) => {
|
||||||
try {
|
try {
|
||||||
const { did, commit: { rkey } } = event;
|
const { did, commit: { rkey } } = event;
|
||||||
log(`[jetstream] deleting ${did}/${rkey}...`);
|
log(`[jetstream] deleting ${did}/${rkey}...`);
|
||||||
deleteNote({ did, rkey });
|
deleteNote({ did, rkey });
|
||||||
log(`[jetstream] delete ${did}/${rkey}`);
|
log(`[jetstream] delete ${did}/${rkey}`);
|
||||||
|
await fireWebhooks(did, { event: "delete", did, rkey });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log(`[jetstream] error on delete:`, error);
|
log(`[jetstream] error on delete:`, error);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -92,6 +92,12 @@ export const deleteWebhooksByDid = (did: string): void => {
|
|||||||
db.exec("DELETE FROM webhook_subscription WHERE did = ?", did);
|
db.exec("DELETE FROM webhook_subscription WHERE did = ?", did);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const getWebhooksByDid = (did: string): WebhookSubscriptionRow[] => {
|
||||||
|
return db.prepare(
|
||||||
|
"SELECT id, did, method, url FROM webhook_subscription WHERE did = ? ORDER BY id DESC LIMIT 10",
|
||||||
|
).all<WebhookSubscriptionRow>(did);
|
||||||
|
};
|
||||||
|
|
||||||
export const upsertNote = (note: Note) => {
|
export const upsertNote = (note: Note) => {
|
||||||
const now = new Date().toISOString();
|
const now = new Date().toISOString();
|
||||||
db.exec(
|
db.exec(
|
||||||
|
|||||||
Reference in New Issue
Block a user