private void generateOperationEventStreamStructure()

in codegen/smithy-go-codegen/src/main/java/software/amazon/smithy/go/codegen/EventStreamGenerator.java [147:376]


    /**
     * Writes the Go event stream wrapper type for an operation, together with its
     * constructor and methods.
     *
     * <p>The emitted Go code consists of the struct type, a constructor taking functional
     * options, {@code Send} (only when the operation has an input event stream),
     * {@code Events} (only when it has an output event stream), {@code Close},
     * {@code safeClose}, {@code Err}, and {@code waitStreamClose}.
     *
     * @param writer         the writer the Go code is emitted to
     * @param operationShape the operation whose event stream structure is generated
     */
    private void generateOperationEventStreamStructure(GoWriter writer, OperationShape operationShape) {
        var opEventStreamStructure = getEventStreamOperationStructureSymbol(serviceShape, operationShape);
        var constructor = getEventStreamOperationStructureConstructor(serviceShape, operationShape);

        // Each side is optional: an operation may stream its input, its output, or both.
        var inputInfo = streamIndex.getInputInfo(operationShape);
        var outputInfo = streamIndex.getOutputInfo(operationShape);

        // --- struct type ---
        writer.write("""
                     // $T provides the event stream handling for the $L operation.
                     //
                     // For testing and mocking the event stream this type should be initialized via
                     // the $T constructor function. Using the functional options
                     // to pass in nested mock behavior.""", opEventStreamStructure, operationShape.getId().getName(),
                constructor
                );
        writer.openBlock("type $T struct {", "}", opEventStreamStructure, () -> {
            inputInfo.ifPresent(eventStreamInfo -> {
                var eventStreamTarget = eventStreamInfo.getEventStreamTarget();
                var writerInterfaceName = getEventStreamWriterInterfaceName(serviceShape, eventStreamTarget);

                writer.writeDocs(String.format("""
                                               %s is the EventStream writer for the %s events. This value is
                                               automatically set by the SDK when the API call is made. Use this
                                               member when unit testing your code with the SDK to mock out the
                                               EventStream Writer.""",
                                writerInterfaceName, eventStreamTarget.getId().getName(serviceShape)))
                        .writeDocs("")
                        .writeDocs("Must not be nil.")
                        .write("Writer $L", writerInterfaceName).write("");
            });

            outputInfo.ifPresent(eventStreamInfo -> {
                var eventStreamTarget = eventStreamInfo.getEventStreamTarget();
                var readerInterfaceName = getEventStreamReaderInterfaceName(serviceShape, eventStreamTarget);

                writer.writeDocs(String.format("""
                                               %s is the EventStream reader for the %s events. This value is
                                               automatically set by the SDK when the API call is made. Use this
                                               member when unit testing your code with the SDK to mock out the
                                               EventStream Reader.""",
                                readerInterfaceName, eventStreamTarget.getId().getName(serviceShape)))
                        .writeDocs("")
                        .writeDocs("Must not be nil.")
                        .write("Reader $L", readerInterfaceName).write("");
            });

            // Unexported bookkeeping fields used by Close, Err and waitStreamClose.
            writer.write("done chan struct{}")
                    .write("closeOnce $T", SymbolUtils.createValueSymbolBuilder("Once", SmithyGoDependency.SYNC)
                            .build())
                    .write("err $P", SymbolUtils.createPointableSymbolBuilder("OnceErr",
                            SmithyGoDependency.SMITHY_SYNC).build());
        }).write("");

        // --- constructor ---
        writer.write("""
                     // $T initializes an $T.
                     // This function should only be used for testing and mocking the $T
                     // stream within your application.""", constructor, opEventStreamStructure,
                opEventStreamStructure);
        if (inputInfo.isPresent()) {
            writer.writeDocs("");
            writer.writeDocs("The Writer member must be set before writing events to the stream.");
        }
        if (outputInfo.isPresent()) {
            writer.writeDocs("");
            writer.writeDocs("The Reader member must be set before reading events from the stream.");
        }
        writer.openBlock("func $T(optFns ...func($P)) $P {", "}", constructor,
                opEventStreamStructure, opEventStreamStructure, () -> writer
                        // $T (not $L) so the symbol is rendered as a Go type name, matching
                        // the "type $T struct" declaration above.
                        .openBlock("es := &$T{", "}", opEventStreamStructure, () -> writer
                                .write("done: make(chan struct{}),")
                                .write("err: $T(),", SymbolUtils.createValueSymbolBuilder("NewOnceErr",
                                        SmithyGoDependency.SMITHY_SYNC).build()))
                        .openBlock("for _, fn := range optFns {", "}", () -> writer
                                .write("fn(es)"))
                        .write("return es")).write("");

        // --- Send: only for operations with an input event stream ---
        if (inputInfo.isPresent()) {
            writer.write("""
                         // Send writes the event to the stream blocking until the event is written.
                         // Returns an error if the event was not written.
                         func (es $P) Send(ctx $P, event $P) error {
                             return es.Writer.Send(ctx, event)
                         }
                         """, opEventStreamStructure, SymbolUtils.createValueSymbolBuilder("Context",
                            SmithyGoDependency.CONTEXT).build(),
                    symbolProvider.toSymbol(inputInfo.get().getEventStreamTarget()));
        }

        // --- Events: only for operations with an output event stream ---
        if (outputInfo.isPresent()) {
            writer.write("""
                         // Events returns a channel to read events from.
                         func (es $P) Events() <-chan $P {
                             return es.Reader.Events()
                         }
                         """, opEventStreamStructure, symbolProvider.toSymbol(outputInfo.get().getEventStreamTarget()));
        }

        // --- Close: runs safeClose at most once via closeOnce ---
        writer.write("""
                     // Close closes the stream. This will also cause the stream to be closed.
                     // Close must be called when done using the stream API. Not calling Close
                     // may result in resource leaks.
                     //
                     // Will close the underlying EventStream writer and reader, and no more events can be
                     // sent or received.
                     func (es $P) Close() error {
                         es.closeOnce.Do(es.safeClose)
                         return es.Err()
                     }
                     """, opEventStreamStructure);

        // --- safeClose ---
        writer.openBlock("func (es $P) safeClose() {", "}",
                opEventStreamStructure, () -> {
                    writer.write("""
                                 close(es.done)
                                 """);

                    if (inputInfo.isPresent()) {
                        // The writer is closed in a goroutine and waited on for at most one
                        // second; a one-shot timer is the idiomatic primitive for that bound.
                        var newTimer = SymbolUtils.createValueSymbolBuilder("NewTimer",
                                SmithyGoDependency.TIME).build();
                        var second = SymbolUtils.createValueSymbolBuilder("Second",
                                SmithyGoDependency.TIME).build();
                        writer.write("""
                                     t := $T($T)
                                     defer t.Stop()
                                     writeCloseDone := make(chan error)
                                     go func() {
                                         if err := es.Writer.Close(); err != nil {
                                             es.err.SetError(err)
                                         }
                                         close(writeCloseDone)
                                     }()
                                     select {
                                     case <-t.C:
                                     case <-writeCloseDone:
                                     }
                                     """, newTimer, second);
                    }

                    if (outputInfo.isPresent()) {
                        writer.write("es.Reader.Close()");
                    }
                }).write("");

        // --- Err: stream-level error first, then writer, then reader ---
        writer.writeDocs("""
                         Err returns any error that occurred while reading or writing EventStream
                         Events from the service API's response. Returns nil if there were no errors.""");
        writer.openBlock("func (es $P) Err() error {", "}",
                opEventStreamStructure, () -> {
                    writer.write("""
                                 if err := es.err.Err(); err != nil {
                                     return err
                                 }
                                 """);

                    if (inputInfo.isPresent()) {
                        writer.write("""
                                     if err := es.Writer.Err(); err != nil {
                                         return err
                                     }
                                     """);
                    }

                    if (outputInfo.isPresent()) {
                        writer.write("""
                                     if err := es.Reader.Err(); err != nil {
                                         return err
                                     }
                                     """);
                    }

                    writer.write("return nil");
                }).write("");

        // --- waitStreamClose ---
        writer.openBlock("func (es $P) waitStreamClose() {", "}", opEventStreamStructure,
                () -> {
                    writer.write("""
                                 type errorSet interface {
                                     ErrorSet() <-chan struct{}
                                 }
                                 """);

                    // The channels stay nil when the Writer/Reader does not implement the
                    // optional interface; receiving from a nil channel in the generated
                    // select simply never fires.
                    if (inputInfo.isPresent()) {
                        writer.write("""
                                     var inputErrCh <-chan struct{}
                                     if v, ok := es.Writer.(errorSet); ok {
                                         inputErrCh = v.ErrorSet()
                                     }
                                     """);
                    }

                    if (outputInfo.isPresent()) {
                        writer.write("""
                                     var outputErrCh <-chan struct{}
                                     if v, ok := es.Reader.(errorSet); ok {
                                         outputErrCh = v.ErrorSet()
                                     }
                                     var outputClosedCh <-chan struct{}
                                     if v, ok := es.Reader.(interface{ Closed() <-chan struct{} }); ok {
                                         outputClosedCh = v.Closed()
                                     }
                                     """);
                    }

                    writer.openBlock("select {", "}", () -> {
                        writer.write("case <-es.done:");
                        if (inputInfo.isPresent()) {
                            writer.write("""
                                         case <-inputErrCh:
                                             es.err.SetError(es.Writer.Err())
                                             es.Close()
                                         """);
                        }
                        if (outputInfo.isPresent()) {
                            // The closed branch reuses the err bound by the if-initializer
                            // instead of calling Reader.Err() a second time.
                            writer.write("""
                                         case <-outputErrCh:
                                             es.err.SetError(es.Reader.Err())
                                             es.Close()

                                         case <-outputClosedCh:
                                             if err := es.Reader.Err(); err != nil {
                                                 es.err.SetError(err)
                                             }
                                             es.Close()
                                         """);
                        }
                    });

                }).write("");
    }