func()

in internal/vm/ioproxy.go [76:157]


// proxy starts both connectors of the pair and, once each has produced its stream,
// copies data from the read side to the write side in a background goroutine.
//
// Parameters:
//   - ctx: once ctx is done, the streams are force-closed after timeoutAfterExit has
//     elapsed (unless copying has already finished). The existing comments describe
//     this as "the proc exits"; what cancels ctx is up to the caller.
//   - logger: receives initialization errors, copy progress and copy errors.
//   - timeoutAfterExit: grace period between ctx being done and the forced close.
//
// Returns two receive-only channels:
//   - ioInitDone receives one wrapped error per connector that failed to initialize
//     (so zero, one or two values) and is then closed.
//   - ioCopyDone receives the error returned by the copy, if any, and is then closed.
//     It is closed without a value when the copy ends cleanly or when initialization
//     failed.
//
// The method itself returns immediately; all waiting happens in goroutines.
func (connectorPair *IOConnectorPair) proxy(
	ctx context.Context,
	logger *logrus.Entry,
	timeoutAfterExit time.Duration,
) (ioInitDone <-chan error, ioCopyDone <-chan error) {
	// Each connector can fail independently, so up to two errors may be sent before
	// the channel is closed. A capacity of 2 lets initialization finish without
	// waiting for the caller to receive.
	initDone := make(chan error, 2)
	// At most one copy error is ever sent. One slot of buffering lets the goroutine
	// record it and go on to close the streams even if nobody is receiving; an
	// unbuffered channel would block it forever and leave the streams open.
	copyDone := make(chan error, 1)

	// ioCtx is rooted at context.Background() rather than ctx, so the connectors are
	// not cancelled when ctx is. It is cancelled only when the goroutine below returns.
	ioCtx, ioCancel := context.WithCancel(context.Background())

	// Start the initialization process. Any synchronous setup made by the connectors will
	// be completed after these lines. Async setup will be done once initDone is closed in
	// the goroutine below.
	readerResultCh := connectorPair.ReadConnector(ioCtx, logger.WithField("direction", "read"))
	writerResultCh := connectorPair.WriteConnector(ioCtx, logger.WithField("direction", "write"))

	go func() {
		// Deferred calls run in reverse order: copyDone is closed first, then ioCtx
		// is cancelled.
		defer ioCancel()
		defer close(copyDone)

		var reader io.ReadCloser
		var writer io.WriteCloser
		var ioInitErr error

		// Report every initialization error to initDone, but always consume both
		// results so that both streams can be closed if either side failed. A channel
		// is set to nil once it has delivered its result; receiving from a nil channel
		// blocks forever, which removes that case from the select.
		for readerResultCh != nil || writerResultCh != nil {
			var err error
			select {
			case readerResult := <-readerResultCh:
				readerResultCh = nil
				reader, err = readerResult.ReadWriteCloser, readerResult.Err
			case writerResult := <-writerResultCh:
				writerResultCh = nil
				writer, err = writerResult.ReadWriteCloser, writerResult.Err
			}

			if err != nil {
				ioInitErr = errors.Wrap(err, "error initializing io")
				logger.WithError(ioInitErr).Error()
				initDone <- ioInitErr
			}
		}

		close(initDone)
		if ioInitErr != nil {
			// One side may have initialized successfully; close whatever we got.
			logClose(logger, reader, writer)
			return
		}

		// IO streams have been initialized successfully.

		// Once ctx is done, wait the provided time before forcibly closing the io
		// streams. If copying finishes first, ioCtx is cancelled and this goroutine
		// exits (stopping its timer) instead of lingering until ctx is done and closing
		// already-closed streams.
		go func() {
			select {
			case <-ctx.Done():
			case <-ioCtx.Done():
				return
			}

			timer := time.NewTimer(timeoutAfterExit)
			defer timer.Stop()

			select {
			case <-timer.C:
				logClose(logger, reader, writer)
			case <-ioCtx.Done():
			}
		}()

		logger.Debug("begin copying io")
		defer logger.Debug("end copying io")
		// Registered before the copy so the streams are closed on every exit path,
		// and after the Debug defer so it still runs before "end copying io" is logged.
		defer logClose(logger, reader, writer)

		size, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
		logger.Debugf("copied %d", size)
		if err != nil {
			// A closed connection or file is expected when the streams are force-closed
			// by the watcher above, so it is logged at Info rather than Error.
			if strings.Contains(err.Error(), "use of closed network connection") ||
				strings.Contains(err.Error(), "file already closed") {
				logger.Infof("connection was closed: %v", err)
			} else {
				logger.WithError(err).Error("error copying io")
			}
			copyDone <- err
		}
	}()

	return initDone, copyDone
}