func()

in client.go [537:587]


// createStream opens a new stream for the connection and starts the goroutines that service it.
//
// It fetches an identity token, attaches it together with the connection's resource and channel
// IDs as outgoing metadata on ctx, and then calls createStreamLoop under a 60 minute timeout.
//
// If the token cannot be fetched, the error is returned wrapped in ErrGettingInstanceToken. If
// createStreamLoop fails, the timeout context is cancelled, the connection is closed with that
// error, and the same error is returned. On success the recv and send goroutines are started and
// nil is returned.
func (c *Connection) createStream(ctx context.Context) error {
	loggerPrintf("Creating stream.")

	idToken, err := getIdentityToken()
	if err != nil {
		return fmt.Errorf("%w: %v", ErrGettingInstanceToken, err)
	}

	// Metadata sent with the stream: the bearer token plus the resource and channel IDs of this
	// connection.
	md := metadata.New(map[string]string{
		"authentication":                  "Bearer " + idToken,
		"agent-communication-resource-id": c.resourceID,
		"agent-communication-channel-id":  c.channelID,
	})
	ctx = metadata.NewOutgoingContext(ctx, md)

	loggerPrintf("Using ResourceID %q", c.resourceID)
	loggerPrintf("Using ChannelID %q", c.channelID)

	// Bound the stream with a timeout, this is well above the service side timeout.
	streamCtx, cancelStream := context.WithTimeout(ctx, 60*time.Minute)
	stream, err := createStreamLoop(streamCtx, c.client, c.resourceID, c.channelID)
	if err != nil {
		cancelStream()
		c.close(err)
		return err
	}

	// Header reading is best effort: a failure there does not stop stream setup.
	c.readHeaders(stream)

	// closed signals that the stream is closed.
	closed := make(chan struct{})
	// sendLock has capacity one so that only one send is happening at a time.
	sendLock := make(chan struct{}, 1)

	// Note that recv is handed the metadata context (ctx), not the timeout-bound streamCtx.
	go c.recv(ctx, closed, sendLock, stream)
	go c.send(closed, sendLock, stream)

	// This goroutine holds the timeout context's cancel func and releases it once closed fires.
	go func() {
		defer cancelStream()
		for {
			select {
			case <-closed:
				return
			case <-c.streamReady:
				// Receiving here indicates that the stream is set up and ready to send; this is
				// used by sendMessage to block sends during reconnect.
			}
		}
	}()

	loggerPrintf("Stream established.")
	return nil
}