def fileString()

in magenta-lib/src/main/scala/magenta/tasks/gcp/GCSTasks.scala [72:161]


  def fileString(quantity: Int) =
    s"$quantity file${if (quantity != 1) "s" else ""}"

  // end-user friendly description of this task
  def description: String =
    s"Upload ${fileString(objectMappings.size)} to GCS bucket $gcsTargetBucket using file mapping $paths"

  // execute this task (should throw on failure)
  override def execute(
      resources: DeploymentResources,
      stopFlag: => Boolean
  ): Unit = {
    if (totalSize == 0) {
      val locationDescription = paths
        .map {
          case (path: S3Path, _) => path.show()
          case (location, _)     => location.toString
        }
        .mkString("\n")
      resources.reporter.fail(
        s"No files found to upload in $locationDescription"
      )
    }

    val withClient = withClientFactory(keyRing, resources)
    withClient { client =>
      val currentlyDeployedObjectsToDelete =
        getCurrentObjectsForDeletion(client)

      resources.reporter.verbose(
        s"Starting transfer of ${fileString(objectMappings.size)} ($totalSize bytes)"
      )
      transfers.zipWithIndex.par.foreach { case (transfer, index) =>
        logger.debug(s"Transferring $transfer")
        index match {
          case x if x < 10 =>
            resources.reporter.verbose(s"Transferring $transfer")
          case 10 =>
            resources.reporter.verbose(
              s"Not logging details for the remaining ${fileString(objectMappings.size - 10)}"
            )
          case _ =>
        }
        GCP.api.retryWhen500orGoogleError(
          resources.reporter,
          s"GCS Upload $transfer"
        ) {
          val copyObjectRequest = GetObjectRequest
            .builder()
            .bucket(transfer.source.bucket)
            .key(transfer.source.key)
            .build()
          val inputStream = resources.artifactClient
            .getObjectAsBytes(copyObjectRequest)
            .asInputStream()
          val contentType = Option(transfer.target.getContentType)
            .getOrElse(URLConnection.guessContentTypeFromStream(inputStream))
          val result = client
            .objects()
            .insert(
              gcsTargetBucket.name,
              transfer.target,
              new InputStreamContent(contentType, inputStream)
            )
            .execute()
          logger.debug(
            s"Put object ${result.getName}: MD5: ${result.getMd5Hash} Metadata: ${result.getMetadata}"
          )
          result
        }
      }
      currentlyDeployedObjectsToDelete.par.foreach {
        case storageObjectToDelete =>
          resources.reporter.verbose(
            s"Deleting obsolete file from GCP: gcs://${gcsTargetBucket.name}/${storageObjectToDelete.getName}"
          )
          val errorMessage =
            s"Could notremove obselete object ${storageObjectToDelete.getName}"
          GCP.api.retryWhen500orGoogleError(resources.reporter, errorMessage) {
            client
              .objects()
              .delete(gcsTargetBucket.name, storageObjectToDelete.getName)
              .execute
          }
      }
    }
    resources.reporter.verbose(
      s"Finished transfer of ${fileString(objectMappings.size)}"
    )
  }