From d3240ddf374afdbb047dca84a92f21c27f4c2dc9 Mon Sep 17 00:00:00 2001 From: gitadmin Date: Sat, 2 May 2026 13:07:36 +0000 Subject: [PATCH] feat: add routes/corpus.ts --- routes/corpus.ts | 1158 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1158 insertions(+) create mode 100644 routes/corpus.ts diff --git a/routes/corpus.ts b/routes/corpus.ts new file mode 100644 index 0000000..0662a2d --- /dev/null +++ b/routes/corpus.ts @@ -0,0 +1,1158 @@ +/** + * Agentify Corpus Pipeline — WellSpr.ing ingestion side + * + * Naturologie Replit assembles and chunks the expert's corpus, uploads artifacts + * and embeddings to Vultr Object Storage (S3-compatible), writes a signed + * CorpusBundle manifest JSON, then calls POST /api/agentify/corpus/notify to + * hand off to WellSpr.ing. + * + * Storage: Vultr Object Storage (ewr1.vultrobjects.com) — same provider already + * used for platform DB backups. S3-compatible API, independent of AWS and Cloudflare. + * + * Public endpoints: + * GET /api/agentify/corpus/schema — CorpusBundle JSON schema + field docs + * GET /api/agentify/corpus/storage — storage convention for Naturologie + * GET /api/agentify/corpus/:expertSlug — list bundles for an expert + * GET /api/agentify/corpus/:expertSlug/:bundleId — single bundle detail + * + * Naturologie-facing endpoints: + * POST /api/agentify/corpus/notify — notify WellSpr.ing that a bundle is ready + * PATCH /api/agentify/corpus/:bundleId/status — admin-only status update + * + * Storage layout (Vultr Object Storage): + * Bucket: agentify-corpus (separate from wellspring-backups) + * corpora/{expert_slug}/v{corpus_version}/manifest.json ← CorpusBundle manifest + * corpora/{expert_slug}/v{corpus_version}/artifacts/ ← raw PDFs / HTML + * corpora/{expert_slug}/v{corpus_version}/chunks/ ← chunk NDJSON index + * corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson ← one embedding per line + */ + +import type { Express, Request, Response } from "express"; +import { pool } from 
"../db";
+import crypto from "crypto";
+import fs from "fs";
+import path from "path";
+import multer from "multer";
+import { S3Client, PutObjectCommand, HeadObjectCommand, CreateBucketCommand, HeadBucketCommand, GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
+import { Upload } from "@aws-sdk/lib-storage";
+
+// Shared secret that the admin-gated routes in this file compare against the
+// `x-admin-key` header. It must come from the environment: a default committed to
+// source control is a credential that anyone with repository access can use.
+// When ADMIN_KEY is unset we fall back to a random per-process value instead of
+// "" or undefined, because those would compare equal to a missing or empty header
+// and would open the admin routes to every caller.
+if (!process.env.ADMIN_KEY) {
+  console.warn("[corpus] ADMIN_KEY not set — admin-gated routes are locked until it is configured");
+}
+const ADMIN_KEY = process.env.ADMIN_KEY || crypto.randomBytes(32).toString("hex");
+
+// Vultr Object Storage — S3-compatible, same provider as platform DB backups.
+// Hostname and bucket can be overridden through the environment.
+const S3_HOSTNAME = process.env.VULTR_S3_HOSTNAME || "ewr1.vultrobjects.com";
+const S3_BUCKET = process.env.CORPUS_S3_BUCKET || "agentify-corpus";
+const S3_ENDPOINT = `https://${S3_HOSTNAME}`;
+
+// Keep the old names as aliases so existing code still compiles
+const R2_BUCKET = S3_BUCKET;
+const R2_ENDPOINT = S3_ENDPOINT;
+
+// ── Partner token validation ───────────────────────────────────────────────────
+/**
+ * Checks whether `token` matches an active row in the `partner_tokens` table.
+ *
+ * @param token - Candidate partner token; `undefined` or empty is rejected without a query.
+ * @returns `true` only when an active matching row exists. A failed lookup is logged
+ *          and reported as `false`, so a database error denies access instead of throwing.
+ */
+async function isPartnerToken(token: string | undefined): Promise<boolean> {
+  if (!token) return false;
+  try {
+    const result = await pool.query(
+      `SELECT 1 FROM partner_tokens WHERE token = $1 AND is_active = TRUE LIMIT 1`, [token]
+    );
+    return result.rows.length > 0;
+  } catch (err) {
+    // Fail closed, but leave a trace: a silent `false` here looks identical to a bad token.
+    console.error("[corpus] partner token lookup error:", err);
+    return false;
+  }
+}
+
+// ── CorpusBundle schema definition (authoritative) ────────────────────────────
+const CORPUS_BUNDLE_SCHEMA = {
+  "$schema": "https://wellspr.ing/protocols/agentify/corpus-bundle/v1.0",
+  "@type": "CorpusBundleSchema",
+  "schema_version": "1.0",
+  "description": "CorpusBundle is the handoff artifact Naturologie writes to R2 and notifies WellSpr.ing about. WellSpr.ing reads this manifest to verify integrity, then runs Stage 5 framework distillation.",
+
+  "fields": {
+    "schema_version": { "type": "string", "const": "1.0", "required": true },
+    "bundle_id": {
+      "type": "string", "format": "uuid", "required": true,
+      "description": "UUIDv4 — unique per ingestion run. If re-ingesting, use a new UUID."
+ }, + "expert_slug": { + "type": "string", "required": true, + "description": "Matches the slug in wellspr.ing/api/agentify/experts/:slug. For Sniderman: 'sniderman'." + }, + "attestation_uri": { + "type": "string", "format": "uri", "required": true, + "description": "Canonical attestation URI. For Sniderman v0.1: 'https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json'. WellSpr.ing will verify this URL is reachable and signature is valid." + }, + "attestation_version": { + "type": "string", "required": true, + "description": "Version string matching the attestation URI. E.g. 'v0.1'." + }, + "corpus_version": { + "type": "string", "required": true, + "description": "Semver-style corpus version. First ingestion: '0.1.0'. After expert corrections mint attestation v0.2, increment to '0.2.0'." + }, + "assembled_by": { + "type": "string", "required": true, + "description": "Assembling system identifier. Use 'naturologie-replit'." + }, + "assembled_at": { + "type": "string", "format": "iso8601", "required": true, + "description": "ISO 8601 UTC timestamp of when the bundle was finalized." + }, + "embedding_model": { + "type": "string", "required": false, + "description": "Embedding model used, if the assembler generated embeddings. E.g. 'text-embedding-3-large'. Omit if WellSpr.ing should generate embeddings during distillation." + }, + "embedding_dimensions": { + "type": "integer", "required": false, + "description": "Dimension count of the embedding vectors. Required only if embedding_model is provided." + }, + "chunk_strategy": { + "type": "string", "enum": ["paragraph", "section", "sentence", "fixed-token"], + "required": true, + "description": "Chunking strategy used. 'paragraph' is preferred for published medical literature." + }, + "chunk_token_limit": { + "type": "integer", "required": false, + "description": "Max tokens per chunk if using 'fixed-token' strategy." 
+ }, + "artifacts": { + "type": "array", "required": true, + "description": "List of source documents included in this corpus.", + "items": { + "artifact_id": { "type": "string", "description": "Short slug, e.g. 'sniderman-apob-2010'" }, + "title": { "type": "string" }, + "type": { "type": "string", "enum": ["paper", "book", "protocol", "interview", "transcript", "editorial"] }, + "source_url": { "type": "string", "format": "uri", "description": "Canonical DOI or URL." }, + "publication_date": { "type": "string", "description": "YYYY or YYYY-MM or YYYY-MM-DD." }, + "sha256": { "type": "string", "description": "SHA-256 of the raw artifact file." }, + "r2_key": { "type": "string", "description": "Key within the agentify-corpus bucket. E.g. 'corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf'." } + } + }, + "chunk_count": { + "type": "integer", "required": true, + "description": "Total number of chunks. The actual chunk files live at their r2_keys in the chunks/ prefix." + }, + "chunks_index_r2_key": { + "type": "string", "required": true, + "description": "R2 key of the chunk index NDJSON file (one chunk JSON per line). E.g. 'corpora/sniderman/v0.1.0/chunks/index.ndjson'." + }, + "embeddings_r2_key": { + "type": "string", "required": false, + "description": "Key of the embeddings NDJSON file (one {chunk_id, embedding: float[]} per line). Omit if WellSpr.ing should generate embeddings. E.g. 'corpora/sniderman/v0.1.0/embeddings.ndjson'." + }, + "manifest_sha256": { + "type": "string", "required": true, + "description": "SHA-256 of this manifest file with the manifest_sha256 field set to empty string. WellSpr.ing recomputes this to verify integrity." + }, + "provenance_signature": { + "type": "string", "required": false, + "description": "Optional Ed25519 signature over manifest_sha256 using Naturologie's signing key. If provided, WellSpr.ing stores it as part of the provenance chain." 
+ } + }, + + "chunk_item_schema": { + "description": "Each line in chunks/index.ndjson must be a JSON object with these fields:", + "chunk_id": "string — unique within the bundle, e.g. 'sniderman-apob-2010-p003'", + "artifact_id": "string — matches an artifact_id in the artifacts array", + "text": "string — the raw chunk text", + "sha256": "string — SHA-256 of the chunk text", + "token_count": "integer — token count using the chosen embedding model's tokenizer", + "scopes": "string[] — SGS scope strings this chunk is most relevant to", + "r2_key": "string — optional, if chunk is also stored as a standalone file" + }, + + "example": { + "schema_version": "1.0", + "bundle_id": "550e8400-e29b-41d4-a716-446655440000", + "expert_slug": "sniderman", + "attestation_uri": "https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json", + "attestation_version": "v0.1", + "corpus_version": "0.1.0", + "assembled_by": "naturologie-replit", + "assembled_at": "2026-04-22T00:00:00Z", + "embedding_model": "text-embedding-3-large", + "embedding_dimensions": 3072, + "chunk_strategy": "paragraph", + "artifacts": [ + { + "artifact_id": "sniderman-apob-2010", + "title": "Why LDL-C Should Not Be the Primary Measure for Statin Therapy", + "type": "paper", + "source_url": "https://doi.org/10.1016/j.jacc.2010.03.030", + "publication_date": "2010-06", + "sha256": "abc123...", + "r2_key": "corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf" + } + ], + "chunk_count": 847, + "chunks_index_r2_key": "corpora/sniderman/v0.1.0/chunks/index.ndjson", + "embeddings_r2_key": "corpora/sniderman/v0.1.0/embeddings.ndjson", + "manifest_sha256": "def456...", + } +}; + +const STORAGE_CONVENTION = { + "@type": "CorpusBundleStorageConvention", + "provider": "Vultr Object Storage", + "provider_note": "S3-compatible API. Independent of AWS and Cloudflare. 
Same provider used for WellSpr.ing platform DB backups.", + "bucket": S3_BUCKET, + "endpoint": S3_ENDPOINT, + "hostname": S3_HOSTNAME, + "region": "ewr1", + "sdk_note": "Use any S3-compatible SDK (e.g. @aws-sdk/client-s3) with forcePathStyle: true and the Vultr endpoint.", + "credentials": { + "note": "Request VULTR_S3_ACCESS_KEY and VULTR_S3_SECRET_KEY from WellSpr.ing team. These are the same credentials used for platform backups.", + "access_key_env": "VULTR_S3_ACCESS_KEY", + "secret_key_env": "VULTR_S3_SECRET_KEY", + }, + "path_convention": { + "manifest": "corpora/{expert_slug}/v{corpus_version}/manifest.json", + "artifacts": "corpora/{expert_slug}/v{corpus_version}/artifacts/{artifact_id}.{ext}", + "chunks_index": "corpora/{expert_slug}/v{corpus_version}/chunks/index.ndjson", + "embeddings": "corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson", + }, + "sniderman_v0_1_0_paths": { + "manifest": "corpora/sniderman/v0.1.0/manifest.json", + "artifacts_prefix": "corpora/sniderman/v0.1.0/artifacts/", + "chunks_index": "corpora/sniderman/v0.1.0/chunks/index.ndjson", + "embeddings": "corpora/sniderman/v0.1.0/embeddings.ndjson", + }, + "upload_order": [ + "1. Upload all artifact files to artifacts/ prefix", + "2. Upload chunks/index.ndjson", + "3. Upload embeddings.ndjson", + "4. Write manifest.json with manifest_sha256 field computed last", + "5. Upload manifest.json", + "6. 
POST manifest to /api/agentify/corpus/notify" + ], + "content_types": { + "manifest.json": "application/json", + "*.ndjson": "application/x-ndjson", + "*.pdf": "application/pdf", + "*.html": "text/html", + } +}; + +export function registerCorpusRoutes(app: Express) { + // Ensure corpus_bundles table exists + pool.query(` + CREATE TABLE IF NOT EXISTS agentify_corpus_bundles ( + id SERIAL PRIMARY KEY, + bundle_id TEXT NOT NULL UNIQUE, + expert_slug TEXT NOT NULL, + corpus_version TEXT NOT NULL, + attestation_uri TEXT NOT NULL, + attestation_version TEXT NOT NULL, + assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit', + assembled_at TIMESTAMPTZ, + embedding_model TEXT, + embedding_dimensions INTEGER, + chunk_strategy TEXT, + chunk_count INTEGER, + artifact_count INTEGER, + manifest_r2_key TEXT, + manifest_sha256 TEXT, + provenance_signature TEXT, + status TEXT NOT NULL DEFAULT 'received', + distillation_notes TEXT, + received_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() + ) + `).catch(err => console.error("[corpus] table init error:", err)); + + // ── GET /api/agentify/corpus/schema ───────────────────────────────────────── + app.get("/api/agentify/corpus/schema", (_req: Request, res: Response) => { + res.setHeader("Content-Type", "application/json"); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.json(CORPUS_BUNDLE_SCHEMA); + }); + + // ── GET /api/agentify/corpus/storage ──────────────────────────────────────── + // Returns storage conventions (internal use only — not exposed to assemblers). + app.get("/api/agentify/corpus/storage", (_req: Request, res: Response) => { + res.setHeader("Content-Type", "application/json"); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.json(STORAGE_CONVENTION); + }); + + // ── GET /api/agentify/corpus/assembler-skill ───────────────────────────────── + // Public. Returns the canonical corpus-assembler skill document as Markdown. 
+ // Partner Replits fetch this URL to install the skill in their agent context. + app.get("/api/agentify/corpus/assembler-skill", (_req: Request, res: Response) => { + const skillPath = path.resolve(".agents/skills/agentify-corpus/SKILL.md"); + try { + const content = fs.readFileSync(skillPath, "utf-8"); + res.setHeader("Content-Type", "text/markdown; charset=utf-8"); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Cache-Control", "public, max-age=3600"); + res.send(content); + } catch { + res.status(404).json({ error: "skill_not_found", path: skillPath }); + } + }); + + // ── POST /api/agentify/corpus/notify ───────────────────────────────────────── + // Naturologie calls this after uploading all files to Vultr and writing manifest.json. + // Body: the full CorpusBundle manifest JSON (or a subset with the key fields). + // WellSpr.ing stores the bundle metadata, verifies the attestation_uri is reachable, + // and queues the bundle for Stage 5 distillation. + app.post("/api/agentify/corpus/notify", async (req: Request, res: Response) => { + const { + bundle_id, expert_slug, corpus_version, attestation_uri, + attestation_version, assembled_by = "naturologie-replit", assembled_at, + embedding_model, embedding_dimensions, chunk_strategy, chunk_count, + artifacts, chunks_index_r2_key, embeddings_r2_key, + manifest_sha256, provenance_signature, + } = req.body; + + if (!bundle_id || !expert_slug || !corpus_version || !attestation_uri) { + return res.status(400).json({ + error: "missing_required_fields", + required: ["bundle_id", "expert_slug", "corpus_version", "attestation_uri"], + }); + } + + // Verify the expert exists in the agentify registry + let expert: { slug: string; attestation_uri: string } | null = null; + try { + const expertRow = await pool.query( + `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug] + ); + if (!expertRow.rows.length) { + return res.status(404).json({ + error: "expert_not_found", + 
detail: `No expert registered with slug '${expert_slug}'. Register via POST /api/agentify/experts/register first.`, + }); + } + expert = expertRow.rows[0]; + } catch (err) { + console.error("[corpus] expert lookup error:", err); + return res.status(500).json({ error: "database_error" }); + } + + // ── Deduplication gate ────────────────────────────────────────────────── + try { + const existing = await pool.query( + `SELECT bundle_id, manifest_sha256, chunk_count, assembled_by, received_at + FROM agentify_corpus_bundles WHERE expert_slug = $1`, + [expert_slug] + ); + + for (const row of existing.rows) { + // Exact duplicate: same manifest hash + if (manifest_sha256 && row.manifest_sha256 && manifest_sha256 === row.manifest_sha256) { + return res.status(409).json({ + error: "duplicate_bundle", + detail: `A bundle with identical manifest_sha256 was already received for '${expert_slug}' on ${new Date(row.received_at).toISOString().split("T")[0]}.`, + existing_bundle_id: row.bundle_id, + }); + } + + // Near-duplicate: chunk_count within 2% of an existing bundle from a different assembler + if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) { + const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count; + if (ratio < 0.02) { + console.log(`[corpus] multi-source near-dup: ${assembled_by} vs ${row.assembled_by} (${chunk_count} vs ${row.chunk_count} chunks) — flagging for curation`); + // Allow but flag for review — update status after insert below + } + } + } + } catch (dedupErr) { + console.warn("[corpus] dedup check error (non-fatal):", dedupErr); + } + + // Compute the manifest R2 key from convention + const manifestR2Key = `corpora/${expert_slug}/v${corpus_version}/manifest.json`; + const artifactCount = Array.isArray(artifacts) ? 
artifacts.length : null; + + // Detect multi-source near-dup (different assembler, similar chunk count) + let flaggedForCuration = false; + try { + const existing = await pool.query( + `SELECT assembled_by, chunk_count FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expert_slug] + ); + for (const row of existing.rows) { + if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) { + const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count; + if (ratio < 0.02) { flaggedForCuration = true; break; } + } + } + } catch { /* non-fatal */ } + + const insertStatus = flaggedForCuration ? "pending_review" : "received"; + + try { + await pool.query( + `INSERT INTO agentify_corpus_bundles + (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, + assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, + chunk_count, artifact_count, manifest_r2_key, manifest_sha256, provenance_signature, + chunks_s3_key, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17) + ON CONFLICT (bundle_id) DO UPDATE SET + corpus_version = EXCLUDED.corpus_version, + assembled_at = EXCLUDED.assembled_at, + embedding_model = EXCLUDED.embedding_model, + embedding_dimensions= EXCLUDED.embedding_dimensions, + chunk_strategy = EXCLUDED.chunk_strategy, + chunk_count = EXCLUDED.chunk_count, + artifact_count = EXCLUDED.artifact_count, + manifest_r2_key = EXCLUDED.manifest_r2_key, + manifest_sha256 = EXCLUDED.manifest_sha256, + provenance_signature= EXCLUDED.provenance_signature, + chunks_s3_key = EXCLUDED.chunks_s3_key, + status = $17, + updated_at = NOW()`, + [ + bundle_id, expert_slug, corpus_version, attestation_uri, + attestation_version || "v0.1", assembled_by, + assembled_at ? 
new Date(assembled_at) : new Date(), + embedding_model || null, embedding_dimensions || null, + chunk_strategy || null, chunk_count || null, artifactCount, + manifestR2Key, manifest_sha256 || null, provenance_signature || null, + chunks_index_r2_key || null, insertStatus, + ] + ); + } catch (dbErr) { + console.error("[corpus] bundle insert error:", dbErr); + return res.status(500).json({ error: "database_error" }); + } + + console.log(`[corpus] ${insertStatus} bundle ${bundle_id} for ${expert_slug} v${corpus_version} (${chunk_count} chunks, ${artifactCount} artifacts)${flaggedForCuration ? " — flagged for curation (multi-source near-dup)" : ""}`); + + res.status(201).json({ + "@type": "CorpusBundleReceived", + "bundle_id": bundle_id, + "expert_slug": expert_slug, + "corpus_version": corpus_version, + "manifest_r2_key": manifestR2Key, + "status": insertStatus, + "received_at": new Date().toISOString(), + "next_step": flaggedForCuration + ? "Bundle flagged for curation — a near-duplicate from another assembler exists. WellSpr.ing will review before distillation." + : "WellSpr.ing will verify manifest integrity and queue for Stage 5 framework distillation.", + "status_endpoint": `https://wellspr.ing/api/agentify/experts/${expert_slug}/status`, + "attestation_verified_against": expert.attestation_uri, + "note": chunk_count + ? `Bundle logged: ${chunk_count} chunks${artifactCount ? `, ${artifactCount} artifacts` : ''}. ${flaggedForCuration ? "Pending curation review." : "Awaiting distillation."}` + : "Bundle logged. 
Include chunk_count and artifact_count in future submissions for full tracking.", + }); + }); + + // ── GET /api/agentify/corpus/:expertSlug ──────────────────────────────────── + app.get("/api/agentify/corpus/:expertSlug", async (req: Request, res: Response) => { + try { + const rows = await pool.query( + `SELECT bundle_id, expert_slug, corpus_version, attestation_version, + assembled_by, assembled_at, embedding_model, embedding_dimensions, + chunk_strategy, chunk_count, artifact_count, + manifest_r2_key, manifest_sha256, status, received_at, updated_at, distillation_notes + FROM agentify_corpus_bundles + WHERE expert_slug = $1 + ORDER BY received_at DESC`, + [req.params.expertSlug] + ); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.json({ + expert_slug: req.params.expertSlug, + bundle_count: rows.rows.length, + bundles: rows.rows, + r2_bucket: R2_BUCKET, + }); + } catch (err) { + console.error("[corpus] list error:", err); + res.status(500).json({ error: "internal_error" }); + } + }); + + // ── GET /api/agentify/corpus/:expertSlug/:bundleId ────────────────────────── + app.get("/api/agentify/corpus/:expertSlug/:bundleId", async (req: Request, res: Response) => { + try { + const rows = await pool.query( + `SELECT * FROM agentify_corpus_bundles + WHERE expert_slug = $1 AND bundle_id = $2`, + [req.params.expertSlug, req.params.bundleId] + ); + if (!rows.rows.length) return res.status(404).json({ error: "bundle_not_found" }); + const b = rows.rows[0]; + res.setHeader("Access-Control-Allow-Origin", "*"); + res.json({ + ...b, + r2_bucket: R2_BUCKET, + r2_endpoint: R2_ENDPOINT, + manifest_url: `${R2_ENDPOINT}/${R2_BUCKET}/${b.manifest_r2_key}`, + }); + } catch (err) { + console.error("[corpus] get bundle error:", err); + res.status(500).json({ error: "internal_error" }); + } + }); + + // ── PATCH /api/agentify/corpus/:bundleId/status ───────────────────────────── + // Admin-only. Used by WellSpr.ing team to update distillation status. 
+ // status values: received | verifying | distilling | ready | failed + app.patch("/api/agentify/corpus/:bundleId/status", async (req: Request, res: Response) => { + const adminKey = req.headers["x-admin-key"] || req.body?.admin_key; + if (adminKey !== ADMIN_KEY) return res.status(403).json({ error: "forbidden" }); + + const { status, distillation_notes } = req.body; + const validStatuses = ["received", "verifying", "distilling", "ready", "failed"]; + if (!status || !validStatuses.includes(status)) { + return res.status(400).json({ error: "invalid_status", valid: validStatuses }); + } + + try { + const result = await pool.query( + `UPDATE agentify_corpus_bundles + SET status = $1, distillation_notes = $2, updated_at = NOW() + WHERE bundle_id = $3 + RETURNING bundle_id, expert_slug, corpus_version, status, updated_at`, + [status, distillation_notes || null, req.params.bundleId] + ); + if (!result.rows.length) return res.status(404).json({ error: "bundle_not_found" }); + res.json(result.rows[0]); + } catch (err) { + console.error("[corpus] status update error:", err); + res.status(500).json({ error: "internal_error" }); + } + }); + + // ── GET /api/agentify/corpus/staging/scan/:expertSlug ──────────────────────── + // Partner-token-gated. Lists ALL manifests found in S3 staging for this expert, + // cross-references with DB, and tells the caller which are registered vs orphaned. + // Orphaned = data in S3 staging but no DB record → can be recovered via /import. 
+ app.get("/api/agentify/corpus/staging/scan/:expertSlug", async (req: Request, res: Response) => { + const token = (req.headers["x-partner-token"] as string) || req.query.token as string; + const adminKey = req.headers["x-admin-key"] as string; + const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token); + if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" }); + + const { expertSlug } = req.params; + + try { + // List all manifests in staging for this expert + const listed = await s3.send(new ListObjectsV2Command({ + Bucket: S3_BUCKET, + Prefix: `staging/${expertSlug}/`, + })); + + const manifests = (listed.Contents || []) + .filter(o => o.Key?.endsWith("manifest.json")) + .sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0)); + + if (!manifests.length) { + return res.json({ expert_slug: expertSlug, staging_bundles: [], orphaned_count: 0, message: "No staging manifests found in S3 for this expert." }); + } + + // Load each manifest and cross-reference with DB + const dbBundles = await pool.query( + `SELECT bundle_id, manifest_r2_key, status FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expertSlug] + ); + const dbKeys = new Set(dbBundles.rows.map((r: any) => r.manifest_r2_key)); + + const bundles = await Promise.all(manifests.map(async (obj) => { + let manifest: any = null; + try { + const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: obj.Key! 
})); + manifest = JSON.parse(await (raw.Body as any).transformToString()); + } catch { /* manifest unreadable */ } + + const inDb = dbKeys.has(obj.Key); + const dbRow = dbBundles.rows.find((r: any) => r.manifest_r2_key === obj.Key); + + return { + s3_key: obj.Key, + s3_size_bytes: obj.Size, + s3_last_modified: obj.LastModified, + session_uuid: obj.Key?.split("/")[3] || null, + bundle_id: manifest?.bundle_id || null, + corpus_version: manifest?.corpus_version || null, + chunk_count: manifest?.chunk_count || null, + artifact_count: manifest?.artifacts?.length || null, + assembled_by: manifest?.assembled_by || null, + assembled_at: manifest?.assembled_at || null, + in_db: inDb, + db_status: dbRow?.status || null, + status: inDb ? `registered (${dbRow?.status})` : "ORPHANED — not in DB, needs import", + recovery_action: inDb ? null : `POST /api/agentify/corpus/staging/import/${expertSlug} with { "manifest_s3_key": "${obj.Key}" }`, + }; + })); + + const orphaned = bundles.filter(b => !b.in_db); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.json({ + expert_slug: expertSlug, + staging_bundles: bundles, + total_count: bundles.length, + orphaned_count: orphaned.length, + registered_count: bundles.length - orphaned.length, + message: orphaned.length > 0 + ? `${orphaned.length} bundle(s) are in S3 staging but not registered in DB. Call POST /api/agentify/corpus/staging/import/${expertSlug} to recover the most recent one.` + : "All staging bundles are registered in DB.", + }); + } catch (err: any) { + console.error("[corpus] staging scan error:", err); + res.status(500).json({ error: "internal_error", detail: err.message }); + } + }); + + // ── POST /api/agentify/corpus/staging/import/:expertSlug ───────────────────── + // Partner-token-gated. Reads a staging manifest from S3 and upserts it into DB. + // Idempotent — safe to call multiple times. Picks most recent manifest if no + // specific manifest_s3_key is provided. 
Used to recover after a silent finalize failure. + app.post("/api/agentify/corpus/staging/import/:expertSlug", async (req: Request, res: Response) => { + const token = (req.headers["x-partner-token"] as string) || req.body?.partner_token; + const adminKey = req.headers["x-admin-key"] as string; + const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token); + if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" }); + + const { expertSlug } = req.params; + let { manifest_s3_key } = req.body || {}; + + try { + // Auto-discover most recent manifest if not provided + if (!manifest_s3_key) { + const listed = await s3.send(new ListObjectsV2Command({ + Bucket: S3_BUCKET, Prefix: `staging/${expertSlug}/`, + })); + const latest = (listed.Contents || []) + .filter(o => o.Key?.endsWith("manifest.json")) + .sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0))[0]; + if (!latest?.Key) { + return res.status(404).json({ error: "no_staging_manifest_found", expert_slug: expertSlug }); + } + manifest_s3_key = latest.Key; + } + + // Read manifest + let manifest: any; + try { + const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifest_s3_key })); + manifest = JSON.parse(await (raw.Body as any).transformToString()); + } catch (e: any) { + return res.status(422).json({ error: "manifest_unreadable", detail: e.message, s3_key: manifest_s3_key }); + } + + // Discover chunks key + let chunksKey = manifest.chunks_index_s3_key || manifest.chunks_index_r2_key || null; + if (!chunksKey) { + const prefix = manifest_s3_key.replace("manifest.json", "chunks/"); + const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix })); + chunksKey = listed.Contents?.find(o => o.Key?.endsWith("index.ndjson"))?.Key || null; + } + + const bundleId = manifest.bundle_id || crypto.randomUUID(); + const corpusVersion = manifest.corpus_version || "1.0.0"; + const artifactCount = 
Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null; + + // Upsert into DB — idempotent on bundle_id + await pool.query( + `INSERT INTO agentify_corpus_bundles + (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, + assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, + chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received') + ON CONFLICT (bundle_id) DO UPDATE SET + corpus_version = EXCLUDED.corpus_version, + assembled_at = EXCLUDED.assembled_at, + chunk_count = EXCLUDED.chunk_count, + artifact_count = EXCLUDED.artifact_count, + manifest_r2_key = EXCLUDED.manifest_r2_key, + manifest_sha256 = EXCLUDED.manifest_sha256, + chunks_s3_key = EXCLUDED.chunks_s3_key, + updated_at = NOW()`, + [ + bundleId, expertSlug, corpusVersion, + manifest.attestation_uri || `https://wellspr.ing/vault/agents/${expertSlug}/attestation-v1.0.json`, + manifest.attestation_version || "v1.0", + manifest.assembled_by || "partner", + manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(), + manifest.embedding_model || null, + manifest.embedding_dimensions || null, + manifest.chunk_strategy || null, + manifest.chunk_count || null, + artifactCount, + manifest_s3_key, + manifest.manifest_sha256 || null, + chunksKey, + ] + ); + + console.log(`[corpus] staging import: ${bundleId} for ${expertSlug} v${corpusVersion} — recovered from ${manifest_s3_key}`); + + res.status(200).json({ + "@type": "CorpusBundleImported", + bundle_id: bundleId, + expert_slug: expertSlug, + corpus_version: corpusVersion, + manifest_s3_key, + chunks_s3_key: chunksKey, + chunk_count: manifest.chunk_count || null, + artifact_count: artifactCount, + status: "received", + message: "Bundle imported from S3 staging into DB. 
Next: POST /api/agentify/experts/:slug/distill to embed chunks.", + distill_endpoint: `/api/agentify/experts/${expertSlug}/distill`, + }); + } catch (err: any) { + console.error("[corpus] staging import error:", err); + res.status(500).json({ error: "internal_error", detail: err.message }); + } + }); + + // ── Corpus Intake Proxy ─────────────────────────────────────────────────────── + // Naturologie uploads files HERE — WellSpr.ing validates and routes to Vultr. + // No S3 credentials needed by Naturologie. + + const s3 = new S3Client({ + region: "ewr1", + endpoint: S3_ENDPOINT, + credentials: { + accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "", + secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "", + }, + forcePathStyle: true, + }); + + // Ensure the corpus bucket exists on Vultr — create it if missing. + // NOTE: Vultr S3-compatible API doesn't accept a LocationConstraint in CreateBucket. + // When SDK region is non-us-east-1 it auto-adds LocationConstraint, which Vultr rejects. + // Solution: use a us-east-1 client for bucket creation only (endpoint still points to Vultr). 
// ── Bucket bootstrap ─────────────────────────────────────────────────────────
// Client used only for HeadBucket / CreateBucket. Its region is pinned to "us-east-1" so
// the SDK does not attach a LocationConstraint to CreateBucket; the endpoint is still
// S3_ENDPOINT, so requests go to the configured object store.
const s3BucketAdmin = new S3Client({
  region: "us-east-1", // prevents SDK from adding a LocationConstraint
  endpoint: S3_ENDPOINT,
  credentials: {
    accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "",
    secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "",
  },
  forcePathStyle: true,
});

// Fire-and-forget startup probe: confirm the corpus bucket exists and create it when the
// HEAD request reports it missing. Every failure is logged and swallowed so that route
// registration is never interrupted by storage problems.
void (async () => {
  if (!process.env.VULTR_S3_ACCESS_KEY) {
    console.warn("[corpus] VULTR_S3_ACCESS_KEY not set — S3 uploads will fail");
    return;
  }
  try {
    await s3BucketAdmin.send(new HeadBucketCommand({ Bucket: S3_BUCKET }));
    console.log(`[corpus] S3 bucket '${S3_BUCKET}' confirmed`);
  } catch (err: any) {
    const statusCode = err?.$metadata?.httpStatusCode || err?.statusCode;
    const code = err?.name || err?.Code || err?.code || "";
    // 301 = wrong region/redirect, 403 = exists but no perms, 404/NoSuchBucket = truly missing
    const bucketMissing =
      statusCode === 404 || code === "NotFound" || code === "NoSuchBucket" || code === "404";
    if (bucketMissing) {
      try {
        await s3BucketAdmin.send(new CreateBucketCommand({ Bucket: S3_BUCKET }));
        console.log(`[corpus] S3 bucket '${S3_BUCKET}' created successfully`);
      } catch (createErr: any) {
        console.error(`[corpus] Failed to create S3 bucket '${S3_BUCKET}':`, createErr?.message || createErr);
      }
    } else if (statusCode === 403) {
      // Bucket exists, we just don't have ListBucket permission — that's fine
      console.log(`[corpus] S3 bucket '${S3_BUCKET}' exists (403 on HEAD = access controlled but present)`);
    } else {
      console.error(`[corpus] S3 bucket check failed (code=${code}, status=${statusCode}):`, err?.message || err);
    }
  }
})();

// Upper bound for a single uploaded file. Shared by multer and by the raw-body path of the
// chunks endpoint so both ways of sending a file are limited identically.
const MAX_UPLOAD_BYTES = 200 * 1024 * 1024; // 200MB max per file

// Multipart parser: files are held in memory (req.file.buffer) before being sent to S3.
const upload = multer({
  storage: multer.memoryStorage(),
  limits: { fileSize: MAX_UPLOAD_BYTES },
});

// Ensure intake_sessions table exists. Runs once at registration; a failure is only logged.
pool.query(`
  CREATE TABLE IF NOT EXISTS corpus_intake_sessions (
    id                  SERIAL PRIMARY KEY,
    session_id          TEXT NOT NULL UNIQUE,
    expert_slug         TEXT NOT NULL,
    corpus_version      TEXT NOT NULL,
    assembled_by        TEXT NOT NULL DEFAULT 'naturologie-replit',
    manifest_validated  BOOLEAN DEFAULT FALSE,
    files_uploaded      JSONB DEFAULT '[]',
    validation_errors   JSONB DEFAULT '[]',
    status              TEXT NOT NULL DEFAULT 'uploading',
    bundle_id           TEXT,
    created_at          TIMESTAMPTZ DEFAULT NOW(),
    updated_at          TIMESTAMPTZ DEFAULT NOW()
  )
`).catch(err => console.error("[corpus-intake] table init error:", err));

// ── Intake helpers ───────────────────────────────────────────────────────────

/** One entry of corpus_intake_sessions.files_uploaded, as written by the handlers below. */
interface StagedFile {
  type: "artifact" | "chunks" | "embeddings" | "manifest";
  key: string;
  sha256: string;
  uploaded_at: string;
  size?: number;
  chunk_count?: number;
  embedding_count?: number;
  artifact_id?: string;
  title?: string;
  artifact_type?: string;
}

/** Columns of corpus_intake_sessions that the handlers below read. */
interface IntakeSessionRow {
  session_id: string;
  expert_slug: string;
  corpus_version: string;
  assembled_by: string;
  manifest_validated: boolean;
  files_uploaded: StagedFile[] | null;
  bundle_id: string | null;
}

// Characters that must never appear in a client-supplied object-key segment.
const UNSAFE_KEY_CHARS = /[\/\\\u0000-\u001f]/;

/**
 * Returns true when `segment` can be embedded in an S3 key as exactly one path segment:
 * non-empty, not "." or "..", and free of slashes, backslashes and control characters.
 */
function isSafeKeySegment(segment: string): boolean {
  return segment.length > 0 && segment !== "." && segment !== ".." && !UNSAFE_KEY_CHARS.test(segment);
}

/**
 * Derives the extension used in an artifact's key from the client-supplied file name.
 * Falls back to "bin" when the name has no extension or the extension contains anything
 * other than ASCII letters and digits.
 */
function safeExtension(originalName: string): string {
  const ext = path.extname(originalName).slice(1);
  return /^[A-Za-z0-9]+$/.test(ext) ? ext : "bin";
}

/** Staging key prefix under which every file of one intake session is stored. */
function stagingPrefixFor(expertSlug: string, corpusVersion: string, sessionId: string): string {
  return `staging/${expertSlug}/v${corpusVersion}/${sessionId}`;
}

/** Splits NDJSON text into its non-blank lines. */
function splitNdjsonLines(text: string): string[] {
  return text.trim().split("\n").filter(l => l.trim());
}

/**
 * Spot-checks the first `sampleSize` NDJSON lines.
 *
 * @param lines            Non-blank lines of the uploaded file.
 * @param sampleSize       How many leading lines to inspect (the rest are not parsed).
 * @param describeProblems Returns one message per problem found in a parsed record.
 * @returns Messages prefixed with the 1-based line number; empty when the sample is clean.
 */
function sampleNdjsonErrors(
  lines: readonly string[],
  sampleSize: number,
  describeProblems: (record: Record<string, unknown>) => string[],
): string[] {
  const errors: string[] = [];
  lines.slice(0, sampleSize).forEach((line, index) => {
    const label = `Line ${index + 1}`;
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      errors.push(`${label}: invalid JSON`);
      return;
    }
    // `null`, numbers and strings parse successfully but cannot carry the required fields.
    if (parsed === null || typeof parsed !== "object") {
      errors.push(`${label}: expected a JSON object`);
      return;
    }
    for (const problem of describeProblems(parsed as Record<string, unknown>)) {
      errors.push(`${label}: ${problem}`);
    }
  });
  return errors;
}

