Skip to content

Commit

Permalink
AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression (#…
Browse files Browse the repository at this point in the history
…2439)

* AVRO-3802: [Csharp] Fix memory leak on avro deflate codec decompression

* AVRO-3802: [Csharp] Address PR comments

* AVRO-3802: [Csharp] Revert IDisposable change in Encoder and Decoder

* AVRO-3802: [Csharp] Remove implicit filtering of target sequence

---------

Co-authored-by: Camil Abraham <[email protected]>
  • Loading branch information
CamilAbraham and Camil Abraham authored Aug 27, 2023
1 parent 23c4700 commit 8eab8f1
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 77 deletions.
32 changes: 7 additions & 25 deletions lang/csharp/src/apache/main/File/DeflateCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,14 @@ public override void Compress(MemoryStream inputStream, MemoryStream outputStrea
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{

MemoryStream inStream = new MemoryStream(compressedData);
MemoryStream outStream = new MemoryStream();

using (DeflateStream Decompress =
new DeflateStream(inStream,
CompressionMode.Decompress))
{
CopyTo(Decompress, outStream);
}

return outStream.ToArray();
}

/// <summary>
/// Copies to stream.
/// </summary>
/// <param name="from">stream you are copying from</param>
/// <param name="to">stream you are copying to</param>
private static void CopyTo(Stream from, Stream to)
{
byte[] buffer = new byte[4096];
int read;
while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
using (MemoryStream inStream = new MemoryStream(compressedData, 0, length))
using (MemoryStream outStream = new MemoryStream())
{
to.Write(buffer, 0, read);
using (DeflateStream decompress = new DeflateStream(inStream, CompressionMode.Decompress))
{
decompress.CopyTo(outStream);
}
return outStream.ToArray();
}
}

Expand Down
29 changes: 15 additions & 14 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using Avro.IO;
using System.IO;
using System.Linq;

namespace Avro.Generic
{
Expand Down Expand Up @@ -290,21 +291,21 @@ protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Sch
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs.Fields.Where(rf => !writerSchema.Contains(rf.Name)))
{
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

object obj = null;
TryGetField(rec, rf.Name, rf.Pos, out obj);
AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
}

return rec;
Expand Down
40 changes: 21 additions & 19 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
var readerDefaultOrdinal = null != readerSchema.Default ? readerSchema.Ordinal(readerSchema.Default) : -1;

foreach (var symbol in writerSchema.Symbols)
{
{
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
Expand Down Expand Up @@ -274,27 +274,29 @@ private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSch
{
if (writerSchema.Contains(rf.Name)) continue;

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
using (var defaultStream = new MemoryStream())
{
var defaultEncoder = new BinaryEncoder(defaultStream);

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
var defaultBytes = defaultStream.ToArray();

var readItem = ResolveReader(rf.Schema, rf.Schema);
var readItem = ResolveReader(rf.Schema, rf.Schema);

var rfInstance = rf;
if(IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream( defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
var rfInstance = rf;
if (IsReusable(rf.Schema.Tag))
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
else
{
readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
}
}
}

Expand Down
14 changes: 7 additions & 7 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Avro.IO
/// </summary>
public class BinaryEncoder : Encoder
{
private readonly Stream Stream;
private readonly Stream stream;

/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/> class without a backing
Expand All @@ -42,7 +42,7 @@ public BinaryEncoder() : this(null)
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
{
this.Stream = stream;
this.stream = stream;
}

/// <summary>
Expand Down Expand Up @@ -203,30 +203,30 @@ public void WriteFixed(byte[] data)
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
Stream.Write(data, start, len);
stream.Write(data, start, len);
}

private void writeBytes(byte[] bytes)
{
Stream.Write(bytes, 0, bytes.Length);
stream.Write(bytes, 0, bytes.Length);
}

private void writeBytes(byte[] bytes, int offset, int length)
{
Stream.Write(bytes, offset, length);
stream.Write(bytes, offset, length);
}

private void writeByte(byte b)
{
Stream.WriteByte(b);
stream.WriteByte(b);
}

/// <summary>
/// Flushes the underlying stream.
/// </summary>
public void Flush()
{
Stream.Flush();
stream.Flush();
}
}
}
24 changes: 13 additions & 11 deletions lang/csharp/src/apache/main/Specific/SpecificReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,22 @@ protected override object ReadRecord(object reuse, RecordSchema writerSchema, Sc
}
}

var defaultStream = new MemoryStream();
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
using (var defaultStream = new MemoryStream())
{
if (writerSchema.Contains(rf.Name)) continue;
var defaultEncoder = new BinaryEncoder(defaultStream);
var defaultDecoder = new BinaryDecoder(defaultStream);
foreach (Field rf in rs)
{
if (writerSchema.Contains(rf.Name)) continue;

defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading
defaultStream.Position = 0; // reset for writing
Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
defaultStream.Flush();
defaultStream.Position = 0; // reset for reading

obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
obj = rec.Get(rf.Pos);
rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
}
}

return rec;
Expand Down
2 changes: 1 addition & 1 deletion lang/csharp/src/apache/test/File/FileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
Expand Down Expand Up @@ -555,7 +556,6 @@ private static IEnumerable<TestCaseData> TestPartialReadSource()
/// position in stream
/// </summary>
/// <param name="schemaStr"></param>
/// <param name="value"></param>
/// <param name="codecType"></param>
[TestCaseSource(nameof(TestPartialReadSource))]
public void TestPartialRead(string schemaStr, Codec.Type codecType, int position, int expectedRecords)
Expand Down

0 comments on commit 8eab8f1

Please sign in to comment.