Compare commits

...

7 Commits

Author SHA1 Message Date
Julien Calixte
6f4d8d3b56 chore: add dockerignore for scripts, local artifacts, and env files
The image previously inherited everything from a `COPY . .`, including
.env (secrets), local notes.db copies, and admin scripts that should
not run in prod containers.
2026-05-05 15:04:43 +02:00
Julien Calixte
355fc45316 refactor(scripts): switch webhooks:all to api fetch
Hits GET /admin/webhooks instead of opening the local SQLite directly,
so the task can be run from a developer laptop without ssh or file
access to the server. Drops the FFI/read/write task permissions in
favour of net/env.
2026-05-05 14:07:31 +02:00
Julien Calixte
1c160b6c53 refactor(scripts): extract atproto session helpers to shared module 2026-05-05 14:07:26 +02:00
Julien Calixte
34faa10be2 feat(webhooks): add admin endpoint to list every subscription
Adds GET /admin/webhooks gated by an ADMIN_DIDS env-var allowlist of
verified AT Proto DIDs. Fail-closed: if ADMIN_DIDS is unset, the route
always returns 403 — no accidental exposure on deploys that forget it.
2026-05-05 14:07:22 +02:00
Julien Calixte
c00f3d631c chore(scripts): add admin task to list every webhook
Direct SQLite read with no DID filter — complements the API-backed
`webhooks list`, which is scoped per DID.
2026-05-05 14:00:10 +02:00
Julien Calixte
911d062423 chore(jetstream): log bulk-create record count on flush 2026-05-05 13:46:00 +02:00
Julien Calixte
8055060af3 fix: increase create debounce time 2026-05-05 12:48:15 +02:00
8 changed files with 235 additions and 78 deletions

25
.dockerignore Normal file
View File

@@ -0,0 +1,25 @@
# version control / editor / OS
.git/
.gitignore
.github/
.idea/
.vscode/
.DS_Store
# local SQLite + sidecars (DB lives at /data in the container)
*.db
*.db-shm
*.db-wal
*.db-journal
remote-db/
# secrets — provide via the orchestrator's env config, not baked into the image
.env
.env.*
# admin / dev-only scripts (run locally, not in prod containers)
scripts/
# logs and caches
*.log
.cache/

View File

