Files
Pi/extensions/multiagent/index.ts
T

377 lines
11 KiB
TypeScript
Raw Normal View History

2026-05-07 22:04:21 +02:00
import { spawn } from "node:child_process";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import type { Message } from "@earendil-works/pi-ai";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import { TDM_PROMPT, DEV_PROMPT, ANA_PROMPT } from "./prompts";
import { AgentRunResult, AnaVerdict } from "./types";
// Upper bound on the number of DEV -> ANA review rounds run for one user request.
const MAX_ITERATIONS = 5;
// Pipeline diagrams posted into the conversation as the workflow moves between
// stages. Each diagram wraps the currently active stage name in `**` markers
// (presumably rendered as Markdown bold by the host UI; confirm).
// Box-drawing characters available for these diagrams:
// ┌ ┐ └ ┘ │ ─ ├ ┤
// Diagram shown when the workflow starts (TDM stage active).
const TDM_UI = `
┌─────┐ ┌─────┐ ┌─────┐
│ **TDM** │ --> │ DEV │ --> │ ANA │
└─────┘ └─────┘ └─────┘
`
// Diagram shown once the TDM specs are available and DEV takes over.
const DEV_UI = `
┌─────┐ ┌─────┐ ┌─────┐
│ TDM │ --> │ **DEV** │ --> │ ANA │
└─────┘ └─────┘ └─────┘
`
// Diagram shown before each ANA review.
const ANA_UI = `
┌─────┐ ┌─────┐ ┌─────┐
│ TDM │ --> │ DEV │ --> │ **ANA** │
└─────┘ └─────┘ └─────┘
`
// Diagram shown after ANA rejects an iteration and work goes back to DEV.
// NOTE(review): this appears to be the same diagram text as ANA_UI; confirm
// whether a distinct "rework" diagram (e.g. DEV highlighted) was intended.
const ANA_REWORK_UI = `
┌─────┐ ┌─────┐ ┌─────┐
│ TDM │ --> │ DEV │ --> │ **ANA** │
└─────┘ └─────┘ └─────┘
`
/**
 * Works out how to launch a nested `pi` process with the given CLI arguments.
 *
 * Resolution order:
 *  1. If the current process was started from a real script file on disk
 *     (`process.argv[1]` exists and is not inside Bun's virtual `/$bunfs/root/`
 *     filesystem), re-run that script with the current runtime.
 *  2. Otherwise, if the running executable is not a plain `node`/`bun` binary,
 *     re-run that executable directly with the arguments.
 *  3. Otherwise fall back to a `pi` command resolved through PATH.
 *
 * @param args CLI arguments to forward to the nested process.
 * @returns The command to spawn and the full argument list for it.
 */
function getPiInvocation(args: string[]): { command: string; args: string[] } {
  const runtime = process.execPath;
  const entryScript = process.argv[1];

  if (entryScript) {
    const livesInBunVfs = entryScript.startsWith("/$bunfs/root/");
    if (!livesInBunVfs && fs.existsSync(entryScript)) {
      return { command: runtime, args: [entryScript, ...args] };
    }
  }

  const runtimeName = path.basename(runtime).toLowerCase();
  const looksLikePlainRuntime = /^(node|bun)(\.exe)?$/.test(runtimeName);
  return { command: looksLikePlainRuntime ? "pi" : runtime, args };
}
/**
 * Persists a prompt to a Markdown file inside a freshly created temp directory.
 *
 * The directory is created under the OS temp dir with the `pi-multiagent-`
 * prefix. Any run of characters in `name` other than word characters, `.` or
 * `-` is collapsed to a single `_` to form the file name. The file is written
 * as UTF-8 with mode 0o600.
 *
 * @param name Logical name of the prompt; used (sanitized) as the file stem.
 * @param prompt Text content to write.
 * @returns The created directory and the full path of the written file. The
 *          caller is responsible for removing both.
 */
async function writePromptToTempFile(
  name: string,
  prompt: string,
): Promise<{ dir: string; filePath: string }> {
  const { mkdtemp, writeFile } = fs.promises;

  const dir = await mkdtemp(path.join(os.tmpdir(), "pi-multiagent-"));
  const fileStem = name.replace(/[^\w.-]+/g, "_");
  const filePath = path.join(dir, `${fileStem}.md`);

  await writeFile(filePath, prompt, { encoding: "utf-8", mode: 0o600 });
  return { dir, filePath };
}
/**
 * Extracts the text of the most recent assistant message that contains text.
 *
 * Messages are scanned from newest to oldest; non-assistant messages and
 * assistant messages without a text part are skipped. For the first match,
 * the first text part of that message is returned.
 *
 * @param messages Messages collected from an agent run, oldest first.
 * @returns The text found, or an empty string when no assistant message has a
 *          text part.
 */
