[API Proposal]: Support parsing server-sent events (SSE) #98105

Closed
stephentoub opened this issue Feb 7, 2024 · 19 comments · Fixed by #102238
Labels: api-approved, area-System.Net, blocking, in-pr

Comments

@stephentoub
Member

stephentoub commented Feb 7, 2024

Background and motivation

SSE is becoming more and more popular, especially with prominent services like OpenAI relying on it for streaming responses. The format is very simple, but it still takes some amount of code to properly handle parsing the SSE format. We should have a built-in helper in either System.Net or System.Formats that takes care of it for the developer (optionally then other higher-level helpers could be layered on top).

API Proposal

namespace System.Formats.Sse;

public readonly struct SseItem<T>
{
    public SseItem(T data, string eventType);
    public T Data { get; }
    public string EventType { get; }

    public const string EventTypeDefault = "message"; // defined as the default by SSE spec
}

public static class SseParser
{
    public static SseEnumerable<string> Parse(Stream sseStream); // equivalent to the other overload using Encoding.UTF8.GetString
    public static SseEnumerable<T> Parse<T>(Stream sseStream, SseItemParser<T> itemParser);
}

public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

public sealed class SseEnumerable<T> : IAsyncEnumerable<SseItem<T>>, IEnumerable<SseItem<T>>
{
    public IEnumerator<SseItem<T>> GetEnumerator();
    public IAsyncEnumerator<SseItem<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default);

    public string LastEventId { get; }
    public TimeSpan ReconnectionInterval { get; }
}
  • Ownership of the Stream is not transferred to the enumerable. Parse{Async} will not dispose of the stream, nor will the returned enumerable, and it is up to the consumer to dispose of the stream when all use of it is done.
  • The enumerable will only be usable for a single enumeration. Calling Get{Async}Enumerator multiple times will fail. We could make it work in the future if really important, but for that to make sense you'd need to be working with a Stream that was seekable, and that's rare for these scenarios, which are typically networking-based.
  • ReconnectionInterval defaults to Timeout.InfiniteTimeSpan. The SSE specification states that if no retry is specified in the stream, the interval may be chosen by the client. By using InfiniteTimeSpan, this conveys to a client consuming this parser that they may choose their own value.
  • LastEventId defaults to empty string, per the SSE spec.
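As an illustration of the ReconnectionInterval default described above, here is a minimal sketch of how a consumer might pick a reconnect delay. The `ReconnectDelay` class and its `Choose` method are hypothetical names invented for this example; they are not part of the proposal.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the proposed API.
public static class ReconnectDelay
{
    // If the parser still reports Timeout.InfiniteTimeSpan, the stream never sent
    // a "retry" field, so the client falls back to a delay of its own choosing.
    public static TimeSpan Choose(TimeSpan parserInterval, TimeSpan clientDefault) =>
        parserInterval == Timeout.InfiniteTimeSpan ? clientDefault : parserInterval;
}
```

A consumer would presumably call this with the enumerable's ReconnectionInterval after enumeration ends and before attempting to reconnect.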

Open Issues

  • Single type that implements both IAsyncEnumerable and IEnumerable?
  • Single method that returns that single type? Naming?
  • ReconnectionInterval / LastEventId on each event or on the enumerable?
  • Assembly: netstandard2.0 support is needed for a variety of consumers, including the Azure SDK and Semantic Kernel. Proposal is to ship this as a new System.Formats.Sse nuget package that includes a netstandard2.0 asset. If there's an existing package it'd make sense to include this in, we could do that instead.

API Usage

// Example 1: surface each event's payload as a UTF-8 string.
HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<string> item in SseParser.Parse(responseStream, (_, bytes) => Encoding.UTF8.GetString(bytes)))
{
    Console.WriteLine(item.Data);
}

// Example 2: deserialize each event's payload from JSON into a T.
HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<T> item in SseParser.Parse(responseStream, (_, bytes) => JsonSerializer.Deserialize<T>(bytes)))
{
    Console.WriteLine(item.Data);
}

Alternative Designs

  • Always returning a byte[] instead of a callback that parses a span into a T
  • Also exposing a Serialize/Format method to put SseItem<T> instances onto a Stream, which ASP.NET could then use. This can come later when needed by ASP.NET.

Risks

  • We should have equivalent support in ASP.NET for serializing out an SSE stream, and it should use whatever shared SseItem<T> type is in the shared libraries, so we need to ensure it's designed accordingly.
@stephentoub stephentoub added api-suggestion Early API idea and discussion, it is NOT ready for implementation area-System.Net labels Feb 7, 2024
@stephentoub stephentoub added this to the 9.0.0 milestone Feb 7, 2024
@ghost

ghost commented Feb 7, 2024

Tagging subscribers to this area: @dotnet/ncl
See info in area-owners.md if you want to be subscribed.

Issue Details

Background and motivation

SSE is becoming more and more popular, especially with prominent services like OpenAI relying on it for streaming responses. The format is very simple, but it still takes some amount of code to properly handle parsing the SSE format. We should have a built-in helper in either System.Net or System.Formats that takes care of it for the developer (optionally then other higher-level helpers could be layered on top).

API Proposal

namespace System.Formats.Sse;

public struct SseItem<T>
{
    public string? Event { get; set; }
    public T Data { get; set; }
}

public static class SseParser
{
    public static IAsyncEnumerable<SseItem<T>> ParseAsync<T>(Stream sseStream, Func<ReadOnlySpan<byte>, T> itemParser, CancellationToken cancellationToken = default);
}

API Usage

HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<string> item in SseParser.ParseAsync(responseStream, Encoding.UTF8.GetString))
{
    Console.WriteLine(item.Data);
}

Alternative Designs

  • Not having an SseItem and just having an IAsyncEnumerable of a tuple
  • Always returning a byte[] instead of a callback that parses a span into a T
  • Making it specific to System.Net, e.g. putting it somewhere in the System.Net.Http library.

Risks

  • The proposed API relies on ref structs being usable as generic parameters and on Func being annotated accordingly. This is on track to happen for .NET 9 / C# 13, but if it doesn't, a different shape might be needed.
  • We should have equivalent support in ASP.NET for serializing out an SSE stream, and it should use whatever shared SseItem<T> type is in the shared libraries, so we need to ensure it's designed accordingly.
Author: stephentoub
Assignees: -
Labels:

api-suggestion, area-System.Net

Milestone: 9.0.0

@ericstj
Member

ericstj commented Mar 22, 2024

helper in either System.Net or System.Formats that take care of it for the developer

Is the proposal here to have a separate assembly for this or place it in an existing assembly? The way this is designed it seems like you have a preference. Might be good to add some thoughts around why not for the alternatives.

What do you think are next steps here? cc @dotnet/ncl @bartonjs

@stephentoub
Member Author

Is the proposal here to have a separate assembly for this or place it in an existing assembly?

Ideally I think it would be a separate assembly that just worked in terms of Stream and that had a netstandard2.0 asset, so that it could be used downlevel by AI-related components that need it.

What do you think are next steps here?

  1. Prototype it and make sure it meets the needs of various known existing consumers, e.g. https://github.com/microsoft/semantic-kernel/tree/9b8a218f5b6df9dcc72d69d989aff9b904cdecf0/dotnet/src/InternalUtilities/src/Text and https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs.
  2. Sync with ASP.NET folks on what we should be doing here for production and not just consumption: we might want both sides in this library, both reader and writer.
  3. Review, implement, test, ship it.

@trrwilson

First: it's very cool to see a proposal to standardize server-sent events in this manner. It'll be a huge help to have this for consumers like OpenAI, which leans heavily on the pattern for its streamed REST response payloads.

With that, I'd suggest the API proposal be expanded to proactively cover the entirety of the spec (as outlined at https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation):

public struct SseItem<T>
{
    public string EventName { get; set; } = "message"; // per spec, "message" is the implicit default and null is impossible
    public T Data { get; set; }
    public string LastEventId { get; set; } // "id" in spec
    public TimeSpan ReconnectionInterval { get; set; } // "retry" in spec
}

Notably, the parsing logic then needs to record and propagate LastEventId as the note mentions:

If an event doesn't have an "id" field, but an earlier event did set the event source's last event ID string, then the event's lastEventId field will be set to the value of whatever the last seen "id" field was.

I haven't yet encountered use of the reconnection-related fields (id/retry) but then I also hadn't previously seen a reliance on event until it was later introduced -- just covering it all at once seems prudent.
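The propagation rule from the spec note quoted in this comment can be sketched as a small line-based interpreter. This is only an illustration under stated assumptions: `SketchEvent` and `SseSketch.Interpret` are hypothetical names, the input is assumed to be already split into lines, and the "retry" field is left out for brevity. It is not the implementation proposed in this issue.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical types for illustration only; not the proposed SseItem<T>.
public sealed record SketchEvent(string EventType, string Data, string LastEventId);

public static class SseSketch
{
    public static List<SketchEvent> Interpret(IEnumerable<string> lines)
    {
        var events = new List<SketchEvent>();
        var data = new StringBuilder();
        string eventType = "";
        string lastEventId = ""; // deliberately NOT reset between events

        foreach (string line in lines)
        {
            if (line.Length == 0)
            {
                // A blank line dispatches the event, unless no data was accumulated.
                if (data.Length > 0)
                {
                    data.Length--; // drop the trailing '\n' added by the last data field
                    string type = eventType.Length == 0 ? "message" : eventType;
                    events.Add(new SketchEvent(type, data.ToString(), lastEventId));
                }
                data.Clear();
                eventType = "";
                continue;
            }

            if (line[0] == ':') continue; // comment line

            int colon = line.IndexOf(':');
            string field = colon < 0 ? line : line[..colon];
            string value = colon < 0 ? "" : line[(colon + 1)..];
            if (value.StartsWith(' ')) value = value[1..]; // strip one leading space

            switch (field)
            {
                case "event": eventType = value; break;
                case "data": data.Append(value).Append('\n'); break;
                case "id": if (!value.Contains('\0')) lastEventId = value; break;
                // "retry" and unknown fields are ignored in this sketch.
            }
        }

        return events;
    }
}
```

Because `lastEventId` survives each dispatch, a later event without its own "id" field still reports the most recently seen ID, which is the behavior the note describes.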

@eiriktsarpalis
Member

Any reason why the envelope type is not a readonly struct?

@stephentoub
Member Author

Any reason why the envelope type is not a readonly struct?

It could be; we'd then add a constructor for the data. It's a simple POCO.

@stephentoub stephentoub added api-ready-for-review API is ready for review, it is NOT ready for implementation and removed api-suggestion Early API idea and discussion, it is NOT ready for implementation labels May 13, 2024
@stephentoub stephentoub self-assigned this May 13, 2024
@dlyz

dlyz commented May 14, 2024

I've implemented something close to some of the editions of this proposal for my own purposes and am now sharing it here in case someone finds it useful. Note that the code hasn't been heavily tested, so it probably contains a few bugs. Anyway, I tried to implement it as close to the spec as I could (including event dispatch instructions for browsers) and to make it as efficient as I could (to a reasonable extent).

Notes about incompatibilities with the proposal:

  • There is no sync (IEnumerable) support, as I don't need it, but it would be very straightforward to add.
  • LastEventId is a part of the event item and I think it may be useful because LastEventId may mean 'EventId' in some scenarios or because it will be more consistent in case of reconnect implementation.
  • LastEventId and ReconnectionInterval are not part of the stream. Firstly, that matches the edition of the proposal at the moment I started implementing the parser. Secondly, it is not obvious how thread-safe it should be, when exactly these properties of the stream object should be updated, etc. And it might not be obvious for the user either.

As a bonus, the gist also contains a demo of a streaming request to the OpenAI API, a few tests (examples from the spec), and an implementation of SseResult<TValue> : IResult for ASP.NET.

@stephentoub
Member Author

stephentoub commented May 14, 2024

I've implemented something close to some of the editions of this proposal for my own purposes and now sharing it here in case someone finds it useful.

Thanks! I have an implementation ready to go once this is approved, but appreciate your sharing nonetheless. It's great to know that the general shape has worked well for you.

LastEventId and ReconnectionInterval are not part of the stream. Firstly, that matches the edition of the proposal at the moment I started implementing the parser. Secondly, it is not obvious how thread-safe it should be, when exactly these properties of the stream object should be updated, etc. And it might not be obvious for the user either.

I think it's ok. The properties are possibly modified during a call to MoveNextAsync, and so you shouldn't be reading them during such a call, but that maps to normal concurrency rules around such objects. If someone just writes a normal foreach loop, they'll naturally fall into a pit of success in this regard.

LastEventId is a part of the event item and I think it may be useful because LastEventId may mean 'EventId' in some scenarios or because it will be more consistent in case of reconnect implementation.

I think they're effectively one and the same, as the spec details that the last event ID is retained and used for subsequent events that don't include their own ID: "Initialize event's type attribute to "message", its data attribute to data, its origin attribute to the serialization of the origin of the event stream's final URL (i.e., the URL after redirects), and its lastEventId attribute to the last event ID string of the event source" (in this regard the enumerable is effectively the equivalent of the EventSource). And by putting it on the enumerable rather than on each event, we streamline the event a bit and pass less data around. It's also expected that a consumer doesn't actually need the last event ID on each message; it's only necessary when a reconnect is desired.

@dlyz

dlyz commented May 14, 2024

I think it's ok. The properties are possibly modified during a call to MoveNextAsync, and so you shouldn't be reading them during such a call

Yeah, probably you are right, there is no need to overthink this for some weird non-practical use-cases.

It's also expected that a consumer doesn't actually need the last event ID on each message; it's only necessary when a reconnect is desired.

I assumed id may be used not only for reconnects but may also have some application-level meaning. But I am not an expert in SSE; maybe id shouldn't be used this way.

The second concern was about consistency: we should guarantee that the user has processed the event with lastEventId from the stream object before a possible reconnect. But on second thought I don't see any problems with that, if we don't count application-level event processing errors, which probably should not lead to a reconnect.

@stephentoub stephentoub added the blocking Marks issues that we want to fast track in order to unblock other important work label May 15, 2024
@dotnet-policy-service dotnet-policy-service bot added the in-pr There is an active PR which will close this issue when it is merged label May 15, 2024
@stephentoub
Member Author

stephentoub commented May 15, 2024

@davidfowl, has anyone looked into what ASP.NET would want here? I'm imagining it would need some kind of SseFormatter for writing out events, which would be trivial, much more so than the parser, and that could be added later if desirable (might be easier for that part to just be in ASP.NET). I'm more interested in the SseItem struct here, as I'd hope that could be shared. It occurs to me that could influence the open question about whether there's a Retry and Id on the struct and whether they're optional, though as optional they could also be added later (with a new constructor).
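To make the "writing out events would be trivial" point concrete, here is a rough sketch of what emitting one event in the SSE wire format could look like. `SseWriterSketch` and its `Write` method are hypothetical names for this illustration; no such formatter is part of the proposal. The sketch assumes the payload uses LF-only newlines and that the event type contains no line breaks.

```csharp
using System;
using System.IO;

// Hypothetical formatter sketch; not an API from this proposal or from ASP.NET.
public static class SseWriterSketch
{
    public static void Write(TextWriter writer, string data, string eventType = "message")
    {
        // "message" is the implicit default event type, so it can be omitted on the wire.
        if (eventType != "message")
        {
            writer.Write("event: " + eventType + "\n");
        }

        // Each line of the payload becomes its own "data:" field.
        foreach (string line in data.Split('\n'))
        {
            writer.Write("data: " + line + "\n");
        }

        // A blank line terminates the event.
        writer.Write("\n");
    }
}
```

Writing to a TextWriter keeps the sketch independent of any particular response type; a real formatter would more likely write UTF-8 bytes to a Stream.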

@davidfowl
Member

No, nobody has looked, but what you said is right, though I don't know if we need it, the format is simple enough. I think we'd want to support returning a SseResult<T> or SseEnumerable<T> natively from a minimal API or controller (via IResult). We could probably also adopt this client side in SignalR and replace our home grown impl (is this .NET 9 only?)

cc @captainsafia @BrennanConroy

@Krzysztof318

In the sample, a generic SseItem<T> is always returned. What if the SSE source returns different types of objects?

@stephentoub
Member Author

stephentoub commented May 16, 2024

In the sample, a generic SseItem<T> is always returned. What if the SSE source returns different types of objects?

Then either a) you can make T a discriminated union, b) you can make T be object and have the consumer type test, c) you can just have T be byte[] containing the UTF8 bytes and transform it at the consumer (the delegate just calls ToArray), or d) you can have T be string and transform it at the consumer (the delegate just calls Encoding.UTF8.GetString). The proposed Parse overload that doesn't take a delegate does (d).
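Options (b), (c), and (d) above could look roughly like the following. The delegate is redeclared locally, mirroring the shape in the proposal, so that the sketch compiles on its own. The `ItemParsers` class and the "count" event type are hypothetical and exist only for illustration.

```csharp
using System;
using System.Globalization;
using System.Text;

// Mirrors the delegate shape from the proposal; redeclared here so the sketch is self-contained.
public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

// Hypothetical holder class for the example delegates.
public static class ItemParsers
{
    // Option (c): hand the raw UTF-8 bytes to the consumer.
    public static readonly SseItemParser<byte[]> Bytes = (_, data) => data.ToArray();

    // Option (d): decode to a string and let the consumer transform it.
    public static readonly SseItemParser<string> Text = (_, data) => Encoding.UTF8.GetString(data);

    // Option (b): T is object; the consumer type-tests each item.
    // The "count" event type is an assumption made up for this example.
    public static readonly SseItemParser<object> ByEventType = (eventType, data) =>
        eventType == "count"
            ? (object)int.Parse(Encoding.UTF8.GetString(data), CultureInfo.InvariantCulture)
            : Encoding.UTF8.GetString(data);
}
```

Copying out of the span inside the delegate matters in every variant, since the span is only valid for the duration of the callback.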

@stephentoub
Member Author

though I don't know if we need it, the format is simple enough

Yeah

I think we'd want to support returning a SseResult<T>

Is that the same as SseItem<T>? I could rename it that if we like it better.

or SseEnumerable<T> natively from a minimal API or controller (via IResult)

I was thinking we'd want to support IAsyncEnumerable<SseItem<T>> being returned.

We could probably also adopt this client side in SignalR and replace our home grown impl (is this .NET 9 only?)

I forgot signalr had an implementation. That'd be great. The plan here, and the draft PR, creates a nupkg which includes downlevel assets.

@davidfowl
Member

Is that the same as SseItem<T>? I could rename it that if we like it better.

I mean something that implements the ASP.NET Core IResult, similar to what @dlyz implemented: https://gist.github.com/dlyz/1c2f892e482f599093bdb9021e20c26f#file-04_sseresult-cs-L12.

I was thinking we'd want to support IAsyncEnumerable<SseItem<T>> being returned.

Yes, we would want to support this natively as well. One point of ambiguity is if the T should be JSON serialized in the ASP.NET Core case. We need to do something with it. If you don't want this behavior, is there a way to opt out?

@BrennanConroy
Member

Looks like this should be fairly easy to adopt in client-side SignalR, our parser delegate would just be data.ToArray() since we need to pass on a ReadOnlyMemory to another layer for actual parsing.

I do find it very odd that AI folks are investing in SSE though. WebSockets and Http response streaming are great for streaming data. They also both allow binary data, whereas SSE is text only.

@stephentoub stephentoub changed the title from "[API Proposal]: Support parsing server-sent events (SSE) with HttpClient response" to "[API Proposal]: Support parsing server-sent events (SSE)" May 20, 2024
@terrajobst
Member

terrajobst commented May 28, 2024

Video

  • The verb Parse is unfortunate because the method doesn't actually do any parsing; it creates an object that does the parsing as data is enumerated. Hence we'd like to split the two concepts.
  • We're concerned that having one object for both enumerables makes the async foreach harder to discover. If we have two different methods, one returning an IAsyncEnumerable, then people can't accidentally use a synchronous foreach in an async context.
  • We discussed whether the call should be able to access the payload buffer but decided against it due to lifetime complexity. If there is a scenario, we can add a marshal-like API that receives the current buffer.
  • We discussed whether the delegate should take a state but decided against it.
  • We concluded that System.Net is more logical based on consumers.

namespace System.Net.ServerSentEvents;

public readonly struct SseItem<T>
{
    public SseItem(T data, string eventType);
    public T Data { get; }
    public string EventType { get; }
}

public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

public static class SseParser
{
    public const string EventTypeDefault = "message";

    public static SseParser<string> Create(Stream sseStream);
    public static SseParser<T> Create<T>(Stream sseStream, SseItemParser<T> itemParser);
}

public sealed class SseParser<T>
{
    public IEnumerable<SseItem<T>> Enumerate();
    public IAsyncEnumerable<SseItem<T>> EnumerateAsync();

    public string LastEventId { get; }
    public TimeSpan ReconnectionInterval { get; }
}

@davidfowl
Member

@captainsafia @BrennanConroy Can we make sure to add follow up issues on ASP.NET Core for SignalR and minimal APIs?

@davidfowl
Member

I do find it very odd that AI folks are investing in SSE though. WebSockets and Http response streaming are great for streaming data. They also both allow binary data, whereas SSE is text only.

I agree!

9 participants