in core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala [218:234]
/**
 * Waits for all pending work to finish, then releases this client's resources.
 *
 * Blocks the calling thread for at most `ehConf.internalOperationTimeout` while the
 * futures in `pendingWorks` complete. `cleanup()` is always invoked exactly once, on the
 * calling thread, before this method returns or throws -- whether the pending work
 * succeeded, failed, or did not finish in time.
 *
 * @throws java.util.concurrent.TimeoutException if the pending work does not complete
 *                                               within the configured timeout
 * @throws java.lang.InterruptedException        if the calling thread is interrupted
 *                                               while waiting
 * @throws java.lang.Throwable                   the failure of the pending work, rethrown
 *                                               after it has been logged
 */
override def close(): Unit = {
  logInfo(s"close is called. ${EventHubsUtils.getTaskContextSlim}")
  try {
    // Wait synchronously instead of reacting in an onComplete callback: a callback runs
    // on the execution context, so it is not ordered with respect to this method
    // returning, and anything thrown from it never reaches the caller of close().
    Await.result(Future.sequence(pendingWorks), ehConf.internalOperationTimeout)
  } catch {
    // Fully qualified so that no additional import is required. Covers both a failed
    // pending task and a timeout while waiting; fatal errors propagate unlogged.
    case scala.util.control.NonFatal(e) =>
      logError(
        s"failed to complete pending tasks. event hubs: ${ehConf.name}, ${EventHubsUtils.getTaskContextSlim}",
        e)
      throw e
  } finally {
    // Release resources on every exit path, including timeout and interruption.
    cleanup()
  }
}