// Load .env before anything reads process.env (used by the config section below).
require("dotenv").config();

const crypto = require("crypto");
const fs = require("fs");
const path = require("path");

// ======================
// SUPPRESS PDF WARNINGS
// ======================

// Drop the noisy font/parse warnings that show up while reading PDFs; every other
// warning is forwarded unchanged. Installed before pdf.js is loaded.
// NOTE(review): pdf.js may route its own warnings through console.log rather than
// console.warn; if they still appear, consider getDocument({ verbosity }) — confirm.
const originalWarn = console.warn;
console.warn = (message, ...args) => {
  if (
    typeof message === "string" &&
    (message.includes("UnknownErrorException") || message.includes("TT:"))
  ) {
    return;
  }
  originalWarn(message, ...args);
};

// ======================
// PDF.js
// ======================
const pdfjsLib = require("pdfjs-dist/legacy/build/pdf.mjs");

// ======================
// Transformers
// ======================
const { pipeline } = require("@xenova/transformers");

// ======================
// Qdrant
// ======================
const { QdrantClient } = require("@qdrant/js-client-rest");

// ======================
// CONFIG
// ======================

// Connection settings may be overridden through the environment; the defaults are
// the values this script previously hard-coded. `||` (not `??`) so that an empty
// variable also falls back, since "" is not a usable URL or collection name.
const QDRANT_URL = process.env.QDRANT_URL || "http://20.40.61.65:6333";
const COLLECTION_NAME = process.env.QDRANT_COLLECTION || "pdf_rag";

const EMBEDDING_MODEL = "Xenova/all-MiniLM-L6-v2";
// Embedding dimension of all-MiniLM-L6-v2; the collection's vector size must match.
const VECTOR_SIZE = 384;

// Chunking parameters used when indexing each page.
const CHUNK_SIZE = 1200;
const CHUNK_OVERLAP = 250;

// Points sent per upsert request, and how many times a failed upsert is retried.
const UPSERT_BATCH_SIZE = 50;
const MAX_UPSERT_RETRIES = 3;

const qdrant = new QdrantClient({
  url: QDRANT_URL,
  // Optional; left undefined when the server does not require authentication.
  apiKey: process.env.QDRANT_API_KEY,
  checkCompatibility: false,
  timeout: 30000,
});

// Set by loadModel(); holds the transformers.js feature-extraction pipeline.
let embedder;

// ======================
// LOAD MODEL
// ======================

/**
 * Load the sentence-embedding pipeline into the module-level `embedder`.
 * Must complete before createEmbedding() is called.
 */
async function loadModel() {
  console.log("⏳ Loading embedding model...");
  embedder = await pipeline("feature-extraction", EMBEDDING_MODEL);
  console.log("✅ Embedding model loaded");
}

// ======================
// SMART CHUNKING
// ======================

/**
 * Split text into overlapping chunks, preferring to end each chunk on a period.
 *
 * Whitespace runs are collapsed to single spaces first. Chunks of 50 characters or
 * fewer are discarded.
 *
 * @param {string} text - Raw text to split.
 * @param {number} [chunkSize=800] - Target maximum chunk length in characters.
 * @param {number} [overlap=150] - Characters shared between consecutive chunks.
 * @returns {string[]} The chunks, in document order.
 */
function chunkText(text, chunkSize = 800, overlap = 150) {
  const chunks = [];
  const normalized = text.replace(/\s+/g, " ").trim();

  let start = 0;
  while (start < normalized.length) {
    let end = Math.min(start + chunkSize, normalized.length);

    if (end < normalized.length) {
      const lastPeriod = normalized.lastIndexOf(".", end);
      // Only snap to a sentence end that lies beyond the overlap window. A period
      // closer to `start` would make `end - overlap` land at or before `start`,
      // which previously made the loop go backwards (or never terminate).
      if (lastPeriod > start + overlap) {
        end = lastPeriod + 1;
      }
    }

    const chunk = normalized.slice(start, end).trim();
    if (chunk.length > 50) {
      chunks.push(chunk);
    }

    // The final chunk reached the end of the text; another pass would only emit
    // a tail that is already contained in this chunk.
    if (end >= normalized.length) {
      break;
    }

    // Always advance, even if a caller passes overlap >= chunkSize.
    start = Math.max(end - overlap, start + 1);
  }

  return chunks;
}

// ======================
// CREATE EMBEDDING
// ======================

/**
 * Embed one piece of text with mean pooling and L2 normalisation.
 *
 * @param {string} text - Text to embed.
 * @returns {Promise<number[]>} The embedding vector as a plain array.
 * @throws {Error} If loadModel() has not completed yet.
 */
