Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce "busy buffer" logs #1641

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func newBuffer(nc net.Conn) buffer {
}
}

// busy retruns true if the buffer contains some read data.
methane marked this conversation as resolved.
Show resolved Hide resolved
func (b *buffer) busy() bool {
return b.length > 0
}

// flip replaces the active buffer with the background buffer
// this is a delayed flip that simply increases the buffer counter;
// the actual flip will be performed the next time we call `buffer.fill`
Expand Down
10 changes: 7 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,14 @@ func (mc *mysqlConn) Close() (err error) {
if !mc.closed.Load() {
err = mc.writeCommandPacket(comQuit)
}
mc.close()
return
}

// close closes the network connection and cleare results without sending COM_QUIT.
methane marked this conversation as resolved.
Show resolved Hide resolved
func (mc *mysqlConn) close() {
mc.cleanup()
mc.clearResult()
Comment on lines +129 to 131
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential concurrency issue with mc.clearResult() in mc.close()

Calling mc.clearResult() inside mc.close() may introduce concurrency issues because mc.clearResult() is not thread-safe, and mc.close() can be invoked from multiple goroutines. Previously, the comment in mc.cleanup() mentions avoiding mc.clearResult() to prevent unsafe conditions.

Consider removing mc.clearResult() from mc.close() or ensuring it is called in a thread-safe manner to prevent race conditions.

return
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Unsafe concurrent access in close()

The close() method calls mc.clearResult() which is not thread-safe, as noted in the cleanup() method's comment: "This function can be called from multiple goroutines. So we can not mc.clearResult() here." This could lead to race conditions.

Consider this safer implementation:

func (mc *mysqlConn) close() {
    mc.cleanup()
-   mc.clearResult()
}

If clearing the result is necessary, consider:

  1. Adding synchronization around clearResult()
  2. Documenting that close() must only be called from a single goroutine
  3. Moving the clearResult() responsibility to the caller
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// close closes the network connection and cleare results without sending COM_QUIT.
func (mc *mysqlConn) close() {
mc.cleanup()
mc.clearResult()
return
}
// close closes the network connection and cleare results without sending COM_QUIT.
func (mc *mysqlConn) close() {
mc.cleanup()
}


// Closes the network connection and unsets internal variables. Do not call this
Expand Down Expand Up @@ -637,7 +641,7 @@ func (mc *mysqlConn) CheckNamedValue(nv *driver.NamedValue) (err error) {
// ResetSession implements driver.SessionResetter.
// (From Go 1.10)
func (mc *mysqlConn) ResetSession(ctx context.Context) error {
if mc.closed.Load() {
if mc.closed.Load() || mc.buf.busy() {
return driver.ErrBadConn
}

Expand Down Expand Up @@ -671,7 +675,7 @@ func (mc *mysqlConn) ResetSession(ctx context.Context) error {
// IsValid implements driver.Validator interface
// (From Go 1.15)
func (mc *mysqlConn) IsValid() bool {
return !mc.closed.Load()
return !mc.closed.Load() && !mc.buf.busy()
}

var _ driver.SessionResetter = &mysqlConn{}
Expand Down
44 changes: 14 additions & 30 deletions packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
// read packet header
data, err := mc.buf.readNext(4)
if err != nil {
mc.close()
if cerr := mc.canceled.Value(); cerr != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancelled connection was not closed. it caused busy buffer.

return nil, cerr
}
mc.log(err)
mc.Close()
return nil, ErrInvalidConn
}

Expand All @@ -45,7 +45,7 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {

// check packet sync [8 bit]
if data[3] != mc.sequence {
mc.Close()
mc.close()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this sends COM_QUIT during reading packet. It printed "busy buffer" in log.

if data[3] > mc.sequence {
return nil, ErrPktSyncMul
}
Expand All @@ -59,7 +59,7 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
// there was no previous packet
if prevData == nil {
mc.log(ErrMalformPkt)
mc.Close()
mc.close()
return nil, ErrInvalidConn
}

Expand All @@ -69,11 +69,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
// read packet body [pktLen bytes]
data, err = mc.buf.readNext(pktLen)
if err != nil {
mc.close()
if cerr := mc.canceled.Value(); cerr != nil {
return nil, cerr
}
mc.log(err)
mc.Close()
return nil, ErrInvalidConn
}

Expand Down Expand Up @@ -125,10 +125,10 @@ func (mc *mysqlConn) writePacket(data []byte) error {

n, err := mc.netConn.Write(data[:4+size])
if err != nil {
mc.cleanup()
if cerr := mc.canceled.Value(); cerr != nil {
return cerr
}
mc.cleanup()
if n == 0 && pktLen == len(data)-4 {
// only for the first loop iteration when nothing was written yet
mc.log(err)
Expand Down Expand Up @@ -162,11 +162,6 @@ func (mc *mysqlConn) writePacket(data []byte) error {
func (mc *mysqlConn) readHandshakePacket() (data []byte, plugin string, err error) {
data, err = mc.readPacket()
if err != nil {
// for init we can rewrite this to ErrBadConn for sql.Driver to retry, since
// in connection initialization we don't risk retrying non-idempotent actions.
if err == ErrInvalidConn {
return nil, "", driver.ErrBadConn
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readHandshakePacket() is called during connect. And database/sql don't check ErrBadConn from connect.

return
}

Expand Down Expand Up @@ -312,9 +307,8 @@ func (mc *mysqlConn) writeHandshakeResponsePacket(authResp []byte, plugin string
// Calculate packet length and get buffer with that size
data, err := mc.buf.takeBuffer(pktLen + 4)
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
mc.cleanup()
return err
}

// ClientFlags [32 bit]
Expand Down Expand Up @@ -404,9 +398,8 @@ func (mc *mysqlConn) writeAuthSwitchPacket(authData []byte) error {
pktLen := 4 + len(authData)
data, err := mc.buf.takeBuffer(pktLen)
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
mc.cleanup()
return err
}

// Add the auth data [EOF]
Expand All @@ -424,9 +417,7 @@ func (mc *mysqlConn) writeCommandPacket(command byte) error {

data, err := mc.buf.takeSmallBuffer(4 + 1)
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
return err
}

// Add command byte
Expand All @@ -443,9 +434,7 @@ func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error {
pktLen := 1 + len(arg)
data, err := mc.buf.takeBuffer(pktLen + 4)
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
return err
}

// Add command byte
Expand All @@ -464,9 +453,7 @@ func (mc *mysqlConn) writeCommandPacketUint32(command byte, arg uint32) error {

data, err := mc.buf.takeSmallBuffer(4 + 1 + 4)
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
return err
}

// Add command byte
Expand Down Expand Up @@ -1007,9 +994,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
// In this case the len(data) == cap(data) which is used to optimise the flow below.
}
if err != nil {
// cannot take the buffer. Something must be wrong with the connection
mc.log(err)
return errBadConnNoWrite
return err
}

// command [1 byte]
Expand Down Expand Up @@ -1207,8 +1192,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
if valuesCap != cap(paramValues) {
data = append(data[:pos], paramValues...)
if err = mc.buf.store(data); err != nil {
mc.log(err)
return errBadConnNoWrite
return err
}
}

Expand Down