in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/OMDelete.scala [30:78]
override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var vault:Vault = _
override def preStart(): Unit = {
try {
vault = MatrixStore.openVault(vaultInfo)
} catch {
case err:Throwable=>
logger.error("Could not connect to MatrixStore: ", err)
failStage(err)
throw err
}
}
override def postStop(): Unit = {
if(vault!=null) vault.dispose()
}
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
if(reallyDelete) {
logger.debug(s"Deleting ${elem.oid} (${elem.maybeGetFilename()})...")
OMDelete.doDelete(vault, elem) match {
case Success(_)=>
logger.info(s"Successfully deleted ${elem.oid} (${elem.maybeGetFilename()})")
push(out, elem)
case Failure(err)=>
logger.error(s"Could not delete ${elem.oid}: ${err.getMessage}")
if(failOnError) {
failStage(err)
} else {
pull(in)
}
}
} else {
logger.warn(s"If reallyDelete were true then I would attempt to delete ${elem.oid} (${elem.maybeGetFilename()})")
push(out, elem)
}
}
})
}