cdslogviewer/app/controllers/LogsController.scala (109 lines of code) (raw):

package controllers

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink, Source}
import akka.util.ByteString
import auth.{BearerTokenAuth, Security}
import org.slf4j.LoggerFactory
import play.api.Configuration
import play.api.mvc.{AbstractController, ControllerComponents, ResponseHeader, Result}
import java.nio.file.{DirectoryStream, Files, Path, Paths}
import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import scala.util.{Try, Using}
import io.circe.generic.auto._
import io.circe.syntax._
import play.api.cache.SyncCacheApi
import play.api.http.HttpEntity
import play.api.libs.circe.Circe
import responses.{GenericErrorResponse, LogInfo}
import java.nio.file.attribute.PosixFileAttributeView
import java.time.format.DateTimeFormatter
import java.time.{ZoneId, ZonedDateTime}

/**
  * Admin-only endpoints for browsing and streaming log files that live under the directory configured as
  * `cds.logbase`.
  *
  * All request-supplied path fragments are resolved through [[resolveWithin]] so that they cannot point outside
  * of the directory they are meant to be confined to.
  */
@Singleton
class LogsController @Inject() (cc:ControllerComponents,
                                override val bearerTokenAuth:BearerTokenAuth,
                                override implicit val config:Configuration,
                                override implicit val cache:SyncCacheApi)
                               (implicit system:ActorSystem, mat:Materializer)
  extends AbstractController(cc) with Security with Circe {

  override val logger = LoggerFactory.getLogger(getClass)

  private implicit val ec:ExecutionContext = system.dispatcher
  //timezone used when formatting file modification times; falls back to the system default if "timezone" is not set
  private implicit val tz:ZoneId = config.getOptional[String]("timezone").map(ZoneId.of).getOrElse(ZoneId.systemDefault())

  /**
    * Joins `parts` onto `root` and returns the normalised result only if it is still located at or below `root`.
    *
    * The containment test is done on absolute, normalised paths with `Path.startsWith`, which compares whole path
    * elements. A plain string prefix test is not sufficient: it accepts `root/../elsewhere` as well as sibling
    * directories that merely share a name prefix with `root`.
    *
    * The check is lexical only, symbolic links are not resolved.
    *
    * @param root  directory that the result must stay inside of
    * @param parts path fragments to append, typically taken from the request
    * @return the normalised path, or None if it escapes `root` or cannot be parsed as a path
    */
  private def resolveWithin(root:Path, parts:String*):Option[Path] = {
    val absoluteRoot = root.toAbsolutePath.normalize()
    //Paths.get throws InvalidPathException on unparseable input (e.g. embedded NUL), treat that as "not valid"
    Try(Paths.get(root.toString, parts: _*).normalize())
      .toOption
      .filter(_.toAbsolutePath.normalize().startsWith(absoluteRoot))
  }

  /**
    * Streams the entries of `dir`. The underlying DirectoryStream is opened when the source is materialised and
    * is closed again when the stream completes, fails or is cancelled, so no directory handle is leaked.
    *
    * @param dir directory to list
    * @return a Source that emits one Path per directory entry
    */
  private def directoryEntries(dir:Path) =
    Source.unfoldResource[Path, (DirectoryStream[Path], Iterator[Path])](
      () => {
        val dirStream = Files.newDirectoryStream(dir)
        //a DirectoryStream only hands out its iterator once, so keep it together with the handle
        (dirStream, dirStream.iterator().asScala)
      },
      { case (_, entries) => if(entries.hasNext) Some(entries.next()) else None },
      { case (dirStream, _) => dirStream.close() }
    )

  /**
    * Lists the names of the non-hidden subdirectories of the log base as a JSON array of strings.
    * Responds with a 500 and a generic error body if the directory can't be listed.
    */
  def listRoutes = IsAdminAsync { uid=> request=>
    val path = Paths.get(config.get[String]("cds.logbase"))
    logger.debug(s"Logs base path is $path")

    directoryEntries(path)
      .filter(entry=>Files.isDirectory(entry))
      .map(entry=>path.relativize(entry))  //convert to a path relative to the log base
      .map(_.toString)
      .filter(! _.startsWith("."))
      .toMat(Sink.seq)(Keep.right)
      .run()
      .map(dirs=>Ok(dirs.asJson))
      .recover({
        case NonFatal(err)=>
          logger.error(s"Could not list directories: ${err.getMessage}",err)
          InternalServerError(GenericErrorResponse("config_error", "Could not list directories, see server logs").asJson)
      })
  }

  /**
    * Streams information about every entry in the given route directory as newline-delimited JSON.
    * Entries for which `LogInfo.fromPath` yields None are skipped.
    *
    * @param route name of the directory below the log base. Gives a 400 if it resolves outside of the log base and
    *              a 404 if it is not an existing directory.
    */
  def listLogs(route:String) = IsAdmin { uid=> request=>
    val base = Paths.get(config.get[String]("cds.logbase"))

    resolveWithin(base, route) match {
      case None=>
        BadRequest(GenericErrorResponse("bad_request","That is not a valid log").asJson)
      case Some(path) if !Files.isDirectory(path)=>
        //also covers "exists but is a plain file", which could otherwise only fail after the 200 header was sent
        NotFound(GenericErrorResponse("not_found","The given route logs do not exist").asJson)
      case Some(path)=>
        val stream = directoryEntries(path)
          .map(LogInfo.fromPath)
          .collect({case Some(logInfo)=>logInfo})
          .map(_.asJson.noSpaces + "\n")
          .map(ByteString.apply)

        Result(
          header = ResponseHeader(200, Map.empty),
          body = HttpEntity.Streamed(stream, None, Some("application/x-ndjson"))
        )
    }
  }

  /**
    * Streams the content of a log file as text/plain, starting after the first `fromLine` lines. The file's
    * modification time is returned in the `X-Logfile-Modified` header in ISO offset date-time format.
    *
    * @param route    name of the directory below the log base
    * @param logname  name of the log file within that directory
    * @param fromLine number of leading lines to skip
    */
  def streamLog(route:String, logname:String, fromLine:Long) = IsAdmin { uid=> request=>
    val base = Paths.get(config.get[String]("cds.logbase"))

    resolveWithin(base, route, logname) match {
      case None=>
        BadRequest(GenericErrorResponse("bad_request","That is not a valid log").asJson)
      case Some(path) if !Files.exists(path)=>
        NotFound(GenericErrorResponse("not_found","The given log file does not exist").asJson)
      case Some(path)=>
        try {
          val view = Files.getFileAttributeView(path, classOf[PosixFileAttributeView])
          val modTime = ZonedDateTime.ofInstant(view.readAttributes().lastModifiedTime().toInstant, tz)

          val stream = FileIO.fromPath(path)
            .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 32768, allowTruncation = true))
            .drop(fromLine)
            .map(_ ++ ByteString("\n"))  //the framing stage strips the delimiter, so put it back

          //setting Content-Length to the length of the file does not make sense, since we may have skipped an unknown
          //number of characters if fromLine != 0
          Result(
            header = ResponseHeader(200, Map("X-Logfile-Modified"->modTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))),
            body = HttpEntity.Streamed(stream, None, Some("text/plain"))
          )
        } catch {
          case NonFatal(err)=>
            logger.error(s"Could not stream log '$logname' from '$route': ${err.getMessage}", err)
            //getMessage may be null (e.g. for a NullPointerException), fall back to the exception class name
            val detail = Option(err.getMessage).getOrElse(err.getClass.getName)
            InternalServerError(GenericErrorResponse("error", detail).asJson)
        }
    }
  }

  case class URLLink(log_url:String)

  /**
    * Looks up the file `podnames/<name>.txt` below the log base and returns its content as a log URL, with the
    * `/var/log/cds_backend/` prefix replaced by `/log/` and all whitespace removed.
    *
    * @param name job name. Gives a 400 if it resolves outside of the podnames directory and a 404 if there is no
    *             file for it.
    */
  def logByJobName(name:String) = IsAdmin { uid=> request=>
    val podNamesDir = Paths.get(config.get[String]("cds.logbase"), "podnames")

    resolveWithin(podNamesDir, name+".txt") match {
      case None=>
        logger.warn(s"Rejected job name that does not resolve to a file inside $podNamesDir")
        BadRequest(GenericErrorResponse("bad_request","That is not a valid job name").asJson)
      case Some(path) if !Files.exists(path)=>
        logger.error(s"Pod name file not found at path: ${path.toString()}")
        NotFound(GenericErrorResponse("not_found","Job name not found").asJson)
      case Some(path)=>
        try {
          //read from the path that was validated above; Using closes the file even if reading fails
          val fileData = Using.resource(scala.io.Source.fromFile(path.toFile))(_.mkString)
          val uRLToUse = fileData.replace("/var/log/cds_backend/", "/log/").filterNot(_.isWhitespace)
          Ok(URLLink(uRLToUse).asJson)
        } catch {
          case NonFatal(err)=>
            logger.error(s"Could not read pod name file ${path.toString()}: ${err.getMessage}", err)
            InternalServerError(GenericErrorResponse("error", "Could not read job name file, see server logs").asJson)
        }
    }
  }
}