Skip to content

implement schema-serde event streams#641

Open
lucix-aws wants to merge 1 commit intofeat-serde2-eventstreamfrom
feat-serde2-eventstream-impl
Open

implement schema-serde event streams#641
lucix-aws wants to merge 1 commit intofeat-serde2-eventstreamfrom
feat-serde2-eventstream-impl

Conversation

@lucix-aws
Copy link
Copy Markdown
Contributor

@lucix-aws lucix-aws commented Mar 19, 2026

implements schema-serde event stream support for aws-framed protocols (i.e. all of them)

https://smithy.io/2.0/aws/amazon-eventstream.html#amazon-eventstream

This is fundamentally the exact same thing that we had before, with all of the pieces of event stream wireup split out to allow for schema-serde. If you open up an existing generated event stream body, e.g. https://github.com/aws/aws-sdk-go-v2/blob/main/service/transcribestreaming/eventstream.go, you will see how things sort of map out in the new runtime. I made a point of preserving the structure of the old code as much as possible.

at a glance:

  • ClientProtocol is extended with new event stream APIs. in practice since all the protocols use the same one, our implementations can just embed the new eventstream.Codec to get support
  • the "generated" part of event streams is now just a type marshaler that figures out what variant schema to pass to the underlying event stream writer which is now in the runtime. pasted an example of that below
  • as you saw in a previous PR i migrated the aws eventstream/ module over to smithy-go. in this patch i've also flattened it a bit by dropping the eventstreamapi/ sub-package, which had middleware stuff in it which caused an import cycle. that got moved over to transport/http and the constants got promoted up into smithy-go/eventstream/
  • i ended up just adapting the SDK's sigv4 for event streams. I extended the auth resolution stuff to where smithyhttp.Signer can optionally implement a message signer, and the SDK provides support for that.

example of generated type adapter:

type audioStreamWriter struct {
    // smithyhttp.EventStreamWriter is new, previously basically its entire body of logic was generated inline per-op
    writer *smithyhttp.EventStreamWriter
}

var _ AudioStreamWriter = (*audioStreamWriter)(nil)

func (w *audioStreamWriter) Send(ctx context.Context, event types.AudioStream) error {
    var variant *smithy.Schema
    switch event.(type) {
    case *types.AudioStreamMemberAudioEvent:
        variant = schemas.AudioStream_AudioEvent
    case *types.AudioStreamMemberConfigurationEvent:
        variant = schemas.AudioStream_ConfigurationEvent
    default:
        return fmt.Errorf("unknown event type: %T", event)
    }   
    sv, ok := event.(smithy.Serializable)
    if !ok {
        return fmt.Errorf("event %T is not serializable", event)
    }   
   // pass it along to the new runtime, which handles delegating to the protocol
    return w.writer.Send(ctx, variant, sv) 
}

I did manual e2e tests w/ transcribestreaming,bedrockruntime, and cloudwatchlogs.

@lucix-aws lucix-aws requested review from a team as code owners March 19, 2026 14:50
@lucix-aws
Copy link
Copy Markdown
Contributor Author

relates #458

inner := c.Codec.Serializer()
ss := NewShapeSerializer(&msg, inner)

v.Serialize(ss)
Copy link
Copy Markdown
Contributor

@wty-Bryant wty-Bryant Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some dummy question: what's inside v.Serialize(ss) here? From first glance SerializeEventMessage just encodes payload passed from inner protocol serializer impl and sends that to the wire io.Writer, it's a little obscured to understand what happens inside the Serialize() without an implementation/test for now

Copy link
Copy Markdown
Contributor

@wty-Bryant wty-Bryant Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I understand here is input v contains the payload that will be serialized/encoded to eventmessage/http request, the runtime serialize workflow is decided by the type of v

DeserializeResponse(ctx context.Context, schema *smithy.OperationSchema, types *smithy.TypeRegistry, resp *Response, out smithy.Deserializable) error

// event stream APIs
HasInitialMessages() bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what this means, but from the ClientProtocol interface it's weird to read "HasInitialMessage" since yeah, there are some messages that are "initial". Maybe something like HasInitialEventMessage would make it way more clear that this just event stream related

}

// String returns the IDL microformat for the shape ID.
func (s *ShapeID) String() string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason to not make this a pointer receiver?

Symbol opEventStreamConstructor,
Symbol opEventStreamSymbol) {

// For v2 (early-return) event streams, we must:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

// DeserializeInitialResponse reads the first event stream message and
// deserializes it as the operation output.
func (c *Codec) DeserializeInitialResponse(schema *smithy.Schema, r io.Reader, out smithy.Deserializable) error {
c.payloadBuf = c.payloadBuf[0:0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL this pattern

}

func (d *ShapeDeserializer) ReadString(s *smithy.Schema, v *string) error {
if d.inBindings {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly a question for me, but why inBindings mean look into event stream stuff?

if d.inBindings && isEventHeader(s) {
return readEventHeaderInt(d, s, v)
}
return d.inner.ReadInt8(s, v)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do any bounds check?

var unionSymbol = symbolProvider.toSymbol(union);
var implName = StringUtils.uncapitalize(union.getId().getName(serviceShape)) + "Reader";
var ifaceName = getEventStreamReaderInterfaceName(serviceShape, union);
var members = union.members().stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw a sorted when iterating through members, I noticed that our current code generation basically iterates this at random, so we generate a lot more releases of protocol test than we should https://github.com/aws/aws-sdk-go-v2/commits/main/internal/protocoltest/awsrestjson/deserializers.go


output, ok := out.Result.($output:P)
if out.Result != nil && !ok {
return out, md, $fmtErrorf:T("unexpected output result type: %T", out.Result)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add "Expected type X got Y"?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants