Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offer API to adjust the ArrayPool #1122

Merged
merged 4 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public void Serialize(ref MessagePackWriter writer, TCollection value, MessagePa
}
else
{
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
var scratch = scratchRental.Value;
MessagePackWriter scratchWriter = writer.Clone(scratch);
Expand Down Expand Up @@ -950,7 +950,7 @@ public void Serialize(ref MessagePackWriter writer, IEnumerable value, MessagePa

IMessagePackFormatter<object> formatter = options.Resolver.GetFormatterWithVerify<object>();

using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
var scratch = scratchRental.Value;
MessagePackWriter scratchWriter = writer.Clone(scratch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void Serialize(ref MessagePackWriter writer, object value, MessagePackSer
}

// mark will be written at the end, when size is known
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
MessagePackWriter scratchWriter = writer.Clone(scratchRental.Value);
scratchWriter.WriteString(typeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public partial class MessagePackSerializer
/// <exception cref="MessagePackSerializationException">Thrown if an error occurs during serialization.</exception>
public static void SerializeToJson<T>(TextWriter textWriter, T obj, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
using (var sequenceRental = SequencePool.Shared.Rent())
options = options ?? DefaultOptions;

using (var sequenceRental = options.Pool.Rent())
{
var msgpackWriter = new MessagePackWriter(sequenceRental.Value)
{
Expand Down Expand Up @@ -85,7 +87,7 @@ public static void ConvertToJson(ref MessagePackReader reader, TextWriter jsonWr
{
if (options.Compression.IsCompression())
{
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
if (TryDecompress(ref reader, scratchRental.Value))
{
Expand Down Expand Up @@ -133,7 +135,9 @@ public static void ConvertFromJson(string str, ref MessagePackWriter writer, Mes
/// </summary>
public static byte[] ConvertFromJson(string str, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
using (var scratchRental = SequencePool.Shared.Rent())
options = options ?? DefaultOptions;

using (var scratchRental = options.Pool.Rent())
{
var writer = new MessagePackWriter(scratchRental.Value)
{
Expand All @@ -155,14 +159,15 @@ public static byte[] ConvertFromJson(string str, MessagePackSerializerOptions op
public static void ConvertFromJson(TextReader reader, ref MessagePackWriter writer, MessagePackSerializerOptions options = null)
{
options = options ?? DefaultOptions;

if (options.Compression.IsCompression())
{
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
MessagePackWriter scratchWriter = writer.Clone(scratchRental.Value);
using (var jr = new TinyJsonReader(reader, false))
{
FromJsonCore(jr, ref scratchWriter);
FromJsonCore(jr, ref scratchWriter, options);
}

scratchWriter.Flush();
Expand All @@ -173,12 +178,12 @@ public static void ConvertFromJson(TextReader reader, ref MessagePackWriter writ
{
using (var jr = new TinyJsonReader(reader, false))
{
FromJsonCore(jr, ref writer);
FromJsonCore(jr, ref writer, options);
}
}
}

private static uint FromJsonCore(TinyJsonReader jr, ref MessagePackWriter writer)
private static uint FromJsonCore(TinyJsonReader jr, ref MessagePackWriter writer, MessagePackSerializerOptions options)
{
uint count = 0;
while (jr.Read())
Expand All @@ -189,10 +194,10 @@ private static uint FromJsonCore(TinyJsonReader jr, ref MessagePackWriter writer
break;
case TinyJsonToken.StartObject:
// Set up a scratch area to serialize the collection since we don't know its length yet, which must be written first.
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
MessagePackWriter scratchWriter = writer.Clone(scratchRental.Value);
var mapCount = FromJsonCore(jr, ref scratchWriter);
var mapCount = FromJsonCore(jr, ref scratchWriter, options);
scratchWriter.Flush();

mapCount = mapCount / 2; // remove propertyname string count.
Expand All @@ -206,10 +211,10 @@ private static uint FromJsonCore(TinyJsonReader jr, ref MessagePackWriter writer
return count; // break
case TinyJsonToken.StartArray:
// Set up a scratch area to serialize the collection since we don't know its length yet, which must be written first.
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
MessagePackWriter scratchWriter = writer.Clone(scratchRental.Value);
var arrayCount = FromJsonCore(jr, ref scratchWriter);
var arrayCount = FromJsonCore(jr, ref scratchWriter, options);
scratchWriter.Flush();

writer.WriteArrayHeader(arrayCount);
Expand Down Expand Up @@ -299,7 +304,7 @@ private static void ToJsonCore(ref MessagePackReader reader, TextWriter writer,
WriteJsonString(reader.ReadString(), writer);
break;
case MessagePackType.Binary:
ArraySegment<byte> segment = ByteArraySegmentFormatter.Instance.Deserialize(ref reader, DefaultOptions);
ArraySegment<byte> segment = ByteArraySegmentFormatter.Instance.Deserialize(ref reader, options);
writer.Write("\"" + Convert.ToBase64String(segment.Array, segment.Offset, segment.Count) + "\"");
break;
case MessagePackType.Array:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void Serialize<T>(ref MessagePackWriter writer, T value, MessagePa
{
if (options.Compression.IsCompression() && !PrimitiveChecker<T>.IsMessagePackFixedSizePrimitive)
{
using (var scratchRental = SequencePool.Shared.Rent())
using (var scratchRental = options.Pool.Rent())
{
var scratch = scratchRental.Value;
MessagePackWriter scratchWriter = writer.Clone(scratch);
Expand Down Expand Up @@ -119,7 +119,8 @@ public static byte[] Serialize<T>(T value, MessagePackSerializerOptions options
scratchArray = array = new byte[65536];
}

var msgpackWriter = new MessagePackWriter(SequencePool.Shared, array)
options = options ?? DefaultOptions;
var msgpackWriter = new MessagePackWriter(options.Pool, array)
{
CancellationToken = cancellationToken,
};
Expand All @@ -137,8 +138,10 @@ public static byte[] Serialize<T>(T value, MessagePackSerializerOptions options
/// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
public static void Serialize<T>(Stream stream, T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
options = options ?? DefaultOptions;
cancellationToken.ThrowIfCancellationRequested();
using (SequencePool.Rental sequenceRental = SequencePool.Shared.Rent())

using (SequencePool.Rental sequenceRental = options.Pool.Rent())
{
Serialize<T>(sequenceRental.Value, value, options, cancellationToken);

Expand Down Expand Up @@ -168,8 +171,10 @@ public static void Serialize<T>(Stream stream, T value, MessagePackSerializerOpt
/// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
public static async Task SerializeAsync<T>(Stream stream, T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
options = options ?? DefaultOptions;
cancellationToken.ThrowIfCancellationRequested();
using (SequencePool.Rental sequenceRental = SequencePool.Shared.Rent())

using (SequencePool.Rental sequenceRental = options.Pool.Rent())
{
Serialize<T>(sequenceRental.Value, value, options, cancellationToken);

Expand Down Expand Up @@ -222,7 +227,7 @@ public static T Deserialize<T>(ref MessagePackReader reader, MessagePackSerializ
{
if (options.Compression.IsCompression())
{
using (var msgPackUncompressedRental = SequencePool.Shared.Rent())
using (var msgPackUncompressedRental = options.Pool.Rent())
{
var msgPackUncompressed = msgPackUncompressedRental.Value;
if (TryDecompress(ref reader, msgPackUncompressed))
Expand Down Expand Up @@ -315,12 +320,14 @@ public static T Deserialize<T>(ReadOnlyMemory<byte> buffer, MessagePackSerialize
/// </remarks>
public static T Deserialize<T>(Stream stream, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
options = options ?? DefaultOptions;

if (TryDeserializeFromMemoryStream(stream, options, cancellationToken, out T result))
{
return result;
}

using (var sequenceRental = SequencePool.Shared.Rent())
using (var sequenceRental = options.Pool.Rent())
{
var sequence = sequenceRental.Value;
try
Expand Down Expand Up @@ -362,12 +369,14 @@ public static T Deserialize<T>(Stream stream, MessagePackSerializerOptions optio
/// </remarks>
public static async ValueTask<T> DeserializeAsync<T>(Stream stream, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
options = options ?? DefaultOptions;

if (TryDeserializeFromMemoryStream(stream, options, cancellationToken, out T result))
{
return result;
}

using (var sequenceRental = SequencePool.Shared.Rent())
using (var sequenceRental = options.Pool.Rent())
{
var sequence = sequenceRental.Value;
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using Nerdbank.Streams;

namespace MessagePack
{
Expand Down Expand Up @@ -60,6 +61,7 @@ protected MessagePackSerializerOptions(MessagePackSerializerOptions copyFrom)
this.OmitAssemblyVersion = copyFrom.OmitAssemblyVersion;
this.AllowAssemblyVersionMismatch = copyFrom.AllowAssemblyVersionMismatch;
this.Security = copyFrom.Security;
this.Pool = copyFrom.Pool;
}

/// <summary>
Expand Down Expand Up @@ -113,6 +115,11 @@ protected MessagePackSerializerOptions(MessagePackSerializerOptions copyFrom)
/// </value>
public MessagePackSecurity Security { get; private set; } = MessagePackSecurity.TrustedData;

/// <summary>
/// Gets a thread-safe pool of reusable <see cref="Sequence{T}"/> objects.
/// </summary>
public SequencePool Pool { get; private set; } = new SequencePool();
AArnott marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Gets a type given a string representation of the type.
/// </summary>
Expand Down Expand Up @@ -259,6 +266,28 @@ public MessagePackSerializerOptions WithSecurity(MessagePackSecurity security)
return result;
}

/// <summary>
/// Gets a copy of these options with the <see cref="Pool"/> property set to a new value.
/// </summary>
/// <param name="pool">The new value for the <see cref="Pool"/> property.</param>
/// <returns>The new instance.</returns>
public MessagePackSerializerOptions WithPool(SequencePool pool)
{
if (pool is null)
{
throw new ArgumentNullException(nameof(pool));
}

if (this.Pool == pool)
{
return this;
}

var result = this.Clone();
AArnott marked this conversation as resolved.
Show resolved Hide resolved
result.Pool = pool;
return result;
}

/// <summary>
/// Creates a clone of this instance with the same properties set.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public partial class MessagePackStreamReader : IDisposable
{
private readonly Stream stream;
private readonly bool leaveOpen;
private SequencePool.Rental sequenceRental = SequencePool.Shared.Rent();
private SequencePool.Rental sequenceRental;
private SequencePosition? endOfLastMessage;

/// <summary>
Expand All @@ -39,9 +39,36 @@ public MessagePackStreamReader(Stream stream)
/// <param name="stream">The stream to read from.</param>
/// <param name="leaveOpen">If true, leaves the stream open after this <see cref="MessagePackStreamReader"/> is disposed; otherwise, false.</param>
public MessagePackStreamReader(Stream stream, bool leaveOpen)
: this(stream, SequencePool.Shared, leaveOpen)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReader"/> class.
/// </summary>
/// <param name="stream">The stream to read from. This stream will be disposed of when this <see cref="MessagePackStreamReader"/> is disposed.</param>
/// <param name="pool">The pool to get result.</param>
public MessagePackStreamReader(Stream stream, SequencePool pool)
: this(stream, pool, leaveOpen: false)
{
}
AArnott marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReader"/> class.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="pool">The pool to get result.</param>
/// <param name="leaveOpen">If true, leaves the stream open after this <see cref="MessagePackStreamReader"/> is disposed; otherwise, false.</param>
public MessagePackStreamReader(Stream stream, SequencePool pool, bool leaveOpen)
AArnott marked this conversation as resolved.
Show resolved Hide resolved
{
if (pool == null)
{
throw new ArgumentNullException(nameof(pool));
}

this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
this.leaveOpen = leaveOpen;
this.sequenceRental = pool.Rent();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ namespace MessagePack
/// <summary>
/// A thread-safe, alloc-free reusable object pool.
/// </summary>
internal class SequencePool
#if MESSAGEPACK_INTERNAL
internal
#else
public
#endif
class SequencePool
{
/// <summary>
/// A thread-safe pool of reusable <see cref="Sequence{T}"/> objects.
/// </summary>
/// <remarks>
/// We use a <see cref="maxSize"/> that allows every processor to be involved in messagepack serialization concurrently,
/// plus one nested serialization per processor (since LZ4 and sometimes other nested serializations may exist).
/// </remarks>
internal static readonly SequencePool Shared = new SequencePool(Environment.ProcessorCount * 2);
internal static readonly SequencePool Shared = new SequencePool();

/// <summary>
/// The value to use for <see cref="Sequence{T}.MinimumSpanLength"/>.
Expand All @@ -41,18 +42,41 @@ internal class SequencePool
/// <summary>
/// The array pool which we share with all <see cref="Sequence{T}"/> objects created by this <see cref="SequencePool"/> instance.
/// </summary>
private readonly ArrayPool<byte> arrayPool;

/// <summary>
/// Initializes a new instance of the <see cref="SequencePool"/> class.
/// </summary>
/// <remarks>
/// We use a <see cref="maxSize"/> that allows every processor to be involved in messagepack serialization concurrently,
/// plus one nested serialization per processor (since LZ4 and sometimes other nested serializations may exist).
/// </remarks>
public SequencePool()
: this(Environment.ProcessorCount * 2, ArrayPool<byte>.Create(80 * 1024, 100))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SequencePool"/> class.
/// </summary>
/// <param name="maxSize">The maximum size to allow the pool to grow.</param>
/// <devremarks>
/// We allow 100 arrays to be shared (instead of the default 50) and reduce the max array length from the default 1MB to something more reasonable for our expected use.
/// </devremarks>
private readonly ArrayPool<byte> arrayPool = ArrayPool<byte>.Create(80 * 1024, 100);
public SequencePool(int maxSize)
: this(maxSize, ArrayPool<byte>.Create(80 * 1024, 100))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SequencePool"/> class.
/// </summary>
/// <param name="maxSize">The maximum size to allow the pool to grow.</param>
internal SequencePool(int maxSize)
/// <param name="arrayPool">Array pool that will be used.</param>
public SequencePool(int maxSize, ArrayPool<byte> arrayPool)
{
this.maxSize = maxSize;
this.arrayPool = arrayPool;
}

/// <summary>
Expand Down
Loading