Skip to content

Commit

Permalink
ClientModel: AsyncResultCollection<T> and SSE event collection implem…
Browse files Browse the repository at this point in the history
…entation (#43840)

* Add async and sync enumerable client results

* Implement IAsyncDisposable

* Remove constrant and disposable; update ClientResult so response can be replaced in a polling paradigm

* rename and upate tests

* initial addition of files from joseharriaga/openai-in-typespec#68

* make it build

* hello world test

* bootstrap more tests

* more internal tests

* adding enumerator tests; haven't figured out the batch piece yet

* Make batch test pass

* remove collection-event functionality and add tests for public type

* reshuffle

* Add mock convenience SSE type to give POC of lazy request sending

* add tests of delayed request

* Add BinaryData factory method

* remove funcs for creating enumerators

* renames

* postpone call to protocol method from convenience APIs

* implement IAsyncDisposable correctly

* initial pass over cancellation token

* Per FDG, throw OperationCanceledException if cancellation token is cancelled.

* remove factory method taking Func<T> and provide example of layering convenience implementation in a way that postpones sending the request

* rename internal types and WIP adding reader tests

* nits

* parameterize terminal event; TBD to provide virtual method on collection type

* WIP: nits

* WIP: added concatenation of data lines per SSE spec

* updates and bug fixes

* add tests and update per SSE spec

* WIP: refactor to reuse field processing across sync and async methods

* make look a little more like the BCL type proposal

* simplify field implementation a bit

* cosmetic reworking of creating an event from a pending event

* Remove factory method from public API; move MockSseClient to Tests.Internal to access internal SSE types

* update API; reimplement mock client implementations without internal BinaryData enumerable

* Add sync client result collection abstraction

* tidy up and add tests

* add default constructor to ClientResult

* more tidy-up

* rename and add refdocs

* comments

* pr fb

* rework last event id and retry per BCL design shift

* add CHANGELOG entry
  • Loading branch information
annelo-msft authored May 14, 2024
1 parent 8d0a33c commit 2a0e020
Show file tree
Hide file tree
Showing 22 changed files with 1,512 additions and 9 deletions.
2 changes: 1 addition & 1 deletion sdk/core/Azure.Core.TestFramework/src/MockJsonModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Azure.Core.TestFramework
{
public class MockJsonModel : IJsonModel<MockJsonModel>
{
internal MockJsonModel()
public MockJsonModel()
{
}

Expand Down
4 changes: 4 additions & 0 deletions sdk/core/System.ClientModel/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
### Features Added

- Added `BufferResponse` property to `RequestOptions` so protocol method callers can turn off response buffering if desired.
- Added `AsyncResultCollection<T>` and `ResultCollection<T>` for clients to return from service methods where the service response contains a collection of values.
- Added `SetRawResponse` method to `ClientResult` to allow the response held by the result to be changed, for example by derived types that obtain multiple responses from polling the service.

### Breaking Changes

- `ClientResult.GetRawResponse` will now throw `InvalidOperationException` if called before the result's raw response is set, for example by collection result types that delay sending a request to the service until the collection is enumerated.

### Bugs Fixed

### Other Changes
Expand Down
17 changes: 16 additions & 1 deletion sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ public ApiKeyCredential(string key) { }
public static implicit operator System.ClientModel.ApiKeyCredential (string key) { throw null; }
public void Update(string key) { }
}
public abstract partial class AsyncResultCollection<T> : System.ClientModel.ClientResult, System.Collections.Generic.IAsyncEnumerable<T>
{
protected internal AsyncResultCollection() { }
protected internal AsyncResultCollection(System.ClientModel.Primitives.PipelineResponse response) { }
public abstract System.Collections.Generic.IAsyncEnumerator<T> GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
}
public abstract partial class BinaryContent : System.IDisposable
{
protected BinaryContent() { }
Expand All @@ -20,11 +26,13 @@ protected BinaryContent() { }
}
public partial class ClientResult
{
protected ClientResult() { }
protected ClientResult(System.ClientModel.Primitives.PipelineResponse response) { }
public static System.ClientModel.ClientResult<T?> FromOptionalValue<T>(T? value, System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public static System.ClientModel.ClientResult FromResponse(System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public static System.ClientModel.ClientResult<T> FromValue<T>(T value, System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public System.ClientModel.Primitives.PipelineResponse GetRawResponse() { throw null; }
protected void SetRawResponse(System.ClientModel.Primitives.PipelineResponse response) { }
}
public partial class ClientResultException : System.Exception
{
Expand All @@ -36,10 +44,17 @@ public ClientResultException(string message, System.ClientModel.Primitives.Pipel
}
public partial class ClientResult<T> : System.ClientModel.ClientResult
{
protected internal ClientResult(T value, System.ClientModel.Primitives.PipelineResponse response) : base (default(System.ClientModel.Primitives.PipelineResponse)) { }
protected internal ClientResult(T value, System.ClientModel.Primitives.PipelineResponse response) { }
public virtual T Value { get { throw null; } }
public static implicit operator T (System.ClientModel.ClientResult<T> result) { throw null; }
}
public abstract partial class ResultCollection<T> : System.ClientModel.ClientResult, System.Collections.Generic.IEnumerable<T>, System.Collections.IEnumerable
{
protected internal ResultCollection() { }
protected internal ResultCollection(System.ClientModel.Primitives.PipelineResponse response) { }
public abstract System.Collections.Generic.IEnumerator<T> GetEnumerator();
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { throw null; }
}
}
namespace System.ClientModel.Primitives
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ public ApiKeyCredential(string key) { }
public static implicit operator System.ClientModel.ApiKeyCredential (string key) { throw null; }
public void Update(string key) { }
}
public abstract partial class AsyncResultCollection<T> : System.ClientModel.ClientResult, System.Collections.Generic.IAsyncEnumerable<T>
{
protected internal AsyncResultCollection() { }
protected internal AsyncResultCollection(System.ClientModel.Primitives.PipelineResponse response) { }
public abstract System.Collections.Generic.IAsyncEnumerator<T> GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
}
public abstract partial class BinaryContent : System.IDisposable
{
protected BinaryContent() { }
Expand All @@ -20,11 +26,13 @@ protected BinaryContent() { }
}
public partial class ClientResult
{
protected ClientResult() { }
protected ClientResult(System.ClientModel.Primitives.PipelineResponse response) { }
public static System.ClientModel.ClientResult<T?> FromOptionalValue<T>(T? value, System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public static System.ClientModel.ClientResult FromResponse(System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public static System.ClientModel.ClientResult<T> FromValue<T>(T value, System.ClientModel.Primitives.PipelineResponse response) { throw null; }
public System.ClientModel.Primitives.PipelineResponse GetRawResponse() { throw null; }
protected void SetRawResponse(System.ClientModel.Primitives.PipelineResponse response) { }
}
public partial class ClientResultException : System.Exception
{
Expand All @@ -36,10 +44,17 @@ public ClientResultException(string message, System.ClientModel.Primitives.Pipel
}
public partial class ClientResult<T> : System.ClientModel.ClientResult
{
protected internal ClientResult(T value, System.ClientModel.Primitives.PipelineResponse response) : base (default(System.ClientModel.Primitives.PipelineResponse)) { }
protected internal ClientResult(T value, System.ClientModel.Primitives.PipelineResponse response) { }
public virtual T Value { get { throw null; } }
public static implicit operator T (System.ClientModel.ClientResult<T> result) { throw null; }
}
public abstract partial class ResultCollection<T> : System.ClientModel.ClientResult, System.Collections.Generic.IEnumerable<T>, System.Collections.IEnumerable
{
protected internal ResultCollection() { }
protected internal ResultCollection(System.ClientModel.Primitives.PipelineResponse response) { }
public abstract System.Collections.Generic.IEnumerator<T> GetEnumerator();
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { throw null; }
}
}
namespace System.ClientModel.Primitives
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Threading;

namespace System.ClientModel;

/// <summary>
/// Represents a collection of results returned from a cloud service operation.
/// </summary>
public abstract class AsyncResultCollection<T> : ClientResult, IAsyncEnumerable<T>
{
/// <summary>
/// Create a new instance of <see cref="AsyncResultCollection{T}"/>.
/// </summary>
/// <remarks>If no <see cref="PipelineResponse"/> is provided when the
/// <see cref="ClientResult"/> instance is created, it is expected that
/// a derived type will call <see cref="ClientResult.SetRawResponse(PipelineResponse)"/>
/// prior to a user calling <see cref="ClientResult.GetRawResponse"/>.
/// This constructor is indended for use by collection implementations that
/// postpone sending a request until <see cref="GetAsyncEnumerator(CancellationToken)"/>
/// is called. Such implementations will typically be returned from client
/// convenience methods so that callers of the methods don't need to
/// dispose the return value. </remarks>
protected internal AsyncResultCollection() : base()
{
}

/// <summary>
/// Create a new instance of <see cref="AsyncResultCollection{T}"/>.
/// </summary>
/// <param name="response">The <see cref="PipelineResponse"/> holding the
/// items in the collection, or the first set of the items in the collection.
/// </param>
protected internal AsyncResultCollection(PipelineResponse response) : base(response)
{
}

/// <inheritdoc/>
public abstract IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
61 changes: 57 additions & 4 deletions sdk/core/System.ClientModel/src/Convenience/ClientResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@ namespace System.ClientModel;
/// </summary>
public class ClientResult
{
private readonly PipelineResponse _response;
private PipelineResponse? _response;

/// <summary>
/// Create a new instance of <see cref="ClientResult"/>.
/// </summary>
/// <remarks>If no <see cref="PipelineResponse"/> is provided when the
/// <see cref="ClientResult"/> instance is created, it is expected that
/// a derived type will call <see cref="SetRawResponse(PipelineResponse)"/>
/// prior to a user calling <see cref="GetRawResponse"/>.</remarks>
protected ClientResult()
{
}

/// <summary>
/// Create a new instance of <see cref="ClientResult"/> from a service
Expand All @@ -31,7 +42,39 @@ protected ClientResult(PipelineResponse response)
/// </summary>
/// <returns>the <see cref="PipelineResponse"/> received from the service.
/// </returns>
public PipelineResponse GetRawResponse() => _response;
/// <exception cref="InvalidOperationException">No
/// <see cref="PipelineResponse"/> value is currently available for this
/// <see cref="ClientResult"/> instance. This can happen when the instance
/// is a collection type like <see cref="AsyncResultCollection{T}"/>
/// that has not yet been enumerated.</exception>
public PipelineResponse GetRawResponse()
{
if (_response is null)
{
throw new InvalidOperationException("No response is associated " +
"with this result. If the result is a collection result " +
"type, this may be because no request has been sent to the " +
"server yet.");
}

return _response;
}

/// <summary>
/// Update the value returned from <see cref="GetRawResponse"/>.
/// </summary>
/// <remarks>This method may be called from types derived from
/// <see cref="ClientResult"/> that poll the service for status updates
/// or to retrieve additional collection values to update the raw response
/// to the response most recently returned from the service.</remarks>
/// <param name="response">The <see cref="PipelineResponse"/> to return
/// from <see cref="GetRawResponse"/>.</param>
protected void SetRawResponse(PipelineResponse response)
{
Argument.AssertNotNull(response, nameof(response));

_response = response;
}

#region Factory methods for ClientResult and subtypes

Expand All @@ -44,7 +87,11 @@ protected ClientResult(PipelineResponse response)
/// provided <paramref name="response"/>.
/// </returns>
public static ClientResult FromResponse(PipelineResponse response)
=> new ClientResult(response);
{
Argument.AssertNotNull(response, nameof(response));

return new ClientResult(response);
}

/// <summary>
/// Creates a new instance of <see cref="ClientResult{T}"/> that holds the
Expand All @@ -60,6 +107,8 @@ public static ClientResult FromResponse(PipelineResponse response)
/// </returns>
public static ClientResult<T> FromValue<T>(T value, PipelineResponse response)
{
Argument.AssertNotNull(response, nameof(response));

if (value is null)
{
string message = "ClientResult<T> contract guarantees that ClientResult<T>.Value is non-null. " +
Expand Down Expand Up @@ -90,7 +139,11 @@ public static ClientResult<T> FromValue<T>(T value, PipelineResponse response)
/// provided <paramref name="value"/> and <paramref name="response"/>.
/// </returns>
public static ClientResult<T?> FromOptionalValue<T>(T? value, PipelineResponse response)
=> new ClientResult<T?>(value, response);
{
Argument.AssertNotNull(response, nameof(response));

return new ClientResult<T?>(value, response);
}

#endregion
}
45 changes: 45 additions & 0 deletions sdk/core/System.ClientModel/src/Convenience/ResultCollectionOfT.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.ClientModel.Primitives;
using System.Collections;
using System.Collections.Generic;

namespace System.ClientModel;

/// <summary>
/// Represents a collection of results returned from a cloud service operation.
/// </summary>
public abstract class ResultCollection<T> : ClientResult, IEnumerable<T>
{
/// <summary>
/// Create a new instance of <see cref="ResultCollection{T}"/>.
/// </summary>
/// <remarks>If no <see cref="PipelineResponse"/> is provided when the
/// <see cref="ClientResult"/> instance is created, it is expected that
/// a derived type will call <see cref="ClientResult.SetRawResponse(PipelineResponse)"/>
/// prior to a user calling <see cref="ClientResult.GetRawResponse"/>.
/// This constructor is indended for use by collection implementations that
/// postpone sending a request until <see cref="GetEnumerator()"/>
/// is called. Such implementations will typically be returned from client
/// convenience methods so that callers of the methods don't need to
/// dispose the return value. </remarks>
protected internal ResultCollection() : base()
{
}

/// <summary>
/// Create a new instance of <see cref="ResultCollection{T}"/>.
/// </summary>
/// <param name="response">The <see cref="PipelineResponse"/> holding the
/// items in the collection, or the first set of the items in the collection.
/// </param>
protected internal ResultCollection(PipelineResponse response) : base(response)
{
}

/// <inheritdoc/>
public abstract IEnumerator<T> GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.ClientModel.Internal;

/// <summary>
/// Represents a collection of SSE events that can be enumerated as a C# async stream.
/// </summary>
internal class AsyncServerSentEventEnumerable : IAsyncEnumerable<ServerSentEvent>
{
private readonly Stream _contentStream;

public AsyncServerSentEventEnumerable(Stream contentStream)
{
Argument.AssertNotNull(contentStream, nameof(contentStream));

_contentStream = contentStream;

LastEventId = string.Empty;
ReconnectionInterval = Timeout.InfiniteTimeSpan;
}

public string LastEventId { get; private set; }

public TimeSpan ReconnectionInterval { get; private set; }

public IAsyncEnumerator<ServerSentEvent> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new AsyncServerSentEventEnumerator(_contentStream, this, cancellationToken);
}

private sealed class AsyncServerSentEventEnumerator : IAsyncEnumerator<ServerSentEvent>
{
private readonly ServerSentEventReader _reader;
private readonly AsyncServerSentEventEnumerable _enumerable;
private readonly CancellationToken _cancellationToken;

public ServerSentEvent Current { get; private set; }

public AsyncServerSentEventEnumerator(Stream contentStream,
AsyncServerSentEventEnumerable enumerable,
CancellationToken cancellationToken = default)
{
_reader = new(contentStream);
_enumerable = enumerable;
_cancellationToken = cancellationToken;
}

public async ValueTask<bool> MoveNextAsync()
{
ServerSentEvent? nextEvent = await _reader.TryGetNextEventAsync(_cancellationToken).ConfigureAwait(false);
_enumerable.LastEventId = _reader.LastEventId;
_enumerable.ReconnectionInterval = _reader.ReconnectionInterval;

if (nextEvent.HasValue)
{
Current = nextEvent.Value;
return true;
}

Current = default;
return false;
}

public ValueTask DisposeAsync()
{
// The creator of the enumerable has responsibility for disposing
// the content stream passed to the enumerable constructor.

#if NET6_0_OR_GREATER
return ValueTask.CompletedTask;
#else
return new ValueTask();
#endif
}
}
}
Loading

0 comments on commit 2a0e020

Please sign in to comment.