function getFinalText(messages: Message[]): string {
  const newestFirst = [...messages].reverse();
  for (const message of newestFirst) {
    if (message.role !== "assistant") continue;
    const firstText = message.content.find((part) => part.type === "text");
    if (firstText?.type === "text") return firstText.text;
  }
  return "";
}
/**
 * Runs one nested `pi` agent to completion and collects its messages.
 *
 * The system prompt is written to a temp file and passed via
 * `--append-system-prompt`; the process is started in JSON mode and its stdout
 * is parsed line by line. Every `message_end` and `tool_result_end` event that
 * carries a `message` is appended to the result.
 *
 * The returned promise never rejects because of the child process: a spawn
 * failure is reported as `exitCode: 1` with the error text appended to
 * `stderr`. (Failure to create the temp prompt file still rejects.)
 *
 * @param cwd Working directory for the child process.
 * @param systemPrompt Extra system prompt text for the agent.
 * @param task Task text passed as the positional CLI argument.
 * @param tools Comma-separated tool list passed to `--tools`.
 * @param signal Optional abort signal; on abort the child gets SIGTERM and,
 *               if it is still alive 5 seconds later, SIGKILL.
 * @returns Exit code (1 when unknown), collected messages and captured stderr.
 */
async function runAgent(
  cwd: string,
  systemPrompt: string,
  task: string,
  tools: string,
  signal?: AbortSignal,
): Promise<AgentRunResult> {
  const tmp = await writePromptToTempFile("system", systemPrompt);
  const args = [
    "--mode",
    "json",
    "-p",
    "--no-session",
    "--tools",
    tools,
    "--append-system-prompt",
    tmp.filePath,
    task,
  ];
  try {
    return await new Promise<AgentRunResult>((resolve) => {
      const invocation = getPiInvocation(args);
      const proc = spawn(invocation.command, invocation.args, {
        cwd,
        shell: false,
        stdio: ["ignore", "pipe", "pipe"],
      });
      const messages: Message[] = [];
      let stderr = "";
      let buffer = "";
      let settled = false;
      let killTimer: ReturnType<typeof setTimeout> | undefined;

      // `proc.killed` only says a signal was sent, not that the process is
      // gone, so liveness is derived from exitCode/signalCode instead.
      const hasExited = () => proc.exitCode !== null || proc.signalCode !== null;

      const killProc = () => {
        if (hasExited()) return;
        proc.kill("SIGTERM");
        killTimer = setTimeout(() => {
          if (!hasExited()) proc.kill("SIGKILL");
        }, 5000);
      };

      // Single exit path: releases the timer and abort listener, then resolves once.
      const finish = (exitCode: number) => {
        if (settled) return;
        settled = true;
        if (killTimer !== undefined) clearTimeout(killTimer);
        signal?.removeEventListener("abort", killProc);
        resolve({ exitCode, messages, stderr });
      };

      const processLine = (line: string) => {
        if (!line.trim()) return;
        try {
          const event = JSON.parse(line);
          if (event.type === "message_end" && event.message)
            messages.push(event.message as Message);
          if (event.type === "tool_result_end" && event.message)
            messages.push(event.message as Message);
        } catch {
          // ignore malformed lines
        }
      };

      // Decode in the stream so multi-byte UTF-8 sequences split across
      // chunks are reassembled instead of being corrupted.
      proc.stdout.setEncoding("utf8");
      proc.stderr.setEncoding("utf8");

      proc.stdout.on("data", (chunk: string) => {
        buffer += chunk;
        const lines = buffer.split("\n");
        // The last element is an incomplete line (or "") and stays buffered.
        buffer = lines.pop() ?? "";
        for (const l of lines) processLine(l);
      });
      proc.stderr.on("data", (chunk: string) => {
        stderr += chunk;
      });

      // Without this listener a failed spawn (e.g. command not found) would
      // throw an unhandled 'error' event and might never emit 'close'.
      proc.on("error", (err) => {
        stderr += `${stderr && !stderr.endsWith("\n") ? "\n" : ""}${err.message}`;
        finish(1);
      });
      proc.on("close", (code) => {
        if (buffer.trim()) processLine(buffer);
        finish(code ?? 1);
      });

      if (signal) {
        if (signal.aborted) killProc();
        else signal.addEventListener("abort", killProc, { once: true });
      }
    });
  } finally {
    // Best-effort cleanup of the temp prompt file and its directory.
    try {
      fs.unlinkSync(tmp.filePath);
    } catch { }
    try {
      fs.rmdirSync(tmp.dir);
    } catch { }
  }
}
/**
 * Parses the reviewer (ANA) output into a verdict, failing closed.
 *
 * Accepts a JSON object, optionally wrapped in a Markdown code fence
 * (``` or ```json). The verdict is approved only when `approved` is the
 * boolean `true` or the string "true" (case-insensitive); any other value,
 * including the string "false", counts as not approved.
 *
 * @param raw Final text produced by the reviewer agent.
 * @returns The parsed verdict. When `raw` is not a JSON object, returns a
 *          non-approved verdict whose feedback embeds the raw output.
 */
