diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 8d54ed2e8f2d..9448c88ca213 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -320,6 +320,10 @@ func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error) if err := checkTopicMessageRetention(t.MessageRetentionDuration); err != nil { return nil, err } + // Take any ingestion setting to mean the topic is active. + if t.IngestionDataSourceSettings != nil { + t.State = pb.Topic_ACTIVE + } top := newTopic(t) s.topics[t.Name] = top return top.proto, nil @@ -384,6 +388,15 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p t.proto.SchemaSettings = &pb.SchemaSettings{} } t.proto.SchemaSettings.LastRevisionId = req.Topic.SchemaSettings.LastRevisionId + case "ingestion_data_source_settings": + if t.proto.IngestionDataSourceSettings == nil { + t.proto.IngestionDataSourceSettings = &pb.IngestionDataSourceSettings{} + } + t.proto.IngestionDataSourceSettings = req.Topic.IngestionDataSourceSettings + // Take any ingestion setting to mean the topic is active. + if t.proto.IngestionDataSourceSettings != nil { + t.proto.State = pb.Topic_ACTIVE + } default: return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path) } diff --git a/pubsub/topic.go b/pubsub/topic.go index 14ef1fe2c5a7..b85b4b1c342a 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -202,6 +202,23 @@ func newTopic(c *Client, name string) *Topic { } } +// TopicState denotes the possible states for a topic. +type TopicState int + +const ( + // TopicStateUnspecified is the default value. This value is unused. + TopicStateUnspecified = iota + + // TopicStateActive means the topic does not have any persistent errors. + TopicStateActive + + // TopicStateIngestionResourceError means ingestion from the data source + // has encountered a permanent error. + // See the more detailed error state in the corresponding ingestion + // source configuration. + TopicStateIngestionResourceError +) + // TopicConfig describes the configuration of a topic. type TopicConfig struct { // The fully qualified identifier for the topic, in the format "projects//topics/" @@ -232,6 +249,13 @@ type TopicConfig struct { // // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention. RetentionDuration optional.Duration + + // State is an output-only field indicating the state of the topic. + State TopicState + + // IngestionDataSourceSettings are settings for ingestion from a + // data source into this topic. + IngestionDataSourceSettings *IngestionDataSourceSettings } // String returns the printable globally unique name for the topic config. @@ -260,11 +284,12 @@ func (tc *TopicConfig) toProto() *pb.Topic { retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration)) } pbt := &pb.Topic{ - Labels: tc.Labels, - MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), - KmsKeyName: tc.KMSKeyName, - SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), - MessageRetentionDuration: retDur, + Labels: tc.Labels, + MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), + KmsKeyName: tc.KMSKeyName, + SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), + MessageRetentionDuration: retDur, + IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(), } return pbt } @@ -296,15 +321,23 @@ type TopicConfigToUpdate struct { // // Use the zero value &SchemaSettings{} to remove the schema from the topic. SchemaSettings *SchemaSettings + + // IngestionDataSourceSettings are settings for ingestion from a + // data source into this topic. + // + // Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic. + IngestionDataSourceSettings *IngestionDataSourceSettings } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { tc := TopicConfig{ - name: pbt.Name, - Labels: pbt.Labels, - MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), - KMSKeyName: pbt.KmsKeyName, - SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), + name: pbt.Name, + Labels: pbt.Labels, + MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), + KMSKeyName: pbt.KmsKeyName, + SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), + State: TopicState(pbt.State), + IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings), } if pbt.GetMessageRetentionDuration() != nil { tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration() @@ -364,6 +397,122 @@ func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePo return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} } +// IngestionDataSourceSettings enables ingestion from a data source into this topic. +type IngestionDataSourceSettings struct { + Source IngestionDataSource +} + +// IngestionDataSource is the kind of ingestion source to be used. +type IngestionDataSource interface { + isIngestionDataSource() bool +} + +// AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams. +type AWSKinesisState int + +const ( + // AWSKinesisStateUnspecified is the default value. This value is unused. + AWSKinesisStateUnspecified = iota + + // AWSKinesisStateActive means ingestion is active. + AWSKinesisStateActive + + // AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis. + // This can happen if: + // - The provided `aws_role_arn` does not exist or does not have the + // appropriate permissions attached. + // - The provided `aws_role_arn` is not set up properly for Identity + // Federation using `gcp_service_account`. + // - The Pub/Sub SA is not granted the + // `iam.serviceAccounts.getOpenIdToken` permission on + // `gcp_service_account`. + AWSKinesisStatePermissionDenied + + // AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic. + // This can happen due to Pub/Sub SA has not been granted the appropriate publish + // permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher + AWSKinesisStatePublishPermissionDenied + + // AWSKinesisStateStreamNotFound means the Kinesis stream does not exist. + AWSKinesisStateStreamNotFound + + // AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist. + AWSKinesisStateConsumerNotFound +) + +// IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams. +type IngestionDataSourceAWSKinesis struct { + // State is an output-only field indicating the state of the kinesis connection. + State AWSKinesisState + + // StreamARN is the Kinesis stream ARN to ingest data from. + StreamARN string + + // ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced + // Fan-Out mode. The consumer must be already created and ready to be used. + ConsumerARN string + + // AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication + // with Kinesis. Check the Pub/Sub docs for how to set up this role and the + // required permissions that need to be attached to it. + AWSRoleARN string + + // GCPServiceAccount is the GCP service account to be used for Federated Identity + // authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for + // the provided role). The `aws_role_arn` must be set up with + // `accounts.google.com:sub` equals to this service account number. + GCPServiceAccount string +} + +var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil) + +func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool { + return true +} + +func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings { + if pbs == nil { + return nil + } + + s := &IngestionDataSourceSettings{} + if k := pbs.GetAwsKinesis(); k != nil { + s.Source = &IngestionDataSourceAWSKinesis{ + State: AWSKinesisState(k.State), + StreamARN: k.GetStreamArn(), + ConsumerARN: k.GetConsumerArn(), + AWSRoleARN: k.GetAwsRoleArn(), + GCPServiceAccount: k.GetGcpServiceAccount(), + } + } + return s +} + +func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings { + if i == nil { + return nil + } + // An empty/zero-valued config is treated the same as nil and clearing this setting. + if (IngestionDataSourceSettings{}) == *i { + return nil + } + pbs := &pb.IngestionDataSourceSettings{} + if out := i.Source; out != nil { + if k, ok := out.(*IngestionDataSourceAWSKinesis); ok { + pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{ + AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{ + State: pb.IngestionDataSourceSettings_AwsKinesis_State(k.State), + StreamArn: k.StreamARN, + ConsumerArn: k.ConsumerARN, + AwsRoleArn: k.AWSRoleARN, + GcpServiceAccount: k.GCPServiceAccount, + }, + } + } + } + return pbs +} + // Config returns the TopicConfig for the topic. func (t *Topic) Config(ctx context.Context) (TopicConfig, error) { pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) @@ -437,6 +586,10 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { pt.SchemaSettings = nil } } + if cfg.IngestionDataSourceSettings != nil { + pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto() + paths = append(paths, "ingestion_data_source_settings") + } return &pb.UpdateTopicRequest{ Topic: pt, UpdateMask: &fmpb.FieldMask{Paths: paths}, diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 4b7d73da3983..eb53bef5f370 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -108,6 +108,63 @@ func TestCreateTopicWithConfig(t *testing.T) { } } +func TestTopic_IngestionKinesis(t *testing.T) { + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + id := "test-topic-kinesis" + want := TopicConfig{ + IngestionDataSourceSettings: &IngestionDataSourceSettings{ + Source: &IngestionDataSourceAWSKinesis{ + StreamARN: "fake-stream-arn", + ConsumerARN: "fake-consumer-arn", + AWSRoleARN: "fake-aws-role-arn", + GCPServiceAccount: "fake-gcp-sa", + }, + }, + } + + topic := mustCreateTopicWithConfig(t, c, id, &want) + got, err := topic.Config(context.Background()) + if err != nil { + t.Fatalf("error getting topic config: %v", err) + } + want.State = TopicStateActive + opt := cmpopts.IgnoreUnexported(TopicConfig{}) + if !testutil.Equal(got, want, opt) { + t.Errorf("got %v, want %v", got, want) + } + + // Update ingestion settings. + ctx := context.Background() + settings := &IngestionDataSourceSettings{ + Source: &IngestionDataSourceAWSKinesis{ + StreamARN: "fake-stream-arn-2", + ConsumerARN: "fake-consumer-arn-2", + AWSRoleARN: "aws-role-arn-2", + GCPServiceAccount: "gcp-service-account-2", + }, + } + config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings}) + if err != nil { + t.Fatal(err) + } + if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) { + t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings) + } + + // Clear schema settings. + settings = &IngestionDataSourceSettings{} + config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings}) + if err != nil { + t.Fatal(err) + } + if config3.IngestionDataSourceSettings != nil { + t.Errorf("got: %+v, want nil", config3.IngestionDataSourceSettings) + } +} + func TestListTopics(t *testing.T) { ctx := context.Background() c, srv := newFake(t)