src/org/jetbrains/r/console/jobs/RJobRunner.kt (133 lines of code) (raw):

/*
 * Copyright 2000-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file.
 */
package org.jetbrains.r.console.jobs

import com.intellij.execution.impl.ConsoleViewImpl
import com.intellij.execution.process.ProcessEvent
import com.intellij.execution.process.ProcessHandler
import com.intellij.execution.process.ProcessListener
import com.intellij.openapi.application.EDT
import com.intellij.openapi.application.edtWriteAction
import com.intellij.openapi.components.Service
import com.intellij.openapi.components.service
import com.intellij.openapi.fileEditor.FileDocumentManager
import com.intellij.openapi.project.Project
import com.intellij.r.psi.RPluginUtil
import com.intellij.r.psi.interpreter.RInterpreter
import com.intellij.r.psi.interpreter.RInterpreterManager
import com.intellij.r.psi.interpreter.RInterpreterUtil
import com.intellij.r.psi.interpreter.runHelperProcess
import com.intellij.util.EventDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.annotations.TestOnly
import org.jetbrains.r.console.RConsoleManagerImpl
import org.jetbrains.r.console.RConsoleViewImpl
import org.jetbrains.r.rinterop.RInteropImpl
import java.lang.reflect.Field
import java.lang.reflect.Modifier
import java.util.*
import kotlin.reflect.full.memberProperties
import kotlin.reflect.jvm.javaField

/**
 * Project-level service that runs an [RJobTask] in a helper R process and registers the resulting
 * [RJobDescriptor] with the jobs tool window.
 *
 * @property coroutineScope scope used to launch the EDT work that adds a new job to the jobs panel.
 */
