in codegen/smithy-aws-go-codegen/src/main/java/software/amazon/smithy/aws/go/codegen/AwsEventStreamUtils.java [857:1131]
/**
 * Emits the Go implementation of an event stream writer for the given event stream union.
 *
 * <p>Using the writer obtained from {@code context}, this method writes, in order: the async
 * write reporter (via {@code generateAsyncWriteReporter}), the message wrapper (only when
 * {@code withInitialMessages} is true), the writer struct, its constructor, and the methods
 * {@code Send}, {@code send}, {@code writeStream}, {@code writeEvent}, {@code serializeEvent},
 * {@code signEvent}, {@code closeStream}, {@code ErrorSet}, {@code Close}, {@code safeClose}
 * and {@code Err}.
 *
 * @param context             generation context supplying the settings, service, symbol provider,
 *                            writer and protocol name used below
 * @param eventStream         union shape of the event stream the writer is generated for
 * @param eventStreamInfos    not referenced anywhere in this method body
 * @param withInitialMessages when true, the generated writer additionally carries an
 *                            {@code initialRequestSerializer} field, the constructor takes an
 *                            extra {@code irs} parameter, {@code Send} wraps events in the message
 *                            wrapper type, and {@code serializeEvent} dispatches on the wrapper type
 */
private static void generateEventStreamWriter(
GenerationContext context,
UnionShape eventStream,
Set<EventStreamInfo> eventStreamInfos,
boolean withInitialMessages
) {
// Pull out the pieces of generation state used throughout this method.
var settings = context.getSettings();
var service = context.getService();
var symbolProvider = context.getSymbolProvider();
var writer = context.getWriter().get();
var eventUnionSymbol = symbolProvider.toSymbol(eventStream);
// Module-local symbol for the async write reporter; the emitted send() builds a value of this
// type with Event and Result fields, and writeStream() calls ReportResult on it.
var asyncEventSymbol = getModuleSymbol(settings, getAsyncWriteReporterName(eventStream,
service));
// With initial messages the internal send/serialize path works on the wrapper interface rather
// than on the raw event union.
var eventSymbol = withInitialMessages ? getWriterEventWrapperInterface(symbolProvider,
eventStream, service) : eventUnionSymbol;
generateAsyncWriteReporter(writer, eventSymbol, asyncEventSymbol);
// Symbols referenced by the struct, constructor and method signatures emitted below.
var writerImplName = getEventStreamWriterImplName(eventStream, service);
var writerSymbol = getModuleSymbol(settings, writerImplName);
var encoderSymbol = getSymbol("Encoder", AwsGoDependency.SERVICE_INTERNAL_EVENTSTREAM);
var writeCloser = getSymbol("WriteCloser", SmithyGoDependency.IO);
var signerInterface = getModuleSymbol(settings, EVENT_STREAM_SIGNER_INTERFACE);
var messageSymbol = getEventStreamSymbol("Message", true);
// Only emitted when initial messages are supported; presumably this produces the wrapper types
// later resolved through getWriterEventWrapperMessageType / ...InitialRequestType.
if (withInitialMessages) {
generateEventStreamWriterMessageWrapper(eventStream, service, symbolProvider, writer, eventUnionSymbol);
}
// Emit the writer struct definition.
writer.openBlock("type $T struct {", "}", writerSymbol, () -> {
var bytesBufferSymbol = SymbolUtils.createPointableSymbolBuilder("Buffer",
SmithyGoDependency.BYTES).build();
var syncOnce = getSymbol("Once", SmithyGoDependency.SYNC);
var onceErr = getSymbol("OnceErr", SmithyGoDependency.SMITHY_SYNC);
writer.write("""
encoder $P
signer $T
stream chan $T
serializationBuffer $P
signingBuffer $P
eventStream $T
done chan struct{}
closeOnce $T
err $P
""", encoderSymbol, signerInterface, asyncEventSymbol, bytesBufferSymbol, bytesBufferSymbol,
writeCloser, syncOnce, onceErr);
// Extra field holding the serializer used for the initial request message.
if (withInitialMessages) {
writer.write("initialRequestSerializer func(interface{}, $P) error", messageSymbol);
}
}).write("");
Symbol bytesNewBuffer = SymbolUtils.createValueSymbolBuilder("NewBuffer",
SmithyGoDependency.BYTES).build();
// Emit the constructor. The signature is written inline in pieces so the optional irs
// parameter can be appended before the parameter list is closed.
writer.writeInline("func $L(stream $T, encoder $P, signer $T",
getEventStreamWriterImplConstructorName(eventStream, service), writeCloser, encoderSymbol,
signerInterface);
if (withInitialMessages) {
writer.writeInline(", irs func(interface{}, $P) error", messageSymbol);
}
// Constructor body: populate the struct, start the writeStream goroutine, return the writer.
writer.openBlock(") $P {", "}", writerSymbol, () -> writer
.openBlock("w := &$T{", "}", writerSymbol, () -> {
var onceErr = SymbolUtils.createValueSymbolBuilder("NewOnceErr",
SmithyGoDependency.SMITHY_SYNC).build();
writer.write("""
encoder: encoder,
signer: signer,
stream: make(chan $T),
eventStream: stream,
done: make(chan struct{}),
err: $T(),
serializationBuffer: $T(nil),
signingBuffer: $T(nil),
""", asyncEventSymbol, onceErr, bytesNewBuffer, bytesNewBuffer);
if (withInitialMessages) {
writer.write("initialRequestSerializer: irs,");
}
}).write("")
.write("""
go w.writeStream()
return w
""")).write("");
// Emit the public Send method; it always accepts the event union type.
Symbol contextSymbol = SymbolUtils.createValueSymbolBuilder("Context", SmithyGoDependency.CONTEXT).build();
writer.openBlock("func (w $P) Send(ctx $P, event $P) error {", "}", writerSymbol, contextSymbol,
eventUnionSymbol, () -> {
if (withInitialMessages) {
// Wrap the event in the message wrapper type so send() receives the wrapper interface.
writer.write("return w.send(ctx, &$T{Value: event})",
getWriterEventWrapperMessageType(symbolProvider, eventStream, service));
} else {
writer.write("return w.send(ctx, event)");
}
}).write("");
// Emit the internal send method: fail fast if an error is already recorded, otherwise hand the
// event to the writeStream goroutine over w.stream and wait for the per-event result.
writer.openBlock("func (w $P) send(ctx $P, event $P) error {", "}", writerSymbol, contextSymbol,
eventSymbol, () -> {
writer.write("""
if err := w.err.Err(); err != nil {
return err
}
resultCh := make(chan error)
wrapped := $T{
Event: event,
Result: resultCh,
}
""", asyncEventSymbol);
Symbol errorfSymbol = SymbolUtils.createValueSymbolBuilder("Errorf", SmithyGoDependency.FMT)
.build();
final String streamClosedError = "stream closed, unable to send event";
// First select: enqueue the wrapped event unless ctx is done or the writer has been closed.
writer.openBlock("select {", "}", () -> writer
.write("""
case w.stream <- wrapped:
case <-ctx.Done():
return ctx.Err()
case <-w.done:
return $T($S)
""", errorfSymbol, streamClosedError)).write("");
// Second select: wait for the write result, again giving up on ctx cancellation or close.
writer.openBlock("select {", "}", () -> writer
.write("""
case err := <-resultCh:
return err
case <-ctx.Done():
return ctx.Err()
case <-w.done:
return $T($S)
""", errorfSymbol, streamClosedError)).write("");
}).write("");
// Emit writeStream, the loop the constructor starts with "go w.writeStream()". It defers
// w.Close(); each event received on w.stream is written and its result reported, and a write
// error is recorded and ends the loop. When w.done fires it calls closeStream, records any
// error from that, and returns.
writer.openBlock("func (w $P) writeStream() {", "}", writerSymbol, () -> writer
.write("defer w.Close()").write("")
.openBlock("for {", "}", () -> writer
.openBlock("select {", "}", () -> writer
.openBlock("case wrapper := <-w.stream:", "", () -> writer
.write("err := w.writeEvent(wrapper.Event)")
.write("wrapper.ReportResult(w.done, err)")
.openBlock("if err != nil {", "}", () -> writer
.write("w.err.SetError(err)")
.write("return")))
.openBlock("case <-w.done:", "", () -> writer
.openBlock("if err := w.closeStream(); err != nil {", "}",
() -> writer.write("w.err.SetError(err)"))
.write("return"))))).write("");
// Emit writeEvent: serialize the event, sign the serialized bytes, then copy the signed bytes
// to the underlying stream.
writer.openBlock("func (w $P) writeEvent(event $P) error {", "}", writerSymbol, eventSymbol, () -> {
// Emits the "if err != nil { return err }" check followed by a blank line.
Runnable returnErr = () -> writer.openBlock("if err != nil {", "}", () -> writer.write("return err"))
.write("");
writer.writeDocs("""
serializedEvent returned bytes refers to an underlying byte buffer and must not escape
this writeEvent scope without first copying. Any previous bytes stored in the buffer
are cleared by this call.
""");
writer.write("serializedEvent, err := w.serializeEvent(event)");
returnErr.run();
writer.writeDocs("""
signedEvent returned bytes refers to an underlying byte buffer and must not escape
this writeEvent scope without first copying. Any previous bytes stored in the buffer
are cleared by this call.
""");
writer.write("signedEvent, err := w.signEvent(serializedEvent)");
returnErr.run();
writer.writeDocs("bytes are now copied to the underlying stream writer");
// NOTE(review): io and bytes are referenced here by literal package name, not via a symbol;
// presumably the imports are registered through the io/bytes symbols used above — confirm.
writer.write("_, err = io.Copy(w.eventStream, bytes.NewReader(signedEvent))")
.write("return err");
}).write("");
// Emit serializeEvent: reset the serialization buffer, build the event message, then encode
// it into the buffer and return the buffer's bytes.
writer.openBlock("func (w $P) serializeEvent(event $P) ([]byte, error) {", "}", writerSymbol, eventSymbol,
() -> {
writer.write("w.serializationBuffer.Reset()").write("")
.write("eventMessage := $T{}", messageSymbol).write("");
var eventStreamSerializerName = getEventStreamSerializerName(eventStream, service,
context.getProtocolName());
if (withInitialMessages) {
// Dispatch on the wrapper type: initial requests go through the injected
// initialRequestSerializer, regular messages through the named event stream serializer,
// and any other type yields an error.
var initialRequestType = getWriterEventWrapperInitialRequestType(symbolProvider, eventStream,
service);
var messageEventType = getWriterEventWrapperMessageType(symbolProvider, eventStream,
service);
var errorf = getSymbol("Errorf", SmithyGoDependency.FMT, false);
writer.write("""
switch ev := event.(type) {
case $P:
if err := w.initialRequestSerializer(ev.Value, &eventMessage); err != nil {
return nil, err
}
case $P:
if err := $L(ev.Value, &eventMessage); err != nil {
return nil, err
}
default:
return nil, $T("unknown event wrapper type: %v", event)
}
""", initialRequestType, messageEventType, eventStreamSerializerName, errorf);
} else {
// No wrapper in play: pass the event union straight to the event stream serializer.
writer.write("""
if err := $L(event, &eventMessage); err != nil {
return nil, err
}
""",
eventStreamSerializerName);
}
// Encode the populated message into the reusable serialization buffer.
writer.write("""
if err := w.encoder.Encode(w.serializationBuffer, eventMessage); err != nil {
return nil, err
}
return w.serializationBuffer.Bytes(), nil""");
}).write("");
// Emit signEvent: wrap the payload in a message carrying a date header, obtain a signature
// over the encoded headers and payload from the signer, attach it as the chunk signature
// header, and encode the result into the signing buffer.
writer.openBlock("func (w $P) signEvent(payload []byte) ([]byte, error) {", "}", writerSymbol, () -> {
var timestampValue = getEventStreamSymbol("TimestampValue", false);
var dateHeader = getEventStreamApiSymbol("DateHeader", false);
var chunkSignatureHeader = getEventStreamApiSymbol("ChunkSignatureHeader", false);
var bytesValue = getEventStreamSymbol("BytesValue", false);
// The emitted code calls time.Now() by literal package name, so the import is added explicitly.
writer.addUseImports(SmithyGoDependency.TIME);
writer.write("w.signingBuffer.Reset()").write("")
.write("date := time.Now().UTC()").write("")
.write("var msg $T", messageSymbol)
.write("msg.Headers.Set($T, $T(date))", dateHeader, timestampValue)
.write("msg.Payload = payload").write("")
.write("var headers bytes.Buffer")
.openBlock("if err := $T(&headers, msg.Headers); err != nil {", "}",
getEventStreamSymbol("EncodeHeaders", false),
() -> writer.write("return nil, err")).write("")
.write("sig, err := w.signer.GetSignature(context.Background(), headers.Bytes(), "
+ "msg.Payload, date)")
.openBlock("if err != nil {", "}", () -> writer
.write("return nil, err")).write("")
.write("msg.Headers.Set($T, $T(sig))", chunkSignatureHeader, bytesValue).write("")
.openBlock("if err := w.encoder.Encode(w.signingBuffer, msg); err != nil {", "}", () -> writer
.write("return nil, err")).write("")
.write("return w.signingBuffer.Bytes(), nil");
}).write("");
// Emit closeStream: sign and write an empty payload as the end-of-stream marker. The deferred
// func always closes the underlying stream and surfaces the close error only when no earlier
// error was set on the named return value.
writer.openBlock("func (w $P) closeStream() (err error) {", "}", writerSymbol, () -> writer
.openBlock("defer func() {", "}()", () -> writer
.openBlock("if cErr := w.eventStream.Close(); cErr != nil && err == nil {", "}",
() -> writer.write("err = cErr"))).write("")
.write("""
// Per the protocol, a signed empty message is used to indicate the end of the stream,
// and that no subsequent events will be sent.
signedEvent, err := w.signEvent([]byte{})""")
.openBlock("if err != nil {", "}", () -> writer.write("return err")).write("")
.write("_, err = io.Copy(w.eventStream, bytes.NewReader(signedEvent))")
.write("return err")).write("");
// Emit ErrorSet, which exposes the channel returned by w.err.ErrorSet().
writer.openBlock("func (w $P) ErrorSet() <-chan struct{} {", "}", writerSymbol, () -> writer
.write("return w.err.ErrorSet()")).write("");
// Emit Close: runs safeClose through closeOnce, then returns the recorded error.
writer.openBlock("func (w $P) Close() error {", "}", writerSymbol, () -> writer
.write("w.closeOnce.Do(w.safeClose)")
.write("return w.Err()")).write("");
// Emit safeClose: closes w.done, the channel the selects in send and writeStream watch.
writer.openBlock("func (w $P) safeClose() {", "}", writerSymbol, () -> writer
.write("close(w.done)")).write("");
// Emit Err, which returns the error recorded on w.err.
writer.openBlock("func (w $P) Err() error {", "}", writerSymbol, () -> writer
.write("return w.err.Err()")).write("");
}