diff --git a/.vscode/settings.json b/.vscode/settings.json index 4b9fb22..cbac569 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,3 @@ { "deno.enable": true -} \ No newline at end of file +} diff --git a/CLAUDE.md b/CLAUDE.md index 58367e7..0dea89a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,13 +1,19 @@ # CLAUDE.md -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +This file provides guidance to Claude Code (claude.ai/code) when working with +code in this repository. ## Project Overview -litenote-jetstream is the backend for Remanso, a blogging platform built on the AT Protocol (Bluesky ecosystem). It has two processes: +litenote-jetstream is the backend for Remanso, a blogging platform built on the +AT Protocol (Bluesky ecosystem). It has two processes: -1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol firehose via `@skyware/jetstream`, filtering for `space.remanso.note` records. On create/update events, it upserts notes into a local SQLite database. -2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on port 8080 that exposes read-only endpoints to query stored notes. +1. **Jetstream listener** (`jetstream.ts`) — Subscribes to the AT Protocol + firehose via `@skyware/jetstream`, filtering for `space.remanso.note` + records. On create/update events, it upserts notes into a local SQLite + database. +2. **HTTP API server** (`server.ts`) — An Oak (Deno HTTP framework) server on + port 8080 that exposes read-only endpoints to query stored notes. Both processes share the same SQLite database (`src/data/db.ts`). @@ -35,9 +41,13 @@ deno fmt ## Architecture -- **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task definitions and import maps. -- **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is configurable via `SQLITE_PATH` env var, defaults to `notes.db`. -- **Note schema**: Defined as an AT Protocol lexicon in `lexicons/space/remanso/note.json`. Notes have `title`, optional `images` (blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`. +- **Runtime**: Deno (not Bun, despite the README). Uses `deno.json` for task + definitions and import maps. +- **Database**: SQLite via `https://deno.land/x/sqlite/mod.ts`. DB path is + configurable via `SQLITE_PATH` env var, defaults to `notes.db`. +- **Note schema**: Defined as an AT Protocol lexicon in + `lexicons/space/remanso/note.json`. Notes have `title`, optional `images` + (blob refs), `publishedAt`, and `createdAt`. Primary key is `(did, rkey)`. - **API endpoints**: - `GET /notes?cursor=&limit=` — paginated notes (all users) - `GET /:did/notes?cursor=&limit=` — paginated notes for a specific DID diff --git a/Dockerfile b/Dockerfile index 4f48f9a..7d8c0ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,4 +14,4 @@ ENV SQLITE_PATH=/data/notes.db EXPOSE 8080 -CMD ["sh", "-c", "deno task migrate && deno task jetstream:prod & deno task server:prod"] +CMD ["deno", "task", "server:prod"] diff --git a/README.md b/README.md index 1c60fc2..b0a25e6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # litenote-jetstream -Backend for [Remanso](https://remanso.space), a blogging platform on the AT Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and serves them via a REST API. +Backend for [Remanso](https://remanso.space), a blogging platform on the AT +Protocol. Listens to the Jetstream firehose for `space.remanso.note` records and +serves them via a REST API. ## Prerequisites @@ -39,13 +41,13 @@ docker run -p 8080:8080 -v litenote-data:/data litenote-jetstream ## API -| Endpoint | Description | -|---|---| -| `GET /notes?cursor=&limit=` | Paginated notes from all users | +| Endpoint | Description | +| -------------------------------- | ---------------------------------- | +| `GET /notes?cursor=&limit=` | Paginated notes from all users | | `GET /:did/notes?cursor=&limit=` | Paginated notes for a specific DID | ## Environment Variables -| Variable | Default | Description | -|---|---|---| +| Variable | Default | Description | +| ------------- | ---------- | -------------------------------- | | `SQLITE_PATH` | `notes.db` | Path to the SQLite database file | diff --git a/docker-compose.yml b/docker-compose.yml index 6831383..bba8966 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,17 @@ services: - api: - # build: . + jetstream: image: docker.li212.fr/litenote:latest restart: unless-stopped + command: ["sh", "-c", "deno task migrate && deno task jetstream:prod"] + env_file: + - .env + volumes: + - ${DATA_VOLUME:-data}:/data + + api: + image: docker.li212.fr/litenote:latest + restart: unless-stopped + command: ["deno", "task", "server:prod"] ports: - "${PORT}:8080" env_file: diff --git a/jetstream.ts b/jetstream.ts index b797dbd..ccf159a 100644 --- a/jetstream.ts +++ b/jetstream.ts @@ -1,48 +1,68 @@ import { Jetstream } from "@skyware/jetstream"; -import { deleteNote, upsertNote } from "./src/data/db.ts"; +import { + deleteNote, + getCursor, + saveCursor, + upsertNote, +} from "./src/data/db.ts"; import { Note } from "./src/data/note.ts"; import { log } from "./src/log.ts"; +globalThis.addEventListener("unhandledrejection", (e) => { + log("[jetstream] unhandled rejection:", e.reason); +}); + +globalThis.addEventListener("error", (e) => { + log("[jetstream] uncaught error:", e.error); +}); + +const cursor = getCursor(); +log(`[jetstream] starting with cursor: ${cursor ?? "none"}`); + const jetstream = new Jetstream({ wantedCollections: ["space.remanso.note"], + cursor: cursor ? Number(cursor) : undefined, }); jetstream.onCreate("space.remanso.note", (event) => { - const { did, commit: { rkey, record } } = event; - log(`[jetstream] creating ${did}/${rkey}...`); - const note = record as unknown as Omit; try { + const { did, commit: { rkey, record } } = event; + log(`[jetstream] creating ${did}/${rkey}...`); + const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] create ${did}/${rkey}: ${note.title}`); } catch (error) { - log(`[jetstream] error on create ${did}/${rkey}:`, error); + log(`[jetstream] error on create:`, error); } }); jetstream.onUpdate("space.remanso.note", (event) => { - const { did, commit: { rkey, record } } = event; - log(`[jetstream] updating ${did}/${rkey}...`); - const note = record as unknown as Omit; try { + const { did, commit: { rkey, record } } = event; + log(`[jetstream] updating ${did}/${rkey}...`); + const note = record as unknown as Omit; upsertNote({ did, rkey, ...note }); log(`[jetstream] update ${did}/${rkey}: ${note.title}`); } catch (error) { - log(`[jetstream] error on update ${did}/${rkey}:`, error); + log(`[jetstream] error on update:`, error); } }); jetstream.onDelete("space.remanso.note", (event) => { - const { did, commit: { rkey } } = event; - - log(`[jetstream] deleting ${did}/${rkey}...`); try { + const { did, commit: { rkey } } = event; + log(`[jetstream] deleting ${did}/${rkey}...`); deleteNote({ did, rkey }); log(`[jetstream] delete ${did}/${rkey}`); } catch (error) { - log(`[jetstream] error on delete ${did}/${rkey}:`, error); + log(`[jetstream] error on delete:`, error); } }); +jetstream.on("open", () => { + log("[jetstream] connected"); +}); + jetstream.on("close", () => { log("[jetstream] connection closed"); }); @@ -52,5 +72,10 @@ jetstream.on("error", (error) => { }); log("[jetstream] launching"); - jetstream.start(); + +setInterval(() => { + if (jetstream.cursor) { + saveCursor(jetstream.cursor); + } +}, 5000); diff --git a/server.ts b/server.ts index fd4cbd7..4e88683 100644 --- a/server.ts +++ b/server.ts @@ -4,7 +4,7 @@ import { log } from "./src/log.ts"; const router = new Router(); -const PAGINATION = 20 +const PAGINATION = 20; router.get("/", (ctx) => { ctx.response.body = "Hello world"; @@ -48,8 +48,14 @@ const app = new Application(); app.use(async (ctx, next) => { ctx.response.headers.set("Access-Control-Allow-Origin", "*"); - ctx.response.headers.set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); - ctx.response.headers.set("Access-Control-Allow-Headers", "Content-Type, Authorization"); + ctx.response.headers.set( + "Access-Control-Allow-Methods", + "GET, POST, PUT, DELETE, OPTIONS", + ); + ctx.response.headers.set( + "Access-Control-Allow-Headers", + "Content-Type, Authorization", + ); if (ctx.request.method === "OPTIONS") { ctx.response.status = 204; return; diff --git a/src/data/db.ts b/src/data/db.ts index a947349..9de9964 100644 --- a/src/data/db.ts +++ b/src/data/db.ts @@ -57,8 +57,22 @@ export const deleteNote = ({ did, rkey }: { did: string; rkey: string }) => { db.query("DELETE FROM note WHERE did = ? AND rkey = ?", [did, rkey]); }; +export const getCursor = (): string | undefined => { + const rows = db.query<[string]>( + "SELECT value FROM state WHERE key = 'cursor'", + ); + return rows[0]?.[0]; +}; + +export const saveCursor = (cursor: number) => { + db.query( + "INSERT OR REPLACE INTO state (key, value) VALUES ('cursor', ?)", + [String(cursor)], + ); +}; + export const upsertNote = (note: Note) => { - const now = new Date().toISOString() + const now = new Date().toISOString(); db.query( ` diff --git a/src/migrations/init.ts b/src/migrations/init.ts index 40f240b..7dfea35 100644 --- a/src/migrations/init.ts +++ b/src/migrations/init.ts @@ -11,4 +11,11 @@ db.execute(` ); `); +db.execute(` + CREATE TABLE IF NOT EXISTS state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); +`); + db.close();