src/org/jetbrains/r/console/jobs/RJobRunner.kt (133 lines of code) (raw):
/*
* Copyright 2000-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file.
*/
package org.jetbrains.r.console.jobs
import com.intellij.execution.impl.ConsoleViewImpl
import com.intellij.execution.process.ProcessEvent
import com.intellij.execution.process.ProcessHandler
import com.intellij.execution.process.ProcessListener
import com.intellij.openapi.application.EDT
import com.intellij.openapi.application.edtWriteAction
import com.intellij.openapi.components.Service
import com.intellij.openapi.components.service
import com.intellij.openapi.fileEditor.FileDocumentManager
import com.intellij.openapi.project.Project
import com.intellij.r.psi.RPluginUtil
import com.intellij.r.psi.interpreter.RInterpreter
import com.intellij.r.psi.interpreter.RInterpreterManager
import com.intellij.r.psi.interpreter.RInterpreterUtil
import com.intellij.r.psi.interpreter.runHelperProcess
import com.intellij.util.EventDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.annotations.TestOnly
import org.jetbrains.r.console.RConsoleManagerImpl
import org.jetbrains.r.console.RConsoleViewImpl
import org.jetbrains.r.rinterop.RInteropImpl
import java.lang.reflect.Field
import java.lang.reflect.Modifier
import java.util.*
import kotlin.reflect.full.memberProperties
import kotlin.reflect.jvm.javaField
@Service(Service.Level.PROJECT)
class RJobRunner(
private val project: Project,
val coroutineScope: CoroutineScope,
) {
internal val eventDispatcher = EventDispatcher.create(Listener::class.java)
fun canRun(): Boolean = RInterpreterManager.getInstance(project).hasInterpreterLocation()
@TestOnly
internal suspend fun runForTest(task: RJobTask): ProcessHandler = suspendableRun(task)
private suspend fun suspendableRun(task: RJobTask, exportEnvName: String? = null): ProcessHandler {
check(canRun())
edtWriteAction {
FileDocumentManager.getInstance().saveAllDocuments()
}
val rConsoleManager = RConsoleManagerImpl.getInstance(project)
val console = rConsoleManager.currentConsoleOrNull
val rInterop = console?.rInterop
val interpreter = RInterpreterManager.getInstance(project).awaitInterpreter().getOrThrow()
interpreter.prepareForExecution()
val (scriptFile, exportRDataFile) = generateRunScript(interpreter, task, rInterop)
val processHandler: ProcessHandler = interpreter.runHelperProcess(scriptFile, emptyList(), task.workingDirectory)
if (exportRDataFile != null) {
installProcessListener(processHandler, exportRDataFile, console, task, exportEnvName)
}
return processHandler
}
private fun installProcessListener(
processHandler: ProcessHandler,
exportRDataFile: String,
console: RConsoleViewImpl?,
task: RJobTask,
exportEnvName: String? = null,
) {
console?.rInterop?.let { rInterop ->
processHandler.addProcessListener(
object : ProcessListener {
override fun processWillTerminate(event: ProcessEvent, willBeDestroyed: Boolean) {
if (rInterop.isAlive) {
val variableName = if (task.exportGlobalEnv == ExportGlobalEnvPolicy.EXPORT_TO_VARIABLE)
exportEnvName ?: (task.script.nameWithoutExtension + "_results")
else ""
rInterop.loadEnvironment(exportRDataFile, variableName).then {
console.debuggerPanel?.onCommandExecuted()
}
}
}
},
rInterop
)
}
}
private fun generateRunScript(interpreter: RInterpreter, task: RJobTask, rInterop: RInteropImpl?): Pair<String, String?> {
var importFile: String? = null
var exportFile: String? = null
if (task.importGlobalEnv) {
if (rInterop?.isAlive == true) {
importFile = interpreter.createTempFileOnHost("import.RData")
rInterop.saveGlobalEnvironment(importFile).blockingGet(RInterpreterUtil.DEFAULT_TIMEOUT)
}
}
if (task.exportGlobalEnv != ExportGlobalEnvPolicy.DO_NO_EXPORT) {
exportFile = interpreter.createTempFileOnHost("export.RData")
}
var text = RPluginUtil.findTextInRHelpersOrNull("R/SourceWithProgress.template.R")!!
text = text.replace("<file-path>", interpreter.uploadFileToHostIfNeeded(task.script).toRString())
.replace("<rdata-import>", importFile?.toRString() ?: "NULL")
.replace("<rdata-export>", exportFile?.toRString() ?: "NULL")
return Pair(interpreter.createTempFileOnHost("rjob.R", text.toByteArray()), exportFile)
}
suspend fun runRJob(task: RJobTask, exportEnvName: String? = null, name: String? = null): RJobDescriptor {
return withContext(Dispatchers.Default) {
val processHandler = suspendableRun(task, exportEnvName)
val consoleView = ConsoleViewImpl(project, true)
consoleView.attachToProcess(processHandler)
val myInputMessageFilterField = ConsoleViewImpl::class.memberProperties.first { it.name == "myInputMessageFilter" }
val rJobProgressProvider = RJobProgressProvider()
val rSourceProgressInputFilter = RSourceProgressInputFilter(rJobProgressProvider::onProgressAvailable)
setFinalStatic(consoleView, myInputMessageFilterField.javaField!!, rSourceProgressInputFilter)
val rJobDescriptor = RJobDescriptorImpl(project, task, rJobProgressProvider, processHandler, consoleView, name)
eventDispatcher.multicaster.onJobDescriptionCreated(rJobDescriptor)
processHandler.startNotify()
coroutineScope.launch(Dispatchers.EDT) {
RJobsToolWindowFactory.getJobsPanel(project)?.addJobDescriptor(rJobDescriptor)
RJobsToolWindowFactory.focusOnJobs(project)
}
return@withContext rJobDescriptor
}
}
companion object {
fun getInstance(project: Project): RJobRunner = project.service()
// todo get rid of this
private fun setFinalStatic(o: Any, field: Field, newValue: Any) {
field.isAccessible = true
val modifiersField: Field = Field::class.java.getDeclaredField("modifiers")
modifiersField.isAccessible = true
modifiersField.setInt(field, field.modifiers and Modifier.FINAL.inv())
field.set(o, newValue)
}
}
internal interface Listener : EventListener {
fun onJobDescriptionCreated(rJobDescriptor: RJobDescriptor)
}
private fun String.toRString(): String =
"'${replace("""\""", """\\""").replace("""'""", """\'""")}'"
}