@Service(Service.Level.PROJECT)
class RJobRunner(
  private val project: Project,
  val coroutineScope: CoroutineScope,
) {
  /** Dispatcher through which [Listener]s are told about newly created job descriptors. */
  internal val eventDispatcher = EventDispatcher.create(Listener::class.java)

  /** Returns `true` when the project's [RInterpreterManager] reports an interpreter location. */
  fun canRun(): Boolean = RInterpreterManager.getInstance(project).hasInterpreterLocation()

  /** Test-only entry point that exposes [suspendableRun] with the default export variable name. */
  @TestOnly
  internal suspend fun runForTest(task: RJobTask): ProcessHandler = suspendableRun(task)

  /**
   * Saves all documents, generates the job script and starts the helper process for [task].
   *
   * This method does not call `startNotify()` on the returned handler itself; [runRJob] does that
   * after the console view has been attached.
   *
   * @param exportEnvName variable name used when the task exports its environment to a variable;
   *   when `null` a name derived from the script file name is used.
   * @throws IllegalStateException if [canRun] is `false`.
   */
  private suspend fun suspendableRun(task: RJobTask, exportEnvName: String? = null): ProcessHandler {
    check(canRun()) { "Cannot run R job: no R interpreter location is configured for the project" }
    edtWriteAction {
      FileDocumentManager.getInstance().saveAllDocuments()
    }
    val rConsoleManager = RConsoleManagerImpl.getInstance(project)
    val console = rConsoleManager.currentConsoleOrNull
    val rInterop = console?.rInterop
    val interpreter = RInterpreterManager.getInstance(project).awaitInterpreter().getOrThrow()
    interpreter.prepareForExecution()
    val (scriptFile, exportRDataFile) = generateRunScript(interpreter, task, rInterop)
    val processHandler: ProcessHandler = interpreter.runHelperProcess(scriptFile, emptyList(), task.workingDirectory)
    if (exportRDataFile != null) {
      installProcessListener(processHandler, exportRDataFile, console, task, exportEnvName)
    }
    return processHandler
  }

  /**
   * Registers a listener on [processHandler] that, when the process is about to terminate, loads
   * [exportRDataFile] into the interop of [console] and then notifies the console's debugger panel.
   *
   * Nothing is registered when [console] is `null`. The load is skipped if the interop is no longer alive
   * at termination time.
   */
  private fun installProcessListener(
    processHandler: ProcessHandler,
    exportRDataFile: String,
    console: RConsoleViewImpl?,
    task: RJobTask,
    exportEnvName: String? = null,
  ) {
    console?.rInterop?.let { rInterop ->
      processHandler.addProcessListener(
        object : ProcessListener {
          override fun processWillTerminate(event: ProcessEvent, willBeDestroyed: Boolean) {
            if (rInterop.isAlive) {
              // An empty name is passed for every policy other than EXPORT_TO_VARIABLE;
              // presumably loadEnvironment treats it as "no wrapping variable" - TODO confirm.
              val variableName =
                if (task.exportGlobalEnv == ExportGlobalEnvPolicy.EXPORT_TO_VARIABLE) {
                  exportEnvName ?: (task.script.nameWithoutExtension + "_results")
                }
                else {
                  ""
                }
              rInterop.loadEnvironment(exportRDataFile, variableName).then {
                console.debuggerPanel?.onCommandExecuted()
              }
            }
          }
        },
        // Passed as the second argument of addProcessListener; presumably the parent disposable
        // that bounds the listener's lifetime - TODO confirm.
        rInterop
      )
    }
  }

  /**
   * Builds the wrapper script for [task] from the `R/SourceWithProgress.template.R` helper template.
   *
   * If the task imports the global environment and [rInterop] is alive, the environment is first saved
   * (blocking up to [RInterpreterUtil.DEFAULT_TIMEOUT]) to a temp file on the interpreter host.
   *
   * @return the host path of the generated script, paired with the host path of the export file, or
   *   `null` for the latter when the task's policy is [ExportGlobalEnvPolicy.DO_NO_EXPORT].
   * @throws IllegalStateException if the helper template cannot be found.
   */
  private fun generateRunScript(interpreter: RInterpreter, task: RJobTask, rInterop: RInteropImpl?): Pair<String, String?> {
    val importFile: String? =
      if (task.importGlobalEnv && rInterop?.isAlive == true) {
        val path = interpreter.createTempFileOnHost("import.RData")
        rInterop.saveGlobalEnvironment(path).blockingGet(RInterpreterUtil.DEFAULT_TIMEOUT)
        path
      }
      else {
        null
      }
    val exportFile: String? =
      if (task.exportGlobalEnv != ExportGlobalEnvPolicy.DO_NO_EXPORT) {
        interpreter.createTempFileOnHost("export.RData")
      }
      else {
        null
      }
    val template = checkNotNull(RPluginUtil.findTextInRHelpersOrNull("R/SourceWithProgress.template.R")) {
      "R helper template 'R/SourceWithProgress.template.R' was not found"
    }
    val text = template
      .replace("<file-path>", interpreter.uploadFileToHostIfNeeded(task.script).toRString())
      .replace("<rdata-import>", importFile?.toRString() ?: "NULL")
      .replace("<rdata-export>", exportFile?.toRString() ?: "NULL")
    return Pair(interpreter.createTempFileOnHost("rjob.R", text.toByteArray()), exportFile)
  }

  /**
   * Starts [task] as a job, attaches a console view to its process and publishes the job.
   *
   * The work runs on [Dispatchers.Default]. Listeners of [eventDispatcher] are notified before the
   * process is started with `startNotify()`; adding the job to the jobs panel and focusing it is
   * launched separately on the EDT in [coroutineScope] and is not awaited.
   *
   * @param exportEnvName forwarded to [suspendableRun].
   * @param name passed to the [RJobDescriptorImpl] constructor.
   */
  suspend fun runRJob(task: RJobTask, exportEnvName: String? = null, name: String? = null): RJobDescriptor {
    return withContext(Dispatchers.Default) {
      val processHandler = suspendableRun(task, exportEnvName)
      val consoleView = ConsoleViewImpl(project, true)
      consoleView.attachToProcess(processHandler)

      // ConsoleViewImpl offers no visible way to replace its input filter here, so the private
      // field is overwritten reflectively with a filter that feeds the progress provider.
      val myInputMessageFilterField = ConsoleViewImpl::class.memberProperties.first { it.name == "myInputMessageFilter" }
      val rJobProgressProvider = RJobProgressProvider()
      val rSourceProgressInputFilter = RSourceProgressInputFilter(rJobProgressProvider::onProgressAvailable)
      setFinalInstanceField(consoleView, myInputMessageFilterField.javaField!!, rSourceProgressInputFilter)

      val rJobDescriptor = RJobDescriptorImpl(project, task, rJobProgressProvider, processHandler, consoleView, name)
      eventDispatcher.multicaster.onJobDescriptionCreated(rJobDescriptor)
      processHandler.startNotify()
      coroutineScope.launch(Dispatchers.EDT) {
        RJobsToolWindowFactory.getJobsPanel(project)?.addJobDescriptor(rJobDescriptor)
        RJobsToolWindowFactory.focusOnJobs(project)
      }
      return@withContext rJobDescriptor
    }
  }

  companion object {
    /** Returns the [RJobRunner] service of [project]. */
    fun getInstance(project: Project): RJobRunner = project.service()

    // todo get rid of this
    /**
     * Reflectively overwrites the (possibly `final`) instance field [field] of [o] with [newValue].
     *
     * Making the field accessible is enough for a non-static final field. The previous
     * implementation additionally cleared the FINAL bit through `Field.modifiers`; that lookup
     * throws `NoSuchFieldException` on JDK 12 and later, so it is no longer attempted.
     *
     * @throws IllegalArgumentException if [field] is static; static final fields cannot be written this way.
     */
    private fun setFinalInstanceField(o: Any, field: Field, newValue: Any) {
      require(!Modifier.isStatic(field.modifiers)) { "Static field '${field.name}' cannot be overwritten reflectively" }
      field.isAccessible = true
      field.set(o, newValue)
    }
  }

  /** Receives a callback for every job descriptor created by [runRJob]. */
  internal interface Listener : EventListener {
    fun onJobDescriptionCreated(rJobDescriptor: RJobDescriptor)
  }

  /** Wraps the receiver in single quotes, escaping backslashes first and then single quotes. */
  private fun String.toRString(): String {
    val escaped = replace("""\""", """\\""").replace("""'""", """\'""")
    return "'$escaped'"
  }
}