This repository has been archived by the owner on Nov 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
service.go
565 lines (508 loc) · 17.3 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
package service
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/crypto"
core "github.com/libp2p/go-libp2p-core/peer"
"github.com/oklog/ulid/v2"
"github.com/textileio/bidbot/buildinfo"
pb "github.com/textileio/bidbot/gen/v1"
"github.com/textileio/bidbot/lib/auction"
"github.com/textileio/bidbot/lib/cast"
"github.com/textileio/bidbot/lib/datauri"
"github.com/textileio/bidbot/lib/dshelper/txndswrap"
"github.com/textileio/bidbot/lib/filclient"
"github.com/textileio/bidbot/service/limiter"
"github.com/textileio/bidbot/service/lotusclient"
"github.com/textileio/bidbot/service/pricing"
bidstore "github.com/textileio/bidbot/service/store"
tcrypto "github.com/textileio/crypto"
"github.com/textileio/go-libp2p-pubsub-rpc/finalizer"
"github.com/textileio/go-libp2p-pubsub-rpc/peer"
golog "github.com/textileio/go-log/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
var (
log = golog.Logger("bidbot/service")
// bidsAckTimeout is the max duration bidbot will wait for an ack after bidding in an auction.
bidsAckTimeout = time.Second * 30
// BidsExpiration is the duration to wait for a proposal CID after
// which bidbot will consider itself not winning in an auction, so the
// resources can be freed up.
BidsExpiration = 10 * time.Minute
// dataURIValidateTimeout is the timeout used when validating a data uri.
dataURIValidateTimeout = time.Minute
errWouldExceedRunningBytesLimit = errors.New(auction.ErrStringWouldExceedRunningBytesLimit)
)
// Config defines params for Service configuration.
type Config struct {
Peer peer.Config
BidParams BidParams
AuctionFilters AuctionFilters
BytesLimiter limiter.Limiter
ConcurrentImports int
ChunkedDownload bool
SealingSectorsLimit int
PricingRules pricing.PricingRules
PricingRulesStrict bool
ConcurrentDownloads int
}
// BidParams defines how bids are made.
type BidParams struct {
// StorageProviderID is your Filecoin StorageProvider ID used to make deals.
StorageProviderID string
// WalletAddrSig is a signature from your owner Lotus wallet address used to authenticate bids.
WalletAddrSig []byte
// AskPrice in attoFIL per GiB per epoch.
AskPrice int64
// VerifiedAskPrice in attoFIL per GiB per epoch.
VerifiedAskPrice int64
// FastRetrieval is whether or not you're offering fast retrieval for the deal data.
FastRetrieval bool
// DealStartWindow is the number of epochs after which won deals must start be on-chain.
DealStartWindow uint64
// DealDataDirectory is the directory to which deal data will be written.
DealDataDirectory string
// DealDataFetchAttempts is the number of times fetching deal data cid will be attempted.
DealDataFetchAttempts uint32
// DealDataFetchTimeout is the timeout fetching deal data cid.
DealDataFetchTimeout time.Duration
// DiscardOrphanDealsAfter is the time after which deals with no progress will be removed.
DiscardOrphanDealsAfter time.Duration
}
// Validate ensures BidParams are valid.
func (p *BidParams) Validate() error {
if p.DealStartWindow == 0 {
return fmt.Errorf("invalid deal start window; must be greater than zero")
}
if p.DealDataFetchAttempts == 0 {
return fmt.Errorf("invalid deal data fetch attempts; must be greater than zero")
}
if p.DealDataDirectory == "" {
return fmt.Errorf("invalid deal data directory; must not be empty")
}
if err := os.MkdirAll(p.DealDataDirectory, os.ModePerm); err != nil {
return fmt.Errorf("initializing data directory: %v", err)
}
testFile := filepath.Join(p.DealDataDirectory, ulid.MustNew(ulid.Now(), rand.Reader).String())
if err := ioutil.WriteFile(testFile, []byte("testing"), 0644); err != nil {
return fmt.Errorf("checking write access to data directory: %v", err)
}
if err := os.Remove(testFile); err != nil {
return fmt.Errorf("removing data directory write test file: %v", err)
}
return nil
}
// AuctionFilters specifies filters used when selecting auctions to bid on.
type AuctionFilters struct {
// DealDuration sets the min and max deal duration to bid on.
DealDuration MinMaxFilter
// DealSize sets the min and max deal size to bid on.
DealSize MinMaxFilter
}
// Validate ensures AuctionFilters are valid.
func (f *AuctionFilters) Validate() error {
if err := f.DealDuration.Validate(); err != nil {
return fmt.Errorf("invalid deal duration filter: %v", err)
}
if err := f.DealDuration.Validate(); err != nil {
return fmt.Errorf("invalid deal size filter: %v", err)
}
return nil
}
// MinMaxFilter is used to specify a range for an auction filter.
type MinMaxFilter struct {
Min uint64
Max uint64
}
// Validate ensures the filter is a valid min max window.
func (f *MinMaxFilter) Validate() error {
if f.Min > f.Max {
return errors.New("min must be less than or equal to max")
}
return nil
}
// Service is a miner service that subscribes to auctions.
type Service struct {
commChannel CommChannel
decryptKey tcrypto.DecryptionKey
fc filclient.FilClient
lc lotusclient.LotusClient
store *bidstore.Store
paused int32 // atomic. 1 == paused; 0 == not paused
bidParams BidParams
auctionFilters AuctionFilters
bytesLimiter limiter.Limiter
sealingSectorsLimit int
pricingRules pricing.PricingRules
pricingRulesStrict bool
ctx context.Context
finalizer *finalizer.Finalizer
}
// New returns a new Service.
func New(
conf Config,
store txndswrap.TxnDatastore,
lc lotusclient.LotusClient,
fc filclient.FilClient,
) (*Service, error) {
if err := conf.BidParams.Validate(); err != nil {
return nil, fmt.Errorf("validating bid parameters: %v", err)
}
if err := conf.AuctionFilters.Validate(); err != nil {
return nil, fmt.Errorf("validating auction filters: %v", err)
}
fin := finalizer.NewFinalizer()
ctx, cancel := context.WithCancel(context.Background())
fin.Add(finalizer.NewContextCloser(cancel))
commChannel, err := NewLibp2pPubsub(ctx, conf.Peer)
if err != nil {
return nil, fin.Cleanupf("creating peer: %v", err)
}
fin.Add(commChannel)
// Create bid store
s, err := bidstore.NewStore(
store,
fc,
lc,
conf.BidParams.DealDataDirectory,
conf.BidParams.DealDataFetchAttempts,
conf.BidParams.DealDataFetchTimeout,
progressReporter{commChannel, ctx},
conf.BidParams.DiscardOrphanDealsAfter,
conf.BytesLimiter,
conf.ConcurrentImports,
conf.ChunkedDownload,
conf.ConcurrentDownloads,
)
if err != nil {
return nil, fin.Cleanupf("creating bid store: %v", err)
}
fin.Add(s)
// Verify StorageProvider ID
ok, err := fc.VerifyBidder(
conf.BidParams.WalletAddrSig,
commChannel.ID(),
conf.BidParams.StorageProviderID)
if err != nil {
return nil, fin.Cleanupf("verifying StorageProvider ID: %v", err)
}
if !ok {
return nil, fin.Cleanup(fmt.Errorf("invalid StorageProvider ID or signature"))
}
privKey, err := crypto.MarshalPrivateKey(conf.Peer.PrivKey)
if err != nil {
return nil, fin.Cleanupf("marshaling private key: %v", err)
}
decryptKey, err := tcrypto.DecryptionKeyFromBytes(privKey)
if err != nil {
return nil, fin.Cleanupf("creating decryption key: %v", err)
}
srv := &Service{
commChannel: commChannel,
decryptKey: decryptKey,
fc: fc,
lc: lc,
store: s,
bidParams: conf.BidParams,
auctionFilters: conf.AuctionFilters,
bytesLimiter: conf.BytesLimiter,
sealingSectorsLimit: conf.SealingSectorsLimit,
pricingRules: conf.PricingRules,
pricingRulesStrict: conf.PricingRulesStrict,
ctx: ctx,
finalizer: fin,
}
if srv.pricingRules == nil {
srv.pricingRules = pricing.EmptyRules{}
}
srv.finalizer.AddFn(srv.healthChecks())
log.Info("service started")
return srv, nil
}
// Close the service.
func (s *Service) Close() error {
log.Info("service was shutdown")
return s.finalizer.Cleanup(nil)
}
// Subscribe to the deal auction feed. Upon success, it reports basic bidbot information to auctioneer after some delay.
// If bootstrap is true, the peer will dial the configured bootstrap addresses before joining the deal auction feed.
func (s *Service) Subscribe(bootstrap bool) error {
err := s.commChannel.Subscribe(bootstrap, s)
if err == nil {
time.AfterFunc(30*time.Second, s.reportStartup)
}
return err
}
// PeerInfo returns the public information of the market peer.
func (s *Service) PeerInfo() (*peer.Info, error) {
return s.commChannel.Info()
}
// ListBids lists bids by applying a store.Query.
func (s *Service) ListBids(query bidstore.Query) ([]*bidstore.Bid, error) {
return s.store.ListBids(query)
}
// GetBid gets the bid with specific ID.
func (s *Service) GetBid(ctx context.Context, id auction.BidID) (*bidstore.Bid, error) {
return s.store.GetBid(ctx, id)
}
// WriteDataURI writes a data uri resource to the configured deal data directory.
func (s *Service) WriteDataURI(payloadCid, uri string) (string, error) {
return s.store.WriteDataURI("", payloadCid, uri, 0)
}
// SetPaused sets the service state to pause bidding or not.
func (s *Service) SetPaused(paused bool) {
var v int32
if paused {
v = 1
}
atomic.StoreInt32(&s.paused, v)
}
func (s *Service) isPaused() bool {
return atomic.LoadInt32(&s.paused) == 1
}
// AuctionsHandler implements MessageHandler.
func (s *Service) AuctionsHandler(from core.ID, a *pb.Auction) error {
if s.isPaused() {
log.Info("not bidding when bidbot is paused")
return nil
}
ajson, err := json.MarshalIndent(a, "", " ")
if err != nil {
return fmt.Errorf("marshaling json: %v", err)
}
log.Infof("auction details:\n%s", string(ajson))
go func() {
ctx, cls := context.WithTimeout(context.Background(), time.Second*30)
defer cls()
if err := s.makeBid(ctx, a, from); err != nil {
log.Errorf("making bid: %v", err)
}
}()
return nil
}
func (s *Service) makeBid(ctx context.Context, a *pb.Auction, from core.ID) error {
if rejectReason := s.filterAuction(a); rejectReason != "" {
log.Infof("not bidding in auction %s from %s: %s", a.Id, from, rejectReason)
return nil
}
if s.sealingSectorsLimit > 0 {
n, err := s.lc.CurrentSealingSectors()
if err != nil {
log.Errorf("fail to get number of sealing sectors, continuing: %v", err)
} else if n > s.sealingSectorsLimit {
log.Infof("not bidding: lotus already has %d sealing sectors", n)
return nil
}
}
if err := s.store.HealthCheck(); err != nil {
return fmt.Errorf("store not ready to bid: %v", err)
}
// Get current chain height
currentEpoch, err := s.fc.GetChainHeight()
if err != nil {
return fmt.Errorf("getting chain height: %v", err)
}
startEpoch := s.bidParams.DealStartWindow + currentEpoch
if a.FilEpochDeadline > 0 && a.FilEpochDeadline < startEpoch {
log.Infof("auction %s from %s requires epoch no later than %d, but I can only promise epoch %d, skip bidding",
a.Id, from, a.FilEpochDeadline, startEpoch)
return nil
}
prices, valid := s.pricingRules.PricesFor(a)
log.Infof("pricing engine result valid for auction %s?: %v, details: %+v", a.Id, valid, prices)
if !valid {
// fail to load rules, allow bidding unless pricingRulesStrict is set.
if s.pricingRulesStrict {
return nil
}
prices.AllowBidding = true
}
if !prices.AllowBidding {
return nil
}
if !prices.UnverifiedPriceValid {
prices.UnverifiedPrice = s.bidParams.AskPrice
}
if !prices.VerifiedPriceValid {
prices.VerifiedPrice = s.bidParams.VerifiedAskPrice
}
bid := &pb.Bid{
AuctionId: a.Id,
StorageProviderId: s.bidParams.StorageProviderID,
WalletAddrSig: []byte("***"),
AskPrice: prices.UnverifiedPrice,
VerifiedAskPrice: prices.VerifiedPrice,
StartEpoch: startEpoch,
FastRetrieval: s.bidParams.FastRetrieval,
}
bidj, err := json.MarshalIndent(bid, "", " ")
if err != nil {
return fmt.Errorf("marshaling json: %v", err)
}
log.Infof("bidding in auction %s from %s: \n%s", a.Id, from, string(bidj))
bid.WalletAddrSig = s.bidParams.WalletAddrSig
// Submit bid to auctioneer
ctx2, cancel2 := context.WithTimeout(s.ctx, bidsAckTimeout)
defer cancel2()
id, err := s.commChannel.PublishBid(ctx2, auction.BidsTopic(auction.ID(a.Id)), bid)
if err != nil {
return fmt.Errorf("sending bid: %v", err)
}
payloadCid, err := cid.Parse(a.PayloadCid)
if err != nil {
return fmt.Errorf("parsing payload cid: %v", err)
}
// Save bid locally
if err := s.store.SaveBid(ctx, bidstore.Bid{
ID: auction.BidID(id),
AuctionID: auction.ID(a.Id),
AuctioneerID: from,
PayloadCid: payloadCid,
ClientAddress: a.ClientAddress,
DealSize: a.DealSize,
DealDuration: a.DealDuration,
AskPrice: bid.AskPrice,
VerifiedAskPrice: bid.VerifiedAskPrice,
StartEpoch: bid.StartEpoch,
FastRetrieval: bid.FastRetrieval,
}); err != nil {
return fmt.Errorf("saving bid: %v", err)
}
log.Debugf("created bid %s in auction %s", id, a.Id)
return nil
}
func (s *Service) filterAuction(auction *pb.Auction) (rejectReason string) {
if !auction.EndsAt.IsValid() || auction.EndsAt.AsTime().Before(time.Now()) {
return "auction ended or has an invalid end time"
}
if auction.DealSize < s.auctionFilters.DealSize.Min ||
auction.DealSize > s.auctionFilters.DealSize.Max {
return fmt.Sprintf("deal size falls outside of the range [%d, %d]",
s.auctionFilters.DealSize.Min,
s.auctionFilters.DealSize.Max)
}
if auction.DealDuration < s.auctionFilters.DealDuration.Min ||
auction.DealDuration > s.auctionFilters.DealDuration.Max {
return fmt.Sprintf("deal duration falls outside of the range [%d, %d]",
s.auctionFilters.DealDuration.Min,
s.auctionFilters.DealDuration.Max)
}
return ""
}
// WinsHandler implements MessageHandler.
func (s *Service) WinsHandler(ctx context.Context, wb *pb.WinningBid) error {
bid, err := s.store.GetBid(ctx, auction.BidID(wb.BidId))
if err != nil {
return fmt.Errorf("getting bid: %v", err)
}
// request for some quota, which may be used or gets expired if not winning
// the auction.
granted := s.bytesLimiter.Request(wb.AuctionId, bid.DealSize, BidsExpiration)
if !granted {
return errWouldExceedRunningBytesLimit
}
decrypted, err := s.decryptKey.Decrypt(wb.Encrypted)
if err != nil {
return fmt.Errorf("decrypting sources: %v", err)
}
confidential := &pb.WinningBidConfidential{}
if err := proto.Unmarshal(decrypted, confidential); err != nil {
return fmt.Errorf("unmarshaling sources: %v", err)
}
sources, err := cast.SourcesFromPb(confidential.Sources)
if err != nil {
return fmt.Errorf("sources from pb: %v", err)
}
// Ensure we can fetch the data
dataURI, err := datauri.NewFromSources(bid.PayloadCid.String(), sources)
if err != nil {
return fmt.Errorf("parsing data uri: %v", err)
}
ctx, cancel := context.WithTimeout(s.ctx, dataURIValidateTimeout)
defer cancel()
if err := dataURI.Validate(ctx); err != nil {
return fmt.Errorf("validating data uri: %v", err)
}
if err := s.store.SetAwaitingProposalCid(ctx, auction.BidID(wb.BidId), sources); err != nil {
return fmt.Errorf("setting awaiting proposal cid: %v", err)
}
log.Infof("bid %s won in auction %s; awaiting proposal cid", wb.BidId, wb.AuctionId)
return nil
}
// ProposalsHandler implements MessageHandler.
func (s *Service) ProposalsHandler(ctx context.Context, prop *pb.WinningBidProposal) error {
if _, err := uuid.Parse(prop.DealUid); err == nil {
// Handle deal proposal made with Boost.
log.Infof("bid %s received deal uid %s in auction %s", prop.BidId, prop.DealUid, prop.AuctionId)
if err := s.store.SetDealUID(ctx, auction.BidID(prop.BidId), prop.DealUid); err != nil {
return fmt.Errorf("setting deal uuid: %v", err)
}
} else {
// Handle legacy deal proposal.
log.Infof("bid %s received proposal cid %s in auction %s", prop.BidId, prop.ProposalCid, prop.AuctionId)
pcid, err := cid.Decode(prop.ProposalCid)
if err != nil {
return fmt.Errorf("decoding proposal cid: %v", err)
}
if err := s.store.SetProposalCid(ctx, auction.BidID(prop.BidId), pcid); err != nil {
return fmt.Errorf("setting proposal cid: %v", err)
}
}
// ready to fetch data, so the requested quota is actually in use.
s.bytesLimiter.Secure(prop.AuctionId)
return nil
}
func (s *Service) reportStartup() {
_, unconfigured := s.pricingRules.(pricing.EmptyRules)
event := &pb.BidbotEvent{
Ts: timestamppb.New(time.Now()),
Type: &pb.BidbotEvent_Startup_{Startup: &pb.BidbotEvent_Startup{
SemanticVersion: buildinfo.GitSummary,
DealStartWindow: s.bidParams.DealStartWindow,
StorageProviderId: s.bidParams.StorageProviderID,
CidGravityConfigured: !unconfigured,
CidGravityStrict: s.pricingRulesStrict,
}},
}
s.commChannel.PublishBidbotEvent(s.ctx, event)
}
func (s *Service) healthChecks() func() {
tk := time.NewTicker(10 * time.Minute)
stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
case <-tk.C:
if err := s.store.HealthCheck(); err != nil {
log.Errorf("store not healthy: %v", err)
s.reportUnhealthy(err)
}
}
}
}()
return func() { close(stop) }
}
func (s *Service) reportUnhealthy(err error) {
event := &pb.BidbotEvent{
Ts: timestamppb.New(time.Now()),
Type: &pb.BidbotEvent_Unhealthy_{Unhealthy: &pb.BidbotEvent_Unhealthy{
StorageProviderId: s.bidParams.StorageProviderID,
Error: err.Error(),
}},
}
s.commChannel.PublishBidbotEvent(s.ctx, event)
}