Skip to content

Commit

Permalink
feat: merge mastodon worker logics to mastodon core (#580)
Browse files Browse the repository at this point in the history
  • Loading branch information
pseudoyu authored Oct 16, 2024
1 parent 47c389c commit ed9a138
Show file tree
Hide file tree
Showing 14 changed files with 33 additions and 36 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rss3-network/node/internal/stream"
"github.com/rss3-network/node/provider/ethereum"
"github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/federated"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
"github.com/spf13/viper"
Expand Down Expand Up @@ -254,6 +255,13 @@ func _Setup(configName, configType string, v *viper.Viper) (*File, error) {
return nil, fmt.Errorf("unmarshal config file: %w", err)
}

// Add extra logic to convert federated worker string to correct worker type.
for _, module := range configFile.Component.Federated {
if federatedWorker := federated.GetValueByWorkerStr(module.Worker.Name()); federatedWorker != 0 {
module.Worker = federatedWorker
}
}

// Use a function to load the endpoint for each module, because mapstructure doesn't support the use of custom unmarshaler.
// Reference https://github.com/mitchellh/mapstructure/issues/115.
if err := configFile.LoadModulesEndpoint(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ component:
access_code: def
federated:
network: mastodon
worker: mastodon
worker: core
endpoint: mastodon
parameters:
kafka_topic: activitypub_events
Expand Down Expand Up @@ -169,7 +169,7 @@ component:
"federated": [
{
"network": "mastodon",
"worker": "mastodon",
"worker": "core",
"endpoint": "mastodon",
"parameters": {
"kafka_topic": "activitypub_events"
Expand Down Expand Up @@ -261,7 +261,7 @@ access_code = "def"
[[component.federated]]
network = "mastodon"
worker = "mastodon"
worker = "core"
endpoint = "mastodon"
[component.federated.parameters]
Expand Down Expand Up @@ -335,7 +335,7 @@ var configFileExpected = &File{
Federated: []*Module{
{
Network: network.Mastodon,
Worker: federated.Mastodon,
Worker: federated.Core,
EndpointID: "mastodon",
Endpoint: Endpoint{
URL: "https://0.0.0.0:9092/",
Expand Down
6 changes: 1 addition & 5 deletions internal/engine/worker/decentralized/core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@ import (

"github.com/redis/rueidis"
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/arweave"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/ethereum"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/farcaster"
"github.com/rss3-network/node/internal/engine/worker/decentralized/core/near"
"github.com/rss3-network/node/internal/engine/worker/federated/activitypub/mastodon"
"github.com/rss3-network/protocol-go/schema/network"
)

// NewWorker creates a new core worker.
func NewWorker(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
func NewWorker(config *config.Module, redisClient rueidis.Client) (engine.Worker, error) {
switch config.Network.Source() {
case network.ActivityPubSource:
return mastodon.NewWorker(databaseClient, redisClient)
case network.EthereumSource:
return ethereum.NewWorker(config, redisClient)
case network.ArweaveSource:
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/worker/decentralized/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (

func New(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
if config.Worker == decentralized.Core {
return core.NewWorker(config, databaseClient, redisClient)
return core.NewWorker(config, redisClient)
}

return newNonCoreWorker(config, databaseClient, redisClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type worker struct {
}

func (w *worker) Name() string {
return federated.Mastodon.String()
return federated.Core.String()
}

func (w *worker) Platform() string {
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/worker/federated/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/internal/engine/worker/federated/activitypub/mastodon"
"github.com/rss3-network/node/internal/engine/worker/federated/core/mastodon"
"github.com/rss3-network/node/schema/worker/federated"
)

func New(config *config.Module, databaseClient database.Client, redisClient rueidis.Client) (engine.Worker, error) {
switch config.Worker {
case federated.Mastodon:
case federated.Core:
return mastodon.NewWorker(databaseClient, redisClient)
default:
return nil, fmt.Errorf("[federated/factory.go] unsupported worker %s", config.Worker)
Expand Down
7 changes: 0 additions & 7 deletions internal/node/component/info/handler_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/node/schema/worker/federated"
"github.com/rss3-network/node/schema/worker/rss"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -117,12 +116,6 @@ func TestCalculateMinimumResources(t *testing.T) {
worker: rss.RSSHub,
expected: baseResource,
},
{
name: "ActivityPub - Mastodon",
network: network.Mastodon,
worker: federated.Mastodon,
expected: baseResource.Mul(2),
},
}

for _, tc := range tests {
Expand Down
4 changes: 2 additions & 2 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {
case c.config.Component.Federated != nil:
f := c.config.Component.Federated[0]
switch f.Worker {
case federated.Mastodon:
case federated.Core:
response = &WorkerResponse{
Data: ComponentInfo{
RSS: &WorkerInfo{
WorkerID: f.ID,
Network: f.Network,
Worker: f.Worker,
Tags: federated.ToTagsMap[federated.Mastodon],
Tags: federated.ToTagsMap[federated.Core],
Platform: rss.ToPlatformMap[f.Worker.(rss.Worker)].String(),
Status: worker.StatusReady},
},
Expand Down
4 changes: 2 additions & 2 deletions internal/node/component/info/network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ var NetworkToWorkersMap = map[network.Network][]worker.Worker{
decentralized.Zerion,
},
network.Mastodon: {
federated.Mastodon,
federated.Core,
},
network.Near: {
decentralized.Core,
Expand Down Expand Up @@ -448,7 +448,7 @@ var NetworkToWorkersMap = map[network.Network][]worker.Worker{
// WorkerToConfigMap is a map of worker to config.
var WorkerToConfigMap = map[network.Source]map[worker.Worker]workerConfig{
network.ActivityPubSource: {
federated.Mastodon: customWorkerConfig(federated.Mastodon, network.ActivityPubSource, &Parameters{
federated.Core: customWorkerConfig(federated.Core, network.ActivityPubSource, &Parameters{
KafkaTopic: &ConfigDetail{
IsRequired: true,
Type: StringType,
Expand Down
2 changes: 1 addition & 1 deletion internal/node/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func NewActivityPubClient(endpoint config.Endpoint, param *config.Parameters, wo
workerType := worker.Name()

switch workerType {
case federated.Mastodon.String():
case federated.Core.String():
mastodonClient, err := mastodon.NewClient(endpoint.URL, kafkaTopic, nil)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion schema/worker/federated/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ func (p *Platform) UnmarshalParam(param string) error {

// ToPlatformMap is a map of worker to platform
var ToPlatformMap = map[Worker]Platform{
Mastodon: PlatformMastodon,
Core: PlatformMastodon,
}
4 changes: 2 additions & 2 deletions schema/worker/federated/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type Worker int

const (
Mastodon Worker = iota + 1 // mastodon
Core Worker = iota + 1 // core
)

func (w Worker) Component() string {
Expand Down Expand Up @@ -39,5 +39,5 @@ func GetValueByWorkerStr(workerStr string) Worker {

// ToTagsMap is a map of worker to tags
var ToTagsMap = map[Worker][]tag.Tag{
Mastodon: {tag.Social},
Core: {tag.Social},
}
16 changes: 8 additions & 8 deletions schema/worker/federated/worker_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ed9a138

Please sign in to comment.