Skip to content

Commit

Permalink
feat(worker/mastodon): apply Postgres DB for mastodon handle storage …
Browse files Browse the repository at this point in the history
…and modify query functions (#578)

Co-authored-by: brucexc <[email protected]>
  • Loading branch information
FrankLi123 and brucexc authored Oct 16, 2024
1 parent 60689f8 commit 47c389c
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 178 deletions.
4 changes: 2 additions & 2 deletions internal/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type DatasetENSNamehash interface {
}

type DatasetMastodonHandle interface {
LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error)
SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error
SaveRecentMastodonHandles(ctx context.Context, handles []*model.MastodonHandle) error
GetUpdatedMastodonHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error)
}

var _ goose.Logger = (*SugaredLogger)(nil)
Expand Down
85 changes: 57 additions & 28 deletions internal/database/dialer/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -380,45 +381,73 @@ func (c *client) SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENS
return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
}

// LoadDatasetMastodonHandle loads a Mastodon handle.
func (c *client) LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error) {
var value table.DatasetMastodonHandle
func (c *client) SaveRecentMastodonHandles(ctx context.Context, handles []*model.MastodonHandle) error {
// build the mastodon update handle table
values := make([]table.DatasetMastodonUpdateHandle, 0, len(handles))

if err := c.database.WithContext(ctx).
Where("handle = ?", handle).
First(&value).
Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
// Iterate through the handles and import them into the values slice
for _, handle := range handles {
var value table.DatasetMastodonUpdateHandle
if err := value.Import(handle); err != nil {
return err
}

// Initialize a default handle.
value = table.DatasetMastodonHandle{
Handle: handle,
LastUpdated: time.Now(),
}
values = append(values, value)
}

return value.Export()
onConflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "handle"}},
UpdateAll: true,
}

return c.database.WithContext(ctx).Clauses(onConflictClause).CreateInBatches(&values, math.MaxUint8).Error
}

