Files
GenBI-Node-Setup/controller/handleOrchestration.js
T
Gitea 1184fbab2b
Deploy Node App / deploy (push) Successful in 11s
vega changes
2026-06-01 11:15:38 +05:30

521 lines
15 KiB
JavaScript

const axios = require('axios');
require('dotenv').config();
const { OpenAI } = require('openai');
const yaml = require('js-yaml');
const azureEndpoint = "https://cpmindiayoda-resource.services.ai.azure.com";
const deploymentName = "gpt-4o-mini";
const apiVersion = "2024-08-01-preview";
const client = new OpenAI({
baseURL: `${azureEndpoint}/openai/deployments/${deploymentName}`,
apiKey: process.env.AZURE_OPENAI_KEY,
defaultHeaders: { 'api-key': process.env.AZURE_OPENAI_KEY },
defaultQuery: { 'api-version': apiVersion }
});
const WREN_URL = "http://172.236.172.26:3000/api/graphql";
const gql = async (operationName, query, variables) => {
const res = await axios.post(WREN_URL, { operationName, query, variables }, {
headers: { "Content-Type": "application/json", Accept: "application/json" },
timeout: 60000,
});
if (res.data?.errors) throw new Error(res.data.errors[0].message);
return res.data.data;
};
const pollUntilFinished = async (taskId, maxAttempts = 50) => {
for (let i = 0; i < maxAttempts; i++) {
const { askingTask } = await gql("AskingTask",
`query AskingTask($taskId: String!) {
askingTask(taskId: $taskId) {
status
candidates { sql }
error { message }
}
}`,
{ taskId }
);
console.log(`Poll ${i + 1} => ${askingTask?.status}`);
if (askingTask?.error) throw new Error(askingTask.error.message);
if (askingTask?.status === "FINISHED") {
if (askingTask?.candidates?.length > 0) {
return { sql: askingTask.candidates[0].sql, type: "sql" };
}
return {
sql: null,
type: "clarification",
message: "I couldn't generate SQL for this question. Please try rephrasing.",
};
}
await new Promise(r => setTimeout(r, 2000));
}
throw new Error("Wren polling timeout");
};
const fetchWrenData = async (prompt) => {
try {
// Step 1: Create task
const { createAskingTask } = await gql("CreateAskingTask",
`mutation CreateAskingTask($data: AskingTaskInput!) {
createAskingTask(data: $data) { id }
}`,
{ data: { question: prompt } }
);
console.log("Task =>", createAskingTask.id);
// Step 2: Poll for SQL
const pollResult = await pollUntilFinished(createAskingTask.id);
// Clarification needed
if (pollResult.type === "clarification") {
return {
success: false,
type: "clarification",
message: pollResult.message,
data: [],
chart: null,
};
}
const wrenSql = pollResult.sql;
console.log("SQL ready");
// Step 3: Create thread
const { createThread } = await gql("CreateThread",
`mutation CreateThread($data: CreateThreadInput!) {
createThread(data: $data) { id }
}`,
{ data: { question: prompt, sql: wrenSql } }
);
console.log("Thread =>", createThread.id);
// Step 4: Create thread response
const { createThreadResponse } = await gql("CreateThreadResponse",
`mutation CreateThreadResponse($threadId: Int!, $data: CreateThreadResponseInput!) {
createThreadResponse(threadId: $threadId, data: $data) { id }
}`,
{ threadId: createThread.id, data: { question: prompt, sql: wrenSql } }
);
console.log("Response ID =>", createThreadResponse.id);
// Step 5: Preview data
const { previewData } = await gql("PreviewData",
`mutation PreviewData($where: PreviewDataInput!) {
previewData(where: $where)
}`,
{ where: { responseId: parseInt(createThreadResponse.id) } }
);
const columns = previewData.columns.map(c => c.name);
const rows = previewData.data.map(row =>
Object.fromEntries(columns.map((col, i) => [col, row[i]]))
);
console.log(`Done — ${rows.length} rows`);
console.table(rows);
return {
success: true,
type: "data",
prompt,
sql: wrenSql,
totalRows: rows.length,
columns,
data: rows,
};
} catch (err) {
console.error("WREN ERROR =>", err.message);
return {
success: false,
type: "error",
data: [],
chart: null,
error: err.message,
};
}
};
const generateVegaSchema = async (question, dataArray) => {
try {
const systemPrompt = `You are a data visualization expert. I will provide a user's question and a JSON array of data. Your task is to generate a strictly valid Vega-Lite JSON specification to visualize this data. The data array will be provided to the Vega spec internally. Map the JSON keys to the correct x, y, and color axes. Choose the best chart type (bar, line, arc) based on the question.`;
const userPrompt = `User Question: "${question}"\nData JSON: ${JSON.stringify(dataArray)}`;
const completion = await client.chat.completions.create({
model: '',
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userPrompt }
],
response_format: { type: "json_object" }
});
const rawText = completion.choices[0].message.content.trim();
return JSON.parse(rawText);
} catch (error) {
console.error('Azure OpenAI Engine Error:', error.message);
throw new Error('Failed to transform data architecture into valid Vega-Lite spec');
}
};
const generateSQL = async (prompt, mdl) => {
try {
const mdlObject = yaml.load(mdl);
const systemPrompt = `
You are an expert SQL query generator.
STRICT RULES:
1. Generate ONLY SQL query.
2. No explanation.
3. No markdown.
4. Use ONLY columns from MDL.
5. Use table name correctly.
6. Respect expression fields.
7. Use proper aggregation and GROUP BY.
`;
const userPrompt = `USER QUESTION:${prompt}MDL:${JSON.stringify(mdlObject, null, 2)}`;
const completion =
await client.chat.completions.create({
model: '',
temperature: 0,
messages: [
{
role: 'system',
content: systemPrompt
},
{
role: 'user',
content: userPrompt
}
]
});
const sql =
completion
.choices[0]
.message
.content
.trim();
return { prompt, sql };
} catch (error) {
console.error(
'Generate SQL Error:',
error.message
);
throw error;
}
}
const handleOrchestration = async (req, res) => {
try {
const { prompt, mdl } = req.body;
const tenant_id = req.user ? req.user.client_id : null;
if (!prompt) {
return res.status(400).json({ error: 'Prompt field is required in request body' });
}
if (!tenant_id) {
return res.status(400).json({ error: 'Tenant context (client_id) missing from auth token' });
}
const promptwithsql = await generateSQL(prompt, mdl)
// console.log('Prompt with SQL:', promptwithsql.prompt, promptwithsql.sql, tenant_id);
// return res.status(200).json(promptwithsql);
const wrenData = await fetchWrenData(promptwithsql.prompt, promptwithsql.sql, tenant_id);
console.log('Wren Data =>', wrenData);
return res.status(200).json(wrenData);
const vegaSchema = await generateVegaSchema(wrenData.question, wrenData.data);
if (!vegaSchema || !vegaSchema.$schema) {
return res.status(500).json({ error: 'Egress Pipeline Validation Failure: Output missing standard Vega $schema identifier' });
}
return res.status(200).json(vegaSchema);
} catch (error) {
console.error('API Gateway Orchestrator Crash:', error.message);
return res.status(500).json({ error: error.message });
}
};
const generateVegaJson = async (queryResult) => {
try {
const systemPrompt = `You are a data visualization expert. I will provide a user's question and a JSON array of data. Your task is to generate a strictly valid Vega-Lite JSON specification to visualize this data. The data array will be provided to the Vega spec internally. Map the JSON keys to the correct x, y, and color axes. Choose the best chart type (bar,pai,line, arc) based on the question.`;
const userPrompt = `
DATA:
${JSON.stringify(queryResult, null, 2)}
Generate Vega-Lite JSON.
`;
const completion = await client.chat.completions.create({
model: "",
temperature: 0,
messages: [
{
role: "system",
content: systemPrompt
},
{
role: "user",
content: userPrompt
}
]
});
const vegaJson = completion.choices[0].message.content.trim();
const cleanJson = vegaJson
.replace(/```json/g, "")
.replace(/```/g, "")
.trim();
return JSON.parse(cleanJson);
} catch (err) {
console.error("Vega Generation Error =>", err.message);
throw err;
}
};
const ask = async (req, res) => {
try {
const { prompt } = req.body;
if (!prompt?.trim()) {
return res.status(400).json({
success: false,
error: "Prompt required"
});
}
const result = await fetchWrenData(prompt);
const vegaSpec = await generateVegaJson({
columns: result.columns,
data: result.data,
chart: result.prompt,
sql: result.sql
});
// console.log("Ask Result =>",vegaSpec);
return res.json({
...result,
vegaSpec
});
} catch (err) {
console.error(err);
return res.status(500).json({
success: false,
error: err.message
});
}
};
// const ask = async (req, res) => {
// const { prompt } = req.body;
// if (!prompt?.trim()) {
// return res.status(400).json({ success: false, error: "Prompt required" });
// }
// const result = await fetchWrenData(prompt);
// console.log('Ask Result =>', result);
// res.json(result);
// }
const getSuggestedQuestions = async () => {
try {
const { threads } = await gql(
"Threads",
`
query Threads {
threads {
id
summary
}
}
`,
{}
);
return (threads || [])
.filter(t => t.summary)
.slice(0, 5)
.map(t => ({
question: t.summary,
category: "Recent"
}));
} catch (fallbackErr) {
console.error(
"Fallback failed =>",
fallbackErr.message
);
return [];
}
};
const suggestions = async (req, res) => {
const questions = await getSuggestedQuestions();
res.json({ success: true, questions });
}
module.exports = { handleOrchestration,ask,suggestions };
// const axios = require('axios');
// const dotenv = require('dotenv');
// dotenv.config();
// const { GoogleGenerativeAI } = require('@google/generative-ai');
// const ai = new GoogleGenerativeAI({ apiKey: process.env.GOOGLE_API_KEY });
// console.log('Gemini API Key Loaded:', process.env.GOOGLE_API_KEY);
// const fetchWrenData = async (prompt, tenantId) => {
// try {
// return {
// question: "Top sales person this month",
// sql: `
// SELECT sales_person,
// SUM(amount) AS total_sales
// FROM orders
// WHERE MONTH(created_at)=MONTH(CURRENT_DATE)
// GROUP BY sales_person
// ORDER BY total_sales DESC
// LIMIT 10;
// `,
// data: [
// {
// sales_person: "Rahul",
// total_sales: 92000
// },
// {
// sales_person: "Amit",
// total_sales: 87000
// }
// ]
// };
// } catch (error) {
// console.error('Wren AI Integration Error:', error.message);
// throw new Error('Failed to fetch data payload from Wren AI endpoint');
// }
// };
// const generateVegaSchema = async (question, dataArray) => {
// try {
// const model = ai.getGenerativeModel({
// model: 'gemini-2.5-flash',
// generationConfig: { responseMimeType: 'application/json' }
// });
// const systemPrompt = `You are a data visualization expert. I will provide a user's question and a JSON array of data.
// Your task is to generate a strictly valid Vega-Lite JSON specification to visualize this data.
// The data array will be provided to the Vega spec internally. Map the JSON keys to the correct x, y, and color axes.
// Choose the best chart type (bar, line, arc) based on the question.`;
// const userPrompt = `User Question: "${question}"\nData JSON: ${JSON.stringify(dataArray)}`;
// const result = await model.generateContent({
// contents: [{ role: 'user', parts: [{ text: `${systemPrompt}\n\n${userPrompt}` }] }]
// });
// return JSON.parse(result.response.text());
// } catch (error) {
// console.error('Gemini Engine Error:', error.message);
// throw new Error('Failed to transform data architecture into valid Vega-Lite spec');
// }
// };
// const handleOrchestration = async (req, res) => {
// try {
// const { prompt } = req.body;
// const tenant_id = req.user ? req.user.client_id : null;
// // return res.status(200).json("ok");
// if (!prompt) {
// return res.status(400).json({ error: 'Prompt field is required in request body' });
// }
// if (!tenant_id) {
// return res.status(400).json({ error: 'Tenant context (client_id) missing from auth token' });
// }
// const wrenData = await fetchWrenData(prompt, tenant_id);
// const vegaSchema = await generateVegaSchema(prompt, wrenData);
// if (!vegaSchema || !vegaSchema.$schema) {
// return res.status(500).json({ error: 'Egress Pipeline Validation Failure: Output missing standard Vega $schema identifier' });
// }
// return res.status(200).json(vegaSchema);
// } catch (error) {
// console.error('API Gateway Orchestrator Crash:', error.message);
// return res.status(500).json({ error: error.message });
// }
// };
// module.exports = { handleOrchestration };
// const fetchWrenData = async (prompt, tenantId) => {
// try {
// const url = process.env.WREN_AI_URL || 'http://localhost:5555/api/v1/text-to-sql';
// const response = await axios.post(url,
// { prompt: prompt },
// {
// headers: {
// 'X-Wren-Session-Properties': `@user_org_id=${tenantId}`,
// 'Content-Type': 'application/json'
// }
// }
// );
// return response.data;
// } catch (error) {
// console.error('Wren AI Integration Error:', error.message);
// throw new Error('Failed to fetch data payload from Wren AI endpoint');
// }
// };