test-utils/jvm/src/TestBase.kt (161 lines of code) (raw):
package kotlinx.coroutines.testing
import kotlinx.coroutines.scheduling.*
import java.io.*
import java.util.*
import kotlin.coroutines.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.test.*
actual val VERBOSE = try {
System.getProperty("test.verbose")?.toBoolean() ?: false
} catch (e: SecurityException) {
false
}
/**
* Is `true` when running in a nightly stress test mode.
*/
actual val isStressTest = System.getProperty("stressTest")?.toBoolean() ?: false
actual val stressTestMultiplierSqrt = if (isStressTest) 5 else 1
private const val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread
/**
* Multiply various constants in stress tests by this factor, so that they run longer during nightly stress test.
*/
actual val stressTestMultiplier = stressTestMultiplierSqrt * stressTestMultiplierSqrt
@Suppress("ACTUAL_WITHOUT_EXPECT")
actual typealias TestResult = Unit
internal actual fun lastResortReportException(error: Throwable) {
System.err.println("${error.message}${error.cause?.let { ": $it" } ?: ""}")
error.cause?.printStackTrace(System.err)
System.err.println("--- Detected at ---")
Throwable().printStackTrace(System.err)
}
/**
* Base class for tests, so that tests for predictable scheduling of actions in multiple coroutines sharing a single
* thread can be written. Use it like this:
*
* ```
* class MyTest : TestBase() {
* @Test
* fun testSomething() = runBlocking { // run in the context of the main thread
* expect(1) // initiate action counter
* launch { // use the context of the main thread
* expect(3) // the body of this coroutine in going to be executed in the 3rd step
* }
* expect(2) // launch just scheduled coroutine for execution later, so this line is executed second
* yield() // yield main thread to the launched job
* finish(4) // fourth step is the last one. `finish` must be invoked or test fails
* }
* }
* ```
*/
actual open class TestBase(
private var disableOutCheck: Boolean,
private val errorCatching: ErrorCatching.Impl = ErrorCatching.Impl()
): OrderedExecutionTestBase(), ErrorCatching by errorCatching {
actual constructor(): this(false)
// Shutdown sequence
private lateinit var threadsBefore: Set<Thread>
private val uncaughtExceptions = Collections.synchronizedList(ArrayList<Throwable>())
private var originalUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null
actual fun println(message: Any?) {
PrintlnStrategy.actualSystemOut.println(message)
}
@BeforeTest
fun before() {
initPoolsBeforeTest()
threadsBefore = currentThreads()
originalUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
Thread.setDefaultUncaughtExceptionHandler { t, e ->
println("Exception in thread $t: $e") // The same message as in default handler
e.printStackTrace()
uncaughtExceptions.add(e)
}
PrintlnStrategy.configure(disableOutCheck)
}
@AfterTest
fun onCompletion() {
// onCompletion should not throw exceptions before it finishes all cleanup, so that other tests always
// start in a clear, restored state, so we postpone throwing the observed errors.
fun cleanupStep(block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
reportError(e)
}
}
cleanupStep { checkFinishCall() }
// Reset the output stream first
cleanupStep { PrintlnStrategy.reset() }
// Shutdown all thread pools
cleanupStep { shutdownPoolsAfterTest() }
// Check that are now leftover threads
cleanupStep { checkTestThreads(threadsBefore) }
// Restore original uncaught exception handler after the main shutdown sequence
Thread.setDefaultUncaughtExceptionHandler(originalUncaughtExceptionHandler)
if (uncaughtExceptions.isNotEmpty()) {
reportError(IllegalStateException("Expected no uncaught exceptions, but got $uncaughtExceptions"))
}
// The very last action -- throw all the detected errors
errorCatching.close()
}
actual fun runTest(
expected: ((Throwable) -> Boolean)?,
unhandled: List<(Throwable) -> Boolean>,
block: suspend CoroutineScope.() -> Unit
): TestResult {
var exCount = 0
var ex: Throwable? = null
try {
runBlocking(block = block, context = CoroutineExceptionHandler { _, e ->
if (e is CancellationException) return@CoroutineExceptionHandler // are ignored
exCount++
when {
exCount > unhandled.size ->
error("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e)
!unhandled[exCount - 1](e) ->
error("Unhandled exception was unexpected: $e", e)
}
})
} catch (e: Throwable) {
ex = e
if (expected != null) {
if (!expected(e))
error("Unexpected exception: $e", e)
} else {
throw e
}
} finally {
if (ex == null && expected != null) error("Exception was expected but none produced")
}
if (exCount < unhandled.size)
error("Too few unhandled exceptions $exCount, expected ${unhandled.size}")
}
protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!!
}
private object PrintlnStrategy {
/**
* Installs a custom [PrintStream] instead of [System.out] to capture all the output and throw an exception if
* any was detected.
*
* Removes the previously set println handler and throws the exceptions detected by it.
* If [disableOutCheck] is set, this is the only effect.
*/
fun configure(disableOutCheck: Boolean) {
val systemOut = System.out
if (systemOut is TestOutputStream) {
try {
systemOut.remove()
} catch (e: AssertionError) {
throw AssertionError("The previous TestOutputStream contained ", e)
}
}
if (!disableOutCheck) {
// Invariant: at most one indirection level in `TestOutputStream`.
System.setOut(TestOutputStream(actualSystemOut))
}
}
/**
* Removes the custom [PrintStream] and throws an exception if any output was detected.
*/
fun reset() {
(System.out as? TestOutputStream)?.remove()
}
/**
* The [PrintStream] representing the actual stdout, ignoring the replacement [TestOutputStream].
*/
val actualSystemOut: PrintStream get() = when (val out = System.out) {
is TestOutputStream -> out.previousOut
else -> out
}
private class TestOutputStream(
/*
* System.out that we redefine in order to catch any debugging/diagnostics
* 'println' from main source set.
* NB: We do rely on the name 'previousOut' in the FieldWalker in order to skip its
* processing
*/
val previousOut: PrintStream,
private val myOutputStream: MyOutputStream = MyOutputStream(),
) : PrintStream(myOutputStream) {
fun remove() {
System.setOut(previousOut)
if (myOutputStream.firstPrintStacktace.get() != null) {
throw AssertionError(
"Detected a println. The captured output is: <<<${myOutputStream.capturedOutput}>>>",
myOutputStream.firstPrintStacktace.get()
)
}
}
private class MyOutputStream(): OutputStream() {
val capturedOutput = ByteArrayOutputStream()
val firstPrintStacktace = AtomicReference<Throwable?>(null)
override fun write(b: Int) {
if (firstPrintStacktace.get() == null) {
firstPrintStacktace.compareAndSet(null, IllegalStateException())
}
capturedOutput.write(b)
}
}
}
}
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
fun initPoolsBeforeTest() {
DefaultScheduler.usePrivateScheduler()
}
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
fun shutdownPoolsAfterTest() {
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
DefaultScheduler.restore()
}
actual val isNative = false
actual val isBoundByJsTestTimeout = false
/*
* We ignore tests that test **real** non-virtualized tests with time on Windows, because
* our CI Windows is virtualized itself (oh, the irony) and its clock resolution is dozens of ms,
* which makes such tests flaky.
*/
actual val isJavaAndWindows: Boolean = System.getProperty("os.name")!!.contains("Windows")
actual val usesSharedEventLoop: Boolean = false