// agentify-help/routes/corpus.ts
// (paste metadata retained as a comment so the file parses:
//  1158 lines, 55 KiB, TypeScript)

/**
* Agentify Corpus Pipeline — WellSpr.ing ingestion side
*
* Naturologie Replit assembles and chunks the expert's corpus, uploads artifacts
* and embeddings to Vultr Object Storage (S3-compatible), writes a signed
* CorpusBundle manifest JSON, then calls POST /api/agentify/corpus/notify to
* hand off to WellSpr.ing.
*
* Storage: Vultr Object Storage (ewr1.vultrobjects.com) — same provider already
* used for platform DB backups. S3-compatible API, independent of AWS and Cloudflare.
*
* Public endpoints:
* GET /api/agentify/corpus/schema — CorpusBundle JSON schema + field docs
* GET /api/agentify/corpus/storage — storage convention for Naturologie
* GET /api/agentify/corpus/:expertSlug — list bundles for an expert
* GET /api/agentify/corpus/:expertSlug/:bundleId — single bundle detail
*
* Naturologie-facing endpoints:
* POST /api/agentify/corpus/notify — notify WellSpr.ing that a bundle is ready
* PATCH /api/agentify/corpus/:bundleId/status — admin-only status update
*
* Storage layout (Vultr Object Storage):
* Bucket: agentify-corpus (separate from wellspring-backups)
* corpora/{expert_slug}/v{corpus_version}/manifest.json ← CorpusBundle manifest
* corpora/{expert_slug}/v{corpus_version}/artifacts/ ← raw PDFs / HTML
* corpora/{expert_slug}/v{corpus_version}/chunks/ ← chunk NDJSON index
* corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson ← one embedding per line
*/
import type { Express, Request, Response } from "express";
import { pool } from "../db";
import crypto from "crypto";
import fs from "fs";
import path from "path";
import multer from "multer";
import { S3Client, PutObjectCommand, HeadObjectCommand, CreateBucketCommand, HeadBucketCommand, GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
// Admin key guarding privileged endpoints (PATCH status, staging scan/import).
// SECURITY: the hard-coded fallback is visible to anyone with source access, so
// every deployment that forgets to set ADMIN_KEY shares a known secret. The
// fallback is kept for backward compatibility, but we now warn loudly at
// startup so operators set the env var in production.
if (!process.env.ADMIN_KEY) {
  console.warn("[corpus] ADMIN_KEY env var not set — using built-in default key; set ADMIN_KEY in production");
}
const ADMIN_KEY = process.env.ADMIN_KEY || "b0db7a87384fc814b0f46ea7bdc6ab6a81152be5b098718b";
// Vultr Object Storage — S3-compatible, same provider as platform DB backups
const S3_HOSTNAME = process.env.VULTR_S3_HOSTNAME || "ewr1.vultrobjects.com";
const S3_BUCKET = process.env.CORPUS_S3_BUCKET || "agentify-corpus";
const S3_ENDPOINT = `https://${S3_HOSTNAME}`;
// Keep the old names as aliases so existing code still compiles
const R2_BUCKET = S3_BUCKET;
const R2_ENDPOINT = S3_ENDPOINT;
// ── Partner token validation ───────────────────────────────────────────────────
/**
 * Checks whether a partner token is valid: non-empty and present as an active
 * row in partner_tokens. Any DB failure is treated as "not authorized" rather
 * than surfacing an error to the caller.
 */
async function isPartnerToken(token: string | undefined): Promise<boolean> {
  if (!token) return false;
  try {
    const result = await pool.query(
      `SELECT 1 FROM partner_tokens WHERE token = $1 AND is_active = TRUE LIMIT 1`, [token]
    );
    return result.rows.length !== 0;
  } catch {
    // Fail closed on database trouble.
    return false;
  }
}
// ── CorpusBundle schema definition (authoritative) ────────────────────────────
// Authoritative CorpusBundle schema, served verbatim by GET /api/agentify/corpus/schema.
// Pure data — no runtime logic in this file reads individual fields; partner
// assemblers consume it to build valid manifests.
// NOTE(review): the "R2"/"r2_key" naming below (and in the description string)
// appears to be legacy from an earlier Cloudflare R2 phase — storage is now
// Vultr Object Storage (see STORAGE_CONVENTION / the aliases at top of file).
// Field names are presumably kept for wire compatibility — confirm before renaming.
const CORPUS_BUNDLE_SCHEMA = {
"$schema": "https://wellspr.ing/protocols/agentify/corpus-bundle/v1.0",
"@type": "CorpusBundleSchema",
"schema_version": "1.0",
"description": "CorpusBundle is the handoff artifact Naturologie writes to R2 and notifies WellSpr.ing about. WellSpr.ing reads this manifest to verify integrity, then runs Stage 5 framework distillation.",
// Field-by-field documentation of the manifest that /notify expects.
"fields": {
"schema_version": { "type": "string", "const": "1.0", "required": true },
"bundle_id": {
"type": "string", "format": "uuid", "required": true,
"description": "UUIDv4 — unique per ingestion run. If re-ingesting, use a new UUID."
},
"expert_slug": {
"type": "string", "required": true,
"description": "Matches the slug in wellspr.ing/api/agentify/experts/:slug. For Sniderman: 'sniderman'."
},
"attestation_uri": {
"type": "string", "format": "uri", "required": true,
"description": "Canonical attestation URI. For Sniderman v0.1: 'https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json'. WellSpr.ing will verify this URL is reachable and signature is valid."
},
"attestation_version": {
"type": "string", "required": true,
"description": "Version string matching the attestation URI. E.g. 'v0.1'."
},
"corpus_version": {
"type": "string", "required": true,
"description": "Semver-style corpus version. First ingestion: '0.1.0'. After expert corrections mint attestation v0.2, increment to '0.2.0'."
},
"assembled_by": {
"type": "string", "required": true,
"description": "Assembling system identifier. Use 'naturologie-replit'."
},
"assembled_at": {
"type": "string", "format": "iso8601", "required": true,
"description": "ISO 8601 UTC timestamp of when the bundle was finalized."
},
"embedding_model": {
"type": "string", "required": false,
"description": "Embedding model used, if the assembler generated embeddings. E.g. 'text-embedding-3-large'. Omit if WellSpr.ing should generate embeddings during distillation."
},
"embedding_dimensions": {
"type": "integer", "required": false,
"description": "Dimension count of the embedding vectors. Required only if embedding_model is provided."
},
"chunk_strategy": {
"type": "string", "enum": ["paragraph", "section", "sentence", "fixed-token"],
"required": true,
"description": "Chunking strategy used. 'paragraph' is preferred for published medical literature."
},
"chunk_token_limit": {
"type": "integer", "required": false,
"description": "Max tokens per chunk if using 'fixed-token' strategy."
},
"artifacts": {
"type": "array", "required": true,
"description": "List of source documents included in this corpus.",
"items": {
"artifact_id": { "type": "string", "description": "Short slug, e.g. 'sniderman-apob-2010'" },
"title": { "type": "string" },
"type": { "type": "string", "enum": ["paper", "book", "protocol", "interview", "transcript", "editorial"] },
"source_url": { "type": "string", "format": "uri", "description": "Canonical DOI or URL." },
"publication_date": { "type": "string", "description": "YYYY or YYYY-MM or YYYY-MM-DD." },
"sha256": { "type": "string", "description": "SHA-256 of the raw artifact file." },
"r2_key": { "type": "string", "description": "Key within the agentify-corpus bucket. E.g. 'corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf'." }
}
},
"chunk_count": {
"type": "integer", "required": true,
"description": "Total number of chunks. The actual chunk files live at their r2_keys in the chunks/ prefix."
},
"chunks_index_r2_key": {
"type": "string", "required": true,
"description": "R2 key of the chunk index NDJSON file (one chunk JSON per line). E.g. 'corpora/sniderman/v0.1.0/chunks/index.ndjson'."
},
"embeddings_r2_key": {
"type": "string", "required": false,
"description": "Key of the embeddings NDJSON file (one {chunk_id, embedding: float[]} per line). Omit if WellSpr.ing should generate embeddings. E.g. 'corpora/sniderman/v0.1.0/embeddings.ndjson'."
},
"manifest_sha256": {
"type": "string", "required": true,
"description": "SHA-256 of this manifest file with the manifest_sha256 field set to empty string. WellSpr.ing recomputes this to verify integrity."
},
"provenance_signature": {
"type": "string", "required": false,
"description": "Optional Ed25519 signature over manifest_sha256 using Naturologie's signing key. If provided, WellSpr.ing stores it as part of the provenance chain."
}
},
// Shape of each NDJSON line in the chunks index file.
"chunk_item_schema": {
"description": "Each line in chunks/index.ndjson must be a JSON object with these fields:",
"chunk_id": "string — unique within the bundle, e.g. 'sniderman-apob-2010-p003'",
"artifact_id": "string — matches an artifact_id in the artifacts array",
"text": "string — the raw chunk text",
"sha256": "string — SHA-256 of the chunk text",
"token_count": "integer — token count using the chosen embedding model's tokenizer",
"scopes": "string[] — SGS scope strings this chunk is most relevant to",
"r2_key": "string — optional, if chunk is also stored as a standalone file"
},
// Fully-worked example manifest for assemblers to copy from.
"example": {
"schema_version": "1.0",
"bundle_id": "550e8400-e29b-41d4-a716-446655440000",
"expert_slug": "sniderman",
"attestation_uri": "https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json",
"attestation_version": "v0.1",
"corpus_version": "0.1.0",
"assembled_by": "naturologie-replit",
"assembled_at": "2026-04-22T00:00:00Z",
"embedding_model": "text-embedding-3-large",
"embedding_dimensions": 3072,
"chunk_strategy": "paragraph",
"artifacts": [
{
"artifact_id": "sniderman-apob-2010",
"title": "Why LDL-C Should Not Be the Primary Measure for Statin Therapy",
"type": "paper",
"source_url": "https://doi.org/10.1016/j.jacc.2010.03.030",
"publication_date": "2010-06",
"sha256": "abc123...",
"r2_key": "corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf"
}
],
"chunk_count": 847,
"chunks_index_r2_key": "corpora/sniderman/v0.1.0/chunks/index.ndjson",
"embeddings_r2_key": "corpora/sniderman/v0.1.0/embeddings.ndjson",
"manifest_sha256": "def456...",
}
};
// Storage-convention document served by GET /api/agentify/corpus/storage.
// Pure data: tells assemblers which bucket/endpoint/paths to use and in what
// order to upload. Values derive from the S3_* constants at the top of the file.
const STORAGE_CONVENTION = {
"@type": "CorpusBundleStorageConvention",
"provider": "Vultr Object Storage",
"provider_note": "S3-compatible API. Independent of AWS and Cloudflare. Same provider used for WellSpr.ing platform DB backups.",
"bucket": S3_BUCKET,
"endpoint": S3_ENDPOINT,
"hostname": S3_HOSTNAME,
"region": "ewr1",
"sdk_note": "Use any S3-compatible SDK (e.g. @aws-sdk/client-s3) with forcePathStyle: true and the Vultr endpoint.",
// Only env-var NAMES are published here — never credential values.
"credentials": {
"note": "Request VULTR_S3_ACCESS_KEY and VULTR_S3_SECRET_KEY from WellSpr.ing team. These are the same credentials used for platform backups.",
"access_key_env": "VULTR_S3_ACCESS_KEY",
"secret_key_env": "VULTR_S3_SECRET_KEY",
},
// Canonical key layout; matches the manifestR2Key computed in /notify.
"path_convention": {
"manifest": "corpora/{expert_slug}/v{corpus_version}/manifest.json",
"artifacts": "corpora/{expert_slug}/v{corpus_version}/artifacts/{artifact_id}.{ext}",
"chunks_index": "corpora/{expert_slug}/v{corpus_version}/chunks/index.ndjson",
"embeddings": "corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson",
},
"sniderman_v0_1_0_paths": {
"manifest": "corpora/sniderman/v0.1.0/manifest.json",
"artifacts_prefix": "corpora/sniderman/v0.1.0/artifacts/",
"chunks_index": "corpora/sniderman/v0.1.0/chunks/index.ndjson",
"embeddings": "corpora/sniderman/v0.1.0/embeddings.ndjson",
},
// Manifest goes last so its sha256 can cover the already-uploaded files.
"upload_order": [
"1. Upload all artifact files to artifacts/ prefix",
"2. Upload chunks/index.ndjson",
"3. Upload embeddings.ndjson",
"4. Write manifest.json with manifest_sha256 field computed last",
"5. Upload manifest.json",
"6. POST manifest to /api/agentify/corpus/notify"
],
"content_types": {
"manifest.json": "application/json",
"*.ndjson": "application/x-ndjson",
"*.pdf": "application/pdf",
"*.html": "text/html",
}
};
export function registerCorpusRoutes(app: Express) {
// Ensure corpus_bundles table exists
// Idempotent startup DDL for the bundle registry table.
// FIX: added the chunks_s3_key column — both the /notify INSERT and the
// /staging/import INSERT write to chunks_s3_key, so a fresh database
// bootstrapped from this DDL alone would reject those statements.
// NOTE(review): existing deployments presumably already have the column
// (today's INSERTs succeed there) — confirm, or ship an ALTER TABLE migration,
// since CREATE TABLE IF NOT EXISTS never modifies an existing table.
pool.query(`
CREATE TABLE IF NOT EXISTS agentify_corpus_bundles (
  id SERIAL PRIMARY KEY,
  bundle_id TEXT NOT NULL UNIQUE,
  expert_slug TEXT NOT NULL,
  corpus_version TEXT NOT NULL,
  attestation_uri TEXT NOT NULL,
  attestation_version TEXT NOT NULL,
  assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit',
  assembled_at TIMESTAMPTZ,
  embedding_model TEXT,
  embedding_dimensions INTEGER,
  chunk_strategy TEXT,
  chunk_count INTEGER,
  artifact_count INTEGER,
  manifest_r2_key TEXT,
  manifest_sha256 TEXT,
  provenance_signature TEXT,
  chunks_s3_key TEXT,
  status TEXT NOT NULL DEFAULT 'received',
  distillation_notes TEXT,
  received_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
)
`).catch(err => console.error("[corpus] table init error:", err));
// ── GET /api/agentify/corpus/schema ─────────────────────────────────────────
app.get("/api/agentify/corpus/schema", (_req: Request, res: Response) => {
  // Publicly serve the authoritative CorpusBundle schema, CORS-open so
  // assembler Replits can fetch it cross-origin.
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "Access-Control-Allow-Origin": "*",
  };
  for (const [name, value] of Object.entries(headers)) res.setHeader(name, value);
  res.json(CORPUS_BUNDLE_SCHEMA);
});
// ── GET /api/agentify/corpus/storage ────────────────────────────────────────
// Returns the storage convention (bucket, endpoint, path layout). NOTE: despite
// the original "internal use only" intent, this route is publicly reachable and
// CORS-open; it exposes credential env-var NAMES only, never values.
app.get("/api/agentify/corpus/storage", (_req: Request, res: Response) => {
  // Serve the storage-convention document (bucket, endpoint, upload order).
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    "Access-Control-Allow-Origin": "*",
  };
  for (const [name, value] of Object.entries(headers)) res.setHeader(name, value);
  res.json(STORAGE_CONVENTION);
});
// ── GET /api/agentify/corpus/assembler-skill ─────────────────────────────────
// Public. Returns the canonical corpus-assembler skill document as Markdown.
// Partner Replits fetch this URL to install the skill in their agent context.
app.get("/api/agentify/corpus/assembler-skill", (_req: Request, res: Response) => {
  // Serve the canonical corpus-assembler skill document as Markdown; 404 with
  // the resolved path if the file is missing from this deployment.
  const skillPath = path.resolve(".agents/skills/agentify-corpus/SKILL.md");
  let markdown: string;
  try {
    markdown = fs.readFileSync(skillPath, "utf-8");
  } catch {
    res.status(404).json({ error: "skill_not_found", path: skillPath });
    return;
  }
  const headers: Record<string, string> = {
    "Content-Type": "text/markdown; charset=utf-8",
    "Access-Control-Allow-Origin": "*",
    "Cache-Control": "public, max-age=3600",
  };
  for (const [name, value] of Object.entries(headers)) res.setHeader(name, value);
  res.send(markdown);
});
// ── POST /api/agentify/corpus/notify ─────────────────────────────────────────
// Naturologie calls this after uploading all files to Vultr and writing manifest.json.
// Body: the full CorpusBundle manifest JSON (or a subset with the key fields).
// WellSpr.ing stores the bundle metadata, verifies the attestation_uri is reachable,
// and queues the bundle for Stage 5 distillation.
// POST /api/agentify/corpus/notify — Naturologie posts the CorpusBundle
// manifest here after uploading all files to object storage.
// Flow: validate required fields → verify expert exists → dedup scan →
// upsert bundle row → 201 with next-step guidance.
// FIX: the original ran TWO separate dedup queries (the first detected
// near-duplicates but discarded the result; the second re-queried just to
// recompute the flag). A single scan now handles both checks. The `expert`
// variable is also no longer typed nullable, removing an unguarded null deref
// in the response body.
app.post("/api/agentify/corpus/notify", async (req: Request, res: Response) => {
  const {
    bundle_id, expert_slug, corpus_version, attestation_uri,
    attestation_version, assembled_by = "naturologie-replit", assembled_at,
    embedding_model, embedding_dimensions, chunk_strategy, chunk_count,
    artifacts, chunks_index_r2_key, embeddings_r2_key, // embeddings_r2_key: accepted but not persisted yet
    manifest_sha256, provenance_signature,
  } = req.body;
  if (!bundle_id || !expert_slug || !corpus_version || !attestation_uri) {
    return res.status(400).json({
      error: "missing_required_fields",
      required: ["bundle_id", "expert_slug", "corpus_version", "attestation_uri"],
    });
  }
  // Verify the expert exists in the agentify registry.
  let expert: { slug: string; attestation_uri: string };
  try {
    const expertRow = await pool.query(
      `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug]
    );
    if (!expertRow.rows.length) {
      return res.status(404).json({
        error: "expert_not_found",
        detail: `No expert registered with slug '${expert_slug}'. Register via POST /api/agentify/experts/register first.`,
      });
    }
    expert = expertRow.rows[0];
  } catch (err) {
    console.error("[corpus] expert lookup error:", err);
    return res.status(500).json({ error: "database_error" });
  }
  // ── Deduplication gate ──────────────────────────────────────────────────
  // One scan of this expert's existing bundles performs both checks:
  //   1. Exact duplicate (identical manifest_sha256) → reject with 409.
  //   2. Near-duplicate from a DIFFERENT assembler (chunk_count within 2%)
  //      → accept, but flag the new bundle for human curation review.
  let flaggedForCuration = false;
  try {
    const existing = await pool.query(
      `SELECT bundle_id, manifest_sha256, chunk_count, assembled_by, received_at
FROM agentify_corpus_bundles WHERE expert_slug = $1`,
      [expert_slug]
    );
    for (const row of existing.rows) {
      if (manifest_sha256 && row.manifest_sha256 && manifest_sha256 === row.manifest_sha256) {
        return res.status(409).json({
          error: "duplicate_bundle",
          detail: `A bundle with identical manifest_sha256 was already received for '${expert_slug}' on ${new Date(row.received_at).toISOString().split("T")[0]}.`,
          existing_bundle_id: row.bundle_id,
        });
      }
      if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) {
        const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count;
        if (ratio < 0.02) {
          console.log(`[corpus] multi-source near-dup: ${assembled_by} vs ${row.assembled_by} (${chunk_count} vs ${row.chunk_count} chunks) — flagging for curation`);
          flaggedForCuration = true;
        }
      }
    }
  } catch (dedupErr) {
    // Dedup is best-effort: a scan failure must never block ingestion.
    console.warn("[corpus] dedup check error (non-fatal):", dedupErr);
  }
  // Compute the manifest key from the path convention (see STORAGE_CONVENTION).
  const manifestR2Key = `corpora/${expert_slug}/v${corpus_version}/manifest.json`;
  const artifactCount = Array.isArray(artifacts) ? artifacts.length : null;
  const insertStatus = flaggedForCuration ? "pending_review" : "received";
  try {
    // Idempotent upsert keyed on bundle_id so re-notification is safe.
    await pool.query(
      `INSERT INTO agentify_corpus_bundles
 (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
  assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy,
  chunk_count, artifact_count, manifest_r2_key, manifest_sha256, provenance_signature,
  chunks_s3_key, status)
 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17)
 ON CONFLICT (bundle_id) DO UPDATE SET
  corpus_version = EXCLUDED.corpus_version,
  assembled_at = EXCLUDED.assembled_at,
  embedding_model = EXCLUDED.embedding_model,
  embedding_dimensions= EXCLUDED.embedding_dimensions,
  chunk_strategy = EXCLUDED.chunk_strategy,
  chunk_count = EXCLUDED.chunk_count,
  artifact_count = EXCLUDED.artifact_count,
  manifest_r2_key = EXCLUDED.manifest_r2_key,
  manifest_sha256 = EXCLUDED.manifest_sha256,
  provenance_signature= EXCLUDED.provenance_signature,
  chunks_s3_key = EXCLUDED.chunks_s3_key,
  status = $17,
  updated_at = NOW()`,
      [
        bundle_id, expert_slug, corpus_version, attestation_uri,
        attestation_version || "v0.1", assembled_by,
        assembled_at ? new Date(assembled_at) : new Date(),
        embedding_model || null, embedding_dimensions || null,
        chunk_strategy || null, chunk_count || null, artifactCount,
        manifestR2Key, manifest_sha256 || null, provenance_signature || null,
        chunks_index_r2_key || null, insertStatus,
      ]
    );
  } catch (dbErr) {
    console.error("[corpus] bundle insert error:", dbErr);
    return res.status(500).json({ error: "database_error" });
  }
  console.log(`[corpus] ${insertStatus} bundle ${bundle_id} for ${expert_slug} v${corpus_version} (${chunk_count} chunks, ${artifactCount} artifacts)${flaggedForCuration ? " — flagged for curation (multi-source near-dup)" : ""}`);
  res.status(201).json({
    "@type": "CorpusBundleReceived",
    "bundle_id": bundle_id,
    "expert_slug": expert_slug,
    "corpus_version": corpus_version,
    "manifest_r2_key": manifestR2Key,
    "status": insertStatus,
    "received_at": new Date().toISOString(),
    "next_step": flaggedForCuration
      ? "Bundle flagged for curation — a near-duplicate from another assembler exists. WellSpr.ing will review before distillation."
      : "WellSpr.ing will verify manifest integrity and queue for Stage 5 framework distillation.",
    "status_endpoint": `https://wellspr.ing/api/agentify/experts/${expert_slug}/status`,
    "attestation_verified_against": expert.attestation_uri,
    "note": chunk_count
      ? `Bundle logged: ${chunk_count} chunks${artifactCount ? `, ${artifactCount} artifacts` : ''}. ${flaggedForCuration ? "Pending curation review." : "Awaiting distillation."}`
      : "Bundle logged. Include chunk_count and artifact_count in future submissions for full tracking.",
  });
});
// ── GET /api/agentify/corpus/:expertSlug ────────────────────────────────────
app.get("/api/agentify/corpus/:expertSlug", async (req: Request, res: Response) => {
  // Every bundle ever received for one expert, newest first.
  const { expertSlug } = req.params;
  try {
    const result = await pool.query(
      `SELECT bundle_id, expert_slug, corpus_version, attestation_version,
assembled_by, assembled_at, embedding_model, embedding_dimensions,
chunk_strategy, chunk_count, artifact_count,
manifest_r2_key, manifest_sha256, status, received_at, updated_at, distillation_notes
FROM agentify_corpus_bundles
WHERE expert_slug = $1
ORDER BY received_at DESC`,
      [expertSlug]
    );
    res.setHeader("Access-Control-Allow-Origin", "*");
    res.json({
      expert_slug: expertSlug,
      bundle_count: result.rows.length,
      bundles: result.rows,
      r2_bucket: R2_BUCKET,
    });
  } catch (err) {
    console.error("[corpus] list error:", err);
    res.status(500).json({ error: "internal_error" });
  }
});
// ── GET /api/agentify/corpus/:expertSlug/:bundleId ──────────────────────────
app.get("/api/agentify/corpus/:expertSlug/:bundleId", async (req: Request, res: Response) => {
  // Single-bundle detail, augmented with bucket info and a direct manifest URL.
  const { expertSlug, bundleId } = req.params;
  try {
    const result = await pool.query(
      `SELECT * FROM agentify_corpus_bundles
WHERE expert_slug = $1 AND bundle_id = $2`,
      [expertSlug, bundleId]
    );
    if (result.rows.length === 0) return res.status(404).json({ error: "bundle_not_found" });
    const bundle = result.rows[0];
    res.setHeader("Access-Control-Allow-Origin", "*");
    res.json({
      ...bundle,
      r2_bucket: R2_BUCKET,
      r2_endpoint: R2_ENDPOINT,
      manifest_url: `${R2_ENDPOINT}/${R2_BUCKET}/${bundle.manifest_r2_key}`,
    });
  } catch (err) {
    console.error("[corpus] get bundle error:", err);
    res.status(500).json({ error: "internal_error" });
  }
});
// ── PATCH /api/agentify/corpus/:bundleId/status ─────────────────────────────
// Admin-only. Used by WellSpr.ing team to update distillation status.
// status values: received | verifying | distilling | ready | failed
// PATCH /api/agentify/corpus/:bundleId/status — admin-only lifecycle update.
// FIX: added 'pending_review' to validStatuses. The /notify route assigns that
// status itself for near-duplicate bundles, so admins must be able to move
// bundles into (and report on) that state via this endpoint too.
app.patch("/api/agentify/corpus/:bundleId/status", async (req: Request, res: Response) => {
  const adminKey = req.headers["x-admin-key"] || req.body?.admin_key;
  if (adminKey !== ADMIN_KEY) return res.status(403).json({ error: "forbidden" });
  const { status, distillation_notes } = req.body;
  const validStatuses = ["received", "pending_review", "verifying", "distilling", "ready", "failed"];
  if (!status || !validStatuses.includes(status)) {
    return res.status(400).json({ error: "invalid_status", valid: validStatuses });
  }
  try {
    const result = await pool.query(
      `UPDATE agentify_corpus_bundles
SET status = $1, distillation_notes = $2, updated_at = NOW()
WHERE bundle_id = $3
RETURNING bundle_id, expert_slug, corpus_version, status, updated_at`,
      [status, distillation_notes || null, req.params.bundleId]
    );
    if (!result.rows.length) return res.status(404).json({ error: "bundle_not_found" });
    res.json(result.rows[0]);
  } catch (err) {
    console.error("[corpus] status update error:", err);
    res.status(500).json({ error: "internal_error" });
  }
});
// ── GET /api/agentify/corpus/staging/scan/:expertSlug ────────────────────────
// Partner-token-gated. Lists ALL manifests found in S3 staging for this expert,
// cross-references with DB, and tells the caller which are registered vs orphaned.
// Orphaned = data in S3 staging but no DB record → can be recovered via /import.
app.get("/api/agentify/corpus/staging/scan/:expertSlug", async (req: Request, res: Response) => {
// Auth: either an active partner token (header or ?token=) or the admin key.
const token = (req.headers["x-partner-token"] as string) || req.query.token as string;
const adminKey = req.headers["x-admin-key"] as string;
const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token);
if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" });
const { expertSlug } = req.params;
try {
// List all manifests in staging for this expert
// NOTE(review): `s3` is declared further down inside registerCorpusRoutes;
// safe at runtime because handlers only execute after registration completes.
// NOTE(review): ListObjectsV2 caps at 1000 keys per call and there is no
// pagination here — confirm staging prefixes stay under that limit.
const listed = await s3.send(new ListObjectsV2Command({
Bucket: S3_BUCKET,
Prefix: `staging/${expertSlug}/`,
}));
// Keep only manifest.json objects, newest-modified first.
const manifests = (listed.Contents || [])
.filter(o => o.Key?.endsWith("manifest.json"))
.sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0));
if (!manifests.length) {
return res.json({ expert_slug: expertSlug, staging_bundles: [], orphaned_count: 0, message: "No staging manifests found in S3 for this expert." });
}
// Load each manifest and cross-reference with DB
// NOTE(review): /notify stores manifest_r2_key under corpora/..., while these
// staging keys live under staging/... — so only bundles registered through
// /staging/import can match here; confirm that is the intended semantics.
const dbBundles = await pool.query(
`SELECT bundle_id, manifest_r2_key, status FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expertSlug]
);
const dbKeys = new Set(dbBundles.rows.map((r: any) => r.manifest_r2_key));
// Fetch every manifest body in parallel; unreadable ones degrade to nulls.
const bundles = await Promise.all(manifests.map(async (obj) => {
let manifest: any = null;
try {
const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: obj.Key! }));
manifest = JSON.parse(await (raw.Body as any).transformToString());
} catch { /* manifest unreadable */ }
const inDb = dbKeys.has(obj.Key);
const dbRow = dbBundles.rows.find((r: any) => r.manifest_r2_key === obj.Key);
return {
s3_key: obj.Key,
s3_size_bytes: obj.Size,
s3_last_modified: obj.LastModified,
// Key shape is staging/{slug}/v{version}/{session}/... → index 3 = session id
session_uuid: obj.Key?.split("/")[3] || null,
bundle_id: manifest?.bundle_id || null,
corpus_version: manifest?.corpus_version || null,
chunk_count: manifest?.chunk_count || null,
artifact_count: manifest?.artifacts?.length || null,
assembled_by: manifest?.assembled_by || null,
assembled_at: manifest?.assembled_at || null,
in_db: inDb,
db_status: dbRow?.status || null,
status: inDb ? `registered (${dbRow?.status})` : "ORPHANED — not in DB, needs import",
recovery_action: inDb ? null : `POST /api/agentify/corpus/staging/import/${expertSlug} with { "manifest_s3_key": "${obj.Key}" }`,
};
}));
const orphaned = bundles.filter(b => !b.in_db);
res.setHeader("Access-Control-Allow-Origin", "*");
res.json({
expert_slug: expertSlug,
staging_bundles: bundles,
total_count: bundles.length,
orphaned_count: orphaned.length,
registered_count: bundles.length - orphaned.length,
message: orphaned.length > 0
? `${orphaned.length} bundle(s) are in S3 staging but not registered in DB. Call POST /api/agentify/corpus/staging/import/${expertSlug} to recover the most recent one.`
: "All staging bundles are registered in DB.",
});
} catch (err: any) {
console.error("[corpus] staging scan error:", err);
res.status(500).json({ error: "internal_error", detail: err.message });
}
});
// ── POST /api/agentify/corpus/staging/import/:expertSlug ─────────────────────
// Partner-token-gated. Reads a staging manifest from S3 and upserts it into DB.
// Idempotent — safe to call multiple times. Picks most recent manifest if no
// specific manifest_s3_key is provided. Used to recover after a silent finalize failure.
app.post("/api/agentify/corpus/staging/import/:expertSlug", async (req: Request, res: Response) => {
// Auth: partner token (header or body) or admin key.
const token = (req.headers["x-partner-token"] as string) || req.body?.partner_token;
const adminKey = req.headers["x-admin-key"] as string;
const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token);
if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" });
const { expertSlug } = req.params;
let { manifest_s3_key } = req.body || {};
try {
// Auto-discover most recent manifest if not provided
// NOTE(review): no pagination — ListObjectsV2 returns at most 1000 keys.
if (!manifest_s3_key) {
const listed = await s3.send(new ListObjectsV2Command({
Bucket: S3_BUCKET, Prefix: `staging/${expertSlug}/`,
}));
const latest = (listed.Contents || [])
.filter(o => o.Key?.endsWith("manifest.json"))
.sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0))[0];
if (!latest?.Key) {
return res.status(404).json({ error: "no_staging_manifest_found", expert_slug: expertSlug });
}
manifest_s3_key = latest.Key;
}
// Read manifest
let manifest: any;
try {
const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifest_s3_key }));
manifest = JSON.parse(await (raw.Body as any).transformToString());
} catch (e: any) {
// Unreadable/invalid JSON → 422 so the caller knows the object itself is bad.
return res.status(422).json({ error: "manifest_unreadable", detail: e.message, s3_key: manifest_s3_key });
}
// Discover chunks key
// Prefer keys declared in the manifest; otherwise look for chunks/index.ndjson
// next to the manifest (key derived by swapping "manifest.json" for "chunks/").
let chunksKey = manifest.chunks_index_s3_key || manifest.chunks_index_r2_key || null;
if (!chunksKey) {
const prefix = manifest_s3_key.replace("manifest.json", "chunks/");
const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix }));
chunksKey = listed.Contents?.find(o => o.Key?.endsWith("index.ndjson"))?.Key || null;
}
// Defaults for manifests missing optional identity fields.
const bundleId = manifest.bundle_id || crypto.randomUUID();
const corpusVersion = manifest.corpus_version || "1.0.0";
const artifactCount = Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null;
// Upsert into DB — idempotent on bundle_id
// NOTE(review): when the manifest lacks attestation_uri, a conventional URL is
// fabricated below — confirm downstream verification tolerates a URI that may
// not exist yet.
await pool.query(
`INSERT INTO agentify_corpus_bundles
 (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
  assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy,
  chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status)
 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received')
 ON CONFLICT (bundle_id) DO UPDATE SET
  corpus_version = EXCLUDED.corpus_version,
  assembled_at = EXCLUDED.assembled_at,
  chunk_count = EXCLUDED.chunk_count,
  artifact_count = EXCLUDED.artifact_count,
  manifest_r2_key = EXCLUDED.manifest_r2_key,
  manifest_sha256 = EXCLUDED.manifest_sha256,
  chunks_s3_key = EXCLUDED.chunks_s3_key,
  updated_at = NOW()`,
[
bundleId, expertSlug, corpusVersion,
manifest.attestation_uri || `https://wellspr.ing/vault/agents/${expertSlug}/attestation-v1.0.json`,
manifest.attestation_version || "v1.0",
manifest.assembled_by || "partner",
manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(),
manifest.embedding_model || null,
manifest.embedding_dimensions || null,
manifest.chunk_strategy || null,
manifest.chunk_count || null,
artifactCount,
manifest_s3_key,
manifest.manifest_sha256 || null,
chunksKey,
]
);
console.log(`[corpus] staging import: ${bundleId} for ${expertSlug} v${corpusVersion} — recovered from ${manifest_s3_key}`);
res.status(200).json({
"@type": "CorpusBundleImported",
bundle_id: bundleId,
expert_slug: expertSlug,
corpus_version: corpusVersion,
manifest_s3_key,
chunks_s3_key: chunksKey,
chunk_count: manifest.chunk_count || null,
artifact_count: artifactCount,
status: "received",
message: "Bundle imported from S3 staging into DB. Next: POST /api/agentify/experts/:slug/distill to embed chunks.",
distill_endpoint: `/api/agentify/experts/${expertSlug}/distill`,
});
} catch (err: any) {
console.error("[corpus] staging import error:", err);
res.status(500).json({ error: "internal_error", detail: err.message });
}
});
// ── Corpus Intake Proxy ───────────────────────────────────────────────────────
// Naturologie uploads files HERE — WellSpr.ing validates and routes to Vultr.
// No S3 credentials needed by Naturologie.
// S3 client used by all corpus routes for object reads/writes.
// NOTE(review): declared after the route handlers that reference it — safe
// because `const` here is initialized before any HTTP request can invoke a
// handler, but consider hoisting it above the routes for readability.
const s3 = new S3Client({
region: "ewr1",
endpoint: S3_ENDPOINT,
credentials: {
accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "",
secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "",
},
forcePathStyle: true, // required by Vultr's S3-compatible API (see STORAGE_CONVENTION.sdk_note)
});
// Ensure the corpus bucket exists on Vultr — create it if missing.
// NOTE: Vultr S3-compatible API doesn't accept a LocationConstraint in CreateBucket.
// When SDK region is non-us-east-1 it auto-adds LocationConstraint, which Vultr rejects.
// Solution: use a us-east-1 client for bucket creation only (endpoint still points to Vultr).
// Separate client used ONLY for bucket existence/creation (see note above:
// region us-east-1 stops the SDK from adding a LocationConstraint that Vultr
// rejects; the endpoint still points at Vultr).
const s3BucketAdmin = new S3Client({
region: "us-east-1", // prevents SDK from adding a LocationConstraint
endpoint: S3_ENDPOINT,
credentials: {
accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "",
secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "",
},
forcePathStyle: true,
});
// One-time startup check: confirm the corpus bucket exists on Vultr, creating
// it when the HEAD says it is truly missing. Fire-and-forget by design.
(async () => {
  if (!process.env.VULTR_S3_ACCESS_KEY) {
    console.warn("[corpus] VULTR_S3_ACCESS_KEY not set — S3 uploads will fail");
    return;
  }
  try {
    await s3BucketAdmin.send(new HeadBucketCommand({ Bucket: S3_BUCKET }));
    console.log(`[corpus] S3 bucket '${S3_BUCKET}' confirmed`);
    return;
  } catch (err: any) {
    const httpStatus = err?.$metadata?.httpStatusCode || err?.statusCode;
    const errCode = err?.name || err?.Code || err?.code || "";
    const trulyMissing =
      httpStatus === 404 || errCode === "NotFound" || errCode === "NoSuchBucket" || errCode === "404";
    if (trulyMissing) {
      // 404/NoSuchBucket → bucket genuinely absent: create it.
      try {
        await s3BucketAdmin.send(new CreateBucketCommand({ Bucket: S3_BUCKET }));
        console.log(`[corpus] S3 bucket '${S3_BUCKET}' created successfully`);
      } catch (createErr: any) {
        console.error(`[corpus] Failed to create S3 bucket '${S3_BUCKET}':`, createErr?.message || createErr);
      }
      return;
    }
    if (httpStatus === 403) {
      // 403 on HEAD → bucket exists but we lack ListBucket permission; fine.
      console.log(`[corpus] S3 bucket '${S3_BUCKET}' exists (403 on HEAD = access controlled but present)`);
      return;
    }
    // Anything else (e.g. 301 redirect / wrong region) is just reported.
    console.error(`[corpus] S3 bucket check failed (code=${errCode}, status=${httpStatus}):`, err?.message || err);
  }
})();
// Multer middleware for the intake proxy: files buffer in memory before being
// streamed to Vultr. NOTE(review): 200MB held in RAM per file can pressure the
// heap under concurrent uploads — consider disk staging if that becomes an issue.
const upload = multer({
storage: multer.memoryStorage(),
limits: { fileSize: 200 * 1024 * 1024 }, // 200MB max per file
});
// Ensure intake_sessions table exists.
// Fire-and-forget at module load: a failure is only logged, so if the DDL
// fails every intake route below will error on its first query.
// files_uploaded accumulates one JSON record per staged file (see
// recordFileUploaded); status goes 'uploading' → 'finalized'.
pool.query(`
  CREATE TABLE IF NOT EXISTS corpus_intake_sessions (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL UNIQUE,
    expert_slug TEXT NOT NULL,
    corpus_version TEXT NOT NULL,
    assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit',
    manifest_validated BOOLEAN DEFAULT FALSE,
    files_uploaded JSONB DEFAULT '[]',
    validation_errors JSONB DEFAULT '[]',
    status TEXT NOT NULL DEFAULT 'uploading',
    bundle_id TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
  )
`).catch(err => console.error("[corpus-intake] table init error:", err));
// ── POST /api/agentify/corpus/intake/start ────────────────────────────────────
// Naturologie calls this first to open an upload session.
// Returns a session_id to use in subsequent upload calls.
app.post("/api/agentify/corpus/intake/start", async (req: Request, res: Response) => {
  try {
    const { expert_slug, corpus_version, assembled_by = "naturologie-replit" } = req.body;
    if (!expert_slug || !corpus_version) {
      return res.status(400).json({ error: "expert_slug and corpus_version are required" });
    }

    // The expert must already be registered before a corpus can be staged.
    const expertLookup = await pool.query(
      `SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug]
    );
    if (!expertLookup.rows.length) {
      return res.status(404).json({ error: "expert_not_found", detail: `No expert with slug '${expert_slug}'` });
    }

    // Opaque 32-hex-char token; it also namespaces this session's staging area.
    const sessionId = crypto.randomBytes(16).toString("hex");
    const stagingPrefix = `staging/${expert_slug}/v${corpus_version}/${sessionId}`;
    await pool.query(
      `INSERT INTO corpus_intake_sessions
         (session_id, expert_slug, corpus_version, assembled_by, status)
       VALUES ($1,$2,$3,$4,'uploading')`,
      [sessionId, expert_slug, corpus_version, assembled_by]
    );

    const base = `/api/agentify/corpus/intake/${sessionId}`;
    return res.status(201).json({
      "@type": "CorpusIntakeSession",
      session_id: sessionId,
      expert_slug,
      corpus_version,
      assembled_by,
      staging_prefix: stagingPrefix,
      status: "uploading",
      upload_endpoints: {
        manifest: `POST ${base}/manifest`,
        artifact: `POST ${base}/artifact (multipart field: file)`,
        chunks: `POST ${base}/chunks (multipart field: file)`,
        embeddings: `POST ${base}/embeddings (multipart field: file)`,
        finalize: `POST ${base}/finalize`,
        status: `GET ${base}`,
      },
      schema_reference: "https://wellspr.ing/api/agentify/corpus/schema",
      note: "Upload manifest last — it is validated against the files that have been staged.",
    });
  } catch (err: any) {
    console.error("[corpus-intake] start error:", err?.message || err);
    return res.status(500).json({ error: "intake_start_failed", message: err?.message || "UnknownError" });
  }
});
// ── GET /api/agentify/corpus/intake/:sessionId ───────────────────────────────
// Session status lookup. Wrapped in try/catch like every other handler here:
// Express 4 does not catch promise rejections from async handlers, so an
// unguarded await on a failing DB query would surface as an unhandled
// rejection (and a hung request) instead of a 500 response.
app.get("/api/agentify/corpus/intake/:sessionId", async (req: Request, res: Response) => {
  try {
    const row = await pool.query(
      `SELECT * FROM corpus_intake_sessions WHERE session_id = $1`, [req.params.sessionId]
    );
    if (!row.rows.length) return res.status(404).json({ error: "session_not_found" });
    return res.json(row.rows[0]);
  } catch (err: any) {
    console.error("[corpus-intake] status error:", err?.message || err);
    return res.status(500).json({ error: "intake_status_failed", message: err?.message || "UnknownError" });
  }
});
async function uploadToVultr(key: string, buffer: Buffer, contentType: string): Promise<string> {
const uploader = new Upload({
client: s3,
params: {
Bucket: S3_BUCKET,
Key: key,
Body: buffer,
ContentType: contentType,
},
});
await uploader.done();
return key;
}
/**
 * Append one file record to a session's files_uploaded JSONB array and bump
 * updated_at. The record is wrapped in a single-element array so Postgres's
 * `||` operator concatenates it onto the existing array.
 */
async function recordFileUploaded(sessionId: string, fileRecord: object) {
  const appended = JSON.stringify([fileRecord]);
  await pool.query(
    `UPDATE corpus_intake_sessions
     SET files_uploaded = files_uploaded || $1::jsonb, updated_at = NOW()
     WHERE session_id = $2`,
    [appended, sessionId]
  );
}
// ── POST /api/agentify/corpus/intake/:sessionId/artifact ─────────────────────
// Stage one raw artifact (PDF/HTML/etc.) for an open session.
// multipart field "file"; body fields: artifact_id (required), title, type.
app.post("/api/agentify/corpus/intake/:sessionId/artifact",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const { artifact_id, title, type: artifactType } = req.body;
      const sessionRow = await pool.query(
        `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
      );
      if (!sessionRow.rows.length)
        return res.status(404).json({ error: "session_not_found_or_not_active" });
      const s = sessionRow.rows[0];
      if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });
      if (!artifact_id) return res.status(400).json({ error: "artifact_id_required" });
      // artifact_id is interpolated into the S3 object key; restrict it to a
      // safe charset so a caller cannot inject '/' and write outside the
      // session's staging prefix.
      if (!/^[A-Za-z0-9._-]+$/.test(artifact_id))
        return res.status(400).json({
          error: "invalid_artifact_id",
          detail: "artifact_id may only contain letters, digits, '.', '_' and '-'",
        });
      // path.extname returns "" for dotless names, so the "bin" fallback
      // actually fires. (split(".").pop() returned the whole filename for
      // e.g. "README", producing keys like ".../x.README".) Strip any
      // non-alphanumeric characters from the extension for key safety.
      const ext = (path.extname(req.file.originalname).slice(1).replace(/[^A-Za-z0-9]/g, "")) || "bin";
      const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/artifacts/${artifact_id}.${ext}`;
      const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");
      await uploadToVultr(key, req.file.buffer, req.file.mimetype);
      await recordFileUploaded(sessionId, {
        type: "artifact", artifact_id, title, artifact_type: artifactType,
        key, sha256, size: req.file.size, uploaded_at: new Date().toISOString(),
      });
      return res.json({ artifact_id, key, sha256, size: req.file.size, status: "staged" });
    } catch (err: any) {
      console.error("[corpus-intake] artifact upload error:", err?.message || err);
      return res.status(500).json({ error: "artifact_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);
// ── POST /api/agentify/corpus/intake/:sessionId/chunks ───────────────────────
// Accepts both multipart/form-data (field: "file") and raw NDJSON/octet-stream body.
app.post("/api/agentify/corpus/intake/:sessionId/chunks",
  // Content-type dispatch middleware: multipart goes through multer; anything
  // else is buffered whole into req._rawBody from the request stream.
  // NOTE(review): if a global body parser (e.g. express.json) registered
  // earlier has already consumed the stream for this content-type, the
  // "data" events never fire and _rawBody ends up empty — confirm middleware
  // ordering in the app setup.
  (req: Request, res: Response, next: any) => {
    const ct = (req.headers["content-type"] || "").toLowerCase();
    if (ct.includes("multipart/form-data")) {
      upload.single("file")(req as any, res as any, next);
    } else {
      // Buffer raw body for application/x-ndjson, octet-stream, text/* etc.
      const chunks: Buffer[] = [];
      req.on("data", (chunk: Buffer) => chunks.push(chunk));
      req.on("end", () => { (req as any)._rawBody = Buffer.concat(chunks); next(); });
      req.on("error", next);
    }
  },
  async (req: Request, res: Response) => {
    const { sessionId } = req.params;
    try {
      const sessionRow = await pool.query(
        `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
      );
      if (!sessionRow.rows.length)
        return res.status(404).json({ error: "session_not_found_or_not_active" });
      const s = sessionRow.rows[0];
      // Resolve buffer: multipart file OR raw body
      const buffer: Buffer | undefined = req.file?.buffer ?? (req as any)._rawBody;
      if (!buffer || buffer.length === 0)
        return res.status(400).json({ error: "no_file_uploaded", hint: "Send as multipart/form-data field 'file' or raw NDJSON body" });
      // Basic validation: must be valid NDJSON.
      // Only the first 5 lines are spot-checked for required fields; the
      // full file is accepted as-is if the sample passes.
      const text = buffer.toString("utf-8");
      const lines = text.trim().split("\n").filter(l => l.trim());
      const errors: string[] = [];
      for (const [i, line] of lines.slice(0, 5).entries()) {
        try {
          const obj = JSON.parse(line);
          if (!obj.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`);
          if (!obj.text) errors.push(`Line ${i+1}: missing text`);
          if (!obj.artifact_id) errors.push(`Line ${i+1}: missing artifact_id`);
        } catch { errors.push(`Line ${i+1}: invalid JSON`); }
      }
      if (errors.length) return res.status(400).json({ error: "invalid_chunks_ndjson", errors });
      // chunk_count counts all non-empty lines (not just the validated sample).
      const chunkCount = lines.length;
      const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/chunks/index.ndjson`;
      const sha256 = crypto.createHash("sha256").update(buffer).digest("hex");
      await uploadToVultr(key, buffer, "application/x-ndjson");
      await recordFileUploaded(sessionId, {
        type: "chunks", key, sha256, chunk_count: chunkCount, size: buffer.length,
        uploaded_at: new Date().toISOString(),
      });
      return res.json({ key, sha256, chunk_count: chunkCount, size: buffer.length, status: "staged" });
    } catch (err: any) {
      // NOTE(review): log tag "[CorpusChunks]" and error code "upload_failed"
      // are inconsistent with the "[corpus-intake]" / "*_upload_failed"
      // convention used by the sibling routes.
      console.error("[CorpusChunks] Upload error:", err?.message || err);
      return res.status(500).json({ error: "upload_failed", message: err?.message || "UnknownError" });
    }
  }
);
// ── POST /api/agentify/corpus/intake/:sessionId/embeddings ───────────────────
// Stage the embeddings NDJSON (one {chunk_id, embedding: number[]} per line)
// for an open session. multipart field "file".
app.post("/api/agentify/corpus/intake/:sessionId/embeddings",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const sessionRow = await pool.query(
        `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
      );
      if (!sessionRow.rows.length)
        return res.status(404).json({ error: "session_not_found_or_not_active" });
      const s = sessionRow.rows[0];
      if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });

      // Spot-check the first 3 records for the required shape before accepting.
      const records = req.file.buffer.toString("utf-8").trim().split("\n").filter(l => l.trim());
      const errors: string[] = [];
      const sample = records.slice(0, 3);
      for (let i = 0; i < sample.length; i++) {
        try {
          const parsed = JSON.parse(sample[i]);
          if (!parsed.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`);
          if (!Array.isArray(parsed.embedding)) errors.push(`Line ${i+1}: embedding must be an array`);
        } catch { errors.push(`Line ${i+1}: invalid JSON`); }
      }
      if (errors.length) return res.status(400).json({ error: "invalid_embeddings_ndjson", errors });

      const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/embeddings.ndjson`;
      const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");
      await uploadToVultr(key, req.file.buffer, "application/x-ndjson");
      await recordFileUploaded(sessionId, {
        type: "embeddings", key, sha256, embedding_count: records.length, size: req.file.size,
        uploaded_at: new Date().toISOString(),
      });
      return res.json({ key, sha256, embedding_count: records.length, size: req.file.size, status: "staged" });
    } catch (err: any) {
      console.error("[corpus-intake] embeddings upload error:", err?.message || err);
      return res.status(500).json({ error: "embeddings_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);
// ── POST /api/agentify/corpus/intake/:sessionId/manifest ─────────────────────
// Upload and validate the manifest JSON. Must be uploaded AFTER artifacts,
// chunks, and embeddings so validation can cross-reference them.
// Accepts either a multipart "file" or a raw JSON body.
app.post("/api/agentify/corpus/intake/:sessionId/manifest",
  upload.single("file"),
  async (req: Request, res: Response) => {
    try {
      const { sessionId } = req.params;
      const sessionRow = await pool.query(
        `SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
      );
      if (!sessionRow.rows.length)
        return res.status(404).json({ error: "session_not_found_or_not_active" });
      const s = sessionRow.rows[0];
      let manifest: Record<string, unknown>;
      try {
        const body = req.file ? req.file.buffer.toString("utf-8") : JSON.stringify(req.body);
        manifest = JSON.parse(body);
      } catch {
        return res.status(400).json({ error: "invalid_json", detail: "Manifest must be valid JSON" });
      }
      // Validate required fields.
      // embedding_model, embedding_dimensions, and embeddings_r2_key are OPTIONAL —
      // WellSpr.ing generates embeddings during distillation if not supplied by the assembler.
      const required = ["schema_version","bundle_id","expert_slug","attestation_uri",
                        "corpus_version","assembled_by","assembled_at",
                        "chunk_strategy","artifacts","chunk_count",
                        "chunks_index_r2_key","manifest_sha256"];
      const missing = required.filter(f => !(f in manifest));
      if (missing.length)
        return res.status(400).json({ error: "missing_required_fields", missing });
      // Manifest must match the session it is uploaded into.
      if (manifest.expert_slug !== s.expert_slug)
        return res.status(400).json({ error: "expert_slug_mismatch",
          detail: `Manifest says '${manifest.expert_slug}', session is for '${s.expert_slug}'` });
      if (manifest.corpus_version !== s.corpus_version)
        return res.status(400).json({ error: "corpus_version_mismatch" });
      // NOTE(review): manifest.manifest_sha256 is required but never
      // cross-checked against the manifest content — consider verifying it.
      const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/manifest.json`;
      // Hash the exact bytes that are stored. Previously the digest was taken
      // over the compact JSON.stringify(manifest) while the pretty-printed
      // 2-space form was uploaded, so the reported sha256 never matched the
      // staged object.
      const manifestBytes = Buffer.from(JSON.stringify(manifest, null, 2));
      const sha256 = crypto.createHash("sha256").update(manifestBytes).digest("hex");
      await uploadToVultr(key, manifestBytes, "application/json");
      await pool.query(
        `UPDATE corpus_intake_sessions
         SET manifest_validated = TRUE,
             files_uploaded = files_uploaded || $1::jsonb,
             updated_at = NOW()
         WHERE session_id = $2`,
        [JSON.stringify([{ type: "manifest", key, sha256, uploaded_at: new Date().toISOString() }]), sessionId]
      );
      return res.json({
        key, sha256, status: "staged",
        bundle_id: manifest.bundle_id,
        chunk_count: manifest.chunk_count,
        artifact_count: Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null,
        note: "Manifest validated and staged. Call finalize to commit the bundle.",
      });
    } catch (err: any) {
      console.error("[corpus-intake] manifest upload error:", err?.message || err);
      return res.status(500).json({ error: "manifest_upload_failed", message: err?.message || "UnknownError" });
    }
  }
);
// ── POST /api/agentify/corpus/intake/:sessionId/finalize ─────────────────────
// Validates the full bundle, moves files from staging/ to corpora/, registers
// the CorpusBundle in the database, and returns a validation report.
app.post("/api/agentify/corpus/intake/:sessionId/finalize", async (req: Request, res: Response) => {
const { sessionId } = req.params;
const sessionRow = await pool.query(
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
);
if (!sessionRow.rows.length)
return res.status(404).json({ error: "session_not_found_or_not_active" });
const s = sessionRow.rows[0];
const files: any[] = s.files_uploaded || [];
const errors: string[] = [];
if (!s.manifest_validated) errors.push("Manifest has not been uploaded and validated");
if (!files.find(f => f.type === "chunks")) errors.push("Chunks NDJSON has not been uploaded");
// Embeddings are optional — WellSpr.ing generates them during distillation if not provided.
// Artifact files are optional — PubMed/web abstract batches are chunks-only (no raw PDFs).
const artifactFiles = files.filter(f => f.type === "artifact");
if (errors.length) {
return res.status(422).json({
error: "bundle_incomplete",
errors,
uploaded_types: [...new Set(files.map(f => f.type))],
});
}
// Move files from staging → corpora (copy keys, S3 doesn't have move — use copy pattern)
const finalPrefix = `corpora/${s.expert_slug}/v${s.corpus_version}`;
const bundleId = s.bundle_id || crypto.randomUUID();
const expertRow = await pool.query(
`SELECT attestation_uri, attestation_version FROM agentify_experts WHERE slug = $1`, [s.expert_slug]
);
const expert = expertRow.rows[0];
try {
await pool.query(
`INSERT INTO agentify_corpus_bundles
(bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
assembled_by, assembled_at, chunk_count, artifact_count,
manifest_r2_key, status)
VALUES ($1,$2,$3,$4,$5,$6,NOW(),$7,$8,$9,'received')
ON CONFLICT (bundle_id) DO UPDATE SET
status = 'received', updated_at = NOW()`,
[
bundleId, s.expert_slug, s.corpus_version,
expert?.attestation_uri || "",
expert?.attestation_version || "v0.1",
s.assembled_by,
files.find(f => f.type === "chunks")?.chunk_count || null,
artifactFiles.length,
`${finalPrefix}/manifest.json`,
]
);
await pool.query(
`UPDATE corpus_intake_sessions
SET status = 'finalized', bundle_id = $1, updated_at = NOW()
WHERE session_id = $2`,
[bundleId, sessionId]
);
} catch (dbErr: any) {
const detail = dbErr?.message || String(dbErr);
console.error("[corpus-intake] finalize db error:", detail, dbErr?.detail);
return res.status(500).json({ error: "database_error", detail, hint: dbErr?.hint });
}
console.log(`[corpus-intake] finalized bundle ${bundleId} for ${s.expert_slug} v${s.corpus_version}`);
res.status(201).json({
"@type": "CorpusBundleFinalized",
bundle_id: bundleId,
expert_slug: s.expert_slug,
corpus_version: s.corpus_version,
status: "received",
staging_prefix: `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}`,
final_prefix: finalPrefix,
artifact_count: artifactFiles.length,
chunk_count: files.find(f => f.type === "chunks")?.chunk_count || null,
embedding_count: files.find(f => f.type === "embeddings")?.embedding_count || null,
next_step: "WellSpr.ing will verify the bundle and queue it for Stage 5 framework distillation.",
status_endpoint: `https://wellspr.ing/api/agentify/corpus/${s.expert_slug}/${bundleId}`,
note: "The bundle has been received and validated. WellSpr.ing will contact you when distillation begins.",
});
});
console.log("[corpus] routes registered");
}