function parseAnaVerdict(raw: string): AnaVerdict {
  const invalid: AnaVerdict = {
    approved: false,
    feedback: `ANA output was not valid JSON:\n${raw}`,
  };

  // Models frequently wrap JSON in a fenced code block; unwrap it if present.
  const trimmed = raw.trim();
  const fenced = /^```(?:json)?\s*([\s\S]*?)\s*```$/i.exec(trimmed);
  const candidate = fenced?.[1] ?? trimmed;

  let parsed: unknown;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    return invalid;
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return invalid;
  }

  const { approved, feedback } = parsed as {
    approved?: unknown;
    feedback?: unknown;
  };

  // Strict check: Boolean("false") would be true, which must not approve.
  const isApproved =
    approved === true ||
    (typeof approved === "string" && approved.trim().toLowerCase() === "true");

  let feedbackText: string;
  if (typeof feedback === "string") feedbackText = feedback;
  else if (feedback === undefined || feedback === null) feedbackText = "";
  // Structured feedback is serialized instead of becoming "[object Object]".
  else feedbackText = JSON.stringify(feedback) ?? "";

  return { approved: isApproved, feedback: feedbackText.trim() };
}
/**
 * Starts a text spinner that repeatedly reports `<frame> <baseLabel>`.
 *
 * The first frame is reported synchronously; after that the next frame is
 * reported every 120 ms, cycling through the frame list.
 *
 * @param setStatus Callback that receives each rendered status line.
 * @param baseLabel Label shown after the spinner frame.
 * @returns A function that stops the spinner. It does not clear the last
 *          reported status.
 */
function startLoading(
  setStatus: (text: string) => void,
  baseLabel: string,
): () => void {
  const spinner = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
  const render = (position: number): void => {
    setStatus(`${spinner[position]} ${baseLabel}`);
  };

  let position = 0;
  render(position);

  const handle = setInterval(() => {
    position = (position + 1) % spinner.length;
    render(position);
  }, 120);

  return () => clearInterval(handle);
}
/**
 * Extension entry point: registers the multi-agent workflow.
 *
 * Registers:
 *  - the `/multiagent [on|off|toggle|status]` command,
 *  - the `ctrl+alt+m` shortcut that toggles the workflow,
 *  - an `input` handler that, while enabled, takes over non-command user
 *    input and runs TDM (specs) -> DEV (implementation) -> ANA (review),
 *    repeating DEV/ANA up to MAX_ITERATIONS times until ANA approves.
 *
 * @param pi Extension API handle supplied by the host.
 */
