FBControlCore/Utility/FBDataConsumer.m (451 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #import "FBDataConsumer.h" #import "FBCollectionInformation.h" #import "FBControlCoreError.h" #import "FBControlCoreLogger.h" #import "FBDataBuffer.h" #import <stdatomic.h> @interface FBDataConsumerAdaptor () + (dispatch_data_t)adaptNSData:(NSData *)dispatchData; @end @interface FBDataConsumerAdaptor_ToNSData : NSObject <FBDispatchDataConsumer> @property (nonatomic, strong, readonly) id<FBDataConsumer> consumer; @end @implementation FBDataConsumerAdaptor_ToNSData #pragma mark Initializers - (instancetype)initWithConsumer:(id<FBDataConsumer>)consumer { self = [super init]; if (!self) { return nil; } _consumer = consumer; return self; } #pragma mark FBDataConsumer - (void)consumeData:(dispatch_data_t)dispatchData { NSData *data = [FBDataConsumerAdaptor adaptDispatchData:dispatchData]; [self.consumer consumeData:data]; } - (void)consumeEndOfFile { [self.consumer consumeEndOfFile]; } @end @interface FBDataConsumerAdaptor_ToDispatchData : NSObject <FBDataConsumer, FBDataConsumerLifecycle> @property (nonatomic, strong, readonly) id<FBDispatchDataConsumer, FBDataConsumerLifecycle> consumer; @end @implementation FBDataConsumerAdaptor_ToDispatchData #pragma mark Initializers - (instancetype)initWithConsumer:(id<FBDispatchDataConsumer, FBDataConsumerLifecycle>)consumer { self = [super init]; if (!self) { return nil; } _consumer = consumer; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { dispatch_data_t dispatchData = [FBDataConsumerAdaptor adaptNSData:data]; [self.consumer consumeData:dispatchData]; } - (void)consumeEndOfFile { [self.consumer consumeEndOfFile]; } - (FBFuture<NSNull *> *)finishedConsuming { return self.consumer.finishedConsuming; } @end @implementation FBDataConsumerAdaptor #pragma mark Initializers + (id<FBDispatchDataConsumer>)dispatchDataConsumerForDataConsumer:(id<FBDataConsumer>)consumer; { return [[FBDataConsumerAdaptor_ToNSData alloc] initWithConsumer:consumer]; } + (id<FBDataConsumer, FBDataConsumerLifecycle>)dataConsumerForDispatchDataConsumer:(id<FBDispatchDataConsumer, FBDataConsumerLifecycle>)consumer; { return [[FBDataConsumerAdaptor_ToDispatchData alloc] initWithConsumer:consumer]; } #pragma mark Public + (NSData *)adaptDispatchData:(dispatch_data_t)dispatchData { // One-way bridging of dispatch_data_t to NSData is permitted. // Since we can't safely assume all consumers of the NSData work discontiguous ranges, we have to make the dispatch_data contiguous. // This is done with dispatch_data_create_map, which is 0-copy for a contiguous range but copies for non-contiguous ranges. // https://twitter.com/catfish_man/status/393032222808100864 // https://developer.apple.com/library/archive/releasenotes/Foundation/RN-Foundation-older-but-post-10.8/ return (NSData *) dispatch_data_create_map(dispatchData, NULL, NULL); } #pragma mark Private + (dispatch_data_t)adaptNSData:(NSData *)data __attribute__((no_sanitize("nullability-arg"))) { // The safest possible way of adapting the NSData to dispatch_data_t is to ensure that buffer backing the dispatch_data_t data is: // 1) Immutable // 2) Is not freed until the dispatch_data_t is destroyed. // There are two ways of doing this: // 1) Copy the NSData, and retain it for the lifecycle of the dispatch_data_t. // 2) Use DISPATCH_DATA_DESTRUCTOR_DEFAULT which will copy the underlying buffer. // This uses #2 as it's preferable to let libdispatch do the management itself and avoids an object copy (NSData) as well as a potential buffer copy in `-[NSData copy]`. // It can be quite surprising how many methods result in the creation of NSMutableData, for example `-[NSString dataUsingEncoding:]` can result in NSConcreteMutableData. // By copying the buffer we are sure that the data in the dispatch wrapper is completely immutable. return dispatch_data_create( data.bytes, data.length, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), DISPATCH_DATA_DESTRUCTOR_DEFAULT ); } @end typedef void (^dataBlock)(NSData *); static inline dataBlock FBDataConsumerToStringConsumer (void(^consumer)(NSString *)) { return ^(NSData *data){ NSString *line = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; if (line == nil) { line = @"non-utf8"; } consumer(line); }; } @interface FBBlockDataConsumer_Dispatcher : NSObject <FBDataConsumer> @property (nonatomic, strong, nullable, readwrite) dispatch_queue_t queue; @property (nonatomic, strong, nullable, readwrite) dispatch_group_t group; @property (nonatomic, copy, nullable, readwrite) void (^consumer)(NSData *); @property _Atomic int64_t numPendingTasks; @end @implementation FBBlockDataConsumer_Dispatcher - (instancetype)initWithQueue:(dispatch_queue_t)queue consumer:(void (^)(NSData *))consumer { self = [super init]; if (!self) { return nil; } _queue = queue; _group = dispatch_group_create(); _consumer = consumer; atomic_init(&_numPendingTasks, 0); return self; } - (void)consumeData:(NSData *)data { void (^consumer)(NSData *) = nil; dispatch_queue_t queue; dispatch_group_t group; atomic_fetch_add(&_numPendingTasks, 1); @synchronized (self) { consumer = self.consumer; queue = self.queue; group = self.group; if (!consumer) { return; } if (queue) { dispatch_group_async(group, queue, ^{ consumer(data); atomic_fetch_sub(&self->_numPendingTasks, 1); }); } else { consumer(data); atomic_fetch_sub(&_numPendingTasks, 1); } } } - (void)consumeEndOfFile { dispatch_group_t group; @synchronized (self) { group = self.group; dispatch_group_wait(group, DISPATCH_TIME_FOREVER); self.group = nil; self.consumer = nil; self.queue = nil; } } @end @interface FBBlockDataConsumer () <FBDataConsumer, FBDataConsumerLifecycle> @property (nonatomic, strong, readonly) FBBlockDataConsumer_Dispatcher *dispatcher; @end @interface FBBlockDataConsumerAsync : NSObject <FBDataConsumer, FBDataConsumerLifecycle, FBDataConsumerAsync> @property (nonatomic, strong, readonly) FBBlockDataConsumer_Dispatcher *dispatcher; - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher; @end @interface FBBlockDataConsumer_Buffered : FBBlockDataConsumer @property (nonatomic, strong, readonly) id<FBConsumableBuffer> buffer; - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher terminal:(NSData *)terminal; @end @interface FBBlockDataConsumer_Unbuffered : FBBlockDataConsumer @property (nonatomic, strong, readonly) FBMutableFuture<NSNull *> *finishedConsumingFuture; @end @interface FBBlockDataConsumerAsync_Unbuffered : FBBlockDataConsumerAsync @property (nonatomic, strong, readonly) FBMutableFuture<NSNull *> *finishedConsumingFuture; @end @implementation FBBlockDataConsumer #pragma mark Initializers + (id<FBDataConsumer, FBDataConsumerLifecycle>)synchronousDataConsumerWithBlock:(void (^)(NSData *))consumer { FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:nil consumer:consumer]; return [[FBBlockDataConsumer_Unbuffered alloc] initWithDispatcher:dispatcher]; } + (id<FBDataConsumer, FBDataConsumerLifecycle>)synchronousLineConsumerWithBlock:(void (^)(NSString *))consumer { FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:nil consumer:FBDataConsumerToStringConsumer(consumer)]; return [[FBBlockDataConsumer_Buffered alloc] initWithDispatcher:dispatcher terminal:FBDataBuffer.newlineTerminal]; } + (id<FBDataConsumer, FBDataConsumerLifecycle, FBDataConsumerAsync>)asynchronousDataConsumerOnQueue:(dispatch_queue_t)queue consumer:(void (^)(NSData *))consumer { FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:queue consumer:consumer]; return [[FBBlockDataConsumerAsync_Unbuffered alloc] initWithDispatcher:dispatcher]; } + (id<FBDataConsumer, FBDataConsumerLifecycle, FBDataConsumerAsync>)asynchronousDataConsumerWithBlock:(void (^)(NSData *))consumer { dispatch_queue_t queue = dispatch_queue_create("com.facebook.FBControlCore.BlockDataConsumer.data", DISPATCH_QUEUE_SERIAL); return [self asynchronousDataConsumerOnQueue:queue consumer:consumer]; } + (id<FBDataConsumer, FBDataConsumerLifecycle>)asynchronousLineConsumerWithBlock:(void (^)(NSString *))consumer { dispatch_queue_t queue = dispatch_queue_create("com.facebook.FBControlCore.BlockDataConsumer.lines", DISPATCH_QUEUE_SERIAL); FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:queue consumer:FBDataConsumerToStringConsumer(consumer)]; return [[FBBlockDataConsumer_Buffered alloc] initWithDispatcher:dispatcher terminal:FBDataBuffer.newlineTerminal]; } + (id<FBDataConsumer, FBDataConsumerLifecycle>)asynchronousLineConsumerWithQueue:(dispatch_queue_t)queue consumer:(void (^)(NSString *))consumer { FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:queue consumer:FBDataConsumerToStringConsumer(consumer)]; return [[FBBlockDataConsumer_Buffered alloc] initWithDispatcher:dispatcher terminal:FBDataBuffer.newlineTerminal]; } + (id<FBDataConsumer, FBDataConsumerLifecycle>)asynchronousLineConsumerWithQueue:(dispatch_queue_t)queue dataConsumer:(void (^)(NSData *))consumer { FBBlockDataConsumer_Dispatcher *dispatcher = [[FBBlockDataConsumer_Dispatcher alloc] initWithQueue:queue consumer:consumer]; return [[FBBlockDataConsumer_Buffered alloc] initWithDispatcher:dispatcher terminal:FBDataBuffer.newlineTerminal]; } - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher { self = [super init]; if (!self) { return nil; } _dispatcher = dispatcher; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); } - (void)consumeEndOfFile { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); return nil; } @end @implementation FBBlockDataConsumerAsync - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher { self = [super init]; if (!self) { return nil; } _dispatcher = dispatcher; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); } - (void)consumeEndOfFile { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); return nil; } #pragma mark FBDataConsumerAsync - (NSInteger)unprocessedDataCount { NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd)); return 0; } @end @implementation FBBlockDataConsumer_Buffered #pragma mark Initializers - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher terminal:(NSData *)terminal { self = [super initWithDispatcher:dispatcher]; if (!self) { return nil; } _buffer = [FBDataBuffer consumableBufferForwardingToConsumer:dispatcher onQueue:nil terminal:terminal]; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { @synchronized (self) { [self.buffer consumeData:data]; } } - (void)consumeEndOfFile { @synchronized (self) { [self.buffer consumeEndOfFile]; } } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { return self.buffer.finishedConsuming; } @end @implementation FBBlockDataConsumer_Unbuffered #pragma mark Initializers - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher { self = [super initWithDispatcher:dispatcher]; if (!self) { return nil; } _finishedConsumingFuture = FBMutableFuture.future; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { @synchronized (self) { [self.dispatcher consumeData:data]; } } - (void)consumeEndOfFile { @synchronized (self) { [self.dispatcher consumeEndOfFile]; [self.finishedConsumingFuture resolveWithResult:NSNull.null]; } } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { return self.finishedConsumingFuture; } @end @implementation FBBlockDataConsumerAsync_Unbuffered #pragma mark Initializers - (instancetype)initWithDispatcher:(FBBlockDataConsumer_Dispatcher *)dispatcher { self = [super initWithDispatcher:dispatcher]; if (!self) { return nil; } _finishedConsumingFuture = FBMutableFuture.future; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { @synchronized (self) { [self.dispatcher consumeData:data]; } } - (void)consumeEndOfFile { @synchronized (self) { [self.dispatcher consumeEndOfFile]; [self.finishedConsumingFuture resolveWithResult:NSNull.null]; } } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { return self.finishedConsumingFuture; } #pragma mark FBDataConsumerAsync - (NSInteger)unprocessedDataCount { // not synchronized on purpose // as processing data happens on a different thread // so we cannot be accurate anyway return self.dispatcher.numPendingTasks; } @end @implementation FBLoggingDataConsumer #pragma mark Initializers + (instancetype)consumerWithLogger:(id<FBControlCoreLogger>)logger { return [[self alloc] initWithLogger:logger]; } - (instancetype)initWithLogger:(id<FBControlCoreLogger>)logger { self = [super init]; if (!self) { return nil; } _logger = logger; return self; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { NSString *string = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; if (!string) { return; } string = [string stringByTrimmingCharactersInSet:NSCharacterSet.newlineCharacterSet]; if (string.length < 1) { return; } [self.logger log:string]; } - (void)consumeEndOfFile { } @end @interface FBCompositeDataConsumer () @property (nonatomic, copy, readonly) NSArray<id<FBDataConsumer>> *consumers; @property (nonatomic, strong, readonly) FBMutableFuture<NSNull *> *finishedConsumingFuture; @end @implementation FBCompositeDataConsumer #pragma mark Initializers + (instancetype)consumerWithConsumers:(NSArray<id<FBDataConsumer>> *)consumers { return [[self alloc] initWithConsumers:consumers]; } - (instancetype)initWithConsumers:(NSArray<id<FBDataConsumer>> *)consumers { self = [super init]; if (!self) { return nil; } _consumers = consumers; _finishedConsumingFuture = FBMutableFuture.future; return self; } #pragma mark NSObject - (NSString *)description { return [NSString stringWithFormat:@"Composite Consumer %@", [FBCollectionInformation oneLineDescriptionFromArray:self.consumers]]; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { for (id<FBDataConsumer> consumer in self.consumers) { [consumer consumeData:data]; } } - (void)consumeEndOfFile { for (id<FBDataConsumer> consumer in self.consumers) { [consumer consumeEndOfFile]; } [self.finishedConsumingFuture resolveWithResult:NSNull.null]; } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { return self.finishedConsumingFuture; } @end @implementation FBNullDataConsumer #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { } - (void)consumeEndOfFile { } @end