/**
 * Agentify Corpus Pipeline — WellSpr.ing ingestion side
 *
 * Naturologie Replit assembles and chunks the expert's corpus, uploads artifacts
 * and embeddings to Vultr Object Storage (S3-compatible), writes a signed
 * CorpusBundle manifest JSON, then calls POST /api/agentify/corpus/notify to
 * hand off to WellSpr.ing.
 *
 * Storage: Vultr Object Storage (ewr1.vultrobjects.com) — same provider already
 * used for platform DB backups. S3-compatible API, independent of AWS and Cloudflare.
 *
 * Public endpoints:
 *   GET  /api/agentify/corpus/schema                 — CorpusBundle JSON schema + field docs
 *   GET  /api/agentify/corpus/storage                — storage convention for Naturologie
 *   GET  /api/agentify/corpus/:expertSlug            — list bundles for an expert
 *   GET  /api/agentify/corpus/:expertSlug/:bundleId  — single bundle detail
 *
 * Naturologie-facing endpoints:
 *   POST  /api/agentify/corpus/notify                — notify WellSpr.ing that a bundle is ready
 *   PATCH /api/agentify/corpus/:bundleId/status      — admin-only status update
 *
 * Storage layout (Vultr Object Storage):
 *   Bucket: agentify-corpus (separate from wellspring-backups)
 *   corpora/{expert_slug}/v{corpus_version}/manifest.json      ← CorpusBundle manifest
 *   corpora/{expert_slug}/v{corpus_version}/artifacts/         ← raw PDFs / HTML
 *   corpora/{expert_slug}/v{corpus_version}/chunks/            ← chunk NDJSON index
 *   corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson  ← one embedding per line
 */

