in common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala [191:248]
override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(
this,
DATABASE_ATTS_DELETE,
s"[ATTS_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")
var count = 0
val f = Source
.fromPublisher(client.listBlobsByHierarchy(objectKeyPrefix(docId)))
.mapAsync(1) { b =>
count += 1
val startDelete =
transid.started(
this,
DATABASE_ATT_DELETE,
s"[ATT_DELETE] deleting attachment '${b.getName}' of document 'id: $docId'")
client
.getBlobAsyncClient(b.getName)
.delete()
.toFuture
.toScala
.map(
_ =>
transid.finished(
this,
startDelete,
s"[ATT_DELETE] completed: deleting attachment '${b.getName}' of document 'id: $docId'"))
.recover {
case t =>
transid.failed(
this,
startDelete,
s"[ATT_DELETE] failed: deleting attachment '${b.getName}' of document 'id: $docId' error: $t")
}
}
.recover {
case t =>
logging.error(this, s"[ATT_DELETE] :error in delete ${t}")
throw t
}
.runWith(Sink.seq)
.map(_ => true)
f.foreach(
_ =>
transid.finished(
this,
start,
s"[ATTS_DELETE] completed: deleting ${count} attachments of document 'id: $docId'",
InfoLevel))
reportFailure(
f,
start,
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}