/**
 * Loads the intake session with the given id, but only while it is still accepting
 * uploads (status = 'uploading'). Resolves to undefined otherwise.
 */
async function findUploadingSession(sessionId: string): Promise<IntakeSessionRow | undefined> {
  const result = await pool.query(
    `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
  );
  return result.rows[0] as IntakeSessionRow | undefined;
}

// NOTE(review): none of the intake endpoints below check a partner token or admin key —
// confirm whether authentication is enforced upstream of this router.

// ── POST /api/agentify/corpus/intake/start ────────────────────────────────────
// Naturologie calls this first to open an upload session.
// Returns a session_id to use in subsequent upload calls.
app.post("/api/agentify/corpus/intake/start", async (req: Request, res: Response) => {
  try {
    // `?? {}` keeps a missing body a 400 (validation) instead of a destructuring TypeError.
    const { expert_slug, corpus_version, assembled_by = "naturologie-replit" } = req.body ?? {};
    if (!expert_slug || !corpus_version)
      return res.status(400).json({ error: "expert_slug and corpus_version are required" });

    // corpus_version becomes part of every object key of the session, so it must be a
    // single clean path segment. expert_slug is checked against the database just below.
    if (!isSafeKeySegment(String(corpus_version)))
      return res.status(400).json({
        error: "invalid_corpus_version",
        detail: "corpus_version must not contain path separators or control characters",
      });

    // Verify expert exists
    const expertRow = await pool.query(
      `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug]
    );
    if (!expertRow.rows.length)
      return res.status(404).json({ error: "expert_not_found", detail: `No expert with slug '${expert_slug}'` });

    const sessionId = crypto.randomBytes(16).toString("hex");
    const stagingPrefix = stagingPrefixFor(expert_slug, corpus_version, sessionId);

    await pool.query(
      `INSERT INTO corpus_intake_sessions
         (session_id, expert_slug, corpus_version, assembled_by, status)
       VALUES ($1,$2,$3,$4,'uploading')`,
      [sessionId, expert_slug, corpus_version, assembled_by]
    );

    return res.status(201).json({
      "@type": "CorpusIntakeSession",
      session_id: sessionId,
      expert_slug,
      corpus_version,
      assembled_by,
      staging_prefix: stagingPrefix,
      status: "uploading",
      upload_endpoints: {
        manifest: `POST /api/agentify/corpus/intake/${sessionId}/manifest`,
        artifact: `POST /api/agentify/corpus/intake/${sessionId}/artifact (multipart field: file)`,
        chunks: `POST /api/agentify/corpus/intake/${sessionId}/chunks (multipart field: file)`,
        embeddings: `POST /api/agentify/corpus/intake/${sessionId}/embeddings (multipart field: file)`,
        finalize: `POST /api/agentify/corpus/intake/${sessionId}/finalize`,
        status: `GET /api/agentify/corpus/intake/${sessionId}`,
      },
      schema_reference: "https://wellspr.ing/api/agentify/corpus/schema",
      note: "Upload manifest last — it is validated against the files that have been staged.",
    });
  } catch (err: any) {
    console.error("[corpus-intake] start error:", err?.message || err);
    return res.status(500).json({ error: "intake_start_failed", message: err?.message || "UnknownError" });
  }
});

