feat(publisher): add fileserver worker #189

Merged · 1 commit · Nov 1, 2024
6 changes: 3 additions & 3 deletions publisher/cmd/worker/init.go
@@ -29,10 +29,10 @@ func loadConfig(configFile string) (config.Worker, error) {
 	return config, nil
 }
 
-func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) {
+func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) {
 	config, err := loadConfig(configFile)
 	if err != nil {
-		return nil, nil, err
+		log.Fatal().Err(err).Msg("load config failed")
 	}
 
 	// Configure Redis client.
@@ -58,7 +58,7 @@ func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) {
 		Logger: kafka.LoggerFunc(log.Printf),
 	})
 
-	return kafkaReader, worker, nil
+	return kafkaReader, worker
 }
 
 func initFsWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) {
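The hunk above switches the init helper from returning an error to aborting the process when the config cannot be loaded. A minimal, self-contained sketch of that pattern, assuming zerolog's global logger; `fakeWorker` and the `loadConfig` stub here are illustrative stand-ins, not the PR's actual types:

```go
package main

import (
	"errors"

	"github.com/rs/zerolog/log"
)

// fakeWorker and loadConfig are stand-ins for the PR's config.Worker and loader.
type fakeWorker struct{ topic string }

func loadConfig(path string) (fakeWorker, error) {
	if path == "" {
		return fakeWorker{}, errors.New("empty config path")
	}
	return fakeWorker{topic: "example-topic"}, nil
}

func initWorkerFromConfig(path string) fakeWorker {
	w, err := loadConfig(path)
	if err != nil {
		// Fatal() logs the event and then calls os.Exit(1), which is why
		// the function signature no longer needs an error return.
		log.Fatal().Err(err).Msg("load config failed")
	}
	return w
}

func main() {
	_ = initWorkerFromConfig("./worker.yaml")
}
```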
2 changes: 1 addition & 1 deletion publisher/cmd/worker/main.go
@@ -35,7 +35,7 @@ func main() {
 		zerolog.SetGlobalLevel(zerolog.InfoLevel)
 	}
 
-	tiupPublishRequestKafkaReader, tiupWorker := initFsWorkerFromConfig(*tiupConfigFile)
+	tiupPublishRequestKafkaReader, tiupWorker := initTiupWorkerFromConfig(*tiupConfigFile)
 	fsPublishRequestKafkaReader, fsWorker := initFsWorkerFromConfig(*fsConfigFile)
 
 	// Create channel used by both the signal handler and server goroutines
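For context, the last line of this hunk refers to a channel shared by the signal handler and the worker goroutines. A hedged sketch of that shutdown pattern; all names are illustrative, and the PR's actual main() may wire things differently:

```go
package main

import (
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Channel used by both the signal handler and the worker goroutines,
	// mirroring the comment in the diff above.
	done := make(chan struct{})

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		close(done) // closing broadcasts shutdown to every consumer
	}()

	go consume("tiup", done)
	go consume("fileserver", done)
	<-done
}

func consume(name string, done <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// a real worker would fetch from its Kafka reader here
		}
	}
}
```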
4 changes: 1 addition & 3 deletions publisher/example/config/service-example.yaml
@@ -1,4 +1,4 @@
-
+event_source: http://publisher.site/
 kafka:
   brokers:
     - "kafka-broker-1:9092"
@@ -12,7 +12,5 @@ kafka:
 redis:
   addr: "redis-server:6379"
   db: 0
-  username: "redis_user"
-  password: "redis_password"
 
 event_source: http://publisher.site/
18 changes: 18 additions & 0 deletions publisher/example/config/worker-example.fileserver.yaml
@@ -0,0 +1,18 @@
+kafka:
+  brokers:
+    - example-bootstrap.kafka:9092
+  topic: example-topic
+  consumer_group: example-group-fs
+
+redis:
+  addr: "redis-server:6379"
+  db: 0
+  password: "redis_password"
+
+options:
+  lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here.
+  s3.endpoint: <endpoint>
+  s3.region: BEIJING
+  s3.bucket_name: <bucket-name>
+  s3.access_key: <access-key>
+  s3.secret_key: <secret-key>
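One detail worth noting in this example: the s3.* entries are flat string keys under options, not nested YAML maps, which is why the Go code later in this PR reads them as options["s3.bucket_name"]. A small sketch of how such a block decodes, assuming gopkg.in/yaml.v3; the repo's actual YAML library and config struct may differ:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// workerConfig mirrors only the options block of the example file;
// it is illustrative, not the repo's actual config struct.
type workerConfig struct {
	Options map[string]string `yaml:"options"`
}

func main() {
	raw := []byte(`
options:
  s3.region: BEIJING
  s3.bucket_name: my-bucket
`)
	var cfg workerConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// Dotted keys survive as plain map keys:
	fmt.Println(cfg.Options["s3.region"])      // BEIJING
	fmt.Println(cfg.Options["s3.bucket_name"]) // my-bucket
}
```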
@@ -3,7 +3,13 @@ kafka:
     - example-bootstrap.kafka:9092
   topic: example-topic
   consumer_group: example-group
+
+redis:
+  addr: "redis-server:6379"
+  db: 0
+  password: "redis_password"
+
 options:
   mirror_url: http://tiup.mirror.site
   lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here.
-  nightly_interval: 1h
+  nightly_interval: 1h
67 changes: 50 additions & 17 deletions publisher/pkg/impl/fileserver_worker.go
@@ -13,12 +13,13 @@ import (
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"github.com/go-redis/redis/v8"
 	"github.com/ks3sdklib/aws-sdk-go/aws"
-	"github.com/ks3sdklib/aws-sdk-go/aws/awsutil"
 	"github.com/ks3sdklib/aws-sdk-go/aws/credentials"
 	"github.com/ks3sdklib/aws-sdk-go/service/s3"
 	"github.com/rs/zerolog"
 )
 
+const defaultMaxKS3Retries = 3
+
 type fsWorker struct {
 	logger      zerolog.Logger
 	redisClient redis.Cmdable
@@ -41,14 +42,21 @@ func NewFsWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[
 	handler.options.LarkWebhookURL = options["lark_webhook_url"]
 	handler.options.S3.BucketName = options["s3.bucket_name"]
 
-	cre := credentials.NewStaticCredentials(options["s3_access_key_id"],
-		options["s3_secret_access_key"],
-		options["s3_session_token"])
+	cre := credentials.NewStaticCredentials(
+		options["s3.access_key"],
+		options["s3.secret_key"],
+		options["s3.session_token"])
 	handler.s3Client = s3.New(&aws.Config{
 		Credentials: cre,
-		Region:      options["s3.region"],
-		Endpoint:    options["s3.endpoint"],
+		Region:      options["s3.region"],   // Ref: https://docs.ksyun.com/documents/6761
+		Endpoint:    options["s3.endpoint"], // Ref: https://docs.ksyun.com/documents/6761
+		MaxRetries:  defaultMaxKS3Retries,
+		Logger:      logger,
 	})
+	// Enable KS3 log when debug mode is enabled.
+	if logger.GetLevel() <= zerolog.DebugLevel {
+		handler.s3Client.Config.LogLevel = aws.LogOn
+	}
 
 	return &handler, nil
 }
@@ -135,20 +143,45 @@ func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) {
 }
 
 func (p *fsWorker) publish(content io.ReadSeeker, info *PublishInfo) error {
-	targetPath := targetFsFullPaths(info)
-	// upload file to the KingSoft cloud object bucket with the target path as key.
-
-	key := targetPath[0]
-	resp, err := p.s3Client.PutObject(&s3.PutObjectInput{
-		Bucket: aws.String(p.options.S3.BucketName), // bucket name, required
-		Key:    aws.String(key),                     // object key, required
-		Body:   content,                             // file to upload, required
-		ACL:    aws.String("public-read"),           // object ACL, optional
+	keys := targetFsFullPaths(info)
+	if len(keys) == 0 {
+		return nil
+	}
+
+	bucketName := p.options.S3.BucketName
+
+	// upload the artifact files to KS3 bucket.
+	for _, key := range keys {
+		_, err := p.s3Client.PutObject(&s3.PutObjectInput{
+			Bucket: aws.String(bucketName),
+			Key:    aws.String(key),
+			Body:   content,
+		})
+		if err != nil {
+			p.logger.
+				Err(err).
+				Str("bucket", bucketName).
+				Str("key", key).
+				Msg("failed to upload file to KS3 bucket.")
+			return err
+		}
+	}
+
+	// update git ref sha: download/refs/pingcap/<comp>/<branch>/sha1
+	refKV := targetFsRefKeyValue(info)
+	_, err := p.s3Client.PutObject(&s3.PutObjectInput{
+		Bucket: aws.String(bucketName),
+		Key:    aws.String(refKV[0]),
+		Body:   bytes.NewReader([]byte(refKV[1])),
 	})
 	if err != nil {
+		p.logger.
+			Err(err).
+			Str("bucket", bucketName).
+			Str("key", refKV[0]).
+			Msg("failed to upload content in KS3 bucket.")
 		return err
 	}
-	fmt.Println(awsutil.StringValue(resp))
-	return nil
 
+	return nil
 }
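One hedged observation on the upload loop above: content is a single io.ReadSeeker shared across iterations, and unless the KS3 SDK rewinds the body itself, every upload after the first would read from the end of the stream. A defensive sketch of the rewind-per-key pattern; `put` is a hypothetical stand-in for the PutObject call:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// uploadAll rewinds the shared reader before each upload so every key
// receives the full payload; put stands in for the KS3 PutObject call.
func uploadAll(content io.ReadSeeker, keys []string, put func(key string, body io.ReadSeeker) error) error {
	for _, key := range keys {
		if _, err := content.Seek(0, io.SeekStart); err != nil {
			return err
		}
		if err := put(key, content); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	body := strings.NewReader("artifact bytes")
	keys := []string{"a/1", "a/2"}
	_ = uploadAll(body, keys, func(key string, r io.ReadSeeker) error {
		b, _ := io.ReadAll(r)
		fmt.Printf("%s: %d bytes\n", key, len(b)) // both keys see 14 bytes
		return nil
	})
}
```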
18 changes: 16 additions & 2 deletions publisher/pkg/impl/funcs_fs.go
@@ -207,9 +207,23 @@ func targetFsFullPaths(p *PublishInfo) []string {
 	var ret []string
 
 	// the <branch>/<commit> path: pingcap/<comp>/<branch>/<commit>/<entrypoint>
-	ret = append(ret, filepath.Join("pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint))
+	ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint))
 	// the <commit> path: pingcap/<comp>/<commit>/<entrypoint>
-	ret = append(ret, filepath.Join("pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint))
+	ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint))
 
 	return ret
 }
+
+func targetFsRefKeyValue(p *PublishInfo) [2]string {
+	var ret [2]string
+	verParts := strings.Split(p.Version, "#")
+	if len(verParts) > 1 {
+		ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, verParts[0])
+		ret[1] = verParts[1]
+	} else {
+		ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, "master")
+		ret[1] = verParts[0]
+	}
+
+	return ret
+}
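To make the key layout concrete, a worked example of what these two helpers produce, assuming a PublishInfo with Name "tidb", Version "master#0123abc", and EntryPoint "tidb-server.tar.gz" (all values illustrative):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

func main() {
	name, version, entry := "tidb", "master#0123abc", "tidb-server.tar.gz"

	branchCommit := strings.ReplaceAll(version, "#", "/") // "master/0123abc"

	// targetFsFullPaths equivalents:
	fmt.Println(filepath.Join("download/builds/pingcap", name, branchCommit, entry))
	// -> download/builds/pingcap/tidb/master/0123abc/tidb-server.tar.gz
	fmt.Println(filepath.Join("download/builds/pingcap", name, filepath.Base(branchCommit), entry))
	// -> download/builds/pingcap/tidb/0123abc/tidb-server.tar.gz

	// targetFsRefKeyValue equivalent: branch ref key -> commit sha value.
	parts := strings.Split(version, "#")
	fmt.Printf("download/refs/pingcap/%s/%s/sha1 -> %s\n", name, parts[0], parts[1])
	// -> download/refs/pingcap/tidb/master/sha1 -> 0123abc
}
```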