app/helpers/S3XMLProcessor.scala (165 lines of code) (raw):
package helpers
import akka.stream.alpakka.s3.ListBucketResultContents
import java.net.URLDecoder
import java.time.Instant
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.util.ByteString
import scala.xml.pull.XMLEventReader
import play.api.Logger
import scala.collection.mutable
import scala.io.Source
import scala.util.{Failure, Success, Try}
import scala.xml.pull._
/**
  * Akka Streams stage that turns raw S3 "list bucket" XML responses into [[ListBucketResultContents]] records.
  *
  * Every incoming `ByteString` is parsed as one XML document and one record is emitted per `<Contents>` element
  * found in it. If the document contains an `<Error>` element, the stage fails with a RuntimeException built from
  * the decoded [[S3Error]].
  *
  * NOTE(review): this assumes upstream delivers each response body as a single, complete ByteString (e.g. one
  * strict entity per listing page); a document split across several elements would likely not parse correctly —
  * confirm with callers.
  *
  * @param skipZeroLength when true (the default), entries whose `size` is 0 are not emitted
  */
class S3XMLProcessor(skipZeroLength: Boolean = true) extends GraphStage[FlowShape[ByteString, ListBucketResultContents]] {
  private val in: Inlet[ByteString] = Inlet.create("S3XMLProcessor.in")
  private val out: Outlet[ListBucketResultContents] = Outlet.create("S3XMLProcessor.out")
  private val logger = Logger(getClass)

  override def shape: FlowShape[ByteString, ListBucketResultContents] = FlowShape.of(in, out)

  /**
    * URL-decode every `/`-separated segment of an object key and drop a single leading `/`.
    *
    * Because `String.split` discards trailing empty strings, trailing slashes are also removed
    * (e.g. `"dir/"` becomes `"dir"`).
    *
    * @param str the key as it appears in the XML
    * @return the decoded key without a leading slash
    * @throws IllegalArgumentException if a segment contains a malformed `%` escape (thrown by URLDecoder)
    */
  def makeProperPath(str: String): String =
    str.split("/").map(URLDecoder.decode(_, "UTF-8")).mkString("/").stripPrefix("/")

  /**
    * Generate a ListBucketResultContents based on a Map of content from the raw S3 XML output.
    *
    * @param captures   Map of (key->value) corresponding to the <Contents> structure of the S3 XML
    * @param bucketName bucket name that this relates to
    * @return a ListBucketResultContents if we could coerce the map into its structure, or None (with an error
    *         logged). Missing keys, unparseable sizes/timestamps and malformed key escapes all yield None.
    */
  def listBucketResultFor(captures: Map[String, String], bucketName: String): Option[ListBucketResultContents] =
    // Try only catches non-fatal exceptions, so VM errors (e.g. OutOfMemoryError) still propagate
    Try {
      val lastModTime = Instant.parse(captures("LastModified"))
      ListBucketResultContents(
        bucketName,
        makeProperPath(captures("Key")),
        captures("ETag"),
        captures("Size").toLong,
        lastModTime,
        captures("StorageClass")
      )
    } match {
      case Success(result) => Some(result)
      case Failure(ex) =>
        logger.error(s"Could not convert $captures into ListbucketResultContents: ", ex)
        None
    }

  /**
    * Collect the text of every element below the current one into a map keyed by element name, consuming events
    * up to and including the end tag of the element whose start tag was read just before this call.
    *
    * Text is attributed to the innermost open element; text directly inside the outer element is ignored.
    * Entity references (EvEntityRef, e.g. `&quot;`) are not captured, so they are dropped from the text.
    * NOTE(review): that strips the quotes around ETag values, but would also drop characters such as `&` from
    * keys that are not URL-encoded — confirm the listing request always uses encoding-type=url.
    *
    * @param xml         event reader positioned just after the outer start tag
    * @param currNode    stack of currently open child element names, innermost first
    * @param allCaptures text captured so far
    * @return element name -> concatenated text content
    */
  @scala.annotation.tailrec
  private def captureFromXml(xml: XMLEventReader, currNode: List[String], allCaptures: Map[String, String]): Map[String, String] =
    if (!xml.hasNext) {
      allCaptures
    } else {
      xml.next() match {
        case EvElemStart(_, label, _, _) =>
          logger.debug(s"\telem start: $label")
          captureFromXml(xml, label :: currNode, allCaptures)
        case EvText(text) =>
          currNode.headOption match {
            case Some(current) =>
              logger.debug(s"\ttext: $text, currentCapture: $current")
              // the parser may deliver an element's text in several chunks, so append rather than overwrite
              val updatedCaptures = allCaptures.updated(current, allCaptures.getOrElse(current, "") + text)
              logger.debug(updatedCaptures.toString)
              captureFromXml(xml, currNode, updatedCaptures)
            case None =>
              captureFromXml(xml, currNode, allCaptures)
          }
        case EvElemEnd(_, label) =>
          logger.debug(s"\telem end: $label")
          currNode match {
            case Nil          => allCaptures // end tag of the outer element: capture complete
            case _ :: parents => captureFromXml(xml, parents, allCaptures)
          }
        case _ =>
          captureFromXml(xml, currNode, allCaptures)
      }
    }