async function createEmbedding(text) {
  if (!embedder) {
    throw new Error("Embedding model not loaded; call loadModel() first");
  }
  const output = await embedder(text, { pooling: "mean", normalize: true });
  return Array.from(output.data);
}

// ======================
// CREATE COLLECTION
// ======================

/**
 * Create the Qdrant collection (cosine distance, VECTOR_SIZE dimensions) unless
 * it already exists.
 */
async function createCollection() {
  try {
    await qdrant.getCollection(COLLECTION_NAME);
    console.log("ℹ️ Collection already exists");
    return;
  } catch (err) {
    // Only a "not found" lookup should lead to creating the collection. Errors that
    // carry a different HTTP status (auth failure, server error) are re-thrown
    // instead of being hidden behind a create attempt. Errors without a status fall
    // through to the create call, which surfaces them if the server is unreachable.
    if (err?.status !== undefined && err.status !== 404) {
      throw err;
    }
  }

  console.log("⏳ Creating collection...");
  await qdrant.createCollection(COLLECTION_NAME, {
    vectors: {
      size: VECTOR_SIZE,
      distance: "Cosine",
    },
  });
  console.log("✅ Collection created");
}

// ======================
// QDRANT HELPERS
// ======================

/**
 * Run an async operation, retrying with exponential backoff on failure.
 *
 * @param {() => Promise<T>} operation - The operation to attempt.
 * @param {number} [retries=MAX_UPSERT_RETRIES] - Retries after the first attempt.
 * @param {number} [baseDelayMs=500] - Delay before the first retry; doubles each time.
 * @returns {Promise<T>} The operation's result.
 * @throws The last error once all retries are exhausted.
 * @template T
 */
async function withRetry(operation, retries = MAX_UPSERT_RETRIES, baseDelayMs = 500) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (err) {
      if (attempt >= retries) {
        throw err;
      }
      const delayMs = baseDelayMs * 2 ** attempt;
      console.log(`⚠️ Retry ${attempt + 1}/${retries} in ${delayMs}ms: ${err?.message ?? err}`);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}

/**
 * Upsert a batch of points into the collection, waiting for Qdrant to apply it.
 *
 * @param {object[]} points - Points with `id`, `vector` and `payload`.
 * @param {string} [label=""] - Text inserted into the progress log (e.g. "final ").
 */
async function upsertPoints(points, label = "") {
  console.log(`⬆️ Uploading ${label}${points.length} vectors`);
  await withRetry(() =>
    qdrant.upsert(COLLECTION_NAME, {
      wait: true,
      points,
    })
  );
}

/**
 * Build a stable point id for one chunk, so re-indexing a file overwrites its
 * earlier points instead of inserting a second copy.
 *
 * Qdrant point ids must be unsigned integers or UUIDs; the SHA-256 digest of the
 * chunk's location is formatted as a UUID-shaped string. This replaces ids built
 * from Date.now(): those differed on every run, and once the chunk index reached
 * four digits the concatenated number exceeded Number.MAX_SAFE_INTEGER.
 * Points written with the old numeric ids are not replaced by this scheme.
 *
 * @param {string} fileName - PDF file name.
 * @param {number} pageNum - 1-based page number.
 * @param {number} chunkIndex - Chunk index within the file.
 * @returns {string} UUID-formatted id.
 */
function makePointId(fileName, pageNum, chunkIndex) {
  const hex = crypto
    .createHash("sha256")
    .update(`${fileName}::${pageNum}::${chunkIndex}`)
    .digest("hex");
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    hex.slice(12, 16),
    hex.slice(16, 20),
    hex.slice(20, 32),
  ].join("-");
}

// ======================
// PDF HELPERS
// ======================

/**
 * Start loading a PDF from disk.
 *
 * @param {string} filePath - Path to the PDF file.
 * @returns The pdf.js loading task; await `.promise` for the document and call
 *   `.destroy()` when finished to release its resources.
 */
function openPdf(filePath) {
  const data = new Uint8Array(fs.readFileSync(filePath));
  return pdfjsLib.getDocument({ data });
}

/**
 * Return the text of one page, with text items joined by single spaces.
 *
 * @param {object} pdf - A loaded pdf.js document.
 * @param {number} pageNum - 1-based page number.
 * @returns {Promise<string>} The page text (possibly empty).
 */
async function getPageText(pdf, pageNum) {
  const page = await pdf.getPage(pageNum);
  const content = await page.getTextContent();
  return content.items.map((item) => item.str).join(" ");
}

