FBControlCore/Utility/FBProcessStream.m (990 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#import "FBProcessStream.h"
#import <sys/types.h>
#import <sys/stat.h>
#import "FBControlCoreError.h"
#import "FBDataBuffer.h"
#import "FBFileReader.h"
#import "FBFileWriter.h"
#import "FBProcess.h"
#import "FBProcessBuilder.h"
#import "FBFuture+Sync.h"
static NSTimeInterval ProcessDetachDrainTimeout = 4;
#pragma mark FBProcessStreamAttachment
@implementation FBProcessStreamAttachment
- (instancetype)initWithFileDescriptor:(int)fileDescriptor closeOnEndOfFile:(BOOL)closeOnEndOfFile mode:(FBProcessStreamAttachmentMode)mode
{
self = [super init];
if (!self) {
return nil;
}
_fileDescriptor = fileDescriptor;
_closeOnEndOfFile = closeOnEndOfFile;
_mode = mode;
return self;
}
@end
#pragma mark FBProcessFileOutput
@interface FBProcessFileOutput_DirectToFile : NSObject <FBProcessFileOutput>
@end
@interface FBProcessFileOutput_Consumer : NSObject <FBProcessFileOutput>
@property (nonatomic, strong, readonly) id<FBDataConsumer> consumer;
@property (nonatomic, strong, nullable, readwrite) FBProcess<NSNull *, id<FBDataConsumer>, NSNull *> *task;
@property (nonatomic, strong, readonly) dispatch_queue_t queue;
@end
@interface FBProcessFileOutput_Reader : NSObject <FBProcessFileOutput>
@property (nonatomic, strong, readonly) FBProcessOutput *output;
@property (nonatomic, strong, nullable, readwrite) id<FBDataConsumer> writer;
@property (nonatomic, strong, nullable, readwrite) id<FBProcessFileOutput> nested;
@property (nonatomic, strong, readonly) dispatch_queue_t queue;
@end
@implementation FBProcessFileOutput_DirectToFile
@synthesize filePath = _filePath;
#pragma mark Initializers
- (instancetype)initWithFilePath:(NSString *)filePath
{
self = [super init];
if (!self) {
return nil;
}
_filePath = filePath;
return self;
}
#pragma mark FBProcessFileOutput
- (FBFuture<NSNull *> *)startReading
{
return [[FBFuture
futureWithResult:NSNull.null]
nameFormat:@"Start reading %@", self.description];
}
- (FBFuture<NSNull *> *)stopReading
{
return [[FBFuture
futureWithResult:NSNull.null]
nameFormat:@"Stop reading %@", self.description];
}
#pragma mark NSObject
- (NSString *)description
{
return [NSString stringWithFormat:@"File output to %@", self.filePath];
}
@end
@implementation FBProcessFileOutput_Consumer
@synthesize filePath = _filePath;
#pragma mark Initializers
- (instancetype)initWithConsumer:(id<FBDataConsumer>)consumer filePath:(NSString *)filePath queue:(dispatch_queue_t)queue
{
self = [super init];
if (!self) {
return nil;
}
_consumer = consumer;
_filePath = filePath;
_queue = queue;
return self;
}
#pragma mark FBProcessFileOutput
- (FBFuture<NSNull *> *)startReading
{
return [[FBFuture
onQueue:self.queue resolve:^ FBFuture<FBProcess<NSNull *, id<FBDataConsumer>, NSNull *> *> *{
if (self.task) {
return [[FBControlCoreError
describeFormat:@"Cannot start reading, already reading"]
failFuture];
}
return [[[[FBProcessBuilder
withLaunchPath:@"/bin/cat" arguments:@[self.filePath]]
withStdOutConsumer:self.consumer]
withStdErrToDevNull]
start];
}]
onQueue:self.queue map:^(FBProcess<NSNull *, id<FBDataConsumer>, NSNull *> *task) {
self.task = task;
return NSNull.null;
}];
}
- (FBFuture<NSNull *> *)stopReading
{
return [[FBFuture
onQueue:self.queue resolve:^ FBFuture<NSNumber *> *{
FBProcess<NSNull *, id<FBDataConsumer>, NSNull *> *task = self.task;
self.task = nil;
if (!task) {
return [[FBControlCoreError
describeFormat:@"Cannot stop reading, not reading"]
failFuture];
}
return [task sendSignal:SIGTERM];
}]
mapReplace:NSNull.null];
}
#pragma mark NSObject
- (NSString *)description
{
return [NSString stringWithFormat:@"Consumer output to %@", self.filePath];
}
@end
@implementation FBProcessFileOutput_Reader
@synthesize filePath = _filePath;
#pragma mark Initializers
- (instancetype)initWithOutput:(FBProcessOutput *)output filePath:(NSString *)filePath queue:(dispatch_queue_t)queue
{
self = [super init];
if (!self) {
return nil;
}
_output = output;
_filePath = filePath;
_queue = queue;
return self;
}
#pragma mark FBProcessFileOutput
- (FBFuture<NSNull *> *)startReading
{
return [[[[[FBFuture
onQueue:self.queue resolve:^ FBFuture<FBProcessStreamAttachment *> * {
if (self.writer || self.nested) {
return [[FBControlCoreError
describe:@"Cannot call startReading twice"]
failFuture];
}
return [self.output attach];
}]
onQueue:self.queue map:^ id<FBDataConsumer> (FBProcessStreamAttachment *attachment) {
return [FBFileWriter syncWriterWithFileDescriptor:attachment.fileDescriptor closeOnEndOfFile:attachment.closeOnEndOfFile];
}]
onQueue:self.queue fmap:^ FBFuture<id<FBProcessFileOutput>> * (id<FBDataConsumer> writer) {
self.writer = writer;
id<FBProcessFileOutput> consumer = [[FBProcessFileOutput_Consumer alloc] initWithConsumer:writer filePath:self.filePath queue:self.queue];
return [[consumer startReading] mapReplace:consumer];
}]
onQueue:self.queue map:^ NSNull * (id<FBProcessFileOutput> nested) {
self.nested = nested;
return NSNull.null;
}]
nameFormat:@"Start Reading %@", self.description];
}
- (FBFuture<NSNull *> *)stopReading
{
return [[[FBFuture
onQueue:self.queue resolve:^ FBFuture<NSNull *> * {
if (!self.writer || !self.nested) {
return [[FBControlCoreError
describeFormat:@"No active reader for fifo"]
failFuture];
}
return [self.nested stopReading];
}]
onQueue:self.queue map:^(id _) {
self.nested = nil;
return NSNull.null;
}]
nameFormat:@"Stop Reading %@", self.description];
}
#pragma mark NSObject
- (NSString *)description
{
return [NSString stringWithFormat:@"Output of %@ to file handle", self.filePath];
}
@end
#pragma mark - FBProcessOutput
@interface FBProcessOutput ()
@property (nonatomic, strong, readonly) dispatch_queue_t workQueue;
@end
@interface FBProcessOutput_Null : FBProcessOutput
@end
@interface FBProcessOutput_FilePath : FBProcessOutput
@property (nonatomic, copy, readonly) NSString *filePath;
@property (nonatomic, assign, readwrite) int fileDescriptor;
- (instancetype)initWithFilePath:(NSString *)filePath;
@end
@interface FBProcessOutput_Pipe : FBProcessOutput
@property (nonatomic, assign, readwrite) int readEnd;
@property (nonatomic, assign, readwrite) int writeEnd;
@end
@class NSInputStream_FBProcessOutput;
@interface FBProcessOutput_InputStream : FBProcessOutput_Pipe
@property (nonatomic, strong, readonly) FBMutableFuture<NSNumber *> *readFuture;
@property (nonatomic, strong, readonly) NSInputStream_FBProcessOutput *stream;
@end
@interface NSInputStream_FBProcessOutput : NSInputStream
@property (nonatomic, strong, readonly) FBFuture<NSNumber *> *readFuture;
@property (nonatomic, assign, readwrite) int fileDescriptor;
- (instancetype)initWithReadFuture:(FBFuture<NSNumber *> *)readFuture;
@end
@interface FBProcessOutput_Consumer : FBProcessOutput_Pipe
@property (nonatomic, strong, readwrite) id<FBDataConsumer> consumer;
@property (nonatomic, strong, nullable, readwrite) FBFileReader *reader;
@property (nonatomic, strong, nullable, readwrite) id<FBControlCoreLogger> logger;
- (instancetype)initWithConsumer:(id<FBDataConsumer>)consumer logger:(nullable id<FBControlCoreLogger>)logger;
@end
@interface FBProcessOutput_Logger : FBProcessOutput_Consumer
- (instancetype)initWithLogger:(id<FBControlCoreLogger>)logger;
@end
@interface FBProcessOutput_Data : FBProcessOutput_Consumer
@property (nonatomic, strong, readonly) id<FBAccumulatingBuffer> dataConsumer;
- (instancetype)initWithMutableData:(NSMutableData *)mutableData;
@end
@interface FBProcessOutput_String : FBProcessOutput_Data
@end
@implementation FBProcessOutput
#pragma mark Initializers
+ (dispatch_queue_t)createWorkQueue
{
return dispatch_queue_create("com.facebook.fbcontrolcore.process_stream", DISPATCH_QUEUE_SERIAL);
}
+ (FBProcessOutput<NSNull *> *)outputForNullDevice
{
return [[FBProcessOutput_Null alloc] init];
}
+ (FBProcessOutput<NSString *> *)outputForFilePath:(NSString *)filePath
{
return [[FBProcessOutput_FilePath alloc] initWithFilePath:filePath];
}
+ (FBProcessOutput<NSInputStream *> *)outputToInputStream
{
return [[FBProcessOutput_InputStream alloc] init];
}
+ (FBProcessOutput<id<FBDataConsumer>> *)outputForDataConsumer:(id<FBDataConsumer>)dataConsumer logger:(id<FBControlCoreLogger>)logger
{
return [[FBProcessOutput_Consumer alloc] initWithConsumer:dataConsumer logger:logger];
}
+ (FBProcessOutput<id<FBDataConsumer>> *)outputForDataConsumer:(id<FBDataConsumer>)dataConsumer
{
return [[FBProcessOutput_Consumer alloc] initWithConsumer:dataConsumer logger:nil];
}
+ (FBProcessOutput<id<FBControlCoreLogger>> *)outputForLogger:(id<FBControlCoreLogger>)logger
{
return [[FBProcessOutput_Logger alloc] initWithLogger:logger];
}
+ (FBProcessOutput<NSMutableData *> *)outputToMutableData:(NSMutableData *)data
{
return [[FBProcessOutput_Data alloc] initWithMutableData:data];
}
+ (FBProcessOutput<NSString *> *)outputToStringBackedByMutableData:(NSMutableData *)data
{
return [[FBProcessOutput_String alloc] initWithMutableData:data];
}
- (instancetype)init
{
return [self initWithWorkQueue:FBProcessOutput.createWorkQueue];
}
- (instancetype)initWithWorkQueue:(dispatch_queue_t)workQueue
{
self = [super init];
if (!self) {
return nil;
}
_workQueue = workQueue;
return self;
}
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd));
return nil;
}
- (FBFuture<NSNull *> *)detach
{
NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd));
return nil;
}
- (id)contents
{
NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd));
return nil;
}
#pragma mark FBProcessOutput implementation
- (FBFuture<id<FBProcessFileOutput>> *)providedThroughFile
{
return [[self
makeFifoOutput]
onQueue:self.workQueue map:^(NSString *fifoPath) {
return [[FBProcessFileOutput_Reader alloc] initWithOutput:self filePath:fifoPath queue:FBProcessOutput.createWorkQueue];
}];
}
- (FBFuture<id<FBDataConsumer>> *)providedThroughConsumer
{
NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd));
return nil;
}
#pragma mark Private
- (FBFuture<NSString *> *)makeFifoOutput
{
NSString *fifoPath = [NSTemporaryDirectory() stringByAppendingPathComponent:NSUUID.UUID.UUIDString];
if (mkfifo(fifoPath.UTF8String, S_IWUSR | S_IRUSR) != 0) {
return [[[FBControlCoreError
describeFormat:@"Failed to create a named pipe for fifo %@ with error '%s'", fifoPath, strerror(errno)]
inDomain:NSPOSIXErrorDomain]
failFuture];
}
return [FBFuture futureWithResult:fifoPath];
}
@end
@implementation FBProcessOutput_Null
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [FBFuture futureWithResult:[[FBProcessStreamAttachment alloc] initWithFileDescriptor:-1 closeOnEndOfFile:NO mode:FBProcessStreamAttachmentModeOutput]];
}
- (FBFuture<NSNull *> *)detach
{
return FBFuture.empty;
}
- (NSNull *)contents
{
return NSNull.null;
}
#pragma mark FBProcessOutput Implementation
- (FBFuture<id<FBProcessFileOutput>> *)providedThroughFile
{
return [FBFuture futureWithResult:[[FBProcessFileOutput_DirectToFile alloc] initWithFilePath:@"/dev/null"]];
}
- (FBFuture<id<FBDataConsumer>> *)providedThroughConsumer
{
return [FBFuture futureWithResult:FBNullDataConsumer.new];
}
#pragma mark NSObject
- (NSString *)description
{
return @"Null Output";
}
@end
@implementation FBProcessOutput_Pipe
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [FBFuture
onQueue:self.workQueue resolve:^{
if (self.readEnd != 0 || self.writeEnd != 0) {
return [[FBControlCoreError
describeFormat:@"Cannot attach when already attached to %d:%d", self.readEnd, self.writeEnd]
failFuture];
}
int fileDescriptors[2] = {0, 0};
if (pipe(fileDescriptors) != 0) {
return [[FBControlCoreError
describeFormat:@"Failed to create a pipe: %s", strerror(errno)]
failFuture];
}
self.readEnd = fileDescriptors[0];
self.writeEnd = fileDescriptors[1];
// Pass out the write end in the attachment, for the caller to write to.
return [FBFuture futureWithResult:[[FBProcessStreamAttachment alloc] initWithFileDescriptor:fileDescriptors[1] closeOnEndOfFile:YES mode:FBProcessStreamAttachmentModeOutput]];
}];
}
- (FBFuture<NSNull *> *)detach
{
return [[self
closeWriteEndOfPipe]
nameFormat:@"Detach %@", self];
}
#pragma mark Private
- (FBFuture<NSNull *> *)closeWriteEndOfPipe
{
return [[FBFuture
onQueue:self.workQueue resolve:^ FBFuture<NSNull *> * {
if (!self.writeEnd) {
return [[FBControlCoreError
describe:@"Cannot detach when not attached"]
failFuture];
}
// Close the write end, but leave the read end open.
// This is needed as there may be read operations pending on the file descriptor.
// Any readers are themselves responsible for closing when they've read to the end of the file.
close(self.writeEnd);
self.writeEnd = 0;
self.readEnd = 0;
return FBFuture.empty;
}]
nameFormat:@"Detach %@", self.description];
}
@end
@implementation FBProcessOutput_InputStream
- (instancetype)init
{
self = [super init];
if (!self) {
return nil;
}
_readFuture = FBMutableFuture.future;
_stream = [[NSInputStream_FBProcessOutput alloc] initWithReadFuture:_readFuture];
return self;
}
#pragma mark FBStandardStream
- (NSInputStream *)contents
{
return self.stream;
}
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[super
attach]
onQueue:self.workQueue map:^(FBProcessStreamAttachment *result) {
[self.readFuture resolveWithResult:@(self.readEnd)];
return result;
}];
}
- (FBFuture<NSNull *> *)detach
{
return [[self
closeWriteEndOfPipe]
nameFormat:@"Detach %@", self];
}
@end
@implementation NSInputStream_FBProcessOutput
- (instancetype)initWithReadFuture:(FBFuture<NSNumber *> *)readFuture
{
self = [super init];
if (!self) {
return nil;
}
_fileDescriptor = 0;
_readFuture = readFuture;
return self;
}
#pragma mark NSInputStream
- (void)open
{
NSNumber *fileDescriptor = [self.readFuture block:nil];
self.fileDescriptor = fileDescriptor.intValue;
}
- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)len
{
int fileDescriptor = self.fileDescriptor;
if (fileDescriptor == 0) {
return -1;
}
NSInteger readBytes = read(fileDescriptor, buffer, len);
if (readBytes < 1) {
[self close];
}
return readBytes;
}
- (void)close
{
if (self.fileDescriptor) {
close(self.fileDescriptor);
self.fileDescriptor = 0;
}
}
@end
@implementation FBProcessOutput_Consumer
#pragma mark Initializers
- (instancetype)initWithConsumer:(id<FBDataConsumer>)consumer logger:(nullable id<FBControlCoreLogger>)logger
{
self = [super init];
if (!self) {
return nil;
}
_consumer = consumer;
_logger = logger;
return self;
}
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[super
attach]
onQueue:self.workQueue fmap:^(FBProcessStreamAttachment *attachment) {
if (self.reader) {
return [[FBControlCoreError
describeFormat:@"Cannot attach to %@ twice", self]
failFuture];
}
// FBFileReader consumes the read end, the write end is passed out in the attachment.
// The super's attach creates the pipe and the detach closes the write end.
id<FBDataConsumer> consumer = self.consumer;
id<FBControlCoreLogger> logger = self.logger;
if (logger) {
consumer = [FBCompositeDataConsumer consumerWithConsumers:@[
consumer,
[FBLoggingDataConsumer consumerWithLogger:logger],
]];
}
// FBProcessOuput consumes the read end, the write end is passed out in the attachment.
self.reader = [FBFileReader readerWithFileDescriptor:self.readEnd closeOnEndOfFile:YES consumer:consumer logger:self.logger];
return [[[self.reader
startReading]
mapReplace:attachment]
nameFormat:@"Attach to pipe %@", self.description];
}];
}
- (FBFuture<NSNull *> *)detach
{
return [[[[super
detach]
onQueue:self.workQueue fmap:^ FBFuture<NSNumber *> * (id _){
FBFileReader *reader = self.reader;
if (!reader) {
return [[FBControlCoreError
describeFormat:@"Cannot detach from %@, no active reader", self]
failFuture];
}
// Since detach may be called before the reader has finished reading asynchronously,
// we should attempt to wait for this to happen naturally and then use the backoff API.
return [reader finishedReadingWithTimeout:ProcessDetachDrainTimeout];
}]
onQueue:self.workQueue chain:^(FBFuture *future) {
self.reader = nil;
return future;
}]
nameFormat:@"Detach %@", self.description];
}
- (id<FBDataConsumer>)contents
{
return self.consumer;
}
#pragma mark FBProcessOutput Implementation
- (FBFuture<id<FBProcessFileOutput>> *)providedThroughFile
{
return [[[self
makeFifoOutput]
onQueue:self.workQueue map:^ id<FBProcessFileOutput> (NSString *fifoPath) {
return [[FBProcessFileOutput_Consumer alloc] initWithConsumer:self.consumer filePath:fifoPath queue:FBProcessOutput.createWorkQueue];
}]
nameFormat:@"Relay %@ to file", self.description];
}
- (FBFuture<id<FBDataConsumer>> *)providedThroughConsumer
{
return [FBFuture futureWithResult:self.consumer];
}
#pragma mark NSObject
- (NSString *)description
{
return @"Output to consumer";
}
@end
@implementation FBProcessOutput_Logger
#pragma mark Initializers
- (instancetype)initWithLogger:(id<FBControlCoreLogger>)logger
{
id<FBDataConsumer> consumer = [FBLoggingDataConsumer consumerWithLogger:logger];
self = [super initWithConsumer:consumer logger:logger];
if (!self) {
return nil;
}
return self;
}
#pragma mark FBStandardStream
- (id<FBControlCoreLogger>)contents
{
return self.logger;
}
#pragma mark NSObject
- (NSString *)description
{
return @"Output to logger";
}
@end
@implementation FBProcessOutput_FilePath
#pragma mark Initializers
- (instancetype)initWithFilePath:(NSString *)filePath
{
self = [super init];
if (!self) {
return nil;
}
_filePath = filePath;
return self;
}
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[FBFuture
onQueue:self.workQueue resolve:^{
int fileDescriptor = self.fileDescriptor;
if (fileDescriptor) {
return [[FBControlCoreError
describeFormat:@"Cannot attach when already attached to file %@: %d", self.filePath, fileDescriptor]
failFuture];
}
fileDescriptor = open(self.filePath.UTF8String, O_WRONLY | O_CREAT);
if (!fileDescriptor) {
return [[FBControlCoreError
describeFormat:@"Cannot create file descriptor for %@: %s", self.filePath, strerror(errno)]
failFuture];
}
self.fileDescriptor = fileDescriptor;
return [FBFuture futureWithResult:[[FBProcessStreamAttachment alloc] initWithFileDescriptor:fileDescriptor closeOnEndOfFile:YES mode:FBProcessStreamAttachmentModeOutput]];
}]
nameFormat:@"Attach to %@", self.description];
}
- (FBFuture<NSNull *> *)detach
{
return [[FBFuture
onQueue:self.workQueue resolve:^ FBFuture<NSNull *> * {
int fileDescriptor = self.fileDescriptor;
if (fileDescriptor == 0) {
return [[FBControlCoreError
describe:@"Cannot Detach Twice"]
failFuture];
}
close(fileDescriptor);
self.fileDescriptor = 0;
return FBFuture.empty;
}]
nameFormat:@"Detach from %@", self.description];
}
- (NSString *)contents
{
return self.filePath;
}
#pragma mark FBProcessOutput Implementation
- (FBFuture<id<FBProcessFileOutput>> *)providedThroughFile
{
return [FBFuture futureWithResult:[[FBProcessFileOutput_DirectToFile alloc] initWithFilePath:self.filePath]];
}
- (FBFuture<id<FBDataConsumer>> *)providedThroughConsumer
{
return (FBFuture<id<FBDataConsumer>> *) [FBFileWriter asyncWriterForFilePath:self.filePath];
}
#pragma mark NSObject
- (NSString *)description
{
return [NSString stringWithFormat:@"Output to %@", self.filePath];
}
@end
@implementation FBProcessOutput_Data
#pragma mark Initializers
- (instancetype)initWithMutableData:(NSMutableData *)mutableData
{
id<FBAccumulatingBuffer> consumer = [FBDataBuffer accumulatingBufferForMutableData:mutableData];
self = [super initWithConsumer:consumer logger:nil];
if (!self) {
return nil;
}
_dataConsumer = consumer;
return self;
}
#pragma mark FBStandardStream
- (NSData *)contents
{
return self.dataConsumer.data;
}
#pragma mark NSObject
- (NSString *)description
{
return @"Output to Mutable Data";
}
@end
@implementation FBProcessOutput_String
#pragma mark FBStandardStream
- (NSString *)contents
{
NSData *data = self.dataConsumer.data;
// Strip newline from the end of the buffer.
if (data.length) {
char lastByte = 0;
NSRange range = NSMakeRange(data.length - 1, 1);
[data getBytes:&lastByte range:range];
if (lastByte == '\n') {
data = [data subdataWithRange:NSMakeRange(0, data.length - 1)];
}
}
return [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
}
#pragma mark NSObject
- (NSString *)description
{
return @"Output to Mutable String";
}
@end
@interface FBProcessInput ()
@property (nonatomic, strong, readonly) dispatch_queue_t workQueue;
@property (nonatomic, assign, readwrite) int readEnd;
@property (nonatomic, assign, readwrite) int writeEnd;
@end
@interface FBProcessInput_Consumer : FBProcessInput <FBDataConsumer>
@property (nonatomic, strong, nullable, readwrite) id<FBDataConsumer> writer;
@end
@interface FBProcessInput_Data : FBProcessInput_Consumer
- (instancetype)initWithData:(NSData *)data;
@property (nonatomic, strong, readonly) NSData *data;
@end
@class NSOutputStream_FBProcessInput;
@interface FBProcessInput_InputStream : FBProcessInput <FBStandardStreamTransfer>
@property (nonatomic, strong, readonly) NSOutputStream_FBProcessInput *stream;
@property (nonatomic, strong, readonly) FBMutableFuture<NSNumber *> *writeFuture;
@end
@interface NSOutputStream_FBProcessInput : NSOutputStream
@property (nonatomic, strong, readonly) FBFuture<NSNumber *> *writeFuture;
@property (nonatomic, assign, readwrite) int fileDescriptor;
@property (atomic, assign, readwrite) ssize_t bytesWritten;
@property (atomic, copy, nullable, readwrite) NSString *errorMessage;
@property (atomic, assign, readwrite) NSStreamStatus status;
- (instancetype)initWithWriteFuture:(FBFuture<NSNumber *> *)writeFuture;
@end
@implementation FBProcessInput
#pragma mark Initializers
+ (FBProcessInput<id<FBDataConsumer>> *)inputFromConsumer
{
return [[FBProcessInput_Consumer alloc] init];
}
+ (FBProcessInput<NSOutputStream *> *)inputFromStream
{
return [[FBProcessInput_InputStream alloc] init];
}
+ (FBProcessInput<NSData *> *)inputFromData:(NSData *)data
{
return [[FBProcessInput_Data alloc] initWithData:data];
}
- (instancetype)init
{
return [self initWithWorkQueue:FBProcessOutput.createWorkQueue];
}
- (instancetype)initWithWorkQueue:(dispatch_queue_t)workQueue
{
self = [super init];
if (!self) {
return nil;
}
_workQueue = workQueue;
return self;
}
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[FBFuture
onQueue:self.workQueue resolve:^{
if (self.readEnd || self.writeEnd) {
return [[FBControlCoreError
describeFormat:@"Cannot Attach Twice"]
failFuture];
}
int fileDescriptors[2] = {0, 0};
if (pipe(fileDescriptors) != 0) {
return [[FBControlCoreError
describeFormat:@"Failed to create a pipe: %s", strerror(errno)]
failFuture];
}
self.readEnd = fileDescriptors[0];
self.writeEnd = fileDescriptors[1];
// Pass out the read end as input to a process.
// Subclases will write to the write end.
return [FBFuture futureWithResult:[[FBProcessStreamAttachment alloc] initWithFileDescriptor:self.readEnd closeOnEndOfFile:YES mode:FBProcessStreamAttachmentModeInput]];
}]
nameFormat:@"Attach %@ to pipe", self.description];
}
- (FBFuture<NSNull *> *)detach
{
return [[FBFuture
onQueue:self.workQueue resolve:^ FBFuture<NSNull *> * {
int readEnd = self.readEnd;
if (!readEnd) {
return [[FBControlCoreError
describeFormat:@"Nothing is attached to %@", self]
failFuture];
}
// Close the read end of the descriptor since the input it no-longer consuming it
// The writer is responsible for closing and referencing the write end.
close(readEnd);
self.readEnd = 0;
self.writeEnd = 0;
return FBFuture.empty;
}]
nameFormat:@"Detach %@", self.description];
}
- (id<FBDataConsumer>)contents
{
NSAssert(NO, @"-[%@ %@] is abstract and should be overridden", NSStringFromClass(self.class), NSStringFromSelector(_cmd));
return nil;
}
- (NSError *)streamError
{
return nil;
}
@end
@implementation FBProcessInput_Consumer
#pragma mark FBStandardStream
- (id<FBDataConsumer>)contents
{
return self;
}
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[[super
attach]
onQueue:self.workQueue fmap:^(FBProcessStreamAttachment *attachment) {
NSError *error = nil;
// Construct a writer to write to, on eof the file descriptor is closed and the reading continues on the other side of the pipe.
// The read end is closed in the superclassess detach.
id<FBDataConsumer> writer = [FBFileWriter asyncWriterWithFileDescriptor:self.writeEnd closeOnEndOfFile:YES error:&error];
if (!writer) {
return [[FBControlCoreError
describeFormat:@"Failed to create a writer for pipe %@", error]
failFuture];
}
self.writer = writer;
return [FBFuture futureWithResult:attachment];
}]
nameFormat:@"Attach %@ to pipe", self.description];
}
- (FBFuture<NSNull *> *)detach
{
return [[[super
detach]
onQueue:self.workQueue notifyOfCompletion:^(id _) {
self.writer = nil;
}]
nameFormat:@"Detach %@", self.description];
}
#pragma mark FBDataConsumer
- (void)consumeData:(NSData *)data
{
[self.writer consumeData:data];
}
- (void)consumeEndOfFile
{
[self.writer consumeEndOfFile];
}
#pragma mark NSObject
- (NSString *)description
{
return @"Input to consumer";
}
@end
@implementation FBProcessInput_Data
#pragma mark Initializers
- (instancetype)initWithData:(NSData *)data
{
self = [super init];
if (!self) {
return nil;
}
_data = data;
return self;
}
#pragma mark FBStandardStream
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[[super
attach]
onQueue:self.workQueue map:^(FBProcessStreamAttachment *attachment) {
[self.writer consumeData:self.data];
[self.writer consumeEndOfFile];
return attachment;
}]
nameFormat:@"Attach %@ to pipe", self.description];
}
- (NSData *)contents
{
return self.data;
}
#pragma mark NSObject
- (NSString *)description
{
return @"Input to Data";
}
@end
@implementation FBProcessInput_InputStream
#pragma mark Initializers
- (instancetype)init
{
self = [super init];
if (!self) {
return nil;
}
_writeFuture = FBMutableFuture.future;
_stream = [[NSOutputStream_FBProcessInput alloc] initWithWriteFuture:_writeFuture];
return self;
}
#pragma mark FBStandardStream
- (NSOutputStream *)contents
{
return self.stream;
}
- (FBFuture<FBProcessStreamAttachment *> *)attach
{
return [[super
attach]
onQueue:self.workQueue map:^(FBProcessStreamAttachment *attachment) {
[self.writeFuture resolveWithResult:@(self.writeEnd)];
return attachment;
}];
}
- (NSString *)description
{
return @"Input to NSOutputStream";
}
#pragma mark FBStandardStreamTransfer
- (ssize_t)bytesTransferred
{
return self.stream.bytesWritten;
}
- (NSError *)streamError
{
return self.stream.streamError;
}
@end
@implementation NSOutputStream_FBProcessInput
#pragma mark Initializers
- (instancetype)initWithWriteFuture:(FBFuture<NSNumber *> *)writeFuture
{
self = [super init];
if (!self) {
return nil;
}
// The pipe first has to be created, so we don't know this ahead of time.
// Instead we block until the write descriptor becomes available.
_writeFuture = writeFuture;
_fileDescriptor = 0;
_bytesWritten = 0;
_errorMessage = nil;
_status = NSStreamStatusNotOpen;
return self;
}
#pragma mark NSOutputStream
- (NSInteger)write:(const uint8_t *)buffer maxLength:(NSUInteger)len
{
int fileDescriptor = self.fileDescriptor;
if (!fileDescriptor) {
NSStreamStatus status = self.status;
if (status == NSStreamStatusNotOpen) {
[self resolveError:@"Pipe for writing is not open"];
} else if (status == NSStreamStatusClosed) {
[self resolveError:@"Pipe for writing is closed"];
} else {
[self resolveError:@"Pipe for writing is does not exist"];
}
return -1;
}
self.status = NSStreamStatusWriting;
ssize_t result = write(self.fileDescriptor, buffer, len);
self.status = NSStreamStatusOpen;
if (result == -1) {
[self resolveError:[[NSString alloc] initWithCString:strerror(errno) encoding:NSASCIIStringEncoding]];
return -1;
}
self.bytesWritten += result;
return result;
}
- (void)open
{
if (self.streamStatus != NSStreamStatusNotOpen) {
[self resolveError:[NSString stringWithFormat:@"Stream status is not NSStreamStatusNotOpen is %lu", self.streamStatus]];
return;
}
self.status = NSStreamStatusOpening;
NSNumber *fileDescriptor = [self.writeFuture block:nil];
self.fileDescriptor = fileDescriptor.intValue;
self.status = NSStreamStatusOpen;
}
- (void)close
{
if (self.fileDescriptor) {
close(self.fileDescriptor);
self.fileDescriptor = 0;
self.status = NSStreamStatusClosed;
}
}
- (BOOL)hasSpaceAvailable
{
return YES;
}
- (NSError *)streamError
{
NSString *errorMessage = self.errorMessage;
if (!errorMessage) {
return nil;
}
return [[FBControlCoreError
describe:errorMessage]
build];
}
- (NSStreamStatus)streamStatus
{
return self.status;
}
#pragma mark Private
- (void)resolveError:(NSString *)errorMessage
{
if (self.errorMessage) {
return;
}
self.errorMessage = errorMessage;
self.status = NSStreamStatusError;
}
@end