in client.go [537:587]
func (c *Connection) createStream(ctx context.Context) error {
loggerPrintf("Creating stream.")
token, err := getIdentityToken()
if err != nil {
return fmt.Errorf("%w: %v", ErrGettingInstanceToken, err)
}
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
"authentication": "Bearer " + token,
"agent-communication-resource-id": c.resourceID,
"agent-communication-channel-id": c.channelID,
}))
loggerPrintf("Using ResourceID %q", c.resourceID)
loggerPrintf("Using ChannelID %q", c.channelID)
// Set a timeout for the stream, this is well above service side timeout.
cnclCtx, cancel := context.WithTimeout(ctx, 60*time.Minute)
stream, err := createStreamLoop(cnclCtx, c.client, c.resourceID, c.channelID)
if err != nil {
cancel()
c.close(err)
return err
}
// Reading headers is best effort, if we fail to read headers we will just log the error and
// continue.
c.readHeaders(stream)
// Used to signal that the stream is closed.
streamClosed := make(chan struct{})
// This ensures that only one send is happening at a time.
streamSendLock := make(chan struct{}, 1)
go c.recv(ctx, streamClosed, streamSendLock, stream)
go c.send(streamClosed, streamSendLock, stream)
go func() {
defer cancel()
for {
select {
// Indicates that the stream is setup and is ready to send, this is used by sendMessage to
// block sends during reconnect.
case <-c.streamReady:
case <-streamClosed:
return
}
}
}()
loggerPrintf("Stream established.")
return nil
}