require("dotenv").config();

const postgre = require('../database/postgre');
const express = require("express");
const cors = require("cors");
const OpenAI = require("openai");
const { pipeline } = require("@xenova/transformers");
const { QdrantClient } = require("@qdrant/js-client-rest");
const { v4: uuidv4 } = require("uuid");

// ─── Sessions ─────────────────────────────────────────────────────────────────

/**
 * Returns the session id to use for a user.
 *
 * Looks up the most recently active row in `user_sessions` for the user that
 * is flagged `is_active` and whose `last_activity` falls within the last
 * 30 minutes. When one exists its `last_activity` is refreshed and its
 * `session_id` is returned; otherwise a new row is inserted with a fresh
 * UUID v4 and that UUID is returned.
 *
 * @param {*} user_id - Value matched against / stored in `user_sessions.user_id`.
 * @returns {Promise<string>} The reused or newly created session id.
 */
async function getOrCreateSession(user_id) {
  const result = await postgre.query(
    `
    SELECT *
    FROM user_sessions
    WHERE user_id = $1
      AND is_active = true
      AND last_activity > NOW() - INTERVAL '30 minutes'
    ORDER BY last_activity DESC
    LIMIT 1
    `,
    [user_id]
  );

  if (result.rows.length) {
    const session = result.rows[0];
    // Touch the session so the 30-minute window slides with each request.
    await postgre.query(
      `UPDATE user_sessions SET last_activity = NOW() WHERE id = $1`,
      [session.id]
    );
    return session.session_id;
  }

  const session_id = uuidv4();
  await postgre.query(
    `
    INSERT INTO user_sessions (user_id, session_id)
    VALUES ($1, $2)
    `,
    [user_id, session_id]
  );
  return session_id;
}

// ─── Configuration ────────────────────────────────────────────────────────────

/**
 * Runtime configuration, read from environment variables once at load time.
 *
 * `search.topK` is passed to Qdrant as `limit`, `search.minScore` as
 * `score_threshold`, and `search.maxContextDocs` caps how many hits are kept
 * for the LLM context (see `searchQdrant`).
 */
const CONFIG = {
  azure: {
    endpoint: process.env.AZURE_OPENAI_ENDPOINT,
    deployment: process.env.AZURE_DEPLOYMENT,
    apiVersion: process.env.AZURE_API_VERSION,
    apiKey: process.env.AZURE_OPENAI_KEY,
  },
  qdrant: {
    url: process.env.QDRANT_URL,
    collection: process.env.QDRANT_COLLECTION,
  },
  search: {
    topK: 20,
    minScore: 0.10,
    maxContextDocs: 10,
  },
  port: process.env.PORT || 5000,
};

// ─── Clients ──────────────────────────────────────────────────────────────────

// OpenAI SDK pointed at the Azure deployment URL; the key is sent as an
// `api-key` header and the API version as a query parameter on every call.
const llm = new OpenAI({
  baseURL: `${CONFIG.azure.endpoint}/openai/deployments/${CONFIG.azure.deployment}`,
  apiKey: CONFIG.azure.apiKey,
  defaultHeaders: { "api-key": CONFIG.azure.apiKey },
  defaultQuery: { "api-version": CONFIG.azure.apiVersion },
});

const qdrant = new QdrantClient({
  url: CONFIG.qdrant.url,
  checkCompatibility: false,
  timeout: 30000,
});

// ─── Embedding model (singleton, lazy-init) ───────────────────────────────────

// Holds the in-flight or settled load of the embedding pipeline. Caching the
// promise (not the resolved model) means requests that arrive while the model
// is still loading share one load instead of each starting their own.
let _embedderPromise = null;

/**
 * Loads the MiniLM feature-extraction pipeline, logging start and completion.
 *
 * @returns {Promise<Function>} The callable pipeline.
 */
async function loadEmbedder() {
  console.log("Loading MiniLM model...");
  const model = await pipeline("feature-extraction", "Xenova/all-MiniLM-L6-v2");
  console.log("Embedding model ready");
  return model;
}

/**
 * Returns the shared embedding pipeline, loading it on first use.
 *
 * @returns {Promise<Function>} Resolves to the pipeline; rejects if loading fails.
 */
