Skip to content

Commit

Permalink
feat(pubsub): support kinesis ingestion admin (#9458)
Browse files Browse the repository at this point in the history
* feat(pubsub): support kinesis ingestion admin

* add comments to kinesis struct fields

* reword comments

* capitalize Kinesis in comments
  • Loading branch information
hongalex authored Mar 7, 2024
1 parent 5ca0271 commit 9bba269
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 10 deletions.
13 changes: 13 additions & 0 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
if err := checkTopicMessageRetention(t.MessageRetentionDuration); err != nil {
return nil, err
}
// Take any ingestion setting to mean the topic is active.
if t.IngestionDataSourceSettings != nil {
t.State = pb.Topic_ACTIVE
}
top := newTopic(t)
s.topics[t.Name] = top
return top.proto, nil
Expand Down Expand Up @@ -384,6 +388,15 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
t.proto.SchemaSettings = &pb.SchemaSettings{}
}
t.proto.SchemaSettings.LastRevisionId = req.Topic.SchemaSettings.LastRevisionId
case "ingestion_data_source_settings":
if t.proto.IngestionDataSourceSettings == nil {
t.proto.IngestionDataSourceSettings = &pb.IngestionDataSourceSettings{}
}
t.proto.IngestionDataSourceSettings = req.Topic.IngestionDataSourceSettings
// Take any ingestion setting to mean the topic is active.
if t.proto.IngestionDataSourceSettings != nil {
t.proto.State = pb.Topic_ACTIVE
}
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
Expand Down
173 changes: 163 additions & 10 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,23 @@ func newTopic(c *Client, name string) *Topic {
}
}

// TopicState denotes the possible states for a topic.
type TopicState int

const (
// TopicStateUnspecified is the default value. This value is unused.
TopicStateUnspecified = iota

// TopicStateActive means the topic does not have any persistent errors.
TopicStateActive

// TopicStateIngestionResourceError means ingestion from the data source
// has encountered a permanent error.
// See the more detailed error state in the corresponding ingestion
// source configuration.
TopicStateIngestionResourceError
)

// TopicConfig describes the configuration of a topic.
type TopicConfig struct {
// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
Expand Down Expand Up @@ -232,6 +249,13 @@ type TopicConfig struct {
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
RetentionDuration optional.Duration

// State is an output-only field indicating the state of the topic.
State TopicState

// IngestionDataSourceSettings are settings for ingestion from a
// data source into this topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
}

// String returns the printable globally unique name for the topic config.
Expand Down Expand Up @@ -260,11 +284,12 @@ func (tc *TopicConfig) toProto() *pb.Topic {
retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
}
pbt := &pb.Topic{
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
MessageRetentionDuration: retDur,
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
MessageRetentionDuration: retDur,
IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(),
}
return pbt
}
Expand Down Expand Up @@ -296,15 +321,23 @@ type TopicConfigToUpdate struct {
//
// Use the zero value &SchemaSettings{} to remove the schema from the topic.
SchemaSettings *SchemaSettings

// IngestionDataSourceSettings are settings for ingestion from a
// data source into this topic.
//
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
}

func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
tc := TopicConfig{
name: pbt.Name,
Labels: pbt.Labels,
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
KMSKeyName: pbt.KmsKeyName,
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
name: pbt.Name,
Labels: pbt.Labels,
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
KMSKeyName: pbt.KmsKeyName,
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
State: TopicState(pbt.State),
IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings),
}
if pbt.GetMessageRetentionDuration() != nil {
tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
Expand Down Expand Up @@ -364,6 +397,122 @@ func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePo
return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
}

// IngestionDataSourceSettings enables ingestion from a data source into this topic.
type IngestionDataSourceSettings struct {
Source IngestionDataSource
}

// IngestionDataSource is the kind of ingestion source to be used.
type IngestionDataSource interface {
isIngestionDataSource() bool
}

// AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
type AWSKinesisState int

const (
// AWSKinesisStateUnspecified is the default value. This value is unused.
AWSKinesisStateUnspecified = iota

// AWSKinesisStateActive means ingestion is active.
AWSKinesisStateActive

// AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis.
// This can happen if:
// - The provided `aws_role_arn` does not exist or does not have the
// appropriate permissions attached.
// - The provided `aws_role_arn` is not set up properly for Identity
// Federation using `gcp_service_account`.
// - The Pub/Sub SA is not granted the
// `iam.serviceAccounts.getOpenIdToken` permission on
// `gcp_service_account`.
AWSKinesisStatePermissionDenied

// AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic.
// This can happen due to Pub/Sub SA has not been granted the appropriate publish
// permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher
AWSKinesisStatePublishPermissionDenied

// AWSKinesisStateStreamNotFound means the Kinesis stream does not exist.
AWSKinesisStateStreamNotFound

// AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist.
AWSKinesisStateConsumerNotFound
)

// IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
type IngestionDataSourceAWSKinesis struct {
// State is an output-only field indicating the state of the kinesis connection.
State AWSKinesisState

// StreamARN is the Kinesis stream ARN to ingest data from.
StreamARN string

// ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced
// Fan-Out mode. The consumer must be already created and ready to be used.
ConsumerARN string

// AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication
// with Kinesis. Check the Pub/Sub docs for how to set up this role and the
// required permissions that need to be attached to it.
AWSRoleARN string

// GCPServiceAccount is the GCP service account to be used for Federated Identity
// authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for
// the provided role). The `aws_role_arn` must be set up with
// `accounts.google.com:sub` equals to this service account number.
GCPServiceAccount string
}

var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil)

func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
return true
}

func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
if pbs == nil {
return nil
}

s := &IngestionDataSourceSettings{}
if k := pbs.GetAwsKinesis(); k != nil {
s.Source = &IngestionDataSourceAWSKinesis{
State: AWSKinesisState(k.State),
StreamARN: k.GetStreamArn(),
ConsumerARN: k.GetConsumerArn(),
AWSRoleARN: k.GetAwsRoleArn(),
GCPServiceAccount: k.GetGcpServiceAccount(),
}
}
return s
}

func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings {
if i == nil {
return nil
}
// An empty/zero-valued config is treated the same as nil and clearing this setting.
if (IngestionDataSourceSettings{}) == *i {
return nil
}
pbs := &pb.IngestionDataSourceSettings{}
if out := i.Source; out != nil {
if k, ok := out.(*IngestionDataSourceAWSKinesis); ok {
pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{
State: pb.IngestionDataSourceSettings_AwsKinesis_State(k.State),
StreamArn: k.StreamARN,
ConsumerArn: k.ConsumerARN,
AwsRoleArn: k.AWSRoleARN,
GcpServiceAccount: k.GCPServiceAccount,
},
}
}
}
return pbs
}

// Config returns the TopicConfig for the topic.
func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
Expand Down Expand Up @@ -437,6 +586,10 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
pt.SchemaSettings = nil
}
}
if cfg.IngestionDataSourceSettings != nil {
pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto()
paths = append(paths, "ingestion_data_source_settings")
}
return &pb.UpdateTopicRequest{
Topic: pt,
UpdateMask: &fmpb.FieldMask{Paths: paths},
Expand Down
57 changes: 57 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,63 @@ func TestCreateTopicWithConfig(t *testing.T) {
}
}

func TestTopic_IngestionKinesis(t *testing.T) {
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

id := "test-topic-kinesis"
want := TopicConfig{
IngestionDataSourceSettings: &IngestionDataSourceSettings{
Source: &IngestionDataSourceAWSKinesis{
StreamARN: "fake-stream-arn",
ConsumerARN: "fake-consumer-arn",
AWSRoleARN: "fake-aws-role-arn",
GCPServiceAccount: "fake-gcp-sa",
},
},
}

topic := mustCreateTopicWithConfig(t, c, id, &want)
got, err := topic.Config(context.Background())
if err != nil {
t.Fatalf("error getting topic config: %v", err)
}
want.State = TopicStateActive
opt := cmpopts.IgnoreUnexported(TopicConfig{})
if !testutil.Equal(got, want, opt) {
t.Errorf("got %v, want %v", got, want)
}

// Update ingestion settings.
ctx := context.Background()
settings := &IngestionDataSourceSettings{
Source: &IngestionDataSourceAWSKinesis{
StreamARN: "fake-stream-arn-2",
ConsumerARN: "fake-consumer-arn-2",
AWSRoleARN: "aws-role-arn-2",
GCPServiceAccount: "gcp-service-account-2",
},
}
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
}

// Clear schema settings.
settings = &IngestionDataSourceSettings{}
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if config3.IngestionDataSourceSettings != nil {
t.Errorf("got: %+v, want nil", config3.IngestionDataSourceSettings)
}
}

func TestListTopics(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
Expand Down

0 comments on commit 9bba269

Please sign in to comment.