From e221c0cab47b33d3e5240cbbc7370c1b9cb55171 Mon Sep 17 00:00:00 2001
From: gitadmin
Date: Sat, 2 May 2026 13:07:39 +0000
Subject: [PATCH] feat: add routes/stage5.ts

---
 routes/stage5.ts | 1481 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1481 insertions(+)
 create mode 100644 routes/stage5.ts

diff --git a/routes/stage5.ts b/routes/stage5.ts
new file mode 100644
index 0000000..a5475fe
--- /dev/null
+++ b/routes/stage5.ts
@@ -0,0 +1,1481 @@
/**
 * Agentify Stage 5 — Distillation, Embedding Storage, and Expert Consult
 *
 * Turns a received corpus bundle into a functional AI expert persona.
 *
 * GET  /api/agentify/experts/:slug/status        — readiness + stats
 * POST /api/agentify/experts/:slug/distill       — embed chunks, mark ready (admin)
 * POST /api/agentify/experts/:slug/consult       — consult the expert (partner-gated)
 * POST /api/agentify/experts/:slug/import-bundle — register bundle from S3 manifest (admin)
 *
 * Embedding model: openai/text-embedding-3-small via OpenRouter (1536 dims)
 * Vector storage: corpus_embeddings.embedding float8[] — in-process cosine similarity
 * Fallback: PostgreSQL full-text search when embedding API unavailable
 */

import type { Express, Request, Response } from "express";
import { pool } from "../db";
import Anthropic from "@anthropic-ai/sdk";
import { S3Client, GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
import OpenAI from "openai";

// SECURITY NOTE(review): a literal fallback admin key is committed here — anyone
// with repo access can call the admin-gated routes whenever the ADMIN_KEY env var
// is unset. Confirm this is intentional for dev only; rotate and remove before
// production deployment.
const ADMIN_KEY = process.env.ADMIN_KEY || "b0db7a87384fc814b0f46ea7bdc6ab6a81152be5b098718b";
const S3_BUCKET = process.env.CORPUS_S3_BUCKET || "agentify-corpus";

// Vultr object storage (S3-compatible). The non-null assertions mean a missing
// VULTR_S3_ACCESS_KEY / VULTR_S3_SECRET_KEY surfaces only at request time, not
// at startup — NOTE(review): consider failing fast if these are required.
const s3 = new S3Client({
  endpoint: `https://${process.env.VULTR_S3_HOSTNAME || "ewr1.vultrobjects.com"}`,
  region: "ewr1",
  credentials: {
    accessKeyId: process.env.VULTR_S3_ACCESS_KEY!,
    secretAccessKey: process.env.VULTR_S3_SECRET_KEY!,
  },
  forcePathStyle: true,
});

// Embedding API requires direct OpenRouter (not the Replit modelfarm proxy,
// which only routes
// chat completions — it returns 400 on /embeddings).
// Set OPENROUTER_API_KEY to a real OpenRouter key to enable dense-vector retrieval.
// Without it the system falls back to PostgreSQL full-text search automatically.
const EMBED_API_KEY = process.env.OPENROUTER_API_KEY || process.env.OPEN_ROUTER_API_KEY || "";
const embedClient = new OpenAI({
  baseURL: "https://openrouter.ai/api/v1",
  apiKey: EMBED_API_KEY,
});

const EMBED_MODEL = "openai/text-embedding-3-small";
const EMBED_DIMS = 1536;
const TOP_K = 5;
const BATCH_SIZE = 20;

// In-memory rate limiter for public (unauthenticated) demo consultations.
// Keyed by client IP; value is the start of the current 1-hour window and the
// number of requests seen inside it.
const publicDemoRateMap = new Map<string, { ts: number; count: number }>();

// ── Cosine similarity (in-process — fast for ≤10k chunks) ───────────────────
/**
 * Cosine similarity of two equal-length vectors.
 * Returns 0 when either vector has zero magnitude (avoids division by zero).
 */
function cosineSim(a: number[], b: number[]): number {
  let dot = 0, magA = 0, magB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    magA += a[i] * a[i];
    magB += b[i] * b[i];
  }
  const denom = Math.sqrt(magA) * Math.sqrt(magB);
  return denom === 0 ? 0 : dot / denom;
}

// ── Stream NDJSON file from S3 ───────────────────────────────────────────────
/**
 * Load an NDJSON object from S3 and parse it into an array of records.
 * Blank lines are skipped; a malformed line throws (caller handles).
 */
async function loadNDJSON(key: string): Promise<any[]> {
  const r = await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: key }));
  const text = await (r.Body as any).transformToString();
  return text.trim().split("\n").filter((l: string) => l.trim()).map((l: string) => JSON.parse(l));
}

// ── Discover chunks S3 key by listing the bucket ────────────────────────────
/**
 * Probe the known prefix layouts for a bundle's chunks index and return the
 * first matching key, or null when none of the prefixes contain one.
 */
async function discoverChunksKey(expertSlug: string, corpusVersion: string): Promise<string | null> {
  for (const prefix of [
    `corpora/${expertSlug}/v${corpusVersion}/`,
    `staging/${expertSlug}/${corpusVersion}/`,
    `staging/${expertSlug}/v${corpusVersion}/`,
  ]) {
    const list = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix }));
    const match = list.Contents?.find(o => o.Key?.endsWith("/chunks/index.ndjson"));
    if (match?.Key) return match.Key;
  }
  return null;
}

// ── Embed texts via OpenRouter — returns null on failure (FTS fallback) ──────
/**
 * Embed a batch of texts. Returns one vector per input text, or null when the
 * embedding API is unconfigured/unreachable so callers fall back to FTS.
 */
