/** * Agentify Corpus Pipeline — WellSpr.ing ingestion side * * Naturologie Replit assembles and chunks the expert's corpus, uploads artifacts * and embeddings to Vultr Object Storage (S3-compatible), writes a signed * CorpusBundle manifest JSON, then calls POST /api/agentify/corpus/notify to * hand off to WellSpr.ing. * * Storage: Vultr Object Storage (ewr1.vultrobjects.com) — same provider already * used for platform DB backups. S3-compatible API, independent of AWS and Cloudflare. * * Public endpoints: * GET /api/agentify/corpus/schema — CorpusBundle JSON schema + field docs * GET /api/agentify/corpus/storage — storage convention for Naturologie * GET /api/agentify/corpus/:expertSlug — list bundles for an expert * GET /api/agentify/corpus/:expertSlug/:bundleId — single bundle detail * * Naturologie-facing endpoints: * POST /api/agentify/corpus/notify — notify WellSpr.ing that a bundle is ready * PATCH /api/agentify/corpus/:bundleId/status — admin-only status update * * Storage layout (Vultr Object Storage): * Bucket: agentify-corpus (separate from wellspring-backups) * corpora/{expert_slug}/v{corpus_version}/manifest.json ← CorpusBundle manifest * corpora/{expert_slug}/v{corpus_version}/artifacts/ ← raw PDFs / HTML * corpora/{expert_slug}/v{corpus_version}/chunks/ ← chunk NDJSON index * corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson ← one embedding per line */ import type { Express, Request, Response } from "express"; import { pool } from "../db"; import crypto from "crypto"; import fs from "fs"; import path from "path"; import multer from "multer"; import { S3Client, PutObjectCommand, HeadObjectCommand, CreateBucketCommand, HeadBucketCommand, GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3"; import { Upload } from "@aws-sdk/lib-storage"; const ADMIN_KEY = process.env.ADMIN_KEY || "b0db7a87384fc814b0f46ea7bdc6ab6a81152be5b098718b"; // Vultr Object Storage — S3-compatible, same provider as platform DB backups const S3_HOSTNAME = process.env.VULTR_S3_HOSTNAME || "ewr1.vultrobjects.com"; const S3_BUCKET = process.env.CORPUS_S3_BUCKET || "agentify-corpus"; const S3_ENDPOINT = `https://${S3_HOSTNAME}`; // Keep the old names as aliases so existing code still compiles const R2_BUCKET = S3_BUCKET; const R2_ENDPOINT = S3_ENDPOINT; // ── Partner token validation ─────────────────────────────────────────────────── async function isPartnerToken(token: string | undefined): Promise { if (!token) return false; try { const r = await pool.query( `SELECT 1 FROM partner_tokens WHERE token = $1 AND is_active = TRUE LIMIT 1`, [token] ); return r.rows.length > 0; } catch { return false; } } // ── CorpusBundle schema definition (authoritative) ──────────────────────────── const CORPUS_BUNDLE_SCHEMA = { "$schema": "https://wellspr.ing/protocols/agentify/corpus-bundle/v1.0", "@type": "CorpusBundleSchema", "schema_version": "1.0", "description": "CorpusBundle is the handoff artifact Naturologie writes to R2 and notifies WellSpr.ing about. WellSpr.ing reads this manifest to verify integrity, then runs Stage 5 framework distillation.", "fields": { "schema_version": { "type": "string", "const": "1.0", "required": true }, "bundle_id": { "type": "string", "format": "uuid", "required": true, "description": "UUIDv4 — unique per ingestion run. If re-ingesting, use a new UUID." }, "expert_slug": { "type": "string", "required": true, "description": "Matches the slug in wellspr.ing/api/agentify/experts/:slug. For Sniderman: 'sniderman'." }, "attestation_uri": { "type": "string", "format": "uri", "required": true, "description": "Canonical attestation URI. For Sniderman v0.1: 'https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json'. WellSpr.ing will verify this URL is reachable and signature is valid." }, "attestation_version": { "type": "string", "required": true, "description": "Version string matching the attestation URI. E.g. 'v0.1'." }, "corpus_version": { "type": "string", "required": true, "description": "Semver-style corpus version. First ingestion: '0.1.0'. After expert corrections mint attestation v0.2, increment to '0.2.0'." }, "assembled_by": { "type": "string", "required": true, "description": "Assembling system identifier. Use 'naturologie-replit'." }, "assembled_at": { "type": "string", "format": "iso8601", "required": true, "description": "ISO 8601 UTC timestamp of when the bundle was finalized." }, "embedding_model": { "type": "string", "required": false, "description": "Embedding model used, if the assembler generated embeddings. E.g. 'text-embedding-3-large'. Omit if WellSpr.ing should generate embeddings during distillation." }, "embedding_dimensions": { "type": "integer", "required": false, "description": "Dimension count of the embedding vectors. Required only if embedding_model is provided." }, "chunk_strategy": { "type": "string", "enum": ["paragraph", "section", "sentence", "fixed-token"], "required": true, "description": "Chunking strategy used. 'paragraph' is preferred for published medical literature." }, "chunk_token_limit": { "type": "integer", "required": false, "description": "Max tokens per chunk if using 'fixed-token' strategy." }, "artifacts": { "type": "array", "required": true, "description": "List of source documents included in this corpus.", "items": { "artifact_id": { "type": "string", "description": "Short slug, e.g. 'sniderman-apob-2010'" }, "title": { "type": "string" }, "type": { "type": "string", "enum": ["paper", "book", "protocol", "interview", "transcript", "editorial"] }, "source_url": { "type": "string", "format": "uri", "description": "Canonical DOI or URL." }, "publication_date": { "type": "string", "description": "YYYY or YYYY-MM or YYYY-MM-DD." }, "sha256": { "type": "string", "description": "SHA-256 of the raw artifact file." }, "r2_key": { "type": "string", "description": "Key within the agentify-corpus bucket. E.g. 'corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf'." } } }, "chunk_count": { "type": "integer", "required": true, "description": "Total number of chunks. The actual chunk files live at their r2_keys in the chunks/ prefix." }, "chunks_index_r2_key": { "type": "string", "required": true, "description": "R2 key of the chunk index NDJSON file (one chunk JSON per line). E.g. 'corpora/sniderman/v0.1.0/chunks/index.ndjson'." }, "embeddings_r2_key": { "type": "string", "required": false, "description": "Key of the embeddings NDJSON file (one {chunk_id, embedding: float[]} per line). Omit if WellSpr.ing should generate embeddings. E.g. 'corpora/sniderman/v0.1.0/embeddings.ndjson'." }, "manifest_sha256": { "type": "string", "required": true, "description": "SHA-256 of this manifest file with the manifest_sha256 field set to empty string. WellSpr.ing recomputes this to verify integrity." }, "provenance_signature": { "type": "string", "required": false, "description": "Optional Ed25519 signature over manifest_sha256 using Naturologie's signing key. If provided, WellSpr.ing stores it as part of the provenance chain." } }, "chunk_item_schema": { "description": "Each line in chunks/index.ndjson must be a JSON object with these fields:", "chunk_id": "string — unique within the bundle, e.g. 'sniderman-apob-2010-p003'", "artifact_id": "string — matches an artifact_id in the artifacts array", "text": "string — the raw chunk text", "sha256": "string — SHA-256 of the chunk text", "token_count": "integer — token count using the chosen embedding model's tokenizer", "scopes": "string[] — SGS scope strings this chunk is most relevant to", "r2_key": "string — optional, if chunk is also stored as a standalone file" }, "example": { "schema_version": "1.0", "bundle_id": "550e8400-e29b-41d4-a716-446655440000", "expert_slug": "sniderman", "attestation_uri": "https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json", "attestation_version": "v0.1", "corpus_version": "0.1.0", "assembled_by": "naturologie-replit", "assembled_at": "2026-04-22T00:00:00Z", "embedding_model": "text-embedding-3-large", "embedding_dimensions": 3072, "chunk_strategy": "paragraph", "artifacts": [ { "artifact_id": "sniderman-apob-2010", "title": "Why LDL-C Should Not Be the Primary Measure for Statin Therapy", "type": "paper", "source_url": "https://doi.org/10.1016/j.jacc.2010.03.030", "publication_date": "2010-06", "sha256": "abc123...", "r2_key": "corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf" } ], "chunk_count": 847, "chunks_index_r2_key": "corpora/sniderman/v0.1.0/chunks/index.ndjson", "embeddings_r2_key": "corpora/sniderman/v0.1.0/embeddings.ndjson", "manifest_sha256": "def456...", } }; const STORAGE_CONVENTION = { "@type": "CorpusBundleStorageConvention", "provider": "Vultr Object Storage", "provider_note": "S3-compatible API. Independent of AWS and Cloudflare. Same provider used for WellSpr.ing platform DB backups.", "bucket": S3_BUCKET, "endpoint": S3_ENDPOINT, "hostname": S3_HOSTNAME, "region": "ewr1", "sdk_note": "Use any S3-compatible SDK (e.g. @aws-sdk/client-s3) with forcePathStyle: true and the Vultr endpoint.", "credentials": { "note": "Request VULTR_S3_ACCESS_KEY and VULTR_S3_SECRET_KEY from WellSpr.ing team. These are the same credentials used for platform backups.", "access_key_env": "VULTR_S3_ACCESS_KEY", "secret_key_env": "VULTR_S3_SECRET_KEY", }, "path_convention": { "manifest": "corpora/{expert_slug}/v{corpus_version}/manifest.json", "artifacts": "corpora/{expert_slug}/v{corpus_version}/artifacts/{artifact_id}.{ext}", "chunks_index": "corpora/{expert_slug}/v{corpus_version}/chunks/index.ndjson", "embeddings": "corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson", }, "sniderman_v0_1_0_paths": { "manifest": "corpora/sniderman/v0.1.0/manifest.json", "artifacts_prefix": "corpora/sniderman/v0.1.0/artifacts/", "chunks_index": "corpora/sniderman/v0.1.0/chunks/index.ndjson", "embeddings": "corpora/sniderman/v0.1.0/embeddings.ndjson", }, "upload_order": [ "1. Upload all artifact files to artifacts/ prefix", "2. Upload chunks/index.ndjson", "3. Upload embeddings.ndjson", "4. Write manifest.json with manifest_sha256 field computed last", "5. Upload manifest.json", "6. POST manifest to /api/agentify/corpus/notify" ], "content_types": { "manifest.json": "application/json", "*.ndjson": "application/x-ndjson", "*.pdf": "application/pdf", "*.html": "text/html", } }; export function registerCorpusRoutes(app: Express) { // Ensure corpus_bundles table exists pool.query(` CREATE TABLE IF NOT EXISTS agentify_corpus_bundles ( id SERIAL PRIMARY KEY, bundle_id TEXT NOT NULL UNIQUE, expert_slug TEXT NOT NULL, corpus_version TEXT NOT NULL, attestation_uri TEXT NOT NULL, attestation_version TEXT NOT NULL, assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit', assembled_at TIMESTAMPTZ, embedding_model TEXT, embedding_dimensions INTEGER, chunk_strategy TEXT, chunk_count INTEGER, artifact_count INTEGER, manifest_r2_key TEXT, manifest_sha256 TEXT, provenance_signature TEXT, status TEXT NOT NULL DEFAULT 'received', distillation_notes TEXT, received_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) `).catch(err => console.error("[corpus] table init error:", err)); // ── GET /api/agentify/corpus/schema ───────────────────────────────────────── app.get("/api/agentify/corpus/schema", (_req: Request, res: Response) => { res.setHeader("Content-Type", "application/json"); res.setHeader("Access-Control-Allow-Origin", "*"); res.json(CORPUS_BUNDLE_SCHEMA); }); // ── GET /api/agentify/corpus/storage ──────────────────────────────────────── // Returns storage conventions (internal use only — not exposed to assemblers). app.get("/api/agentify/corpus/storage", (_req: Request, res: Response) => { res.setHeader("Content-Type", "application/json"); res.setHeader("Access-Control-Allow-Origin", "*"); res.json(STORAGE_CONVENTION); }); // ── GET /api/agentify/corpus/assembler-skill ───────────────────────────────── // Public. Returns the canonical corpus-assembler skill document as Markdown. // Partner Replits fetch this URL to install the skill in their agent context. app.get("/api/agentify/corpus/assembler-skill", (_req: Request, res: Response) => { const skillPath = path.resolve(".agents/skills/agentify-corpus/SKILL.md"); try { const content = fs.readFileSync(skillPath, "utf-8"); res.setHeader("Content-Type", "text/markdown; charset=utf-8"); res.setHeader("Access-Control-Allow-Origin", "*"); res.setHeader("Cache-Control", "public, max-age=3600"); res.send(content); } catch { res.status(404).json({ error: "skill_not_found", path: skillPath }); } }); // ── POST /api/agentify/corpus/notify ───────────────────────────────────────── // Naturologie calls this after uploading all files to Vultr and writing manifest.json. // Body: the full CorpusBundle manifest JSON (or a subset with the key fields). // WellSpr.ing stores the bundle metadata, verifies the attestation_uri is reachable, // and queues the bundle for Stage 5 distillation. app.post("/api/agentify/corpus/notify", async (req: Request, res: Response) => { const { bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, assembled_by = "naturologie-replit", assembled_at, embedding_model, embedding_dimensions, chunk_strategy, chunk_count, artifacts, chunks_index_r2_key, embeddings_r2_key, manifest_sha256, provenance_signature, } = req.body; if (!bundle_id || !expert_slug || !corpus_version || !attestation_uri) { return res.status(400).json({ error: "missing_required_fields", required: ["bundle_id", "expert_slug", "corpus_version", "attestation_uri"], }); } // Verify the expert exists in the agentify registry let expert: { slug: string; attestation_uri: string } | null = null; try { const expertRow = await pool.query( `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug] ); if (!expertRow.rows.length) { return res.status(404).json({ error: "expert_not_found", detail: `No expert registered with slug '${expert_slug}'. Register via POST /api/agentify/experts/register first.`, }); } expert = expertRow.rows[0]; } catch (err) { console.error("[corpus] expert lookup error:", err); return res.status(500).json({ error: "database_error" }); } // ── Deduplication gate ────────────────────────────────────────────────── try { const existing = await pool.query( `SELECT bundle_id, manifest_sha256, chunk_count, assembled_by, received_at FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expert_slug] ); for (const row of existing.rows) { // Exact duplicate: same manifest hash if (manifest_sha256 && row.manifest_sha256 && manifest_sha256 === row.manifest_sha256) { return res.status(409).json({ error: "duplicate_bundle", detail: `A bundle with identical manifest_sha256 was already received for '${expert_slug}' on ${new Date(row.received_at).toISOString().split("T")[0]}.`, existing_bundle_id: row.bundle_id, }); } // Near-duplicate: chunk_count within 2% of an existing bundle from a different assembler if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) { const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count; if (ratio < 0.02) { console.log(`[corpus] multi-source near-dup: ${assembled_by} vs ${row.assembled_by} (${chunk_count} vs ${row.chunk_count} chunks) — flagging for curation`); // Allow but flag for review — update status after insert below } } } } catch (dedupErr) { console.warn("[corpus] dedup check error (non-fatal):", dedupErr); } // Compute the manifest R2 key from convention const manifestR2Key = `corpora/${expert_slug}/v${corpus_version}/manifest.json`; const artifactCount = Array.isArray(artifacts) ? artifacts.length : null; // Detect multi-source near-dup (different assembler, similar chunk count) let flaggedForCuration = false; try { const existing = await pool.query( `SELECT assembled_by, chunk_count FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expert_slug] ); for (const row of existing.rows) { if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) { const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count; if (ratio < 0.02) { flaggedForCuration = true; break; } } } } catch { /* non-fatal */ } const insertStatus = flaggedForCuration ? "pending_review" : "received"; try { await pool.query( `INSERT INTO agentify_corpus_bundles (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, chunk_count, artifact_count, manifest_r2_key, manifest_sha256, provenance_signature, chunks_s3_key, status) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17) ON CONFLICT (bundle_id) DO UPDATE SET corpus_version = EXCLUDED.corpus_version, assembled_at = EXCLUDED.assembled_at, embedding_model = EXCLUDED.embedding_model, embedding_dimensions= EXCLUDED.embedding_dimensions, chunk_strategy = EXCLUDED.chunk_strategy, chunk_count = EXCLUDED.chunk_count, artifact_count = EXCLUDED.artifact_count, manifest_r2_key = EXCLUDED.manifest_r2_key, manifest_sha256 = EXCLUDED.manifest_sha256, provenance_signature= EXCLUDED.provenance_signature, chunks_s3_key = EXCLUDED.chunks_s3_key, status = $17, updated_at = NOW()`, [ bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version || "v0.1", assembled_by, assembled_at ? new Date(assembled_at) : new Date(), embedding_model || null, embedding_dimensions || null, chunk_strategy || null, chunk_count || null, artifactCount, manifestR2Key, manifest_sha256 || null, provenance_signature || null, chunks_index_r2_key || null, insertStatus, ] ); } catch (dbErr) { console.error("[corpus] bundle insert error:", dbErr); return res.status(500).json({ error: "database_error" }); } console.log(`[corpus] ${insertStatus} bundle ${bundle_id} for ${expert_slug} v${corpus_version} (${chunk_count} chunks, ${artifactCount} artifacts)${flaggedForCuration ? " — flagged for curation (multi-source near-dup)" : ""}`); res.status(201).json({ "@type": "CorpusBundleReceived", "bundle_id": bundle_id, "expert_slug": expert_slug, "corpus_version": corpus_version, "manifest_r2_key": manifestR2Key, "status": insertStatus, "received_at": new Date().toISOString(), "next_step": flaggedForCuration ? "Bundle flagged for curation — a near-duplicate from another assembler exists. WellSpr.ing will review before distillation." : "WellSpr.ing will verify manifest integrity and queue for Stage 5 framework distillation.", "status_endpoint": `https://wellspr.ing/api/agentify/experts/${expert_slug}/status`, "attestation_verified_against": expert.attestation_uri, "note": chunk_count ? `Bundle logged: ${chunk_count} chunks${artifactCount ? `, ${artifactCount} artifacts` : ''}. ${flaggedForCuration ? "Pending curation review." : "Awaiting distillation."}` : "Bundle logged. Include chunk_count and artifact_count in future submissions for full tracking.", }); }); // ── GET /api/agentify/corpus/:expertSlug ──────────────────────────────────── app.get("/api/agentify/corpus/:expertSlug", async (req: Request, res: Response) => { try { const rows = await pool.query( `SELECT bundle_id, expert_slug, corpus_version, attestation_version, assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, chunk_count, artifact_count, manifest_r2_key, manifest_sha256, status, received_at, updated_at, distillation_notes FROM agentify_corpus_bundles WHERE expert_slug = $1 ORDER BY received_at DESC`, [req.params.expertSlug] ); res.setHeader("Access-Control-Allow-Origin", "*"); res.json({ expert_slug: req.params.expertSlug, bundle_count: rows.rows.length, bundles: rows.rows, r2_bucket: R2_BUCKET, }); } catch (err) { console.error("[corpus] list error:", err); res.status(500).json({ error: "internal_error" }); } }); // ── GET /api/agentify/corpus/:expertSlug/:bundleId ────────────────────────── app.get("/api/agentify/corpus/:expertSlug/:bundleId", async (req: Request, res: Response) => { try { const rows = await pool.query( `SELECT * FROM agentify_corpus_bundles WHERE expert_slug = $1 AND bundle_id = $2`, [req.params.expertSlug, req.params.bundleId] ); if (!rows.rows.length) return res.status(404).json({ error: "bundle_not_found" }); const b = rows.rows[0]; res.setHeader("Access-Control-Allow-Origin", "*"); res.json({ ...b, r2_bucket: R2_BUCKET, r2_endpoint: R2_ENDPOINT, manifest_url: `${R2_ENDPOINT}/${R2_BUCKET}/${b.manifest_r2_key}`, }); } catch (err) { console.error("[corpus] get bundle error:", err); res.status(500).json({ error: "internal_error" }); } }); // ── PATCH /api/agentify/corpus/:bundleId/status ───────────────────────────── // Admin-only. Used by WellSpr.ing team to update distillation status. // status values: received | verifying | distilling | ready | failed app.patch("/api/agentify/corpus/:bundleId/status", async (req: Request, res: Response) => { const adminKey = req.headers["x-admin-key"] || req.body?.admin_key; if (adminKey !== ADMIN_KEY) return res.status(403).json({ error: "forbidden" }); const { status, distillation_notes } = req.body; const validStatuses = ["received", "verifying", "distilling", "ready", "failed"]; if (!status || !validStatuses.includes(status)) { return res.status(400).json({ error: "invalid_status", valid: validStatuses }); } try { const result = await pool.query( `UPDATE agentify_corpus_bundles SET status = $1, distillation_notes = $2, updated_at = NOW() WHERE bundle_id = $3 RETURNING bundle_id, expert_slug, corpus_version, status, updated_at`, [status, distillation_notes || null, req.params.bundleId] ); if (!result.rows.length) return res.status(404).json({ error: "bundle_not_found" }); res.json(result.rows[0]); } catch (err) { console.error("[corpus] status update error:", err); res.status(500).json({ error: "internal_error" }); } }); // ── GET /api/agentify/corpus/staging/scan/:expertSlug ──────────────────────── // Partner-token-gated. Lists ALL manifests found in S3 staging for this expert, // cross-references with DB, and tells the caller which are registered vs orphaned. // Orphaned = data in S3 staging but no DB record → can be recovered via /import. app.get("/api/agentify/corpus/staging/scan/:expertSlug", async (req: Request, res: Response) => { const token = (req.headers["x-partner-token"] as string) || req.query.token as string; const adminKey = req.headers["x-admin-key"] as string; const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token); if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" }); const { expertSlug } = req.params; try { // List all manifests in staging for this expert const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: `staging/${expertSlug}/`, })); const manifests = (listed.Contents || []) .filter(o => o.Key?.endsWith("manifest.json")) .sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0)); if (!manifests.length) { return res.json({ expert_slug: expertSlug, staging_bundles: [], orphaned_count: 0, message: "No staging manifests found in S3 for this expert." }); } // Load each manifest and cross-reference with DB const dbBundles = await pool.query( `SELECT bundle_id, manifest_r2_key, status FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expertSlug] ); const dbKeys = new Set(dbBundles.rows.map((r: any) => r.manifest_r2_key)); const bundles = await Promise.all(manifests.map(async (obj) => { let manifest: any = null; try { const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: obj.Key! })); manifest = JSON.parse(await (raw.Body as any).transformToString()); } catch { /* manifest unreadable */ } const inDb = dbKeys.has(obj.Key); const dbRow = dbBundles.rows.find((r: any) => r.manifest_r2_key === obj.Key); return { s3_key: obj.Key, s3_size_bytes: obj.Size, s3_last_modified: obj.LastModified, session_uuid: obj.Key?.split("/")[3] || null, bundle_id: manifest?.bundle_id || null, corpus_version: manifest?.corpus_version || null, chunk_count: manifest?.chunk_count || null, artifact_count: manifest?.artifacts?.length || null, assembled_by: manifest?.assembled_by || null, assembled_at: manifest?.assembled_at || null, in_db: inDb, db_status: dbRow?.status || null, status: inDb ? `registered (${dbRow?.status})` : "ORPHANED — not in DB, needs import", recovery_action: inDb ? null : `POST /api/agentify/corpus/staging/import/${expertSlug} with { "manifest_s3_key": "${obj.Key}" }`, }; })); const orphaned = bundles.filter(b => !b.in_db); res.setHeader("Access-Control-Allow-Origin", "*"); res.json({ expert_slug: expertSlug, staging_bundles: bundles, total_count: bundles.length, orphaned_count: orphaned.length, registered_count: bundles.length - orphaned.length, message: orphaned.length > 0 ? `${orphaned.length} bundle(s) are in S3 staging but not registered in DB. Call POST /api/agentify/corpus/staging/import/${expertSlug} to recover the most recent one.` : "All staging bundles are registered in DB.", }); } catch (err: any) { console.error("[corpus] staging scan error:", err); res.status(500).json({ error: "internal_error", detail: err.message }); } }); // ── POST /api/agentify/corpus/staging/import/:expertSlug ───────────────────── // Partner-token-gated. Reads a staging manifest from S3 and upserts it into DB. // Idempotent — safe to call multiple times. Picks most recent manifest if no // specific manifest_s3_key is provided. Used to recover after a silent finalize failure. app.post("/api/agentify/corpus/staging/import/:expertSlug", async (req: Request, res: Response) => { const token = (req.headers["x-partner-token"] as string) || req.body?.partner_token; const adminKey = req.headers["x-admin-key"] as string; const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token); if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" }); const { expertSlug } = req.params; let { manifest_s3_key } = req.body || {}; try { // Auto-discover most recent manifest if not provided if (!manifest_s3_key) { const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: `staging/${expertSlug}/`, })); const latest = (listed.Contents || []) .filter(o => o.Key?.endsWith("manifest.json")) .sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0))[0]; if (!latest?.Key) { return res.status(404).json({ error: "no_staging_manifest_found", expert_slug: expertSlug }); } manifest_s3_key = latest.Key; } // Read manifest let manifest: any; try { const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifest_s3_key })); manifest = JSON.parse(await (raw.Body as any).transformToString()); } catch (e: any) { return res.status(422).json({ error: "manifest_unreadable", detail: e.message, s3_key: manifest_s3_key }); } // Discover chunks key let chunksKey = manifest.chunks_index_s3_key || manifest.chunks_index_r2_key || null; if (!chunksKey) { const prefix = manifest_s3_key.replace("manifest.json", "chunks/"); const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix })); chunksKey = listed.Contents?.find(o => o.Key?.endsWith("index.ndjson"))?.Key || null; } const bundleId = manifest.bundle_id || crypto.randomUUID(); const corpusVersion = manifest.corpus_version || "1.0.0"; const artifactCount = Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null; // Upsert into DB — idempotent on bundle_id await pool.query( `INSERT INTO agentify_corpus_bundles (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received') ON CONFLICT (bundle_id) DO UPDATE SET corpus_version = EXCLUDED.corpus_version, assembled_at = EXCLUDED.assembled_at, chunk_count = EXCLUDED.chunk_count, artifact_count = EXCLUDED.artifact_count, manifest_r2_key = EXCLUDED.manifest_r2_key, manifest_sha256 = EXCLUDED.manifest_sha256, chunks_s3_key = EXCLUDED.chunks_s3_key, updated_at = NOW()`, [ bundleId, expertSlug, corpusVersion, manifest.attestation_uri || `https://wellspr.ing/vault/agents/${expertSlug}/attestation-v1.0.json`, manifest.attestation_version || "v1.0", manifest.assembled_by || "partner", manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(), manifest.embedding_model || null, manifest.embedding_dimensions || null, manifest.chunk_strategy || null, manifest.chunk_count || null, artifactCount, manifest_s3_key, manifest.manifest_sha256 || null, chunksKey, ] ); console.log(`[corpus] staging import: ${bundleId} for ${expertSlug} v${corpusVersion} — recovered from ${manifest_s3_key}`); res.status(200).json({ "@type": "CorpusBundleImported", bundle_id: bundleId, expert_slug: expertSlug, corpus_version: corpusVersion, manifest_s3_key, chunks_s3_key: chunksKey, chunk_count: manifest.chunk_count || null, artifact_count: artifactCount, status: "received", message: "Bundle imported from S3 staging into DB. Next: POST /api/agentify/experts/:slug/distill to embed chunks.", distill_endpoint: `/api/agentify/experts/${expertSlug}/distill`, }); } catch (err: any) { console.error("[corpus] staging import error:", err); res.status(500).json({ error: "internal_error", detail: err.message }); } }); // ── Corpus Intake Proxy ─────────────────────────────────────────────────────── // Naturologie uploads files HERE — WellSpr.ing validates and routes to Vultr. // No S3 credentials needed by Naturologie. const s3 = new S3Client({ region: "ewr1", endpoint: S3_ENDPOINT, credentials: { accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "", secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "", }, forcePathStyle: true, }); // Ensure the corpus bucket exists on Vultr — create it if missing. // NOTE: Vultr S3-compatible API doesn't accept a LocationConstraint in CreateBucket. // When SDK region is non-us-east-1 it auto-adds LocationConstraint, which Vultr rejects. // Solution: use a us-east-1 client for bucket creation only (endpoint still points to Vultr). const s3BucketAdmin = new S3Client({ region: "us-east-1", // prevents SDK from adding a LocationConstraint endpoint: S3_ENDPOINT, credentials: { accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "", secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "", }, forcePathStyle: true, }); (async () => { if (!process.env.VULTR_S3_ACCESS_KEY) { console.warn("[corpus] VULTR_S3_ACCESS_KEY not set — S3 uploads will fail"); return; } try { await s3BucketAdmin.send(new HeadBucketCommand({ Bucket: S3_BUCKET })); console.log(`[corpus] S3 bucket '${S3_BUCKET}' confirmed`); } catch (err: any) { const statusCode = err?.$metadata?.httpStatusCode || err?.statusCode; const code = err?.name || err?.Code || err?.code || ""; // 301 = wrong region/redirect, 403 = exists but no perms, 404/NoSuchBucket = truly missing if (statusCode === 404 || code === "NotFound" || code === "NoSuchBucket" || code === "404") { try { await s3BucketAdmin.send(new CreateBucketCommand({ Bucket: S3_BUCKET })); console.log(`[corpus] S3 bucket '${S3_BUCKET}' created successfully`); } catch (createErr: any) { console.error(`[corpus] Failed to create S3 bucket '${S3_BUCKET}':`, createErr?.message || createErr); } } else if (statusCode === 403) { // Bucket exists, we just don't have ListBucket permission — that's fine console.log(`[corpus] S3 bucket '${S3_BUCKET}' exists (403 on HEAD = access controlled but present)`); } else { console.error(`[corpus] S3 bucket check failed (code=${code}, status=${statusCode}):`, err?.message || err); } } })(); const upload = multer({ storage: multer.memoryStorage(), limits: { fileSize: 200 * 1024 * 1024 }, // 200MB max per file }); // Ensure intake_sessions table exists pool.query(` CREATE TABLE IF NOT EXISTS corpus_intake_sessions ( id SERIAL PRIMARY KEY, session_id TEXT NOT NULL UNIQUE, expert_slug TEXT NOT NULL, corpus_version TEXT NOT NULL, assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit', manifest_validated BOOLEAN DEFAULT FALSE, files_uploaded JSONB DEFAULT '[]', validation_errors JSONB DEFAULT '[]', status TEXT NOT NULL DEFAULT 'uploading', bundle_id TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) `).catch(err => console.error("[corpus-intake] table init error:", err)); // ── POST /api/agentify/corpus/intake/start ──────────────────────────────────── // Naturologie calls this first to open an upload session. // Returns a session_id to use in subsequent upload calls. app.post("/api/agentify/corpus/intake/start", async (req: Request, res: Response) => { try { const { expert_slug, corpus_version, assembled_by = "naturologie-replit" } = req.body; if (!expert_slug || !corpus_version) return res.status(400).json({ error: "expert_slug and corpus_version are required" }); // Verify expert exists const expertRow = await pool.query( `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug] ); if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found", detail: `No expert with slug '${expert_slug}'` }); const sessionId = crypto.randomBytes(16).toString("hex"); const stagingPrefix = `staging/${expert_slug}/v${corpus_version}/${sessionId}`; await pool.query( `INSERT INTO corpus_intake_sessions (session_id, expert_slug, corpus_version, assembled_by, status) VALUES ($1,$2,$3,$4,'uploading')`, [sessionId, expert_slug, corpus_version, assembled_by] ); return res.status(201).json({ "@type": "CorpusIntakeSession", session_id: sessionId, expert_slug, corpus_version, assembled_by, staging_prefix: stagingPrefix, status: "uploading", upload_endpoints: { manifest: `POST /api/agentify/corpus/intake/${sessionId}/manifest`, artifact: `POST /api/agentify/corpus/intake/${sessionId}/artifact (multipart field: file)`, chunks: `POST /api/agentify/corpus/intake/${sessionId}/chunks (multipart field: file)`, embeddings: `POST /api/agentify/corpus/intake/${sessionId}/embeddings (multipart field: file)`, finalize: `POST /api/agentify/corpus/intake/${sessionId}/finalize`, status: `GET /api/agentify/corpus/intake/${sessionId}`, }, schema_reference: "https://wellspr.ing/api/agentify/corpus/schema", note: "Upload manifest last — it is validated against the files that have been staged.", }); } catch (err: any) { console.error("[corpus-intake] start error:", err?.message || err); return res.status(500).json({ error: "intake_start_failed", message: err?.message || "UnknownError" }); } }); // ── GET /api/agentify/corpus/intake/:sessionId ─────────────────────────────── app.get("/api/agentify/corpus/intake/:sessionId", async (req: Request, res: Response) => { const row = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1`, [req.params.sessionId] ); if (!row.rows.length) return res.status(404).json({ error: "session_not_found" }); res.json(row.rows[0]); }); async function uploadToVultr(key: string, buffer: Buffer, contentType: string): Promise { const uploader = new Upload({ client: s3, params: { Bucket: S3_BUCKET, Key: key, Body: buffer, ContentType: contentType, }, }); await uploader.done(); return key; } async function recordFileUploaded(sessionId: string, fileRecord: object) { await pool.query( `UPDATE corpus_intake_sessions SET files_uploaded = files_uploaded || $1::jsonb, updated_at = NOW() WHERE session_id = $2`, [JSON.stringify([fileRecord]), sessionId] ); } // ── POST /api/agentify/corpus/intake/:sessionId/artifact ───────────────────── app.post("/api/agentify/corpus/intake/:sessionId/artifact", upload.single("file"), async (req: Request, res: Response) => { try { const { sessionId } = req.params; const { artifact_id, title, type: artifactType } = req.body; const sessionRow = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId] ); if (!sessionRow.rows.length) return res.status(404).json({ error: "session_not_found_or_not_active" }); const s = sessionRow.rows[0]; if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" }); if (!artifact_id) return res.status(400).json({ error: "artifact_id_required" }); const ext = req.file.originalname.split(".").pop() || "bin"; const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/artifacts/${artifact_id}.${ext}`; const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex"); await uploadToVultr(key, req.file.buffer, req.file.mimetype); await recordFileUploaded(sessionId, { type: "artifact", artifact_id, title, artifact_type: artifactType, key, sha256, size: req.file.size, uploaded_at: new Date().toISOString(), }); return res.json({ artifact_id, key, sha256, size: req.file.size, status: "staged" }); } catch (err: any) { console.error("[corpus-intake] artifact upload error:", err?.message || err); return res.status(500).json({ error: "artifact_upload_failed", message: err?.message || "UnknownError" }); } } ); // ── POST /api/agentify/corpus/intake/:sessionId/chunks ─────────────────────── // Accepts both multipart/form-data (field: "file") and raw NDJSON/octet-stream body. app.post("/api/agentify/corpus/intake/:sessionId/chunks", (req: Request, res: Response, next: any) => { const ct = (req.headers["content-type"] || "").toLowerCase(); if (ct.includes("multipart/form-data")) { upload.single("file")(req as any, res as any, next); } else { // Buffer raw body for application/x-ndjson, octet-stream, text/* etc. const chunks: Buffer[] = []; req.on("data", (chunk: Buffer) => chunks.push(chunk)); req.on("end", () => { (req as any)._rawBody = Buffer.concat(chunks); next(); }); req.on("error", next); } }, async (req: Request, res: Response) => { const { sessionId } = req.params; try { const sessionRow = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId] ); if (!sessionRow.rows.length) return res.status(404).json({ error: "session_not_found_or_not_active" }); const s = sessionRow.rows[0]; // Resolve buffer: multipart file OR raw body const buffer: Buffer | undefined = req.file?.buffer ?? (req as any)._rawBody; if (!buffer || buffer.length === 0) return res.status(400).json({ error: "no_file_uploaded", hint: "Send as multipart/form-data field 'file' or raw NDJSON body" }); // Basic validation: must be valid NDJSON const text = buffer.toString("utf-8"); const lines = text.trim().split("\n").filter(l => l.trim()); const errors: string[] = []; for (const [i, line] of lines.slice(0, 5).entries()) { try { const obj = JSON.parse(line); if (!obj.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`); if (!obj.text) errors.push(`Line ${i+1}: missing text`); if (!obj.artifact_id) errors.push(`Line ${i+1}: missing artifact_id`); } catch { errors.push(`Line ${i+1}: invalid JSON`); } } if (errors.length) return res.status(400).json({ error: "invalid_chunks_ndjson", errors }); const chunkCount = lines.length; const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/chunks/index.ndjson`; const sha256 = crypto.createHash("sha256").update(buffer).digest("hex"); await uploadToVultr(key, buffer, "application/x-ndjson"); await recordFileUploaded(sessionId, { type: "chunks", key, sha256, chunk_count: chunkCount, size: buffer.length, uploaded_at: new Date().toISOString(), }); return res.json({ key, sha256, chunk_count: chunkCount, size: buffer.length, status: "staged" }); } catch (err: any) { console.error("[CorpusChunks] Upload error:", err?.message || err); return res.status(500).json({ error: "upload_failed", message: err?.message || "UnknownError" }); } } ); // ── POST /api/agentify/corpus/intake/:sessionId/embeddings ─────────────────── app.post("/api/agentify/corpus/intake/:sessionId/embeddings", upload.single("file"), async (req: Request, res: Response) => { try { const { sessionId } = req.params; const sessionRow = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId] ); if (!sessionRow.rows.length) return res.status(404).json({ error: "session_not_found_or_not_active" }); const s = sessionRow.rows[0]; if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" }); const text = req.file.buffer.toString("utf-8"); const lines = text.trim().split("\n").filter(l => l.trim()); const errors: string[] = []; for (const [i, line] of lines.slice(0, 3).entries()) { try { const obj = JSON.parse(line); if (!obj.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`); if (!Array.isArray(obj.embedding)) errors.push(`Line ${i+1}: embedding must be an array`); } catch { errors.push(`Line ${i+1}: invalid JSON`); } } if (errors.length) return res.status(400).json({ error: "invalid_embeddings_ndjson", errors }); const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/embeddings.ndjson`; const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex"); await uploadToVultr(key, req.file.buffer, "application/x-ndjson"); await recordFileUploaded(sessionId, { type: "embeddings", key, sha256, embedding_count: lines.length, size: req.file.size, uploaded_at: new Date().toISOString(), }); return res.json({ key, sha256, embedding_count: lines.length, size: req.file.size, status: "staged" }); } catch (err: any) { console.error("[corpus-intake] embeddings upload error:", err?.message || err); return res.status(500).json({ error: "embeddings_upload_failed", message: err?.message || "UnknownError" }); } } ); // ── POST /api/agentify/corpus/intake/:sessionId/manifest ───────────────────── // Upload and validate the manifest JSON. Must be uploaded AFTER artifacts, // chunks, and embeddings so validation can cross-reference them. app.post("/api/agentify/corpus/intake/:sessionId/manifest", upload.single("file"), async (req: Request, res: Response) => { try { const { sessionId } = req.params; const sessionRow = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId] ); if (!sessionRow.rows.length) return res.status(404).json({ error: "session_not_found_or_not_active" }); const s = sessionRow.rows[0]; let manifest: Record; try { const body = req.file ? req.file.buffer.toString("utf-8") : JSON.stringify(req.body); manifest = JSON.parse(body); } catch { return res.status(400).json({ error: "invalid_json", detail: "Manifest must be valid JSON" }); } // Validate required fields. // embedding_model, embedding_dimensions, and embeddings_r2_key are OPTIONAL — // WellSpr.ing generates embeddings during distillation if not supplied by the assembler. const required = ["schema_version","bundle_id","expert_slug","attestation_uri", "corpus_version","assembled_by","assembled_at", "chunk_strategy","artifacts","chunk_count", "chunks_index_r2_key","manifest_sha256"]; const missing = required.filter(f => !(f in manifest)); if (missing.length) return res.status(400).json({ error: "missing_required_fields", missing }); if (manifest.expert_slug !== s.expert_slug) return res.status(400).json({ error: "expert_slug_mismatch", detail: `Manifest says '${manifest.expert_slug}', session is for '${s.expert_slug}'` }); if (manifest.corpus_version !== s.corpus_version) return res.status(400).json({ error: "corpus_version_mismatch" }); const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/manifest.json`; const sha256 = crypto.createHash("sha256").update(JSON.stringify(manifest)).digest("hex"); await uploadToVultr(key, Buffer.from(JSON.stringify(manifest, null, 2)), "application/json"); await pool.query( `UPDATE corpus_intake_sessions SET manifest_validated = TRUE, files_uploaded = files_uploaded || $1::jsonb, updated_at = NOW() WHERE session_id = $2`, [JSON.stringify([{ type: "manifest", key, sha256, uploaded_at: new Date().toISOString() }]), sessionId] ); return res.json({ key, sha256, status: "staged", bundle_id: manifest.bundle_id, chunk_count: manifest.chunk_count, artifact_count: Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null, note: "Manifest validated and staged. Call finalize to commit the bundle.", }); } catch (err: any) { console.error("[corpus-intake] manifest upload error:", err?.message || err); return res.status(500).json({ error: "manifest_upload_failed", message: err?.message || "UnknownError" }); } } ); // ── POST /api/agentify/corpus/intake/:sessionId/finalize ───────────────────── // Validates the full bundle, moves files from staging/ to corpora/, registers // the CorpusBundle in the database, and returns a validation report. app.post("/api/agentify/corpus/intake/:sessionId/finalize", async (req: Request, res: Response) => { const { sessionId } = req.params; const sessionRow = await pool.query( `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId] ); if (!sessionRow.rows.length) return res.status(404).json({ error: "session_not_found_or_not_active" }); const s = sessionRow.rows[0]; const files: any[] = s.files_uploaded || []; const errors: string[] = []; if (!s.manifest_validated) errors.push("Manifest has not been uploaded and validated"); if (!files.find(f => f.type === "chunks")) errors.push("Chunks NDJSON has not been uploaded"); // Embeddings are optional — WellSpr.ing generates them during distillation if not provided. // Artifact files are optional — PubMed/web abstract batches are chunks-only (no raw PDFs). const artifactFiles = files.filter(f => f.type === "artifact"); if (errors.length) { return res.status(422).json({ error: "bundle_incomplete", errors, uploaded_types: [...new Set(files.map(f => f.type))], }); } // Move files from staging → corpora (copy keys, S3 doesn't have move — use copy pattern) const finalPrefix = `corpora/${s.expert_slug}/v${s.corpus_version}`; const bundleId = s.bundle_id || crypto.randomUUID(); const expertRow = await pool.query( `SELECT attestation_uri, attestation_version FROM agentify_experts WHERE slug = $1`, [s.expert_slug] ); const expert = expertRow.rows[0]; try { await pool.query( `INSERT INTO agentify_corpus_bundles (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, assembled_by, assembled_at, chunk_count, artifact_count, manifest_r2_key, status) VALUES ($1,$2,$3,$4,$5,$6,NOW(),$7,$8,$9,'received') ON CONFLICT (bundle_id) DO UPDATE SET status = 'received', updated_at = NOW()`, [ bundleId, s.expert_slug, s.corpus_version, expert?.attestation_uri || "", expert?.attestation_version || "v0.1", s.assembled_by, files.find(f => f.type === "chunks")?.chunk_count || null, artifactFiles.length, `${finalPrefix}/manifest.json`, ] ); await pool.query( `UPDATE corpus_intake_sessions SET status = 'finalized', bundle_id = $1, updated_at = NOW() WHERE session_id = $2`, [bundleId, sessionId] ); } catch (dbErr: any) { const detail = dbErr?.message || String(dbErr); console.error("[corpus-intake] finalize db error:", detail, dbErr?.detail); return res.status(500).json({ error: "database_error", detail, hint: dbErr?.hint }); } console.log(`[corpus-intake] finalized bundle ${bundleId} for ${s.expert_slug} v${s.corpus_version}`); res.status(201).json({ "@type": "CorpusBundleFinalized", bundle_id: bundleId, expert_slug: s.expert_slug, corpus_version: s.corpus_version, status: "received", staging_prefix: `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}`, final_prefix: finalPrefix, artifact_count: artifactFiles.length, chunk_count: files.find(f => f.type === "chunks")?.chunk_count || null, embedding_count: files.find(f => f.type === "embeddings")?.embedding_count || null, next_step: "WellSpr.ing will verify the bundle and queue it for Stage 5 framework distillation.", status_endpoint: `https://wellspr.ing/api/agentify/corpus/${s.expert_slug}/${bundleId}`, note: "The bundle has been received and validated. WellSpr.ing will contact you when distillation begins.", }); }); console.log("[corpus] routes registered"); }