Files
remanso-jetstream/jetstream.ts
Julien Calixte c84b4c5f97 robustness: split jetstream into own container, add cursor persistence
Jetstream was running backgrounded in the same container as the API server,
so crashes went undetected and Docker never restarted it. Now each process
runs as a separate docker-compose service with independent restart policies.

Also adds cursor persistence to SQLite (saved every 5s) so restarts resume
from where they left off, moves event destructuring inside try/catch blocks,
and adds global unhandled error/rejection handlers for crash visibility.
2026-02-17 01:17:42 +01:00

82 lines
2.1 KiB
TypeScript

import { Jetstream } from "@skyware/jetstream";
import {
deleteNote,
getCursor,
saveCursor,
upsertNote,
} from "./src/data/db.ts";
import { Note } from "./src/data/note.ts";
import { log } from "./src/log.ts";
globalThis.addEventListener("unhandledrejection", (e) => {
log("[jetstream] unhandled rejection:", e.reason);
});
globalThis.addEventListener("error", (e) => {
log("[jetstream] uncaught error:", e.error);
});
const cursor = getCursor();
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
const jetstream = new Jetstream({
wantedCollections: ["space.remanso.note"],
cursor: cursor ? Number(cursor) : undefined,
});
jetstream.onCreate("space.remanso.note", (event) => {
try {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] creating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note });
log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
} catch (error) {
log(`[jetstream] error on create:`, error);
}
});
jetstream.onUpdate("space.remanso.note", (event) => {
try {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] updating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note });
log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
} catch (error) {
log(`[jetstream] error on update:`, error);
}
});
jetstream.onDelete("space.remanso.note", (event) => {
try {
const { did, commit: { rkey } } = event;
log(`[jetstream] deleting ${did}/${rkey}...`);
deleteNote({ did, rkey });
log(`[jetstream] delete ${did}/${rkey}`);
} catch (error) {
log(`[jetstream] error on delete:`, error);
}
});
jetstream.on("open", () => {
log("[jetstream] connected");
});
jetstream.on("close", () => {
log("[jetstream] connection closed");
});
jetstream.on("error", (error) => {
log("[jetstream] connection closed with error", error);
});
log("[jetstream] launching");
jetstream.start();
setInterval(() => {
if (jetstream.cursor) {
saveCursor(jetstream.cursor);
}
}, 5000);