function getEmbedder() {
  if (!_embedderPromise) {
    _embedderPromise = loadEmbedder().catch((err) => {
      // Forget the failed attempt so the next call retries the load instead
      // of replaying the same rejection forever.
      _embedderPromise = null;
      throw err;
    });
  }
  return _embedderPromise;
}

/**
 * Embeds a piece of text with mean pooling and normalization enabled.
 *
 * @param {string} text - Text to embed.
 * @returns {Promise<number[]>} The pipeline output data copied into a plain array.
 */
async function createEmbedding(text) {
  const model = await getEmbedder();
  const out = await model(text, { pooling: "mean", normalize: true });
  return Array.from(out.data);
}

// ─── Qdrant search ────────────────────────────────────────────────────────────

/**
 * Searches the configured Qdrant collection for the nearest stored vectors.
 *
 * Each option falls back to its `CONFIG.search` value individually, so a
 * caller may override just one of them.
 *
 * @param {number[]} embedding - Query vector.
 * @param {object} [options]
 * @param {number} [options.topK] - Passed to Qdrant as `limit`.
 * @param {number} [options.minScore] - Passed to Qdrant as `score_threshold`.
 * @param {number} [options.maxContextDocs] - Maximum number of hits returned.
 * @returns {Promise<object[]>} Hits ordered by descending score, at most
 *   `maxContextDocs` long.
 */
async function searchQdrant(
  embedding,
  {
    topK = CONFIG.search.topK,
    minScore = CONFIG.search.minScore,
    maxContextDocs = CONFIG.search.maxContextDocs,
  } = {}
) {
  const results = await qdrant.search(CONFIG.qdrant.collection, {
    vector: embedding,
    limit: topK,
    with_payload: true,
    score_threshold: minScore,
  });

  console.log(`Qdrant returned ${results.length} results (threshold: ${minScore})`);

  // Sort a copy so the array handed back by the client is left untouched.
  return [...results]
    .sort((a, b) => b.score - a.score)
    .slice(0, maxContextDocs);
}

// ─── Build LLM context string ─────────────────────────────────────────────────

/**
 * Renders search hits as a numbered text block for the LLM prompt.
 *
 * Each hit becomes a `[n] File: … | Page: …` header followed by the payload
 * text; missing payload fields fall back to "unknown", "?" and "" respectively.
 * Entries are separated by a `---` rule.
 *
 * @param {object[]} results - Hits as returned by `searchQdrant`.
 * @returns {string} The joined context string ("" for an empty list).
 */
function buildContext(results) {
  return results
    .map((item, i) =>
      `[${i + 1}] File: ${item.payload?.file ?? "unknown"} | Page: ${item.payload?.page ?? "?"}\n${item.payload?.text ?? ""}`
    )
    .join("\n\n---\n\n");
}

