From 5895eb4259d0cd221503bac95e393a4abb9f8ce4 Mon Sep 17 00:00:00 2001 From: Jille Timmermans Date: Mon, 12 Feb 2024 21:43:55 +0100 Subject: [PATCH] s2: Add AsyncFlush method: Complete the block without flushing (#927) * s2: Add AsyncFlush method: Complete the block without flushing My use case is to transfer a large compressed S2 stream with a few changes very often. To get a small diff I want to end blocks at application decided points rather than at byte offsets. This allows me to remove the first byte without every single block changing. Flush() works for this, but it limits concurrency because it waits for the last block to be compressed rather than allowing that asynchronously. So I'd like to propose AsyncFlush, which flushes the buffer to a block, but doesn't flush the block to the io.Writer. There were actually a few places in the s2 code that also wanted to end the block, but didn't necessary want to flush to the writer. * Update s2/writer.go Co-authored-by: Klaus Post --------- Co-authored-by: Klaus Post --- s2/writer.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/s2/writer.go b/s2/writer.go index bba66a8766..1253ea675c 100644 --- a/s2/writer.go +++ b/s2/writer.go @@ -215,7 +215,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return 0, err } if len(w.ibuf) > 0 { - err := w.Flush() + err := w.AsyncFlush() if err != nil { return 0, err } @@ -225,7 +225,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { if err := w.EncodeBuffer(buf); err != nil { return 0, err } - return int64(len(buf)), w.Flush() + return int64(len(buf)), w.AsyncFlush() } for { inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen] @@ -354,7 +354,7 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { } // Flush queued data first. if len(w.ibuf) > 0 { - err := w.Flush() + err := w.AsyncFlush() if err != nil { return err } @@ -716,9 +716,9 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { return nRet, nil } -// Flush flushes the Writer to its underlying io.Writer. -// This does not apply padding. -func (w *Writer) Flush() error { +// AsyncFlush writes any buffered bytes to a block and starts compressing it. +// It does not wait for the output has been written as Flush() does. +func (w *Writer) AsyncFlush() error { if err := w.err(nil); err != nil { return err } @@ -738,6 +738,15 @@ func (w *Writer) Flush() error { } } } + return w.err(nil) +} + +// Flush flushes the Writer to its underlying io.Writer. +// This does not apply padding. +func (w *Writer) Flush() error { + if err := w.AsyncFlush(); err != nil { + return err + } if w.output == nil { return w.err(nil) }