async function embedTexts(texts: string[]): Promise<number[][] | null> {
  if (!texts.length) return [];
  // If no API key is configured, skip the call entirely — FTS fallback will be used.
  // Avoids flooding the warning ring with API auth errors on every batch.
  if (!EMBED_API_KEY) return null;
  try {
    const res = await embedClient.embeddings.create({ model: EMBED_MODEL, input: texts });
    // res.data is the standard OpenAI shape; guard against non-standard responses
    const dataArray = res?.data ?? (res as any)?.embeddings ?? null;
    if (!Array.isArray(dataArray)) {
      console.warn("[stage5] embedding API returned unexpected shape:", JSON.stringify(res)?.slice(0, 200));
      return null;
    }
    return dataArray.map((d: any) => (d.embedding ?? d.values ??
[]) as number[]); + } catch (e: any) { + console.warn("[stage5] embedding API unavailable:", e.message); + return null; + } +} + +// ── isAdmin helper ─────────────────────────────────────────────────────────── +function isAdmin(req: Request): boolean { + return (req.headers["x-admin-key"] as string) === ADMIN_KEY; +} + +export function registerStage5Routes(app: Express) { + + // Log retrieval mode so the operator knows the current state at startup + if (EMBED_API_KEY) { + console.log("[stage5] Dense-vector retrieval ENABLED (OPENROUTER_API_KEY present)"); + } else { + console.log("[stage5] FTS-only mode — set OPENROUTER_API_KEY for dense-vector retrieval"); + } + + // ── Migrate tables ───────────────────────────────────────────────────────── + pool.query(` + CREATE TABLE IF NOT EXISTS corpus_embeddings ( + id SERIAL PRIMARY KEY, + expert_slug TEXT NOT NULL, + bundle_id TEXT NOT NULL, + chunk_id TEXT NOT NULL, + chunk_text TEXT NOT NULL, + embedding float8[], + metadata JSONB, + created_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(bundle_id, chunk_id) + ) + `).catch(e => console.error("[stage5] corpus_embeddings init:", e.message)); + + pool.query(` + CREATE TABLE IF NOT EXISTS corpus_consult_log ( + id SERIAL PRIMARY KEY, + expert_slug TEXT NOT NULL, + partner_id TEXT, + question TEXT NOT NULL, + answer TEXT NOT NULL, + chunks_used INTEGER, + retrieval TEXT DEFAULT 'dense', + created_at TIMESTAMPTZ DEFAULT NOW() + ) + `).catch(e => console.error("[stage5] corpus_consult_log init:", e.message)); + + pool.query(`ALTER TABLE agentify_corpus_bundles ADD COLUMN IF NOT EXISTS chunks_s3_key TEXT`) + .catch(() => {}); + + // ── agentify_source_catalog — dedup memory: every URL ever ingested per expert + pool.query(` + CREATE TABLE IF NOT EXISTS agentify_source_catalog ( + id SERIAL PRIMARY KEY, + expert_slug TEXT NOT NULL, + bundle_id TEXT NOT NULL, + url TEXT NOT NULL, + canonical TEXT, + title TEXT, + type TEXT DEFAULT 'article', + artifact_id TEXT, + chunk_count INTEGER 
DEFAULT 0,
      ingested_at TIMESTAMPTZ DEFAULT NOW(),
      UNIQUE(expert_slug, url)
    )
  `).catch(e => console.error("[stage5] agentify_source_catalog init:", e.message));

  // Partner token table + a seeded token for the naturologie integration.
  // SECURITY NOTE(review): this partner token is a literal committed to source;
  // anyone with repo access can consult experts as 'naturologie-replit-v2'.
  // Confirm whether this seed belongs in a migration secret instead.
  pool.query(`
    CREATE TABLE IF NOT EXISTS partner_tokens (
      id SERIAL PRIMARY KEY,
      token TEXT NOT NULL UNIQUE,
      partner_name TEXT NOT NULL,
      is_active BOOLEAN NOT NULL DEFAULT TRUE,
      created_at TIMESTAMPTZ DEFAULT NOW()
    )
  `).then(() => {
    return pool.query(`
      INSERT INTO partner_tokens (token, partner_name, is_active)
      VALUES ('ws_a94673b4c729e89a034feb7a3f866e6f75c3c668afdd277be03f4a98d248b8fb', 'naturologie-replit-v2', TRUE)
      ON CONFLICT (token) DO NOTHING
    `);
  }).catch(e => console.error("[stage5] partner_tokens init:", e.message));

  console.log("[Agentify/Stage5] Routes registered");

  // ── CORS preflight for all portal-API endpoints ───────────────────────────
  // Wide-open CORS (*) — these endpoints are called directly from partner
  // browser frontends; auth happens via the custom headers, not cookies.
  const PORTAL_CORS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, X-Partner-Token, X-Admin-Key",
  };
  const portalPaths = [
    "/api/agentify/experts/:slug/consult",
    "/api/agentify/experts/:slug/corpus/submit-url",
    "/api/agentify/experts/:slug/corpus/submit-source",
    "/api/agentify/experts/:slug/corpus/catalog",
    "/api/agentify/experts/:slug/status",
    "/api/agentify/experts/:slug",
  ];
  for (const path of portalPaths) {
    app.options(path, (_req: Request, res: Response) => {
      Object.entries(PORTAL_CORS).forEach(([k, v]) => res.setHeader(k, v));
      res.status(204).end();
    });
  }

  // ── GET /api/agentify/experts/:slug/status ─────────────────────────────────
  // Public readiness probe: reports the latest bundle's state, how many chunks
  // are embedded, and the consult endpoint once the expert is 'ready'.
  // 404 when the slug is unknown; 500 on DB errors.
  app.get("/api/agentify/experts/:slug/status", async (req: Request, res: Response) => {
    const { slug } = req.params;
    try {
      const expertRow = await pool.query(`SELECT * FROM agentify_experts WHERE slug = $1`, [slug]);
      if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found" });
      const expert = expertRow.rows[0];

      const bundles = await pool.query(
        `SELECT bundle_id, corpus_version, status, chunk_count, distillation_notes, received_at, updated_at
         FROM agentify_corpus_bundles WHERE expert_slug = $1 ORDER BY received_at DESC`,
        [slug]
      );
      const embCount = await pool.query(
        `SELECT COUNT(*) FROM corpus_embeddings WHERE expert_slug = $1`, [slug]
      );
      const consultCount = await pool.query(
        `SELECT COUNT(*) FROM corpus_consult_log WHERE expert_slug = $1`, [slug]
      );

      // "Latest" = most recently received bundle; readiness is driven by its status.
      const latest = bundles.rows[0];
      const ready = latest?.status === "ready";

      res.setHeader("Access-Control-Allow-Origin", "*");
      res.json({
        slug,
        expertName: expert.expert_name,
        primaryDomain: expert.primary_domain,
        status: latest?.status || "no_bundle",
        isReady: ready,
        latestBundle: latest || null,
        // COUNT(*) comes back from pg as a string — parse to a number for the API.
        embeddedChunks: parseInt(embCount.rows[0].count),
        totalConsults: parseInt(consultCount.rows[0].count),
        consultEndpoint: ready ? `/api/agentify/experts/${slug}/consult` : null,
      });
    } catch (e: any) { res.status(500).json({ error: e.message }); }
  });

  // ── POST /api/agentify/experts/:slug/import-bundle ─────────────────────────
  // Admin: register a bundle directly from its S3 manifest (e.g. submitted to staging).
  // Requires X-Admin-Key. Body may carry manifest_s3_key; otherwise the bucket
  // is probed under corpora/<slug>/ and staging/<slug>/ for a manifest.json.
  app.post("/api/agentify/experts/:slug/import-bundle", async (req: Request, res: Response) => {
    if (!isAdmin(req)) return res.status(403).json({ error: "admin only" });
    const { slug } = req.params;

    try {
      // Discover manifest in S3
      let manifestKey = req.body.manifest_s3_key as string | undefined;
      if (!manifestKey) {
        for (const prefix of [`corpora/${slug}/`, `staging/${slug}/`]) {
          const list = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix }));
          const m = list.Contents?.find(o => o.Key?.endsWith("manifest.json"));
          if (m?.Key) { manifestKey = m.Key; break; }
        }
      }
      if (!manifestKey) return res.status(404).json({ error: "manifest_not_found_in_s3" });

      // Malformed JSON here throws and is reported as a 500 by the outer catch.
      const manifest = JSON.parse(
        await (await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifestKey }))).Body!
          .transformToString() as string
      );

      // Discover chunks key — manifest value wins; otherwise probe the bucket.
      const corpusVersion = manifest.corpus_version || "1.0.0";
      let chunksKey = manifest.chunks_index_r2_key || null;
      if (!chunksKey || chunksKey === "") {
        chunksKey = await discoverChunksKey(slug, corpusVersion);
      }

      // Upsert into DB — re-importing the same bundle_id refreshes its keys
      // and counts instead of failing.
      await pool.query(
        `INSERT INTO agentify_corpus_bundles
          (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version,
           assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy,
           chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status)
         VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received')
         ON CONFLICT (bundle_id) DO UPDATE SET
           chunks_s3_key = EXCLUDED.chunks_s3_key,
           corpus_version = EXCLUDED.corpus_version,
           chunk_count = EXCLUDED.chunk_count,
           manifest_r2_key = EXCLUDED.manifest_r2_key,
           manifest_sha256 = EXCLUDED.manifest_sha256,
           updated_at = NOW()`,
        [
          manifest.bundle_id, slug, corpusVersion,
          manifest.attestation_uri || "", manifest.attestation_version || "v0.1",
          manifest.assembled_by || "naturologie-replit",
          manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(),
          manifest.embedding_model || null, manifest.embedding_dimensions || null,
          manifest.chunk_strategy || "paragraph",
          manifest.chunk_count || null,
          Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null,
          manifestKey, manifest.manifest_sha256 || null,
          chunksKey,
        ]
      );

      res.json({
        ok: true,
        bundle_id: manifest.bundle_id,
        expert_slug: slug,
        chunk_count: manifest.chunk_count,
        chunks_s3_key: chunksKey,
        status: "received",
        next: `POST /api/agentify/experts/${slug}/distill to generate embeddings`,
      });
    } catch (e: any) { res.status(500).json({ error: e.message }); }
  });

  // ── POST /api/agentify/experts/:slug/distill ───────────────────────────────
  // Admin. Reads chunks from S3, generates embeddings (batched), stores in DB.
  // Streams NDJSON progress events so the caller can watch a long job.
  // Once the stream starts, errors are reported in-stream (event: "error"),
  // not via an HTTP status — the headers are already sent.
  app.post("/api/agentify/experts/:slug/distill", async (req: Request, res: Response) => {
    if (!isAdmin(req)) return res.status(403).json({ error: "admin only" });
    const { slug } = req.params;
    const { bundle_id, chunks_s3_key: keyOverride } = req.body;

    try {
      // Explicit bundle_id wins; otherwise distill the most recently received bundle.
      const bundleQ = bundle_id
        ? await pool.query(`SELECT * FROM agentify_corpus_bundles WHERE bundle_id = $1 AND expert_slug = $2`, [bundle_id, slug])
        : await pool.query(`SELECT * FROM agentify_corpus_bundles WHERE expert_slug = $1 ORDER BY received_at DESC LIMIT 1`, [slug]);

      if (!bundleQ.rows.length) {
        return res.status(404).json({
          error: "bundle_not_found",
          hint: `Run POST /api/agentify/experts/${slug}/import-bundle first, or POST /api/agentify/corpus/notify`,
        });
      }
      const bundle = bundleQ.rows[0];

      // Resolve chunks S3 key: body override > stored key > bucket discovery.
      let chunksKey: string | null = keyOverride || bundle.chunks_s3_key || null;
      if (!chunksKey) {
        chunksKey = await discoverChunksKey(slug, bundle.corpus_version);
        if (!chunksKey) return res.status(400).json({ error: "chunks_not_found_in_s3", hint: "Pass chunks_s3_key in body" });
      }

      // Save discovered key back to bundle
      await pool.query(
        `UPDATE agentify_corpus_bundles SET chunks_s3_key = $1, status = 'distilling', updated_at = NOW() WHERE bundle_id = $2`,
        [chunksKey, bundle.bundle_id]
      );

      // Stream NDJSON progress
      // NOTE(review): Node normally manages Transfer-Encoding itself; setting it
      // manually alongside res.write is redundant — confirm it doesn't conflict
      // with any reverse proxy in front of this service.
      res.setHeader("Content-Type", "application/x-ndjson");
      res.setHeader("Transfer-Encoding", "chunked");
      const emit = (obj: object) => res.write(JSON.stringify(obj) + "\n");

      emit({ event: "started", bundle_id: bundle.bundle_id, chunksKey, slug });

      try {
        const chunks = await loadNDJSON(chunksKey);
        emit({ event: "chunks_loaded", count: chunks.length });

        let embedded = 0;
        let usedFallback = false;   // true once any batch had to skip embeddings

        for (let i = 0; i < chunks.length; i += BATCH_SIZE) {
          const batch = chunks.slice(i, i + BATCH_SIZE);
          // Prefix the title so it contributes to the embedding; cap text at 8k chars.
          const texts = batch.map((c: any) => {
            const t = (c.text || c.content || "").slice(0, 8000);
            const title = c.title ? `${c.title}\n\n` : "";
            return title + t;
          });

          // null → embedding API unavailable; rows are stored without vectors
          // so FTS still works, and a later re-distill can upgrade them.
          const embeddings = await embedTexts(texts);

          for (let j = 0; j < batch.length; j++) {
            const c = batch[j];
            const chunkId = c.chunk_id || c.id || `${bundle.bundle_id}-${i + j}`;
            const embedding = embeddings ? embeddings[j] : null;
            if (!embeddings) usedFallback = true;

            const metadata = {
              artifact_id: c.artifact_id,
              title: c.title,
              page_range: c.page_range,
              scopes: c.scopes,
              corpus_version: c.corpus_version,
            };

            await pool.query(
              `INSERT INTO corpus_embeddings (expert_slug, bundle_id, chunk_id, chunk_text, embedding, metadata)
               VALUES ($1,$2,$3,$4,$5,$6)
               ON CONFLICT (bundle_id, chunk_id) DO UPDATE
               SET embedding = EXCLUDED.embedding, chunk_text = EXCLUDED.chunk_text`,
              [slug, bundle.bundle_id, chunkId, texts[j], embedding, metadata]
            );
            embedded++;
          }

          emit({ event: "progress", embedded, total: chunks.length, pct: Math.round(embedded / chunks.length * 100) });
        }

        const notes = usedFallback
          ? `${embedded} chunks stored (FTS only — embedding API unavailable at distillation time; re-run distill when OpenRouter is reachable to upgrade to dense vectors)`
          : `${embedded} chunks embedded via ${EMBED_MODEL} at ${new Date().toISOString()}`;

        // The bundle is marked 'ready' even in FTS-only mode — consult works
        // either way, just with weaker retrieval.
        await pool.query(
          `UPDATE agentify_corpus_bundles SET status = 'ready', distillation_notes = $1, updated_at = NOW() WHERE bundle_id = $2`,
          [notes, bundle.bundle_id]
        );

        emit({ event: "complete", embedded, retrieval: usedFallback ? "fts" : "dense", status: "ready" });
        res.end();
      } catch (err: any) {
        // Record the failure on the bundle so /status shows why it is not ready.
        await pool.query(
          `UPDATE agentify_corpus_bundles SET status = 'error', distillation_notes = $1, updated_at = NOW() WHERE bundle_id = $2`,
          [err.message, bundle.bundle_id]
        );
        emit({ event: "error", error: err.message });
        res.end();
      }
    } catch (e: any) { res.status(500).json({ error: e.message }); }
  });

  // ── POST /api/agentify/experts/:slug/consult ───────────────────────────────
  // Consult the expert. Requires X-Partner-Token or X-Admin-Key.
  // Optional body.context.ehr — patient EHR text to include in the reasoning.
  app.post("/api/agentify/experts/:slug/consult", async (req: Request, res: Response) => {
    res.setHeader("Access-Control-Allow-Origin", "*");
    res.setHeader("Access-Control-Allow-Headers", "Content-Type, X-Partner-Token, X-Admin-Key");
    const { slug } = req.params;
    const { question, context } = req.body;

    if (!question?.trim()) return res.status(400).json({ error: "question_required" });

    // Auth — partner token required for programmatic access; public demo allowed from agent pages
    let partnerId: string | null = null;
    if (isAdmin(req)) {
      partnerId = "admin";
    } else {
      const token = req.headers["x-partner-token"] as string;
      if (token) {
        try {
          const tokenRow = await pool.query(
            `SELECT id, partner_name FROM partner_tokens WHERE token = $1 AND is_active = TRUE`, [token]
          );
          if (!tokenRow.rows.length) return res.status(403).json({ error: "invalid_partner_token" });
          partnerId = tokenRow.rows[0].partner_name;
        } catch {
          return res.status(500).json({ error: "token_lookup_failed" });
        }
      } else {
        // Public demo mode — allow unauthenticated access with IP rate limiting (10 req/hr)
        // NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
        // strips it — the limit is spoofable. Also publicDemoRateMap never evicts
        // old IPs, so it grows without bound; confirm both are acceptable.
        const ip = (req.headers["x-forwarded-for"] as string || req.socket.remoteAddress || "unknown").split(",")[0].trim();
        const now = Date.now();
        const window = publicDemoRateMap.get(ip);
        if (window && now - window.ts < 3_600_000 && window.count >= 10) {
          return res.status(429).json({ error: "rate_limit", retry_after: Math.ceil((window.ts + 3_600_000 - now) / 60000) + "m" });
        }
        if (!window || now - window.ts >= 3_600_000) {
          publicDemoRateMap.set(ip, { ts: now, count: 1 });
        } else {
          window.count++;
        }
        partnerId = "public_demo";
      }
    }

    try {
      // Load expert + bundle
      const expertRow = await pool.query(`SELECT * FROM agentify_experts WHERE slug = $1`, [slug]);
      if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found" });
      const expert = expertRow.rows[0];

      const bundleRow = await pool.query(
        `SELECT * FROM agentify_corpus_bundles WHERE expert_slug = $1 AND status = 'ready' ORDER BY received_at DESC LIMIT 1`,
        [slug]
      );
      if (!bundleRow.rows.length) {
        return res.status(503).json({ error: "expert_not_ready", hint: `Run POST /api/agentify/experts/${slug}/distill first` });
      }
      const bundle = bundleRow.rows[0];

      // Retrieve relevant chunks — dense cosine similarity when the question can
      // be embedded, PostgreSQL full-text search otherwise.
      let topChunks: Array<{ text: string; similarity: number; title?: string }> = [];
      let retrieval = "fts";

      const questionEmbedding = await embedTexts([question.trim()]);

      if (questionEmbedding) {
        // Dense retrieval — loads all embedded rows for the bundle and ranks
        // in-process (see cosineSim: intended for corpora up to ~10k chunks).
        const allRows = await pool.query(
          `SELECT chunk_text, embedding, metadata FROM corpus_embeddings WHERE bundle_id = $1 AND embedding IS NOT NULL`,
          [bundle.bundle_id]
        );
        if (allRows.rows.length > 0) {
          topChunks = allRows.rows
            .map((r: any) => ({
              text: r.chunk_text,
              title: r.metadata?.title,
              similarity: cosineSim(questionEmbedding[0], r.embedding),
            }))
            .sort((a, b) => b.similarity - a.similarity)
            .slice(0, TOP_K);
          retrieval = "dense";
        }
      }

      // FTS fallback
      if (topChunks.length === 0) {
        const ftsRows = await pool.query(
          `SELECT chunk_text, metadata,
                  ts_rank(to_tsvector('english', chunk_text), plainto_tsquery('english', $1)) AS sim
           FROM corpus_embeddings WHERE bundle_id = $2
           ORDER BY sim DESC LIMIT $3`,
          [question, bundle.bundle_id, TOP_K]
        );
        topChunks = ftsRows.rows.map((r: any) => ({
          text: r.chunk_text,
          title: r.metadata?.title,
          similarity: parseFloat(r.sim),
        }));
        retrieval = "fts";
      }

      // Build expert-voice prompt
      const scopeList = (Array.isArray(expert.scopes) ? expert.scopes : []).join("\n- ");
      const refusalList = (Array.isArray(expert.refusals) ? expert.refusals : []).join("\n- ");

      // EHR context is capped at 4000 chars to bound prompt size.
      const ehrSection = context?.ehr
        ? `\n\n---\nThe user has shared their electronic health record (EHR). Review it in light of your clinical expertise:\n\n${String(context.ehr).slice(0, 4000)}\n---`
        : "";

      // Each retrieved excerpt is truncated to 600 chars in the prompt.
      const corpusSection = topChunks.length > 0
        ? `\n\nThe following excerpts from your published work are most relevant to this question:\n\n` +
          topChunks.map((c, i) =>
            `[${i + 1}] ${c.title ? `"${c.title}" — ` : ""}${c.text.slice(0, 600)}`
          ).join("\n\n")
        : "";

      const credPart = expert.credentials ? ` (${expert.credentials})` : "";
      const instPart = expert.affiliated_institution ? ` at ${expert.affiliated_institution}` : "";
      const scopePart = scopeList || expert.primary_domain || "their field of expertise";
      const refusalPart = refusalList || "questions clearly outside this person's documented expertise";

      const systemPrompt =
`You are a distilled AI representation of ${expert.expert_name}${credPart}, a recognized authority in ${expert.primary_domain}${instPart}.