// ── GET /api/agentify/corpus/intake/:sessionId ───────────────────────────────
// Returns the raw session row (any status), including the list of staged files.
app.get("/api/agentify/corpus/intake/:sessionId", async (req: Request, res: Response) => {
  try {
    const row = await pool.query(
      `SELECT * FROM corpus_intake_sessions WHERE session_id = $1`, [req.params.sessionId]
    );
    if (!row.rows.length) return res.status(404).json({ error: "session_not_found" });
    return res.json(row.rows[0]);
  } catch (err: any) {
    // Without this catch a failed query would reject the async handler and leave the
    // request without a response.
    console.error("[corpus-intake] session lookup error:", err?.message || err);
    return res.status(500).json({ error: "session_lookup_failed", message: err?.message || "UnknownError" });
  }
});

/**
 * Uploads `buffer` to the corpus bucket under `key` using the `s3` client.
 *
 * @returns The key that was written.
 * @throws Whatever the SDK upload rejects with.
 */
async function uploadToVultr(key: string, buffer: Buffer, contentType: string): Promise<string> {
  const uploader = new Upload({
    client: s3,
    params: {
      Bucket: S3_BUCKET,
      Key: key,
      Body: buffer,
      ContentType: contentType,
    },
  });
  await uploader.done();
  return key;
}