@@ -5,7 +5,8 @@
"server": "deno run --watch --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi server.ts", "server": "deno run --watch --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi server.ts",
"server:prod": "deno run --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi server.ts", "server:prod": "deno run --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi server.ts",
"migrate": "deno run --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi src/migrations/init.ts", "migrate": "deno run --allow-net --allow-read --allow-write --allow-env --allow-ffi --unstable-ffi src/migrations/init.ts",
"webhooks": "deno run --allow-net --allow-env scripts/manage-webhooks.ts" "webhooks": "deno run --allow-net --allow-env scripts/manage-webhooks.ts",
"webhooks:all": "deno run --allow-net --allow-env scripts/list-all-webhooks.ts"
}, },
"imports": { "imports": {
"@db/sqlite": "jsr:@db/sqlite@^0.13.0", "@db/sqlite": "jsr:@db/sqlite@^0.13.0",

View File

@@ -59,7 +59,7 @@ const fireWebhooks = async (
await dispatchAll(webhooks, payload, `${verb} ${did}`); await dispatchAll(webhooks, payload, `${verb} ${did}`);
}; };
const BULK_CREATE_DEBOUNCE_MS = 400; const BULK_CREATE_DEBOUNCE_MS = 500;
type BulkBuffer = { type BulkBuffer = {
records: Record<string, unknown>[]; records: Record<string, unknown>[];
@@ -72,6 +72,7 @@ const flushBulkCreate = async (did: string): Promise<void> => {
if (!buffer) return; if (!buffer) return;
bulkBuffers.delete(did); bulkBuffers.delete(did);
const webhooks = getWebhooksByDidAndVerb(did, "bulk-create"); const webhooks = getWebhooksByDidAndVerb(did, "bulk-create");
log(`[jetstream] bulk-create ${did}: ${buffer.records.length} record(s)`);
await dispatchAll( await dispatchAll(
webhooks, webhooks,
{ event: "bulk-create", did, records: buffer.records }, { event: "bulk-create", did, records: buffer.records },
@@ -98,10 +99,7 @@ const queueBulkCreate = (
} }
bulkBuffers.set(did, { bulkBuffers.set(did, {
records: [record], records: [record],
timer: setTimeout( timer: setTimeout(() => flushBulkCreate(did), BULK_CREATE_DEBOUNCE_MS),
() => flushBulkCreate(did),
BULK_CREATE_DEBOUNCE_MS,
),
}); });
}; };
@@ -116,7 +114,10 @@ const jetstream = new Jetstream({
jetstream.onCreate("space.remanso.note", async (event) => { jetstream.onCreate("space.remanso.note", async (event) => {
try { try {
const { did, commit: { rkey, record } } = event; const {
did,
commit: { rkey, record },
} = event;
log(`[jetstream] creating ${did}/${rkey}...`); log(`[jetstream] creating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">; const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note }); upsertNote({ did, rkey, ...note });
@@ -130,7 +131,10 @@ jetstream.onCreate("space.remanso.note", async (event) => {
jetstream.onUpdate("space.remanso.note", async (event) => { jetstream.onUpdate("space.remanso.note", async (event) => {
try { try {
const { did, commit: { rkey, record } } = event; const {
did,
commit: { rkey, record },
} = event;
log(`[jetstream] updating ${did}/${rkey}...`); log(`[jetstream] updating ${did}/${rkey}...`);
const note = record as unknown as Omit<Note, "did" | "rkey">; const note = record as unknown as Omit<Note, "did" | "rkey">;
upsertNote({ did, rkey, ...note }); upsertNote({ did, rkey, ...note });
@@ -145,7 +149,10 @@ jetstream.onUpdate("space.remanso.note", async (event) => {
jetstream.onDelete("space.remanso.note", async (event) => { jetstream.onDelete("space.remanso.note", async (event) => {
try { try {
const { did, commit: { rkey } } = event; const {
did,
commit: { rkey },
} = event;
log(`[jetstream] deleting ${did}/${rkey}...`); log(`[jetstream] deleting ${did}/${rkey}...`);
deleteNote({ did, rkey }); deleteNote({ did, rkey });
log(`[jetstream] delete ${did}/${rkey}`); log(`[jetstream] delete ${did}/${rkey}`);

View File

@@ -0,0 +1,97 @@
// Shared helpers for admin scripts that authenticate against the Remanso API
// using an AT Protocol session. The handle is resolved to a DID, the DID is
// resolved to a PDS, and a session is created against that PDS — yielding an
// access JWT the API can verify with `authenticateRequest`.
type ResolveHandleResponse = { did: string };
type DidDocument = {
service?: { id: string; serviceEndpoint: string }[];
};
export type CreateSessionResponse = { did: string; accessJwt: string };
export const parseArgs = (args: string[]): Record<string, string> => {
const out: Record<string, string> = {};
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (!arg.startsWith("--")) continue;
const key = arg.slice(2);
const next = args[i + 1];
if (next === undefined || next.startsWith("--")) {
out[key] = "true";
} else {
out[key] = next;
i++;
}
}
return out;
};
export const resolveHandleToDid = async (handle: string): Promise<string> => {
const url =
`https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${
encodeURIComponent(handle)
}`;
const res = await fetch(url);
if (!res.ok) {
throw new Error(
`resolveHandle failed for ${handle} (${res.status}): ${await res.text()}`,
);
}
const { did } = await res.json() as ResolveHandleResponse;
return did;
};
export const resolveDidToPds = async (did: string): Promise<string> => {
if (!did.startsWith("did:plc:")) {
throw new Error(
`Unsupported DID method (server only handles did:plc): ${did}`,
);
}
const res = await fetch(`https://plc.directory/${did}`);
if (!res.ok) {
throw new Error(`plc.directory lookup failed (${res.status})`);
}
const doc = await res.json() as DidDocument;
const pds = doc.service?.find((s) => s.id === "#atproto_pds");
if (!pds) throw new Error("No #atproto_pds service in DID document");
return pds.serviceEndpoint;
};
export const createSession = async (
pds: string,
identifier: string,
password: string,
): Promise<CreateSessionResponse> => {
const res = await fetch(`${pds}/xrpc/com.atproto.server.createSession`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ identifier, password }),
});
if (!res.ok) {
throw new Error(
`createSession failed (${res.status}): ${await res.text()}`,
);
}
return await res.json();
};
// Convenience: pull credentials from flags or env, resolve the full session
// chain in one call. Throws if required inputs are missing.
export const sessionFromFlagsOrEnv = async (
flags: Record<string, string>,
): Promise<{ session: CreateSessionResponse; api: string }> => {
const handle = flags.handle ?? Deno.env.get("ATPROTO_HANDLE");
const password = flags["app-password"] ??
Deno.env.get("ATPROTO_APP_PASSWORD");
const api = flags.api ?? Deno.env.get("REMANSO_API") ??
"https://api.remanso.space";
if (!handle) throw new Error("ATPROTO_HANDLE (or --handle) is required");
if (!password) {
throw new Error("ATPROTO_APP_PASSWORD (or --app-password) is required");
}
const did = await resolveHandleToDid(handle);
const pds = await resolveDidToPds(did);
console.log(`[resolve] ${handle}${did} via ${pds}`);
const session = await createSession(pds, handle, password);
return { session, api };
};

View File

@@ -0,0 +1,52 @@
// Admin: list every webhook subscription via the Remanso API. Requires the
// authenticated DID to be in the server-side ADMIN_DIDS allowlist.
//
// deno task webhooks:all
//
// Inputs (env or flag, env preferred):
// ATPROTO_HANDLE / --handle e.g. alice.eurosky.social
// ATPROTO_APP_PASSWORD / --app-password app password (NOT your account password)
// REMANSO_API / --api default: https://api.remanso.space
const HELP = `
Usage:
deno task webhooks:all
Inputs (env or flag, env preferred):
ATPROTO_HANDLE / --handle your AT Protocol handle
ATPROTO_APP_PASSWORD / --app-password app password (NOT your account password)
REMANSO_API / --api default: https://api.remanso.space
`;
import { parseArgs, sessionFromFlagsOrEnv } from "./_atproto-session.ts";
const die = (msg: string): never => {
console.error(`error: ${msg}`);
console.error(HELP);
Deno.exit(1);
};
const main = async () => {
const flags = parseArgs(Deno.args);
if (flags.help === "true" || flags.h === "true") {
console.log(HELP);
Deno.exit(0);
}
const { session, api } = await sessionFromFlagsOrEnv(flags).catch(
(e: Error) => die(e.message),
);
const res = await fetch(`${api}/admin/webhooks`, {
headers: { "Authorization": `Bearer ${session.accessJwt}` },
});
if (!res.ok) {
console.error(`list-all failed (${res.status}): ${await res.text()}`);
Deno.exit(1);
}
const rows = await res.json();
console.log(JSON.stringify(rows, null, 2));
console.error(`[done] ${rows.length} subscription(s) at ${api}`);
};
await main();

View File

@@ -32,28 +32,12 @@ Inputs (env or flag, env preferred):
Your PDS is resolved automatically from the handle. Your PDS is resolved automatically from the handle.
`; `;
type ResolveHandleResponse = { did: string }; import {
type DidDocument = { createSession,
service?: { id: string; serviceEndpoint: string }[]; parseArgs,
}; resolveDidToPds,
type CreateSessionResponse = { did: string; accessJwt: string }; resolveHandleToDid,
} from "./_atproto-session.ts";
const parseArgs = (args: string[]): Record<string, string> => {
const out: Record<string, string> = {};
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (!arg.startsWith("--")) continue;
const key = arg.slice(2);
const next = args[i + 1];
if (next === undefined || next.startsWith("--")) {
out[key] = "true";
} else {
out[key] = next;
i++;
}
}
return out;
};
const die = (msg: string): never => { const die = (msg: string): never => {
console.error(`error: ${msg}`); console.error(`error: ${msg}`);
@@ -61,53 +45,6 @@ const die = (msg: string): never => {
Deno.exit(1); Deno.exit(1);
}; };
const resolveHandleToDid = async (handle: string): Promise<string> => {
const url =
`https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${
encodeURIComponent(handle)
}`;
const res = await fetch(url);
if (!res.ok) {
throw new Error(
`resolveHandle failed for ${handle} (${res.status}): ${await res.text()}`,
);
}
const { did } = await res.json() as ResolveHandleResponse;
return did;
};
const resolveDidToPds = async (did: string): Promise<string> => {
if (!did.startsWith("did:plc:")) {
throw new Error(`Unsupported DID method (server only handles did:plc): ${did}`);
}
const res = await fetch(`https://plc.directory/${did}`);
if (!res.ok) {
throw new Error(`plc.directory lookup failed (${res.status})`);
}
const doc = await res.json() as DidDocument;
const pds = doc.service?.find((s) => s.id === "#atproto_pds");
if (!pds) throw new Error("No #atproto_pds service in DID document");
return pds.serviceEndpoint;
};
const createSession = async (
pds: string,
identifier: string,
password: string,
): Promise<CreateSessionResponse> => {
const res = await fetch(`${pds}/xrpc/com.atproto.server.createSession`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ identifier, password }),
});
if (!res.ok) {
throw new Error(
`createSession failed (${res.status}): ${await res.text()}`,
);
}
return await res.json();
};
const main = async () => { const main = async () => {
const [command, ...rest] = Deno.args; const [command, ...rest] = Deno.args;
if (!command || command === "--help" || command === "-h") { if (!command || command === "--help" || command === "-h") {

View File

@@ -6,6 +6,7 @@ import {
getNotes, getNotes,
getNotesByDid, getNotesByDid,
getNotesByDids, getNotesByDids,
listAllWebhooks,
listWebhooksByDid, listWebhooksByDid,
type WebhookVerb, type WebhookVerb,
} from "./src/data/db.ts"; } from "./src/data/db.ts";
@@ -39,6 +40,32 @@ const requireDidOwnership = async (
return true; return true;
}; };
const ADMIN_DIDS = new Set(
(Deno.env.get("ADMIN_DIDS") ?? "")
.split(",")
.map((d) => d.trim())
.filter(Boolean),
);
const requireAdmin = async (ctx: AuthCtx): Promise<boolean> => {
let verifiedDid: string;
try {
verifiedDid = await authenticateRequest(
ctx.request.headers.get("Authorization"),
);
} catch {
ctx.response.status = 401;
ctx.response.body = { error: "Unauthorized" };
return false;
}
if (!ADMIN_DIDS.has(verifiedDid)) {
ctx.response.status = 403;
ctx.response.body = { error: "Admin only" };
return false;
}
return true;
};
const router = new Router(); const router = new Router();
const PAGINATION = 20; const PAGINATION = 20;
@@ -117,6 +144,11 @@ router.post("/notes/feed", async (ctx) => {
const ALLOWED_VERBS = ["create", "delete", "bulk-create"] as const; const ALLOWED_VERBS = ["create", "delete", "bulk-create"] as const;
router.get("/admin/webhooks", async (ctx) => {
if (!(await requireAdmin(ctx))) return;
ctx.response.body = listAllWebhooks();
});
router.get("/:did/webhooks", async (ctx) => { router.get("/:did/webhooks", async (ctx) => {
const { did } = ctx.params; const { did } = ctx.params;
if (!(await requireDidOwnership(ctx, did))) return; if (!(await requireDidOwnership(ctx, did))) return;

View File

@@ -136,6 +136,12 @@ export const listWebhooksByDid = (
).all<Omit<WebhookSubscriptionRow, "token">>(did); ).all<Omit<WebhookSubscriptionRow, "token">>(did);
}; };
export const listAllWebhooks = (): Omit<WebhookSubscriptionRow, "token">[] => {
return db.prepare(
"SELECT id, did, method, url, verb FROM webhook_subscription ORDER BY did, id",
).all<Omit<WebhookSubscriptionRow, "token">>();
};
export const getWebhooksByDidAndVerb = ( export const getWebhooksByDidAndVerb = (
did: string, did: string,
verb: WebhookVerb, verb: WebhookVerb,