diff --git a/controller/handleOrchestration.js b/controller/handleOrchestration.js index 8fc7262..4644e40 100644 --- a/controller/handleOrchestration.js +++ b/controller/handleOrchestration.js @@ -16,13 +16,141 @@ const client = new OpenAI({ const WREN_URL = "http://172.236.172.26:3000/api/graphql"; -const gql = async (operationName, query, variables) => { - const res = await axios.post(WREN_URL, { operationName, query, variables }, { - headers: { "Content-Type": "application/json", Accept: "application/json" }, - timeout: 60000, - }); - if (res.data?.errors) throw new Error(res.data.errors[0].message); - return res.data.data; +// const gql = async (operationName, query, variables) => { +// const res = await axios.post(WREN_URL, { operationName, query, variables }, { +// headers: { "Content-Type": "application/json", Accept: "application/json" }, +// timeout: 60000, +// }); +// if (res.data?.errors) throw new Error(res.data.errors[0].message); +// return res.data.data; +// }; + + + +const gql = async ( + operationName, + query, + variables = {}, + options = {} +) => { + const { + retries = 3, + timeout = 60000 + } = options; + + const payload = { + operationName, + query, + variables + }; + + let lastError; + + for (let attempt = 1; attempt <= retries; attempt++) { + const startTime = Date.now(); + + try { + console.log( + `[GraphQL] ${operationName} | Attempt ${attempt}/${retries}` + ); + + const response = await axios.post( + WREN_URL, + payload, + { + timeout, + headers: { + "Content-Type": "application/json", + Accept: "application/json" + }, + validateStatus: () => true + } + ); + + const duration = Date.now() - startTime; + + console.log( + `[GraphQL] ${operationName} | ${response.status} | ${duration}ms` + ); + + // HTTP Error + if (response.status >= 400) { + throw new Error( + `HTTP ${response.status}: ${response.data?.message || + response.statusText + }` + ); + } + + // Empty Response + if (!response.data) { + throw new Error( + `Empty response received for ${operationName}` + ); + } + + // GraphQL Errors + if ( + Array.isArray(response.data.errors) && + response.data.errors.length > 0 + ) { + const graphQLError = response.data.errors + .map(err => err.message) + .join(" | "); + + throw new Error(graphQLError); + } + + // Missing Data + if (!response.data.data) { + throw new Error( + `No data returned from GraphQL operation: ${operationName}` + ); + } + + return response.data.data; + + } catch (error) { + lastError = error; + + const duration = Date.now() - startTime; + + console.error( + `[GraphQL Error] ${operationName} | Attempt ${attempt}/${retries} | ${duration}ms` + ); + + console.error(error.message); + + const shouldRetry = + attempt < retries && + ( + error.code === "ECONNABORTED" || + error.code === "ECONNRESET" || + error.code === "ETIMEDOUT" || + error.code === "ENOTFOUND" || + error.code === "ECONNREFUSED" || + error.message.includes("timeout") + ); + + if (!shouldRetry) { + break; + } + + const delay = attempt * 2000; + + console.log( + `[GraphQL Retry] Waiting ${delay}ms before retry...` + ); + + await new Promise(resolve => + setTimeout(resolve, delay) + ); + } + } + + throw new Error( + `[${operationName}] Failed after ${retries} attempts. ${lastError?.message}` + ); }; const pollUntilFinished = async (taskId, maxAttempts = 50) => { @@ -157,7 +285,7 @@ Generate Vega-Lite JSON. `; const completion = await client.chat.completions.create({ - model:deploymentName, + model: deploymentName, temperature: 0, messages: [ { @@ -261,338 +389,3 @@ const suggestions = async (req, res) => { } module.exports = { ask, suggestions }; - - - - - - - - - - - - - - - -// const axios = require('axios'); -// require('dotenv').config(); -// const { OpenAI } = require('openai'); - -// const azureEndpoint = "https://cpmindiayoda-resource.services.ai.azure.com"; -// const deploymentName = "gpt-4o-mini"; -// const apiVersion = "2024-08-01-preview"; - -// const client = new OpenAI({ -// baseURL: `${azureEndpoint}/openai/deployments/${deploymentName}`, -// apiKey: process.env.AZURE_OPENAI_KEY, -// defaultHeaders: { 'api-key': process.env.AZURE_OPENAI_KEY }, -// defaultQuery: { 'api-version': apiVersion } -// }); - -// const WREN_URL = "http://172.236.172.26:3000/api/graphql"; - -// const gql = async (operationName, query, variables) => { -// const res = await axios.post(WREN_URL, { operationName, query, variables }, { -// headers: { "Content-Type": "application/json", Accept: "application/json" }, -// timeout: 60000, -// }); -// if (res.data?.errors) throw new Error(res.data.errors[0].message); -// return res.data.data; -// }; - -// const pollUntilFinished = async (taskId, maxAttempts = 50) => { -// for (let i = 0; i < maxAttempts; i++) { -// const { askingTask } = await gql("AskingTask", -// `query AskingTask($taskId: String!) { -// askingTask(taskId: $taskId) { -// status -// candidates { sql } -// error { message } -// queryId -// } -// }`, -// { taskId } -// ); - -// console.log(`Poll ${i + 1} => ${askingTask?.status}`); -// if (askingTask?.error) throw new Error(askingTask.error.message); - -// if (askingTask?.status === "FINISHED") { -// if (askingTask?.candidates?.length > 0) { -// return { sql: askingTask.candidates[0].sql, type: "sql" }; -// } - -// console.log("No candidates — fetching recommended questions..."); -// try { -// const { createInstantRecommendedQuestions } = await gql( -// "CreateInstantRecommendedQuestions", -// `mutation CreateInstantRecommendedQuestions($data: InstantRecommendedQuestionsInput!) { -// createInstantRecommendedQuestions(data: $data) { id } -// }`, -// { data: { askingTaskId: taskId } } -// ); - -// const recTaskId = createInstantRecommendedQuestions.id; -// console.log("Rec task ID =>", recTaskId); - -// for (let j = 0; j < 20; j++) { -// const { instantRecommendedQuestions } = await gql( -// "InstantRecommendedQuestions", -// `query InstantRecommendedQuestions($taskId: String!) { -// instantRecommendedQuestions(taskId: $taskId) { -// status -// questions { question category sql } -// error { message } -// } -// }`, -// { taskId: recTaskId } -// ); - -// console.log(`Rec poll ${j + 1} => ${instantRecommendedQuestions?.status}`); - -// if (instantRecommendedQuestions?.status === "FINISHED") { -// return { type: "recommended", questions: instantRecommendedQuestions.questions || [] }; -// } -// if (instantRecommendedQuestions?.error) { -// throw new Error(instantRecommendedQuestions.error.message); -// } -// await new Promise(r => setTimeout(r, 2000)); -// } -// return { type: "recommended", questions: [] }; - -// } catch (recErr) { -// console.error("Recommended questions error =>", recErr.message); -// return { type: "clarification", message: "Please try rephrasing your question." }; -// } -// } - -// await new Promise(r => setTimeout(r, 2000)); -// } -// throw new Error("Wren polling timeout"); -// }; - -// const THREAD_RESPONSE_QUERY = ` -// query ThreadResponse($responseId: Int!) { -// threadResponse(responseId: $responseId) { -// id -// question -// sql -// answerDetail { -// status -// content -// numRowsUsedInLLM -// error { message } -// } -// } -// }`; - -// const pollThreadResponse = async (responseId, maxAttempts = 30) => { -// for (let i = 0; i < maxAttempts; i++) { -// const { threadResponse } = await gql( -// "ThreadResponse", -// THREAD_RESPONSE_QUERY, -// { responseId: parseInt(responseId) } -// ); - -// const status = threadResponse?.answerDetail?.status; -// console.log(`Answer poll ${i + 1} => ${status}`); - -// if (status === "FINISHED" && threadResponse?.answerDetail?.content) { -// return { answer: threadResponse.answerDetail.content }; -// } -// if (status === "FAILED" || threadResponse?.answerDetail?.error) { -// return { answer: null }; -// } -// await new Promise(r => setTimeout(r, 2000)); -// } -// return { answer: null }; -// }; - -// // ✅ FIXED generateVegaJson — JSON parse error fix -// const generateVegaJson = async (queryResult) => { -// try { -// const completion = await client.chat.completions.create({ -// model: "", -// temperature: 0, -// messages: [ -// { -// role: "system", -// content: `You are a data visualization expert. -// Return ONLY a raw JSON object. -// No markdown. No backticks. No explanation. No text before or after. -// Generate a valid Vega-Lite v5 spec with data embedded under "values" key inside "data". -// Choose best chart type (bar, line, arc) based on the question.` -// }, -// { -// role: "user", -// content: `DATA:\n${JSON.stringify(queryResult, null, 2)}\n\nReturn only the Vega-Lite JSON object.` -// } -// ] -// }); - -// let raw = completion.choices[0].message.content.trim(); -// console.log("GPT RAW (first 300 chars) =>", raw.substring(0, 300)); - -// // Remove all markdown fences -// raw = raw -// .replace(/^```json\s*/i, "") -// .replace(/^```vega-lite\s*/i, "") -// .replace(/^```\s*/i, "") -// .replace(/```[\s\S]*$/i, "") -// .trim(); - -// // Extract only { ... } block -// const start = raw.indexOf("{"); -// const end = raw.lastIndexOf("}"); - -// if (start === -1 || end === -1) { -// throw new Error("No JSON object found in GPT response"); -// } - -// const cleanJson = raw.substring(start, end + 1); -// return JSON.parse(cleanJson); - -// } catch (err) { -// console.error("Vega Generation Error =>", err.message); -// // Fallback — basic chart taaki crash na ho -// return { -// "$schema": "https://vega.github.io/schema/vega-lite/v5.json", -// "data": { "values": queryResult.data || [] }, -// "mark": "bar", -// "encoding": { -// "x": { "field": queryResult.columns?.[0] || "x", "type": "nominal", "title": queryResult.columns?.[0] || "X" }, -// "y": { "field": queryResult.columns?.[1] || "y", "type": "quantitative", "title": queryResult.columns?.[1] || "Y" } -// }, -// "width": "container", -// "height": 400 -// }; -// } -// }; - -// const ask = async (req, res) => { -// try { -// const { prompt } = req.body; - -// if (!prompt?.trim()) { -// return res.status(400).json({ success: false, error: "Prompt required" }); -// } - -// // SSE headers -// res.setHeader("Content-Type", "text/event-stream"); -// res.setHeader("Cache-Control", "no-cache"); -// res.setHeader("Connection", "keep-alive"); -// res.flushHeaders(); - -// const send = (event, data) => { -// res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); -// }; - -// // Step 1: Task create karo -// send("status", { step: 1, message: "Generating SQL..." }); -// const { createAskingTask } = await gql("CreateAskingTask", -// `mutation CreateAskingTask($data: AskingTaskInput!) { -// createAskingTask(data: $data) { id } -// }`, -// { data: { question: prompt } } -// ); - -// // Step 2: SQL ready hone tak poll karo -// const pollResult = await pollUntilFinished(createAskingTask.id); - -// // Recommended questions aaye -// if (pollResult.type === "recommended") { -// send("recommended", { questions: pollResult.questions }); -// send("done", { success: true }); -// return res.end(); -// } - -// // Clarification chahiye -// if (pollResult.type === "clarification") { -// send("clarification", { message: pollResult.message }); -// send("done", { success: false }); -// return res.end(); -// } - -// const wrenSql = pollResult.sql; -// send("status", { step: 2, message: "SQL ready, preparing answer..." }); - -// // Step 3: Thread banao -// const { createThread } = await gql("CreateThread", -// `mutation CreateThread($data: CreateThreadInput!) { -// createThread(data: $data) { id } -// }`, -// { data: { question: prompt, sql: wrenSql } } -// ); - -// // Step 4: Thread response banao -// const { createThreadResponse } = await gql("CreateThreadResponse", -// `mutation CreateThreadResponse($threadId: Int!, $data: CreateThreadResponseInput!) { -// createThreadResponse(threadId: $threadId, data: $data) { id } -// }`, -// { threadId: createThread.id, data: { question: prompt, sql: wrenSql } } -// ); - -// const responseId = createThreadResponse.id; -// send("status", { step: 3, message: "Fetching results..." }); - -// // Step 5: Answer + Data parallel fetch karo -// const answerPromise = pollThreadResponse(responseId); - -// const { previewData } = await gql("PreviewData", -// `mutation PreviewData($where: PreviewDataInput!) { -// previewData(where: $where) -// }`, -// { where: { responseId: parseInt(responseId) } } -// ); - -// const columns = previewData.columns.map(c => c.name); -// const rows = previewData.data.map(row => -// Object.fromEntries(columns.map((col, i) => [col, row[i]])) -// ); - -// // Answer pehle bhejo — jaise Wren AI karta hai -// const { answer } = await answerPromise; -// send("answer", { answer: answer || "Data is ready.", sql: wrenSql }); - -// // Table data bhejo -// send("data", { columns, rows, totalRows: rows.length, sql: wrenSql }); - -// // Chart generate karo -// send("status", { step: 4, message: "Generating chart..." }); -// const vegaSpec = await generateVegaJson({ columns, data: rows, chart: prompt, sql: wrenSql }); -// send("chart", { vegaSpec }); - -// send("done", { success: true }); -// res.end(); - -// } catch (err) { -// console.error("Ask Error =>", err.message); -// res.write(`event: error\ndata: ${JSON.stringify({ error: err.message })}\n\n`); -// res.end(); -// } -// }; - -// const getSuggestedQuestions = async () => { -// try { -// const { threads } = await gql("Threads", -// `query Threads { threads { id summary } }`, -// {} -// ); -// return (threads || []) -// .filter(t => t.summary) -// .slice(0, 5) -// .map(t => ({ question: t.summary, category: "Recent" })); -// } catch (err) { -// console.error("Suggestions failed =>", err.message); -// return []; -// } -// }; - -// const suggestions = async (req, res) => { -// const questions = await getSuggestedQuestions(); -// res.json({ success: true, questions }); -// }; - -// module.exports = { ask, suggestions }; \ No newline at end of file