diff --git a/internal/database/client.go b/internal/database/client.go index ab27e5bb..81681901 100644 --- a/internal/database/client.go +++ b/internal/database/client.go @@ -61,8 +61,8 @@ type DatasetENSNamehash interface { } type DatasetMastodonHandle interface { - LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error) - SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error + SaveRecentMastodonHandles(ctx context.Context, handles []*model.MastodonHandle) error + GetUpdatedMastodonHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error) } var _ goose.Logger = (*SugaredLogger)(nil) diff --git a/internal/database/dialer/postgres/client.go b/internal/database/dialer/postgres/client.go index 0751d8b7..8e9f52e2 100644 --- a/internal/database/dialer/postgres/client.go +++ b/internal/database/dialer/postgres/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "strings" "time" @@ -380,45 +381,73 @@ func (c *client) SaveDatasetENSNamehash(ctx context.Context, namehash *model.ENS return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error } -// LoadDatasetMastodonHandle loads a Mastodon handle. -func (c *client) LoadDatasetMastodonHandle(ctx context.Context, handle string) (*model.MastodonHandle, error) { - var value table.DatasetMastodonHandle +func (c *client) SaveRecentMastodonHandles(ctx context.Context, handles []*model.MastodonHandle) error { + // build the mastodon update handle table + values := make([]table.DatasetMastodonUpdateHandle, 0, len(handles)) - if err := c.database.WithContext(ctx). - Where("handle = ?", handle). - First(&value). - Error; err != nil { - if !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, err + // Iterate through the handles and import them into the values slice + for _, handle := range handles { + var value table.DatasetMastodonUpdateHandle + if err := value.Import(handle); err != nil { + return err } - // Initialize a default handle. - value = table.DatasetMastodonHandle{ - Handle: handle, - LastUpdated: time.Now(), - } + values = append(values, value) } - return value.Export() + onConflictClause := clause.OnConflict{ + Columns: []clause.Column{{Name: "handle"}}, + UpdateAll: true, + } + + return c.database.WithContext(ctx).Clauses(onConflictClause).CreateInBatches(&values, math.MaxUint8).Error } -// SaveDatasetMastodonHandle saves a Mastodon handle. -func (c *client) SaveDatasetMastodonHandle(ctx context.Context, handle *model.MastodonHandle) error { - clauses := []clause.Expression{ - clause.OnConflict{ - Columns: []clause.Column{{Name: "handle"}}, - DoUpdates: clause.Assignments(map[string]interface{}{ - "last_updated": time.Now(), - }), - }, +func (c *client) GetUpdatedMastodonHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error) { + databaseStatement := c.database.WithContext(ctx).Table(table.DatasetMastodonUpdateHandle{}.TableName()) + + if query.Cursor != nil { + var handleCursor *table.DatasetMastodonUpdateHandle + + if err := c.database.WithContext(ctx).First(&handleCursor, "handle = ?", query.Cursor).Error; err != nil { + return nil, fmt.Errorf("get handle cursor: %w", err) + } + + databaseStatement = databaseStatement.Where("updated_at < ? OR (updated_at = ? AND created_at < ?)", handleCursor.UpdatedAt, handleCursor.UpdatedAt, handleCursor.CreatedAt) } - var value table.DatasetMastodonHandle - if err := value.Import(handle); err != nil { - return err + if query.Since != nil { + databaseStatement = databaseStatement.Where("updated_at > ?", time.UnixMilli(int64(*query.Since))) } - return c.database.WithContext(ctx).Clauses(clauses...).Create(&value).Error + if query.Limit != nil { + databaseStatement = databaseStatement.Limit(*query.Limit) + } + + databaseStatement = databaseStatement.Order("updated_at DESC, created_at DESC") + + var handles []*table.DatasetMastodonUpdateHandle + + if err := databaseStatement.Find(&handles).Error; err != nil { + return nil, err + } + + result := make([]*model.MastodonHandle, 0, len(handles)) + + for _, handle := range handles { + var ( + value *model.MastodonHandle + err error + ) + + if value, err = handle.Export(); err != nil { + return nil, err + } + + result = append(result, value) + } + + return result, nil } // Dial dials a database. diff --git a/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql b/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql index b5e988ea..522f2866 100644 --- a/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql +++ b/internal/database/dialer/postgres/migration/20240922233919_add_mastodon_handles.sql @@ -1,12 +1,15 @@ -- +goose Up -CREATE TABLE IF NOT EXISTS dataset_mastodon_handles ( - handle VARCHAR(255) PRIMARY KEY, - last_updated TIMESTAMP NOT NULL +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS dataset_mastodon_update_handles ( + "handle" text PRIMARY KEY, + "created_at" timestamptz NOT NULL DEFAULT now(), + "updated_at" timestamptz NOT NULL DEFAULT now() ); -CREATE INDEX idx_mastodon_handles_last_updated ON dataset_mastodon_handles(last_updated); +CREATE INDEX idx_mastodon_update_handles_time_at ON dataset_mastodon_update_handles(updated_at DESC,created_at DESC); +-- +goose StatementEnd -- +goose Down -- +goose StatementBegin -DROP TABLE IF EXISTS dataset_mastodon_handles; --- +goose StatementEnd \ No newline at end of file +DROP TABLE IF EXISTS dataset_mastodon_update_handles; +-- +goose StatementEnd diff --git a/internal/database/dialer/postgres/table/dataset_mastodon_handle.go b/internal/database/dialer/postgres/table/dataset_mastodon_handle.go deleted file mode 100644 index 72e97c78..00000000 --- a/internal/database/dialer/postgres/table/dataset_mastodon_handle.go +++ /dev/null @@ -1,30 +0,0 @@ -package table - -import ( - "time" - - "github.com/rss3-network/node/internal/database/model" -) - -var _ model.MastodonHandleTransformer = (*DatasetMastodonHandle)(nil) - -type DatasetMastodonHandle struct { - Handle string `gorm:"column:handle;primaryKey"` - LastUpdated time.Time `gorm:"column:last_updated;not null"` -} - -func (d *DatasetMastodonHandle) Import(handle *model.MastodonHandle) error { - d.Handle = handle.Handle - d.LastUpdated = handle.LastUpdated - - return nil -} - -func (d *DatasetMastodonHandle) Export() (*model.MastodonHandle, error) { - handle := model.MastodonHandle{ - Handle: d.Handle, - LastUpdated: d.LastUpdated, - } - - return &handle, nil -} diff --git a/internal/database/dialer/postgres/table/dataset_mastodon_update_handle.go b/internal/database/dialer/postgres/table/dataset_mastodon_update_handle.go new file mode 100644 index 00000000..09ee5070 --- /dev/null +++ b/internal/database/dialer/postgres/table/dataset_mastodon_update_handle.go @@ -0,0 +1,37 @@ +package table + +import ( + "time" + + "github.com/rss3-network/node/internal/database/model" +) + +var _ model.MastodonHandleTransformer = (*DatasetMastodonUpdateHandle)(nil) + +type DatasetMastodonUpdateHandle struct { + Handle string `gorm:"column:handle;primaryKey"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"` + UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"` +} + +func (d *DatasetMastodonUpdateHandle) Import(handle *model.MastodonHandle) error { + d.Handle = handle.Handle + d.UpdatedAt = handle.UpdatedAt + d.CreatedAt = handle.CreatedAt + + return nil +} + +func (d *DatasetMastodonUpdateHandle) Export() (*model.MastodonHandle, error) { + handle := model.MastodonHandle{ + Handle: d.Handle, + UpdatedAt: d.UpdatedAt, + CreatedAt: d.CreatedAt, + } + + return &handle, nil +} + +func (DatasetMastodonUpdateHandle) TableName() string { + return "dataset_mastodon_update_handles" +} diff --git a/internal/database/model/dataset_mastodon_handle.go b/internal/database/model/dataset_mastodon_handle.go index 159e78af..7bc59aa0 100644 --- a/internal/database/model/dataset_mastodon_handle.go +++ b/internal/database/model/dataset_mastodon_handle.go @@ -8,6 +8,19 @@ type MastodonHandleTransformer interface { } type MastodonHandle struct { - Handle string `json:"handle"` - LastUpdated time.Time `json:"last_updated"` + Handle string `json:"handle"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type PaginatedMastodonHandles struct { + Handles []string + TotalCount int64 + NextCursor string +} + +type QueryMastodonHandles struct { + Since *uint64 + Limit *int + Cursor *string } diff --git a/internal/engine/worker/federated/activitypub/mastodon/worker.go b/internal/engine/worker/federated/activitypub/mastodon/worker.go index 92a7c3ee..7e4aedee 100644 --- a/internal/engine/worker/federated/activitypub/mastodon/worker.go +++ b/internal/engine/worker/federated/activitypub/mastodon/worker.go @@ -19,7 +19,6 @@ import ( "github.com/rss3-network/node/provider/activitypub" "github.com/rss3-network/node/provider/activitypub/mastodon" "github.com/rss3-network/node/provider/httpx" - "github.com/rss3-network/node/provider/redis" "github.com/rss3-network/node/schema/worker/federated" "github.com/rss3-network/protocol-go/schema" activityx "github.com/rss3-network/protocol-go/schema/activity" @@ -27,6 +26,7 @@ import ( "github.com/rss3-network/protocol-go/schema/network" "github.com/rss3-network/protocol-go/schema/tag" "github.com/rss3-network/protocol-go/schema/typex" + "github.com/samber/lo" "go.uber.org/zap" ) @@ -195,7 +195,7 @@ func (w *worker) handleSingleActivityPubCreate(ctx context.Context, message acti } // Store unique handles of this activity - err := w.saveMastodonHandle(ctx, handles) + err := w.saveMastodonHandles(ctx, handles) if err != nil { zap.L().Error("failed to save mastodon handle", zap.Error(err), zap.String("currentUserHandle", currentUserHandle)) return err @@ -298,7 +298,7 @@ func (w *worker) handleSingleActivityPubAnnounce(ctx context.Context, message ac } // Store the current user's unique handle - err = w.saveMastodonHandle(ctx, handles) + err = w.saveMastodonHandles(ctx, handles) if err != nil { zap.L().Error("failed to save mastodon handle", zap.Error(err), zap.String("currentUserHandle", currentUserHandle)) return err @@ -307,22 +307,21 @@ func (w *worker) handleSingleActivityPubAnnounce(ctx context.Context, message ac return nil } -// saveMastodonHandle store the unique handles into the relevant DB table -func (w *worker) saveMastodonHandle(ctx context.Context, handles []string) error { - for _, handleString := range handles { - handle := &model.MastodonHandle{ - Handle: handleString, - LastUpdated: time.Now(), - } +// saveMastodonHandles store the unique handles into the relevant DB table +func (w *worker) saveMastodonHandles(ctx context.Context, handles []string) error { + // Find all unique handles + uniqueHandles := lo.Uniq(handles) - if err := w.databaseClient.SaveDatasetMastodonHandle(ctx, handle); err != nil { - return fmt.Errorf("failed to save Mastodon handle: %w", err) - } + // Convert map back to a slice to ensure all handles are unique + handleSlice := make([]*model.MastodonHandle, 0, len(uniqueHandles)) + for _, handle := range uniqueHandles { + handleSlice = append(handleSlice, &model.MastodonHandle{ + Handle: handle, + }) + } - // Add handle update to Redis set - if err := redis.AddHandleUpdate(ctx, w.redisClient, handleString); err != nil { - return fmt.Errorf("failed to add handle update to Redis: %w", err) - } + if err := w.databaseClient.SaveRecentMastodonHandles(ctx, handleSlice); err != nil { + return fmt.Errorf("failed to update recent handles: %w", err) } return nil diff --git a/internal/node/component/federated/data.go b/internal/node/component/federated/data.go index 601099f1..78f76513 100644 --- a/internal/node/component/federated/data.go +++ b/internal/node/component/federated/data.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/rss3-network/node/internal/database/model" - "github.com/rss3-network/node/provider/redis" activityx "github.com/rss3-network/protocol-go/schema/activity" networkx "github.com/rss3-network/protocol-go/schema/network" "github.com/samber/lo" @@ -35,17 +34,27 @@ func (c *Component) getCursor(ctx context.Context, cursor *string) (*activityx.A return nil, nil } - str := strings.Split(*cursor, ":") - if len(str) != 2 { - return nil, fmt.Errorf("invalid cursor") + cleanedCursor := *cursor + prefix := "https://" + + if strings.HasPrefix(cleanedCursor, "https://") { + cleanedCursor = cleanedCursor[8:] + } else if strings.HasPrefix(cleanedCursor, "http://") { + cleanedCursor = cleanedCursor[7:] + prefix = "http://" + } + + id, networkStr, found := strings.Cut(cleanedCursor, ":") + if !found { + return nil, fmt.Errorf("invalid cursor: missing network") } - network, err := networkx.NetworkString(str[1]) + network, err := networkx.NetworkString(networkStr) if err != nil { return nil, fmt.Errorf("invalid cursor: %w", err) } - data, _, err := c.getActivity(ctx, model.ActivityQuery{ID: lo.ToPtr(str[0]), Network: lo.ToPtr(network)}) + data, _, err := c.getActivity(ctx, model.ActivityQuery{ID: lo.ToPtr(prefix + id), Network: lo.ToPtr(network)}) if err != nil { return nil, fmt.Errorf("failed to get cursor: %w", err) } @@ -61,11 +70,7 @@ func (c *Component) transformCursor(_ context.Context, activity *activityx.Activ return fmt.Sprintf("%s:%s", activity.ID, activity.Network) } -// redis data retrieval: -func (c *Component) getAllHandles(ctx context.Context) ([]string, error) { - return redis.GetAllHandles(ctx, c.redisClient) -} - -func (c *Component) getUpdatedHandles(ctx context.Context, since uint64) ([]string, error) { - return redis.GetRecentHandleUpdates(ctx, c.redisClient, since) +// getUpdatedHandles retrieves the updated Mastodon handles from the database. +func (c *Component) getUpdatedHandles(ctx context.Context, query model.QueryMastodonHandles) ([]*model.MastodonHandle, error) { + return c.databaseClient.GetUpdatedMastodonHandles(ctx, query) } diff --git a/internal/node/component/federated/handler_handles.go b/internal/node/component/federated/handler_handles.go index 391d709d..88ab3b79 100644 --- a/internal/node/component/federated/handler_handles.go +++ b/internal/node/component/federated/handler_handles.go @@ -1,36 +1,69 @@ package federated import ( - "errors" "net/http" - "strconv" "github.com/labstack/echo/v4" "github.com/rss3-network/node/common/http/response" + "github.com/rss3-network/node/internal/database/model" + "github.com/samber/lo" ) +var defaultLimit = 100 + // GetHandles retrieves all active handles or updated handles based on the 'since' parameter func (c *Component) GetHandles(ctx echo.Context) error { - sinceStr := ctx.QueryParam("since") - if sinceStr == "" { - return response.BadRequestError(ctx, errors.New("'since' parameter is required")) + var request HandleRequest + if err := ctx.Bind(&request); err != nil { + return response.BadRequestError(ctx, err) } - since, err := strconv.ParseUint(sinceStr, 10, 64) - if err != nil { - return response.BadRequestError(ctx, err) + if request.Limit == 0 { + request.Limit = defaultLimit + } + // Validate request + if err := ctx.Validate(&request); err != nil { + return response.ValidationFailedError(ctx, err) } - var handles []string - if since == 0 { - handles, err = c.getAllHandles(ctx.Request().Context()) - } else { - handles, err = c.getUpdatedHandles(ctx.Request().Context(), since) + query := model.QueryMastodonHandles{ + Limit: lo.ToPtr(request.Limit), + Since: lo.Ternary(request.Since > 0, lo.ToPtr(request.Since), nil), + Cursor: lo.Ternary(request.Cursor != "", lo.ToPtr(request.Cursor), nil), } + res, err := c.getUpdatedHandles(ctx.Request().Context(), query) if err != nil { return response.InternalError(ctx) } - return ctx.JSON(http.StatusOK, map[string]interface{}{"handles": handles}) + handles := make([]string, 0, len(res)) + + var cursor string + + for i, handle := range res { + handles = append(handles, handle.Handle) + + if i == len(res)-1 && len(res) == request.Limit { + cursor = handle.Handle + } + } + + return ctx.JSON(http.StatusOK, PaginatedHandlesResponse{ + Handles: handles, + Cursor: cursor, + TotalCount: int64(len(handles)), + }) +} + +type HandleRequest struct { + Since uint64 `query:"since"` + Limit int `query:"limit" validate:"omitempty,min=1,max=500"` + Cursor string `query:"cursor"` +} + +type PaginatedHandlesResponse struct { + Handles []string `json:"handles"` + Cursor string `json:"cursor,omitempty"` + TotalCount int64 `json:"total_count"` } diff --git a/provider/redis/handle_updates.go b/provider/redis/handle_updates.go deleted file mode 100644 index b5cc0d0d..00000000 --- a/provider/redis/handle_updates.go +++ /dev/null @@ -1,66 +0,0 @@ -package redis - -import ( - "context" - "fmt" - "time" - - "github.com/redis/rueidis" - "go.uber.org/zap" -) - -const handleUpdatesKey = "handle_updates" - -// GetAllHandles retrieves all handles from the sorted set without filtering by score -func GetAllHandles(ctx context.Context, client rueidis.Client) ([]string, error) { - // Check if the key exists - existsCmd := client.B().Exists().Key(handleUpdatesKey).Build() - exists, err := client.Do(ctx, existsCmd).AsInt64() - - if err != nil { - zap.L().Error("failed to check if key exists", zap.Error(err)) - return nil, fmt.Errorf("failed to check if key exists: %w", err) - } - - if exists == 0 { - zap.L().Warn("handle updates key does not exist") - return []string{}, nil - } - - // Get all members without scores - cmd := client.B().Zrange().Key(handleUpdatesKey).Min("0").Max("-1").Build() - result, err := client.Do(ctx, cmd).AsStrSlice() - - if err != nil { - zap.L().Error("failed to get handles", zap.Error(err)) - return nil, fmt.Errorf("failed to get handles: %w", err) - } - - return result, nil -} - -// AddHandleUpdate adds a handle to the sorted set with the current timestamp as score -func AddHandleUpdate(ctx context.Context, client rueidis.Client, handle string) error { - cmd := client.B().Zadd().Key(handleUpdatesKey).ScoreMember().ScoreMember(float64(time.Now().Unix()), handle).Build() - - return client.Do(ctx, cmd).Error() -} - -// GetRecentHandleUpdates retrieves handles updated since the given timestamp -func GetRecentHandleUpdates(ctx context.Context, client rueidis.Client, since uint64) ([]string, error) { - cmd := client.B().Zrangebyscore().Key(handleUpdatesKey).Min(fmt.Sprintf("%d", since)).Max("+inf").Build() - result, err := client.Do(ctx, cmd).AsStrSlice() - - if err != nil { - zap.L().Error("failed to get recent handle updates", zap.Uint64("since", since), zap.Error(err)) - return nil, fmt.Errorf("failed to get recent handle updates: %w", err) - } - - return result, nil -} - -// RemoveOldHandleUpdates removes handles updated before the given time -func RemoveOldHandleUpdates(ctx context.Context, client rueidis.Client, before time.Time) error { - cmd := client.B().Zremrangebyscore().Key(handleUpdatesKey).Min("-inf").Max(fmt.Sprintf("%d", before.Unix())).Build() - return client.Do(ctx, cmd).Error() -}