Skip to content

Commit

Permalink
fix: unexpected new db connections
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Mar 22, 2023
1 parent 5b7d8f8 commit db72bfc
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
3 changes: 3 additions & 0 deletions interfaces/result_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package interfaces

import (
"github.com/crawlab-team/crawlab-db/generic"
"time"
)

type ResultService interface {
Insert(records ...interface{}) (err error)
List(query generic.ListQuery, opts *generic.ListOptions) (results []interface{}, err error)
Count(query generic.ListQuery) (n int, err error)
Index(fields []string)
SetTime(t time.Time)
GetTime() (t time.Time)
}
7 changes: 3 additions & 4 deletions result/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package result

import (
"fmt"
"github.com/crawlab-team/crawlab-core/entity"
"github.com/crawlab-team/crawlab-core/errors"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/models"
"github.com/crawlab-team/crawlab-core/models/service"
"github.com/crawlab-team/go-trace"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
"sync"
)

func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.ResultService, err error) {
Expand Down Expand Up @@ -37,7 +36,7 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res
return svc, nil
}

var store = entity.NewTTLMap(5 * time.Second)
var store = sync.Map{}

func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) {
// model service
Expand All @@ -62,7 +61,7 @@ func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfa
storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex()

// attempt to load result service from store
res := store.Load(storeKey)
res, _ := store.Load(storeKey)
if res != nil {
svc, ok := res.(interfaces.ResultService)
if ok {
Expand Down
11 changes: 11 additions & 0 deletions result/service_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

type ServiceMongo struct {
Expand All @@ -23,6 +24,7 @@ type ServiceMongo struct {
// internals
colId primitive.ObjectID // _id of models.DataCollection
dc *models.DataCollection // models.DataCollection
t time.Time
}

func (svc *ServiceMongo) List(query generic.ListQuery, opts *generic.ListOptions) (results []interface{}, err error) {
Expand Down Expand Up @@ -84,6 +86,14 @@ func (svc *ServiceMongo) Index(fields []string) {
}
}

func (svc *ServiceMongo) SetTime(t time.Time) {
svc.t = t
}

func (svc *ServiceMongo) GetTime() (t time.Time) {
return svc.t
}

func (svc *ServiceMongo) getList(query bson.M, opts *mongo.FindOptions) (results []interface{}, err error) {
list, err := svc.modelColSvc.GetList(query, opts)
if err != nil {
Expand All @@ -110,6 +120,7 @@ func NewResultServiceMongo(colId primitive.ObjectID, _ primitive.ObjectID) (svc2
// service
svc := &ServiceMongo{
colId: colId,
t: time.Now(),
}

// dependency injection
Expand Down
33 changes: 29 additions & 4 deletions task/stats/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stats

import (
config2 "github.com/crawlab-team/crawlab-core/config"
"github.com/crawlab-team/crawlab-core/entity"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/service"
"github.com/crawlab-team/crawlab-core/node/config"
Expand All @@ -26,7 +25,8 @@ type Service struct {

// internals
mu sync.Mutex
resultServices *entity.TTLMap
resultServices sync.Map
rsTtl time.Duration
logDriver log.Driver
}

Expand All @@ -47,11 +47,16 @@ func (svc *Service) InsertLogs(id primitive.ObjectID, logs ...string) (err error
}

func (svc *Service) getResultService(id primitive.ObjectID) (resultSvc interfaces.ResultService, err error) {
// atomic operation
svc.mu.Lock()
defer svc.mu.Unlock()

// attempt to get from cache
res := svc.resultServices.Load(id.Hex())
res, _ := svc.resultServices.Load(id.Hex())
if res != nil {
// hit in cache
resultSvc, ok := res.(interfaces.ResultService)
resultSvc.SetTime(time.Now())
if ok {
return resultSvc, nil
}
Expand Down Expand Up @@ -83,6 +88,25 @@ func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) {
})
}

func (svc *Service) cleanup() {
for {
// atomic operation
svc.mu.Lock()

svc.resultServices.Range(func(key, value interface{}) bool {
rs := value.(interfaces.ResultService)
if time.Now().After(rs.GetTime().Add(svc.rsTtl)) {
svc.resultServices.Delete(key)
}
return true
})

svc.mu.Unlock()

time.Sleep(10 * time.Minute)
}
}

func NewTaskStatsService(opts ...Option) (svc2 interfaces.TaskStatsService, err error) {
// base service
baseSvc, err := task.NewBaseService()
Expand All @@ -92,8 +116,9 @@ func NewTaskStatsService(opts ...Option) (svc2 interfaces.TaskStatsService, err

// service
svc := &Service{
mu: sync.Mutex{},
TaskBaseService: baseSvc,
resultServices: entity.NewTTLMap(5 * time.Second),
resultServices: sync.Map{},
}

// apply options
Expand Down

0 comments on commit db72bfc

Please sign in to comment.