app/lib/server/jobProcessor.ts (104 lines of code) (raw):
import { ApiJobExecutor } from "./processors/ApiJobExecutor";
import { DockerJobExecutor } from "./processors/DockerJobExecutor";
import { JobExecutor } from "./processors/JobExecutor";
import { JobStore } from "./jobStore";
import serverConfig from "./config";
import type { Job } from "~/types/job";
// JobProcessor supports both API and Docker execution modes
export class JobProcessor {
private executor: JobExecutor;
private currentMode: string;
constructor(mode?: string) {
this.currentMode = mode || serverConfig.EXECUTION_MODE;
this.executor = this.createExecutor(this.currentMode);
}
private createExecutor(mode: string): JobExecutor {
switch (mode) {
case "docker":
return new DockerJobExecutor();
case "api":
default:
return new ApiJobExecutor();
}
}
async processJob(jobId: string, jobStore: JobStore, credentials: any) {
try {
// Update status to running
await jobStore.updateJobStatus(jobId, "running");
// Get job data for context
const job = await jobStore.getJob(jobId);
// Execute the job using the API executor with credentials
const result = await this.executor.execute(jobId, job, credentials);
// Store the environment and secrets used for this job
if (result.environment || result.secrets || result.apiJobId) {
await jobStore.updateJobEnvironment(
jobId,
result.environment || {},
result.secrets
? Object.keys(result.secrets).reduce(
(acc, key) => {
acc[key] = "***"; // Mask secret values for security
return acc;
},
{} as Record<string, string>
)
: {},
result.apiJobId
);
}
// Store the full output logs (before extracting diff)
if (result.output) {
await jobStore.setJobLogs(jobId, result.output);
}
// Store the diff
await jobStore.setJobDiff(jobId, result.diff);
// Update status to completed with changes
await jobStore.updateJobStatus(jobId, "completed", {
additions: result.diff.summary.totalAdditions,
deletions: result.diff.summary.totalDeletions,
files: result.diff.summary.totalFiles,
});
console.log(
`✅ Job ${jobId} completed successfully via ${this.currentMode}`
);
return result;
} catch (error) {
console.error(
`❌ Job ${jobId} failed in ${this.currentMode} mode:`,
error
);
await jobStore.updateJobStatus(jobId, "failed");
throw error;
}
}
async createBranchAndPush(options: {
repositoryUrl: string;
branch: string;
baseBranch: string;
title: string;
description: string;
files: any[];
credentials?: any;
}): Promise<{ branch: string; commitHash: string }> {
try {
// Use the API executor to create branch and push changes
const result = await this.executor.createBranchAndPush(options);
return result;
} catch (error) {
console.error("Failed to create branch and push:", error);
throw error;
}
}
// Method to switch execution mode
switchExecutionMode(mode: string) {
if (mode !== this.currentMode && (mode === "api" || mode === "docker")) {
this.currentMode = mode;
this.executor = this.createExecutor(mode);
console.log(`🔄 Switched execution mode to: ${mode}`);
} else if (mode !== "api" && mode !== "docker") {
console.warn(
`Invalid execution mode: ${mode}. Valid modes are 'api' and 'docker'.`
);
}
}
// Get current execution mode
getExecutionMode(): string {
return this.currentMode;
}
}
// Singleton instance
let jobProcessorInstance: JobProcessor | null = null;
export function getJobProcessor(): JobProcessor {
if (!jobProcessorInstance) {
jobProcessorInstance = new JobProcessor();
}
return jobProcessorInstance;
}