Skip to content

Commit

Permalink
First part of fix for #400 (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
cowtowncoder authored Oct 15, 2023
1 parent 4260f58 commit 5bffe8b
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.fasterxml.jackson.core.format.InputAccessor;
import com.fasterxml.jackson.core.format.MatchStrength;
import com.fasterxml.jackson.core.io.IOContext;

import com.fasterxml.jackson.core.util.RecyclerPool;
import com.fasterxml.jackson.dataformat.avro.apacheimpl.ApacheCodecRecycler;
import com.fasterxml.jackson.dataformat.avro.apacheimpl.AvroRecyclerPools;
import com.fasterxml.jackson.dataformat.avro.deser.*;

/**
Expand Down Expand Up @@ -40,6 +42,12 @@ public class AvroFactory extends JsonFactory
/**********************************************************
*/

/**
* @since 2.16
*/
protected RecyclerPool<ApacheCodecRecycler> _avroRecyclerPool
= AvroRecyclerPools.defaultPool();

protected int _avroParserFeatures;

protected int _avroGeneratorFeatures;
Expand Down Expand Up @@ -463,6 +471,7 @@ protected AvroGenerator _createGenerator(OutputStream out, IOContext ctxt) throw
{
int feats = _avroGeneratorFeatures;
AvroGenerator gen = new AvroGenerator(ctxt, _generatorFeatures, feats,
_avroRecyclerPool.acquireAndLinkPooled(),
_objectCodec, out);
return gen;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ private Feature(boolean defaultState) {
* @since 2.16
*/
protected final static EncoderFactory ENCODER_FACTORY = EncoderFactory.get();


/**
* @since 2.16
*/
protected ApacheCodecRecycler _apacheCodecRecycler;

/**
* @since 2.16
*/
Expand Down Expand Up @@ -144,6 +149,7 @@ private Feature(boolean defaultState) {
*/

public AvroGenerator(IOContext ctxt, int jsonFeatures, int avroFeatures,
ApacheCodecRecycler apacheCodecRecycler,
ObjectCodec codec, OutputStream output)
throws IOException
{
Expand All @@ -153,8 +159,9 @@ public AvroGenerator(IOContext ctxt, int jsonFeatures, int avroFeatures,
_output = output;
_avroContext = AvroWriteContext.nullContext();

_apacheCodecRecycler = apacheCodecRecycler;
final boolean buffering = isEnabled(Feature.AVRO_BUFFERING);
BinaryEncoder encoderToReuse = ApacheCodecRecycler.acquireEncoder();
BinaryEncoder encoderToReuse = _apacheCodecRecycler.acquireEncoder();
_encoder = buffering
? ENCODER_FACTORY.binaryEncoder(output, encoderToReuse)
: ENCODER_FACTORY.directBinaryEncoder(output, encoderToReuse);
Expand Down Expand Up @@ -626,10 +633,15 @@ protected final void _verifyValueWrite(String typeMsg) throws IOException {
@Override
protected void _releaseBuffers() {
// no super implementation to call
BinaryEncoder e = _encoder;
if (e != null) {
_encoder = null;
ApacheCodecRecycler.release(e);
ApacheCodecRecycler recycler = _apacheCodecRecycler;
if (recycler != null) {
_apacheCodecRecycler = null;
BinaryEncoder e = _encoder;
if (e != null) {
_encoder = null;
recycler.release(e);
}
recycler.releaseToPool();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ public AvroFactory copy()
@Override
protected AvroParser _createParser(InputStream in, IOContext ctxt) throws IOException {
return new ApacheAvroParserImpl(ctxt, _parserFeatures, _avroParserFeatures,
_avroRecyclerPool.acquireAndLinkPooled(),
_objectCodec, in);
}

@Override
protected AvroParser _createParser(byte[] data, int offset, int len, IOContext ctxt) throws IOException {
return new ApacheAvroParserImpl(ctxt, _parserFeatures, _avroParserFeatures,
_avroRecyclerPool.acquireAndLinkPooled(),
_objectCodec, data, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.Writer;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;

import com.fasterxml.jackson.core.*;
Expand All @@ -18,11 +19,22 @@
*/
public class ApacheAvroParserImpl extends AvroParserImpl
{
/*
/**********************************************************
/* Configuration
/**********************************************************
*/

/**
* @since 2.16
*/
protected final static DecoderFactory DECODER_FACTORY = DecoderFactory.get();

/**
* @since 2.16
*/
protected ApacheCodecRecycler _apacheCodecRecycler;

/*
/**********************************************************
/* Input source config
Expand Down Expand Up @@ -71,6 +83,7 @@ public class ApacheAvroParserImpl extends AvroParserImpl
*/

public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures,
ApacheCodecRecycler apacheCodecRecycler,
ObjectCodec codec, InputStream in)
{
super(ctxt, parserFeatures, avroFeatures, codec);
Expand All @@ -80,20 +93,23 @@ public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures
_inputEnd = 0;
_bufferRecyclable = true;

_apacheCodecRecycler = apacheCodecRecycler;
final boolean buffering = Feature.AVRO_BUFFERING.enabledIn(avroFeatures);
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
_decoder = buffering
? DECODER_FACTORY.binaryDecoder(in, decoderToReuse)
: DECODER_FACTORY.directBinaryDecoder(in, decoderToReuse);
}

public ApacheAvroParserImpl(IOContext ctxt, int parserFeatures, int avroFeatures,
ApacheCodecRecycler apacheCodecRecycler,
ObjectCodec codec,
byte[] buffer, int offset, int len)
{
super(ctxt, parserFeatures, avroFeatures, codec);
_inputStream = null;
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
_apacheCodecRecycler = apacheCodecRecycler;
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
_decoder = DECODER_FACTORY.binaryDecoder(buffer, offset, len, decoderToReuse);
}

Expand All @@ -107,14 +123,18 @@ protected void _releaseBuffers() throws IOException {
_ioContext.releaseReadIOBuffer(buf);
}
}
BinaryDecoder d = _decoder;
if (d != null) {
_decoder = null;
ApacheCodecRecycler.release(d);
ApacheCodecRecycler recycler = _apacheCodecRecycler;
if (recycler != null) {
_apacheCodecRecycler = null;
BinaryDecoder d = _decoder;
if (d != null) {
_decoder = null;
recycler.release(d);
}
recycler.releaseToPool();
}
}


/*
/**********************************************************
/* Abstract method impls, i/o access
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.fasterxml.jackson.dataformat.avro.apacheimpl;

import java.lang.ref.SoftReference;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import com.fasterxml.jackson.core.util.RecyclerPool;
import com.fasterxml.jackson.core.util.RecyclerPool.WithPool;

import org.apache.avro.io.*;

/**
Expand All @@ -12,51 +15,74 @@
* @since 2.8.7
*/
public final class ApacheCodecRecycler
implements WithPool<ApacheCodecRecycler>
{
protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();

private final AtomicReference<BinaryDecoder> decoderRef = new AtomicReference<>();
private final AtomicReference<BinaryEncoder> encoderRef = new AtomicReference<>();

private ApacheCodecRecycler() { }
private RecyclerPool<ApacheCodecRecycler> _pool;

ApacheCodecRecycler() { }

/*
/**********************************************************
/* Public API
/**********************************************************
*/

public static BinaryDecoder acquireDecoder() {
return _recycler().decoderRef.getAndSet(null);
public BinaryDecoder acquireDecoder() {
return decoderRef.getAndSet(null);
}

public static BinaryEncoder acquireEncoder() {
return _recycler().encoderRef.getAndSet(null);
public BinaryEncoder acquireEncoder() {
return encoderRef.getAndSet(null);
}

public static void release(BinaryDecoder dec) {
_recycler().decoderRef.set(dec);
public void release(BinaryDecoder dec) {
decoderRef.set(dec);
}

public static void release(BinaryEncoder enc) {
_recycler().encoderRef.set(enc);
public void release(BinaryEncoder enc) {
encoderRef.set(enc);
}

/*
/**********************************************************
/* Internal per-instance methods
/* WithPool implementation
/**********************************************************
*/

/**
* Method called by owner of this recycler instance, to provide reference to
* {@link RecyclerPool} into which instance is to be released (if any)
*
* @since 2.16
*/
@Override
public ApacheCodecRecycler withPool(RecyclerPool<ApacheCodecRecycler> pool) {
if (this._pool != null) {
throw new IllegalStateException("ApacheCodecRecycler already linked to pool: "+pool);
}
// assign to pool to which this BufferRecycler belongs in order to release it
// to the same pool when the work will be completed
_pool = Objects.requireNonNull(pool);
return this;
}

private static ApacheCodecRecycler _recycler() {
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
ApacheCodecRecycler r = (ref == null) ? null : ref.get();

if (r == null) {
r = new ApacheCodecRecycler();
_recycler.set(new SoftReference<>(r));
/**
* Method called when owner of this recycler no longer wishes use it; this should
* return it to pool passed via {@code withPool()} (if any).
*
* @since 2.16
*/
@Override
public void releaseToPool() {
if (_pool != null) {
RecyclerPool<ApacheCodecRecycler> tmpPool = _pool;
// nullify the reference to the pool in order to avoid the risk of releasing
// the same BufferRecycler more than once, thus compromising the pool integrity
_pool = null;
tmpPool.releasePooled(this);
}
return r;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.fasterxml.jackson.dataformat.avro.apacheimpl;

import java.lang.ref.SoftReference;

import com.fasterxml.jackson.core.util.BufferRecycler;
import com.fasterxml.jackson.core.util.RecyclerPool;

public final class AvroRecyclerPools
{
/**
* @return the default {@link RecyclerPool} implementation
* which is the thread local based one:
* basically alias to {@link #threadLocalPool()}).
*/
public static RecyclerPool<ApacheCodecRecycler> defaultPool() {
return threadLocalPool();
}

/**
* Accessor for getting the shared/global {@link ThreadLocalPool} instance
* (due to design only one instance ever needed)
*
* @return Globally shared instance of {@link ThreadLocalPool}
*/
public static RecyclerPool<ApacheCodecRecycler> threadLocalPool() {
return ThreadLocalPool.GLOBAL;
}

/*
/**********************************************************************
/* Concrete RecyclerPool implementations for recycling BufferRecyclers
/**********************************************************************
*/

/**
* {@link ThreadLocal}-based {@link RecyclerPool} implementation used for
* recycling {@link BufferRecycler} instances:
* see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation
* of functioning.
*/
public static class ThreadLocalPool
extends RecyclerPool.ThreadLocalPoolBase<ApacheCodecRecycler>
{
private static final long serialVersionUID = 1L;

protected static final ThreadLocalPool GLOBAL = new ThreadLocalPool();

protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();

private ThreadLocalPool() { }

@Override
public ApacheCodecRecycler acquirePooled() {
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
ApacheCodecRecycler r = (ref == null) ? null : ref.get();

if (r == null) {
r = new ApacheCodecRecycler();
_recycler.set(new SoftReference<>(r));
}
return r;
}

// // // JDK serialization support

protected Object readResolve() { return GLOBAL; }
}

}

0 comments on commit 5bffe8b

Please sign in to comment.