  /**
    * Parse one `<Contents>` element (its start tag already consumed) into a ListBucketResultContents.
    *
    * @param xml        event reader positioned just after `<Contents>`
    * @param bucketName bucket the entry belongs to
    * @return the parsed entry, or None if it could not be built (the error is logged)
    */
  def parseContentNode(xml: XMLEventReader, bucketName: String): Option[ListBucketResultContents] = {
    val allCaptures = captureFromXml(xml, List.empty, Map.empty)
    listBucketResultFor(allCaptures, bucketName)
  }

  /**
    * Walk a complete S3 XML document, invoking `cb` for every entry and error found.
    *
    * The bucket name is taken from the text of the `<Name>` element, which must precede the `<Contents>`
    * elements it applies to. Each `<Contents>` element yields `cb(Right(entry))` (entries that cannot be built
    * are logged and skipped) and each `<Error>` element yields `cb(Left(error))`.
    *
    * @param xml event reader over the whole document; it is consumed to the end
    * @param cb  callback receiving each parsed entry or S3 error, in document order
    */
  def parseDoc(xml: XMLEventReader)(cb: Either[S3Error, ListBucketResultContents] => Unit): Unit = {
    @scala.annotation.tailrec
    def loop(currNode: List[String], bucketName: String, inBucketName: Boolean = false): Unit = {
      logger.debug(s"in loop: $currNode")
      logger.debug(s"xml.hasNext: ${xml.hasNext}")
      if (xml.hasNext) {
        xml.next() match {
          case EvElemStart(_, label, _, _) =>
            logger.debug(s"start element: $label")
            label match {
              case "Contents" =>
                logger.debug("Got contents")
                parseContentNode(xml, bucketName) match {
                  case Some(result) =>
                    logger.debug(s"Got $result")
                    cb(Right(result))
                  case None =>
                    logger.error(s"Could not build ListBucketResult")
                }
                // the whole subtree including </Contents> has been consumed, so nothing is pushed on the stack
                loop(currNode, bucketName, inBucketName = false)
              case "Error" =>
                val errorData = captureFromXml(xml, List.empty, Map.empty)
                S3Error.fromMap(errorData) match {
                  case Success(err) => cb(Left(err))
                  case Failure(excp) =>
                    logger.error("Could not generate error entity: ", excp)
                }
                // as above: </Error> has already been consumed
                loop(currNode, bucketName, inBucketName = false)
              case "Name" =>
                loop(label :: currNode, bucketName, inBucketName = true)
              case _ =>
                loop(label :: currNode, bucketName, inBucketName = false)
            }
          case EvElemEnd(_, label) =>
            logger.debug(s"end element: $label")
            // <Name> has no child elements, so any end tag closes the bucket-name capture;
            // drop(1) rather than tail so an unexpected end tag cannot throw on an empty stack
            loop(currNode.drop(1), bucketName, inBucketName = false)
          case EvText(text) =>
            logger.debug(s"text: '$text'")
            if (inBucketName) {
              logger.debug(s"Got bucketname $text")
              loop(currNode, bucketName + text.trim, inBucketName)
            } else {
              loop(currNode, bucketName, inBucketName)
            }
          case EvEntityRef(entity) =>
            logger.debug(s"entity: $entity")
            loop(currNode, bucketName, inBucketName)
          case _ =>
            logger.debug("Got nothing")
            loop(currNode, bucketName, inBucketName)
        }
      }
    }
    loop(List.empty, "")
  }

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val logger = Logger(getClass)
    // Entries parsed from documents received so far that have not yet been pushed downstream.
    private val entriesQueue: mutable.Queue[ListBucketResultContents] = mutable.Queue()

    /** Satisfy pending downstream demand: push a queued entry, otherwise fetch another document or finish. */
    private def emitNextOrPull(): Unit =
      if (entriesQueue.nonEmpty) {
        push(out, entriesQueue.dequeue())
      } else if (isClosed(in)) {
        completeStage()
      } else if (!hasBeenPulled(in)) {
        pull(in)
      }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        //elem is an XML document, that can't necessarily be trusted, so it is only dumped at debug level.
        //NOTE(review): the scala.xml parser may expand entities declared in an inline DTD — confirm that
        //untrusted input is size-limited upstream.
        logger.debug("xml document:")
        logger.debug(elem.utf8String)
        val errors = mutable.ArrayBuffer.empty[S3Error]
        val xml = new XMLEventReader(Source.fromBytes(elem.toArray, "UTF-8"))
        parseDoc(xml) {
          case Right(entry) =>
            if (!skipZeroLength || entry.size > 0) entriesQueue.enqueue(entry)
          case Left(s3error) =>
            // collected rather than acted on here, so the stage is failed once and nothing is pushed afterwards
            errors += s3error
        }
        errors.headOption match {
          case Some(s3error) =>
            logger.error(s"Got an error from S3: ${s3error.toString}")
            failStage(new RuntimeException(s3error.toString))
          case None =>
            // a document with no usable entries must not end the stream: ask upstream for the next one instead
            emitNextOrPull()
        }
      }

      override def onUpstreamFinish(): Unit =
        // entries still queued are drained by onPull, which completes the stage once the queue is empty
        if (entriesQueue.isEmpty) completeStage()
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        logger.debug("pull from downstream")
        emitNextOrPull()
      }
    })
  }
}