From c84b4c5f97e72b7752ed2b91d03e999b781bfc45 Mon Sep 17 00:00:00 2001 From: Julien Calixte Date: Tue, 17 Feb 2026 01:17:42 +0100 Subject: [PATCH] robustness: split jetstream into own container, add cursor persistence Jetstream was running backgrounded in the same container as the API server, so crashes went undetected and Docker never restarted it. Now each process runs as a separate docker-compose service with independent restart policies. Also adds cursor persistence to SQLite (saved every 5s) so restarts resume from where they left off, moves event destructuring inside try/catch blocks, and adds global unhandled error/rejection handlers for crash visibility. --- .vscode/settings.json | 2 +- CLAUDE.md | 24 +++++++++++++------ Dockerfile | 2 +- README.md | 14 ++++++----- docker-compose.yml | 13 +++++++++-- jetstream.ts | 53 +++++++++++++++++++++++++++++++----------- server.ts | 12 +++++++--- src/data/db.ts | 16 ++++++++++++- src/migrations/init.ts | 7 ++++++ 9 files changed, 108 insertions(+), 35 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 4b9fb22..cbac569 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,3 @@ { "deno.enable": true -} \ No newline at end of file +} diff --git a/CLAUDE.md b/CLAUDE.md index 58367e7..0dea89a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,13 +1,19 @@ # CLAUDE.md -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +This file provides guidance to Claude Code (claude.ai/code) when working with +code in this repository. ## Project Overview -litenote-jetstream is the backend for Remanso, a blogging platform built on the AT Protocol (Bluesky ecosystem). It has two processes: +litenote-jetstream is the backend for Remanso, a blogging platform built on the +AT Protocol (Bluesky ecosystem). It has two processes: -1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol firehose via `@skyware/jetstream`, filtering for `space.remanso.note` records. On create/update events, it upserts notes into a local SQLite database. -2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on port 8080 that exposes read-only endpoints to query stored notes. +1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol + firehose via `@skyware/jetstream`, filtering for `space.remanso.note` + records. On create/update events, it upserts notes into a local SQLite + database. +2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on + port 8080 that exposes read-only endpoints to query stored notes. Both processes share the same SQLite database (`src/data/db.ts`). @@ -35,9 +41,13 @@ deno fmt ## Architecture -- **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task definitions and import maps. -- **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is configurable via `SQLITE_PATH` env var, defaults to `notes.db`. -- **Note schema**: Defined as an AT Protocol lexicon in `lexicons/space/remanso/note.json`. Notes have `title`, optional `images` (blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`. +- **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task + definitions and import maps. +- **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is + configurable via `SQLITE_PATH` env var, defaults to `notes.db`. +- **Note schema**: Defined as an AT Protocol lexicon in + `lexicons/space/remanso/note.json`. Notes have `title`, optional `images` + (blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`. - **API endpoints**: - `GET /notes?cursor=&limit=` — paginated notes (all users) - `GET /:did/notes?cursor=&limit=` — paginated notes for a specific DID diff --git a/Dockerfile b/Dockerfile index 4f48f9a..7d8c0ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,4 +14,4 @@ ENV SQLITE_PATH=/data/notes.db EXPOSE 8080 -CMD ["sh", "-c", "deno task migrate && deno task jetstream:prod & deno task server:prod"] +CMD ["deno", "task", "server:prod"] diff --git a/README.md b/README.md index 1c60fc2..b0a25e6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # litenote-jetstream -Backend for [Remanso](https://remanso.space), a blogging platform on the AT Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and serves them via a REST API. +Backend for [Remanso](https://remanso.space), a blogging platform on the AT +Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and +serves them via a REST API. ## Prerequisites @@ -39,13 +41,13 @@ docker run -p 8080:8080 -v litenote-data:/data litenote-jetstream ## API -| Endpoint | Description | -|---|---| -| `GET /notes?cursor=&limit=` | Paginated notes from all users | +| Endpoint | Description | +| -------------------------------- | ---------------------------------- | +| `GET /notes?cursor=&limit=` | Paginated notes from all users | | `GET /:did/notes?cursor=&limit=` | Paginated notes for a specific DID | ## Environment Variables -| Variable | Default | Description | -|---|---|---| +| Variable | Default | Description | +| ------------- | ---------- | -------------------------------- | | `SQLITE_PATH` | `notes.db` | Path to the SQLite database file | diff --git a/docker-compose.yml b/docker-compose.yml index 6831383..bba8966 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,17 @@ services: - api: - # build: . + jetstream: image: docker.li212.fr/litenote:latest restart: unless-stopped + command: ["sh", "-c", "deno task migrate && deno task jetstream:prod"] + env_file: + - .env + volumes: + - ${DATA_VOLUME:-data}:/data + + api: + image: docker.li212.fr/litenote:latest + restart: unless-stopped + command: ["deno", "task", "server:prod"] ports: - "${PORT}:8080" env_file: diff --git a/jetstream.ts b/jetstream.ts index b797dbd..ccf159a 100644 --- a/jetstream.ts +++ b/jetstream.ts @@ -1,48 +1,68 @@ import { Jetstream } from "@skyware/jetstream"; -import { deleteNote, upsertNote } from "./src/data/db.ts"; +import { + deleteNote, + getCursor, + saveCursor, + upsertNote, +} from "./src/data/db.ts"; import { Note } from "./src/data/note.ts"; import { log } from "./src/log.ts"; +globalThis.addEventListener("unhandledrejection", (e) => { + log("[jetstream] unhandled rejection:", e.reason); +}); + +globalThis.addEventListener("error", (e) => { + log("[jetstream] uncaught error:", e.error); +}); + +const cursor = getCursor(); +log(`[jetstream] starting with cursor: ${cursor ?? "none"}`); + const jetstream = new Jetstream({ wantedCollections: ["space.remanso.note"], + cursor: cursor ? Number(cursor) : undefined, }); jetstream.onCreate("space.remanso.note", (event) => { - const { did, commit: { rkey, record } } = event; - log(`[jetstream] creating ${did}/${rkey}...`); - const note = record as unknown as Omit; try { + const { did, commit: { rkey, record } } = event; + log(`[jetstream] creating ${did}/${rkey}...`); + const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] create ${did}/${rkey}: ${note.title}`); } catch (error) { - log(`[jetstream] error on create ${did}/${rkey}:`, error); + log(`[jetstream] error on create:`, error); } }); jetstream.onUpdate("space.remanso.note", (event) => { - const { did, commit: { rkey, record } } = event; - log(`[jetstream] updating ${did}/${rkey}...`); - const note = record as unknown as Omit; try { + const { did, commit: { rkey, record } } = event; + log(`[jetstream] updating ${did}/${rkey}...`); + const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] update ${did}/${rkey}: ${note.title}`); } catch (error) { - log(`[jetstream] error on update ${did}/${rkey}:`, error); + log(`[jetstream] error on update:`, error); } }); jetstream.onDelete("space.remanso.note", (event) => { - const { did, commit: { rkey } } = event; - - log(`[jetstream] deleting ${did}/${rkey}...`); try { + const { did, commit: { rkey } } = event; + log(`[jetstream] deleting ${did}/${rkey}...`); deleteNote({ did, rkey }); log(`[jetstream] delete ${did}/${rkey}`); } catch (error) { - log(`[jetstream] error on delete ${did}/${rkey}:`, error); + log(`[jetstream] error on delete:`, error); } }); +jetstream.on("open", () => { + log("[jetstream] connected"); +}); + jetstream.on("close", () => { log("[jetstream] connection closed"); }); @@ -52,5 +72,10 @@ jetstream.on("error", (error) => { }); log("[jetstream] launching"); - jetstream.start(); + +setInterval(() => { + if (jetstream.cursor) { + saveCursor(jetstream.cursor); + } +}, 5000); diff --git a/server.ts b/server.ts index fd4cbd7..4e88683 100644 --- a/server.ts +++ b/server.ts @@ -4,7 +4,7 @@ import { log } from "./src/log.ts"; const router = new Router(); -const PAGINATION = 20 +const PAGINATION = 20; router.get("/", (ctx) => { ctx.response.body = "Hello world"; @@ -48,8 +48,14 @@ const app = new Application(); app.use(async (ctx, next) => { ctx.response.headers.set("Access-Control-Allow-Origin", "*"); - ctx.response.headers.set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); - ctx.response.headers.set("Access-Control-Allow-Headers", "Content-Type, Authorization"); + ctx.response.headers.set( + "Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS", + ); + ctx.response.headers.set( + "Access-Control-Allow-Headers", + "Content-Type, Authorization", + ); if (ctx.request.method === "OPTIONS") { ctx.response.status = 204; return; diff --git a/src/data/db.ts b/src/data/db.ts index a947349..9de9964 100644 --- a/src/data/db.ts +++ b/src/data/db.ts @@ -57,8 +57,22 @@ export const deleteNote = ({ did, rkey }: { did: string; rkey: string }) => { db.query("DELETE FROM note WHERE did = ? AND rkey = ?", [did, rkey]); }; +export const getCursor = (): string | undefined => { + const rows = db.query<[string]>( + "SELECT value FROM state WHERE key = 'cursor'", + ); + return rows[0]?.[0]; +}; + +export const saveCursor = (cursor: number) => { + db.query( + "INSERT OR REPLACE INTO state (key, value) VALUES ('cursor', ?)", + [String(cursor)], + ); +}; + export const upsertNote = (note: Note) => { - const now = new Date().toISOString() + const now = new Date().toISOString(); db.query( ` diff --git a/src/migrations/init.ts b/src/migrations/init.ts index 40f240b..7dfea35 100644 --- a/src/migrations/init.ts +++ b/src/migrations/init.ts @@ -11,4 +11,11 @@ db.execute(` ); `); +db.execute(` + CREATE TABLE IF NOT EXISTS state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); +`); + db.close();