diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 1f8cfb5..1c820c0 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -1,4 +1,4 @@ -name: Go Checks +name: Golint on: push: @@ -33,4 +33,4 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v2 with: - version: v1.29 + version: v1.40 diff --git a/.golangci.toml b/.golangci.toml index aff2be0..7d3369e 100644 --- a/.golangci.toml +++ b/.golangci.toml @@ -17,6 +17,10 @@ locale = "US" check-shadowing = true disable = ["composites"] +[linters-settings.goimports] + +local-prefixes = "github.com/cloudchacho/hedwig-go" + [linters] -enable = ["misspell"] +enable = ["misspell", "gofmt", "goimports", "revive"] diff --git a/aws/aws.go b/aws/aws.go index c441365..e24874c 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -21,15 +21,15 @@ import ( "github.com/cloudchacho/hedwig-go" ) -type awsBackend struct { +type backend struct { settings *hedwig.Settings sqs sqsiface.SQSAPI sns snsiface.SNSAPI } -// AWSMetadata is additional metadata associated with a message -type AWSMetadata struct { +// Metadata is additional metadata associated with a message +type Metadata struct { // AWS receipt identifier ReceiptHandle string @@ -48,15 +48,15 @@ type AWSMetadata struct { const sqsWaitTimeoutSeconds int64 = 20 -func (a *awsBackend) getSQSQueueName() string { +func (a *backend) getSQSQueueName() string { return fmt.Sprintf("HEDWIG-%s", a.settings.QueueName) } -func (a *awsBackend) getSNSTopic(messageTopic string) string { +func (a *backend) getSNSTopic(messageTopic string) string { return fmt.Sprintf("arn:aws:sns:%s:%s:hedwig-%s", a.settings.AWSRegion, a.settings.AWSAccountID, messageTopic) } -func (a *awsBackend) getSQSQueueURL(ctx context.Context) (*string, error) { +func (a *backend) getSQSQueueURL(ctx context.Context) (*string, error) { out, err := a.sqs.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{ QueueName: aws.String(a.getSQSQueueName()), }) @@ -68,7 +68,7 @@ func (a *awsBackend) getSQSQueueURL(ctx context.Context) (*string, error) { // isValidForSQS checks that the payload is allowed in SQS message body since only some UTF8 characters are allowed // ref: https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html -func (a *awsBackend) isValidForSQS(payload []byte) bool { +func (a *backend) isValidForSQS(payload []byte) bool { if !utf8.Valid(payload) { return false } @@ -79,7 +79,7 @@ func (a *awsBackend) isValidForSQS(payload []byte) bool { } // Publish a message represented by the payload, with specified attributes to the specific topic -func (a *awsBackend) Publish(ctx context.Context, message *hedwig.Message, payload []byte, attributes map[string]string, topic string) (string, error) { +func (a *backend) Publish(ctx context.Context, message *hedwig.Message, payload []byte, attributes map[string]string, topic string) (string, error) { snsTopic := a.getSNSTopic(topic) var payloadStr string @@ -116,7 +116,7 @@ func (a *awsBackend) Publish(ctx context.Context, message *hedwig.Message, paylo // Receive messages from configured queue(s) and provide it through the callback. This should run indefinitely // until the context is canceled. Provider metadata should include all info necessary to ack/nack a message. -func (a *awsBackend) Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, callback hedwig.ConsumerCallback) error { +func (a *backend) Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, callback hedwig.ConsumerCallback) error { queueURL, err := a.getSQSQueueURL(ctx) if err != nil { return errors.Wrap(err, "failed to get SQS Queue URL") @@ -180,7 +180,7 @@ func (a *awsBackend) Receive(ctx context.Context, numMessages uint32, visibility if err != nil { receiveCount = -1 } - metadata := AWSMetadata{ + metadata := Metadata{ *queueMessage.ReceiptHandle, firstReceiveTime, sentTime, @@ -208,14 +208,14 @@ func (a *awsBackend) Receive(ctx context.Context, numMessages uint32, visibility } // NackMessage nacks a message on the queue -func (a *awsBackend) NackMessage(ctx context.Context, providerMetadata interface{}) error { +func (a *backend) NackMessage(ctx context.Context, providerMetadata interface{}) error { // not supported by AWS return nil } // AckMessage acknowledges a message on the queue -func (a *awsBackend) AckMessage(ctx context.Context, providerMetadata interface{}) error { - receipt := providerMetadata.(AWSMetadata).ReceiptHandle +func (a *backend) AckMessage(ctx context.Context, providerMetadata interface{}) error { + receipt := providerMetadata.(Metadata).ReceiptHandle queueURL, err := a.getSQSQueueURL(ctx) if err != nil { return errors.Wrap(err, "failed to get SQS Queue URL") @@ -227,13 +227,13 @@ func (a *awsBackend) AckMessage(ctx context.Context, providerMetadata interface{ return err } -// NewAWSBackend creates a backend for publishing and consuming from AWS -// The provider metadata produced by this backend will have concrete type: aws.AWSMetadata -func NewAWSBackend(settings *hedwig.Settings, sessionCache *AWSSessionsCache) hedwig.IBackend { +// NewBackend creates a backend for publishing and consuming from AWS +// The provider metadata produced by this backend will have concrete type: aws.Metadata +func NewBackend(settings *hedwig.Settings, sessionCache *SessionsCache) hedwig.IBackend { awsSession := sessionCache.GetSession(settings) - return &awsBackend{ + return &backend{ settings, sqs.New(awsSession), sns.New(awsSession), diff --git a/aws/aws_session.go b/aws/aws_session.go index 0567350..54c88fe 100644 --- a/aws/aws_session.go +++ b/aws/aws_session.go @@ -38,19 +38,19 @@ type sessionKey struct { awsSessionToken string } -// AWSSessionsCache is a cache that holds sessions -type AWSSessionsCache struct { +// SessionsCache is a cache that holds sessions +type SessionsCache struct { sessionMap sync.Map } // NewAWSSessionsCache creates a new session cache -func NewAWSSessionsCache() *AWSSessionsCache { - return &AWSSessionsCache{ +func NewAWSSessionsCache() *SessionsCache { + return &SessionsCache{ sessionMap: sync.Map{}, } } -func (c *AWSSessionsCache) getOrCreateSession(settings *hedwig.Settings) *session.Session { +func (c *SessionsCache) getOrCreateSession(settings *hedwig.Settings) *session.Session { key := sessionKey{awsRegion: settings.AWSRegion, awsAccessKeyID: settings.AWSAccessKey, awsSessionToken: settings.AWSSessionToken} s, ok := c.sessionMap.Load(key) if !ok { @@ -61,6 +61,6 @@ func (c *AWSSessionsCache) getOrCreateSession(settings *hedwig.Settings) *sessio } // GetSession retrieves a session if it is cached, otherwise creates one -func (c *AWSSessionsCache) GetSession(settings *hedwig.Settings) *session.Session { +func (c *SessionsCache) GetSession(settings *hedwig.Settings) *session.Session { return c.getOrCreateSession(settings) } diff --git a/aws/aws_test.go b/aws/aws_test.go index 3123442..af796e1 100644 --- a/aws/aws_test.go +++ b/aws/aws_test.go @@ -96,11 +96,14 @@ func (fs *fakeSQS) SendMessageWithContext(ctx aws.Context, in *sqs.SendMessageIn return args.Get(0).(*sqs.SendMessageOutput), args.Error(1) } +// revive:disable:var-naming func (fs *fakeSQS) GetQueueUrlWithContext(ctx aws.Context, in *sqs.GetQueueUrlInput, opts ...request.Option) (*sqs.GetQueueUrlOutput, error) { args := fs.Called(ctx, in, opts) return args.Get(0).(*sqs.GetQueueUrlOutput), args.Error(1) } +// revive:enable:var-naming + func (fs *fakeSQS) ReceiveMessageWithContext(ctx aws.Context, in *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error) { args := fs.Called(ctx, in, opts) return args.Get(0).(*sqs.ReceiveMessageOutput), args.Error(1) @@ -151,11 +154,11 @@ func (s *BackendTestSuite) TestReceive() { s.Require().NoError(err) receiveCount := 1 body := `{"vehicle_id": "C_123"}` - messageId := "123" + messageID := "123" sqsMessage := sqs.Message{ ReceiptHandle: aws.String(receiptHandle), MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "foo": &sqs.MessageAttributeValue{StringValue: aws.String("bar")}, + "foo": {StringValue: aws.String("bar")}, }, Attributes: map[string]*string{ sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp: aws.String("1295500510456"), @@ -163,15 +166,15 @@ func (s *BackendTestSuite) TestReceive() { sqs.MessageSystemAttributeNameApproximateReceiveCount: aws.String(strconv.Itoa(int(receiveCount))), }, Body: aws.String(body), - MessageId: aws.String(messageId), + MessageId: aws.String(messageID), } body2 := `vbI9vCDijJg=` - messageId2 := "456" + messageID2 := "456" sqsMessage2 := sqs.Message{ ReceiptHandle: aws.String(receiptHandle), MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "foo": &sqs.MessageAttributeValue{StringValue: aws.String("bar")}, - "hedwig_encoding": &sqs.MessageAttributeValue{StringValue: aws.String("base64")}, + "foo": {StringValue: aws.String("bar")}, + "hedwig_encoding": {StringValue: aws.String("base64")}, }, Attributes: map[string]*string{ sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp: aws.String("1295500510456"), @@ -179,7 +182,7 @@ func (s *BackendTestSuite) TestReceive() { sqs.MessageSystemAttributeNameApproximateReceiveCount: aws.String(strconv.Itoa(int(receiveCount))), }, Body: aws.String(body2), - MessageId: aws.String(messageId2), + MessageId: aws.String(messageID2), } receiveOutput := &sqs.ReceiveMessageOutput{ Messages: []*sqs.Message{&sqsMessage, &sqsMessage2}, @@ -194,7 +197,7 @@ func (s *BackendTestSuite) TestReceive() { attributes := map[string]string{ "foo": "bar", } - providerMetadata := AWSMetadata{ + providerMetadata := Metadata{ ReceiptHandle: receiptHandle, FirstReceiveTime: firstReceiveTime.UTC(), SentTime: sentTime.UTC(), @@ -256,12 +259,12 @@ func (s *BackendTestSuite) TestReceiveFailedNonUTF8Decoding() { } receiveCount := 1 body := `foobar` - messageId := "123" + messageID := "123" sqsMessage := sqs.Message{ ReceiptHandle: aws.String(receiptHandle), MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "foo": &sqs.MessageAttributeValue{StringValue: aws.String("bar")}, - "hedwig_encoding": &sqs.MessageAttributeValue{StringValue: aws.String("base64")}, + "foo": {StringValue: aws.String("bar")}, + "hedwig_encoding": {StringValue: aws.String("base64")}, }, Attributes: map[string]*string{ sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp: aws.String("1295500510456"), @@ -269,7 +272,7 @@ func (s *BackendTestSuite) TestReceiveFailedNonUTF8Decoding() { sqs.MessageSystemAttributeNameApproximateReceiveCount: aws.String(strconv.Itoa(int(receiveCount))), }, Body: aws.String(body), - MessageId: aws.String(messageId), + MessageId: aws.String(messageID), } receiveOutput := &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{&sqsMessage}} s.fakeSQS.On("ReceiveMessageWithContext", ctx, receiveInput, []request.Option(nil)). @@ -452,11 +455,11 @@ func (s *BackendTestSuite) TestReceiveMissingAttributes() { } receiptHandle := "123" body := `{"vehicle_id": "C_123"}` - messageId := "123" + messageID := "123" sqsMessage := sqs.Message{ ReceiptHandle: aws.String(receiptHandle), MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "foo": &sqs.MessageAttributeValue{StringValue: aws.String("bar")}, + "foo": {StringValue: aws.String("bar")}, }, Attributes: map[string]*string{ sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp: aws.String(""), @@ -464,7 +467,7 @@ func (s *BackendTestSuite) TestReceiveMissingAttributes() { sqs.MessageSystemAttributeNameApproximateReceiveCount: aws.String(""), }, Body: aws.String(body), - MessageId: aws.String(messageId), + MessageId: aws.String(messageID), } receiveOutput := &sqs.ReceiveMessageOutput{ Messages: []*sqs.Message{&sqsMessage}, @@ -479,7 +482,7 @@ func (s *BackendTestSuite) TestReceiveMissingAttributes() { attributes := map[string]string{ "foo": "bar", } - providerMetadata := AWSMetadata{ + providerMetadata := Metadata{ ReceiptHandle: receiptHandle, FirstReceiveTime: time.Time{}, SentTime: time.Time{}, @@ -531,9 +534,9 @@ func (s *BackendTestSuite) TestPublish() { s.fakeSNS.On("PublishWithContext", ctx, expectedSnsInput, mock.Anything). Return(output, nil) - messageId, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) + messageID, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) s.NoError(err) - s.Equal(messageId, "123") + s.Equal(messageID, "123") s.fakeSNS.AssertExpectations(s.T()) } @@ -569,9 +572,9 @@ func (s *BackendTestSuite) TestPublishInvalidCharacters() { s.fakeSNS.On("PublishWithContext", ctx, expectedSnsInput, mock.Anything). Return(output, nil) - messageId, err := s.backend.Publish(ctx, s.message, invalidPayload, s.attributes, msgTopic) + messageID, err := s.backend.Publish(ctx, s.message, invalidPayload, s.attributes, msgTopic) s.NoError(err) - s.Equal(messageId, "123") + s.Equal(messageID, "123") s.fakeSNS.AssertExpectations(s.T()) } @@ -628,7 +631,7 @@ func (s *BackendTestSuite) TestAck() { s.fakeSQS.On("DeleteMessageWithContext", ctx, deleteInput, mock.Anything). Return(deleteOutput, nil) - err := s.backend.AckMessage(ctx, AWSMetadata{ReceiptHandle: receiptHandle}) + err := s.backend.AckMessage(ctx, Metadata{ReceiptHandle: receiptHandle}) s.NoError(err) s.fakeSQS.AssertExpectations(s.T()) @@ -657,7 +660,7 @@ func (s *BackendTestSuite) TestAckError() { s.fakeSQS.On("DeleteMessageWithContext", ctx, deleteInput, mock.Anything). Return((*sqs.DeleteMessageOutput)(nil), errors.New("failed to ack")) - err := s.backend.AckMessage(ctx, AWSMetadata{ReceiptHandle: receiptHandle}) + err := s.backend.AckMessage(ctx, Metadata{ReceiptHandle: receiptHandle}) s.EqualError(err, "failed to ack") s.fakeSQS.AssertExpectations(s.T()) @@ -675,7 +678,7 @@ func (s *BackendTestSuite) TestAckGetQueueError() { receiptHandle := "foobar" - err := s.backend.AckMessage(ctx, AWSMetadata{ReceiptHandle: receiptHandle}) + err := s.backend.AckMessage(ctx, Metadata{ReceiptHandle: receiptHandle}) s.EqualError(err, "failed to get SQS Queue URL: no internet") s.fakeSQS.AssertExpectations(s.T()) @@ -686,7 +689,7 @@ func (s *BackendTestSuite) TestNack() { receiptHandle := "foobar" - err := s.backend.NackMessage(ctx, AWSMetadata{ReceiptHandle: receiptHandle}) + err := s.backend.NackMessage(ctx, Metadata{ReceiptHandle: receiptHandle}) s.NoError(err) // no calls expected @@ -699,7 +702,7 @@ func (s *BackendTestSuite) TestNew() { type BackendTestSuite struct { suite.Suite - backend *awsBackend + backend *backend settings *hedwig.Settings fakeSQS *fakeSQS fakeSNS *fakeSNS @@ -739,7 +742,7 @@ func (s *BackendTestSuite) SetupTest() { payload := []byte(`{"vehicle_id": "C_123"}`) attributes := map[string]string{"foo": "bar"} - s.backend = NewAWSBackend(settings, NewAWSSessionsCache()).(*awsBackend) + s.backend = NewBackend(settings, NewAWSSessionsCache()).(*backend) s.backend.sqs = fakeSQS s.backend.sns = fakeSNS s.settings = settings diff --git a/consumer.go b/consumer.go index b0b1f28..2e8ae8e 100644 --- a/consumer.go +++ b/consumer.go @@ -57,7 +57,7 @@ func (c *queueConsumer) processMessage(ctx context.Context, payload []byte, attr return } - loggingFields = LoggingFields{"message_id": message.ID} + loggingFields = LoggingFields{"message_id": message.ID, "type": message.Type, "version": message.DataSchemaVersion} callbackKey := MessageTypeMajorVersion{message.Type, uint(message.DataSchemaVersion.Major())} var callback CallbackFunction diff --git a/examples/main.go b/examples/main.go index cb9b233..06a6af3 100644 --- a/examples/main.go +++ b/examples/main.go @@ -11,7 +11,7 @@ import ( "github.com/cloudchacho/hedwig-go/gcp" "github.com/cloudchacho/hedwig-go/jsonschema" "github.com/cloudchacho/hedwig-go/protobuf" - "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/proto" ) func settings(isProtobuf bool, publisherBackend string) *hedwig.Settings { @@ -36,7 +36,7 @@ func settings(isProtobuf bool, publisherBackend string) *hedwig.Settings { QueueName: queueName, Subscriptions: []string{"dev-user-created-v1"}, MessageRouting: map[hedwig.MessageTypeMajorVersion]string{ - hedwig.MessageTypeMajorVersion{ + { MessageType: "user-created", MajorVersion: 1, }: "dev-user-created-v1", @@ -94,7 +94,7 @@ func encoder(isProtobuf bool) hedwig.IEncoder { factoryRegistry := registry(isProtobuf) if isProtobuf { encoder, err = protobuf.NewMessageEncoder( - []protoreflect.Message{(&UserCreatedV1{}).ProtoReflect()}, + []proto.Message{&UserCreatedV1{}}, ) } else { encoder, err = jsonschema.NewMessageEncoder("schema.json", factoryRegistry) @@ -109,9 +109,9 @@ func encoder(isProtobuf bool) hedwig.IEncoder { func backend(settings *hedwig.Settings, publisherBackend string) hedwig.IBackend { if publisherBackend == "aws" { awsSessionCache := aws.NewAWSSessionsCache() - return aws.NewAWSBackend(settings, awsSessionCache) + return aws.NewBackend(settings, awsSessionCache) } else if publisherBackend == "gcp" { - return gcp.NewGCPBackend(settings) + return gcp.NewBackend(settings) } else { panic(fmt.Sprintf("unknown backend name: %s", publisherBackend)) } @@ -137,11 +137,11 @@ func runPublisher(isProtobuf bool, publisherBackend string) { if err != nil { panic(fmt.Sprintf("Failed to create message: %v", err)) } - messageId, err := publisher.Publish(context.Background(), message) + messageID, err := publisher.Publish(context.Background(), message) if err != nil { panic(fmt.Sprintf("Failed to publish message: %v", err)) } - fmt.Printf("Published message with id %s successfully with publish id: %s\n", message.ID, messageId) + fmt.Printf("Published message with id %s successfully with publish id: %s\n", message.ID, messageID) } func main() { diff --git a/gcp/gcp.go b/gcp/gcp.go index 5d9f1c5..398058a 100644 --- a/gcp/gcp.go +++ b/gcp/gcp.go @@ -13,15 +13,15 @@ import ( "github.com/cloudchacho/hedwig-go" ) -type gcpBackend struct { +type backend struct { settings *hedwig.Settings client *pubsub.Client } const defaultVisibilityTimeoutS = time.Second * 20 -// GCPMetadata is additional metadata associated with a message -type GCPMetadata struct { +// Metadata is additional metadata associated with a message +type Metadata struct { // Underlying pubsub message - ack id isn't exported so we have to store this object pubsubMessage *pubsub.Message @@ -35,7 +35,7 @@ type GCPMetadata struct { } // Publish a message represented by the payload, with specified attributes to the specific topic -func (g *gcpBackend) Publish(ctx context.Context, message *hedwig.Message, payload []byte, attributes map[string]string, topic string) (string, error) { +func (g *backend) Publish(ctx context.Context, message *hedwig.Message, payload []byte, attributes map[string]string, topic string) (string, error) { err := g.ensureClient(ctx) if err != nil { return "", err @@ -51,16 +51,16 @@ func (g *gcpBackend) Publish(ctx context.Context, message *hedwig.Message, paylo Attributes: attributes, }, ) - messageId, err := result.Get(ctx) + messageID, err := result.Get(ctx) if err != nil { return "", errors.Wrap(err, "Failed to publish message to Pub/Sub") } - return messageId, nil + return messageID, nil } // Receive messages from configured queue(s) and provide it through the callback. This should run indefinitely // until the context is canceled. Provider metadata should include all info necessary to ack/nack a message. -func (g *gcpBackend) Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, callback hedwig.ConsumerCallback) error { +func (g *backend) Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, callback hedwig.ConsumerCallback) error { err := g.ensureClient(ctx) if err != nil { return err @@ -86,6 +86,7 @@ func (g *gcpBackend) Receive(ctx context.Context, numMessages uint32, visibility for _, subscription := range subscriptions { pubsubSubscription := g.client.Subscription(subscription) + pubsubSubscription.ReceiveSettings.NumGoroutines = 1 pubsubSubscription.ReceiveSettings.MaxOutstandingMessages = int(numMessages) if visibilityTimeout != 0 { pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = visibilityTimeout @@ -94,7 +95,7 @@ func (g *gcpBackend) Receive(ctx context.Context, numMessages uint32, visibility } group.Go(func() error { recvErr := pubsubSubscription.Receive(gctx, func(ctx context.Context, message *pubsub.Message) { - metadata := GCPMetadata{ + metadata := Metadata{ pubsubMessage: message, PublishTime: message.PublishTime, DeliveryAttempt: *message.DeliveryAttempt, @@ -114,18 +115,18 @@ func (g *gcpBackend) Receive(ctx context.Context, numMessages uint32, visibility } // NackMessage nacks a message on the queue -func (g *gcpBackend) NackMessage(ctx context.Context, providerMetadata interface{}) error { - providerMetadata.(GCPMetadata).pubsubMessage.Nack() +func (g *backend) NackMessage(ctx context.Context, providerMetadata interface{}) error { + providerMetadata.(Metadata).pubsubMessage.Nack() return nil } // AckMessage acknowledges a message on the queue -func (g *gcpBackend) AckMessage(ctx context.Context, providerMetadata interface{}) error { - providerMetadata.(GCPMetadata).pubsubMessage.Ack() +func (g *backend) AckMessage(ctx context.Context, providerMetadata interface{}) error { + providerMetadata.(Metadata).pubsubMessage.Ack() return nil } -func (g *gcpBackend) ensureClient(ctx context.Context) error { +func (g *backend) ensureClient(ctx context.Context) error { googleCloudProject := g.settings.GoogleCloudProject if googleCloudProject == "" { creds, err := google.FindDefaultCredentials(ctx) @@ -149,8 +150,8 @@ func (g *gcpBackend) ensureClient(ctx context.Context) error { return nil } -// NewGCPBackend creates a backend for publishing and consuming from GCP -// The provider metadata produced by this backend will have concrete type: gcp.GCPMetadata -func NewGCPBackend(settings *hedwig.Settings) hedwig.IBackend { - return &gcpBackend{settings: settings} +// NewBackend creates a backend for publishing and consuming from GCP +// The provider metadata produced by this backend will have concrete type: gcp.Metadata +func NewBackend(settings *hedwig.Settings) hedwig.IBackend { + return &backend{settings: settings} } diff --git a/gcp/gcp_test.go b/gcp/gcp_test.go index a7ddb30..18b1a1f 100644 --- a/gcp/gcp_test.go +++ b/gcp/gcp_test.go @@ -55,7 +55,7 @@ func (s *BackendTestSuite) publish(payload []byte, attributes map[string]string, } func (s *BackendTestSuite) TestReceive() { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() numMessages := uint32(10) visibilityTimeout := time.Second * 10 @@ -73,7 +73,7 @@ func (s *BackendTestSuite) TestReceive() { err = s.publish(payload2, attributes2, "hedwig-dev-user-created-v1") s.Require().NoError(err) - s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload, attributes, mock.AnythingOfType("gcp.GCPMetadata")). + s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload, attributes, mock.AnythingOfType("gcp.Metadata")). // message must be acked or Receive never returns Run(func(args mock.Arguments) { err := s.backend.AckMessage(ctx, args.Get(3)) @@ -81,7 +81,7 @@ func (s *BackendTestSuite) TestReceive() { }). Return(). Once() - s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload2, attributes2, mock.AnythingOfType("gcp.GCPMetadata")). + s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload2, attributes2, mock.AnythingOfType("gcp.Metadata")). // message must be acked or Receive never returns Run(func(args mock.Arguments) { err := s.backend.AckMessage(ctx, args.Get(3)) @@ -92,27 +92,27 @@ func (s *BackendTestSuite) TestReceive() { // force method to return after just one loop After(time.Millisecond * 110) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond*200)) + defer cancel() ch := make(chan bool) go func() { err := s.backend.Receive(ctx, numMessages, visibilityTimeout, s.fakeConsumerCallback.Callback) - s.EqualError(err, "context canceled") + s.True(err.Error() == "draining" || err == context.DeadlineExceeded) ch <- true close(ch) }() - time.Sleep(time.Millisecond * 500) - cancel() // wait for co-routine to finish <-ch s.fakeConsumerCallback.AssertExpectations(s.T()) - providerMetadata := s.fakeConsumerCallback.Mock.Calls[0].Arguments.Get(3).(gcp.GCPMetadata) + providerMetadata := s.fakeConsumerCallback.Mock.Calls[0].Arguments.Get(3).(gcp.Metadata) s.Equal(1, providerMetadata.DeliveryAttempt) } func (s *BackendTestSuite) TestReceiveCrossProject() { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() numMessages := uint32(10) visibilityTimeout := time.Second * 10 @@ -126,7 +126,7 @@ func (s *BackendTestSuite) TestReceiveCrossProject() { err := s.publish(payload, attributes, "hedwig-dev-user-created-v1") s.Require().NoError(err) - s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload, attributes, mock.AnythingOfType("gcp.GCPMetadata")). + s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), payload, attributes, mock.AnythingOfType("gcp.Metadata")). // message must be acked or Receive never returns Run(func(args mock.Arguments) { err := s.backend.AckMessage(ctx, args.Get(3)) @@ -135,39 +135,38 @@ func (s *BackendTestSuite) TestReceiveCrossProject() { Return(). Once() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond*200)) + defer cancel() ch := make(chan bool) go func() { err := s.backend.Receive(ctx, numMessages, visibilityTimeout, s.fakeConsumerCallback.Callback) - s.EqualError(err, "context canceled") + s.True(err.Error() == "draining" || err == context.DeadlineExceeded) ch <- true close(ch) }() - time.Sleep(time.Millisecond * 500) - cancel() // wait for co-routine to finish <-ch s.fakeConsumerCallback.AssertExpectations(s.T()) - providerMetadata := s.fakeConsumerCallback.Mock.Calls[0].Arguments.Get(3).(gcp.GCPMetadata) + providerMetadata := s.fakeConsumerCallback.Mock.Calls[0].Arguments.Get(3).(gcp.Metadata) s.Equal(1, providerMetadata.DeliveryAttempt) } func (s *BackendTestSuite) TestReceiveNoMessages() { - ctx, cancel := context.WithCancel(context.Background()) numMessages := uint32(10) visibilityTimeout := time.Second * 10 + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*200)) + defer cancel() ch := make(chan bool) go func() { err := s.backend.Receive(ctx, numMessages, visibilityTimeout, s.fakeConsumerCallback.Callback) - s.EqualError(err, "context canceled") + s.True(err.Error() == "draining" || err == context.DeadlineExceeded) ch <- true close(ch) }() - time.Sleep(time.Millisecond * 1) - cancel() // wait for co-routine to finish <-ch @@ -202,9 +201,9 @@ func (s *BackendTestSuite) TestPublish() { msgTopic := "dev-user-created-v1" - messageId, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) + messageID, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) s.NoError(err) - s.NotEmpty(messageId) + s.NotEmpty(messageID) err = s.client.Subscription("hedwig-dev-myapp-dev-user-created-v1").Receive(ctx, func(_ context.Context, message *pubsub.Message) { cancel() @@ -223,22 +222,25 @@ func (s *BackendTestSuite) TestPublishFailure() { } func (s *BackendTestSuite) TestAck() { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() msgTopic := "dev-user-created-v1" - messageId, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) + messageID, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) s.NoError(err) - s.NotEmpty(messageId) + s.NotEmpty(messageID) - err = s.client.Subscription("hedwig-dev-myapp-dev-user-created-v1").Receive(ctx, func(_ context.Context, message *pubsub.Message) { - cancel() + ctx2, cancel2 := context.WithCancel(ctx) + err = s.client.Subscription("hedwig-dev-myapp-dev-user-created-v1").Receive(ctx2, func(_ context.Context, message *pubsub.Message) { + defer cancel2() s.Equal(message.Data, s.payload) s.Equal(message.Attributes, s.attributes) message.Ack() }) s.NoError(err) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond*200)) + defer cancel() ch := make(chan bool) go func() { err := s.client.Subscription("hedwig-dev-myapp-dev-user-created-v1").Receive(ctx, func(_ context.Context, message *pubsub.Message) { @@ -247,40 +249,38 @@ func (s *BackendTestSuite) TestAck() { ch <- true s.Require().NoError(err) }() - time.Sleep(time.Millisecond * 100) - cancel() // wait for co-routine to finish <-ch } func (s *BackendTestSuite) TestNack() { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() numMessages := uint32(10) visibilityTimeout := time.Second * 10 msgTopic := "dev-user-created-v1" - messageId, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) + messageID, err := s.backend.Publish(ctx, s.message, s.payload, s.attributes, msgTopic) s.NoError(err) - s.NotEmpty(messageId) + s.NotEmpty(messageID) - s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), s.payload, s.attributes, mock.AnythingOfType("gcp.GCPMetadata")). + s.fakeConsumerCallback.On("Callback", mock.AnythingOfType("*context.cancelCtx"), s.payload, s.attributes, mock.AnythingOfType("gcp.Metadata")). Run(func(args mock.Arguments) { err := s.backend.NackMessage(ctx, args.Get(3)) s.Require().NoError(err) }). Return() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond*200)) + defer cancel() ch := make(chan bool) go func() { err := s.backend.Receive(ctx, numMessages, visibilityTimeout, s.fakeConsumerCallback.Callback) - s.EqualError(err, "context canceled") + s.True(err.Error() == "draining" || err == context.DeadlineExceeded) ch <- true close(ch) }() - time.Sleep(time.Millisecond * 200) - cancel() // wait for co-routine to finish <-ch @@ -414,7 +414,7 @@ func (s *BackendTestSuite) SetupTest() { payload := []byte(`{"vehicle_id": "C_123"}`) attributes := map[string]string{"foo": "bar"} - s.backend = gcp.NewGCPBackend(settings) + s.backend = gcp.NewBackend(settings) s.settings = settings s.message = message s.validator = validator diff --git a/jsonschema/jsonschema.go b/jsonschema/jsonschema.go index 4c1bfd7..3e4b5fb 100644 --- a/jsonschema/jsonschema.go +++ b/jsonschema/jsonschema.go @@ -178,15 +178,17 @@ func NewEncoderFromBytes(schemaFile []byte, dataRegistry hedwig.DataFactoryRegis return nil, err } - if value, ok := schema.Extensions[xVersionKey]; !ok { + var value interface{} + var ok bool + + if value, ok = schema.Extensions[xVersionKey]; !ok { return nil, errors.Errorf("Missing x-version from schema definition for %s", messageType) - } else { - xVersion := value.(*semver.Version) - if xVersion.Major() != int64(majorVersion) { - return nil, errors.Errorf("Invalid x-version: %d.%d for: %s/%s", - xVersion.Major(), xVersion.Minor(), messageType, version, - ) - } + } + xVersion := value.(*semver.Version) + if xVersion.Major() != int64(majorVersion) { + return nil, errors.Errorf("Invalid x-version: %d.%d for: %s/%s", + xVersion.Major(), xVersion.Minor(), messageType, version, + ) } schemaKey := hedwig.MessageTypeMajorVersion{messageType, majorVersion} diff --git a/models_test.go b/models_test.go index 1d0a4f5..03de81a 100644 --- a/models_test.go +++ b/models_test.go @@ -164,14 +164,14 @@ func TestMessagePublish(t *testing.T) { require.NoError(t, err) publisher := &fakePublisher{} - messageId := "123" + messageID := "123" publisher.On("Publish", ctx, m). - Return(messageId, nil) + Return(messageID, nil) - returnedMessageId, err := m.Publish(ctx, publisher) + returnedmessageID, err := m.Publish(ctx, publisher) assert.NoError(t, err) - assert.Equal(t, messageId, returnedMessageId) + assert.Equal(t, messageID, returnedmessageID) publisher.AssertExpectations(t) } diff --git a/protobuf/protobuf.go b/protobuf/protobuf.go index b61e3d4..f5025b2 100644 --- a/protobuf/protobuf.go +++ b/protobuf/protobuf.go @@ -73,21 +73,22 @@ func NewMessageEncoderFromMessageTypes(protoMessages map[hedwig.MessageTypeMajor // This method will try to read message type from message_options, and if not specified, // assume that the messages are named as: `V`. If that doesn't work // for your use case, use NewMessageEncoderFromMessageTypes and provide an explicit mapping. -func NewMessageEncoder(protoMessages []protoreflect.Message) (hedwig.IEncoder, error) { +func NewMessageEncoder(protoMessages []proto.Message) (hedwig.IEncoder, error) { protoMessagesMap := map[hedwig.MessageTypeMajorVersion]protoreflect.Message{} versions := map[hedwig.MessageTypeMajorVersion]*semver.Version{} for _, msg := range protoMessages { - desc := msg.Descriptor() + desc := msg.ProtoReflect().Descriptor() var messageType string var majorVersion uint if matches := messageNameRegex.FindStringSubmatch(string(desc.Name())); len(matches) > 0 { messageType = matches[1] - if majorVersionSigned, err := strconv.Atoi(matches[2]); err != nil { + var err error + var majorVersionSigned int + if majorVersionSigned, err = strconv.Atoi(matches[2]); err != nil { // will never happen, message name already passed regex return nil, err - } else { - majorVersion = uint(majorVersionSigned) } + majorVersion = uint(majorVersionSigned) } options := desc.Options().(*descriptorpb.MessageOptions) hedwigMsgOpts := proto.GetExtension(options, E_MessageOptions).(*MessageOptions) @@ -116,7 +117,7 @@ func NewMessageEncoder(protoMessages []protoreflect.Message) (hedwig.IEncoder, e if _, ok := protoMessagesMap[schemaKey]; ok { return nil, errors.Errorf("duplicate message found for %s %d", messageType, majorVersion) } - protoMessagesMap[schemaKey] = msg + protoMessagesMap[schemaKey] = msg.ProtoReflect() versions[schemaKey] = version } return &messageEncoder{protoMsgs: protoMessagesMap, versions: versions}, nil diff --git a/protobuf/protobuf_test.go b/protobuf/protobuf_test.go index af60f75..e9682d5 100644 --- a/protobuf/protobuf_test.go +++ b/protobuf/protobuf_test.go @@ -262,11 +262,11 @@ type EncoderTestSuite struct { } func (s *EncoderTestSuite) SetupTest() { - protoMsgs := []protoreflect.Message{ - (&internal.TripCreatedV1{}).ProtoReflect(), - (&internal.TripCreatedV2{}).ProtoReflect(), - (&internal.DeviceCreatedV1{}).ProtoReflect(), - (&internal.VehicleCreatedV1{}).ProtoReflect(), + protoMsgs := []proto.Message{ + &internal.TripCreatedV1{}, + &internal.TripCreatedV2{}, + &internal.DeviceCreatedV1{}, + &internal.VehicleCreatedV1{}, } encoder, err := protobuf.NewMessageEncoder(protoMsgs) require.NoError(s.T(), err) @@ -330,8 +330,8 @@ func TestNewMessageEncoderFromMessageTypesMajorVersionMismatch(t *testing.T) { func TestInvalidSchemaBadNameNoMessageType(t *testing.T) { assertions := assert.New(t) - invalidProtoMsgs := []protoreflect.Message{ - (&internal.DeviceCreated{}).ProtoReflect(), + invalidProtoMsgs := []proto.Message{ + &internal.DeviceCreated{}, } v, err := protobuf.NewMessageEncoder(invalidProtoMsgs) assertions.Nil(v) @@ -340,8 +340,8 @@ func TestInvalidSchemaBadNameNoMessageType(t *testing.T) { func TestInvalidSchemaBadNameNoMajorVersion(t *testing.T) { assertions := assert.New(t) - invalidProtoMsgs := []protoreflect.Message{ - (&internal.DeviceCreatedNew{}).ProtoReflect(), + invalidProtoMsgs := []proto.Message{ + &internal.DeviceCreatedNew{}, } v, err := protobuf.NewMessageEncoder(invalidProtoMsgs) assertions.Nil(v) @@ -350,8 +350,8 @@ func TestInvalidSchemaBadNameNoMajorVersion(t *testing.T) { func TestInvalidSchemaMajorVersionMismatch(t *testing.T) { assertions := assert.New(t) - invalidProtoMsgs := []protoreflect.Message{ - (&internal.TripCreatedV4{}).ProtoReflect(), + invalidProtoMsgs := []proto.Message{ + &internal.TripCreatedV4{}, } v, err := protobuf.NewMessageEncoder(invalidProtoMsgs) assertions.Nil(v) @@ -360,9 +360,9 @@ func TestInvalidSchemaMajorVersionMismatch(t *testing.T) { func TestInvalidSchemaDuplicate(t *testing.T) { assertions := assert.New(t) - invalidProtoMsgs := []protoreflect.Message{ - (&internal.TripCreatedV2{}).ProtoReflect(), - (&internal.TripCreatedV2New{}).ProtoReflect(), + invalidProtoMsgs := []proto.Message{ + &internal.TripCreatedV2{}, + &internal.TripCreatedV2New{}, } v, err := protobuf.NewMessageEncoder(invalidProtoMsgs) assertions.Nil(v) diff --git a/publisher_test.go b/publisher_test.go index 13085d9..7a7288d 100644 --- a/publisher_test.go +++ b/publisher_test.go @@ -29,14 +29,14 @@ func (s *PublisherTestSuite) TestPublish() { s.validator.On("Serialize", message). Return(payload, headers, nil) - messageId := "123" + messageID := "123" s.backend.On("Publish", ctx, message, payload, headers, "dev-user-created-v1"). - Return(messageId, nil) + Return(messageID, nil) - receivedMessageId, err := s.publisher.Publish(ctx, message) + receivedmessageID, err := s.publisher.Publish(ctx, message) s.Nil(err) - s.Equal(messageId, receivedMessageId) + s.Equal(messageID, receivedmessageID) s.backend.AssertExpectations(s.T()) } diff --git a/validator.go b/validator.go index 014b097..1eb8812 100644 --- a/validator.go +++ b/validator.go @@ -59,40 +59,38 @@ type messageValidator struct { // decodeMetaAttributes decodes message transport attributes as MetaAttributes func (v *messageValidator) decodeMetaAttributes(attributes map[string]string) (MetaAttributes, error) { metaAttrs := MetaAttributes{} - if value, ok := attributes["hedwig_format_version"]; !ok { + var value string + var ok bool + if value, ok = attributes["hedwig_format_version"]; !ok { return metaAttrs, errors.New("value not found for attribute: 'hedwig_format_version'") - } else { - if version, err := semver.NewVersion(value); err != nil { - return metaAttrs, errors.Errorf("invalid value '%s' found for attribute: 'hedwig_format_version'", value) - } else { - metaAttrs.FormatVersion = version - } } - if value, ok := attributes["hedwig_id"]; !ok { + var version *semver.Version + var err error + if version, err = semver.NewVersion(value); err != nil { + return metaAttrs, errors.Errorf("invalid value '%s' found for attribute: 'hedwig_format_version'", value) + } + metaAttrs.FormatVersion = version + if value, ok = attributes["hedwig_id"]; !ok { return metaAttrs, errors.New("value not found for attribute: 'hedwig_id'") - } else { - metaAttrs.ID = value } - if value, ok := attributes["hedwig_message_timestamp"]; !ok { + metaAttrs.ID = value + if value, ok = attributes["hedwig_message_timestamp"]; !ok { return metaAttrs, errors.New("value not found for attribute: 'hedwig_id'") - } else { - if timestamp, err := strconv.ParseInt(value, 10, 64); err != nil { - return metaAttrs, errors.Errorf("invalid value '%s' found for attribute: 'hedwig_message_timestamp'", value) - } else { - unixTime := time.Unix(0, timestamp*int64(time.Millisecond)) - metaAttrs.Timestamp = unixTime - } } - if value, ok := attributes["hedwig_publisher"]; !ok { + var timestamp int64 + if timestamp, err = strconv.ParseInt(value, 10, 64); err != nil { + return metaAttrs, errors.Errorf("invalid value '%s' found for attribute: 'hedwig_message_timestamp'", value) + } + unixTime := time.Unix(0, timestamp*int64(time.Millisecond)) + metaAttrs.Timestamp = unixTime + if value, ok = attributes["hedwig_publisher"]; !ok { return metaAttrs, errors.New("value not found for attribute: 'hedwig_publisher'") - } else { - metaAttrs.Publisher = value } - if value, ok := attributes["hedwig_schema"]; !ok { + metaAttrs.Publisher = value + if value, ok = attributes["hedwig_schema"]; !ok { return metaAttrs, errors.New("value not found for attribute: 'hedwig_schema'") - } else { - metaAttrs.Schema = value } + metaAttrs.Schema = value if len(attributes) != 0 { metaAttrs.Headers = map[string]string{}