// const SYSTEM_PROMPT = `
// You are CPM AI Assistant.
// Rules:
// - Answer only from the provided information.
// - If the answer is not available, reply exactly:
// "❌ I could not find this information in the uploaded documents."
// - Do not make up information.
// - Do not mention documents, context, or chunks.
// - Reply in the same language and style as the user's question.
// - If the user asks in Hindi, answer in Hindi.
// - If the user asks in Hinglish, answer in Hinglish.
// - If the user asks in English, answer in English.
// Response Style:
// - Use simple and easy-to-understand language.
// - Keep answers short and clear.
// - Use headings and bullet points when helpful.
// - Highlight important words in **bold**.
// Format:
// # 📋 Topic
// ## 🎯 Summary
// Short answer in the user's language.
// ## ✅ Details
// - Point 1
// - Point 2
// - Point 3
// ## ⚠️ Notes
// - Extra information (if available).
// `.trim(); const SYSTEM_PROMPT = ` You are CPM AI Assistant. RULES: - Answer only using the provided context. - If the answer is not available in the context, reply exactly: "❌ I could not find this information in the uploaded documents." - Do not make up information. - Do not use external knowledge. - Do not mention context, documents, chunks, or sources. STRICT OUTPUT RULE: - NEVER include words like "documents.", "context", "chunk", or similar metadata in the final answer. - NEVER end the response with the word "documents." or any system-related word. - Ensure the final sentence always ends naturally and cleanly. LANGUAGE RULE: - Reply in the same language as the user (English, Hindi, Hinglish). RESPONSE STYLE: - Use simple, clear language. - Keep answers short and structured. - Use headings and bullet points when needed. - Highlight important words in **bold**. FORMAT: # 📋 Topic ## 🎯 Summary Short answer. ## ✅ Details - Point 1 - Point 2 - Point 3 ## ⚠️ Notes - Only if needed `.trim(); async function askLLM(question, context) { const completion = await llm.chat.completions.create({ model: CONFIG.azure.deployment, temperature: 0, max_tokens: 1500, messages: [ { role: "system", content: SYSTEM_PROMPT }, { role: "user", content: `Context:\n${context}\n\nQuestion:\n${question}` }, ], }); return completion.choices[0].message.content; } const health = async (req, res) => { res.json({ status: "ok", model: CONFIG.azure.deployment, collection: CONFIG.qdrant.collection }); } const ask = async (req, res) => { const { question } = req.body ?? 
{}; if (!question?.trim()) { return res.status(400).json({ success: false, error: "question is required" }); } const t0 = Date.now(); try { const embedding = await createEmbedding(question.trim()); const results = await searchQdrant(embedding); if (!results.length) { return res.json({ success: true, question, answer: "❌ I could not find this information in the uploaded documents.", sources: [], ms: Date.now() - t0, }); } const context = buildContext(results); const answer = await askLLM(question, context); return res.json({ success: true, question, answer, sources: results.map(r => ({ score: +r.score.toFixed(4), file: r.payload?.file, page: r.payload?.page, chunk: r.payload?.chunk, })), ms: Date.now() - t0, }); } catch (err) { console.error("❌ /ask error:", err); return res.status(500).json({ success: false, error: err.message }); } } const askstream = async (req, res) => { const { question } = req.body ?? {}; const user_id = req.user.id; const session_id = await getOrCreateSession(user_id); if (!question?.trim()) { return res.status(400).json({ success: false, error: "question is required" }); } res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.flushHeaders(); const send = (event, data) => res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); try { send("status", { message: "🔍 Searching documents..." 
}); const embedding = await createEmbedding(question.trim()); const results = await searchQdrant(embedding); if (!results.length) { console.log("No results found for question:", question); send("token", { token: "❌", isWord: true }); send("token", { token: "I", isWord: true }); send("token", { token: "could", isWord: true }); send("token", { token: "not", isWord: true }); send("token", { token: "find", isWord: true }); send("token", { token: "this", isWord: true }); send("token", { token: "information", isWord: true }); send("token", { token: "in", isWord: true }); send("token", { token: "the", isWord: true }); send("token", { token: "uploaded", isWord: true }); send("token", { token: "documents.", isWord: true }); send("done", { sources: [] }); await postgre.query( ` INSERT INTO useraskquestion (user_id, session_id, questions, status) VALUES ($1, $2, $3, $4) `, [user_id, session_id, question, '0'] ); return res.end(); } await postgre.query( ` INSERT INTO useraskquestion (user_id, session_id, questions, status) VALUES ($1, $2, $3, $4) `, [user_id, session_id, question, '1'] ); const sources = results.map(r => ({ score: +r.score.toFixed(4), file: r.payload?.file, page: r.payload?.page, chunk: r.payload?.chunk, })); send("sources", { sources }); send("status", { message: "💬 Generating answer..." }); const context = buildContext(results); const stream = await llm.chat.completions.create({ model: CONFIG.azure.deployment, temperature: 0, max_tokens: 1500, stream: true, messages: [ { role: "system", content: SYSTEM_PROMPT }, { role: "user", content: `Context:\n${context}\n\nQuestion:\n${question}` }, ], }); let wordBuffer = ""; for await (const chunk of stream) { const rawToken = chunk.choices[0]?.delta?.content ?? ""; if (!rawToken) continue; wordBuffer += rawToken; const parts = wordBuffer.split(/(\s+)/); wordBuffer = parts.pop() ?? 
""; for (const part of parts) { if (part) { send("token", { token: part, isWord: /\S/.test(part) }); } } } if (wordBuffer) { console.log("Emitting buffered token:", wordBuffer); if (wordBuffer.trim() == "documents.") { const result = await postgre.query( `UPDATE useraskquestion SET status = $1 WHERE session_id = $2 RETURNING *` , ['0', session_id] ); } send("token", { token: wordBuffer, isWord: true }); } send("done", { sources }); } catch (err) { console.error("❌ /ask/stream error:", err); send("error", { error: err.message }); } res.end(); } module.exports = { ask, askstream, health };