// SaveDatasetMastodonHandle saves a Mastodon handle.
func (c *client) SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error {
clauses := []clause.Expression{
clause.OnConflict{
Columns: []clause.Column{{Name: "handle"}},
DoUpdates: clause.Assignments(map[string]interface{}{
"last_updated": time.Now(),
}),
},
func (c *client) GetUpdatedMastodonHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error) {
databaseStatement := c.database.WithContext(ctx).Table(table.DatasetMastodonUpdateHandle{}.TableName())

if query.Cursor != nil {
var handleCursor *table.DatasetMastodonUpdateHandle

if err := c.database.WithContext(ctx).First(&handleCursor, "handle = ?", query.Cursor).Error; err != nil {
return nil, fmt.Errorf("get handle cursor: %w", err)
}

databaseStatement = databaseStatement.Where("updated_at < ? OR (updated_at = ? AND created_at < ?)", handleCursor.UpdatedAt, handleCursor.UpdatedAt, handleCursor.CreatedAt)
}

var value table.DatasetMastodonHandle
if err := value.Import(handle); err != nil {
return err
if query.Since != nil {
databaseStatement = databaseStatement.Where("updated_at > ?", time.UnixMilli(int64(*query.Since)))
}

return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error
if query.Limit != nil {
databaseStatement = databaseStatement.Limit(*query.Limit)
}

databaseStatement = databaseStatement.Order("updated_at DESC, created_at DESC")

var handles []*table.DatasetMastodonUpdateHandle

if err := databaseStatement.Find(&handles).Error; err != nil {
return nil, err
}

result := make([]*model.MastodonHandle, 0, len(handles))

for _, handle := range handles {
var (
value *model.MastodonHandle
err error
)

if value, err = handle.Export(); err != nil {
return nil, err
}

result = append(result, value)
}

return result, nil
}

// Dial dials a database.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS dataset_mastodon_handles (
handle VARCHAR(255) PRIMARY KEY,
last_updated TIMESTAMP NOT NULL
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS dataset_mastodon_update_handles (
"handle" text PRIMARY KEY,
"created_at" timestamptz NOT NULL DEFAULT now(),
"updated_at" timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX idx_mastodon_handles_last_updated ON dataset_mastodon_handles(last_updated);
CREATE INDEX idx_mastodon_update_handles_time_at ON dataset_mastodon_update_handles(updated_at DESC,created_at DESC);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE IF EXISTS dataset_mastodon_handles;
-- +goose StatementEnd
DROP TABLE IF EXISTS dataset_mastodon_update_handles;
-- +goose StatementEnd
30 changes: 0 additions & 30 deletions internal/database/dialer/postgres/table/dataset_mastodon_handle.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package table

import (
"time"

"github.com/rss3-network/node/internal/database/model"
)

var _ model.MastodonHandleTransformer = (*DatasetMastodonUpdateHandle)(nil)

type DatasetMastodonUpdateHandle struct {
Handle string `gorm:"column:handle;primaryKey"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}

func (d *DatasetMastodonUpdateHandle) Import(handle *model.MastodonHandle) error {
d.Handle = handle.Handle
d.UpdatedAt = handle.UpdatedAt
d.CreatedAt = handle.CreatedAt

return nil
}

func (d *DatasetMastodonUpdateHandle) Export() (*model.MastodonHandle, error) {
handle := model.MastodonHandle{
Handle: d.Handle,
UpdatedAt: d.UpdatedAt,
CreatedAt: d.CreatedAt,
}

return &handle, nil
}

func (DatasetMastodonUpdateHandle) TableName() string {
return "dataset_mastodon_update_handles"
}
17 changes: 15 additions & 2 deletions internal/database/model/dataset_mastodon_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ type MastodonHandleTransformer interface {
}

type MastodonHandle struct {
Handle string `json:"handle"`
LastUpdated time.Time `json:"last_updated"`
Handle string `json:"handle"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

type PaginatedMastodonHandles struct {
Handles []string
TotalCount int64
NextCursor string
}

type QueryMastodonHandles struct {
Since *uint64
Limit *int
Cursor *string
}
33 changes: 16 additions & 17 deletions internal/engine/worker/federated/activitypub/mastodon/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"github.com/rss3-network/node/provider/activitypub"
"github.com/rss3-network/node/provider/activitypub/mastodon"
"github.com/rss3-network/node/provider/httpx"
"github.com/rss3-network/node/provider/redis"
"github.com/rss3-network/node/schema/worker/federated"
"github.com/rss3-network/protocol-go/schema"
activityx "github.com/rss3-network/protocol-go/schema/activity"
"github.com/rss3-network/protocol-go/schema/metadata"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/rss3-network/protocol-go/schema/tag"
"github.com/rss3-network/protocol-go/schema/typex"
"github.com/samber/lo"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -195,7 +195,7 @@ func (w *worker) handleSingleActivityPubCreate(ctx context.Context, message acti
}

// Store unique handles of this activity
err := w.saveMastodonHandle(ctx, handles)
err := w.saveMastodonHandles(ctx, handles)
if err != nil {
zap.L().Error("failed to save mastodon handle", zap.Error(err), zap.String("currentUserHandle", currentUserHandle))
return err
Expand Down Expand Up @@ -298,7 +298,7 @@ func (w *worker) handleSingleActivityPubAnnounce(ctx context.Context, message ac
}

// Store the current user's unique handle
err = w.saveMastodonHandle(ctx, handles)
err = w.saveMastodonHandles(ctx, handles)
if err != nil {
zap.L().Error("failed to save mastodon handle", zap.Error(err), zap.String("currentUserHandle", currentUserHandle))
return err
Expand All @@ -307,22 +307,21 @@ func (w *worker) handleSingleActivityPubAnnounce(ctx context.Context, message ac
return nil
}

// saveMastodonHandle store the unique handles into the relevant DB table
func (w *worker) saveMastodonHandle(ctx context.Context, handles []string) error {
for _, handleString := range handles {
handle := &model.MastodonHandle{
Handle: handleString,
LastUpdated: time.Now(),
}
// saveMastodonHandles store the unique handles into the relevant DB table
func (w *worker) saveMastodonHandles(ctx context.Context, handles []string) error {
// Find all unique handles
uniqueHandles := lo.Uniq(handles)

if err := w.databaseClient.SaveDatasetMastodonHandle(ctx, handle); err != nil {
return fmt.Errorf("failed to save Mastodon handle: %w", err)
}
// Convert map back to a slice to ensure all handles are unique
handleSlice := make([]*model.MastodonHandle, 0, len(uniqueHandles))
for _, handle := range uniqueHandles {
handleSlice = append(handleSlice, &model.MastodonHandle{
Handle: handle,
})
}

// Add handle update to Redis set
if err := redis.AddHandleUpdate(ctx, w.redisClient, handleString); err != nil {
return fmt.Errorf("failed to add handle update to Redis: %w", err)
}
if err := w.databaseClient.SaveRecentMastodonHandles(ctx, handleSlice); err != nil {
return fmt.Errorf("failed to update recent handles: %w", err)
}

return nil
Expand Down
31 changes: 18 additions & 13 deletions internal/node/component/federated/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"

"github.com/rss3-network/node/internal/database/model"
"github.com/rss3-network/node/provider/redis"
activityx "github.com/rss3-network/protocol-go/schema/activity"
networkx "github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
Expand Down Expand Up @@ -35,17 +34,27 @@ func (c *Component) getCursor(ctx context.Context, cursor *string) (*activityx.A
return nil, nil
}

str := strings.Split(*cursor, ":")
if len(str) != 2 {
return nil, fmt.Errorf("invalid cursor")
cleanedCursor := *cursor
prefix := "https://"

if strings.HasPrefix(cleanedCursor, "https://") {
cleanedCursor = cleanedCursor[8:]
} else if strings.HasPrefix(cleanedCursor, "http://") {
cleanedCursor = cleanedCursor[7:]
prefix = "http://"
}

id, networkStr, found := strings.Cut(cleanedCursor, ":")
if !found {
return nil, fmt.Errorf("invalid cursor: missing network")
}

network, err := networkx.NetworkString(str[1])
network, err := networkx.NetworkString(networkStr)
if err != nil {
return nil, fmt.Errorf("invalid cursor: %w", err)
}

data, _, err := c.getActivity(ctx, model.ActivityQuery{ID: lo.ToPtr(str[0]), Network: lo.ToPtr(network)})
data, _, err := c.getActivity(ctx, model.ActivityQuery{ID: lo.ToPtr(prefix + id), Network: lo.ToPtr(network)})
if err != nil {
return nil, fmt.Errorf("failed to get cursor: %w", err)
}
Expand All @@ -61,11 +70,7 @@ func (c *Component) transformCursor(_ context.Context, activity *activityx.Activ
return fmt.Sprintf("%s:%s", activity.ID, activity.Network)
}

// redis data retrieval:
func (c *Component) getAllHandles(ctx context.Context) ([]string, error) {
return redis.GetAllHandles(ctx, c.redisClient)
}

func (c *Component) getUpdatedHandles(ctx context.Context, since uint64) ([]string, error) {
return redis.GetRecentHandleUpdates(ctx, c.redisClient, since)
// getUpdatedHandles retrieves the updated Mastodon handles from the database.
func (c *Component) getUpdatedHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error) {
return c.databaseClient.GetUpdatedMastodonHandles(ctx, query)
}
Loading

0 comments on commit 47c389c

Please sign in to comment.