// ======================
// EXTRACT TEXT FROM PDF
// ======================

/**
 * Extract the text of every page of a PDF, one line per page.
 *
 * @param {string} filePath - Path to the PDF file.
 * @returns {Promise<string>} The full text, or "" if the PDF could not be read
 *   (the error is logged).
 */
async function extractTextFromPDF(filePath) {
  let loadingTask;
  try {
    loadingTask = openPdf(filePath);
    const pdf = await loadingTask.promise;

    console.log(`📄 Pages: ${pdf.numPages}`);

    let fullText = "";
    for (let i = 1; i <= pdf.numPages; i++) {
      fullText += `${await getPageText(pdf, i)}\n`;
    }
    return fullText;
  } catch (error) {
    console.log("❌ PDF extraction error:", error);
    return "";
  } finally {
    // Release the document's pdf.js resources, which are otherwise kept alive.
    await loadingTask?.destroy();
  }
}

// ======================
// PROCESS PDF
// ======================

/**
 * Chunk, embed and upsert every page of one PDF into the Qdrant collection.
 *
 * Errors are logged rather than thrown so one bad file does not stop the run.
 *
 * @param {string} filePath - Path to the PDF file.
 * @param {string} fileName - Name stored in each point's `file` payload field.
 * @returns {Promise<boolean>} true if the file was fully indexed, false on error.
 */
async function processPDF(filePath, fileName) {
  let loadingTask;
  try {
    loadingTask = openPdf(filePath);
    const pdf = await loadingTask.promise;

    console.log(`📄 ${fileName} - ${pdf.numPages} pages`);

    let batchPoints = [];
    // Chunk counter across all pages of this file (not reset per page).
    let globalChunkIndex = 0;

    for (let pageNum = 1; pageNum <= pdf.numPages; pageNum++) {
      console.log(`📖 Processing page ${pageNum}/${pdf.numPages}`);

      const pageText = await getPageText(pdf, pageNum);
      if (pageText.trim() === "") {
        continue;
      }

      for (const chunk of chunkText(pageText, CHUNK_SIZE, CHUNK_OVERLAP)) {
        const embedding = await createEmbedding(chunk);

        batchPoints.push({
          id: makePointId(fileName, pageNum, globalChunkIndex),
          vector: embedding,
          payload: {
            file: fileName,
            page: pageNum,
            chunk: globalChunkIndex,
            text: chunk,
            created_at: new Date().toISOString(),
          },
        });
        globalChunkIndex++;

        if (batchPoints.length >= UPSERT_BATCH_SIZE) {
          await upsertPoints(batchPoints);
          batchPoints = [];
        }
      }
    }

    if (batchPoints.length > 0) {
      await upsertPoints(batchPoints, "final ");
    }

    console.log(`✅ ${fileName} indexed successfully`);
    return true;
  } catch (error) {
    console.log(`❌ Error processing ${fileName}:`, error);
    return false;
  } finally {
    // Release the document's pdf.js resources before moving to the next file.
    await loadingTask?.destroy();
  }
}

// ======================
// MAIN
// ======================

/**
 * Index every PDF in ./uploads (relative to this script) into Qdrant.
 * Sets a non-zero exit code if any file fails or a fatal error occurs.
 */
async function main() {
  try {
    await loadModel();
    await createCollection();

    const folder = path.join(__dirname, "uploads");

    if (!fs.existsSync(folder)) {
      console.log("❌ uploads folder not found");
      return;
    }

    // Case-insensitive so files such as "REPORT.PDF" are picked up too.
    const files = fs
      .readdirSync(folder)
      .filter((file) => file.toLowerCase().endsWith(".pdf"));

    if (files.length === 0) {
      console.log("⚠️ No PDFs found");
      return;
    }

    console.log(`📚 Found ${files.length} PDFs`);

    const failed = [];
    for (const file of files) {
      const filePath = path.join(folder, file);
      console.log(`\n📄 Processing ${file}`);
      const indexed = await processPDF(filePath, file);
      if (!indexed) {
        failed.push(file);
      }
    }

    if (failed.length === 0) {
      console.log("\n🎉 All PDFs indexed successfully");
    } else {
      console.log(
        `\n⚠️ Indexed ${files.length - failed.length}/${files.length} PDFs; failed: ${failed.join(", ")}`
      );
      process.exitCode = 1;
    }
  } catch (err) {
    console.error("❌ MAIN ERROR:", err);
    process.exitCode = 1;
  }
}

main();