Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Jan 20, 2025
1 parent b9fbe92 commit 0207dcb
Show file tree
Hide file tree
Showing 2 changed files with 386 additions and 0 deletions.
105 changes: 105 additions & 0 deletions pkg/services/meta/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package meta

import (
"errors"
"fmt"
"sync"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
)

const (
// raw storage prefixes
rootKey = 0x00
)

// TODO
type ContainerLister interface {
// TODO
List() (map[cid.ID]struct{}, error)
}

// Meta TODO
type Meta struct {
l *zap.Logger
stopCh chan struct{}

cnrH util.Uint160
actualContainers map[cid.ID]struct{}
cLister ContainerLister

m sync.RWMutex
ws *rpcclient.WSClient
bCh chan *block.Header
aerCh chan *state.ContainedNotificationEvent

mpt *mpt.Trie
st storage.Store
}

// New TODO
func New(l *zap.Logger, cLister ContainerLister, ws *rpcclient.WSClient, containerH util.Uint160, path string) (*Meta, error) {
st, err := storage.NewBoltDBStore(dbconfig.BoltDBOptions{FilePath: path, ReadOnly: false})
if err != nil {
return nil, fmt.Errorf("open bolt store: %w", err)
}

Check warning on line 53 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L49-L53

Added lines #L49 - L53 were not covered by tests

var prevRootNode mpt.Node
root, err := st.Get([]byte{rootKey})
if !errors.Is(err, storage.ErrKeyNotFound) {
if err != nil {
return nil, fmt.Errorf("get state root from db: %w", err)
}

Check warning on line 60 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L55-L60

Added lines #L55 - L60 were not covered by tests

if len(root) != util.Uint256Size {
return nil, fmt.Errorf("root hash from db is %d bytes long, expect %d", len(root), util.Uint256Size)
}

Check warning on line 64 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L62-L64

Added lines #L62 - L64 were not covered by tests

prevRootNode = mpt.NewHashNode([util.Uint256Size]byte(root))

Check warning on line 66 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L66

Added line #L66 was not covered by tests
}

mTrie := mpt.NewTrie(prevRootNode, mpt.ModeGC, storage.NewMemCachedStore(st))

return &Meta{
l: l,
stopCh: make(chan struct{}),
cnrH: containerH,
cLister: cLister,
ws: ws,
bCh: make(chan *block.Header),
aerCh: make(chan *state.ContainedNotificationEvent),
mpt: mTrie,
st: st}, nil

Check warning on line 80 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L69-L80

Added lines #L69 - L80 were not covered by tests
}

// Start TODO
func (m *Meta) Start() error {
var err error
m.actualContainers, err = m.cLister.List()
if err != nil {
return fmt.Errorf("fetch node's containers: %w", err)
}

Check warning on line 89 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L84-L89

Added lines #L84 - L89 were not covered by tests

err = m.subscribeForMeta()
if err != nil {
return fmt.Errorf("subscribe for meta notifications: %w", err)
}

Check warning on line 94 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L91-L94

Added lines #L91 - L94 were not covered by tests

go m.listenNotifications()

return nil

Check warning on line 98 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L96-L98

Added lines #L96 - L98 were not covered by tests
}

// TODO
func (m *Meta) Stop() error {
close(m.stopCh)
return nil

Check warning on line 104 in pkg/services/meta/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/meta.go#L102-L104

Added lines #L102 - L104 were not covered by tests
}
281 changes: 281 additions & 0 deletions pkg/services/meta/notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package meta

import (
"crypto/sha256"
"encoding/binary"
"fmt"
"math/big"
"slices"
"time"

"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectsdk "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

func (m *Meta) subscribeForMeta() error {
_, err := m.ws.ReceiveHeadersOfAddedBlocks(nil, m.bCh)
if err != nil {
return fmt.Errorf("subscribe for block headers: %w", err)
}

Check warning on line 26 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L22-L26

Added lines #L22 - L26 were not covered by tests

evName := "ObjectPut"
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &evName}, m.aerCh)
if err != nil {
return fmt.Errorf("subscribe for object notifications: %w", err)
}

Check warning on line 32 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L28-L32

Added lines #L28 - L32 were not covered by tests

// TODO: catch deleted containers too?

return nil

Check warning on line 36 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L36

Added line #L36 was not covered by tests
}

