in internal/vm/ioproxy.go [76:157]
func (connectorPair *IOConnectorPair) proxy(
ctx context.Context,
logger *logrus.Entry,
timeoutAfterExit time.Duration,
) (ioInitDone <-chan error, ioCopyDone <-chan error) {
// initDone might not have to be buffered. We only send ioInitErr once.
initDone := make(chan error, 2)
copyDone := make(chan error)
ioCtx, ioCancel := context.WithCancel(context.Background())
// Start the initialization process. Any synchronous setup made by the connectors will
// be completed after these lines. Async setup will be done once initDone is closed in
// the goroutine below.
readerResultCh := connectorPair.ReadConnector(ioCtx, logger.WithField("direction", "read"))
writerResultCh := connectorPair.WriteConnector(ioCtx, logger.WithField("direction", "write"))
go func() {
defer ioCancel()
defer close(copyDone)
var reader io.ReadCloser
var writer io.WriteCloser
var ioInitErr error
// Send the first error we get to initDone, but consume both so we can ensure both
// end up closed in the case of an error
for readerResultCh != nil || writerResultCh != nil {
var err error
select {
case readerResult := <-readerResultCh:
readerResultCh = nil
reader, err = readerResult.ReadWriteCloser, readerResult.Err
case writerResult := <-writerResultCh:
writerResultCh = nil
writer, err = writerResult.ReadWriteCloser, writerResult.Err
}
if err != nil {
ioInitErr = errors.Wrap(err, "error initializing io")
logger.WithError(ioInitErr).Error()
initDone <- ioInitErr
}
}
close(initDone)
if ioInitErr != nil {
logClose(logger, reader, writer)
return
}
// IO streams have been initialized successfully
// Once the proc exits, wait the provided time before forcibly closing io streams.
// If the io streams close on their own before the timeout, the Close calls here
// should just be no-ops.
go func() {
<-ctx.Done()
time.AfterFunc(timeoutAfterExit, func() {
logClose(logger, reader, writer)
})
}()
logger.Debug("begin copying io")
defer logger.Debug("end copying io")
size, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
logger.Debugf("copied %d", size)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "file already closed") {
logger.Infof("connection was closed: %v", err)
} else {
logger.WithError(err).Error("error copying io")
}
copyDone <- err
}
defer logClose(logger, reader, writer)
}()
return initDone, copyDone
}