export default function (pi: ExtensionAPI) {
  let enabled = false;

  // Posts one visible workflow message into the conversation.
  const post = (content: string) => {
    pi.sendMessage({
      customType: "multiagent-result",
      content,
      display: true,
    });
  };

  pi.registerCommand("multiagent", {
    description: "Toggle multi-agent workflow (/multiagent on|off|status)",
    handler: async (args, ctx) => {
      const action = (args || "toggle").trim().toLowerCase();
      if (action === "on") enabled = true;
      else if (action === "off") enabled = false;
      else if (action === "toggle") enabled = !enabled;
      else if (action === "status") {
        ctx.ui.notify(`Multiagent: ${enabled ? "ON" : "OFF"}`, "info");
        return;
      } else {
        ctx.ui.notify("Usage: /multiagent [on|off|toggle|status]", "warning");
        return;
      }
      ctx.ui.notify(`Multiagent: ${enabled ? "ON" : "OFF"}`, "info");
    },
  });

  pi.registerShortcut("ctrl+alt+m", {
    description: "Toggle multi-agent workflow",
    handler: async (ctx) => {
      enabled = !enabled;
      ctx.ui.notify(`Multiagent: ${enabled ? "ON" : "OFF"}`, "info");
    },
  });

  pi.on("input", async (event, ctx) => {
    if (!enabled) return { action: "continue" as const };
    // Ignore input injected by extensions and slash commands.
    if (event.source === "extension") return { action: "continue" as const };
    if (event.text.trim().startsWith("/"))
      return { action: "continue" as const };

    const setStatus = (text: string) => ctx.ui.setStatus("multiagent", text);
    const isAborted = () => ctx.signal?.aborted === true;

    // Runs one agent with a spinner that is always stopped, even if the run throws.
    const runStep = async (
      label: string,
      systemPrompt: string,
      task: string,
      tools: string,
    ) => {
      const stopLoader = startLoading(setStatus, label);
      try {
        return await runAgent(ctx.cwd, systemPrompt, task, tools, ctx.signal);
      } finally {
        stopLoader();
      }
    };

    setStatus("multiagent: queued");
    post(
      `### Multiagent workflow started\n\n${TDM_UI}\n\n(max ${MAX_ITERATIONS} DEV/ANA loops).`,
    );
    const userTask = event.text.trim();

    try {
      const tdm = await runStep(
        "TDM analyzing request + codebase",
        TDM_PROMPT,
        `User request:\n${userTask}`,
        "read,grep,find,ls,bash",
      );
      const specs = getFinalText(tdm.messages);
      if (tdm.exitCode !== 0 || !specs) {
        post(`TDM step failed.\n\n${tdm.stderr || "No output."}`);
        return { action: "handled" as const };
      }
      post(`### TDM output (specs)\n\n${specs}`);
      post(DEV_UI);

      let devContext = `TDM Specs:\n${specs}`;
      let lastDevOutput = "";
      let lastAnaFeedback = "";
      let approved = false;

      for (let i = 1; i <= MAX_ITERATIONS; i++) {
        // Do not start further agents once the run has been aborted.
        if (isAborted()) break;

        const dev = await runStep(
          `DEV implementing iteration ${i}/${MAX_ITERATIONS}`,
          DEV_PROMPT,
          `${devContext}\n\nProceed with the implementation the current codebase. Then summarize changes and rationale.`,
          "read,grep,find,ls,bash,edit,write",
        );
        lastDevOutput = getFinalText(dev.messages);
        post(
          `### DEV iteration ${i} output\n\n${lastDevOutput || "(no DEV output)"}`,
        );
        if (dev.exitCode !== 0) {
          post(
            `DEV iteration ${i} failed.\n\n${dev.stderr || "No stderr output."}`,
          );
        }
        if (isAborted()) break;

        post(ANA_UI);
        const ana = await runStep(
          `ANA reviewing iteration ${i}/${MAX_ITERATIONS}`,
          ANA_PROMPT,
          `TDM Specs:\n${specs}\n\nDEV report:\n${lastDevOutput}\n\nReview the implementation in the codebase and provide verdict JSON.`,
          "read,grep,find,ls,bash",
        );
        // Output of a killed reviewer is not a meaningful verdict.
        if (isAborted()) break;

        const verdict = parseAnaVerdict(getFinalText(ana.messages));
        lastAnaFeedback = verdict.feedback;
        post(
          `### ANA iteration ${i} verdict\n\n- approved: **${verdict.approved ? "true" : "false"}**\n\n${verdict.feedback || "(no feedback)"}`,
        );
        approved = verdict.approved;
        if (approved) break;

        post(ANA_REWORK_UI);
        devContext = `TDM Specs:\n${specs}\n\nANA feedback to address:\n${verdict.feedback}`;
      }

      if (isAborted()) {
        post(
          `### Multiagent workflow aborted\n\n${lastDevOutput || "(No DEV summary)"}`,
        );
        return { action: "handled" as const };
      }

      if (approved) {
        post(
          `### Multiagent workflow completed\n\n${lastDevOutput || "(No DEV summary)"}\n\n---\n**Final ANA feedback:**\n${lastAnaFeedback || "Approved with no additional feedback."}`,
        );
      } else {
        // Report honestly when the loop limit was hit without approval.
        post(
          `### Multiagent workflow finished without ANA approval\n\nReached the limit of ${MAX_ITERATIONS} DEV/ANA loops.\n\n${lastDevOutput || "(No DEV summary)"}\n\n---\n**Final ANA feedback:**\n${lastAnaFeedback || "(no feedback)"}`,
        );
      }
      return { action: "handled" as const };
    } finally {
      // Always clear the status line, including on failure paths.
      setStatus("");
    }
  });
}