Sources/OSS/Internal/ClientImpl.swift (413 lines of code) (raw):
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif
struct InnerOptions {
let userAgent: String
let logger: LogAgent?
let urlsession: URLSession
let sessionOwner: Bool
}
struct PresignInnerResult {
var method: String?
var url: String?
var expiration: Date?
var signedHeaders: [String: String]?
}
class ClientImpl {
var executeStack: ExecuteStack
let options: ClientOptions
let innerOptions: InnerOptions
public init(
_ config: Configuration,
_ actions: [ClientOptionsAction] = []
) {
// apply config & options
let opts = Self.resolveConfig(config)
for action in actions {
action(opts)
}
let userAgent = Self.resolveUserAgent(config)
let (urlsession, sessionOwner) = Self.resolveURLSession(config)
let innerOpts = InnerOptions(
userAgent: userAgent,
logger: config.logger,
urlsession: urlsession,
sessionOwner: sessionOwner
)
// build execute stack
let stack = opts.executeMW != nil ?
ExecuteStack(handler: opts.executeMW!) :
ExecuteStack(session: innerOpts.urlsession, logger: innerOpts.logger)
stack.push(
create: { (next: ExecuteMiddleware) in
let retryHandler = opts.featureFlags.contains(.correctClockSkew) ? FixTimeRetryHandler() : nil
return RetryerMiddleware(
nextHandler: next,
retryer: opts.retryer,
logger: innerOpts.logger,
retryHandler: retryHandler
)
}, name: "Retryer"
)
stack.push(
create: { (next: ExecuteMiddleware) in
SignerMiddleware(
nextHandler: next,
signer: opts.signer,
provider: opts.credentialsProvider,
logger: innerOpts.logger
)
}, name: "Singer"
)
stack.push(
create: { (next: ExecuteMiddleware) in
ResponseCheckerMiddleware(nextHandler: next, logger: innerOpts.logger)
}, name: "Checker"
)
options = opts
innerOptions = innerOpts
executeStack = stack
}
func hasFeatureFlag(_ flag: FeatureFlag) -> Bool {
return options.featureFlags.contains(flag)
}
deinit {
if self.innerOptions.sessionOwner {
self.innerOptions.urlsession.finishTasksAndInvalidate()
}
}
static func resolveConfig(_ config: Configuration) -> ClientOptions {
let product = Defaults.product
let region = config.region ?? ""
let endpoint = resolveEndpoint(config)
let retryer = resolveRetryer(config)
let signer = resolveSigner(config)
let addressStyle = resolveAddressStyle(config, endpoint)
let featureFlags = resolveFeatureFlags(config)
let opts = ClientOptions(
product: product,
region: region,
endpoint: endpoint,
retryMaxAttempts: config.retryMaxAttempts,
retryer: retryer,
signer: signer,
credentialsProvider: config.credentialsProvider,
addressStyle: addressStyle,
authMethod: nil,
featureFlags: featureFlags
)
return opts
}
static func resolveEndpoint(_ config: Configuration) -> URL? {
var endpoint: String
if let _endpoint = config.endpoint,
!_endpoint.isEmpty
{
endpoint = _endpoint
} else {
guard let region = config.region else {
return nil
}
let endpointType: EndpointType
if config.useDualStackEndpoint ?? false {
endpointType = .dualstack
} else if config.useInternalEndpoint ?? false {
endpointType = .internal
} else if config.useAccelerateEndpoint ?? false {
endpointType = .accelerate
} else {
endpointType = .defautt
}
endpoint = OSSUtils.regionToEndpoint(region: region, type: endpointType)
}
if !endpoint.contains("://") {
endpoint = config.httpProtocal.rawValue + "://" + endpoint
}
return URL(string: endpoint)
}
static func resolveRetryer(_ config: Configuration) -> Retryer {
guard let retryer = config.retryer else {
return StandardRetryer(maxAttempt: config.retryMaxAttempts)
}
return retryer
}
static func resolveSigner(_ config: Configuration) -> Signer {
switch config.signerVersion {
case .v1:
SignerV1()
default:
SignerV4()
}
}
static func resolveURLSession(_ config: Configuration) -> (URLSession, Bool) {
let owner = true
let delegate = OSSURLSessionDelegate(enableTLSVerify: config.enableTLSVerify ?? false,
enableFollowRedirect: config.enableFollowRedirect ?? false)
let delegateQueue = OperationQueue()
var sessionConfig: URLSessionConfiguration
sessionConfig = .default
#if !(os(Linux) || os(Windows))
if config.enableBackgroundTransmitService {
let identifier = config.backgroundSesseionIdentifier ?? Defaults.backgroundSesseionIdentifier
sessionConfig = .background(withIdentifier: identifier)
}
#endif
sessionConfig.timeoutIntervalForRequest = config.timeoutIntervalForRequest
sessionConfig.timeoutIntervalForResource = config.timeoutIntervalForResource
sessionConfig.requestCachePolicy = .reloadIgnoringLocalCacheData
if let maximumConnectionsPerHost = config.maxConnectionsPerHost {
sessionConfig.httpMaximumConnectionsPerHost = maximumConnectionsPerHost
delegateQueue.maxConcurrentOperationCount = maximumConnectionsPerHost
}
return (URLSession(configuration: sessionConfig,
delegate: delegate,
delegateQueue: delegateQueue), owner)
}
static func resolveAddressStyle(_ config: Configuration, _ endpoint: URL?) -> AddressStyleType {
var style: AddressStyleType
if config.useCname ?? false {
style = .cname
} else if config.usePathStyle ?? false {
style = .path
} else {
style = .virtualHosted
}
if let endpoint = endpoint, let host = endpoint.host {
style = host.isIPAddress() ? .path : style
}
return style
}
static func resolveUserAgent(_ config: Configuration) -> String {
var ua = UserAgent.getDefault()
if let userAgent = config.userAgent {
ua = "\(ua) /\(userAgent)"
}
return ua
}
static func resolveFeatureFlags(_ config: Configuration) -> FeatureFlag {
var featureFlags = Defaults.featureFlags
if let enableUploadCRC64Validation = config.enableUploadCRC64Validation {
if enableUploadCRC64Validation {
featureFlags.insert(.enableCRC64CheckUpload)
} else {
featureFlags.remove(.enableCRC64CheckUpload)
}
}
if let enableDownloadCRC64Validation = config.enableDownloadCRC64Validation {
if enableDownloadCRC64Validation {
featureFlags.insert(.enableCRC64CheckDownload)
} else {
featureFlags.remove(.enableCRC64CheckDownload)
}
}
return featureFlags
}
static func verifyOperation(input: inout OperationInput) throws {
if let bucketName = input.bucket, try !bucketName.isValidBucketName() {
throw ClientError.bucketInvalidError(bucketName)
}
if let key = input.key, try !key.isValidObjectKey() {
throw ClientError.objectInvalidError(key)
}
}
public func execute(
with input: inout OperationInput,
args opOpts: OperationOptions? = nil
) async throws -> OperationOutput {
// verify input
try Self.verifyOperation(input: &input)
// build execute context
let (request, context) = try buildRequestContext(with: &input, opts: opOpts)
// execute and wait result
do {
let result = try await executeStack.execute(request, context)
return OperationOutput(
input: input,
statusCode: result.statusCode,
headers: result.headers,
body: result.content
)
} catch {
// wrap to operation error
throw error
}
}
public func presignInner(
with input: inout OperationInput,
args opOpts: OperationOptions? = nil
) async throws -> PresignInnerResult {
// verify input
try Self.verifyOperation(input: &input)
// build execute context
let (request, context) = try buildRequestContext(with: &input, opts: opOpts)
var result = PresignInnerResult(
method: request.method,
url: request.requestUri.absoluteString
)
guard let provider = options.credentialsProvider else {
return result
}
if provider is AnonymousCredentialsProvider {
return result
}
let credentials = try await provider.getCredentials()
if credentials.isEmpty() {
throw ClientError.credentialsEmptyError()
}
context.signingContext?.credentials = credentials
context.signingContext?.authHeader = false
let req = try await options.signer.sign(request: request, signingContext: &(context.signingContext!))
// update signable headers & check max expiration time
// signed headers
// content-type, content-md5, x-oss- and additionalHeaders in sign v4
let expiration = context.signingContext!.expirationTime!
var expect: [String] = ["content-type", "content-md5"]
if options.signer is SignerV4 {
context.signingContext?.additionalHeaderNames?.forEach { expect.append($0.lowercased()) }
let expires = Int(expiration.timeIntervalSince1970 - Date().timeIntervalSince1970)
if expires > 7 * 24 * 3600 {
throw ClientError.presignExpirationError(expiration: expiration)
}
}
var signedHeaders: [String: String] = [:]
for (k, v) in req.headers {
let lowkey = k.lowercased()
if expect.contains(lowkey) {
signedHeaders[lowkey] = v
}
}
result.url = req.requestUri.absoluteString
result.expiration = expiration
result.signedHeaders = signedHeaders
return result
}
func buildRequestContext(
with input: inout OperationInput,
opts: OperationOptions?
) throws -> (RequestMessage, ExecuteContext) {
// default api options
let retryMaxAttempts = opts?.retryMaxAttempts
let readWriteTimeout = opts?.readWriteTimeout
var responseHandlers: [ResponseHandler] = [ServerResponseHandler()]
// response delegate
if let handlers = input.metadata.values(key: AttributeKeys.responseHandler) {
handlers.forEach { value in responseHandlers.append(value) }
}
// signing context
var signingContext = SigningContext(
bucket: input.bucket,
key: input.key,
region: options.region,
product: options.product
)
// signing time from user
if let expirationTime = input.metadata.get(key: AttributeKeys.expirationTime) {
signingContext.expirationTime = expirationTime
}
// request
// request::host & path & query
guard let endpoint = options.endpoint else {
throw ClientError.endpointInvalidError()
}
let baseUrl = input.buildHostPath(
host: endpoint.hostPort(),
addressStyle: options.addressStyle
)
var url = "\(endpoint.scheme!)://\(baseUrl)"
let query = input.queryString()
if !query.isEmpty {
url = "\(url)?\(query)"
}
guard let uri = URL(string: url) else {
throw ClientError.requestError(detail: "Build url fail. url: \(url)")
}
// request::headers
var headers: [String: String] = [:]
input.headers.forEach { key, value in headers[key.lowercased()] = value }
headers["user-agent"] = innerOptions.userAgent
// request::body
let request = RequestMessage(
method: input.method,
requestUri: uri,
headers: headers,
content: input.body
)
let context = ExecuteContext(
retryMaxAttempts: retryMaxAttempts,
readWriteTimeout: readWriteTimeout,
signingContext: signingContext,
progressDelegate: input.metadata.get(key: AttributeKeys.progressDelegate),
responseHandlers: responseHandlers,
saveToURL: opts?.saveToURL
)
return (request, context)
}
}
extension OperationInput {
func buildPath(pathStyle: AddressStyleType) -> String? {
var path: String?
if let bucketName = bucket {
if pathStyle == .path {
path = "/".appending(bucketName)
}
}
if let objectKey = key?.urlEncodeWithoutSeparator() {
path = (path ?? "").appending("/").appending(objectKey)
}
return path
}
func buildHost(endpoint: String, pathStyle: AddressStyleType) throws -> String {
guard let temComs = URLComponents(string: endpoint),
var host = temComs.host
else {
throw ClientError.requestError(detail: "Endpoint format error.")
}
if let bucketName = bucket {
if pathStyle != .cname {
host = bucketName.appending(".").appending(host)
}
}
return host
}
func queryItems() -> [URLQueryItem]? {
var queryItems: [URLQueryItem]?
if parameters.count > 0 {
queryItems = parameters.compactMap {
if let name = $0.urlEncode() {
let value = $1?.urlEncode()
return URLQueryItem(name: name, value: value)
} else {
return nil
}
}
}
return queryItems
}
func queryString() -> String {
if parameters.count == 0 {
return ""
}
return parameters.map { key, value in
if let name = key.urlEncode() {
let value = value?.urlEncode()
return "\(name)=\(value ?? "")"
} else {
return ""
}
}.joined(separator: "&")
}
func buildHostPath(host: String, addressStyle: AddressStyleType) -> String {
var paths: [String] = []
var baseUrl = host
if let bucket = bucket {
switch addressStyle {
case .path:
paths.append(bucket)
if key == nil {
paths.append("")
}
case .cname:
break
default: // virtual host
baseUrl = "\(bucket).\(host)"
}
}
if let encodeKey = key?.urlEncodePath() {
paths.append(encodeKey)
}
return "\(baseUrl)/\(paths.joined(separator: "/"))"
}
}
extension URL {
func hostPort() -> String {
guard var str = host else {
return ""
}
if let port = port {
str += ":\(port)"
}
return str
}
}