You answer questions in the voice, perspective, and intellectual framework of ${expert.expert_name} — grounded strictly in their published corpus, interviews, books, and documented ideas. You are not a general-purpose AI assistant. You are a focused expert persona who speaks from lived experience and published work.

Scope — what you address:
- ${scopePart}

Hard limits — you decline these without exception:
- ${refusalPart}

When answering, reflect the style and depth characteristic of ${expert.expert_name}. Draw on the corpus excerpts provided. If the question is at the edge of your scope, say so explicitly. If it is clearly outside your scope, redirect to what you can offer.${corpusSection}${ehrSection}`;

      const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
      const aiReply = await anthropic.messages.create({
        model: "claude-haiku-4-5",
        max_tokens: 2500,
        system: systemPrompt,
        messages: [{ role: "user", content: question.trim() }],
      });
      // Empty string when the first content block is not text.
      const answer = aiReply.content[0]?.type === "text" ? aiReply.content[0].text.trim() : "";

      // Log
      await pool.query(
        `INSERT INTO corpus_consult_log (expert_slug, partner_id, question, answer, chunks_used, retrieval)
         VALUES ($1,$2,$3,$4,$5,$6)`,
        [slug, partnerId, question, answer, topChunks.length, retrieval]
      );

      res.json({
        expert: slug,
        expertName: expert.expert_name,
        answer,
        corpusVersion: bundle.corpus_version,
        chunksUsed: topChunks.length,
        retrieval,
        hasEhrContext: !!context?.ehr,
      });
    } catch (e: any) { res.status(500).json({ error: e.message }); }
  });

  // ── POST /api/agentify/experts/:slug/corpus/submit-url ────────────────────
  // Portal endpoint: submit a single URL to expand an expert's live corpus.
  // Fetches + chunks the URL via the gather pipeline, embeds the chunks,
  // and inserts them into corpus_embeddings under the expert's existing bundle.
  // Requires X-Partner-Token or X-Admin-Key.
  app.post("/api/agentify/experts/:slug/corpus/submit-url", async (req: Request, res: Response) => {
    res.setHeader("Access-Control-Allow-Origin", "*");
    res.setHeader("Access-Control-Allow-Headers", "Content-Type, X-Partner-Token, X-Admin-Key");
    const { slug } = req.params;
    const { url, title, type = "article", date } = req.body || {};

    if (!url) return res.status(400).json({ error: "url_required" });

    // Auth — unlike /consult, no public-demo path here: a token is mandatory.
    let partnerId: string | null = null;
    if (isAdmin(req)) {
      partnerId = "admin";
    } else {
      const token = req.headers["x-partner-token"] as string;
      if (!token) return res.status(401).json({ error: "x-partner-token_required" });
      try {
        const tokenRow = await pool.query(
          `SELECT id, partner_name FROM partner_tokens WHERE token = $1 AND is_active = TRUE`, [token]
        );
        if (!tokenRow.rows.length) return res.status(403).json({ error: "invalid_partner_token" });
        partnerId = tokenRow.rows[0].partner_name;
      } catch {
        return res.status(500).json({ error: "token_lookup_failed" });
      }
    }

    try {
      // Load expert + bundle
      const expertRow = await pool.query(`SELECT * FROM agentify_experts WHERE slug = $1`, [slug]);
      if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found" });
      const expert = expertRow.rows[0];

      const bundleRow = await pool.query(
        `SELECT * FROM agentify_corpus_bundles WHERE expert_slug = $1 AND status = 'ready' ORDER BY received_at DESC LIMIT 1`,
        [slug]
      );
      if (!bundleRow.rows.length) {
        return res.status(503).json({
          error: "expert_not_ready",
          hint: `Expert corpus not distilled yet. Run POST /api/agentify/experts/${slug}/distill first.`,
        });
      }
      const bundle = bundleRow.rows[0];

      // Step 1: Call the gather pipeline to fetch + chunk the URL
      // (loopback call to a sibling route registered elsewhere in this service).
      const port = process.env.PORT || 5000;
      const pipelineRes = await fetch(`http://localhost:${port}/api/agentify/corpus/gather/pipeline`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          url,
          title,
          type,
          date: date || "",
          corpus_version: bundle.corpus_version || "1.0",
          scopes: expert.scopes || [],
          target_tokens: 250,
        }),
      });
      if (!pipelineRes.ok) {
        const err = await pipelineRes.json().catch(() => ({ error: "pipeline_error" })) as { error: string };
        return res.status(502).json({ error: "gather_pipeline_failed", detail: err.error || "unknown" });
      }
      const pipelineData = await pipelineRes.json() as {
        ok: boolean; artifact_id: string; url: string; title: string;
        chunk_count: number; chunks_ndjson: string;
      };

      if (!pipelineData.chunks_ndjson || !pipelineData.chunk_count) {
        return res.status(422).json({ error: "no_chunks_produced", url });
      }

      // Step 2: Parse chunks
      const chunkLines = pipelineData.chunks_ndjson.split("\n").filter(Boolean);
      const chunks: Array<{ chunk_id: string; chunk_text: string; metadata: object }> = [];
      for (const line of chunkLines) {
        try {
          const c = JSON.parse(line);
          const text = c.chunk_text || c.text; // gather pipeline uses "text", corpus_embeddings uses "chunk_text"
          if (c.chunk_id && text) {
            chunks.push({
              chunk_id: c.chunk_id,
              chunk_text: text,
              metadata: {
                title: c.title || pipelineData.title,
                source_url: c.source_url || pipelineData.url,
                type: c.type || type,
                artifact_id: c.artifact_id || pipelineData.artifact_id,
                seq: c.seq,
              },
            });
          }
        } catch { /* skip malformed lines */ }
      }

      if (!chunks.length) {
        return res.status(422).json({ error: "chunk_parse_failed", raw_count: chunkLines.length });
      }

      // Step 3: Embed chunks (batches of 50 to stay within API limits)
      const BATCH = 50;
      let insertedCount = 0;
      for (let i = 0; i < chunks.length; i += BATCH) {
        const batch = chunks.slice(i, i + BATCH);
        const texts = batch.map(c => c.chunk_text);
        const embeddings = await embedTexts(texts);

        for (let j = 0; j < batch.length; j++) {
          const c = batch[j];
          const emb = embeddings ? embeddings[j] ?? null : null;
          // DO NOTHING on conflict: re-submitting a URL never overwrites chunks.
          // NOTE(review): insertedCount counts attempts, not actual inserts —
          // conflicts are silently skipped but still counted.
          await pool.query(
            `INSERT INTO corpus_embeddings (expert_slug, bundle_id, chunk_id, chunk_text, embedding, metadata)
             VALUES ($1,$2,$3,$4,$5,$6)
             ON CONFLICT (bundle_id, chunk_id) DO NOTHING`,
            [slug, bundle.bundle_id, c.chunk_id, c.chunk_text, emb ?? null, c.metadata]
          );
          insertedCount++;
        }
      }

      // Step 4: Update chunk_count in the bundle
      const totalEmbRow = await pool.query(
        `SELECT COUNT(*) FROM corpus_embeddings WHERE bundle_id = $1`, [bundle.bundle_id]
      );
      const newTotal = parseInt(totalEmbRow.rows[0].count || "0");
      await pool.query(
        `UPDATE agentify_corpus_bundles SET chunk_count = $1, updated_at = NOW() WHERE bundle_id = $2`,
        [newTotal, bundle.bundle_id]
      );

      // Log to source catalog (dedup memory)
      await pool.query(
        `INSERT INTO agentify_source_catalog (expert_slug, bundle_id, url, canonical, title, type, artifact_id, chunk_count)
         VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
         ON CONFLICT (expert_slug, url) DO UPDATE
         SET canonical = EXCLUDED.canonical, title = EXCLUDED.title,
             chunk_count = EXCLUDED.chunk_count, ingested_at = NOW()`,
        [slug, bundle.bundle_id, url, pipelineData.url, pipelineData.title,
         type, pipelineData.artifact_id, insertedCount]
      );

      console.log(`[stage5/submit-url] ${slug} +${insertedCount} chunks from ${url} (partner: ${partnerId})`);

      res.json({
        ok: true,
        artifact_id: pipelineData.artifact_id,
        title: pipelineData.title,
        url: pipelineData.url,
        chunk_count_added: insertedCount,
        new_total: newTotal,
        retrieval: EMBED_API_KEY ? "dense-enabled" : "fts-only",
        cataloged: true,
      });
    } catch (e: any) {
      console.error("[stage5/submit-url] error:", e.message);
      res.status(500).json({ error: e.message });
    }
  });

  // ── GET /api/agentify/experts/:slug/corpus/catalog ────────────────────────
  // Returns all URLs already ingested for this expert (the dedup memory).
  // Use this to audit what Agentify has already collected from a source.
