Skip to content

Commit

Permalink
feat(worker/activitypub): implement mastodon worker to transform AP o…
Browse files Browse the repository at this point in the history
…bjects (#431)

fix: include federated schema in decentralized schema

fix: replace federated schema paths with decentralized

fix: fix lint errors

fix: remove Type field from config file

fix: set and apply the defaultStartTime value pulled from VSL

feat: initialize ActivityPub network client for monitoring service

fix: resolve the issue of missing module params in func fetchWorkerInfo

fix: fix lint error

fix: add nil check case to debug

fix: fix the typo in extracting mastodon endpoint

fix: add missing ActivityPub network source cases

feat: support the processing of Worker Status for independent federated workers

fix: debug for missing mastodon worker status

fix: initialize mastodon worker status successfully

fix: add `index count` to mastodon worker status

fix: combine federated handles functions into one

feat(deploy): add mastodon worker template for deployment (#554)
  • Loading branch information
FrankLi123 authored and kallydev committed Oct 12, 2024
1 parent 9635a1a commit 6a0b906
Show file tree
Hide file tree
Showing 25 changed files with 196 additions and 422 deletions.
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type Component struct {
type Module struct {
ID string `mapstructure:"id"`
Network network.Network `mapstructure:"network" validate:"required"`
Type string `mapstructure:"type"`
EndpointID string `mapstructure:"endpoint"`
IPFSGateways []string `mapstructure:"ipfs_gateways"`
Worker worker.Worker `mapstructure:"worker"`
Expand Down
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ component:
worker: mastodon
endpoint: mastodon
parameters:
mastodon_kafka_topic: activitypub_events
kafka_topic: activitypub_events
decentralized:
- network: ethereum
worker: core
Expand Down Expand Up @@ -171,7 +171,7 @@ component:
"worker": "mastodon",
"endpoint": "mastodon",
"parameters": {
"mastodon_kafka_topic": "activitypub_events"
"kafka_topic": "activitypub_events"
}
}
],
Expand Down Expand Up @@ -264,7 +264,7 @@ worker = "mastodon"
endpoint = "mastodon"
[component.federated.parameters]
mastodon_kafka_topic = "activitypub_events"
kafka_topic = "activitypub_events"
[[component.decentralized]]
network = "ethereum"
Expand Down Expand Up @@ -343,7 +343,7 @@ var configFileExpected = &File{
},
},
Parameters: &Parameters{
"mastodon_kafka_topic": "activitypub_events",
"kafka_topic": "activitypub_events",
},
},
},
Expand Down
10 changes: 10 additions & 0 deletions deploy/min/config.min.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ redis:
password:

endpoints:
mastodon:
url: your-kafka-ip:9092
farcaster:
url: https://nemes.farcaster.xyz:2281
ethereum:
Expand Down Expand Up @@ -91,3 +93,11 @@ component:
worker: momoka
endpoint: polygon
parameters:
federated:
# mastodon
- id: mastodon-core
network: mastodon
worker: mastodon
endpoint: mastodon
parameters:
kafka_topic: activitypub_events
12 changes: 12 additions & 0 deletions deploy/min/docker-compose.min.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ services:
- rss3_node_redis
entrypoint: [ "./node", "--worker.id=arweave-momoka" ]

mastodon-core:
restart: always
image: ghcr.io/rss3-network/node:v1.0.0
networks:
- default
volumes:
- ./config.min.yaml:/etc/rss3/node/config.yaml
depends_on:
- rss3_node_database
- rss3_node_redis
entrypoint: [ "./node", "--worker.id=mastodon-core" ]

volumes:
postgres:
redis:
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/source/activitypub/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Option represents the configuration options for the ActivityPub client.
type Option struct {
KafkaTopic string `json:"mastodon_kafka_topic"`
KafkaTopic string `json:"kafka_topic"`
TimestampStart int64 `json:"timestamp_start" mapstructure:"timestamp_start"`
}

Expand Down
3 changes: 1 addition & 2 deletions internal/node/component/federated/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File,
group.GET("/network/:network", c.GetNetworkActivities)
group.GET("/platform/:platform", c.GetPlatformActivities)
group.POST("/accounts", c.BatchGetAccountsActivities)
group.GET("/handles", c.GetAllActiveHandles)
group.GET("/handles/updated", c.GetUpdatedHandles)
group.GET("/handles", c.GetHandles)

if err := c.InitMeter(); err != nil {
panic(err)
Expand Down
26 changes: 13 additions & 13 deletions internal/node/component/federated/handler_handles.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package federated

import (
"errors"
"net/http"
"strconv"

"github.com/labstack/echo/v4"
"github.com/rss3-network/node/common/http/response"
)

// GetAllActiveHandles retrieves all active handles
func (c *Component) GetAllActiveHandles(ctx echo.Context) error {
handles, err := c.getAllHandles(ctx.Request().Context())
if err != nil {
return response.InternalError(ctx)
// GetHandles retrieves all active handles or updated handles based on the 'since' parameter
func (c *Component) GetHandles(ctx echo.Context) error {
sinceStr := ctx.QueryParam("since")
if sinceStr == "" {
return response.BadRequestError(ctx, errors.New("'since' parameter is required"))
}

return ctx.JSON(http.StatusOK, map[string]interface{}{"handles": handles})
}

// GetUpdatedHandles retrieves handles updated since a given timestamp
func (c *Component) GetUpdatedHandles(ctx echo.Context) error {
sinceStr := ctx.QueryParam("since")
since, err := strconv.ParseUint(sinceStr, 10, 64)

if err != nil {
return response.BadRequestError(ctx, err)
}

handles, err := c.getUpdatedHandles(ctx.Request().Context(), since)
var handles []string
if since == 0 {
handles, err = c.getAllHandles(ctx.Request().Context())
} else {
handles, err = c.getUpdatedHandles(ctx.Request().Context(), since)
}

if err != nil {
return response.InternalError(ctx)
}
Expand Down
6 changes: 6 additions & 0 deletions internal/node/component/info/handler_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func TestCalculateMinimumResources(t *testing.T) {
worker: rss.RSSHub,
expected: baseResource,
},
{
name: "ActivityPub - Mastodon",
network: network.Mastodon,
worker: decentralized.Mastodon,
expected: baseResource.Mul(2),
},
}

for _, tc := range tests {
Expand Down
41 changes: 37 additions & 4 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rss3-network/protocol-go/schema/network"
"github.com/rss3-network/protocol-go/schema/tag"
"github.com/samber/lo"
"go.uber.org/zap"
)

type WorkerResponse struct {
Expand Down Expand Up @@ -47,13 +48,14 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {

var response *WorkerResponse

if c.redisClient != nil {
switch {
case c.redisClient != nil:
// Fetch all worker info concurrently.
c.fetchAllWorkerInfo(ctx, workerInfoChan)

// Build the worker response.
response = c.buildWorkerResponse(workerInfoChan)
} else if c.config.Component.RSS != nil {
case c.config.Component.RSS != nil:
m := c.config.Component.RSS

response = &WorkerResponse{
Expand All @@ -67,6 +69,26 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {
Status: worker.StatusReady},
},
}
case c.config.Component.Federated != nil:
federatedComponent := c.config.Component.Federated[0]
switch federatedComponent.Worker {
case decentralized.Mastodon:
response = &WorkerResponse{
Data: ComponentInfo{
RSS: &WorkerInfo{
WorkerID: federatedComponent.ID,
Network: federatedComponent.Network,
Worker: federatedComponent.Worker,
Tags: decentralized.ToTagsMap[decentralized.Mastodon],
Platform: decentralized.PlatformMastodon,
Status: worker.StatusReady},
},
}
default:
return nil
}
default:
return nil
}

return ctx.JSON(http.StatusOK, response)
Expand Down Expand Up @@ -114,8 +136,9 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work
response.Data.RSS = workerInfo
case network.EthereumSource, network.FarcasterSource, network.ArweaveSource, network.NearSource:
response.Data.Decentralized = append(response.Data.Decentralized, workerInfo)
default:
case network.ActivityPubSource:
response.Data.Federated = append(response.Data.Federated, workerInfo)
default:
}
}

Expand All @@ -124,6 +147,15 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work

// fetchWorkerInfo fetches the worker info with the different network source.
func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) *WorkerInfo {
if module == nil {
zap.L().Info("params module is nil in fetchWorkerInfo")

return &WorkerInfo{
WorkerID: "",
Status: worker.StatusUnknown,
}
}

// Fetch status and progress from a specific worker by id.
status, workerProgress := c.getWorkerStatusAndProgressByID(ctx, module.ID)

Expand All @@ -140,7 +172,7 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module)
}

switch module.Network.Source() {
case network.EthereumSource, network.ArweaveSource, network.FarcasterSource, network.NearSource:
case network.ActivityPubSource, network.EthereumSource, network.ArweaveSource, network.FarcasterSource, network.NearSource:
workerInfo.Platform = decentralized.ToPlatformMap[module.Worker.(decentralized.Worker)]
workerInfo.Tags = decentralized.ToTagsMap[module.Worker.(decentralized.Worker)]

Expand All @@ -156,6 +188,7 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module)
default:
}
}

case network.RSSSource:
workerInfo.Tags = rss.ToTagsMap[module.Worker.(rss.Worker)]
}
Expand Down
2 changes: 1 addition & 1 deletion internal/node/component/info/network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Parameters struct {
BlockReceiptBatchSize *ConfigDetail `json:"block_receipts_batch_size,omitempty"`
APIKey *ConfigDetail `json:"api_key,omitempty"`
Authentication *Authentication `json:"authentication,omitempty"`
MastodonKafkaTopic *ConfigDetail `json:"mastodon_kafka_topic,omitempty"`
KafkaTopic *ConfigDetail `json:"kafka_topic,omitempty"`
}

type workerConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/node/component/info/network_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (

// defines the demand level of a network
var (
highDemandNetworks = []network.Network{network.Ethereum, network.Polygon, network.Arbitrum, network.Base, network.Gnosis, network.BinanceSmartChain, network.Optimism, network.Arweave, network.Farcaster, network.Near}
highDemandNetworks = []network.Network{network.Ethereum, network.Polygon, network.Arbitrum, network.Base, network.Gnosis, network.BinanceSmartChain, network.Optimism, network.Arweave, network.Farcaster, network.Near, network.Mastodon}
highDemandWorkers = []worker.Worker{decentralized.Core, decentralized.Momoka}
midDemandWorkers = []worker.Worker{decentralized.Uniswap, decentralized.OpenSea, decentralized.Stargate, decentralized.Curve}
)
Expand Down
2 changes: 1 addition & 1 deletion internal/node/indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func NewServer(ctx context.Context, config *config.Module, databaseClient databa

switch config.Network.Source() {
case network.ActivityPubSource:
instance.monitorClient, err = monitor.NewActivityPubClient(config.EndpointID, config.Parameters)
instance.monitorClient, err = monitor.NewActivityPubClient(config.Endpoint, config.Parameters, config.Worker)
if err != nil {
return nil, fmt.Errorf("error occurred in creating new activitypub monitorClient: %w", err)
}
Expand Down
59 changes: 41 additions & 18 deletions internal/node/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/rss3-network/node/provider/farcaster"
"github.com/rss3-network/node/provider/httpx"
"github.com/rss3-network/node/provider/near"
"github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/decentralized"
)

type Client interface {
Expand Down Expand Up @@ -278,38 +280,59 @@ func (c *activitypubClient) TargetState(_ *config.Parameters) (uint64, uint64) {

// LatestState returns the latest state of the Kafka consuming process
func (c *activitypubClient) LatestState(ctx context.Context) (uint64, uint64, error) {
// Poll the Kafka consumer to verify its working state
fetches := c.activitypubClient.GetKafkaConsumer().PollFetches(ctx)
consumer := c.activitypubClient.GetKafkaConsumer()
// Create a very short timeout for the poll operation
pollCtx, cancel := context.WithTimeout(ctx, 1000*time.Millisecond)
defer cancel()

// Use PollFetches with the short timeout
fetches := consumer.PollFetches(pollCtx)

// Check if the poll operation timed out
if pollCtx.Err() == context.DeadlineExceeded {
return 0, 0, fmt.Errorf("poll operation timed out, possible consumer issue")
}

if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
fmt.Printf("consumer poll fetch error: %v\n", e.Err)
}

return 0, 0, fmt.Errorf("consumer poll fetch error: %v", fetches.Errors())
}
// If no errors, assume the service is healthy
// The service is healthy
return 0, 0, nil
}

// NewActivityPubClient returns a new ActivityPub client.
func NewActivityPubClient(endpoint string, param *config.Parameters) (Client, error) {
base, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("parse ActivityPub endpoint: %w", err)
func NewActivityPubClient(endpoint config.Endpoint, param *config.Parameters, worker worker.Worker) (Client, error) {
var kafkaTopic string

if param != nil {
if topic, ok := (*param)[mastodon.KafkaTopic]; ok {
kafkaTopic = topic.(string)
} else {
return nil, fmt.Errorf("kafka_topic not found in parameters")
}
} else {
return nil, fmt.Errorf("parameters are nil")
}

// Retrieve kafkaTopic from the parameters
kafkaTopic := (*param)["mastodon_kafka_topic"].(string)
// Retrieve worker type from the parameters
workerType := worker.Name()

base.Path = path.Join(base.Path, kafkaTopic)
switch workerType {
case decentralized.Mastodon.String():
mastodonClient, err := mastodon.NewClient(endpoint.URL, kafkaTopic)

// Create a new activitypub(mastodon) client
mastodonClient, err := mastodon.NewClient(endpoint, kafkaTopic)
if err != nil {
return nil, fmt.Errorf("create ActivityPub client: %w", err)
}
if err != nil {
return nil, fmt.Errorf("create Mastodon client: %w", err)
}

return &activitypubClient{
activitypubClient: mastodonClient,
}, nil
return &activitypubClient{
activitypubClient: mastodonClient,
}, nil
default:
return nil, fmt.Errorf("unsupported worker type: %s", workerType)
}
}
Loading

0 comments on commit 6a0b906

Please sign in to comment.