Commit 11eab1d

Smarter chunking, more docs

Also: fix viaInputStream queueSize parameter

erikvanoosten committed Oct 14, 2024
1 parent 41c3dd1 commit 11eab1d
Showing 11 changed files with 327 additions and 106 deletions.
31 changes: 21 additions & 10 deletions brotli/src/main/scala/zio/compress/Brotli.scala
@@ -6,18 +6,29 @@ import zio.compress.JavaIoInterop.viaInputStreamByte

//noinspection ScalaFileName
object BrotliDecompressor {
def make(customDictionary: Option[Array[Byte]] = None): BrotliDecompressor =
new BrotliDecompressor(customDictionary)

/** Makes a pipeline that accepts a Brotli compressed byte stream and produces a decompressed byte stream.
*
* @param customDictionary
* a custom dictionary, or `None` for no custom dictionary
* @param chunkSize
* The maximum chunk size of the outgoing ZStream. Defaults to `ZStream.DefaultChunkSize` (4KiB).
*/
def make(
customDictionary: Option[Array[Byte]] = None,
chunkSize: Int = ZStream.DefaultChunkSize
): BrotliDecompressor =
new BrotliDecompressor(customDictionary, chunkSize)
}

//noinspection ScalaFileName
class BrotliDecompressor private(customDictionary: Option[Array[Byte]]) extends Decompressor {
override def decompress: ZPipeline[Any, Throwable, Byte, Byte] =
viaInputStreamByte { inputStream =>
new BrotliInputStream(
inputStream,
BrotliInputStream.DEFAULT_INTERNAL_BUFFER_SIZE,
customDictionary.orNull
)
class BrotliDecompressor private (customDictionary: Option[Array[Byte]], chunkSize: Int) extends Decompressor {
override def decompress: ZPipeline[Any, Throwable, Byte, Byte] = {
// BrotliInputStream.read does its best to read as many bytes as requested; no buffering needed.
viaInputStreamByte(chunkSize) { inputStream =>
// We don't read byte-by-byte, so we set the smallest byte-by-byte buffer size possible.
val byteByByteReadBufferSize = 1
new BrotliInputStream(inputStream, byteByByteReadBufferSize, customDictionary.orNull)
}
}
}
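
For illustration, a minimal usage sketch of the new `chunkSize` parameter (the `ZIOAppDefault` wrapper and the input file name are assumptions, not part of this commit):

import zio._
import zio.stream._
import zio.compress.BrotliDecompressor

object BrotliReadExample extends ZIOAppDefault {
  // Decompress a Brotli file; outgoing chunks are at most 4 KiB (the default).
  def run =
    ZStream
      .fromFileName("data.br") // hypothetical input file
      .via(BrotliDecompressor.make(chunkSize = ZStream.DefaultChunkSize).decompress)
      .runCollect
      .flatMap(bytes => Console.printLine(s"decompressed ${bytes.length} bytes"))
}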
44 changes: 20 additions & 24 deletions bzip2/src/main/scala/zio/compress/Bzip2.scala
@@ -4,27 +4,16 @@ import zio.stream._
import org.apache.commons.compress.compressors.bzip2.{BZip2CompressorInputStream, BZip2CompressorOutputStream}
import zio.compress.JavaIoInterop.{viaInputStreamByte, viaOutputStreamByte}

sealed abstract class Bzip2BlockSize(val jValue: Int)
object Bzip2BlockSize {
case object BlockSize100KB extends Bzip2BlockSize(1)
case object BlockSize200KB extends Bzip2BlockSize(2)
case object BlockSize300KB extends Bzip2BlockSize(3)
case object BlockSize400KB extends Bzip2BlockSize(4)
case object BlockSize500KB extends Bzip2BlockSize(5)
case object BlockSize600KB extends Bzip2BlockSize(6)
case object BlockSize700KB extends Bzip2BlockSize(7)
case object BlockSize800KB extends Bzip2BlockSize(8)
case object BlockSize900KB extends Bzip2BlockSize(9)

private val Values: Seq[Bzip2BlockSize] =
Seq(BlockSize100KB, BlockSize200KB, BlockSize300KB, BlockSize400KB, BlockSize500KB, BlockSize600KB, BlockSize700KB, BlockSize800KB, BlockSize900KB)

/** @param blockSize100KB a bzip2 block size in 100KB increments, valid values: 1 to 9 */
def fromBzip2BlockSize(blockSize100KB: Int): Option[Bzip2BlockSize] =
Values.find(_.jValue == blockSize100KB)
}

object Bzip2Compressor {

/** Make a pipeline that accepts a stream of bytes and produces a stream with Bzip2 compressed bytes.
*
* Note: Bzip2 uses a lot of memory. See [[BZip2CompressorOutputStream]] for an overview of the required heap size
* for each block size.
*
* @param blockSize
* the block size to use. Defaults to 900KB.
*/
def make(blockSize: Option[Bzip2BlockSize] = None): Bzip2Compressor =
new Bzip2Compressor(blockSize)
}
@@ -40,11 +29,18 @@ class Bzip2Compressor private (blockSize: Option[Bzip2BlockSize]) extends Compressor
}

object Bzip2Decompressor {
def make(): Bzip2Decompressor =
new Bzip2Decompressor()

/** Makes a pipeline that accepts a Bzip2 compressed byte stream and produces a decompressed byte stream.
*
* @param chunkSize
* The maximum chunk size of the outgoing ZStream. Defaults to `ZStream.DefaultChunkSize` (4KiB).
*/
def make(chunkSize: Int = ZStream.DefaultChunkSize): Bzip2Decompressor =
new Bzip2Decompressor(chunkSize)
}

class Bzip2Decompressor private extends Decompressor {
class Bzip2Decompressor private (chunkSize: Int = ZStream.DefaultChunkSize) extends Decompressor {
override def decompress: ZPipeline[Any, Throwable, Byte, Byte] =
viaInputStreamByte(new BZip2CompressorInputStream(_))
// BZip2CompressorInputStream.read does its best to read as many bytes as requested; no buffering needed.
viaInputStreamByte(chunkSize)(new BZip2CompressorInputStream(_))
}
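
A hedged round-trip sketch built from this file's API (the `compress`/`decompress` members and the `>>>` composition follow the Compressor/Decompressor traits used elsewhere in this diff):

import zio.stream.ZPipeline
import zio.compress.{Bzip2BlockSize, Bzip2Compressor, Bzip2Decompressor}

// Compress with the largest (default) block size, then decompress again.
val bzip2RoundTrip: ZPipeline[Any, Throwable, Byte, Byte] =
  Bzip2Compressor.make(Some(Bzip2BlockSize.BlockSize900KB)).compress >>>
    Bzip2Decompressor.make().decompress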
36 changes: 36 additions & 0 deletions bzip2/src/main/scala/zio/compress/Bzip2BlockSize.scala
@@ -0,0 +1,36 @@
package zio.compress

sealed abstract class Bzip2BlockSize(val jValue: Int)

object Bzip2BlockSize {
case object BlockSize100KB extends Bzip2BlockSize(1)
case object BlockSize200KB extends Bzip2BlockSize(2)
case object BlockSize300KB extends Bzip2BlockSize(3)
case object BlockSize400KB extends Bzip2BlockSize(4)
case object BlockSize500KB extends Bzip2BlockSize(5)
case object BlockSize600KB extends Bzip2BlockSize(6)
case object BlockSize700KB extends Bzip2BlockSize(7)
case object BlockSize800KB extends Bzip2BlockSize(8)
case object BlockSize900KB extends Bzip2BlockSize(9)

private val Values: Seq[Bzip2BlockSize] =
Seq(
BlockSize100KB,
BlockSize200KB,
BlockSize300KB,
BlockSize400KB,
BlockSize500KB,
BlockSize600KB,
BlockSize700KB,
BlockSize800KB,
BlockSize900KB
)

/** Converts a bzip2 block size from [[Int]] to [[Bzip2BlockSize]].
*
* @param blockSize100KB
* a bzip2 block size in 100KB increments, valid values: 1 to 9
*/
def fromBzip2BlockSize(blockSize100KB: Int): Option[Bzip2BlockSize] =
Values.find(_.jValue == blockSize100KB)
}
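
A quick, illustrative check of the conversion above:

// Valid values 1 to 9 map to a block size; anything else yields None.
assert(Bzip2BlockSize.fromBzip2BlockSize(9).contains(Bzip2BlockSize.BlockSize900KB))
assert(Bzip2BlockSize.fromBzip2BlockSize(0).isEmpty)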
2 changes: 1 addition & 1 deletion core/src/main/scala/zio/compress/Defaults.scala
@@ -1,6 +1,6 @@
package zio.compress

object Defaults {
val DefaultChunkSize: Int = 1024 * 64
val DefaultChunkSize: Int = 64 * 1024
val DefaultChunkedQueueSize: Int = 2
}
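
These two defaults combine into the queue headroom cited in the `JavaIoInterop` Scaladoc below; as a one-line derivation:

// 2 * 64 KiB = 128 KiB of output can be buffered before the queue reader starts.
val headroomBytes: Int = Defaults.DefaultChunkedQueueSize * Defaults.DefaultChunkSize // 131072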
104 changes: 78 additions & 26 deletions core/src/main/scala/zio/compress/JavaIoInterop.scala
@@ -10,27 +10,50 @@ private[compress] object JavaIoInterop {
/** Makes a pipeline that makes the incoming ZStream available for reading via an InputStream. This InputStream can
* then be wrapped by `makeInputStream`. All bytes read from the wrapped InputStream are put in the outgoing ZStream.
*
* @param makeInputStream
* Create the wrapped InputStream.
* This illustrates how data flows in the pipeline implementation:
* {{{
* Incoming ZStream --> queue <-- InputStream <-- wrapped InputStream <-- outgoing ZStream
*
* --> push
* <-- pull
* }}}
*
* This implementation always tries to read `chunkSize` bytes from the wrapped InputStream. However, when fewer bytes
* are available, the chunks can be smaller than `chunkSize`. If the chunks come out too small and the wrapped
* InputStream has a good `available()` implementation, consider wrapping it in a `BufferedInputStream`. If the
* chunks are still too small, consider re-chunking the outgoing ZStream, e.g. with `ZStream.rechunk`.
*
* @param chunkSize
* The maximum chunk size of the outgoing ZStream. Defaults to `ZStream.DefaultChunkSize` (4KiB).
* @param queueSize
* Chunks of the incoming ZStream go through an internal queue. This parameter determines the size of that queue.
* Defaults to 2.
* @param makeInputStream
* Create the wrapped InputStream.
* @return
* The created pipeline.
*/
def viaInputStreamByte(
makeInputStream: InputStream => InputStream,
chunkSize: Int = ZStream.DefaultChunkSize,
queueSize: Int = Defaults.DefaultChunkedQueueSize
): ZPipeline[Any, Throwable, Byte, Byte] =
)(makeInputStream: InputStream => InputStream): ZPipeline[Any, Throwable, Byte, Byte] =
viaInputStream[Byte](queueSize) { inputStream =>
ZIO.attemptBlocking(ZStream.fromInputStream(makeInputStream(inputStream)))
ZIO.attemptBlocking(ZStream.fromInputStream(makeInputStream(inputStream), chunkSize))
}
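
The re-chunking advice from the Scaladoc above, as a sketch (the `compressed` input stream is assumed, not part of this commit):

import zio.stream._
import zio.compress.Bzip2Decompressor

def rechunked(compressed: ZStream[Any, Throwable, Byte]): ZStream[Any, Throwable, Byte] =
  compressed
    .via(Bzip2Decompressor.make().decompress)
    .rechunk(64 * 1024) // merge small chunks into 64 KiB chunks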

/** Makes a pipeline that makes the incoming ZStream available for reading via an InputStream. This is then used by
* function `streamReader` to produce the outgoing ZStream.
*
* This illustrates how data flows in the pipeline implementation:
* {{{
* Incoming ZStream --> queue <-- InputStream <-- wrapped InputStream <--streamReader-- outgoing ZStream
*
* --> push
* <-- pull
* }}}
*
* @param queueSize
* Chunks of the incoming ZStream go through an internal queue. This parameter determines the size of that queue.
* Chunks of the incoming ZStream go to an internal queue. This parameter determines the size of that queue.
* Defaults to 2.
* @param streamReader
* A ZIO that reads from the given incoming InputStream to produce the outgoing ZStream. The outgoing ZStream must
@@ -51,7 +74,7 @@ private[compress] object JavaIoInterop {
.map(Take.chunk)
.run(ZSink.fromQueue(queue))
.onDoneCause(
cause => queue.offer(Take.failCause(cause)),
(cause: Cause[Throwable]) => queue.offer(Take.failCause(cause)),
_ => queue.offer(Take.end)
)
.forkScoped
@@ -61,11 +84,34 @@
}
}
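
A self-contained sketch of the Take-queue hand-off pictured above (the object name and literal input are hypothetical; the offer-on-done wiring mirrors this hunk):

import zio._
import zio.stream._

object TakeQueueSketch extends ZIOAppDefault {
  def run =
    for {
      queue <- Queue.bounded[Take[Throwable, Byte]](2) // queueSize = 2
      // Producer: push incoming chunks, then signal end-of-stream (or failure).
      _ <- ZStream
             .fromIterable("hello, world".getBytes("UTF-8").toIndexedSeq)
             .chunks
             .map(Take.chunk)
             .run(ZSink.fromQueue(queue))
             .onDoneCause(
               cause => queue.offer(Take.failCause(cause)),
               _ => queue.offer(Take.end)
             )
             .fork
      // Consumer: pull the chunks back out and flatten the Take envelope.
      bytes <- ZStream.fromQueue(queue).flattenTake.runCollect
      _     <- Console.printLine(new String(bytes.toArray, "UTF-8"))
    } yield ()
}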

/** Makes a pipeline that captures the output of an OutputStream (created by `makeOutputStream`) and makes it
* available as the outgoing ZStream. The incoming ZStream of bytes is written to the OutputStream.
/** Makes a pipeline that captures the output of an OutputStream and makes it available as the outgoing ZStream. The
* OutputStream can then be wrapped (via `makeOutputStream`). The incoming ZStream of bytes is written to the wrapped
* OutputStream.
*
* @see
* [[viaOutputStream]]
* This illustrates how data flows in the pipeline implementation:
* {{{
* Incoming ZStream --> wrapping OutputStream --> buffered OutputStream --> queue <-- outgoing ZStream
*
* --> push
* <-- pull
* }}}
*
* Often, the wrapping OutputStream needs to write to its underlying OutputStream immediately (from the
* constructor). Usually this is just a few bytes, for example a magic header indicating the file type. Since the
* internal queue is bounded (see parameter `queueSize`), and the queue reader starts later, there is limited space
* for this data. To be precise, `queueSize * chunkSize` bytes (defaults to 2 * 64KiB = 128KiB) are available.
*
* The wrapped OutputStream is closed when the incoming ZStream ends.
*
* @param makeOutputStream
* A function that creates the wrapped OutputStream from an OutputStream that writes to the internal queue.
* @param chunkSize
* The chunk size of the outgoing ZStream (also the size of the buffer in the buffered OutputStream). Defaults to
* 64KiB.
* @param queueSize
* The internal queue size. Defaults to 2.
* @return
* The created pipeline.
*/
def viaOutputStreamByte(
makeOutputStream: OutputStream => OutputStream,
@@ -76,30 +76,36 @@ private[compress] object JavaIoInterop {
stream.runForeachChunk(chunk => ZIO.attemptBlocking(outputStream.write(chunk.toArray)))
}
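
A hypothetical wrapping example: the JDK's GZIPOutputStream writes its magic header from the constructor, which is exactly the situation described above. This is illustration only — zio-compress' own gzip support uses `ZPipeline.gzip`, and since `JavaIoInterop` is `private[compress]`, the sketch would have to live inside that package:

import java.util.zip.GZIPOutputStream
import zio.stream.ZPipeline

// Capture everything the GZIPOutputStream writes as the outgoing byte stream.
val gzipViaJavaIo: ZPipeline[Any, Throwable, Byte, Byte] =
  viaOutputStreamByte(new GZIPOutputStream(_))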

/** Makes a pipeline that captures the output of an OutputStream of type `OS` (created by `makeOutputStream`) and
* makes it available as the outgoing ZStream. The input ZStream (with items of type `In`) is written to the
* OutputStream by `streamWriter`.
/** Makes a pipeline that captures the output of an OutputStream and makes it available as the outgoing ZStream. The
* OutputStream can then be wrapped (via `makeOutputStream`). The input ZStream (with items of type `In`) is written
* to the wrapped OutputStream by `streamWriter`.
*
* This illustrates how data flows in the pipeline implementation:
* {{{
* Incoming ZStream --streamWriter--> wrapping OutputStream --> buffered OutputStream --> queue --> outgoing ZStream
* }}}
*
* Often, the wrapping OutputStream needs to write to its underlying OutputStream immediately (from the
* constructor). Usually this is just a few bytes, for example a magic header indicating the file type. Since the
* internal queue is bounded (see parameter `queueSize`), and the queue reader starts later, there is limited space
* for this data. To be precise, `queueSize * chunkSize` bytes (defaults to 2 * 64KiB = 128KiB) are available.
*
* Many of these OutputStreams immediately start writing to their underlying outputStream from their constructor.
* (Usually just a few bytes, for example a magic header indicating file type.) Since the internal queue is bounded
* (see `queueSize`), and the queue reader starts last, the output stream is buffered. The buffer needs to be large
* enough to fit all the initially written data. Each time the buffer fills up, the pipeline produces a chunk in the
* outgoing ZStream. Therefore, the buffer size is set with the `chunkSize` parameter. With the default settings we
* get large buffers and a small queue.
* The wrapped OutputStream is closed when the incoming ZStream ends.
*
* @param makeOutputStream
* Create the wrapped OutputStream.
* A function that creates the wrapped OutputStream from an OutputStream that writes to the internal queue.
* @param chunkSize
* The internal buffer size, also the chunk size of the outgoing ZStream, defaults to 64KiB.
* The chunk size of the outgoing ZStream (also the size of the buffer in the buffered OutputStream). Defaults to
* 64KiB.
* @param queueSize
* The internal queue size, defaults to 2.
* The internal queue size. Defaults to 2.
* @param streamWriter
* Function that writes items from the given incoming ZStream to the given OutputStream. The OutputStream should
* _not_ be closed when the stream ends.
* Function that writes items from the incoming ZStream to the wrapped OutputStream. The wrapped OutputStream
* should _not_ be closed when the incoming ZStream ends.
* @tparam In
* Type of incoming items.
* @tparam OS
* Type of the OutputStream.
* Type of the wrapping OutputStream.
* @return
* The created pipeline.
*/
56 changes: 44 additions & 12 deletions gzip/src/main/scala/zio/compress/Gzip.scala
@@ -7,41 +7,73 @@ import zio.stream.compression.{CompressionLevel, CompressionStrategy}

object GzipCompressor {
private val CompressionLevels = Seq(
DefaultCompression, NoCompression, BestSpeed, CompressionLevel2, CompressionLevel3, CompressionLevel4,
CompressionLevel5, CompressionLevel6, CompressionLevel7, CompressionLevel8, BestCompression
DefaultCompression,
NoCompression,
BestSpeed,
CompressionLevel2,
CompressionLevel3,
CompressionLevel4,
CompressionLevel5,
CompressionLevel6,
CompressionLevel7,
CompressionLevel8,
BestCompression
)
private val CompressionStrategies = Seq(DefaultStrategy, Filtered, HuffmanOnly)

/** @param level a gzip compression level, valid values: -1 (default), 0 (fastest) to 9 (best compression) */
/** Converts a deflate compression level from [[Int]] to [[CompressionLevel]].
*
* @param level
* a deflate compression level, valid values: -1 (default), 0 (no compression), 1 (fastest) to 9 (best compression)
*/
def intToCompressionLevel(level: Int): Option[CompressionLevel] =
CompressionLevels.find(_.jValue == level)

/** @param strategy a gzip compression strategy, valid values: 0 (default), 1 (filtered) or 2 (huffman only) */
/** Converts a deflate compression strategy from [[Int]] to [[CompressionStrategy]].
*
* @param strategy
* a deflate compression strategy, valid values: 0 (default), 1 (filtered) or 2 (huffman only)
*/
def intToCompressionStrategy(strategy: Int): Option[CompressionStrategy] =
CompressionStrategies.find(_.jValue == strategy)

/** Make a pipeline that accepts a stream of bytes and produces a stream with Gzip compressed bytes.
*
* @param deflateLevel
* the deflate compression level
* @param deflateStrategy
* a deflate compression strategy, valid values: 0 (default), 1 (filtered) or 2 (huffman only)
* @param bufferSize
* the maximum chunk size of the outgoing ZStream. Defaults to 64KiB.
*/
def make(
deflateLevel: Option[CompressionLevel] = None,
deflateStrategy: Option[CompressionStrategy] = None,
bufferSize: Int = Defaults.DefaultChunkSize
): GzipCompressor =
new GzipCompressor(deflateLevel, deflateStrategy, bufferSize)
new GzipCompressor(
deflateLevel.getOrElse(DefaultCompression),
deflateStrategy.getOrElse(CompressionStrategy.DefaultStrategy),
bufferSize
)
}
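
The Int conversions in action (illustrative):

import zio.stream.compression.CompressionLevel

// -1 is the default compression level; 3 is not a valid strategy value.
assert(GzipCompressor.intToCompressionLevel(-1).contains(CompressionLevel.DefaultCompression))
assert(GzipCompressor.intToCompressionStrategy(3).isEmpty)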

class GzipCompressor private (
deflateLevel: Option[CompressionLevel],
deflateStrategy: Option[CompressionStrategy],
deflateLevel: CompressionLevel,
deflateStrategy: CompressionStrategy,
bufferSize: Int
) extends Compressor {
override def compress: ZPipeline[Any, Nothing, Byte, Byte] =
ZPipeline.gzip(
bufferSize,
deflateLevel.getOrElse(DefaultCompression),
deflateStrategy.getOrElse(CompressionStrategy.DefaultStrategy)
)
ZPipeline.gzip(bufferSize, deflateLevel, deflateStrategy)
}
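
And a compressor pipeline built from those pieces (a sketch; the surrounding stream wiring is assumed, not part of this commit):

import zio.stream._

// Gzip-compress a byte stream at the best compression level, in 64 KiB chunks.
def gzipped(raw: ZStream[Any, Throwable, Byte]): ZStream[Any, Throwable, Byte] =
  raw.via(GzipCompressor.make(deflateLevel = GzipCompressor.intToCompressionLevel(9)).compress)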

object GzipDecompressor {

/** Makes a pipeline that accepts a Gzip compressed byte stream and produces a decompressed byte stream.
*
* @param bufferSize
* the buffer size to use. Defaults to 64KiB.
*/
def make(bufferSize: Int = Defaults.DefaultChunkSize): GzipDecompressor =
new GzipDecompressor(bufferSize)
}