/** Appends one file record to the session's files_uploaded JSONB array and bumps updated_at. */
async function recordFileUploaded(sessionId: string, fileRecord: object) {
  await pool.query(
    `UPDATE corpus_intake_sessions
     SET files_uploaded = files_uploaded || $1::jsonb, updated_at = NOW()
     WHERE session_id = $2`,
    [JSON.stringify([fileRecord]), sessionId]
  );
}

// ── POST /api/agentify/corpus/intake/:sessionId/artifact ─────────────────────
// Stages one raw artifact (multipart field "file") under .../artifacts/{artifact_id}.{ext}.
app.post("/api/agentify/corpus/intake/:sessionId/artifact",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const { artifact_id, title, type: artifactType } = req.body ?? {};

      const s = await findUploadingSession(sessionId);
      if (!s) return res.status(404).json({ error: "session_not_found_or_not_active" });

      if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });
      if (!artifact_id) return res.status(400).json({ error: "artifact_id_required" });
      // artifact_id is client-controlled and lands in the object key: refuse anything that
      // could add or climb path segments.
      if (!isSafeKeySegment(String(artifact_id)))
        return res.status(400).json({
          error: "invalid_artifact_id",
          detail: "artifact_id must not contain path separators or control characters",
        });

      const ext = safeExtension(req.file.originalname);
      const key = `${stagingPrefixFor(s.expert_slug, s.corpus_version, sessionId)}/artifacts/${artifact_id}.${ext}`;
      const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");

      await uploadToVultr(key, req.file.buffer, req.file.mimetype);
      await recordFileUploaded(sessionId, {
        type: "artifact", artifact_id, title, artifact_type: artifactType,
        key, sha256, size: req.file.size, uploaded_at: new Date().toISOString(),
      });

      return res.json({ artifact_id, key, sha256, size: req.file.size, status: "staged" });
    } catch (err: any) {
      console.error("[corpus-intake] artifact upload error:", err?.message || err);
      return res.status(500).json({ error: "artifact_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);

// ── POST /api/agentify/corpus/intake/:sessionId/chunks ───────────────────────
// Accepts both multipart/form-data (field: "file") and raw NDJSON/octet-stream body.
app.post("/api/agentify/corpus/intake/:sessionId/chunks",
  (req: Request, res: Response, next: any) => {
    const ct = (req.headers["content-type"] || "").toLowerCase();
    if (ct.includes("multipart/form-data")) {
      upload.single("file")(req as any, res as any, next);
      return;
    }

    // Buffer raw body for application/x-ndjson, octet-stream, text/* etc.
    // The same size cap as the multipart path applies; once it is exceeded the request is
    // answered with 413 and the remaining bytes are read and discarded.
    const parts: Buffer[] = [];
    let received = 0;
    let rejected = false;
    req.on("data", (part: Buffer) => {
      if (rejected) return;
      received += part.length;
      if (received > MAX_UPLOAD_BYTES) {
        rejected = true;
        parts.length = 0; // release what was buffered so far
        res.status(413).json({ error: "payload_too_large", max_bytes: MAX_UPLOAD_BYTES });
        return;
      }
      parts.push(part);
    });
    req.on("end", () => {
      if (rejected) return;
      (req as any)._rawBody = Buffer.concat(parts);
      next();
    });
    req.on("error", (streamErr: unknown) => {
      if (!rejected) next(streamErr);
    });
  },
  async (req: Request, res: Response) => {
    const { sessionId } = req.params;
    try {
      const s = await findUploadingSession(sessionId);
      if (!s) return res.status(404).json({ error: "session_not_found_or_not_active" });

      // Resolve buffer: multipart file OR raw body
      const buffer: Buffer | undefined = req.file?.buffer ?? (req as any)._rawBody;
      if (!buffer || buffer.length === 0)
        return res.status(400).json({ error: "no_file_uploaded", hint: "Send as multipart/form-data field 'file' or raw NDJSON body" });

      // Basic validation: must be valid NDJSON. Only the first 5 records are parsed.
      const lines = splitNdjsonLines(buffer.toString("utf-8"));
      if (lines.length === 0)
        return res.status(400).json({ error: "invalid_chunks_ndjson", errors: ["File contains no NDJSON records"] });
      const errors = sampleNdjsonErrors(lines, 5, (record) => {
        const problems: string[] = [];
        if (!record.chunk_id) problems.push("missing chunk_id");
        if (!record.text) problems.push("missing text");
        if (!record.artifact_id) problems.push("missing artifact_id");
        return problems;
      });
      if (errors.length) return res.status(400).json({ error: "invalid_chunks_ndjson", errors });
      const chunkCount = lines.length;

      const key = `${stagingPrefixFor(s.expert_slug, s.corpus_version, sessionId)}/chunks/index.ndjson`;
      const sha256 = crypto.createHash("sha256").update(buffer).digest("hex");
      await uploadToVultr(key, buffer, "application/x-ndjson");
      await recordFileUploaded(sessionId, {
        type: "chunks", key, sha256, chunk_count: chunkCount, size: buffer.length,
        uploaded_at: new Date().toISOString(),
      });

      return res.json({ key, sha256, chunk_count: chunkCount, size: buffer.length, status: "staged" });
    } catch (err: any) {
      console.error("[CorpusChunks] Upload error:", err?.message || err);
      return res.status(500).json({ error: "upload_failed", message: err?.message || "UnknownError" });
    }
  }
);

// ── POST /api/agentify/corpus/intake/:sessionId/embeddings ───────────────────
// Stages the embeddings NDJSON (multipart field "file"); one record per line.
app.post("/api/agentify/corpus/intake/:sessionId/embeddings",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const s = await findUploadingSession(sessionId);
      if (!s) return res.status(404).json({ error: "session_not_found_or_not_active" });
      if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });

      // Only the first 3 records are parsed.
      const lines = splitNdjsonLines(req.file.buffer.toString("utf-8"));
      if (lines.length === 0)
        return res.status(400).json({ error: "invalid_embeddings_ndjson", errors: ["File contains no NDJSON records"] });
      const errors = sampleNdjsonErrors(lines, 3, (record) => {
        const problems: string[] = [];
        if (!record.chunk_id) problems.push("missing chunk_id");
        if (!Array.isArray(record.embedding)) problems.push("embedding must be an array");
        return problems;
      });
      if (errors.length) return res.status(400).json({ error: "invalid_embeddings_ndjson", errors });

      const key = `${stagingPrefixFor(s.expert_slug, s.corpus_version, sessionId)}/embeddings.ndjson`;
      const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");
      await uploadToVultr(key, req.file.buffer, "application/x-ndjson");
      await recordFileUploaded(sessionId, {
        type: "embeddings", key, sha256, embedding_count: lines.length, size: req.file.size,
        uploaded_at: new Date().toISOString(),
      });

      return res.json({ key, sha256, embedding_count: lines.length, size: req.file.size, status: "staged" });
    } catch (err: any) {
      console.error("[corpus-intake] embeddings upload error:", err?.message || err);
      return res.status(500).json({ error: "embeddings_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);

// ── POST /api/agentify/corpus/intake/:sessionId/manifest ─────────────────────
// Upload and validate the manifest JSON. Must be uploaded AFTER artifacts,
// chunks, and embeddings so validation can cross-reference them.
app.post("/api/agentify/corpus/intake/:sessionId/manifest",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const s = await findUploadingSession(sessionId);
      if (!s) return res.status(404).json({ error: "session_not_found_or_not_active" });

      // The manifest arrives either as a multipart file or as the parsed request body.
      let manifest: Record<string, any>;
      try {
        const body = req.file ? req.file.buffer.toString("utf-8") : JSON.stringify(req.body);
        manifest = JSON.parse(body);
      } catch {
        return res.status(400).json({ error: "invalid_json", detail: "Manifest must be valid JSON" });
      }
      // `null`, numbers, strings and arrays are valid JSON but not a manifest; rejecting them
      // here keeps the `in` checks below from throwing.
      if (manifest === null || typeof manifest !== "object" || Array.isArray(manifest))
        return res.status(400).json({ error: "invalid_json", detail: "Manifest must be a JSON object" });

      // Validate required fields.
      // embedding_model, embedding_dimensions, and embeddings_r2_key are OPTIONAL —
      // WellSpr.ing generates embeddings during distillation if not supplied by the assembler.
      const required = ["schema_version","bundle_id","expert_slug","attestation_uri",
                        "corpus_version","assembled_by","assembled_at",
                        "chunk_strategy","artifacts","chunk_count",
                        "chunks_index_r2_key","manifest_sha256"];
      const missing = required.filter(f => !(f in manifest));
      if (missing.length)
        return res.status(400).json({ error: "missing_required_fields", missing });

      if (manifest.expert_slug !== s.expert_slug)
        return res.status(400).json({ error: "expert_slug_mismatch",
          detail: `Manifest says '${manifest.expert_slug}', session is for '${s.expert_slug}'` });

      if (manifest.corpus_version !== s.corpus_version)
        return res.status(400).json({ error: "corpus_version_mismatch" });

      const key = `${stagingPrefixFor(s.expert_slug, s.corpus_version, sessionId)}/manifest.json`;
      // Hash exactly the bytes that are stored, so the recorded sha256 can be checked against
      // the staged object (the other upload endpoints do the same).
      const manifestBytes = Buffer.from(JSON.stringify(manifest, null, 2));
      const sha256 = crypto.createHash("sha256").update(manifestBytes).digest("hex");
      await uploadToVultr(key, manifestBytes, "application/json");

      // NOTE(review): manifest.bundle_id is echoed in the response but not stored on the
      // session, so finalize always generates a new UUID — confirm whether it should be kept.
      await pool.query(
        `UPDATE corpus_intake_sessions
         SET manifest_validated = TRUE,
             files_uploaded = files_uploaded || $1::jsonb,
             updated_at = NOW()
         WHERE session_id = $2`,
        [JSON.stringify([{ type: "manifest", key, sha256, uploaded_at: new Date().toISOString() }]), sessionId]
      );

      return res.json({
        key, sha256, status: "staged",
        bundle_id: manifest.bundle_id,
        chunk_count: manifest.chunk_count,
        artifact_count: Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null,
        note: "Manifest validated and staged. Call finalize to commit the bundle.",
      });
    } catch (err: any) {
      console.error("[corpus-intake] manifest upload error:", err?.message || err);
      return res.status(500).json({ error: "manifest_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);

// ── POST /api/agentify/corpus/intake/:sessionId/finalize ─────────────────────
// Checks that a manifest and a chunks file were staged, registers the CorpusBundle in the
// database, marks the session 'finalized', and returns a summary.
// NOTE(review): no objects are copied from staging/ to corpora/ here, yet manifest_r2_key
// is recorded under the corpora/ prefix — confirm whether the copy happens elsewhere.
app.post("/api/agentify/corpus/intake/:sessionId/finalize", async (req: Request, res: Response) => {
  const { sessionId } = req.params;
  try {
    const s = await findUploadingSession(sessionId);
    if (!s) return res.status(404).json({ error: "session_not_found_or_not_active" });

    const files: StagedFile[] = s.files_uploaded || [];
    const chunksFile = files.find(f => f.type === "chunks");
    const embeddingsFile = files.find(f => f.type === "embeddings");
    // Embeddings are optional — WellSpr.ing generates them during distillation if not provided.
    // Artifact files are optional — PubMed/web abstract batches are chunks-only (no raw PDFs).
    const artifactFiles = files.filter(f => f.type === "artifact");

    const errors: string[] = [];
    if (!s.manifest_validated) errors.push("Manifest has not been uploaded and validated");
    if (!chunksFile) errors.push("Chunks NDJSON has not been uploaded");

    if (errors.length) {
      return res.status(422).json({
        error: "bundle_incomplete",
        errors,
        uploaded_types: [...new Set<string>(files.map(f => f.type))],
      });
    }

    const finalPrefix = `corpora/${s.expert_slug}/v${s.corpus_version}`;
    const bundleId = s.bundle_id || crypto.randomUUID();
    const chunkCount = chunksFile?.chunk_count ?? null;

    const expertRow = await pool.query(
      `SELECT attestation_uri, attestation_version FROM agentify_experts WHERE slug = $1`, [s.expert_slug]
    );
    const expert = expertRow.rows[0];

    await pool.query(
      `INSERT INTO agentify_corpus_bundles
         (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
          assembled_by, assembled_at, chunk_count, artifact_count,
          manifest_r2_key, status)
       VALUES ($1,$2,$3,$4,$5,$6,NOW(),$7,$8,$9,'received')
       ON CONFLICT (bundle_id) DO UPDATE SET
         status = 'received', updated_at = NOW()`,
      [
        bundleId, s.expert_slug, s.corpus_version,
        expert?.attestation_uri || "",
        expert?.attestation_version || "v0.1",
        s.assembled_by,
        chunkCount,
        artifactFiles.length,
        `${finalPrefix}/manifest.json`,
      ]
    );

    await pool.query(
      `UPDATE corpus_intake_sessions
       SET status = 'finalized', bundle_id = $1, updated_at = NOW()
       WHERE session_id = $2`,
      [bundleId, sessionId]
    );

    console.log(`[corpus-intake] finalized bundle ${bundleId} for ${s.expert_slug} v${s.corpus_version}`);

    return res.status(201).json({
      "@type": "CorpusBundleFinalized",
      bundle_id: bundleId,
      expert_slug: s.expert_slug,
      corpus_version: s.corpus_version,
      status: "received",
      staging_prefix: stagingPrefixFor(s.expert_slug, s.corpus_version, sessionId),
      final_prefix: finalPrefix,
      artifact_count: artifactFiles.length,
      chunk_count: chunkCount,
      embedding_count: embeddingsFile?.embedding_count ?? null,
      next_step: "WellSpr.ing will verify the bundle and queue it for Stage 5 framework distillation.",
      status_endpoint: `https://wellspr.ing/api/agentify/corpus/${s.expert_slug}/${bundleId}`,
      note: "The bundle has been received and validated. WellSpr.ing will contact you when distillation begins.",
    });
  } catch (dbErr: any) {
    // Covers the session/expert lookups as well as the writes, so a failed query always
    // produces a response instead of an unhandled rejection.
    const detail = dbErr?.message || String(dbErr);
    console.error("[corpus-intake] finalize db error:", detail, dbErr?.detail);
    return res.status(500).json({ error: "database_error", detail, hint: dbErr?.hint });
  }
});

console.log("[corpus] routes registered");
} // closes the enclosing route-registration function, whose header precedes this section