// NOTE(review): this one-line excerpt calls FlowShape.of() with no arguments, whereas the definition further
// down in this listing passes (in, out) — presumably the excerpt was truncated; confirm before relying on it.
override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape.of()

in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/OMDelete.scala [30:78]


  /** The shape of this stage: one inlet (`in`) and one outlet (`out`), both carrying [[ObjectMatrixEntry]]. */
  override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape(in, out)

  /**
    * Builds the logic for this stage: each incoming entry is deleted from the vault (when `reallyDelete` is set)
    * and then pushed downstream.
    *
    * The vault is opened in `preStart` and disposed of in `postStop`. When `reallyDelete` is false nothing is
    * deleted; a warning is logged and the entry is pushed on unchanged. When a deletion fails the stage is failed
    * if `failOnError` is set, otherwise the entry is dropped and the next one is requested from upstream.
    *
    * @param inheritedAttributes attributes handed to the stage on creation; not read by this implementation
    * @return the [[GraphStageLogic]] instance driving this stage
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Assigned in preStart; stays null if the vault could not be opened.
    private var vault:Vault = _

    override def preStart(): Unit = {
      try {
        vault = MatrixStore.openVault(vaultInfo)
      } catch {
        // Nothing is swallowed here: the error is logged, the stage is failed and the error is rethrown.
        case err:Throwable=>
          logger.error("Could not connect to MatrixStore: ", err)
          failStage(err)
          throw err
      }
    }

    override def postStop(): Unit = {
      // Option(...) turns a null (never opened) vault into None, so dispose() only runs on a real handle.
      Option(vault).foreach(_.dispose())
    }

    // Demand from downstream is forwarded straight to upstream.
    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        if(reallyDelete) {
          logger.debug(s"Deleting ${elem.oid} (${elem.maybeGetFilename()})...")
          OMDelete.doDelete(vault, elem) match {
            case Success(_)=>
              logger.info(s"Successfully deleted ${elem.oid} (${elem.maybeGetFilename()})")
              push(out, elem)
            case Failure(err)=>
              // Pass the throwable as well as the message so the stack trace is kept in the log.
              logger.error(s"Could not delete ${elem.oid}: ${err.getMessage}", err)
              if(failOnError) {
                failStage(err)
              } else {
                // The failed entry is not emitted; request the next one so the stream keeps moving.
                pull(in)
              }
          }
        } else {
          // Dry-run mode: report what would have been deleted and pass the entry through untouched.
          logger.warn(s"If reallyDelete were true then I would attempt to delete ${elem.oid} (${elem.maybeGetFilename()})")
          push(out, elem)
        }
      }
    })
  }