override def shape: SinkShape[ByteString] = SinkShape.of()

in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/MatrixStoreFileSink.scala [23:77]


  override def shape: SinkShape[ByteString] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = {
    val completionPromise = Promise[Long]

    val logic = new GraphStageLogic(shape) {
      private val logger = LoggerFactory.getLogger(getClass)
      private var channel:SeekableByteChannel = _
      private var ctr:Long = 0

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val byteArr = grab(in).toArray
          val buffer = ByteBuffer.allocate(byteArr.length)
          buffer.put(byteArr)
          ctr+=byteArr.length
          buffer.flip()
          channel.write(buffer)
          pull(in)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          logger.error(s"Copy source failed with error ${ex.getMessage}, aborting copy")
          if(channel!=null) {
            logger.debug("Closing write channel")
            channel.close()
          }
          completionPromise.failure(ex)
        }
      })

      override def preStart(): Unit = {
        try {
          logger.info(s"Requesting write to ${mxsFile.getId}...")
          channel = mxsFile.newSeekableObjectChannel(Set(AccessOption.WRITE).asJava)

          pull(in)
        } catch {
          case err:Throwable=>
            logger.error(s"Could not set up MXS file to write to: ", err)
            failStage(err)
        }
      }

      override def postStop(): Unit = {
        if(channel!=null) {
          logger.debug("Closing write channel")
          channel.close()
        }
        logger.debug(s"Wrote $ctr bytes")
        completionPromise.success(ctr)
      }
    }
    (logic, completionPromise.future)
  }