import type { Express, Request, Response } from "express";
|
|
import { pool } from "../db";
|
|
import crypto from "crypto";
|
|
import fs from "fs";
|
|
import path from "path";
|
|
import multer from "multer";
|
|
import { S3Client, PutObjectCommand, HeadObjectCommand, CreateBucketCommand, HeadBucketCommand, GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
|
|
import { Upload } from "@aws-sdk/lib-storage";
|
|
|
|
const ADMIN_KEY = process.env.ADMIN_KEY || "b0db7a87384fc814b0f46ea7bdc6ab6a81152be5b098718b";
|
|
|
|
// Vultr Object Storage — S3-compatible, same provider as platform DB backups
|
|
const S3_HOSTNAME = process.env.VULTR_S3_HOSTNAME || "ewr1.vultrobjects.com";
|
|
const S3_BUCKET = process.env.CORPUS_S3_BUCKET || "agentify-corpus";
|
|
const S3_ENDPOINT = `https://${S3_HOSTNAME}`;
|
|
|
|
// Keep the old names as aliases so existing code still compiles
|
|
const R2_BUCKET = S3_BUCKET;
|
|
const R2_ENDPOINT = S3_ENDPOINT;
|
|
|
|
// ── Partner token validation ───────────────────────────────────────────────────
|
|
async function isPartnerToken(token: string | undefined): Promise<boolean> {
|
|
if (!token) return false;
|
|
try {
|
|
const r = await pool.query(
|
|
`SELECT 1 FROM partner_tokens WHERE token = $1 AND is_active = TRUE LIMIT 1`, [token]
|
|
);
|
|
return r.rows.length > 0;
|
|
} catch { return false; }
|
|
}
|
|
|
|
// ── CorpusBundle schema definition (authoritative) ────────────────────────────
|
|
const CORPUS_BUNDLE_SCHEMA = {
|
|
"$schema": "https://wellspr.ing/protocols/agentify/corpus-bundle/v1.0",
|
|
"@type": "CorpusBundleSchema",
|
|
"schema_version": "1.0",
|
|
"description": "CorpusBundle is the handoff artifact Naturologie writes to R2 and notifies WellSpr.ing about. WellSpr.ing reads this manifest to verify integrity, then runs Stage 5 framework distillation.",
|
|
|
|
"fields": {
|
|
"schema_version": { "type": "string", "const": "1.0", "required": true },
|
|
"bundle_id": {
|
|
"type": "string", "format": "uuid", "required": true,
|
|
"description": "UUIDv4 — unique per ingestion run. If re-ingesting, use a new UUID."
|
|
},
|
|
"expert_slug": {
|
|
"type": "string", "required": true,
|
|
"description": "Matches the slug in wellspr.ing/api/agentify/experts/:slug. For Sniderman: 'sniderman'."
|
|
},
|
|
"attestation_uri": {
|
|
"type": "string", "format": "uri", "required": true,
|
|
"description": "Canonical attestation URI. For Sniderman v0.1: 'https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json'. WellSpr.ing will verify this URL is reachable and signature is valid."
|
|
},
|
|
"attestation_version": {
|
|
"type": "string", "required": true,
|
|
"description": "Version string matching the attestation URI. E.g. 'v0.1'."
|
|
},
|
|
"corpus_version": {
|
|
"type": "string", "required": true,
|
|
"description": "Semver-style corpus version. First ingestion: '0.1.0'. After expert corrections mint attestation v0.2, increment to '0.2.0'."
|
|
},
|
|
"assembled_by": {
|
|
"type": "string", "required": true,
|
|
"description": "Assembling system identifier. Use 'naturologie-replit'."
|
|
},
|
|
"assembled_at": {
|
|
"type": "string", "format": "iso8601", "required": true,
|
|
"description": "ISO 8601 UTC timestamp of when the bundle was finalized."
|
|
},
|
|
"embedding_model": {
|
|
"type": "string", "required": false,
|
|
"description": "Embedding model used, if the assembler generated embeddings. E.g. 'text-embedding-3-large'. Omit if WellSpr.ing should generate embeddings during distillation."
|
|
},
|
|
"embedding_dimensions": {
|
|
"type": "integer", "required": false,
|
|
"description": "Dimension count of the embedding vectors. Required only if embedding_model is provided."
|
|
},
|
|
"chunk_strategy": {
|
|
"type": "string", "enum": ["paragraph", "section", "sentence", "fixed-token"],
|
|
"required": true,
|
|
"description": "Chunking strategy used. 'paragraph' is preferred for published medical literature."
|
|
},
|
|
"chunk_token_limit": {
|
|
"type": "integer", "required": false,
|
|
"description": "Max tokens per chunk if using 'fixed-token' strategy."
|
|
},
|
|
"artifacts": {
|
|
"type": "array", "required": true,
|
|
"description": "List of source documents included in this corpus.",
|
|
"items": {
|
|
"artifact_id": { "type": "string", "description": "Short slug, e.g. 'sniderman-apob-2010'" },
|
|
"title": { "type": "string" },
|
|
"type": { "type": "string", "enum": ["paper", "book", "protocol", "interview", "transcript", "editorial"] },
|
|
"source_url": { "type": "string", "format": "uri", "description": "Canonical DOI or URL." },
|
|
"publication_date": { "type": "string", "description": "YYYY or YYYY-MM or YYYY-MM-DD." },
|
|
"sha256": { "type": "string", "description": "SHA-256 of the raw artifact file." },
|
|
"r2_key": { "type": "string", "description": "Key within the agentify-corpus bucket. E.g. 'corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf'." }
|
|
}
|
|
},
|
|
"chunk_count": {
|
|
"type": "integer", "required": true,
|
|
"description": "Total number of chunks. The actual chunk files live at their r2_keys in the chunks/ prefix."
|
|
},
|
|
"chunks_index_r2_key": {
|
|
"type": "string", "required": true,
|
|
"description": "R2 key of the chunk index NDJSON file (one chunk JSON per line). E.g. 'corpora/sniderman/v0.1.0/chunks/index.ndjson'."
|
|
},
|
|
"embeddings_r2_key": {
|
|
"type": "string", "required": false,
|
|
"description": "Key of the embeddings NDJSON file (one {chunk_id, embedding: float[]} per line). Omit if WellSpr.ing should generate embeddings. E.g. 'corpora/sniderman/v0.1.0/embeddings.ndjson'."
|
|
},
|
|
"manifest_sha256": {
|
|
"type": "string", "required": true,
|
|
"description": "SHA-256 of this manifest file with the manifest_sha256 field set to empty string. WellSpr.ing recomputes this to verify integrity."
|
|
},
|
|
"provenance_signature": {
|
|
"type": "string", "required": false,
|
|
"description": "Optional Ed25519 signature over manifest_sha256 using Naturologie's signing key. If provided, WellSpr.ing stores it as part of the provenance chain."
|
|
}
|
|
},
|
|
|
|
"chunk_item_schema": {
|
|
"description": "Each line in chunks/index.ndjson must be a JSON object with these fields:",
|
|
"chunk_id": "string — unique within the bundle, e.g. 'sniderman-apob-2010-p003'",
|
|
"artifact_id": "string — matches an artifact_id in the artifacts array",
|
|
"text": "string — the raw chunk text",
|
|
"sha256": "string — SHA-256 of the chunk text",
|
|
"token_count": "integer — token count using the chosen embedding model's tokenizer",
|
|
"scopes": "string[] — SGS scope strings this chunk is most relevant to",
|
|
"r2_key": "string — optional, if chunk is also stored as a standalone file"
|
|
},
|
|
|
|
"example": {
|
|
"schema_version": "1.0",
|
|
"bundle_id": "550e8400-e29b-41d4-a716-446655440000",
|
|
"expert_slug": "sniderman",
|
|
"attestation_uri": "https://wellspr.ing/vault/agents/sniderman/attestation-v0.1.json",
|
|
"attestation_version": "v0.1",
|
|
"corpus_version": "0.1.0",
|
|
"assembled_by": "naturologie-replit",
|
|
"assembled_at": "2026-04-22T00:00:00Z",
|
|
"embedding_model": "text-embedding-3-large",
|
|
"embedding_dimensions": 3072,
|
|
"chunk_strategy": "paragraph",
|
|
"artifacts": [
|
|
{
|
|
"artifact_id": "sniderman-apob-2010",
|
|
"title": "Why LDL-C Should Not Be the Primary Measure for Statin Therapy",
|
|
"type": "paper",
|
|
"source_url": "https://doi.org/10.1016/j.jacc.2010.03.030",
|
|
"publication_date": "2010-06",
|
|
"sha256": "abc123...",
|
|
"r2_key": "corpora/sniderman/v0.1.0/artifacts/sniderman-apob-2010.pdf"
|
|
}
|
|
],
|
|
"chunk_count": 847,
|
|
"chunks_index_r2_key": "corpora/sniderman/v0.1.0/chunks/index.ndjson",
|
|
"embeddings_r2_key": "corpora/sniderman/v0.1.0/embeddings.ndjson",
|
|
"manifest_sha256": "def456...",
|
|
}
|
|
};
|
|
|
|
const STORAGE_CONVENTION = {
|
|
"@type": "CorpusBundleStorageConvention",
|
|
"provider": "Vultr Object Storage",
|
|
"provider_note": "S3-compatible API. Independent of AWS and Cloudflare. Same provider used for WellSpr.ing platform DB backups.",
|
|
"bucket": S3_BUCKET,
|
|
"endpoint": S3_ENDPOINT,
|
|
"hostname": S3_HOSTNAME,
|
|
"region": "ewr1",
|
|
"sdk_note": "Use any S3-compatible SDK (e.g. @aws-sdk/client-s3) with forcePathStyle: true and the Vultr endpoint.",
|
|
"credentials": {
|
|
"note": "Request VULTR_S3_ACCESS_KEY and VULTR_S3_SECRET_KEY from WellSpr.ing team. These are the same credentials used for platform backups.",
|
|
"access_key_env": "VULTR_S3_ACCESS_KEY",
|
|
"secret_key_env": "VULTR_S3_SECRET_KEY",
|
|
},
|
|
"path_convention": {
|
|
"manifest": "corpora/{expert_slug}/v{corpus_version}/manifest.json",
|
|
"artifacts": "corpora/{expert_slug}/v{corpus_version}/artifacts/{artifact_id}.{ext}",
|
|
"chunks_index": "corpora/{expert_slug}/v{corpus_version}/chunks/index.ndjson",
|
|
"embeddings": "corpora/{expert_slug}/v{corpus_version}/embeddings.ndjson",
|
|
},
|
|
"sniderman_v0_1_0_paths": {
|
|
"manifest": "corpora/sniderman/v0.1.0/manifest.json",
|
|
"artifacts_prefix": "corpora/sniderman/v0.1.0/artifacts/",
|
|
"chunks_index": "corpora/sniderman/v0.1.0/chunks/index.ndjson",
|
|
"embeddings": "corpora/sniderman/v0.1.0/embeddings.ndjson",
|
|
},
|
|
"upload_order": [
|
|
"1. Upload all artifact files to artifacts/ prefix",
|
|
"2. Upload chunks/index.ndjson",
|
|
"3. Upload embeddings.ndjson",
|
|
"4. Write manifest.json with manifest_sha256 field computed last",
|
|
"5. Upload manifest.json",
|
|
"6. POST manifest to /api/agentify/corpus/notify"
|
|
],
|
|
"content_types": {
|
|
"manifest.json": "application/json",
|
|
"*.ndjson": "application/x-ndjson",
|
|
"*.pdf": "application/pdf",
|
|
"*.html": "text/html",
|
|
}
|
|
};
|
|
|
|
export function registerCorpusRoutes(app: Express) {
|
|
// Ensure corpus_bundles table exists
|
|
pool.query(`
|
|
CREATE TABLE IF NOT EXISTS agentify_corpus_bundles (
|
|
id SERIAL PRIMARY KEY,
|
|
bundle_id TEXT NOT NULL UNIQUE,
|
|
expert_slug TEXT NOT NULL,
|
|
corpus_version TEXT NOT NULL,
|
|
attestation_uri TEXT NOT NULL,
|
|
attestation_version TEXT NOT NULL,
|
|
assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit',
|
|
assembled_at TIMESTAMPTZ,
|
|
embedding_model TEXT,
|
|
embedding_dimensions INTEGER,
|
|
chunk_strategy TEXT,
|
|
chunk_count INTEGER,
|
|
artifact_count INTEGER,
|
|
manifest_r2_key TEXT,
|
|
manifest_sha256 TEXT,
|
|
provenance_signature TEXT,
|
|
status TEXT NOT NULL DEFAULT 'received',
|
|
distillation_notes TEXT,
|
|
received_at TIMESTAMPTZ DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ DEFAULT NOW()
|
|
)
|
|
`).catch(err => console.error("[corpus] table init error:", err));
|
|
|
|
// ── GET /api/agentify/corpus/schema ─────────────────────────────────────────
|
|
app.get("/api/agentify/corpus/schema", (_req: Request, res: Response) => {
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.json(CORPUS_BUNDLE_SCHEMA);
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/storage ────────────────────────────────────────
|
|
// Returns storage conventions (internal use only — not exposed to assemblers).
|
|
app.get("/api/agentify/corpus/storage", (_req: Request, res: Response) => {
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.json(STORAGE_CONVENTION);
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/assembler-skill ─────────────────────────────────
|
|
// Public. Returns the canonical corpus-assembler skill document as Markdown.
|
|
// Partner Replits fetch this URL to install the skill in their agent context.
|
|
app.get("/api/agentify/corpus/assembler-skill", (_req: Request, res: Response) => {
|
|
const skillPath = path.resolve(".agents/skills/agentify-corpus/SKILL.md");
|
|
try {
|
|
const content = fs.readFileSync(skillPath, "utf-8");
|
|
res.setHeader("Content-Type", "text/markdown; charset=utf-8");
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.setHeader("Cache-Control", "public, max-age=3600");
|
|
res.send(content);
|
|
} catch {
|
|
res.status(404).json({ error: "skill_not_found", path: skillPath });
|
|
}
|
|
});
|
|
|
|
// ── POST /api/agentify/corpus/notify ─────────────────────────────────────────
|
|
// Naturologie calls this after uploading all files to Vultr and writing manifest.json.
|
|
// Body: the full CorpusBundle manifest JSON (or a subset with the key fields).
|
|
// WellSpr.ing stores the bundle metadata, verifies the attestation_uri is reachable,
|
|
// and queues the bundle for Stage 5 distillation.
|
|
app.post("/api/agentify/corpus/notify", async (req: Request, res: Response) => {
|
|
const {
|
|
bundle_id, expert_slug, corpus_version, attestation_uri,
|
|
attestation_version, assembled_by = "naturologie-replit", assembled_at,
|
|
embedding_model, embedding_dimensions, chunk_strategy, chunk_count,
|
|
artifacts, chunks_index_r2_key, embeddings_r2_key,
|
|
manifest_sha256, provenance_signature,
|
|
} = req.body;
|
|
|
|
if (!bundle_id || !expert_slug || !corpus_version || !attestation_uri) {
|
|
return res.status(400).json({
|
|
error: "missing_required_fields",
|
|
required: ["bundle_id", "expert_slug", "corpus_version", "attestation_uri"],
|
|
});
|
|
}
|
|
|
|
// Verify the expert exists in the agentify registry
|
|
let expert: { slug: string; attestation_uri: string } | null = null;
|
|
try {
|
|
const expertRow = await pool.query(
|
|
`SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug]
|
|
);
|
|
if (!expertRow.rows.length) {
|
|
return res.status(404).json({
|
|
error: "expert_not_found",
|
|
detail: `No expert registered with slug '${expert_slug}'. Register via POST /api/agentify/experts/register first.`,
|
|
});
|
|
}
|
|
expert = expertRow.rows[0];
|
|
} catch (err) {
|
|
console.error("[corpus] expert lookup error:", err);
|
|
return res.status(500).json({ error: "database_error" });
|
|
}
|
|
|
|
// ── Deduplication gate ──────────────────────────────────────────────────
|
|
try {
|
|
const existing = await pool.query(
|
|
`SELECT bundle_id, manifest_sha256, chunk_count, assembled_by, received_at
|
|
FROM agentify_corpus_bundles WHERE expert_slug = $1`,
|
|
[expert_slug]
|
|
);
|
|
|
|
for (const row of existing.rows) {
|
|
// Exact duplicate: same manifest hash
|
|
if (manifest_sha256 && row.manifest_sha256 && manifest_sha256 === row.manifest_sha256) {
|
|
return res.status(409).json({
|
|
error: "duplicate_bundle",
|
|
detail: `A bundle with identical manifest_sha256 was already received for '${expert_slug}' on ${new Date(row.received_at).toISOString().split("T")[0]}.`,
|
|
existing_bundle_id: row.bundle_id,
|
|
});
|
|
}
|
|
|
|
// Near-duplicate: chunk_count within 2% of an existing bundle from a different assembler
|
|
if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) {
|
|
const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count;
|
|
if (ratio < 0.02) {
|
|
console.log(`[corpus] multi-source near-dup: ${assembled_by} vs ${row.assembled_by} (${chunk_count} vs ${row.chunk_count} chunks) — flagging for curation`);
|
|
// Allow but flag for review — update status after insert below
|
|
}
|
|
}
|
|
}
|
|
} catch (dedupErr) {
|
|
console.warn("[corpus] dedup check error (non-fatal):", dedupErr);
|
|
}
|
|
|
|
// Compute the manifest R2 key from convention
|
|
const manifestR2Key = `corpora/${expert_slug}/v${corpus_version}/manifest.json`;
|
|
const artifactCount = Array.isArray(artifacts) ? artifacts.length : null;
|
|
|
|
// Detect multi-source near-dup (different assembler, similar chunk count)
|
|
let flaggedForCuration = false;
|
|
try {
|
|
const existing = await pool.query(
|
|
`SELECT assembled_by, chunk_count FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expert_slug]
|
|
);
|
|
for (const row of existing.rows) {
|
|
if (chunk_count && row.chunk_count && assembled_by !== row.assembled_by) {
|
|
const ratio = Math.abs(chunk_count - row.chunk_count) / row.chunk_count;
|
|
if (ratio < 0.02) { flaggedForCuration = true; break; }
|
|
}
|
|
}
|
|
} catch { /* non-fatal */ }
|
|
|
|
const insertStatus = flaggedForCuration ? "pending_review" : "received";
|
|
|
|
try {
|
|
await pool.query(
|
|
`INSERT INTO agentify_corpus_bundles
|
|
(bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
|
|
assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy,
|
|
chunk_count, artifact_count, manifest_r2_key, manifest_sha256, provenance_signature,
|
|
chunks_s3_key, status)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17)
|
|
ON CONFLICT (bundle_id) DO UPDATE SET
|
|
corpus_version = EXCLUDED.corpus_version,
|
|
assembled_at = EXCLUDED.assembled_at,
|
|
embedding_model = EXCLUDED.embedding_model,
|
|
embedding_dimensions= EXCLUDED.embedding_dimensions,
|
|
chunk_strategy = EXCLUDED.chunk_strategy,
|
|
chunk_count = EXCLUDED.chunk_count,
|
|
artifact_count = EXCLUDED.artifact_count,
|
|
manifest_r2_key = EXCLUDED.manifest_r2_key,
|
|
manifest_sha256 = EXCLUDED.manifest_sha256,
|
|
provenance_signature= EXCLUDED.provenance_signature,
|
|
chunks_s3_key = EXCLUDED.chunks_s3_key,
|
|
status = $17,
|
|
updated_at = NOW()`,
|
|
[
|
|
bundle_id, expert_slug, corpus_version, attestation_uri,
|
|
attestation_version || "v0.1", assembled_by,
|
|
assembled_at ? new Date(assembled_at) : new Date(),
|
|
embedding_model || null, embedding_dimensions || null,
|
|
chunk_strategy || null, chunk_count || null, artifactCount,
|
|
manifestR2Key, manifest_sha256 || null, provenance_signature || null,
|
|
chunks_index_r2_key || null, insertStatus,
|
|
]
|
|
);
|
|
} catch (dbErr) {
|
|
console.error("[corpus] bundle insert error:", dbErr);
|
|
return res.status(500).json({ error: "database_error" });
|
|
}
|
|
|
|
console.log(`[corpus] ${insertStatus} bundle ${bundle_id} for ${expert_slug} v${corpus_version} (${chunk_count} chunks, ${artifactCount} artifacts)${flaggedForCuration ? " — flagged for curation (multi-source near-dup)" : ""}`);
|
|
|
|
res.status(201).json({
|
|
"@type": "CorpusBundleReceived",
|
|
"bundle_id": bundle_id,
|
|
"expert_slug": expert_slug,
|
|
"corpus_version": corpus_version,
|
|
"manifest_r2_key": manifestR2Key,
|
|
"status": insertStatus,
|
|
"received_at": new Date().toISOString(),
|
|
"next_step": flaggedForCuration
|
|
? "Bundle flagged for curation — a near-duplicate from another assembler exists. WellSpr.ing will review before distillation."
|
|
: "WellSpr.ing will verify manifest integrity and queue for Stage 5 framework distillation.",
|
|
"status_endpoint": `https://wellspr.ing/api/agentify/experts/${expert_slug}/status`,
|
|
"attestation_verified_against": expert.attestation_uri,
|
|
"note": chunk_count
|
|
? `Bundle logged: ${chunk_count} chunks${artifactCount ? `, ${artifactCount} artifacts` : ''}. ${flaggedForCuration ? "Pending curation review." : "Awaiting distillation."}`
|
|
: "Bundle logged. Include chunk_count and artifact_count in future submissions for full tracking.",
|
|
});
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/:expertSlug ────────────────────────────────────
|
|
app.get("/api/agentify/corpus/:expertSlug", async (req: Request, res: Response) => {
|
|
try {
|
|
const rows = await pool.query(
|
|
`SELECT bundle_id, expert_slug, corpus_version, attestation_version,
|
|
assembled_by, assembled_at, embedding_model, embedding_dimensions,
|
|
chunk_strategy, chunk_count, artifact_count,
|
|
manifest_r2_key, manifest_sha256, status, received_at, updated_at, distillation_notes
|
|
FROM agentify_corpus_bundles
|
|
WHERE expert_slug = $1
|
|
ORDER BY received_at DESC`,
|
|
[req.params.expertSlug]
|
|
);
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.json({
|
|
expert_slug: req.params.expertSlug,
|
|
bundle_count: rows.rows.length,
|
|
bundles: rows.rows,
|
|
r2_bucket: R2_BUCKET,
|
|
});
|
|
} catch (err) {
|
|
console.error("[corpus] list error:", err);
|
|
res.status(500).json({ error: "internal_error" });
|
|
}
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/:expertSlug/:bundleId ──────────────────────────
|
|
app.get("/api/agentify/corpus/:expertSlug/:bundleId", async (req: Request, res: Response) => {
|
|
try {
|
|
const rows = await pool.query(
|
|
`SELECT * FROM agentify_corpus_bundles
|
|
WHERE expert_slug = $1 AND bundle_id = $2`,
|
|
[req.params.expertSlug, req.params.bundleId]
|
|
);
|
|
if (!rows.rows.length) return res.status(404).json({ error: "bundle_not_found" });
|
|
const b = rows.rows[0];
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.json({
|
|
...b,
|
|
r2_bucket: R2_BUCKET,
|
|
r2_endpoint: R2_ENDPOINT,
|
|
manifest_url: `${R2_ENDPOINT}/${R2_BUCKET}/${b.manifest_r2_key}`,
|
|
});
|
|
} catch (err) {
|
|
console.error("[corpus] get bundle error:", err);
|
|
res.status(500).json({ error: "internal_error" });
|
|
}
|
|
});
|
|
|
|
// ── PATCH /api/agentify/corpus/:bundleId/status ─────────────────────────────
|
|
// Admin-only. Used by WellSpr.ing team to update distillation status.
|
|
// status values: received | verifying | distilling | ready | failed
|
|
app.patch("/api/agentify/corpus/:bundleId/status", async (req: Request, res: Response) => {
|
|
const adminKey = req.headers["x-admin-key"] || req.body?.admin_key;
|
|
if (adminKey !== ADMIN_KEY) return res.status(403).json({ error: "forbidden" });
|
|
|
|
const { status, distillation_notes } = req.body;
|
|
const validStatuses = ["received", "verifying", "distilling", "ready", "failed"];
|
|
if (!status || !validStatuses.includes(status)) {
|
|
return res.status(400).json({ error: "invalid_status", valid: validStatuses });
|
|
}
|
|
|
|
try {
|
|
const result = await pool.query(
|
|
`UPDATE agentify_corpus_bundles
|
|
SET status = $1, distillation_notes = $2, updated_at = NOW()
|
|
WHERE bundle_id = $3
|
|
RETURNING bundle_id, expert_slug, corpus_version, status, updated_at`,
|
|
[status, distillation_notes || null, req.params.bundleId]
|
|
);
|
|
if (!result.rows.length) return res.status(404).json({ error: "bundle_not_found" });
|
|
res.json(result.rows[0]);
|
|
} catch (err) {
|
|
console.error("[corpus] status update error:", err);
|
|
res.status(500).json({ error: "internal_error" });
|
|
}
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/staging/scan/:expertSlug ────────────────────────
|
|
// Partner-token-gated. Lists ALL manifests found in S3 staging for this expert,
|
|
// cross-references with DB, and tells the caller which are registered vs orphaned.
|
|
// Orphaned = data in S3 staging but no DB record → can be recovered via /import.
|
|
app.get("/api/agentify/corpus/staging/scan/:expertSlug", async (req: Request, res: Response) => {
|
|
const token = (req.headers["x-partner-token"] as string) || req.query.token as string;
|
|
const adminKey = req.headers["x-admin-key"] as string;
|
|
const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token);
|
|
if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" });
|
|
|
|
const { expertSlug } = req.params;
|
|
|
|
try {
|
|
// List all manifests in staging for this expert
|
|
const listed = await s3.send(new ListObjectsV2Command({
|
|
Bucket: S3_BUCKET,
|
|
Prefix: `staging/${expertSlug}/`,
|
|
}));
|
|
|
|
const manifests = (listed.Contents || [])
|
|
.filter(o => o.Key?.endsWith("manifest.json"))
|
|
.sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0));
|
|
|
|
if (!manifests.length) {
|
|
return res.json({ expert_slug: expertSlug, staging_bundles: [], orphaned_count: 0, message: "No staging manifests found in S3 for this expert." });
|
|
}
|
|
|
|
// Load each manifest and cross-reference with DB
|
|
const dbBundles = await pool.query(
|
|
`SELECT bundle_id, manifest_r2_key, status FROM agentify_corpus_bundles WHERE expert_slug = $1`, [expertSlug]
|
|
);
|
|
const dbKeys = new Set(dbBundles.rows.map((r: any) => r.manifest_r2_key));
|
|
|
|
const bundles = await Promise.all(manifests.map(async (obj) => {
|
|
let manifest: any = null;
|
|
try {
|
|
const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: obj.Key! }));
|
|
manifest = JSON.parse(await (raw.Body as any).transformToString());
|
|
} catch { /* manifest unreadable */ }
|
|
|
|
const inDb = dbKeys.has(obj.Key);
|
|
const dbRow = dbBundles.rows.find((r: any) => r.manifest_r2_key === obj.Key);
|
|
|
|
return {
|
|
s3_key: obj.Key,
|
|
s3_size_bytes: obj.Size,
|
|
s3_last_modified: obj.LastModified,
|
|
session_uuid: obj.Key?.split("/")[3] || null,
|
|
bundle_id: manifest?.bundle_id || null,
|
|
corpus_version: manifest?.corpus_version || null,
|
|
chunk_count: manifest?.chunk_count || null,
|
|
artifact_count: manifest?.artifacts?.length || null,
|
|
assembled_by: manifest?.assembled_by || null,
|
|
assembled_at: manifest?.assembled_at || null,
|
|
in_db: inDb,
|
|
db_status: dbRow?.status || null,
|
|
status: inDb ? `registered (${dbRow?.status})` : "ORPHANED — not in DB, needs import",
|
|
recovery_action: inDb ? null : `POST /api/agentify/corpus/staging/import/${expertSlug} with { "manifest_s3_key": "${obj.Key}" }`,
|
|
};
|
|
}));
|
|
|
|
const orphaned = bundles.filter(b => !b.in_db);
|
|
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
res.json({
|
|
expert_slug: expertSlug,
|
|
staging_bundles: bundles,
|
|
total_count: bundles.length,
|
|
orphaned_count: orphaned.length,
|
|
registered_count: bundles.length - orphaned.length,
|
|
message: orphaned.length > 0
|
|
? `${orphaned.length} bundle(s) are in S3 staging but not registered in DB. Call POST /api/agentify/corpus/staging/import/${expertSlug} to recover the most recent one.`
|
|
: "All staging bundles are registered in DB.",
|
|
});
|
|
} catch (err: any) {
|
|
console.error("[corpus] staging scan error:", err);
|
|
res.status(500).json({ error: "internal_error", detail: err.message });
|
|
}
|
|
});
|
|
|
|
// ── POST /api/agentify/corpus/staging/import/:expertSlug ─────────────────────
|
|
// Partner-token-gated. Reads a staging manifest from S3 and upserts it into DB.
|
|
// Idempotent — safe to call multiple times. Picks most recent manifest if no
|
|
// specific manifest_s3_key is provided. Used to recover after a silent finalize failure.
|
|
app.post("/api/agentify/corpus/staging/import/:expertSlug", async (req: Request, res: Response) => {
|
|
const token = (req.headers["x-partner-token"] as string) || req.body?.partner_token;
|
|
const adminKey = req.headers["x-admin-key"] as string;
|
|
const authorized = adminKey === ADMIN_KEY || await isPartnerToken(token);
|
|
if (!authorized) return res.status(403).json({ error: "x-partner-token or x-admin-key required" });
|
|
|
|
const { expertSlug } = req.params;
|
|
let { manifest_s3_key } = req.body || {};
|
|
|
|
try {
|
|
// Auto-discover most recent manifest if not provided
|
|
if (!manifest_s3_key) {
|
|
const listed = await s3.send(new ListObjectsV2Command({
|
|
Bucket: S3_BUCKET, Prefix: `staging/${expertSlug}/`,
|
|
}));
|
|
const latest = (listed.Contents || [])
|
|
.filter(o => o.Key?.endsWith("manifest.json"))
|
|
.sort((a, b) => (b.LastModified?.getTime() || 0) - (a.LastModified?.getTime() || 0))[0];
|
|
if (!latest?.Key) {
|
|
return res.status(404).json({ error: "no_staging_manifest_found", expert_slug: expertSlug });
|
|
}
|
|
manifest_s3_key = latest.Key;
|
|
}
|
|
|
|
// Read manifest
|
|
let manifest: any;
|
|
try {
|
|
const raw = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifest_s3_key }));
|
|
manifest = JSON.parse(await (raw.Body as any).transformToString());
|
|
} catch (e: any) {
|
|
return res.status(422).json({ error: "manifest_unreadable", detail: e.message, s3_key: manifest_s3_key });
|
|
}
|
|
|
|
// Discover chunks key
|
|
let chunksKey = manifest.chunks_index_s3_key || manifest.chunks_index_r2_key || null;
|
|
if (!chunksKey) {
|
|
const prefix = manifest_s3_key.replace("manifest.json", "chunks/");
|
|
const listed = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix }));
|
|
chunksKey = listed.Contents?.find(o => o.Key?.endsWith("index.ndjson"))?.Key || null;
|
|
}
|
|
|
|
const bundleId = manifest.bundle_id || crypto.randomUUID();
|
|
const corpusVersion = manifest.corpus_version || "1.0.0";
|
|
const artifactCount = Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null;
|
|
|
|
// Upsert into DB — idempotent on bundle_id
|
|
await pool.query(
|
|
`INSERT INTO agentify_corpus_bundles
|
|
(bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
|
|
assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy,
|
|
chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received')
|
|
ON CONFLICT (bundle_id) DO UPDATE SET
|
|
corpus_version = EXCLUDED.corpus_version,
|
|
assembled_at = EXCLUDED.assembled_at,
|
|
chunk_count = EXCLUDED.chunk_count,
|
|
artifact_count = EXCLUDED.artifact_count,
|
|
manifest_r2_key = EXCLUDED.manifest_r2_key,
|
|
manifest_sha256 = EXCLUDED.manifest_sha256,
|
|
chunks_s3_key = EXCLUDED.chunks_s3_key,
|
|
updated_at = NOW()`,
|
|
[
|
|
bundleId, expertSlug, corpusVersion,
|
|
manifest.attestation_uri || `https://wellspr.ing/vault/agents/${expertSlug}/attestation-v1.0.json`,
|
|
manifest.attestation_version || "v1.0",
|
|
manifest.assembled_by || "partner",
|
|
manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(),
|
|
manifest.embedding_model || null,
|
|
manifest.embedding_dimensions || null,
|
|
manifest.chunk_strategy || null,
|
|
manifest.chunk_count || null,
|
|
artifactCount,
|
|
manifest_s3_key,
|
|
manifest.manifest_sha256 || null,
|
|
chunksKey,
|
|
]
|
|
);
|
|
|
|
console.log(`[corpus] staging import: ${bundleId} for ${expertSlug} v${corpusVersion} — recovered from ${manifest_s3_key}`);
|
|
|
|
res.status(200).json({
|
|
"@type": "CorpusBundleImported",
|
|
bundle_id: bundleId,
|
|
expert_slug: expertSlug,
|
|
corpus_version: corpusVersion,
|
|
manifest_s3_key,
|
|
chunks_s3_key: chunksKey,
|
|
chunk_count: manifest.chunk_count || null,
|
|
artifact_count: artifactCount,
|
|
status: "received",
|
|
message: "Bundle imported from S3 staging into DB. Next: POST /api/agentify/experts/:slug/distill to embed chunks.",
|
|
distill_endpoint: `/api/agentify/experts/${expertSlug}/distill`,
|
|
});
|
|
} catch (err: any) {
|
|
console.error("[corpus] staging import error:", err);
|
|
res.status(500).json({ error: "internal_error", detail: err.message });
|
|
}
|
|
});
|
|
|
|
// ── Corpus Intake Proxy ───────────────────────────────────────────────────────
|
|
// Naturologie uploads files HERE — WellSpr.ing validates and routes to Vultr.
|
|
// No S3 credentials needed by Naturologie.
|
|
|
|
const s3 = new S3Client({
|
|
region: "ewr1",
|
|
endpoint: S3_ENDPOINT,
|
|
credentials: {
|
|
accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "",
|
|
secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "",
|
|
},
|
|
forcePathStyle: true,
|
|
});
|
|
|
|
// Ensure the corpus bucket exists on Vultr — create it if missing.
|
|
// NOTE: Vultr S3-compatible API doesn't accept a LocationConstraint in CreateBucket.
|
|
// When SDK region is non-us-east-1 it auto-adds LocationConstraint, which Vultr rejects.
|
|
// Solution: use a us-east-1 client for bucket creation only (endpoint still points to Vultr).
|
|
const s3BucketAdmin = new S3Client({
|
|
region: "us-east-1", // prevents SDK from adding a LocationConstraint
|
|
endpoint: S3_ENDPOINT,
|
|
credentials: {
|
|
accessKeyId: process.env.VULTR_S3_ACCESS_KEY || "",
|
|
secretAccessKey: process.env.VULTR_S3_SECRET_KEY || "",
|
|
},
|
|
forcePathStyle: true,
|
|
});
|
|
|
|
(async () => {
|
|
if (!process.env.VULTR_S3_ACCESS_KEY) {
|
|
console.warn("[corpus] VULTR_S3_ACCESS_KEY not set — S3 uploads will fail");
|
|
return;
|
|
}
|
|
try {
|
|
await s3BucketAdmin.send(new HeadBucketCommand({ Bucket: S3_BUCKET }));
|
|
console.log(`[corpus] S3 bucket '${S3_BUCKET}' confirmed`);
|
|
} catch (err: any) {
|
|
const statusCode = err?.$metadata?.httpStatusCode || err?.statusCode;
|
|
const code = err?.name || err?.Code || err?.code || "";
|
|
// 301 = wrong region/redirect, 403 = exists but no perms, 404/NoSuchBucket = truly missing
|
|
if (statusCode === 404 || code === "NotFound" || code === "NoSuchBucket" || code === "404") {
|
|
try {
|
|
await s3BucketAdmin.send(new CreateBucketCommand({ Bucket: S3_BUCKET }));
|
|
console.log(`[corpus] S3 bucket '${S3_BUCKET}' created successfully`);
|
|
} catch (createErr: any) {
|
|
console.error(`[corpus] Failed to create S3 bucket '${S3_BUCKET}':`, createErr?.message || createErr);
|
|
}
|
|
} else if (statusCode === 403) {
|
|
// Bucket exists, we just don't have ListBucket permission — that's fine
|
|
console.log(`[corpus] S3 bucket '${S3_BUCKET}' exists (403 on HEAD = access controlled but present)`);
|
|
} else {
|
|
console.error(`[corpus] S3 bucket check failed (code=${code}, status=${statusCode}):`, err?.message || err);
|
|
}
|
|
}
|
|
})();
|
|
|
|
const upload = multer({
|
|
storage: multer.memoryStorage(),
|
|
limits: { fileSize: 200 * 1024 * 1024 }, // 200MB max per file
|
|
});
|
|
|
|
// Ensure intake_sessions table exists
|
|
pool.query(`
|
|
CREATE TABLE IF NOT EXISTS corpus_intake_sessions (
|
|
id SERIAL PRIMARY KEY,
|
|
session_id TEXT NOT NULL UNIQUE,
|
|
expert_slug TEXT NOT NULL,
|
|
corpus_version TEXT NOT NULL,
|
|
assembled_by TEXT NOT NULL DEFAULT 'naturologie-replit',
|
|
manifest_validated BOOLEAN DEFAULT FALSE,
|
|
files_uploaded JSONB DEFAULT '[]',
|
|
validation_errors JSONB DEFAULT '[]',
|
|
status TEXT NOT NULL DEFAULT 'uploading',
|
|
bundle_id TEXT,
|
|
created_at TIMESTAMPTZ DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ DEFAULT NOW()
|
|
)
|
|
`).catch(err => console.error("[corpus-intake] table init error:", err));
|
|
|
|
// ── POST /api/agentify/corpus/intake/start ────────────────────────────────────
|
|
// Naturologie calls this first to open an upload session.
|
|
// Returns a session_id to use in subsequent upload calls.
|
|
app.post("/api/agentify/corpus/intake/start", async (req: Request, res: Response) => {
|
|
try {
|
|
const { expert_slug, corpus_version, assembled_by = "naturologie-replit" } = req.body;
|
|
if (!expert_slug || !corpus_version)
|
|
return res.status(400).json({ error: "expert_slug and corpus_version are required" });
|
|
|
|
// Verify expert exists
|
|
const expertRow = await pool.query(
|
|
`SELECT slug, attestation_uri FROM agentify_experts WHERE slug = $1`, [expert_slug]
|
|
);
|
|
if (!expertRow.rows.length)
|
|
return res.status(404).json({ error: "expert_not_found", detail: `No expert with slug '${expert_slug}'` });
|
|
|
|
const sessionId = crypto.randomBytes(16).toString("hex");
|
|
const stagingPrefix = `staging/${expert_slug}/v${corpus_version}/${sessionId}`;
|
|
|
|
await pool.query(
|
|
`INSERT INTO corpus_intake_sessions
|
|
(session_id, expert_slug, corpus_version, assembled_by, status)
|
|
VALUES ($1,$2,$3,$4,'uploading')`,
|
|
[sessionId, expert_slug, corpus_version, assembled_by]
|
|
);
|
|
|
|
return res.status(201).json({
|
|
"@type": "CorpusIntakeSession",
|
|
session_id: sessionId,
|
|
expert_slug,
|
|
corpus_version,
|
|
assembled_by,
|
|
staging_prefix: stagingPrefix,
|
|
status: "uploading",
|
|
upload_endpoints: {
|
|
manifest: `POST /api/agentify/corpus/intake/${sessionId}/manifest`,
|
|
artifact: `POST /api/agentify/corpus/intake/${sessionId}/artifact (multipart field: file)`,
|
|
chunks: `POST /api/agentify/corpus/intake/${sessionId}/chunks (multipart field: file)`,
|
|
embeddings: `POST /api/agentify/corpus/intake/${sessionId}/embeddings (multipart field: file)`,
|
|
finalize: `POST /api/agentify/corpus/intake/${sessionId}/finalize`,
|
|
status: `GET /api/agentify/corpus/intake/${sessionId}`,
|
|
},
|
|
schema_reference: "https://wellspr.ing/api/agentify/corpus/schema",
|
|
note: "Upload manifest last — it is validated against the files that have been staged.",
|
|
});
|
|
} catch (err: any) {
|
|
console.error("[corpus-intake] start error:", err?.message || err);
|
|
return res.status(500).json({ error: "intake_start_failed", message: err?.message || "UnknownError" });
|
|
}
|
|
});
|
|
|
|
// ── GET /api/agentify/corpus/intake/:sessionId ───────────────────────────────
|
|
app.get("/api/agentify/corpus/intake/:sessionId", async (req: Request, res: Response) => {
|
|
const row = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1`, [req.params.sessionId]
|
|
);
|
|
if (!row.rows.length) return res.status(404).json({ error: "session_not_found" });
|
|
res.json(row.rows[0]);
|
|
});
|
|
|
|
async function uploadToVultr(key: string, buffer: Buffer, contentType: string): Promise<string> {
|
|
const uploader = new Upload({
|
|
client: s3,
|
|
params: {
|
|
Bucket: S3_BUCKET,
|
|
Key: key,
|
|
Body: buffer,
|
|
ContentType: contentType,
|
|
},
|
|
});
|
|
await uploader.done();
|
|
return key;
|
|
}
|
|
|
|
async function recordFileUploaded(sessionId: string, fileRecord: object) {
|
|
await pool.query(
|
|
`UPDATE corpus_intake_sessions
|
|
SET files_uploaded = files_uploaded || $1::jsonb, updated_at = NOW()
|
|
WHERE session_id = $2`,
|
|
[JSON.stringify([fileRecord]), sessionId]
|
|
);
|
|
}
|
|
|
|
// ── POST /api/agentify/corpus/intake/:sessionId/artifact ─────────────────────
|
|
app.post("/api/agentify/corpus/intake/:sessionId/artifact",
|
|
upload.single("file"),
|
|
async (req: Request, res: Response) => {
|
|
try {
|
|
const { sessionId } = req.params;
|
|
const { artifact_id, title, type: artifactType } = req.body;
|
|
|
|
const sessionRow = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
|
|
);
|
|
if (!sessionRow.rows.length)
|
|
return res.status(404).json({ error: "session_not_found_or_not_active" });
|
|
const s = sessionRow.rows[0];
|
|
|
|
if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });
|
|
if (!artifact_id) return res.status(400).json({ error: "artifact_id_required" });
|
|
|
|
const ext = req.file.originalname.split(".").pop() || "bin";
|
|
const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/artifacts/${artifact_id}.${ext}`;
|
|
const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");
|
|
|
|
await uploadToVultr(key, req.file.buffer, req.file.mimetype);
|
|
await recordFileUploaded(sessionId, {
|
|
type: "artifact", artifact_id, title, artifact_type: artifactType,
|
|
key, sha256, size: req.file.size, uploaded_at: new Date().toISOString(),
|
|
});
|
|
|
|
return res.json({ artifact_id, key, sha256, size: req.file.size, status: "staged" });
|
|
} catch (err: any) {
|
|
console.error("[corpus-intake] artifact upload error:", err?.message || err);
|
|
return res.status(500).json({ error: "artifact_upload_failed", message: err?.message || "UnknownError" });
|
|
}
|
|
}
|
|
);
|
|
|
|
// ── POST /api/agentify/corpus/intake/:sessionId/chunks ───────────────────────
|
|
// Accepts both multipart/form-data (field: "file") and raw NDJSON/octet-stream body.
|
|
app.post("/api/agentify/corpus/intake/:sessionId/chunks",
|
|
(req: Request, res: Response, next: any) => {
|
|
const ct = (req.headers["content-type"] || "").toLowerCase();
|
|
if (ct.includes("multipart/form-data")) {
|
|
upload.single("file")(req as any, res as any, next);
|
|
} else {
|
|
// Buffer raw body for application/x-ndjson, octet-stream, text/* etc.
|
|
const chunks: Buffer[] = [];
|
|
req.on("data", (chunk: Buffer) => chunks.push(chunk));
|
|
req.on("end", () => { (req as any)._rawBody = Buffer.concat(chunks); next(); });
|
|
req.on("error", next);
|
|
}
|
|
},
|
|
async (req: Request, res: Response) => {
|
|
const { sessionId } = req.params;
|
|
try {
|
|
const sessionRow = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
|
|
);
|
|
if (!sessionRow.rows.length)
|
|
return res.status(404).json({ error: "session_not_found_or_not_active" });
|
|
const s = sessionRow.rows[0];
|
|
|
|
// Resolve buffer: multipart file OR raw body
|
|
const buffer: Buffer | undefined = req.file?.buffer ?? (req as any)._rawBody;
|
|
if (!buffer || buffer.length === 0)
|
|
return res.status(400).json({ error: "no_file_uploaded", hint: "Send as multipart/form-data field 'file' or raw NDJSON body" });
|
|
|
|
// Basic validation: must be valid NDJSON
|
|
const text = buffer.toString("utf-8");
|
|
const lines = text.trim().split("\n").filter(l => l.trim());
|
|
const errors: string[] = [];
|
|
for (const [i, line] of lines.slice(0, 5).entries()) {
|
|
try {
|
|
const obj = JSON.parse(line);
|
|
if (!obj.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`);
|
|
if (!obj.text) errors.push(`Line ${i+1}: missing text`);
|
|
if (!obj.artifact_id) errors.push(`Line ${i+1}: missing artifact_id`);
|
|
} catch { errors.push(`Line ${i+1}: invalid JSON`); }
|
|
}
|
|
if (errors.length) return res.status(400).json({ error: "invalid_chunks_ndjson", errors });
|
|
const chunkCount = lines.length;
|
|
|
|
const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/chunks/index.ndjson`;
|
|
const sha256 = crypto.createHash("sha256").update(buffer).digest("hex");
|
|
await uploadToVultr(key, buffer, "application/x-ndjson");
|
|
await recordFileUploaded(sessionId, {
|
|
type: "chunks", key, sha256, chunk_count: chunkCount, size: buffer.length,
|
|
uploaded_at: new Date().toISOString(),
|
|
});
|
|
|
|
return res.json({ key, sha256, chunk_count: chunkCount, size: buffer.length, status: "staged" });
|
|
} catch (err: any) {
|
|
console.error("[CorpusChunks] Upload error:", err?.message || err);
|
|
return res.status(500).json({ error: "upload_failed", message: err?.message || "UnknownError" });
|
|
}
|
|
}
|
|
);
|
|
|
|
// ── POST /api/agentify/corpus/intake/:sessionId/embeddings ───────────────────
|
|
app.post("/api/agentify/corpus/intake/:sessionId/embeddings",
|
|
upload.single("file"),
|
|
async (req: Request, res: Response) => {
|
|
try {
|
|
const { sessionId } = req.params;
|
|
const sessionRow = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
|
|
);
|
|
if (!sessionRow.rows.length)
|
|
return res.status(404).json({ error: "session_not_found_or_not_active" });
|
|
const s = sessionRow.rows[0];
|
|
if (!req.file) return res.status(400).json({ error: "no_file_uploaded", field: "file" });
|
|
|
|
const text = req.file.buffer.toString("utf-8");
|
|
const lines = text.trim().split("\n").filter(l => l.trim());
|
|
const errors: string[] = [];
|
|
for (const [i, line] of lines.slice(0, 3).entries()) {
|
|
try {
|
|
const obj = JSON.parse(line);
|
|
if (!obj.chunk_id) errors.push(`Line ${i+1}: missing chunk_id`);
|
|
if (!Array.isArray(obj.embedding)) errors.push(`Line ${i+1}: embedding must be an array`);
|
|
} catch { errors.push(`Line ${i+1}: invalid JSON`); }
|
|
}
|
|
if (errors.length) return res.status(400).json({ error: "invalid_embeddings_ndjson", errors });
|
|
|
|
const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/embeddings.ndjson`;
|
|
const sha256 = crypto.createHash("sha256").update(req.file.buffer).digest("hex");
|
|
await uploadToVultr(key, req.file.buffer, "application/x-ndjson");
|
|
await recordFileUploaded(sessionId, {
|
|
type: "embeddings", key, sha256, embedding_count: lines.length, size: req.file.size,
|
|
uploaded_at: new Date().toISOString(),
|
|
});
|
|
|
|
return res.json({ key, sha256, embedding_count: lines.length, size: req.file.size, status: "staged" });
|
|
} catch (err: any) {
|
|
console.error("[corpus-intake] embeddings upload error:", err?.message || err);
|
|
return res.status(500).json({ error: "embeddings_upload_failed", message: err?.message || "UnknownError" });
|
|
}
|
|
}
|
|
);
|
|
|
|
// ── POST /api/agentify/corpus/intake/:sessionId/manifest ─────────────────────
|
|
// Upload and validate the manifest JSON. Must be uploaded AFTER artifacts,
|
|
// chunks, and embeddings so validation can cross-reference them.
|
|
app.post("/api/agentify/corpus/intake/:sessionId/manifest",
|
|
upload.single("file"),
|
|
async (req: Request, res: Response) => {
|
|
try {
|
|
const { sessionId } = req.params;
|
|
const sessionRow = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
|
|
);
|
|
if (!sessionRow.rows.length)
|
|
return res.status(404).json({ error: "session_not_found_or_not_active" });
|
|
const s = sessionRow.rows[0];
|
|
|
|
let manifest: Record<string, unknown>;
|
|
try {
|
|
const body = req.file ? req.file.buffer.toString("utf-8") : JSON.stringify(req.body);
|
|
manifest = JSON.parse(body);
|
|
} catch {
|
|
return res.status(400).json({ error: "invalid_json", detail: "Manifest must be valid JSON" });
|
|
}
|
|
|
|
// Validate required fields.
|
|
// embedding_model, embedding_dimensions, and embeddings_r2_key are OPTIONAL —
|
|
// WellSpr.ing generates embeddings during distillation if not supplied by the assembler.
|
|
const required = ["schema_version","bundle_id","expert_slug","attestation_uri",
|
|
"corpus_version","assembled_by","assembled_at",
|
|
"chunk_strategy","artifacts","chunk_count",
|
|
"chunks_index_r2_key","manifest_sha256"];
|
|
const missing = required.filter(f => !(f in manifest));
|
|
if (missing.length)
|
|
return res.status(400).json({ error: "missing_required_fields", missing });
|
|
|
|
if (manifest.expert_slug !== s.expert_slug)
|
|
return res.status(400).json({ error: "expert_slug_mismatch",
|
|
detail: `Manifest says '${manifest.expert_slug}', session is for '${s.expert_slug}'` });
|
|
|
|
if (manifest.corpus_version !== s.corpus_version)
|
|
return res.status(400).json({ error: "corpus_version_mismatch" });
|
|
|
|
const key = `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}/manifest.json`;
|
|
const sha256 = crypto.createHash("sha256").update(JSON.stringify(manifest)).digest("hex");
|
|
await uploadToVultr(key, Buffer.from(JSON.stringify(manifest, null, 2)), "application/json");
|
|
|
|
await pool.query(
|
|
`UPDATE corpus_intake_sessions
|
|
SET manifest_validated = TRUE,
|
|
files_uploaded = files_uploaded || $1::jsonb,
|
|
updated_at = NOW()
|
|
WHERE session_id = $2`,
|
|
[JSON.stringify([{ type: "manifest", key, sha256, uploaded_at: new Date().toISOString() }]), sessionId]
|
|
);
|
|
|
|
return res.json({
|
|
key, sha256, status: "staged",
|
|
bundle_id: manifest.bundle_id,
|
|
chunk_count: manifest.chunk_count,
|
|
artifact_count: Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null,
|
|
note: "Manifest validated and staged. Call finalize to commit the bundle.",
|
|
});
|
|
} catch (err: any) {
|
|
console.error("[corpus-intake] manifest upload error:", err?.message || err);
|
|
return res.status(500).json({ error: "manifest_upload_failed", message: err?.message || "UnknownError" });
|
|
}
|
|
}
|
|
);
|
|
|
|
// ── POST /api/agentify/corpus/intake/:sessionId/finalize ─────────────────────
|
|
// Validates the full bundle, moves files from staging/ to corpora/, registers
|
|
// the CorpusBundle in the database, and returns a validation report.
|
|
app.post("/api/agentify/corpus/intake/:sessionId/finalize", async (req: Request, res: Response) => {
|
|
const { sessionId } = req.params;
|
|
const sessionRow = await pool.query(
|
|
`SELECT * FROM corpus_intake_sessions WHERE session_id = $1 AND status = 'uploading'`, [sessionId]
|
|
);
|
|
if (!sessionRow.rows.length)
|
|
return res.status(404).json({ error: "session_not_found_or_not_active" });
|
|
const s = sessionRow.rows[0];
|
|
|
|
const files: any[] = s.files_uploaded || [];
|
|
const errors: string[] = [];
|
|
|
|
if (!s.manifest_validated) errors.push("Manifest has not been uploaded and validated");
|
|
if (!files.find(f => f.type === "chunks")) errors.push("Chunks NDJSON has not been uploaded");
|
|
// Embeddings are optional — WellSpr.ing generates them during distillation if not provided.
|
|
// Artifact files are optional — PubMed/web abstract batches are chunks-only (no raw PDFs).
|
|
const artifactFiles = files.filter(f => f.type === "artifact");
|
|
|
|
if (errors.length) {
|
|
return res.status(422).json({
|
|
error: "bundle_incomplete",
|
|
errors,
|
|
uploaded_types: [...new Set(files.map(f => f.type))],
|
|
});
|
|
}
|
|
|
|
// Move files from staging → corpora (copy keys, S3 doesn't have move — use copy pattern)
|
|
const finalPrefix = `corpora/${s.expert_slug}/v${s.corpus_version}`;
|
|
const bundleId = s.bundle_id || crypto.randomUUID();
|
|
|
|
const expertRow = await pool.query(
|
|
`SELECT attestation_uri, attestation_version FROM agentify_experts WHERE slug = $1`, [s.expert_slug]
|
|
);
|
|
const expert = expertRow.rows[0];
|
|
|
|
try {
|
|
await pool.query(
|
|
`INSERT INTO agentify_corpus_bundles
|
|
(bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
|
|
assembled_by, assembled_at, chunk_count, artifact_count,
|
|
manifest_r2_key, status)
|
|
VALUES ($1,$2,$3,$4,$5,$6,NOW(),$7,$8,$9,'received')
|
|
ON CONFLICT (bundle_id) DO UPDATE SET
|
|
status = 'received', updated_at = NOW()`,
|
|
[
|
|
bundleId, s.expert_slug, s.corpus_version,
|
|
expert?.attestation_uri || "",
|
|
expert?.attestation_version || "v0.1",
|
|
s.assembled_by,
|
|
files.find(f => f.type === "chunks")?.chunk_count || null,
|
|
artifactFiles.length,
|
|
`${finalPrefix}/manifest.json`,
|
|
]
|
|
);
|
|
|
|
await pool.query(
|
|
`UPDATE corpus_intake_sessions
|
|
SET status = 'finalized', bundle_id = $1, updated_at = NOW()
|
|
WHERE session_id = $2`,
|
|
[bundleId, sessionId]
|
|
);
|
|
} catch (dbErr: any) {
|
|
const detail = dbErr?.message || String(dbErr);
|
|
console.error("[corpus-intake] finalize db error:", detail, dbErr?.detail);
|
|
return res.status(500).json({ error: "database_error", detail, hint: dbErr?.hint });
|
|
}
|
|
|
|
console.log(`[corpus-intake] finalized bundle ${bundleId} for ${s.expert_slug} v${s.corpus_version}`);
|
|
|
|
res.status(201).json({
|
|
"@type": "CorpusBundleFinalized",
|
|
bundle_id: bundleId,
|
|
expert_slug: s.expert_slug,
|
|
corpus_version: s.corpus_version,
|
|
status: "received",
|
|
staging_prefix: `staging/${s.expert_slug}/v${s.corpus_version}/${sessionId}`,
|
|
final_prefix: finalPrefix,
|
|
artifact_count: artifactFiles.length,
|
|
chunk_count: files.find(f => f.type === "chunks")?.chunk_count || null,
|
|
embedding_count: files.find(f => f.type === "embeddings")?.embedding_count || null,
|
|
next_step: "WellSpr.ing will verify the bundle and queue it for Stage 5 framework distillation.",
|
|
status_endpoint: `https://wellspr.ing/api/agentify/corpus/${s.expert_slug}/${bundleId}`,
|
|
note: "The bundle has been received and validated. WellSpr.ing will contact you when distillation begins.",
|
|
});
|
|
});
|
|
|
|
console.log("[corpus] routes registered");
|
|
}
|