app/lib/server/processors/DockerJobExecutor.ts (634 lines of code) (raw):

import { JobExecutor } from "./JobExecutor"; import serverConfig from "../config"; import { dockerConfig } from "../../../routes/api.config.docker"; import { getJobStore } from "../jobStore"; const { DOCKER, REPO } = serverConfig; export class DockerJobExecutor extends JobExecutor { private docker: any; constructor() { super(); this.docker = null; // Will be initialized when needed } async execute(jobId: string, jobData: any, credentials: any) { console.log(`🐳 Executing job ${jobId} via Docker`); console.log(`📦 Job data:`, jobData); try { // Initialize Docker if not already done if (!this.docker) { try { // Dynamic import to avoid issues in client-side code const Docker = await import("dockerode"); this.docker = new Docker.default(); // Test Docker connection await this.docker.ping(); console.log("🐳 Docker daemon connected successfully"); } catch (error) { throw new Error(`Docker daemon not available: ${error.message}`); } } // Execute job in Docker container using the same setup as API const { output, environment, secrets } = await this.runJobInContainer( jobId, jobData, credentials ); // Extract diff from output const diff = this.extractDiffFromOutput(output, jobId); return { success: true, output: output, diff: diff, environment: environment, secrets: secrets, }; } catch (error) { console.error(`❌ Docker job execution failed for ${jobId}:`, error); throw error; } } private extractDiffFromOutput(output: string, jobId: string) { const delimiter = "================================================================================"; console.log( `🔍 Extracting diff from output (${output.length} chars) for job ${jobId}` ); // Count total occurrences for debugging const delimiterCount = ( output.match(new RegExp(delimiter.replace(/=/g, "\\="), "g")) || [] ).length; console.log(`📊 Found ${delimiterCount} delimiter occurrences`); // Find the last occurrence of the delimiter (end marker) const endIndex = output.lastIndexOf(delimiter); if (endIndex === -1) { console.warn( "❌ No end delimiter found in output, generating empty diff" ); // Log a sample of the output for debugging console.log( "📄 Output sample (first 500 chars):", output.substring(0, 500) ); return { jobId, files: [], summary: { totalAdditions: 0, totalDeletions: 0, totalFiles: 0 }, }; } console.log(`📍 End delimiter found at position: ${endIndex}`); // Find the second-to-last occurrence of the delimiter (start marker) // by searching backwards from the position before the end delimiter const searchUpTo = endIndex - 1; const startIndex = output.lastIndexOf(delimiter, searchUpTo); console.log( `📍 Start delimiter search up to position ${searchUpTo}, found at: ${startIndex}` ); if (startIndex !== -1 && startIndex !== endIndex) { const diffContent = output .substring(startIndex + delimiter.length, endIndex) .trim(); console.log(`📝 Extracted diff content (${diffContent.length} chars)`); console.log(`📄 Diff preview:`, diffContent.substring(0, 200)); if (diffContent) { return this.parseDiff(diffContent, jobId); } else { console.warn("⚠️ Diff content is empty after trimming"); } } else if (startIndex === -1) { console.warn( "⚠️ No start delimiter found - only one delimiter in output" ); // If there's only one delimiter, maybe the diff is after it? const contentAfterDelimiter = output .substring(endIndex + delimiter.length) .trim(); if (contentAfterDelimiter) { console.log( `🔄 Trying content after single delimiter (${contentAfterDelimiter.length} chars)` ); console.log( `📄 Content preview:`, contentAfterDelimiter.substring(0, 200) ); return this.parseDiff(contentAfterDelimiter, jobId); } } else { console.warn("⚠️ Start and end delimiters are the same position"); } console.warn("❌ No valid diff found in output, generating empty diff"); return { jobId, files: [], summary: { totalAdditions: 0, totalDeletions: 0, totalFiles: 0 }, }; } private async runJobInContainer( jobId: string, jobData: any, credentials: any ): Promise<{ output: string; environment: Record<string, string>; secrets: Record<string, string>; }> { console.log(`🐳 Creating Docker container for job ${jobId}`); // Use credentials from request (required - no fallback) const openaiKey = credentials?.openaiApiKey; // (Optional) enforce openaiKey presence if needed: // if (!openaiKey) { // throw new Error( // "OpenAI API key is required but not provided in credentials" // ); // } // Use user's Docker configuration let imageRef = dockerConfig.image; console.log(`🐳 Using Docker image: ${imageRef}`); // REMOVE to handle private registry or custom namespace // If the default image is "codex-universal-explore:dev" if (imageRef === "codex-universal-explore:dev") { imageRef = "docker.io/drbh/codex-universal-explore:dev"; } // Check if image exists locally try { await this.docker.getImage(imageRef).inspect(); console.log(`✅ Docker image found: ${imageRef}`); } catch (error) { console.error(`❌ Docker image not found: ${imageRef}`); try { const pullStream = await this.docker.pull(imageRef); await new Promise<void>((resolve, reject) => { this.docker.modem.followProgress( pullStream, (err: Error) => { if (err) return reject(err); return resolve(); }, (event: any) => { // (Optional) uncomment to see each pull progress event: console.log(event.status, event.progress || ""); } ); }); console.log(`✅ Successfully pulled image: ${imageRef}`); } catch (pullErr) { console.error( `❌ Failed to pull image "${imageRef}": ${pullErr.message}` ); throw new Error( `Could not pull Docker image "${imageRef}". ` + `If it’s on Docker Hub under namespace "drbh", make sure you’ve pushed ` + `"${imageRef}". If it’s on a private registry, ensure you’re logged in and the name is correct.` ); } } // Get repository URL from job data or fall back to server config const repositoryUrl = jobData.repository?.url || REPO.URL; const repositoryBranch = jobData.repository?.branch || REPO.BRANCH; console.log( `📂 Repository: ${repositoryUrl} (branch: ${repositoryBranch})` ); console.log( `📋 Environment variables: ${ Object.keys(dockerConfig.environment).length } custom variables` ); console.log( `🔐 Secrets: ${Object.keys(dockerConfig.secrets).length} custom secrets` ); // Merge base environment with user's custom environment variables const baseEnvironment = { JOB_ID: jobId, REPO_URL: repositoryUrl, REPO_BRANCH: repositoryBranch, PROMPT: jobData.description, ...dockerConfig.environment, // User's custom env vars }; const secrets = { OPENAI_API_KEY: openaiKey, ...dockerConfig.secrets, // User's custom secrets }; // Convert to array format for Docker const environment = [ ...Object.entries(baseEnvironment).map( ([key, value]) => `${key}=${value}` ), ...Object.entries(secrets).map(([key, value]) => `${key}=${value}`), ]; // Use the user's configured Docker image const container = await this.docker.createContainer({ Image: imageRef, Cmd: ["/opt/agents/codex"], Env: environment, WorkingDir: "/workspace", Tty: false, AttachStdout: true, AttachStderr: true, HostConfig: { AutoRemove: true, Memory: DOCKER.MEMORY_LIMIT, CpuShares: DOCKER.CPU_SHARES, }, }); console.log(`🚀 Starting container for job ${jobId}`); // Start container await container.start(); // Get container logs const logs = await this.getContainerLogs(container, jobId); console.log( `📜 Container logs for job ${jobId} (${logs.length} characters)` ); console.log(`📄 Container logs preview:`, logs.substring(0, 500)); return { output: logs, environment: baseEnvironment, secrets: secrets, }; } private async getContainerLogs( container: any, jobId: string ): Promise<string> { // Get logs stream const stream = await container.logs({ stdout: true, stderr: true, follow: true, }); return new Promise((resolve, reject) => { let output = ""; const jobStore = getJobStore(); stream.on("data", async (chunk: Buffer) => { // Docker multiplexes stdout/stderr, need to handle the stream format const cleanChunk = this.cleanDockerStreamChunk(chunk); output += cleanChunk; // Store incremental logs in real-time try { await jobStore.setJobLogs(jobId, output); } catch (error) { console.error( `Failed to store incremental logs for job ${jobId}:`, error ); } }); stream.on("end", () => { console.log(`✅ Container execution completed for job ${jobId}`); console.log(`📋 Total output length: ${output.length} characters`); resolve(output); }); stream.on("error", (error: Error) => { console.error(`🚨 Container error for job ${jobId}:`, error); reject(error); }); // Timeout after configured time setTimeout(() => { container.kill().catch(console.error); // Attempt to kill the container reject(new Error("Container execution timeout")); }, DOCKER.TIMEOUT); }); } // Helper method to clean Docker stream multiplexing private cleanDockerStreamChunk(chunk: Buffer): string { // Docker streams are multiplexed with 8-byte headers // Format: [STREAM_TYPE][0][0][0][SIZE][SIZE][SIZE][SIZE][DATA...] if (chunk.length < 8) { return chunk.toString(); } let result = ""; let offset = 0; while (offset < chunk.length) { if (offset + 8 > chunk.length) { // Not enough bytes for a complete header, treat as raw data result += chunk.slice(offset).toString(); break; } // Read the size from bytes 4-7 (big-endian) const size = chunk.readUInt32BE(offset + 4); if (size === 0) { offset += 8; continue; } if (offset + 8 + size > chunk.length) { // Not enough data for the claimed size, treat as raw data result += chunk.slice(offset).toString(); break; } // Extract the actual data const data = chunk.slice(offset + 8, offset + 8 + size); result += data.toString(); offset += 8 + size; } return result; } async createBranchAndPush(options: { repositoryUrl: string; branch: string; baseBranch: string; title: string; description: string; files: any[]; credentials?: any; }): Promise<{ branch: string; commitHash: string }> { // For Docker executor, we'll use the same Git operations as ApiJobExecutor // since Git operations don't need to run inside the container const { promises: fs } = await import("fs"); const { exec } = await import("child_process"); const { promisify } = await import("util"); const path = await import("path"); const os = await import("os"); const execAsync = promisify(exec); let tempDir: string | null = null; try { console.log( `🌿 Creating branch '${options.branch}' and pushing changes...` ); // Create temporary directory tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "hugex-git-")); console.log(`📁 Created temp directory: ${tempDir}`); // Get authenticated repository URL (if needed) const repoUrl = await this.getAuthenticatedRepoUrl(options.repositoryUrl); // 1. Shallow clone the repository console.log(`📌 Cloning repository: ${options.repositoryUrl}`); await execAsync( `git clone --depth=1 --branch ${options.baseBranch} "${repoUrl}" repo`, { cwd: tempDir, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, } ); const repoPath = path.join(tempDir, "repo"); // 2. Create and checkout new branch console.log(`🌱 Creating branch: ${options.branch}`); await execAsync(`git checkout -b "${options.branch}"`, { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); // 3. Apply file changes from diff console.log(`🗏 Applying ${options.files.length} file changes...`); await this.applyFileChanges(repoPath, options.files); // 4. Configure git user (required for commits) await execAsync('git config user.name "HugeX Bot"', { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); await execAsync( 'git config user.email "hugex@users.noreply.github.com"', { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, } ); // 5. Stage all changes console.log(`📚 Staging changes...`); await execAsync("git add .", { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); // Check if there are any changes to commit const { stdout: statusOutput } = await execAsync( "git status --porcelain", { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, } ); console.log(`Git status output:`, statusOutput); if (!statusOutput.trim()) { // No changes detected - let's create a minimal change to ensure we have something to commit console.log("No changes detected, creating minimal change..."); // Create a simple marker file to indicate this branch was created by HugeX const markerPath = path.join(repoPath, ".hugex-branch-marker"); const markerContent = `Branch created by Hugex\nTimestamp: ${new Date().toISOString()}\nBranch: ${ options.branch }\nRepository: ${options.repositoryUrl}\n`; await fs.writeFile(markerPath, markerContent, "utf8"); // Stage the marker file await execAsync("git add .hugex-branch-marker", { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); console.log("Created marker file to ensure non-empty commit"); } // 6. Commit changes const commitMessage = `${options.title}\n\n${options.description}`; console.log(`📝 Committing changes...`); await execAsync(`git commit -m "${commitMessage.replace(/"/g, '\\"')}"`, { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); // 7. Get commit hash const { stdout: commitHash } = await execAsync("git rev-parse HEAD", { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); // 8. Push branch to origin console.log(`🚀 Pushing branch to origin...`); await execAsync(`git push origin "${options.branch}"`, { cwd: repoPath, env: { ...process.env, GIT_TERMINAL_PROMPT: "0" }, }); const finalCommitHash = commitHash.trim(); console.log( `✅ Successfully created branch '${options.branch}' with commit: ${finalCommitHash}` ); return { branch: options.branch, commitHash: finalCommitHash, }; } catch (error) { console.error("❌ Failed to create branch and push:", error); throw new Error(`Git operation failed: ${error.message}`); } finally { // Clean up temporary directory if (tempDir) { try { await fs.rm(tempDir, { recursive: true, force: true }); console.log(`🧽 Cleaned up temp directory: ${tempDir}`); } catch (cleanupError) { console.warn(`⚠️ Failed to clean up temp directory: ${cleanupError}`); } } } } private async getAuthenticatedRepoUrl( repositoryUrl: string ): Promise<string> { // For GitHub repositories, check if we have a token available if (repositoryUrl.includes("github.com")) { // In a production system, you would: // 1. Get the GitHub token from the current user's session/credentials // 2. Create an ephemeral token for this operation // 3. Return authenticated URL with token // For now, return the original URL // TODO: Integrate with GitHubTokenService for authenticated access console.log( "⚠️ Using unauthenticated repository URL - private repositories may fail" ); } return repositoryUrl; } private async applyFileChanges( repoPath: string, files: any[] ): Promise<void> { const { promises: fs } = await import("fs"); const path = await import("path"); console.log(`Applying changes to ${files.length} files:`); for (const file of files) { console.log(`Processing file: ${file.filename} (status: ${file.status})`); // Use diff field if available, otherwise fall back to patch const diffContent = file.diff || file.patch; console.log( `File diff preview:`, diffContent ? diffContent.substring(0, 150) + "..." : "NO DIFF/PATCH" ); console.log(`Available fields:`, Object.keys(file)); const filePath = path.join(repoPath, file.filename); try { switch (file.status) { case "added": // Create new file console.log(`Creating new file: ${filePath}`); await fs.mkdir(path.dirname(filePath), { recursive: true }); await this.applyDiffToFile(filePath, diffContent, true); break; case "modified": // Modify existing file console.log(`Modifying existing file: ${filePath}`); await this.applyDiffToFile(filePath, diffContent, false); break; case "deleted": // Delete file console.log(`Deleting file: ${filePath}`); try { await fs.unlink(filePath); } catch (err) { console.warn(`File already deleted or not found: ${filePath}`); } break; case "renamed": // Handle renamed files console.log( `Renaming file: ${file.oldFilename} -> ${file.filename}` ); if (file.oldFilename) { const oldPath = path.join(repoPath, file.oldFilename); try { await fs.rename(oldPath, filePath); // Apply any changes to the renamed file if (diffContent) { await this.applyDiffToFile(filePath, diffContent, false); } } catch (err) { console.warn( `Rename failed, treating as new file: ${err.message}` ); await this.applyDiffToFile(filePath, diffContent, true); } } break; default: console.warn( `Unknown file status: ${file.status} for ${file.filename}` ); } } catch (error) { console.error(`Failed to apply changes to ${file.filename}:`, error); throw error; } } } private async applyDiffToFile( filePath: string, diff: string, isNewFile: boolean ): Promise<void> { const { promises: fs } = await import("fs"); if (!diff) { console.warn(`No diff content for ${filePath}`); return; } console.log(`Applying diff to ${filePath}:`, { isNewFile, diffLength: diff.length, diffPreview: diff.substring(0, 200) + (diff.length > 200 ? "..." : ""), }); if (isNewFile) { // For new files, extract content from diff const content = this.extractContentFromDiff(diff); console.log( `Extracted content for new file (${content.length} chars):`, content.substring(0, 100) ); await fs.writeFile(filePath, content, "utf8"); } else { // For existing files, apply patch try { const currentContent = await fs.readFile(filePath, "utf8"); console.log( `Current file content (${currentContent.length} chars):`, currentContent.substring(0, 100) ); const patchedContent = this.applySimplePatch(currentContent, diff); console.log( `Patched content (${patchedContent.length} chars):`, patchedContent.substring(0, 100) ); await fs.writeFile(filePath, patchedContent, "utf8"); } catch (error) { console.error(`Failed to apply patch to ${filePath}:`, error); // Fallback: try to extract content from diff const content = this.extractContentFromDiff(diff); console.log( `Fallback: extracted content (${content.length} chars):`, content.substring(0, 100) ); await fs.writeFile(filePath, content, "utf8"); } } } private extractContentFromDiff(diff: string): string { // Extract content from diff format // This handles unified diff format and extracts added lines if (!diff || diff.trim() === "") { console.warn("Empty diff provided, returning empty content"); return ""; } const lines = diff.split("\n"); const content: string[] = []; let foundContent = false; console.log(`Extracting content from diff with ${lines.length} lines`); for (const line of lines) { if (line.startsWith("+") && !line.startsWith("+++")) { // Add line (remove the + prefix) content.push(line.substring(1)); foundContent = true; } else if ( !line.startsWith("-") && !line.startsWith("@@") && !line.startsWith("index") && !line.startsWith("diff") && !line.startsWith("+++") && !line.startsWith("---") ) { // Context line (unchanged) if (line.trim() !== "") { content.push(line); foundContent = true; } } } const result = content.join("\n"); console.log( `Extracted ${content.length} lines, found content: ${foundContent}, result length: ${result.length}` ); // If no content was found in the diff, it might be a simple text replacement // Try to extract everything after the diff headers if (!foundContent && diff.includes("@@")) { const hunkStart = diff.indexOf("@@"); const secondHunkStart = diff.indexOf("@@", hunkStart + 2); if (secondHunkStart !== -1) { const hunkContent = diff.substring( secondHunkStart + diff.substring(secondHunkStart).indexOf("\n") + 1 ); const hunkLines = hunkContent.split("\n"); const extractedContent: string[] = []; for (const line of hunkLines) { if (line.startsWith("+")) { extractedContent.push(line.substring(1)); } else if (line.startsWith(" ")) { extractedContent.push(line.substring(1)); } } if (extractedContent.length > 0) { console.log( `Fallback extraction found ${extractedContent.length} lines` ); return extractedContent.join("\n"); } } } return result; } private applySimplePatch(originalContent: string, diff: string): string { // Simple patch application - this is a basic implementation // For production, consider using a proper diff/patch library like 'diff' or 'node-patch' const originalLines = originalContent.split("\n"); const diffLines = diff.split("\n"); const result: string[] = []; let originalIndex = 0; let inHunk = false; let hunkOriginalStart = 0; let hunkOriginalLength = 0; let hunkNewStart = 0; let hunkNewLength = 0; let processedInHunk = 0; for (const line of diffLines) { if (line.startsWith("@@")) { // Parse hunk header const match = line.match(/@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@/); if (match) { hunkOriginalStart = parseInt(match[1]) - 1; // Convert to 0-based hunkOriginalLength = match[2] ? parseInt(match[2]) : 1; hunkNewStart = parseInt(match[3]) - 1; hunkNewLength = match[4] ? parseInt(match[4]) : 1; // Copy lines before this hunk while (originalIndex < hunkOriginalStart) { result.push(originalLines[originalIndex]); originalIndex++; } inHunk = true; processedInHunk = 0; } } else if (inHunk) { if (line.startsWith("-")) { // Remove line - skip it in original originalIndex++; processedInHunk++; } else if (line.startsWith("+")) { // Add line result.push(line.substring(1)); processedInHunk++; } else if (line.startsWith(" ")) { // Context line - keep it result.push(line.substring(1)); originalIndex++; processedInHunk++; } // Check if hunk is complete if (processedInHunk >= Math.max(hunkOriginalLength, hunkNewLength)) { inHunk = false; } } } // Copy remaining lines while (originalIndex < originalLines.length) { result.push(originalLines[originalIndex]); originalIndex++; } return result.join("\n"); } // Health check for Docker availability async healthCheck(): Promise<{ available: boolean; error?: string }> { try { if (!this.docker) { const Docker = await import("dockerode"); this.docker = new Docker.default(); } await this.docker.ping(); return { available: true }; } catch (error) { return { available: false, error: error.message }; } } }