Skip to content

Commit

Permalink
feat: updated send message impl
Browse files Browse the repository at this point in the history
  • Loading branch information
WajahatAliAbid committed Sep 19, 2024
1 parent 4b0fd6c commit 964a404
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 12 deletions.
63 changes: 51 additions & 12 deletions cmd/sendMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"ZenExtensions/sqs-plus/helper"
"bufio"
"encoding/json"
"fmt"
"io"
"io/fs"
"net/url"
"os"
"path/filepath"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/pterm/pterm"
Expand All @@ -23,37 +25,34 @@ type JsonInfo struct {
FileName string `json:"fileName"`
}

func parseFile(filePath string) (*[]JsonInfo, error) {
results := []JsonInfo{}
func parseFile(filePath string) (*[]map[string]interface{}, error) {
results := []map[string]interface{}{}
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()
byteValue, _ := io.ReadAll(file)
file.Close()
var result map[string]interface{}
err = json.Unmarshal(byteValue, &result)
if err == nil {
results = append(results, JsonInfo{
Message: result,
FileName: filePath,
})
results = append(results, result)
return &results, nil
}

file, _ = os.OpenFile(filePath, os.O_RDONLY, os.ModePerm)

scanner := bufio.NewScanner(file)
// optionally, resize scanner's capacity for lines over 64K, see next example
for scanner.Scan() {

var result map[string]interface{}
err = json.Unmarshal(scanner.Bytes(), &result)
if err != nil {
return nil, err
}

results = append(results, JsonInfo{
Message: result,
FileName: filePath,
})
results = append(results, result)
}

if err := scanner.Err(); err != nil {
Expand All @@ -77,6 +76,14 @@ func Find(root, ext string) ([]string, error) {
return a, nil
}

func chunkBy[T any](items []T, chunkSize int) [][]T {
var _chunks = make([][]T, 0, (len(items)/chunkSize)+1)
for chunkSize < len(items) {
items, _chunks = items[chunkSize:], append(_chunks, items[0:chunkSize:chunkSize])
}
return append(_chunks, items)
}

func RunSendMessages(cmd *cobra.Command, _ []string) {
config := cmd.Context().Value(AwsConfig{}).(aws.Config)
logger := pterm.DefaultBasicText
Expand All @@ -94,7 +101,7 @@ func RunSendMessages(cmd *cobra.Command, _ []string) {
os.Exit(1)
}

jsons := []JsonInfo{}
jsons := []map[string]interface{}{}

if fileName != "" {

Expand Down Expand Up @@ -147,6 +154,7 @@ func RunSendMessages(cmd *cobra.Command, _ []string) {
jsons = append(jsons, *infos...)
}
}
logger.Printfln("length: %d", len(jsons))

client := helper.New(&config)
queueName := cmd.Flag("queue-name").Value.String()
Expand All @@ -165,6 +173,37 @@ func RunSendMessages(cmd *cobra.Command, _ []string) {
}

logger.Printfln(queueUrl)
spinner := pterm.DefaultSpinner
spinner.Sequence = []string{
"🙈",
"🙉",
"🙊",
}
spinner.Delay = 300 * time.Millisecond
spinnerPrinter, _ := spinner.Start("Sending messages")
chunks := chunkBy(jsons, 10)

for _, chunk := range chunks {

err := client.SendMessages(
cmd.Context(),
&queueUrl,
&chunk,
)

spinnerPrinter.UpdateText(
fmt.Sprintf(
"Sent %d of %d messages",
len(chunk),
len(jsons),
),
)

if err != nil {
logger.Printfln(pterm.Red(err.Error()))
os.Exit(1)
}
}

}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.30.8 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gookit/color v1.5.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/MarvinJWendt/testza v0.3.0/go.mod h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/
github.com/MarvinJWendt/testza v0.4.2/go.mod h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE=
github.com/MarvinJWendt/testza v0.5.2 h1:53KDo64C1z/h/d/stCYCPY69bt/OSwjq5KpFNwi+zB4=
github.com/MarvinJWendt/testza v0.5.2/go.mod h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc4n3EjyIYvEY=
github.com/andrijadukic/chunkify v0.0.0-20210214123219-30ad72b51655 h1:4uDFC06pbuUu56njbKLG0B9X0RJy/NTjcIGMR6Xx0eY=
github.com/andrijadukic/chunkify v0.0.0-20210214123219-30ad72b51655/go.mod h1:dga3cWNzN52eWVMMNkOpTYiw2x5SSuGEnNuX0H5KyPg=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
Expand Down Expand Up @@ -50,6 +52,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ=
github.com/gookit/color v1.5.0/go.mod h1:43aQb+Zerm/BWh2GnrgOQm7ffz7tvQXEKV6BFMl7wAo=
github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
Expand Down
37 changes: 37 additions & 0 deletions helper/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package helper

import (
"context"
"encoding/hex"
"encoding/json"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
)

type SqsClient struct {
Expand Down Expand Up @@ -137,3 +140,37 @@ func (s *SqsClient) ListQueuesWithAttributes(
) (*[]QueueInfo, error) {
return s.ListQueues(ctx, names, true, true)
}

func uuidv5(data string) string {
bytes := []byte(data)
id := uuid.NewSHA1(uuid.NameSpaceURL, bytes)
return hex.EncodeToString(id[:])
}

func (s *SqsClient) SendMessages(
ctx context.Context,
queueUrl *string,
messages *[]map[string]interface{},
) error {
if len(*messages) == 0 {
return nil
}
entries := []types.SendMessageBatchRequestEntry{}
for _, message := range *messages {
obj, _ := json.Marshal(message)
str_obj := string(obj)
id := uuidv5(str_obj)
entries = append(entries, types.SendMessageBatchRequestEntry{
Id: &id,
MessageBody: &str_obj,
})
}
_, err := s.client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: queueUrl,
})
if err != nil {
return err
}
return nil
}

0 comments on commit 964a404

Please sign in to comment.