From 0b38267f13d5e17fcf8dfbb89168ef7fb520deb8 Mon Sep 17 00:00:00 2001 From: Achille Date: Fri, 21 Jun 2019 15:13:31 -0700 Subject: [PATCH] add kafka.SeekDontCheck (#295) * add kafka.SeekDontCheck * fix logic * touch less code * use SeekDontCheck in ReadBatch * add test --- conn.go | 31 ++++++++++++++++++++++++++++++- conn_test.go | 26 ++++++++++++++++++++++++++ go.mod | 1 + go.sum | 1 + 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/conn.go b/conn.go index 5bccf095a..94ab002bf 100644 --- a/conn.go +++ b/conn.go @@ -595,6 +595,12 @@ const ( SeekAbsolute = 1 // Seek to an absolute offset. SeekEnd = 2 // Seek relative to the last offset available in the partition. SeekCurrent = 3 // Seek relative to the current offset. + + // This flag may be combined to any of the SeekAbsolute and SeekCurrent + // constants to skip the bound check that the connection would do otherwise. + // Programs can use this flag to avoid making a metadata request to the kafka + // broker to read the current first and last offsets of the partition. + SeekDontCheck = 1 << 31 ) // Seek sets the offset for the next read or write operation according to whence, which @@ -604,12 +610,32 @@ const ( // as in lseek(2) or os.Seek. // The method returns the new absolute offset of the connection. func (c *Conn) Seek(offset int64, whence int) (int64, error) { + seekDontCheck := (whence & SeekDontCheck) != 0 + whence &= ^SeekDontCheck + switch whence { case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent: default: return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence) } + if seekDontCheck { + if whence == SeekAbsolute { + c.mutex.Lock() + c.offset = offset + c.mutex.Unlock() + return offset, nil + } + + if whence == SeekCurrent { + c.mutex.Lock() + c.offset += offset + offset = c.offset + c.mutex.Unlock() + return offset, nil + } + } + if whence == SeekAbsolute { c.mutex.Lock() unchanged := offset == c.offset @@ -618,6 +644,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) { return offset, nil } } + if whence == SeekCurrent { c.mutex.Lock() offset = c.offset + offset @@ -726,7 +753,9 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)} } - offset, err := c.Seek(c.Offset()) + offset, whence := c.Offset() + + offset, err := c.Seek(offset, whence|SeekDontCheck) if err != nil { return &Batch{err: dontExpectEOF(err)} } diff --git a/conn_test.go b/conn_test.go index b79338233..fea36adcd 100644 --- a/conn_test.go +++ b/conn_test.go @@ -152,6 +152,11 @@ func TestConn(t *testing.T) { function: testConnSeekRandomOffset, }, + { + scenario: "unchecked seeks allow the connection to be positionned outside the boundaries of the partition", + function: testConnSeekDontCheck, + }, + { scenario: "writing and reading messages sequentially should preserve the order", function: testConnWriteReadSequentially, @@ -439,6 +444,27 @@ func testConnSeekRandomOffset(t *testing.T, conn *Conn) { } } +func testConnSeekDontCheck(t *testing.T, conn *Conn) { + for i := 0; i != 10; i++ { + if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil { + t.Fatal(err) + } + } + + offset, err := conn.Seek(42, SeekAbsolute|SeekDontCheck) + if err != nil { + t.Error(err) + } + + if offset != 42 { + t.Error("bad offset:", offset) + } + + if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange { + t.Error("unexpected error:", err) + } +} + func testConnWriteReadSequentially(t *testing.T, conn *Conn) { for i := 0; i != 10; i++ { if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil { diff --git a/go.mod b/go.mod index bcbd43037..e08dded80 100644 --- a/go.mod +++ b/go.mod @@ -9,4 +9,5 @@ require ( github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect + golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 ) diff --git a/go.sum b/go.sum index d5c7940d6..651d75197 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,7 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=