func (m *Meta) listenNotifications() {
for {
select {
case h := <-m.bCh:
m.m.Lock()
err := m.handleBlock(h.Index)
if err != nil {
m.l.Error(fmt.Sprintf("processing %d block", h.Index), zap.Error(err))
}
m.m.Unlock()
case aer := <-m.aerCh:
// TODO: https://github.com/nspcc-dev/neo-go/issues/3779 receive somehow notifications from blocks
ev, err := parseNotification(aer)
if err != nil {
m.l.Error("invalid notification received", zap.Stringer("notificaiton container", aer.Container), zap.Error(err))
continue

Check warning on line 54 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L39-L54

Added lines #L39 - L54 were not covered by tests
}
if _, ok := m.actualContainers[ev.cID]; !ok {
m.l.Debug("skipping notification", zap.Stringer("notification container", aer.Container))
continue

Check warning on line 58 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L56-L58

Added lines #L56 - L58 were not covered by tests
}
err = m.handleEvent(ev)
if err != nil {
m.l.Error("handling notification", zap.Stringer("notification container", aer.Container), zap.Error(err))
}
case <-m.stopCh:
m.l.Info("stop listening meta notifications")
return

Check warning on line 66 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L60-L66

Added lines #L60 - L66 were not covered by tests
}
}
}

const (
gcInterval = 100
collapseDepth = 10
)

// handleBlock must be called with global mutex taken
func (m *Meta) handleBlock(i uint32) error {
m.mpt.Flush(i)
m.mpt.Collapse(collapseDepth)
stateRoot := m.mpt.StateRoot()
m.mpt.Store.Put([]byte{rootKey}, stateRoot[:])

_, err := m.mpt.Store.PersistSync()
if err != nil {
return fmt.Errorf("persisting %d block: %w", i, err)
}

Check warning on line 86 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L77-L86

Added lines #L77 - L86 were not covered by tests

if i/gcInterval == 0 {
err := m.gc(i)
if err != nil {
m.l.Warn(fmt.Sprintf("gc cycle for %d block", i), zap.Error(err))
}

Check warning on line 92 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L88-L92

Added lines #L88 - L92 were not covered by tests
}

m.actualContainers, err = m.cLister.List()
if err != nil {
return fmt.Errorf("list containers: %w", err)
}

Check warning on line 98 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L95-L98

Added lines #L95 - L98 were not covered by tests

// TODO drop containers that node does not belong to anymore?

return nil

Check warning on line 102 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L102

Added line #L102 was not covered by tests
}

func (m *Meta) gc(index uint32) error {
start := time.Now()

var removed int
var stored int
err := m.st.SeekGC(storage.SeekRange{
// TODO: what prefix?
Prefix: []byte{byte(storage.DataMPT)},
}, func(k, v []byte) bool {
stored++
if !mpt.IsActiveValue(v) {
h := binary.LittleEndian.Uint32(v[len(v)-4:])
if h <= index {
removed++
stored--
return false
}

Check warning on line 121 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L105-L121

Added lines #L105 - L121 were not covered by tests
}
return true

Check warning on line 123 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L123

Added line #L123 was not covered by tests
})
if err != nil {
return err
}

Check warning on line 127 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L125-L127

Added lines #L125 - L127 were not covered by tests

took := time.Since(start)
m.l.Debug("cleaned up garbage", zap.Uint32("block", index), zap.Duration("took time", took),
zap.Int("removed", removed), zap.Int("stored", stored))

return nil

Check warning on line 133 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L129-L133

Added lines #L129 - L133 were not covered by tests
}

type event struct {
cID cid.ID
oID oid.ID
size *big.Int
network *big.Int
firstObject []byte
prevObject []byte
deletedObjects []byte
lockedObjects []byte
typ objectsdk.Type
}

const (
// from meta data map but also used in MPT
cidIndex = iota
oidIndex
sizeIndex
vubIndex
networkMagicIndex
firstPart
previousPart
deletedIndex
lockedIndex
typeIndex
)

func parseNotification(ev *state.ContainedNotificationEvent) (event, error) {
const expectedNotificationArgs = 3
const minRequiredMetaFields = 5
var res event

arr, ok := ev.Item.Value().([]stackitem.Item)
if !ok {
return res, fmt.Errorf("unexpected notification stack item: %T", ev.Item.Value())
}
if len(arr) == expectedNotificationArgs {
return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs)
}

Check warning on line 173 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L162-L173

Added lines #L162 - L173 were not covered by tests

cID, ok := arr[0].Value().([]byte)
if !ok {
return res, fmt.Errorf("unexpected container ID stack item: %T", arr[0].Value())
}
oID, ok := arr[1].Value().([]byte)
if !ok {
return res, fmt.Errorf("unexpected object ID stack item: %T", arr[1].Value())
}
meta, ok := arr[2].Value().([]stackitem.MapElement)
if !ok {
return res, fmt.Errorf("unexpected meta stack item: %T", arr[2].Value())
}

Check warning on line 186 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L175-L186

Added lines #L175 - L186 were not covered by tests

