-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose GCP PubSub PublishSettings to Gizmo users #239
Expose GCP PubSub PublishSettings to Gizmo users #239
Conversation
Expose publish settings
pubsub/gcp/gcp.go
Outdated
if p.topic.PublishSettings.DelayThreshold == 0 { | ||
_, err = res.Get(ctx) | ||
} else { | ||
// if the DelayThreshold > 0, we use a goroutine |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two issues with this approach
- you don't handle/surface the error
- you possibly lose messages messages if your process shuts down while Get() is blocking for the delay
I don't have a good suggestion right now how to improve this but I would think you need a way to track outstanding publishes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this perhaps just a matter of documentation? The application using this option would want to make sure it shuts down gracefully allowing for an interval equivalent to the value of DelayThreshold, but that's not something we can control here. I'm adding a note to that effect in the config comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documenting this issue is definitely a start but it still doesn't address the problem of not surfacing the errors. Using log.Printf()
as you do right now isn't playing nice with existing logging setup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you're saying. I'm updating it to use the logger created for the pubsub package.
pubsub/gcp/gcp.go
Outdated
_, err := res.Get(ctx) | ||
var err error | ||
if p.topic.PublishSettings.DelayThreshold <= 1 * time.Millisecond { | ||
_, err = res.Get(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Up'd this to 1 since that's the default
pubsub/gcp/gcp.go
Outdated
if p.topic.PublishSettings.DelayThreshold == 0 { | ||
_, err = res.Get(ctx) | ||
} else { | ||
// if the DelayThreshold > 0, we use a goroutine |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documenting this issue is definitely a start but it still doesn't address the problem of not surfacing the errors. Using log.Printf()
as you do right now isn't playing nice with existing logging setup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, just one minor nit but aside from then i think this is ok.
pubsub/gcp/gcp.go
Outdated
// otherwise we'll block here until that time interval passes | ||
go func(res *gpubsub.PublishResult, ctx context.Context, m []byte) { | ||
_, err := res.Get(ctx) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's inline this to make it clear it's not the err
from the outside and scoped just to this part here.
if _, err := res.Get(ctx); err != nil {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's updated.
…e settings where creating a publisher to specify batching criteria. In order to prevent blocking, we use a goroutine to get the publish result if there's a DelayThreshold for batching greater than the 1ms default.
…nski/gizmo into expose-publish-settings
Can this be merged/tagged? |
My apologies for the delay, this week was busy. I thought about this more and I don't think that spinning up an orphaned goroutine inside PublishRaw() without any knowledge/involvement by the caller is a good idea, this will only cause trouble down the road. If you publish with a delay and use a goroutine then the caller needs to start and own that goroutine instead of it being buried inside a library call with no error tracking. |
Having the goroutine on the caller side was actually my original approach, and I went back and forth on it before making the PR. I'm good with changing it as you suggest. |
…will need to account for the blocking.
…ettings will need to account for the blocking." This reverts commit 57cc4fa.
…will need to account for the blocking in Publish requests
pubsub/gcp/gcp.go
Outdated
t := c.Topic(cfg.Topic) | ||
// Update PublishSettings from cfg.PublishSettings | ||
// but never set thresholds to 0. | ||
if cfg.PublishSettings.DelayThreshold > 0*time.Millisecond { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
> 0*time.Millisecond
- i think you can change these two to > 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirmed and updated.
👍 |
This adds a PublishSettings object to the config passed to GCP's NewPublisher() and applies those settings with some guard rails to the publisher that's created. These updates are very close to the Google side of things, so I don't have specific new tests, but I'm happy to add them if people have suggestions.