+ app.get("/api/agentify/experts/:slug/corpus/catalog", async (req: Request, res: Response) => { + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Headers", "Content-Type, X-Partner-Token, X-Admin-Key"); + const { slug } = req.params; + const limit = Math.min(parseInt(req.query.limit as string || "200"), 1000); + const offset = parseInt(req.query.offset as string || "0"); + const domain = req.query.domain as string || null; + + try { + const expertRow = await pool.query(`SELECT slug FROM agentify_experts WHERE slug = $1`, [slug]); + if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found" }); + + const domainFilter = domain + ? `AND (url LIKE $3 OR canonical LIKE $3)` + : ""; + const params: any[] = [slug, limit + 1]; + if (domain) params.push(`%${domain}%`); + + const rows = await pool.query( + `SELECT url, canonical, title, type, artifact_id, chunk_count, ingested_at + FROM agentify_source_catalog + WHERE expert_slug = $1 ${domainFilter} + ORDER BY ingested_at DESC + LIMIT $2 OFFSET ${offset}`, + params + ); + const hasMore = rows.rows.length > limit; + const items = hasMore ? rows.rows.slice(0, limit) : rows.rows; + + const totalRow = await pool.query( + `SELECT COUNT(*) FROM agentify_source_catalog WHERE expert_slug = $1`, [slug] + ); + + res.json({ + expert_slug: slug, + total_cataloged: parseInt(totalRow.rows[0].count || "0"), + returned: items.length, + has_more: hasMore, + offset, + items, + }); + } catch (e: any) { + res.status(500).json({ error: e.message }); + } + }); + + // ── POST /api/agentify/experts/:slug/corpus/submit-source ───────────────── + // The "vacuum" endpoint. Point it at a blog, RSS feed, or domain root and + // Agentify discovers all content there, cross-references the source catalog + // to find what it doesn't already have, and ingests only the new pieces. 
+ // + // Body: { + // url: string — blog index, RSS feed, single article, or domain root + // type?: string — 'article' | 'blog' | 'rss' | 'auto' (default: 'auto') + // max_articles?: number — cap on how many new articles to ingest (default: 50) + // } + // Response: { + // ok, discovered, already_known, newly_ingested, chunk_count_added, + // new_total, skipped_urls, ingested_urls + // } + app.post("/api/agentify/experts/:slug/corpus/submit-source", async (req: Request, res: Response) => { + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Headers", "Content-Type, X-Partner-Token, X-Admin-Key"); + const { slug } = req.params; + const { url, type = "auto", max_articles = 50 } = req.body || {}; + + if (!url) return res.status(400).json({ error: "url_required" }); + const maxNew = Math.min(parseInt(max_articles) || 50, 200); + + // Auth + let partnerId: string | null = null; + if (isAdmin(req)) { + partnerId = "admin"; + } else { + const token = req.headers["x-partner-token"] as string; + if (!token) return res.status(401).json({ error: "x-partner-token_required" }); + try { + const tokenRow = await pool.query( + `SELECT id, partner_name FROM partner_tokens WHERE token = $1 AND is_active = TRUE`, [token] + ); + if (!tokenRow.rows.length) return res.status(403).json({ error: "invalid_partner_token" }); + partnerId = tokenRow.rows[0].partner_name; + } catch { return res.status(500).json({ error: "token_lookup_failed" }); } + } + + try { + // Load expert + ready bundle + const expertRow = await pool.query(`SELECT * FROM agentify_experts WHERE slug = $1`, [slug]); + if (!expertRow.rows.length) return res.status(404).json({ error: "expert_not_found" }); + const expert = expertRow.rows[0]; + + const bundleRow = await pool.query( + `SELECT * FROM agentify_corpus_bundles WHERE expert_slug = $1 AND status = 'ready' ORDER BY received_at DESC LIMIT 1`, + [slug] + ); + if (!bundleRow.rows.length) return res.status(503).json({ error: 
"expert_not_ready" }); + const bundle = bundleRow.rows[0]; + + const port = process.env.PORT || 5000; + const base = `http://localhost:${port}`; + + // ── Step 1: Determine if this is a single article or an index/feed ──── + // Single article: has a content path pattern (year/month/slug, /p/, /posts/) + // Index/feed: root domain, /blog/, /essays/, /feed/, .xml, etc. + const parsedUrl = new URL(url); + const isSingleArticle = ( + type === "article" || + (type === "auto" && /\/\d{4}\/|\/posts?\/|\/articles?\/|\/essay|\/p\/[^/]+$/.test(parsedUrl.pathname) && parsedUrl.pathname.length > 5) + ); + + let discoveredUrls: Array<{ href: string; text: string; pub_date?: string; type: string }> = []; + + if (isSingleArticle) { + // Single article — treat it as one URL to check/ingest + discoveredUrls = [{ href: url, text: "", type: "article" }]; + } else { + // ── Step 2: Discover article links via blog-index endpoint ────────── + const indexRes = await fetch(`${base}/api/agentify/corpus/gather/blog-index`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ url, max_links: maxNew * 3 }), // fetch extra since we'll filter + }); + if (!indexRes.ok) { + // Fall back to treating it as a single article + discoveredUrls = [{ href: url, text: "", type: "article" }]; + } else { + const indexData = await indexRes.json() as { + ok: boolean; + links: Array<{ href: string; text: string; pub_date?: string }>; + count: number; + method: string; + }; + discoveredUrls = (indexData.links || []).map(l => ({ ...l, type: "article" })); + } + } + + const discovered = discoveredUrls.length; + + // ── Step 3: Cross-reference with source catalog ───────────────────── + // Load all known URLs for this expert in one query + const knownRows = await pool.query( + `SELECT url, canonical FROM agentify_source_catalog WHERE expert_slug = $1`, + [slug] + ); + const knownUrls = new Set(); + for (const r of knownRows.rows) { + if (r.url) 
knownUrls.add(r.url.split("?")[0].replace(/\/$/, "")); + if (r.canonical) knownUrls.add(r.canonical.split("?")[0].replace(/\/$/, "")); + } + + const newUrls = discoveredUrls.filter(u => { + const normalized = u.href.split("?")[0].replace(/\/$/, ""); + return !knownUrls.has(normalized); + }).slice(0, maxNew); + + const alreadyKnown = discovered - newUrls.length; + + if (!newUrls.length) { + const totalEmbRow = await pool.query( + `SELECT COUNT(*) FROM corpus_embeddings WHERE bundle_id = $1`, [bundle.bundle_id] + ); + return res.json({ + ok: true, + source: url, + discovered, + already_known: alreadyKnown, + newly_ingested: 0, + chunk_count_added: 0, + new_total: parseInt(totalEmbRow.rows[0].count || "0"), + ingested_urls: [], + message: `All ${discovered} discovered URLs already in corpus. Nothing new to ingest.`, + }); + } + + // ── Step 4: Ingest each new URL via the gather pipeline ────────────── + const ingestedUrls: Array<{ url: string; title: string; chunks: number }> = []; + const skippedUrls: string[] = []; + let totalChunkCount = 0; + const ARTICLE_BATCH = 50; // embed batch size per article + + for (const article of newUrls) { + try { + const pipelineRes = await fetch(`${base}/api/agentify/corpus/gather/pipeline`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + url: article.href, + type: article.type || "article", + date: article.pub_date || "", + corpus_version: bundle.corpus_version || "1.0", + scopes: expert.scopes || [], + target_tokens: 250, + }), + }); + + if (!pipelineRes.ok) { skippedUrls.push(article.href); continue; } + const pData = await pipelineRes.json() as { + ok: boolean; artifact_id: string; url: string; title: string; + chunk_count: number; chunks_ndjson: string; + }; + + if (!pData.chunks_ndjson || !pData.chunk_count) { skippedUrls.push(article.href); continue; } + + // Detect Cloudflare challenge pages (minimal content, generic title) + const cfTitles = ["just a moment", "attention 
required", "checking your browser", "please wait"]; + if (pData.chunk_count <= 2 && cfTitles.some(t => pData.title?.toLowerCase().includes(t))) { + skippedUrls.push(article.href); + continue; + } + + // Parse + embed chunks + const chunkLines = pData.chunks_ndjson.split("\n").filter(Boolean); + const chunks: Array<{ chunk_id: string; chunk_text: string; metadata: object }> = []; + for (const line of chunkLines) { + try { + const c = JSON.parse(line); + const text = c.chunk_text || c.text; + if (c.chunk_id && text) { + chunks.push({ + chunk_id: c.chunk_id, + chunk_text: text, + metadata: { title: c.title || pData.title, source_url: c.source_url || pData.url, type: c.type || "article", artifact_id: c.artifact_id || pData.artifact_id, seq: c.seq }, + }); + } + } catch { /* skip */ } + } + + let articleChunkCount = 0; + for (let i = 0; i < chunks.length; i += ARTICLE_BATCH) { + const batch = chunks.slice(i, i + ARTICLE_BATCH); + const embeddings = await embedTexts(batch.map(c => c.chunk_text)); + for (let j = 0; j < batch.length; j++) { + const c = batch[j]; + await pool.query( + `INSERT INTO corpus_embeddings (expert_slug, bundle_id, chunk_id, chunk_text, embedding, metadata) + VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT (bundle_id, chunk_id) DO NOTHING`, + [slug, bundle.bundle_id, c.chunk_id, c.chunk_text, embeddings ? (embeddings[j] ?? 
null) : null, c.metadata] + ); + articleChunkCount++; + } + } + + // Record in source catalog + await pool.query( + `INSERT INTO agentify_source_catalog (expert_slug, bundle_id, url, canonical, title, type, artifact_id, chunk_count) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8) + ON CONFLICT (expert_slug, url) DO UPDATE + SET canonical=EXCLUDED.canonical, title=EXCLUDED.title, chunk_count=EXCLUDED.chunk_count, ingested_at=NOW()`, + [slug, bundle.bundle_id, article.href, pData.url, pData.title, "article", pData.artifact_id, articleChunkCount] + ); + + ingestedUrls.push({ url: pData.url, title: pData.title, chunks: articleChunkCount }); + totalChunkCount += articleChunkCount; + + } catch (e: any) { + skippedUrls.push(article.href); + console.warn(`[stage5/submit-source] skipped ${article.href}: ${e.message}`); + } + } + + // Update bundle chunk count + const totalEmbRow = await pool.query( + `SELECT COUNT(*) FROM corpus_embeddings WHERE bundle_id = $1`, [bundle.bundle_id] + ); + const newTotal = parseInt(totalEmbRow.rows[0].count || "0"); + await pool.query( + `UPDATE agentify_corpus_bundles SET chunk_count = $1, updated_at = NOW() WHERE bundle_id = $2`, + [newTotal, bundle.bundle_id] + ); + + console.log(`[stage5/submit-source] ${slug} | source=${url} | discovered=${discovered} known=${alreadyKnown} new=${ingestedUrls.length} chunks=${totalChunkCount} (partner: ${partnerId})`); + + res.json({ + ok: true, + source: url, + discovered, + already_known: alreadyKnown, + newly_ingested: ingestedUrls.length, + chunk_count_added: totalChunkCount, + new_total: newTotal, + skipped: skippedUrls.length, + skipped_urls: skippedUrls, + ingested_urls: ingestedUrls, + }); + } catch (e: any) { + console.error("[stage5/submit-source] error:", e.message); + res.status(500).json({ error: e.message }); + } + }); + + // ── POST /api/agentify/admin/batch-scan ──────────────────────────────────── + // Admin: scan S3 staging/ and corpora/ prefixes for deposited manifests, + // then register any slug 
not already in agentify_corpus_bundles as "received" + // so the queue worker picks them up. + // S3-first approach: discovers slugs from Vultr deposits (not from agentify_experts). + // Idempotent — safe to call multiple times (ON CONFLICT DO NOTHING). + app.post("/api/agentify/admin/batch-scan", async (req: Request, res: Response) => { + if (!isAdmin(req)) return res.status(403).json({ error: "admin only" }); + + const limit = Math.min(parseInt(req.body.limit ?? "500"), 1000); + const dryRun = req.body.dry_run === true; + + try { + // Step 1: Collect all slugs deposited on S3 (staging/ and corpora/ top-level prefixes) + const s3Slugs = new Set(); + for (const topPrefix of ["staging/", "corpora/"]) { + let continuationToken: string | undefined; + do { + const list = await s3.send(new ListObjectsV2Command({ + Bucket: S3_BUCKET, + Prefix: topPrefix, + Delimiter: "/", + MaxKeys: 1000, + ContinuationToken: continuationToken, + })); + for (const cp of list.CommonPrefixes ?? []) { + // e.g. "staging/dr-andrew-huberman/" → "dr-andrew-huberman" + const slug = cp.Prefix!.replace(topPrefix, "").replace(/\/$/, ""); + if (slug) s3Slugs.add(slug); + } + continuationToken = list.IsTruncated ? list.NextContinuationToken : undefined; + } while (continuationToken); + } + + // Step 2: Find which slugs already have a bundle record + const slugArr = Array.from(s3Slugs); + const { rows: existing } = slugArr.length > 0 + ? 
await pool.query( + `SELECT DISTINCT expert_slug FROM agentify_corpus_bundles WHERE expert_slug = ANY($1)`, + [slugArr] + ) + : { rows: [] as { expert_slug: string }[] }; + const registeredSlugs = new Set(existing.map((r: { expert_slug: string }) => r.expert_slug)); + + // Step 3: Candidates = S3 slugs with no bundle yet + const candidates = slugArr.filter(s => !registeredSlugs.has(s)).slice(0, limit); + + const results: { slug: string; status: string; bundle_id?: string; chunks?: number; error?: string }[] = []; + + for (const slug of candidates) { + try { + // Find manifest under staging/{slug}/ or corpora/{slug}/ + let manifestKey: string | null = null; + for (const prefix of [`staging/${slug}/`, `corpora/${slug}/`]) { + const list = await s3.send(new ListObjectsV2Command({ Bucket: S3_BUCKET, Prefix: prefix })); + const m = list.Contents?.find(o => o.Key?.endsWith("manifest.json")); + if (m?.Key) { manifestKey = m.Key; break; } + } + + if (!manifestKey) { + results.push({ slug, status: "no_manifest_on_s3" }); + continue; + } + + if (dryRun) { + results.push({ slug, status: "found_dry_run", bundle_id: manifestKey }); + continue; + } + + const manifest = JSON.parse( + await (await s3.send(new GetObjectCommand({ Bucket: S3_BUCKET, Key: manifestKey }))).Body! 
+ .transformToString() + ); + + const corpusVersion = manifest.corpus_version || "1.0.0"; + let chunksKey: string | null = manifest.chunks_index_r2_key || null; + if (!chunksKey) chunksKey = await discoverChunksKey(slug, corpusVersion); + + await pool.query( + `INSERT INTO agentify_corpus_bundles + (bundle_id, expert_slug, corpus_version, attestation_uri, attestation_version, + assembled_by, assembled_at, embedding_model, embedding_dimensions, chunk_strategy, + chunk_count, artifact_count, manifest_r2_key, manifest_sha256, chunks_s3_key, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,'received') + ON CONFLICT (bundle_id) DO NOTHING`, + [ + manifest.bundle_id, slug, corpusVersion, + manifest.attestation_uri || "", manifest.attestation_version || "v0.1", + manifest.assembled_by || "naturologie-replit", + manifest.assembled_at ? new Date(manifest.assembled_at) : new Date(), + manifest.embedding_model || null, manifest.embedding_dimensions || null, + manifest.chunk_strategy || "paragraph", + manifest.chunk_count || null, + Array.isArray(manifest.artifacts) ? manifest.artifacts.length : null, + manifestKey, manifest.manifest_sha256 || null, chunksKey, + ] + ); + + results.push({ slug, status: "registered", bundle_id: manifest.bundle_id, chunks: manifest.chunk_count }); + } catch (err: any) { + results.push({ slug, status: "error", error: err.message }); + } + } + + const registered = results.filter(r => r.status === "registered").length; + const noManifest = results.filter(r => r.status === "no_manifest_on_s3").length; + const errors = results.filter(r => r.status === "error").length; + + res.json({ + ok: true, + dry_run: dryRun, + s3_slugs_found: slugArr.length, + already_registered: registeredSlugs.size, + candidates: candidates.length, + registered, + no_manifest_on_s3: noManifest, + errors, + results, + next: registered > 0 + ? `Queue worker will pick up the ${registered} registered bundles within 5 minutes. 
Or call POST /api/agentify/admin/trigger-queue to start immediately.` + : candidates.length === 0 + ? "All S3 deposits are already registered. Nothing new to do." + : "No new bundles registered.", + }); + } catch (e: any) { res.status(500).json({ error: e.message }); } + }); + + // ── POST /api/agentify/admin/trigger-queue ───────────────────────────────── + // Admin: immediately trigger queue worker processing without waiting for the 5-min poll. + // The queue worker picks up to MAX_CONCURRENT 'received' bundles per call. + app.post("/api/agentify/admin/trigger-queue", async (req: Request, res: Response) => { + if (!isAdmin(req)) return res.status(403).json({ error: "admin only" }); + + try { + const { rows: pending } = await pool.query( + `SELECT bundle_id, expert_slug, status, chunk_count + FROM agentify_corpus_bundles + WHERE status = 'received' + ORDER BY received_at ASC + LIMIT 50` + ); + + res.json({ + ok: true, + received_bundles: pending.length, + queue: pending.map(r => ({ bundle_id: r.bundle_id, expert_slug: r.expert_slug, chunk_count: r.chunk_count })), + message: pending.length > 0 + ? `${pending.length} bundle(s) in received state. Queue worker will process them. Call this endpoint periodically or wait for the 5-minute auto-poll.` + : "No bundles in received state. Run POST /api/agentify/admin/batch-scan first.", + }); + + // Trigger the background worker asynchronously (don't await — returns immediately) + if (pending.length > 0) { + setImmediate(async () => { + // Process up to 5 concurrently + const batch = pending.slice(0, 5); + await Promise.allSettled( + batch.map((row: any) => + distillBundleBackground(row.bundle_id, row.expert_slug, row.chunks_s3_key ?? undefined) + .then((r: any) => console.log(`[trigger-queue] ${row.expert_slug}: ${r.ok ? 
r.embedded + " chunks" : r.error}`)) + .catch((e: any) => console.warn(`[trigger-queue] ${row.expert_slug} error:`, e.message)) + ) + ); + console.log(`[trigger-queue] batch of ${batch.length} complete`); + }); + } + } catch (e: any) { res.status(500).json({ error: e.message }); } + }); + + // ── GET /api/agentify/admin/queue-status ─────────────────────────────────── + // Admin: show full queue state. + app.get("/api/agentify/admin/queue-status", async (req: Request, res: Response) => { + if (!isAdmin(req)) return res.status(403).json({ error: "admin only" }); + try { + const { rows } = await pool.query( + `SELECT status, COUNT(*) as count, SUM(chunk_count) as total_chunks + FROM agentify_corpus_bundles + GROUP BY status ORDER BY count DESC` + ); + const { rows: proposed } = await pool.query( + `SELECT COUNT(*) as proposed_experts FROM agentify_experts WHERE status = 'proposed'` + ); + const { rows: noBundle } = await pool.query( + `SELECT COUNT(*) as experts_without_bundles + FROM agentify_experts e + LEFT JOIN agentify_corpus_bundles b ON e.expert_slug = b.expert_slug + WHERE e.status = 'proposed' AND b.bundle_id IS NULL` + ); + res.json({ + bundle_status_breakdown: rows, + proposed_experts_total: parseInt(proposed[0].proposed_experts), + experts_without_bundles: parseInt(noBundle[0].experts_without_bundles), + recommended_action: parseInt(noBundle[0].experts_without_bundles) > 0 + ? "POST /api/agentify/admin/batch-scan to register S3 bundles, then POST /api/agentify/admin/trigger-queue to start distillation" + : "All experts have bundles registered. POST /api/agentify/admin/trigger-queue if any are in received state.", + }); + } catch (e: any) { res.status(500).json({ error: e.message }); } + }); + + // ── Naturologie bulk-match pipeline ─────────────────────────────────────── + // Converts Naturologie's expert slugs → WellSpr.ing agent slugs. + // Experts not already in the registry are auto-registered as proposed. 
+ + function slugify(name: string): string { + return name + .replace(/^(Dr\.?|Prof\.?|Professor|Drs?\.)\s+/i, "") + .replace(/,?\s+(MD|DO|PhD|DVM|RD|RDN|ND|NP|PA|MS|MPH|MBA|DDS|DC|OD|LAc|DNP|CRNA|CNM|APRN|CNS|CRNP|FACC|FACS|FRCP|FRCSC|FAAN|FASN|FACP|FASGE|FACG|AGAF|CGC|CSCS|CISSN|FRCPC|FRCSC|FACOG|FRCOG|FRCR|FRCPSC)\b/gi, "") + .trim() + .toLowerCase() + .replace(/[^a-z0-9]+/g, "-") + .replace(/^-|-$/g, ""); + } + + // POST /api/agentify/admin/naturologie/match + // Body: { experts: [{ our_slug, display_name, credentials, specialty_tags, wellspring_slug? }] } + // Returns: { matches: [{ our_slug, wellspring_slug, status }], summary } + app.post("/api/agentify/admin/naturologie/match", async (req: Request, res: Response) => { + if (!isAdmin(req)) return res.status(403).json({ error: "admin only" }); + try { + const experts: Array<{ + our_slug: string; + display_name: string; + credentials?: string; + specialty_tags?: string[]; + wellspring_slug?: string; + }> = Array.isArray(req.body) ? req.body : (req.body?.experts ?? []); + + if (!experts.length) return res.status(400).json({ error: "No experts provided. Send { experts: [...] } or a bare array." }); + + const matches: Array<{ our_slug: string; wellspring_slug: string; status: "matched" | "created" | "skipped" }> = []; + let matchedCount = 0, createdCount = 0; + + for (const expert of experts) { + const { our_slug, display_name, credentials, specialty_tags } = expert; + + // If already mapped, skip + if (expert.wellspring_slug) { + matches.push({ our_slug, wellspring_slug: expert.wellspring_slug, status: "skipped" }); + continue; + } + + const candidateSlug = slugify(display_name); + const domain = Array.isArray(specialty_tags) && specialty_tags.length + ? specialty_tags.slice(0, 3).join(", ") + : "medicine"; + + // 1. 
Check exact slug match + const { rows: bySlug } = await pool.query( + `SELECT slug FROM agentify_experts WHERE slug = $1 LIMIT 1`, [candidateSlug] + ); + if (bySlug.length) { + matches.push({ our_slug, wellspring_slug: bySlug[0].slug, status: "matched" }); + matchedCount++; + continue; + } + + // 2. Check fuzzy name match (case-insensitive) + const { rows: byName } = await pool.query( + `SELECT slug FROM agentify_experts WHERE lower(expert_name) = lower($1) LIMIT 1`, [display_name] + ); + if (byName.length) { + matches.push({ our_slug, wellspring_slug: byName[0].slug, status: "matched" }); + matchedCount++; + continue; + } + + // 3. Check if slug with common suffix variants exists + const slugVariants = [ + candidateSlug + "-md", + candidateSlug + "-phd", + candidateSlug + "-do", + candidateSlug + "-nd", + candidateSlug + "-rdn", + ]; + let variantMatch: string | null = null; + for (const v of slugVariants) { + const { rows: vr } = await pool.query( + `SELECT slug FROM agentify_experts WHERE slug = $1 LIMIT 1`, [v] + ); + if (vr.length) { variantMatch = vr[0].slug; break; } + } + if (variantMatch) { + matches.push({ our_slug, wellspring_slug: variantMatch, status: "matched" }); + matchedCount++; + continue; + } + + // 4. 
Not found — auto-register as proposed + // Ensure slug uniqueness with suffix if needed + let finalSlug = candidateSlug; + let suffix = 2; + while (true) { + const { rows: dup } = await pool.query( + `SELECT id FROM agentify_experts WHERE slug = $1`, [finalSlug] + ); + if (!dup.length) break; + finalSlug = `${candidateSlug}-${suffix++}`; + } + + await pool.query( + `INSERT INTO agentify_experts + (slug, expert_name, credentials, primary_domain, affiliated_institution, status) + VALUES ($1, $2, $3, $4, $5, 'proposed') + ON CONFLICT (slug) DO NOTHING`, + [finalSlug, display_name, credentials || null, domain, "Naturologie"] + ); + + matches.push({ our_slug, wellspring_slug: finalSlug, status: "created" }); + createdCount++; + } + + res.json({ + matches, + summary: { + total: experts.length, + matched: matchedCount, + created: createdCount, + skipped: matches.filter(m => m.status === "skipped").length, + }, + }); + } catch (e: any) { res.status(500).json({ error: e.message }); } + }); + + // GET /api/agentify/admin/naturologie/status + // Returns current WellSpr.ing expert list filtered to Naturologie-sourced + health-adjacent + app.get("/api/agentify/admin/naturologie/status", async (req: Request, res: Response) => { + if (!isAdmin(req)) return res.status(403).json({ error: "admin only" }); + try { + const { rows } = await pool.query(` + SELECT e.slug, e.expert_name, e.credentials, e.primary_domain, e.affiliated_institution, + e.status, cb.status AS corpus_status, cb.chunk_count + FROM agentify_experts e + LEFT JOIN agentify_corpus_bundles cb ON cb.expert_slug = e.slug AND cb.status = 'ready' + WHERE e.affiliated_institution = 'Naturologie' + OR e.status IN ('live','preview') + ORDER BY e.status, e.expert_name + `); + res.json({ count: rows.length, experts: rows }); + } catch (e: any) { res.status(500).json({ error: e.message }); } + }); +} + +// ── Exported: background distillation (no HTTP streaming) ───────────────── +// Called by queue-worker.ts to process 
received bundles automatically. +export async function distillBundleBackground( + bundleId: string, + expertSlug: string, + chunksKeyHint?: string +): Promise<{ ok: boolean; embedded: number; retrieval: string; error?: string }> { + const tag = `[distill-bg:${expertSlug}]`; + try { + let chunksKey = chunksKeyHint || null; + + const bq = await pool.query( + `SELECT * FROM agentify_corpus_bundles WHERE bundle_id = $1`, [bundleId] + ); + if (!bq.rows.length) return { ok: false, embedded: 0, retrieval: "none", error: "bundle_not_found" }; + const bundle = bq.rows[0]; + + if (!chunksKey) { + chunksKey = bundle.chunks_s3_key || await discoverChunksKey(expertSlug, bundle.corpus_version); + } + if (!chunksKey) { + // Mark as failed so the queue doesn't retry this bundle indefinitely + await pool.query( + `UPDATE agentify_corpus_bundles SET status = 'failed', distillation_notes = 'chunks_not_found_in_s3 — no chunks.ndjson key found for this bundle', updated_at = NOW() WHERE bundle_id = $1`, + [bundleId] + ).catch(() => {}); + console.warn(`${tag} chunks_not_found_in_s3 — marked bundle ${bundleId} as failed`); + return { ok: false, embedded: 0, retrieval: "none", error: "chunks_not_found_in_s3" }; + } + + await pool.query( + `UPDATE agentify_corpus_bundles SET chunks_s3_key = $1, status = 'distilling', updated_at = NOW() WHERE bundle_id = $2`, + [chunksKey, bundleId] + ); + + const chunks = await loadNDJSON(chunksKey); + console.log(`${tag} loaded ${chunks.length} chunks from ${chunksKey}`); + + let embedded = 0; + let usedFallback = false; + + for (let i = 0; i < chunks.length; i += BATCH_SIZE) { + const batch = chunks.slice(i, i + BATCH_SIZE); + const texts = batch.map((c: any) => { + const t = (c.text || c.content || "").slice(0, 8000); + // Strip null bytes (\u0000) — Postgres UTF-8 encoding rejects them + return ((c.title ? 
`${c.title}\n\n` : "") + t).replace(/\u0000/g, ""); + }); + + const embeddings = await embedTexts(texts); + if (!embeddings) usedFallback = true; + + for (let j = 0; j < batch.length; j++) { + const c = batch[j]; + const chunkId = c.chunk_id || c.id || `${bundleId}-${i + j}`; + const metadata = { + artifact_id: c.artifact_id, title: c.title, + page_range: c.page_range, scopes: c.scopes, corpus_version: c.corpus_version, + }; + await pool.query( + `INSERT INTO corpus_embeddings (expert_slug, bundle_id, chunk_id, chunk_text, embedding, metadata) + VALUES ($1,$2,$3,$4,$5,$6) + ON CONFLICT (bundle_id, chunk_id) DO UPDATE + SET embedding = EXCLUDED.embedding, chunk_text = EXCLUDED.chunk_text`, + [expertSlug, bundleId, chunkId, texts[j], embeddings ? embeddings[j] : null, metadata] + ); + embedded++; + } + } + + const retrieval = usedFallback ? "fts" : "dense"; + const notes = usedFallback + ? `${embedded} chunks stored (FTS only — embedding API unavailable)` + : `${embedded} chunks embedded via ${EMBED_MODEL} at ${new Date().toISOString()}`; + + await pool.query( + `UPDATE agentify_corpus_bundles SET status = 'ready', distillation_notes = $1, updated_at = NOW() WHERE bundle_id = $2`, + [notes, bundleId] + ); + + // Also update subject registry status if present + await pool.query( + `UPDATE agentify_subject_registry SET status = 'live', updated_at = NOW() + WHERE subject_slug = $1 AND status NOT IN ('rejected')`, + [expertSlug] + ).catch(() => {}); + + console.log(`${tag} done: ${embedded} chunks, retrieval=${retrieval}`); + return { ok: true, embedded, retrieval }; + } catch (err: any) { + console.error(`${tag} error:`, err.message); + await pool.query( + `UPDATE agentify_corpus_bundles SET status = 'failed', distillation_notes = $1, updated_at = NOW() WHERE bundle_id = $2`, + [String(err.message).slice(0, 500), bundleId] + ).catch(() => {}); + return { ok: false, embedded: 0, retrieval: "none", error: err.message }; + } +}