FBControlCore/Utility/FBDataBuffer.m (338 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #import "FBDataBuffer.h" #import "FBControlCoreError.h" @interface FBDataBuffer_Accumilating : NSObject <FBDataConsumer, FBAccumulatingBuffer> @property (nonatomic, strong, readwrite) NSMutableData *buffer; @property (nonatomic, assign, readonly) size_t capacity; @property (nonatomic, strong, readonly) FBMutableFuture<NSNull *> *finishedConsumingFuture; @end @implementation FBDataBuffer_Accumilating #pragma mark Initializers - (instancetype)init { return [self initWithBackingBuffer:NSMutableData.new capacity:0]; } - (instancetype)initWithBackingBuffer:(NSMutableData *)buffer capacity:(size_t)capacity { self = [super init]; if (!self) { return nil; } _buffer = buffer; _capacity = capacity; _finishedConsumingFuture = FBMutableFuture.future; return self; } #pragma mark NSObject - (NSString *)description { @synchronized (self) { return [NSString stringWithFormat:@"Accumilating Buffer %lu Bytes", self.data.length]; } } #pragma mark FBAccumilatingLineBuffer - (NSData *)data { @synchronized (self) { return [self.buffer copy]; } } - (NSArray<NSString *> *)lines { NSString *output = [[NSString alloc] initWithData:self.data encoding:NSUTF8StringEncoding]; return [output componentsSeparatedByCharactersInSet:NSCharacterSet.newlineCharacterSet]; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { @synchronized (self) { if (self.finishedConsuming.hasCompleted) { return; } [self.buffer appendData:data]; if (self.capacity > 0) { NSInteger overrun = (NSInteger) self.buffer.length - (NSInteger) self.capacity; if (overrun > 0) { [self.buffer replaceBytesInRange:NSMakeRange(0, (NSUInteger) overrun) withBytes:NULL length:0]; } } } } - (void)consumeEndOfFile { @synchronized (self) { if (self.finishedConsuming.hasCompleted) { return; } [self.finishedConsumingFuture resolveWithResult:NSNull.null]; } } #pragma mark FBDataConsumerLifecycle - (FBFuture<NSNull *> *)finishedConsuming { return self.finishedConsumingFuture; } @end @protocol FBDataBuffer_Forwarder <NSObject> - (void)run:(id<FBConsumableBuffer>)buffer; @property (nonatomic, strong, readonly) id<FBDataConsumer> consumer; @end @interface FBDataBuffer_Terminal_Forwarder : NSObject <FBDataBuffer_Forwarder> @property (nonatomic, copy, readonly) NSData *terminal; @property (nonatomic, strong, nullable, readonly) dispatch_queue_t queue; @end @implementation FBDataBuffer_Terminal_Forwarder @synthesize consumer = _consumer; - (instancetype)initWithTerminal:(NSData *)terminal consumer:(id<FBDataConsumer>)consumer queue:(dispatch_queue_t)queue { self = [super init]; if (!self) { return nil; } _terminal = terminal; _consumer = consumer; _queue = queue; return self; } - (void)run:(id<FBConsumableBuffer>)buffer { NSData *partial = [buffer consumeUntil:self.terminal]; dispatch_queue_t queue = self.queue; id<FBDataConsumer> consumer = self.consumer; while (partial) { if (queue) { dispatch_async(queue, ^{ [consumer consumeData:partial]; }); } else { [consumer consumeData:partial]; } partial = [buffer consumeUntil:self.terminal]; } } @end @interface FBDataBuffer_Header_Forwarder : NSObject <FBDataBuffer_Forwarder> @property (nonatomic, assign, readonly) NSUInteger headerLength; @property (nonatomic, strong, readonly) NSUInteger(^derivedLength)(NSData *); @property (nonatomic, strong, readonly) dispatch_queue_t queue; @property (nonatomic, copy, nullable, readwrite) NSNumber *knownderivedLength; @end @implementation FBDataBuffer_Header_Forwarder @synthesize consumer = _consumer; - (instancetype)initWithHeaderLength:(NSUInteger)headerLength derivedLength:(NSUInteger(^)(NSData *))derivedLength consumer:(id<FBDataConsumer>)consumer queue:(dispatch_queue_t)queue { self = [super init]; if (!self) { return nil; } _headerLength = headerLength; _derivedLength = derivedLength; _consumer = consumer; _queue = queue; return self; } - (void)run:(id<FBConsumableBuffer>)buffer { if (!self.knownderivedLength) { NSData *header = [buffer consumeLength:self.headerLength]; if (!header) { return; } self.knownderivedLength = @(self.derivedLength(header)); } NSData *data = [buffer consumeLength:self.knownderivedLength.unsignedIntegerValue]; dispatch_queue_t queue = self.queue; id<FBDataConsumer> consumer = self.consumer; if (data) { if (queue) { dispatch_async(queue, ^{ [consumer consumeData:data]; }); } else { [consumer consumeData:data]; } } } @end @interface FBDataBuffer_Consumable : FBDataBuffer_Accumilating <FBConsumableBuffer, FBNotifyingBuffer> @property (nonatomic, strong, nullable, readwrite) id<FBDataBuffer_Forwarder> forwarder; @end @implementation FBDataBuffer_Consumable #pragma mark Initializers - (instancetype)initWithForwarder:(FBDataBuffer_Terminal_Forwarder *)forwarder; { self = [super init]; if (!self) { return nil; } _forwarder = forwarder; return self; } #pragma mark NSObject - (NSString *)description { @synchronized (self) { return [NSString stringWithFormat:@"Consumable Buffer %lu Bytes", self.data.length]; } } #pragma mark FBConsumableBuffer - (nullable NSData *)consumeCurrentData { @synchronized (self) { NSData *data = self.data; self.buffer.data = NSData.data; return data; } } - (nullable NSString *)consumeCurrentString { NSData *data = [self consumeCurrentData]; return [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; } - (nullable NSData *)consumeLength:(NSUInteger)length { @synchronized (self) { if (length > self.buffer.length) { return nil; } NSRange range = NSMakeRange(0, length); NSData *data = [self.buffer subdataWithRange:range]; if (!data) { return nil; } [self.buffer replaceBytesInRange:range withBytes:"" length:0]; return data; } } - (nullable NSData *)consumeUntil:(NSData *)terminal { @synchronized (self) { if (self.buffer.length == 0) { return nil; } NSRange terminalRange = [self.buffer rangeOfData:terminal options:0 range:NSMakeRange(0, self.buffer.length)]; if (terminalRange.location == NSNotFound) { return nil; } NSData *data = [self.buffer subdataWithRange:NSMakeRange(0, terminalRange.location)]; [self.buffer replaceBytesInRange:NSMakeRange(0, terminalRange.location + terminal.length) withBytes:"" length:0]; return data; } } - (nullable NSData *)consumeLineData { return [self consumeUntil:FBDataBuffer.newlineTerminal]; } - (nullable NSString *)consumeLineString { NSData *lineData = self.consumeLineData; if (!lineData) { return nil; } return [[NSString alloc] initWithData:lineData encoding:NSUTF8StringEncoding]; } - (BOOL)consume:(id<FBDataConsumer>)consumer onQueue:(dispatch_queue_t)queue untilTerminal:(NSData *)terminal error:(NSError **)error { id<FBDataBuffer_Forwarder> forwarder = [[FBDataBuffer_Terminal_Forwarder alloc] initWithTerminal:terminal consumer:consumer queue:queue]; return [self attachForwardingConsumer:forwarder error:error]; } - (FBFuture<NSData *> *)consumeAndNotifyWhen:(NSData *)terminal { FBMutableFuture<NSData *> *future = FBMutableFuture.future; id<FBDataConsumer> consumer = [FBBlockDataConsumer synchronousDataConsumerWithBlock:^(NSData *data) { [self removeForwardingConsumer]; [future resolveWithResult:data]; }]; NSError *error = nil; if (![self consume:consumer untilTerminal:terminal error:&error]) { return [FBFuture futureWithError:error]; } return future; } - (FBFuture<NSData *> *)consumeHeaderLength:(NSUInteger)headerLength derivedLength:(NSUInteger(^)(NSData *))derivedLength { FBMutableFuture<NSData *> *future = FBMutableFuture.future; id<FBDataConsumer> consumer = [FBBlockDataConsumer synchronousDataConsumerWithBlock:^(NSData *data) { [self removeForwardingConsumer]; [future resolveWithResult:data]; }]; id<FBDataBuffer_Forwarder> forwarder = [[FBDataBuffer_Header_Forwarder alloc] initWithHeaderLength:headerLength derivedLength:derivedLength consumer:consumer queue:nil]; NSError *error = nil; if (![self attachForwardingConsumer:forwarder error:&error]) { return [FBFuture futureWithError:error]; } return future; } #pragma mark FBDataConsumer - (void)consumeData:(NSData *)data { [super consumeData:data]; @synchronized (self) { [self.forwarder run:self]; } } #pragma mark Private - (BOOL)attachForwardingConsumer:(id<FBDataBuffer_Forwarder>)forwarder error:(NSError **)error { @synchronized (self) { if (self.forwarder) { return [[FBControlCoreError describe:@"Cannot listen for the two terminals at the same time"] failBool:error]; } self.forwarder = forwarder; [self.forwarder run:self]; } return YES; } - (nullable id<FBDataConsumer>)removeForwardingConsumer { id<FBDataBuffer_Forwarder> forwarder = self.forwarder; self.forwarder = nil; return forwarder.consumer; } - (BOOL)consume:(id<FBDataConsumer>)consumer untilTerminal:(NSData *)terminal error:(NSError **)error { return [self consume:consumer onQueue:nil untilTerminal:terminal error:error]; } @end @implementation FBDataBuffer #pragma mark Initializers + (id<FBAccumulatingBuffer>)accumulatingBuffer { return [FBDataBuffer_Accumilating new]; } + (id<FBAccumulatingBuffer>)accumulatingBufferWithCapacity:(size_t)capacity { NSParameterAssert(capacity > 0); return [[FBDataBuffer_Accumilating alloc] initWithBackingBuffer:NSMutableData.data capacity:capacity]; } + (id<FBAccumulatingBuffer>)accumulatingBufferForMutableData:(NSMutableData *)data { return [[FBDataBuffer_Accumilating alloc] initWithBackingBuffer:data capacity:0]; } + (id<FBConsumableBuffer>)consumableBuffer { return [self consumableBufferForwardingToConsumer:nil onQueue:nil terminal:nil]; } + (id<FBNotifyingBuffer>)notifyingBuffer { return [self consumableBufferForwardingToConsumer:nil onQueue:nil terminal:nil]; } + (id<FBNotifyingBuffer>)consumableBufferForwardingToConsumer:(id<FBDataConsumer>)consumer onQueue:(nullable dispatch_queue_t)queue terminal:(NSData *)terminal { FBDataBuffer_Terminal_Forwarder *forwarder = nil; if (consumer) { forwarder = [[FBDataBuffer_Terminal_Forwarder alloc] initWithTerminal:terminal consumer:consumer queue:queue]; } return [[FBDataBuffer_Consumable alloc] initWithForwarder:forwarder]; } + (NSData *)newlineTerminal { static dispatch_once_t onceToken; static NSData *data = nil; dispatch_once(&onceToken, ^{ data = [NSData dataWithBytes:"\n" length:1]; }); return data; } @end