Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

producer: avoid sending duplicate data #217

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

producer: avoid sending duplicate data #217

wants to merge 1 commit into from

Conversation

ghost
Copy link

@ghost ghost commented Sep 5, 2017

for {
	select {
	case t := <-w.transactionChan:
		w.transactions = append(w.transactions, t)
		err := w.conn.WriteCommand(t.cmd)
		if err != nil {
			w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
			w.close()
		}
	case data := <-w.responseChan:
		w.popTransaction(FrameTypeResponse, data)
	case data := <-w.errorChan:
		w.popTransaction(FrameTypeError, data)
	case <-w.closeChan:
		goto exit
	case <-w.exitChan:
		goto exit
	}
}

after producer called Stop, there may be data in channal w.responseChan, and exitChan may has been closed, and select randomly selected goto exit.

then in

func (w *Producer) transactionCleanup() {
	// clean up transactions we can easily account for
	for _, t := range w.transactions {
		t.Error = ErrNotConnected
		t.finish()
	}
	w.transactions = w.transactions[:0]
	... ...
}

if we don't read responseChan, the successfully sent data may be send again after creating a new producer and publish again.

@mreiferson mreiferson changed the title feat(producer): avoid sending duplicate data producer: avoid sending duplicate data Sep 5, 2017
@mreiferson mreiferson added the bug label Sep 5, 2017
@mreiferson
Copy link
Member

@zwb-ict great catch! I think this might explain the long-standing bug in #171! ⭐️

l:
for {
select {
case data := <-w.responseChan:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we probably have the same race in w.errorChan, so we should flush that channel too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. The real error reason should pass to user, not just set error reason to ErrNotConnected. I will add this code.

for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
l:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this label is necessary since it's not a nested for loop.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In for loop, there's a select in it, without label, break only jump out from select not for.

for {
select {
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be popped and finished with ErrNotConnected, like below?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the release code like below:

func (w *Producer) transactionCleanup() {
l:
	for {
		select {
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		default:
			// clean up transactions we can easily account for
			for _, t := range w.transactions {
				t.Error = ErrNotConnected
				t.finish()
			}
			w.transactions = w.transactions[:0]
			break l
		}
	}
        ... ...
}

The transaction poped in chan w.responseChan branch means successfully sent, so transaction's error reason is empty and should not be set.
The transaction poped in chan w.errorChan branch means sent failed, should be send again, and transaction's error reason should be set to the real reason, so user can know what happened.

Copy link
Author

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I update my code, to add

case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)

for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
l:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In for loop, there's a select in it, without label, break only jump out from select not for.

l:
for {
select {
case data := <-w.responseChan:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. The real error reason should pass to user, not just set error reason to ErrNotConnected. I will add this code.

for {
select {
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the release code like below:

func (w *Producer) transactionCleanup() {
l:
	for {
		select {
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		default:
			// clean up transactions we can easily account for
			for _, t := range w.transactions {
				t.Error = ErrNotConnected
				t.finish()
			}
			w.transactions = w.transactions[:0]
			break l
		}
	}
        ... ...
}

The transaction poped in chan w.responseChan branch means successfully sent, so transaction's error reason is empty and should not be set.
The transaction poped in chan w.errorChan branch means sent failed, should be send again, and transaction's error reason should be set to the real reason, so user can know what happened.

@mreiferson
Copy link
Member

@zwb-ict I've looked a bit more into this. I don't think the race condition you've identified here is possible. The underlying connection only calls to the OnClose delegate (which calls close(w.closeChan)) after fully cleaning up the connection (see https://github.com/nsqio/go-nsq/blob/master/conn.go#L694). Given that w.responseChan and w.errorChan are both unbuffered channels, it implies that there cannot be a message sitting on those channels once w.closeChan is closed, which would only happen after the connection was done reading from the connection.

@ghost
Copy link
Author

ghost commented Sep 6, 2017

@mreiferson
Is there a possible when user called Stop?

func (w *Producer) Stop() {
	w.guard.Lock()
	if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
		w.guard.Unlock()
		return
	}
	w.log(LogLevelInfo, "stopping")
	close(w.exitChan)
	w.close()
	w.guard.Unlock()
	w.wg.Wait()
}

close(w.exitChan) is called before w.close() (which will call w.conn.close())

So router may exit for because of case <-w.exitChan:

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
	w.log(LogLevelInfo, "exiting router")
}

Before producer's goroutine run w.transactionCleanup(). conn's goroutine may pass data into w.responseChan or w.errorChan before w.conn.close().

@ghost
Copy link
Author

ghost commented Sep 6, 2017

@mreiferson
Copy link
Member

Is there a possible when user called Stop?

@zwb-ict yes, it looks like in that case the race would be possible.

below https://github.com/nsqio/go-nsq/blob/master/conn.go#L536, should we add goto exit like https://github.com/nsqio/go-nsq/blob/master/conn.go#L508?

Hmmm, possibly, but let's not do that in this PR?

@ghost
Copy link
Author

ghost commented Sep 7, 2017

@mreiferson yes, let's not do that in this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant