Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker/activitypub): implement mastodon worker to transform AP objects #431

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 83 additions & 35 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/node/schema/worker/federated"
"github.com/rss3-network/node/schema/worker/rss"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
Expand All @@ -33,6 +34,10 @@ endpoints:
url: https://rpc.ankr.com/eth
http_headers:
user-agent: rss3-node
mastodon:
url: https://30.10.000.00:9092/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
url: https://30.10.000.00:9092/
The endpoint should be defined in the configuration file rather than hardcoding them. Sensitive information like this should not appear in the code, as it poses a security risk.

Copy link
Contributor Author

@FrankLi123 FrankLi123 Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The url is a mock url used for the config test. Changed its value to https://0.0.0.0:9092/ for better view.

http_headers:
user-agent: rss3-node
database:
driver: cockroachdb
partition: true
Expand Down Expand Up @@ -67,6 +72,12 @@ component:
password: pass
access_key: abc
access_code: def
federated:
network: mastodon
worker: mastodon
endpoint: mastodon
parameters:
mastodon_kafka_topic: activitypub_events
decentralized:
- network: ethereum
worker: core
Expand All @@ -89,6 +100,12 @@ component:
"http_headers": {
"user-agent": "rss3-node"
}
},
"mastodon": {
"url": "https://30.10.000.00:9092/",
"http_headers": {
"user-agent": "rss3-node"
}
}
},
"discovery": {
Expand Down Expand Up @@ -147,6 +164,16 @@ component:
}
}
],
"federated": [
{
"network": "mastodon",
"worker": "mastodon",
"endpoint": "mastodon",
"parameters": {
"mastodon_kafka_topic": "activitypub_events"
}
}
],
"decentralized": [
{
"network": "ethereum",
Expand Down Expand Up @@ -181,6 +208,12 @@ url = "https://rpc.ankr.com/eth"
[endpoints.ethereum.http_headers]
user-agent = "rss3-node"

[endpoints.mastodon]
url = "https://30.10.000.00:9092/"

[endpoints.mastodon.http_headers]
user-agent = "rss3-node"

[discovery.server]
endpoint = "https://node.mydomain.com/"
global_indexer_endpoint = "https://gi.rss3.dev/"
Expand Down Expand Up @@ -222,6 +255,14 @@ password = "pass"
access_key = "abc"
access_code = "def"

[[component.federated]]
network = "mastodon"
worker = "mastodon"
endpoint = "mastodon"

[component.federated.parameters]
mastodon_kafka_topic = "activitypub_events"

[[component.decentralized]]
network = "ethereum"
worker = "core"
Expand Down Expand Up @@ -253,6 +294,12 @@ var configFileExpected = &File{
"user-agent": "rss3-node",
},
},
"mastodon": {
URL: "https://30.10.000.00:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
},
Discovery: &Discovery{
Operator: &Operator{
Expand Down Expand Up @@ -283,7 +330,22 @@ var configFileExpected = &File{
},
},
},
Federated: nil,
Federated: []*Module{
{
Network: network.Mastodon,
Worker: federated.Mastodon,
EndpointID: "mastodon",
Endpoint: Endpoint{
URL: "https://30.10.000.00:9092/",
HTTPHeaders: map[string]string{
"user-agent": "rss3-node",
},
},
Parameters: &Parameters{
"mastodon_kafka_topic": "activitypub_events",
},
},
},
Decentralized: []*Module{
{
Network: network.Ethereum,
Expand Down Expand Up @@ -370,40 +432,6 @@ func TestSetupConfig(t *testing.T) {
AssertConfig(t, f, configFileExpected)
}

// func TestConfigEnvOverride(t *testing.T) {
// t.Parallel()
//
// exceptEnvironment := "testing"
// exceptDatabaseURI := "postgres://mock@localhost:26257/defaultdb"
// exceptMetricsEndpoint := "127.0.0.1:9000"
//
// t.Setenv("NODE_ENVIRONMENT", exceptEnvironment)
// t.Setenv("NODE_DATABASE_URI", exceptDatabaseURI)
// t.Setenv("NODE_OBSERVABILITY_OPENTELEMETRY_METRICS_ENDPOINT", exceptMetricsEndpoint)
//
// configDir := "/etc/rss3/node"
// fs := afero.NewMemMapFs()
//
// err := fs.Mkdir(configDir, 0o777)
// assert.NoError(t, err)
//
// file, err := fs.Create(path.Join(configDir, configName))
// assert.NoError(t, err)
//
// _, err = file.WriteString(configExampleYaml)
// require.NoError(t, err)
//
// v := viper.New()
// v.SetFs(fs)
//
// f, err := _Setup(configName, "yaml", v)
// assert.NoError(t, err)
//
// assert.Equal(t, exceptEnvironment, f.Environment)
// assert.Equal(t, exceptDatabaseURI, f.Database.URI)
// assert.Equal(t, exceptMetricsEndpoint, f.Observability.OpenTelemetry.Metrics.Endpoint)
// }

func TestConfigFilePath(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -500,6 +528,26 @@ func AssertConfig(t *testing.T, expect, got *File) {
assert.Equal(t, expect.Discovery, got.Discovery)
})

t.Run("federated", func(t *testing.T) {
for i, federated := range expect.Component.Federated {
func(_expect, got *Module) {
t.Run(fmt.Sprintf("federated-%d", i), func(t *testing.T) {
t.Parallel()
assert.Equal(t, _expect, got)
})
}(federated, got.Component.Federated[i])
}

for i, indexer := range got.Component.Federated {
func(_except, got *Module) {
t.Run(fmt.Sprintf("%s-%s", indexer.Network, indexer.Worker), func(t *testing.T) {
t.Parallel()
AssertIndexer(t, _except, got)
})
}(configFileExpected.Component.Federated[i], indexer)
}
})

t.Run("decentralized", func(t *testing.T) {
for i, rss := range expect.Component.RSS {
func(_except, got *Module) {
Expand Down
20 changes: 16 additions & 4 deletions internal/engine/source/activitypub/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package activitypub

import (
"fmt"
"strconv"
"time"

"github.com/rss3-network/node/internal/engine"
"github.com/rss3-network/node/provider/activitypub"
Expand All @@ -13,6 +13,9 @@ import (

var _ engine.Task = (*Task)(nil)

// TODO: should be pulled from VSL (NetworkParams contract)
var defaultStartTime = "2024-07-22T00:00:00Z"

type Task struct {
Network network.Network
Message activitypub.Object
Expand All @@ -27,10 +30,19 @@ func (t Task) GetNetwork() network.Network {
}

func (t Task) GetTimestamp() uint64 {
publishedTimeStamp := t.Message.Published
timestamp, _ := strconv.ParseUint(publishedTimeStamp, 10, 64)
// Use default time if Published is empty
timeStr := t.Message.Published
if timeStr == "" {
timeStr = defaultStartTime
}

return timestamp
parsedTime, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Println("Error parsing time:", err)
return 0
}
// Convert the time.Time object to a Unix timestamp and cast to uint64
return uint64(parsedTime.Unix())
}

func (t Task) Validate() error {
Expand Down
Loading
Loading