def apply[F[_]: Sync, S]()

in lib/salesforce/sttp-client/src/main/scala/com/gu/salesforce/sttp/SalesforceClient.scala [44:240]


  def apply[F[_]: Sync, S](
      backend: SttpBackend[F, S],
      config: SFAuthConfig,
  ): EitherT[F, SalesforceClientError, SalesforceClient[F]] = {

    def auth(config: SFAuthConfig): EitherT[F, SalesforceClientError, SalesforceAuth] = {
      sendRequest[SalesforceAuth](
        basicRequest
          .post(
            Uri(new URI(config.url + "/services/oauth2/token")),
          )
          .body(
            "client_id" -> config.client_id,
            "client_secret" -> config.client_secret,
            "username" -> config.username,
            "password" -> (config.password + config.token),
            "grant_type" -> "password",
          )
          .response(asJson[SalesforceAuth])
          .mapResponse(_.left.map(e => DeserializationException(e.getMessage, DecodingFailure(e.getMessage, Nil)))),
      )
    }

    def followNextRecordsLinks[A: Decoder](
        auth: SalesforceAuth,
        records: List[A],
        optionalNextRecordsLink: Option[String],
    ): EitherT[F, SalesforceClientError, RecordsWrapperCaseClass[A]] = {
      optionalNextRecordsLink match {
        case Some(nextRecordsLinks) =>
          for {
            nextPageResults <- sendAuthenticatedRequest[QueryRecordsWrapperCaseClass[A]](
              auth,
              Method.GET,
              Uri(new URI(auth.instance_url + nextRecordsLinks)),
            )
            allRecords <- followNextRecordsLinks(
              auth,
              records ++ nextPageResults.records,
              nextPageResults.nextRecordsUrl,
            )
          } yield allRecords
        case None =>
          EitherT.rightT(RecordsWrapperCaseClass(records))
      }
    }

    def sendAuthenticatedRequestWithOptionalBody[REQ_BODY: Encoder, RESP_BODY: Decoder](
        auth: SalesforceAuth,
        method: Method,
        uri: Uri,
        body: Option[REQ_BODY],
    ): EitherT[F, SalesforceClientError, RESP_BODY] = {

      /*
       * This is to decode an empty String as a RESP_BODY.
       * Circe treats an empty response body as an error
       * because it's not valid json.
       */
      def decode[RESP_BODY: Decoder](s: String) = {
        val parsed =
          if (s.isEmpty) Right(Json.Null)
          else parse(s)
        parsed.flatMap(_.as[RESP_BODY])
      }

      val requestWithoutBody: Request[Either[DeserializationException[circe.Error], RESP_BODY], Any] =
        basicRequest
          .method(method, uri)
          .headers(
            Map(
              "Authorization" -> s"Bearer ${auth.access_token}",
              "X-SFDC-Session" -> auth.access_token,
              "Content-Type" -> "application/json",
            ),
          )
          .mapResponse {
            case Left(failure) =>
              logger.info(failure)
              decode[RESP_BODY](failure).left.map(e => DeserializationException(e.getMessage, e))

            case Right(success) =>
              logger.info(success)
              val errorOrRespbody: Either[circe.Error, RESP_BODY] = decode[RESP_BODY](success)
              errorOrRespbody.left.map(e => DeserializationException(e.getMessage, e))
          }

      val bodyAsStringNoNulls = printer.print(body.asJson)

      sendRequest[RESP_BODY](
        body.fold(requestWithoutBody)(_ => requestWithoutBody.body(bodyAsStringNoNulls)),
      )
    }

    def sendAuthenticatedRequest[RESP_BODY: Decoder](
        auth: SalesforceAuth,
        method: Method,
        uri: Uri,
    ): EitherT[F, SalesforceClientError, RESP_BODY] = sendAuthenticatedRequestWithOptionalBody[Unit, RESP_BODY](
      auth,
      method,
      uri,
      body = None,
    )

    def sendRequest[A](
        request: Request[Either[DeserializationException[circe.Error], A], Any],
    ): EitherT[F, SalesforceClientError, A] = {
      for {
        response <- EitherT.right[SalesforceClientError](request.send(backend))
        responseBody <- EitherT.fromEither[F](formatError[A](request, response))
      } yield responseBody
    }

    def formatError[A](
        request: Request[Either[DeserializationException[circe.Error], A], S],
        response: Response[Either[DeserializationException[circe.Error], A]],
    ): Either[SalesforceClientError, A] = {
      response.body.left.map(errorBody =>
        SalesforceClientError(
          s"Request ${request.method.method} ${request.uri.toString()} failed returning a status ${response.code} with body: ${errorBody}",
        ),
      )
    }

    def logQuery(query: String) =
      Sync[F]
        .delay(logger.info(s"Sending query to Salesforce: ${query}"))
        .asRight[SalesforceClientError]
        .toEitherT[F]

    for {
      auth <- auth(config)
      client = new SalesforceClient[F]() {

        override def query[A: Decoder](
            query: String,
        ): EitherT[F, SalesforceClientError, RecordsWrapperCaseClass[A]] = {
          val initialQueryUri = Uri(new URI(auth.instance_url + soqlQueryBaseUrl)).addParam("q", query)
          for {
            _ <- logQuery(query)
            initialQueryResults <- sendAuthenticatedRequest[QueryRecordsWrapperCaseClass[A]](
              auth,
              Method.GET,
              initialQueryUri,
            )
            allResults <- followNextRecordsLinks[A](
              auth,
              initialQueryResults.records,
              initialQueryResults.nextRecordsUrl,
            )
          } yield allResults
        }

        override def patch[REQ_BODY: Encoder](
            objectName: String,
            objectId: String,
            body: REQ_BODY,
        ): EitherT[F, SalesforceClientError, Unit] = {
          val uri = Uri(new URI(s"${auth.instance_url}$sfObjectsBaseUrl$objectName/$objectId"))
          for {
            _ <- logQuery(s"$objectName $objectId PATCH with '$body'")
            _ <- sendAuthenticatedRequestWithOptionalBody[REQ_BODY, Unit](auth, Method.PATCH, uri, Some(body))
          } yield ()
        }

        private lazy val successStatusCodes = 200 to 299

        override def composite[PART_BODY: Encoder](
            body: SFApiCompositeRequest[PART_BODY],
        ): EitherT[F, SalesforceClientError, SFApiCompositeResponse] = {
          sendAuthenticatedRequestWithOptionalBody[SFApiCompositeRequest[PART_BODY], SFApiCompositeResponse](
            auth,
            Method.POST,
            Uri(new URI(auth.instance_url + compositeBaseUrl)),
            Some(body),
          ).flatMap(response =>
            // this is necessary because for some bizarre reason composite requests return a 200 even if the sub-requests fail
            // see https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/requests_composite.htm
            response.compositeResponse
              .filterNot(part => successStatusCodes.contains(part.httpStatusCode))
              .toNel
              .fold(
                response.asRight[SalesforceClientError], // no inner failures so return response
              )(failureCodes =>
                SalesforceClientError(
                  failureCodes
                    .map(part => s"${part.httpStatusCode} (${part.referenceId})")
                    .mkString_("Composite Failure Status Codes : ", ", ", ""),
                ).asLeft[SFApiCompositeResponse],
              )
              .toEitherT[F],
          )
        }
      }
    } yield client
  }