robustness: split jetstream into own container, add cursor persistence

Jetstream was running backgrounded in the same container as the API server,
so crashes went undetected and Docker never restarted it. Now each process
runs as a separate docker-compose service with independent restart policies.

Also adds cursor persistence to SQLite (saved every 5s) so restarts resume
from where they left off, moves event destructuring inside try/catch blocks,
and adds global unhandled error/rejection handlers for crash visibility.
This commit is contained in:
Julien Calixte
2026-02-17 01:17:42 +01:00
parent 1a7841b728
commit c84b4c5f97
9 changed files with 108 additions and 35 deletions

View File

@@ -1,3 +1,3 @@
{ {
"deno.enable": true "deno.enable": true
} }

View File

@@ -1,13 +1,19 @@
# CLAUDE.md # CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. This file provides guidance to Claude Code (claude.ai/code) when working with
code in this repository.
## Project Overview ## Project Overview
litenote-jetstream is the backend for Remanso, a blogging platform built on the AT Protocol (Bluesky ecosystem). It has two processes: litenote-jetstream is the backend for Remanso, a blogging platform built on the
AT Protocol (Bluesky ecosystem). It has two processes:
1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol firehose via `@skyware/jetstream`, filtering for `space.remanso.note` records. On create/update events, it upserts notes into a local SQLite database. 1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol
2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on port 8080 that exposes read-only endpoints to query stored notes. firehose via `@skyware/jetstream`, filtering for `space.remanso.note`
records. On create/update events, it upserts notes into a local SQLite
database.
2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on
port 8080 that exposes read-only endpoints to query stored notes.
Both processes share the same SQLite database (`src/data/db.ts`). Both processes share the same SQLite database (`src/data/db.ts`).
@@ -35,9 +41,13 @@ deno fmt
## Architecture ## Architecture
- **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task definitions and import maps. - **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task
- **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is configurable via `SQLITE_PATH` env var, defaults to `notes.db`. definitions and import maps.
- **Note schema**: Defined as an AT Protocol lexicon in `lexicons/space/remanso/note.json`. Notes have `title`, optional `images` (blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`. - **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is
configurable via `SQLITE_PATH` env var, defaults to `notes.db`.
- **Note schema**: Defined as an AT Protocol lexicon in
`lexicons/space/remanso/note.json`. Notes have `title`, optional `images`
(blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`.
- **API endpoints**: - **API endpoints**:
- `GET /notes?cursor=&limit=` — paginated notes (all users) - `GET /notes?cursor=&limit=` — paginated notes (all users)
- `GET /:did/notes?cursor=&limit=` — paginated notes for a specific DID - `GET /:did/notes?cursor=&limit=` — paginated notes for a specific DID

View File

@@ -14,4 +14,4 @@ ENV SQLITE_PATH=/data/notes.db
EXPOSE 8080 EXPOSE 8080
CMD ["sh", "-c", "deno task migrate && deno task jetstream:prod & deno task server:prod"] CMD ["deno", "task", "server:prod"]

View File

@@ -1,6 +1,8 @@
# litenote-jetstream # litenote-jetstream
Backend for [Remanso](https://remanso.space), a blogging platform on the AT Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and serves them via a REST API. Backend for [Remanso](https://remanso.space), a blogging platform on the AT
Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and
serves them via a REST API.
## Prerequisites ## Prerequisites
@@ -39,13 +41,13 @@ docker run -p 8080:8080 -v litenote-data:/data litenote-jetstream
## API ## API
| Endpoint | Description | | Endpoint | Description |
|---|---| | -------------------------------- | ---------------------------------- |
| `GET /notes?cursor=&limit=` | Paginated notes from all users | | `GET /notes?cursor=&limit=` | Paginated notes from all users |
| `GET /:did/notes?cursor=&limit=` | Paginated notes for a specific DID | | `GET /:did/notes?cursor=&limit=` | Paginated notes for a specific DID |
## Environment Variables ## Environment Variables
| Variable | Default | Description | | Variable | Default | Description |
|---|---|---| | ------------- | ---------- | -------------------------------- |
| `SQLITE_PATH` | `notes.db` | Path to the SQLite database file | | `SQLITE_PATH` | `notes.db` | Path to the SQLite database file |

View File

@@ -1,8 +1,17 @@
services: services:
api: jetstream:
# build: .
image: docker.li212.fr/litenote:latest image: docker.li212.fr/litenote:latest
restart: unless-stopped restart: unless-stopped
command: ["sh", "-c", "deno task migrate && deno task jetstream:prod"]
env_file:
- .env
volumes:
- ${DATA_VOLUME:-data}:/data
api:
image: docker.li212.fr/litenote:latest
restart: unless-stopped
command: ["deno", "task", "server:prod"]
ports: ports:
- "${PORT}:8080" - "${PORT}:8080"
env_file: env_file:

View File

@@ -1,48 +1,68 @@
import { Jetstream } from "@skyware/jetstream"; import { Jetstream } from "@skyware/jetstream";
import { deleteNote, upsertNote } from "./src/data/db.ts"; import {
deleteNote,
getCursor,
saveCursor,
upsertNote,
} from "./src/data/db.ts";
import { Note } from "./src/data/note.ts"; import { Note } from "./src/data/note.ts";
import { log } from "./src/log.ts"; import { log } from "./src/log.ts";
globalThis.addEventListener("unhandledrejection", (e) => {
log("[jetstream] unhandled rejection:", e.reason);
});
globalThis.addEventListener("error", (e) => {
log("[jetstream] uncaught error:", e.error);
});
const cursor = getCursor();
log(`[jetstream] starting with cursor: ${cursor ?? "none"}`);
const jetstream = new Jetstream({ const jetstream = new Jetstream({
wantedCollections: ["space.remanso.note"], wantedCollections: ["space.remanso.note"],
cursor: cursor ? Number(cursor) : undefined,
}); });
jetstream.onCreate("space.remanso.note", (event) => { jetstream.onCreate("space.remanso.note", (event) => {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] creating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
try { try {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] creating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note }); upsertNote({ did, rkey, ...note });
log(`[jetstream] create ${did}/${rkey}: ${note.title}`); log(`[jetstream] create ${did}/${rkey}: ${note.title}`);
} catch (error) { } catch (error) {
log(`[jetstream] error on create ${did}/${rkey}:`, error); log(`[jetstream] error on create:`, error);
} }
}); });
jetstream.onUpdate("space.remanso.note", (event) => { jetstream.onUpdate("space.remanso.note", (event) => {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] updating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
try { try {
const { did, commit: { rkey, record } } = event;
log(`[jetstream] updating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note }); upsertNote({ did, rkey, ...note });
log(`[jetstream] update ${did}/${rkey}: ${note.title}`); log(`[jetstream] update ${did}/${rkey}: ${note.title}`);
} catch (error) { } catch (error) {
log(`[jetstream] error on update ${did}/${rkey}:`, error); log(`[jetstream] error on update:`, error);
} }
}); });
jetstream.onDelete("space.remanso.note", (event) => { jetstream.onDelete("space.remanso.note", (event) => {
const { did, commit: { rkey } } = event;
log(`[jetstream] deleting ${did}/${rkey}...`);
try { try {
const { did, commit: { rkey } } = event;
log(`[jetstream] deleting ${did}/${rkey}...`);
deleteNote({ did, rkey }); deleteNote({ did, rkey });
log(`[jetstream] delete ${did}/${rkey}`); log(`[jetstream] delete ${did}/${rkey}`);
} catch (error) { } catch (error) {
log(`[jetstream] error on delete ${did}/${rkey}:`, error); log(`[jetstream] error on delete:`, error);
} }
}); });
jetstream.on("open", () => {
log("[jetstream] connected");
});
jetstream.on("close", () => { jetstream.on("close", () => {
log("[jetstream] connection closed"); log("[jetstream] connection closed");
}); });
@@ -52,5 +72,10 @@ jetstream.on("error", (error) => {
}); });
log("[jetstream] launching"); log("[jetstream] launching");
jetstream.start(); jetstream.start();
setInterval(() => {
if (jetstream.cursor) {
saveCursor(jetstream.cursor);
}
}, 5000);

View File

@@ -4,7 +4,7 @@ import { log } from "./src/log.ts";
const router = new Router(); const router = new Router();
const PAGINATION = 20 const PAGINATION = 20;
router.get("/", (ctx) => { router.get("/", (ctx) => {
ctx.response.body = "Hello world"; ctx.response.body = "Hello world";
@@ -48,8 +48,14 @@ const app = new Application();
app.use(async (ctx, next) => { app.use(async (ctx, next) => {
ctx.response.headers.set("Access-Control-Allow-Origin", "*"); ctx.response.headers.set("Access-Control-Allow-Origin", "*");
ctx.response.headers.set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); ctx.response.headers.set(
ctx.response.headers.set("Access-Control-Allow-Headers", "Content-Type, Authorization"); "Access-Control-Allow-Methods",
"GET, POST, PUT, DELETE, OPTIONS",
);
ctx.response.headers.set(
"Access-Control-Allow-Headers",
"Content-Type, Authorization",
);
if (ctx.request.method === "OPTIONS") { if (ctx.request.method === "OPTIONS") {
ctx.response.status = 204; ctx.response.status = 204;
return; return;

View File

@@ -57,8 +57,22 @@ export const deleteNote = ({ did, rkey }: { did: string; rkey: string }) => {
db.query("DELETE FROM note WHERE did = ? AND rkey = ?", [did, rkey]); db.query("DELETE FROM note WHERE did = ? AND rkey = ?", [did, rkey]);
}; };
export const getCursor = (): string | undefined => {
const rows = db.query<[string]>(
"SELECT value FROM state WHERE key = 'cursor'",
);
return rows[0]?.[0];
};
export const saveCursor = (cursor: number) => {
db.query(
"INSERT OR REPLACE INTO state (key, value) VALUES ('cursor', ?)",
[String(cursor)],
);
};
export const upsertNote = (note: Note) => { export const upsertNote = (note: Note) => {
const now = new Date().toISOString() const now = new Date().toISOString();
db.query( db.query(
` `

View File

@@ -11,4 +11,11 @@ db.execute(`
); );
`); `);
db.execute(`
CREATE TABLE IF NOT EXISTS state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`);
db.close(); db.close();