app/lib/server/processors/ApiJobExecutor.ts (747 lines of code) (raw):
import { JobExecutor } from "./JobExecutor";
import serverConfig from "../config";
import { dockerConfig } from "../../../routes/api.config.docker";
import { GitHubTokenService } from "../githubTokenService";
import { getEffectiveUsername } from "../auth";
import { promises as fs } from "fs";
import { exec } from "child_process";
import { promisify } from "util";
import * as path from "path";
import * as os from "os";
const execAsync = promisify(exec);
const { HUGGINGFACE_API, REPO } = serverConfig;
export class ApiJobExecutor extends JobExecutor {
constructor() {
super();
}
async execute(jobId: string, jobData: any, credentials: any) {
try {
// Prepare environment and secrets for job execution
const { environment, secrets, apiJobId } = await this.submitJobToApi(
jobId,
jobData,
credentials
);
// Poll for completion with credentials
const result = await this.pollJobCompletion(apiJobId, credentials);
console.log(`✅ Job result length: ${result.length} characters`);
console.log(`📄 Job result preview:`, result.substring(0, 500));
// Extract diff from output
const diff = this.extractDiffFromOutput(result, jobId);
return {
success: true,
output: result,
diff: diff,
apiJobId: apiJobId,
environment: environment,
secrets: secrets,
};
} catch (error) {
console.error(`❌ API job execution failed for ${jobId}:`, error);
throw error;
}
}
private extractDiffFromOutput(output: string, jobId: string) {
const delimiter =
"================================================================================";
console.log(
`🔍 Extracting diff from output (${output.length} chars) for job ${jobId}`
);
// Count total occurrences for debugging
const delimiterCount = (
output.match(new RegExp(delimiter.replace(/=/g, "\\="), "g")) || []
).length;
console.log(`📊 Found ${delimiterCount} delimiter occurrences`);
// Find the last occurrence of the delimiter (end marker)
const endIndex = output.lastIndexOf(delimiter);
if (endIndex === -1) {
console.warn(
"❌ No end delimiter found in output, generating empty diff"
);
// Log a sample of the output for debugging
console.log(
"📄 Output sample (first 500 chars):",
output.substring(0, 500)
);
return {
jobId,
files: [],
summary: { totalAdditions: 0, totalDeletions: 0, totalFiles: 0 },
};
}
console.log(`📍 End delimiter found at position: ${endIndex}`);
// Find the second-to-last occurrence of the delimiter (start marker)
// by searching backwards from the position before the end delimiter
const searchUpTo = endIndex - 1;
const startIndex = output.lastIndexOf(delimiter, searchUpTo);
console.log(
`📍 Start delimiter search up to position ${searchUpTo}, found at: ${startIndex}`
);
if (startIndex !== -1 && startIndex !== endIndex) {
const diffContent = output
.substring(startIndex + delimiter.length, endIndex)
.trim();
console.log(`📝 Extracted diff content (${diffContent.length} chars)`);
console.log(`📄 Diff preview:`, diffContent.substring(0, 200));
if (diffContent) {
return this.parseDiff(diffContent, jobId);
} else {
console.warn("⚠️ Diff content is empty after trimming");
}
} else if (startIndex === -1) {
console.warn(
"⚠️ No start delimiter found - only one delimiter in output"
);
// If there's only one delimiter, maybe the diff is after it?
const contentAfterDelimiter = output
.substring(endIndex + delimiter.length)
.trim();
if (contentAfterDelimiter) {
console.log(
`🔄 Trying content after single delimiter (${contentAfterDelimiter.length} chars)`
);
console.log(
`📄 Content preview:`,
contentAfterDelimiter.substring(0, 200)
);
return this.parseDiff(contentAfterDelimiter, jobId);
}
} else {
console.warn("⚠️ Start and end delimiters are the same position");
}
console.warn("❌ No valid diff found in output, generating empty diff");
return {
jobId,
files: [],
summary: { totalAdditions: 0, totalDeletions: 0, totalFiles: 0 },
};
}
private async submitJobToApi(jobId: string, jobData: any, credentials: any) {
// Use credentials from request (required - no fallback)
const hfToken = credentials?.huggingfaceToken;
const openaiKey = credentials?.openaiApiKey;
const githubToken = credentials?.githubToken;
if (!hfToken) {
throw new Error(
"Hugging Face token is required but not provided in credentials"
);
}
const username = getEffectiveUsername(credentials);
// Get repository URL from job data or fall back to server config
const repositoryUrl = jobData.repository?.url || REPO.URL;
const repositoryBranch = jobData.repository?.branch || REPO.BRANCH;
// Handle GitHub repository access for private repos
let authenticatedRepoUrl = repositoryUrl;
let githubEphemeralToken = null;
if (githubToken && repositoryUrl.includes("github.com")) {
console.log("🔑 GitHub token available, checking repository access...");
// Validate repository access
const repoAccess = await GitHubTokenService.validateRepositoryAccess(
githubToken,
repositoryUrl
);
if (repoAccess.canAccess) {
console.log(
`📂 Repository access confirmed (private: ${repoAccess.isPrivate})`
);
// Create ephemeral token for container use (expires in 60 minutes)
const ephemeralResult = await GitHubTokenService.createEphemeralToken(
githubToken,
repositoryUrl,
60 // 60 minutes
);
if (ephemeralResult) {
githubEphemeralToken = ephemeralResult.token;
// Create authenticated clone URL for the container
authenticatedRepoUrl = GitHubTokenService.createAuthenticatedCloneUrl(
ephemeralResult.token,
repositoryUrl
);
console.log("🎫 Ephemeral GitHub token created for container access");
} else {
console.warn(
"⚠️ Could not create ephemeral token, using original URL"
);
}
} else {
console.warn("⚠️ Cannot access repository with provided GitHub token");
}
}
// Merge base environment with user's custom environment variables from job data
const environment = {
JOB_ID: jobId,
REPO_URL: authenticatedRepoUrl, // Use authenticated URL if available
REPO_BRANCH: repositoryBranch,
// PROMPT: `Clone the repository, then change to the repository directory (${repositoryUrl.split("/").pop()?.replace(".git", "") || "repo"}) and execute the following task: ${jobData.description}. Make sure to stay within the repository directory for all operations and use file editing tools to make any necessary changes.`,
PROMPT: jobData.description,
...dockerConfig.environment, // Global defaults
...(jobData.environment || {}), // Job-specific environment variables override global ones
};
// Merge required secrets with user's custom secrets from job data
const secrets = {
OPENAI_API_KEY: openaiKey,
...(githubEphemeralToken && { GITHUB_TOKEN: githubEphemeralToken }), // Add GitHub token if available
...dockerConfig.secrets, // Global secrets
...(jobData.secrets || {}), // Job-specific secrets override global ones
};
// Create job payload for Hugging Face API
const payload = {
command: ["/opt/agents/codex"],
arguments: [],
environment,
flavor: "cpu-basic",
dockerImage: dockerConfig.image,
secrets,
timeoutSeconds: HUGGINGFACE_API.TIMEOUT_SECONDS,
};
console.log("🔍 API Payload (environment and secrets debug):", {
...payload,
environment: payload.environment, // Show actual environment variables
secrets: Object.keys(payload.secrets).reduce(
(acc, key) => {
acc[key] = payload.secrets[key] ? "***" : "(not set)";
return acc;
},
{} as Record<string, string>
),
});
// Construct URL with username from credentials
// Format: https://huggingface.co/api/jobs/username
const apiUrl = `${HUGGINGFACE_API.BASE_URL.replace(
"/api/jobs/",
`/api/jobs/${username}`
)}`;
console.log(`🔗 Using API URL with username: ${apiUrl}`);
const response = await fetch(apiUrl, {
method: "POST",
headers: {
Authorization: `Bearer ${hfToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify(payload),
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(
`API request failed: ${response.status} ${response.statusText} - ${errorText}`
);
}
const result = await response.json();
const submittedApiJobId = result.id || result.jobId || result._id;
console.log(
`✅ Job submitted to API with ID: ${submittedApiJobId || "unknown"}`
);
return {
apiJobId: submittedApiJobId,
environment,
secrets,
};
}
private async pollJobCompletion(apiJobId: string, credentials: any) {
const maxAttempts = HUGGINGFACE_API.MAX_POLL_ATTEMPTS;
const pollInterval = HUGGINGFACE_API.POLL_INTERVAL;
const hfToken = credentials?.huggingfaceToken;
if (!hfToken) {
throw new Error(
"Hugging Face token is required for polling job completion"
);
}
// Use authenticated username from credentials or 'anonymous' as fallback
const username = getEffectiveUsername(credentials);
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
console.log(`Polling attempt ${attempt}/${maxAttempts}`);
// Construct URL with username from credentials
const statusUrl = `${HUGGINGFACE_API.BASE_URL.replace(
"/api/jobs/",
`/api/jobs/${username}`
)}/${apiJobId}`;
console.log(`🔗 Using status URL with username: ${statusUrl}`);
const statusResponse = await fetch(statusUrl, {
headers: {
Authorization: `Bearer ${hfToken}`,
},
});
if (!statusResponse.ok) {
console.warn(`⚠️ Status check failed: ${statusResponse.status}`);
await this.sleep(pollInterval);
continue;
}
const status = await statusResponse.json();
console.log(
`📊 Job status:`,
status.status || status.state || "unknown"
);
// Check if job is completed
if (this.isJobCompleted(status)) {
console.log(`✅ Job completed successfully`);
return await this.getJobOutput(apiJobId, credentials);
}
// Check if job failed
if (this.isJobFailed(status)) {
throw new Error(
`Job failed with status: ${status.status || status.status.stage}`
);
}
// Continue polling
await this.sleep(pollInterval);
} catch (error) {
console.error(
`❌ Error during polling attempt ${attempt}:`,
(error as Error).message
);
if (attempt === maxAttempts) {
throw error;
}
await this.sleep(pollInterval);
}
}
throw new Error(
"Job polling timeout - job did not complete within expected time"
);
}
private isJobCompleted(status: any): boolean {
const completedStates = [
"completed",
"succeeded",
"success",
"finished",
"done",
];
const currentState = (status.status?.stage || "").toLowerCase();
return completedStates.includes(currentState);
}
private isJobFailed(status: any): boolean {
const failedStates = ["failed", "error", "cancelled", "timeout", "aborted"];
const currentState = (status.status?.stage || "").toLowerCase();
return failedStates.includes(currentState);
}
private async getJobOutput(
apiJobId: string,
credentials: any
): Promise<string> {
try {
console.log(`📥 Fetching job output for ${apiJobId}`);
const hfToken = credentials?.huggingfaceToken;
if (!hfToken) {
throw new Error(
"Hugging Face token is required for fetching job output"
);
}
// Use authenticated username from credentials or 'anonymous' as fallback
const username = getEffectiveUsername(credentials);
// Construct URL with username from credentials
const logsUrl = `${HUGGINGFACE_API.BASE_URL.replace(
"/api/jobs/",
`/api/jobs/${username}`
)}/${apiJobId}/logs`;
console.log(`🔗 Using logs URL with username: ${logsUrl}`);
const outputResponse = await fetch(logsUrl, {
headers: {
Authorization: `Bearer ${hfToken}`,
},
});
if (!outputResponse.ok) {
console.warn(`⚠️ Could not fetch job output: ${outputResponse.status}`);
return "Job completed but output not available";
}
// Get the readable stream
const reader = outputResponse.body?.getReader();
if (!reader) {
return "Job completed but output not available";
}
const decoder = new TextDecoder();
let rawStream = "";
let parsedOutput = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// Decode the chunk and add to raw stream
const chunk = decoder.decode(value, { stream: true });
rawStream += chunk;
// Optional: Log progress for debugging
console.log(`📄 Received chunk: ${chunk.length} characters`);
}
// Final decode to handle any remaining bytes
rawStream += decoder.decode();
// Parse the SSE data format
const lines = rawStream.split("\n");
for (const line of lines) {
if (line.startsWith("data: ")) {
try {
const jsonData = line.substring(6); // Remove 'data: ' prefix
const parsed = JSON.parse(jsonData);
if (parsed.data) {
parsedOutput += parsed.data + "\n";
}
} catch (parseError) {
console.warn(`⚠️ Could not parse SSE line: ${line}`);
}
}
}
console.log(
`✅ Stream complete. Parsed output: ${parsedOutput.length} characters`
);
return parsedOutput.trim(); // Remove trailing newline
} finally {
// Always release the reader
reader.releaseLock();
}
} catch (error) {
console.error("❌ Error fetching job output:", error);
return "Job completed but output could not be retrieved";
}
}
async createBranchAndPush(options: {
repositoryUrl: string;
branch: string;
baseBranch: string;
title: string;
description: string;
files: any[];
credentials?: any;
}): Promise<{ branch: string; commitHash: string }> {
let tempDir: string | null = null;
try {
console.log(
`🌿 Creating branch '${options.branch}' and pushing changes...`
);
// Create temporary directory
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "hugex-git-"));
console.log(`📁 Created temp directory: ${tempDir}`);
// Get authenticated repository URL (if needed)
const repoUrl = await this.getAuthenticatedRepoUrl(options.repositoryUrl);
// 1. Shallow clone the repository
console.log(`📌 Cloning repository: ${options.repositoryUrl}`);
await execAsync(
`git clone --depth=1 --branch ${options.baseBranch} "${repoUrl}" repo`,
{
cwd: tempDir,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
}
);
const repoPath = path.join(tempDir, "repo");
// 2. Create and checkout new branch
console.log(`🌱 Creating branch: ${options.branch}`);
await execAsync(`git checkout -b "${options.branch}"`, {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
// 3. Apply file changes from diff
console.log(`🗏 Applying ${options.files.length} file changes...`);
await this.applyFileChanges(repoPath, options.files);
// 4. Configure git user (required for commits)
await execAsync('git config user.name "HugeX Bot"', {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
await execAsync(
'git config user.email "hugex@users.noreply.github.com"',
{
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
}
);
// 5. Stage all changes
console.log(`📚 Staging changes...`);
await execAsync("git add .", {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
// Check if there are any changes to commit
const { stdout: statusOutput } = await execAsync(
"git status --porcelain",
{
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
}
);
console.log(`Git status output:`, statusOutput);
if (!statusOutput.trim()) {
// No changes detected - let's create a minimal change to ensure we have something to commit
console.log("No changes detected, creating minimal change...");
// Create a simple marker file to indicate this branch was created by HugeX
const markerPath = path.join(repoPath, ".hugex-branch-marker");
const markerContent = `Branch created by Hugex\nTimestamp: ${new Date().toISOString()}\nBranch: ${
options.branch
}\nRepository: ${options.repositoryUrl}\n`;
await fs.writeFile(markerPath, markerContent, "utf8");
// Stage the marker file
await execAsync("git add .hugex-branch-marker", {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
console.log("Created marker file to ensure non-empty commit");
}
// 6. Commit changes
const commitMessage = `${options.title}\n\n${options.description}`;
console.log(`📝 Committing changes...`);
await execAsync(`git commit -m "${commitMessage.replace(/"/g, '\\"')}"`, {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
// 7. Get commit hash
const { stdout: commitHash } = await execAsync("git rev-parse HEAD", {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
// 8. Push branch to origin
console.log(`🚀 Pushing branch to origin...`);
await execAsync(`git push origin "${options.branch}"`, {
cwd: repoPath,
env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
});
const finalCommitHash = commitHash.trim();
console.log(
`✅ Successfully created branch '${options.branch}' with commit: ${finalCommitHash}`
);
return {
branch: options.branch,
commitHash: finalCommitHash,
};
} catch (error) {
console.error("❌ Failed to create branch and push:", error);
throw new Error(`Git operation failed: ${error.message}`);
} finally {
// Clean up temporary directory
if (tempDir) {
try {
await fs.rm(tempDir, { recursive: true, force: true });
console.log(`🧽 Cleaned up temp directory: ${tempDir}`);
} catch (cleanupError) {
console.warn(`⚠️ Failed to clean up temp directory: ${cleanupError}`);
}
}
}
}
private async getAuthenticatedRepoUrl(
repositoryUrl: string
): Promise<string> {
// For GitHub repositories, check if we have a token available
if (repositoryUrl.includes("github.com")) {
// In a production system, you would:
// 1. Get the GitHub token from the current user's session/credentials
// 2. Create an ephemeral token for this operation
// 3. Return authenticated URL with token
// For now, return the original URL
// TODO: Integrate with GitHubTokenService for authenticated access
console.log(
"⚠️ Using unauthenticated repository URL - private repositories may fail"
);
}
return repositoryUrl;
}
private async applyFileChanges(
repoPath: string,
files: any[]
): Promise<void> {
console.log(`Applying changes to ${files.length} files:`);
for (const file of files) {
console.log(`Processing file: ${file.filename} (status: ${file.status})`);
// Use diff field if available, otherwise fall back to patch
const diffContent = file.diff || file.patch;
console.log(
`File diff preview:`,
diffContent ? diffContent.substring(0, 150) + "..." : "NO DIFF/PATCH"
);
console.log(`Available fields:`, Object.keys(file));
const filePath = path.join(repoPath, file.filename);
try {
switch (file.status) {
case "added":
// Create new file
console.log(`Creating new file: ${filePath}`);
await fs.mkdir(path.dirname(filePath), { recursive: true });
await this.applyDiffToFile(filePath, diffContent, true);
break;
case "modified":
// Modify existing file
console.log(`Modifying existing file: ${filePath}`);
await this.applyDiffToFile(filePath, diffContent, false);
break;
case "deleted":
// Delete file
console.log(`Deleting file: ${filePath}`);
try {
await fs.unlink(filePath);
} catch (err) {
console.warn(`File already deleted or not found: ${filePath}`);
}
break;
case "renamed":
// Handle renamed files
console.log(
`Renaming file: ${file.oldFilename} -> ${file.filename}`
);
if (file.oldFilename) {
const oldPath = path.join(repoPath, file.oldFilename);
try {
await fs.rename(oldPath, filePath);
// Apply any changes to the renamed file
if (diffContent) {
await this.applyDiffToFile(filePath, diffContent, false);
}
} catch (err) {
console.warn(
`Rename failed, treating as new file: ${err.message}`
);
await this.applyDiffToFile(filePath, diffContent, true);
}
}
break;
default:
console.warn(
`Unknown file status: ${file.status} for ${file.filename}`
);
}
} catch (error) {
console.error(`Failed to apply changes to ${file.filename}:`, error);
throw error;
}
}
}
private async applyDiffToFile(
filePath: string,
diff: string,
isNewFile: boolean
): Promise<void> {
if (!diff) {
console.warn(`No diff content for ${filePath}`);
return;
}
console.log(`Applying diff to ${filePath}:`, {
isNewFile,
diffLength: diff.length,
diffPreview: diff.substring(0, 200) + (diff.length > 200 ? "..." : ""),
});
if (isNewFile) {
// For new files, extract content from diff
const content = this.extractContentFromDiff(diff);
console.log(
`Extracted content for new file (${content.length} chars):`,
content.substring(0, 100)
);
await fs.writeFile(filePath, content, "utf8");
} else {
// For existing files, apply patch
try {
const currentContent = await fs.readFile(filePath, "utf8");
console.log(
`Current file content (${currentContent.length} chars):`,
currentContent.substring(0, 100)
);
const patchedContent = this.applySimplePatch(currentContent, diff);
console.log(
`Patched content (${patchedContent.length} chars):`,
patchedContent.substring(0, 100)
);
await fs.writeFile(filePath, patchedContent, "utf8");
} catch (error) {
console.error(`Failed to apply patch to ${filePath}:`, error);
// Fallback: try to extract content from diff
const content = this.extractContentFromDiff(diff);
console.log(
`Fallback: extracted content (${content.length} chars):`,
content.substring(0, 100)
);
await fs.writeFile(filePath, content, "utf8");
}
}
}
private extractContentFromDiff(diff: string): string {
// Extract content from diff format
// This handles unified diff format and extracts added lines
if (!diff || diff.trim() === "") {
console.warn("Empty diff provided, returning empty content");
return "";
}
const lines = diff.split("\n");
const content: string[] = [];
let foundContent = false;
console.log(`Extracting content from diff with ${lines.length} lines`);
for (const line of lines) {
if (line.startsWith("+") && !line.startsWith("+++")) {
// Add line (remove the + prefix)
content.push(line.substring(1));
foundContent = true;
} else if (
!line.startsWith("-") &&
!line.startsWith("@@") &&
!line.startsWith("index") &&
!line.startsWith("diff") &&
!line.startsWith("+++") &&
!line.startsWith("---")
) {
// Context line (unchanged)
if (line.trim() !== "") {
content.push(line);
foundContent = true;
}
}
}
const result = content.join("\n");
console.log(
`Extracted ${content.length} lines, found content: ${foundContent}, result length: ${result.length}`
);
// If no content was found in the diff, it might be a simple text replacement
// Try to extract everything after the diff headers
if (!foundContent && diff.includes("@@")) {
const hunkStart = diff.indexOf("@@");
const secondHunkStart = diff.indexOf("@@", hunkStart + 2);
if (secondHunkStart !== -1) {
const hunkContent = diff.substring(
secondHunkStart + diff.substring(secondHunkStart).indexOf("\n") + 1
);
const hunkLines = hunkContent.split("\n");
const extractedContent: string[] = [];
for (const line of hunkLines) {
if (line.startsWith("+")) {
extractedContent.push(line.substring(1));
} else if (line.startsWith(" ")) {
extractedContent.push(line.substring(1));
}
}
if (extractedContent.length > 0) {
console.log(
`Fallback extraction found ${extractedContent.length} lines`
);
return extractedContent.join("\n");
}
}
}
return result;
}
private applySimplePatch(originalContent: string, diff: string): string {
// Simple patch application - this is a basic implementation
// For production, consider using a proper diff/patch library like 'diff' or 'node-patch'
const originalLines = originalContent.split("\n");
const diffLines = diff.split("\n");
const result: string[] = [];
let originalIndex = 0;
let inHunk = false;
let hunkOriginalStart = 0;
let hunkOriginalLength = 0;
let hunkNewStart = 0;
let hunkNewLength = 0;
let processedInHunk = 0;
for (const line of diffLines) {
if (line.startsWith("@@")) {
// Parse hunk header
const match = line.match(/@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@/);
if (match) {
hunkOriginalStart = parseInt(match[1]) - 1; // Convert to 0-based
hunkOriginalLength = match[2] ? parseInt(match[2]) : 1;
hunkNewStart = parseInt(match[3]) - 1;
hunkNewLength = match[4] ? parseInt(match[4]) : 1;
// Copy lines before this hunk
while (originalIndex < hunkOriginalStart) {
result.push(originalLines[originalIndex]);
originalIndex++;
}
inHunk = true;
processedInHunk = 0;
}
} else if (inHunk) {
if (line.startsWith("-")) {
// Remove line - skip it in original
originalIndex++;
processedInHunk++;
} else if (line.startsWith("+")) {
// Add line
result.push(line.substring(1));
processedInHunk++;
} else if (line.startsWith(" ")) {
// Context line - keep it
result.push(line.substring(1));
originalIndex++;
processedInHunk++;
}
// Check if hunk is complete
if (processedInHunk >= Math.max(hunkOriginalLength, hunkNewLength)) {
inHunk = false;
}
}
}
// Copy remaining lines
while (originalIndex < originalLines.length) {
result.push(originalLines[originalIndex]);
originalIndex++;
}
return result.join("\n");
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}