if len(cID) != sha256.Size {
return res, fmt.Errorf("unexpected container ID len: %d", len(cID))
}
if len(oID) != sha256.Size {
return res, fmt.Errorf("unexpected object ID len: %d", len(oID))
}
if len(meta) < minRequiredMetaFields {
return res, fmt.Errorf("unexpected number of meta fields: %d, expected at least: %d", len(meta), minRequiredMetaFields)
}

Check warning on line 196 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L188-L196

Added lines #L188 - L196 were not covered by tests

res.cID = cid.ID(cID)
res.oID = oid.ID(oID)
res.size, ok = meta[sizeIndex].Value.Value().(*big.Int)
if !ok {
return res, fmt.Errorf("unexpected object size type: %T", meta[sizeIndex].Value.Value())
}
res.network, ok = meta[networkMagicIndex].Value.Value().(*big.Int)
if !ok {
return res, fmt.Errorf("unexpected network type: %T", meta[networkMagicIndex].Value.Value())
}
if len(meta) >= firstPart {
res.firstObject, ok = meta[firstPart].Value.Value().([]byte)
return res, fmt.Errorf("unexpected first part type: %T", meta[firstPart].Value.Value())
}
if len(meta) >= previousPart {
res.prevObject, ok = meta[previousPart].Value.Value().([]byte)
return res, fmt.Errorf("unexpected previous part type: %T", meta[previousPart].Value.Value())
}
if len(meta) >= typeIndex {
typ, ok := meta[typeIndex].Value.Value().(*big.Int)
if !ok {
return res, fmt.Errorf("unexpected type of field: %T", meta[typeIndex].Value.Value())
}
res.typ = objectsdk.Type(typ.Uint64())

if len(meta) >= deletedIndex {
if res.typ != objectsdk.TypeTombstone {
return res, fmt.Errorf("found deleted objects for non-TS object type: %d", res.typ)
}

Check warning on line 226 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L198-L226

Added lines #L198 - L226 were not covered by tests

res.deletedObjects, ok = meta[deletedIndex].Value.Value().([]byte)
if !ok {
return res, fmt.Errorf("unexpected deleted objects type: %T", meta[deletedIndex].Value.Value())
}

Check warning on line 231 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L228-L231

Added lines #L228 - L231 were not covered by tests
}
if len(meta) >= lockedIndex {
if res.typ != objectsdk.TypeLock {
return res, fmt.Errorf("found locked objects for non-LOCK object type: %d", res.typ)
}

Check warning on line 236 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L233-L236

Added lines #L233 - L236 were not covered by tests

res.lockedObjects, ok = meta[lockedIndex].Value.Value().([]byte)
if !ok {
return res, fmt.Errorf("unexpected locked objects type: %T", meta[lockedIndex].Value.Value())
}

Check warning on line 241 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L238-L241

Added lines #L238 - L241 were not covered by tests
}
}

return res, nil

Check warning on line 245 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L245

Added line #L245 was not covered by tests
}

func (m *Meta) handleEvent(e event) error {
// TODO check network magic?
_ = e.network

bMap := make(map[string][]byte)
commPrefix := slices.Concat(e.cID[:], e.oID[:])

bMap[string(append([]byte{oidIndex}, commPrefix...))] = []byte{1}
bMap[string(append([]byte{sizeIndex}, commPrefix...))] = e.size.Bytes()
if len(e.firstObject) == sha256.Size {
bMap[string(append([]byte{firstPart}, commPrefix...))] = e.firstObject
}
if len(e.prevObject) == sha256.Size {
bMap[string(append([]byte{previousPart}, commPrefix...))] = e.prevObject
}
if len(e.deletedObjects) > 0 {
bMap[string(append([]byte{deletedIndex}, commPrefix...))] = e.deletedObjects
}
if len(e.lockedObjects) > 0 {
bMap[string(append([]byte{lockedIndex}, commPrefix...))] = e.lockedObjects
}
if e.typ != objectsdk.TypeRegular {
bMap[string(append([]byte{typeIndex}, commPrefix...))] = []byte{byte(e.typ)}
}

Check warning on line 271 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L248-L271

Added lines #L248 - L271 were not covered by tests

b := mpt.MapToMPTBatch(bMap)

_, err := m.mpt.PutBatch(b)
if err != nil {
return fmt.Errorf("MPT batch put: %w", err)
}

Check warning on line 278 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L273-L278

Added lines #L273 - L278 were not covered by tests

return nil

Check warning on line 280 in pkg/services/meta/notifications.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/meta/notifications.go#L280

Added line #L280 was not covered by tests
}

0 comments on commit 0207dcb

Please sign in to comment.