Skip to content

Commit

Permalink
Merge branch 'master' into tsachi/add_snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
tsachiherman committed Sep 17, 2024
2 parents 806db46 + 99d2b9a commit 550ed9c
Show file tree
Hide file tree
Showing 80 changed files with 2,283 additions and 155 deletions.
32 changes: 24 additions & 8 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ava-labs/avalanchego/node"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/perms"
"github.com/ava-labs/avalanchego/utils/ulimit"
Expand Down Expand Up @@ -71,6 +72,7 @@ func New(config node.Config) (App, error) {

n, err := node.New(&config, logFactory, log)
if err != nil {
log.Fatal("failed to initialize node", zap.Error(err))
log.Stop()
logFactory.Close()
return nil, fmt.Errorf("failed to initialize node: %w", err)
Expand All @@ -89,26 +91,40 @@ func Run(app App) int {
return 1
}

// register signals to kill the application
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT)
signal.Notify(signals, syscall.SIGTERM)
// register terminationSignals to kill the application
terminationSignals := make(chan os.Signal, 1)
signal.Notify(terminationSignals, syscall.SIGINT, syscall.SIGTERM)

stackTraceSignal := make(chan os.Signal, 1)
signal.Notify(stackTraceSignal, syscall.SIGABRT)

// start up a new go routine to handle attempts to kill the application
var eg errgroup.Group
eg.Go(func() error {
for range signals {
for range terminationSignals {
return app.Stop()
}
return nil
})

// start a goroutine to listen on SIGABRT signals,
// to print the stack trace to standard error.
go func() {
for range stackTraceSignal {
fmt.Fprint(os.Stderr, utils.GetStacktrace(true))
}
}()

// wait for the app to exit and get the exit code response
exitCode, err := app.ExitCode()

// shut down the signal go routine
signal.Stop(signals)
close(signals)
// shut down the termination signal go routine
signal.Stop(terminationSignals)
close(terminationSignals)

// shut down the stack trace go routine
signal.Stop(stackTraceSignal)
close(stackTraceSignal)

// if there was an error closing or running the application, report that error
if eg.Wait() != nil || err != nil {
Expand Down
3 changes: 1 addition & 2 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ var (
ErrMaxSliceLenExceeded = errors.New("max slice length exceeded")
ErrDoesNotImplementInterface = errors.New("does not implement interface")
ErrUnexportedField = errors.New("unexported field")
ErrExtraSpace = errors.New("trailing buffer space")
ErrMarshalZeroLength = errors.New("can't marshal zero length value")
ErrUnmarshalZeroLength = errors.New("can't unmarshal zero length value")
)

// Codec marshals and unmarshals
type Codec interface {
MarshalInto(interface{}, *wrappers.Packer) error
Unmarshal([]byte, interface{}) error
UnmarshalFrom(*wrappers.Packer, interface{}) error

// Returns the size, in bytes, of [value] when it's marshaled
Size(value interface{}) (int, error)
Expand Down
34 changes: 34 additions & 0 deletions codec/codectest/codectest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/utils/wrappers"

codecpkg "github.com/ava-labs/avalanchego/codec"
)

Expand Down Expand Up @@ -75,6 +77,7 @@ var (
{"Slice Length Overflow", TestSliceLengthOverflow},
{"Map", TestMap},
{"Can Marshal Large Slices", TestCanMarshalLargeSlices},
{"Implements UnmarshalFrom", TestImplementsUnmarshalFrom},
}

MultipleTagsTests = []NamedTest{
Expand Down Expand Up @@ -1153,3 +1156,34 @@ func FuzzStructUnmarshal(codec codecpkg.GeneralCodec, f *testing.F) {
require.Len(bytes, size)
})
}

func TestImplementsUnmarshalFrom(t testing.TB, codec codecpkg.GeneralCodec) {
require := require.New(t)

p := wrappers.Packer{MaxSize: 1024}
p.PackFixedBytes([]byte{0, 1, 2}) // pack 3 extra bytes prefix

mySlice := []bool{true, false, true, true}

require.NoError(codec.MarshalInto(mySlice, &p))

p.PackFixedBytes([]byte{7, 7, 7}) // pack 3 extra bytes suffix

bytesLen, err := codec.Size(mySlice)
require.NoError(err)
require.Equal(3+bytesLen+3, p.Offset)

p = wrappers.Packer{Bytes: p.Bytes, MaxSize: p.MaxSize, Offset: 3}

var sliceUnmarshaled []bool
require.NoError(codec.UnmarshalFrom(&p, &sliceUnmarshaled))
require.Equal(mySlice, sliceUnmarshaled)
require.Equal(
wrappers.Packer{
Bytes: p.Bytes,
MaxSize: p.MaxSize,
Offset: 11,
},
p,
)
}
15 changes: 14 additions & 1 deletion codec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
ErrCantPackVersion = errors.New("couldn't pack codec version")
ErrCantUnpackVersion = errors.New("couldn't unpack codec version")
ErrDuplicatedVersion = errors.New("duplicated codec version")
ErrExtraSpace = errors.New("trailing buffer space")
)

var _ Manager = (*manager)(nil)
Expand Down Expand Up @@ -157,5 +158,17 @@ func (m *manager) Unmarshal(bytes []byte, dest interface{}) (uint16, error) {
if !exists {
return version, ErrUnknownVersion
}
return version, c.Unmarshal(p.Bytes[p.Offset:], dest)

if err := c.UnmarshalFrom(&p, dest); err != nil {
return version, err
}
if p.Offset != len(bytes) {
return version, fmt.Errorf("%w: read %d provided %d",
ErrExtraSpace,
p.Offset,
len(bytes),
)
}

return version, nil
}
19 changes: 3 additions & 16 deletions codec/reflectcodec/type_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,31 +496,18 @@ func (c *genericCodec) marshal(
}
}

// Unmarshal unmarshals [bytes] into [dest], where [dest] must be a pointer or
// UnmarshalFrom unmarshals [p.Bytes] into [dest], where [dest] must be a pointer or
// interface
func (c *genericCodec) Unmarshal(bytes []byte, dest interface{}) error {
func (c *genericCodec) UnmarshalFrom(p *wrappers.Packer, dest interface{}) error {
if dest == nil {
return codec.ErrUnmarshalNil
}

p := wrappers.Packer{
Bytes: bytes,
}
destPtr := reflect.ValueOf(dest)
if destPtr.Kind() != reflect.Ptr {
return errNeedPointer
}
if err := c.unmarshal(&p, destPtr.Elem(), nil /*=typeStack*/); err != nil {
return err
}
if p.Offset != len(bytes) {
return fmt.Errorf("%w: read %d provided %d",
codec.ErrExtraSpace,
p.Offset,
len(bytes),
)
}
return nil
return c.unmarshal(p, destPtr.Elem(), nil /*=typeStack*/)
}

// Unmarshal from p.Bytes into [value]. [value] must be addressable.
Expand Down
13 changes: 9 additions & 4 deletions scripts/build_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set -euo pipefail

# e.g.,
# ./scripts/build_image.sh # Build local single-arch image
# SKIP_BUILD_RACE=1 ./scripts/build_image.sh # Build local single-arch image but skip building -r image
# DOCKER_IMAGE=myavalanchego ./scripts/build_image.sh # Build local single arch image with a custom image name
# DOCKER_IMAGE=avaplatform/avalanchego ./scripts/build_image.sh # Build and push multi-arch image to docker hub
# DOCKER_IMAGE=localhost:5001/avalanchego ./scripts/build_image.sh # Build and push multi-arch image to private registry
Expand All @@ -25,10 +26,12 @@ set -euo pipefail
# Directory above this script
AVALANCHE_PATH=$( cd "$( dirname "${BASH_SOURCE[0]}" )"; cd .. && pwd )

SKIP_BUILD_RACE="${SKIP_BUILD_RACE:-}"

# Load the constants
source "$AVALANCHE_PATH"/scripts/constants.sh

if [[ $image_tag == *"-r" ]]; then
if [[ -z "${SKIP_BUILD_RACE}" && $image_tag == *"-r" ]]; then
echo "Branch name must not end in '-r'"
exit 1
fi
Expand Down Expand Up @@ -84,9 +87,11 @@ echo "Building Docker Image with tags: $DOCKER_IMAGE:$commit_hash , $DOCKER_IMAG
${DOCKER_CMD} -t "$DOCKER_IMAGE:$commit_hash" -t "$DOCKER_IMAGE:$image_tag" \
"$AVALANCHE_PATH" -f "$AVALANCHE_PATH/Dockerfile"

echo "Building Docker Image with tags: $DOCKER_IMAGE:$commit_hash-r , $DOCKER_IMAGE:$image_tag-r"
${DOCKER_CMD} --build-arg="RACE_FLAG=-r" -t "$DOCKER_IMAGE:$commit_hash-r" -t "$DOCKER_IMAGE:$image_tag-r" \
"$AVALANCHE_PATH" -f "$AVALANCHE_PATH/Dockerfile"
if [[ -z "${SKIP_BUILD_RACE}" ]]; then
echo "Building Docker Image with tags (race detector): $DOCKER_IMAGE:$commit_hash-r , $DOCKER_IMAGE:$image_tag-r"
${DOCKER_CMD} --build-arg="RACE_FLAG=-r" -t "$DOCKER_IMAGE:$commit_hash-r" -t "$DOCKER_IMAGE:$image_tag-r" \
"$AVALANCHE_PATH" -f "$AVALANCHE_PATH/Dockerfile"
fi

# Only tag the latest image for the master branch when images are pushed to a registry
if [[ "${DOCKER_IMAGE}" == *"/"* && $image_tag == "master" ]]; then
Expand Down
30 changes: 26 additions & 4 deletions snow/engine/snowman/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"github.com/ava-labs/avalanchego/utils/units"
)

const nonVerifiedCacheSize = 64 * units.MiB
const (
nonVerifiedCacheSize = 64 * units.MiB
errInsufficientStake = "insufficient connected stake"
)

var _ common.Engine = (*Engine)(nil)

Expand Down Expand Up @@ -587,7 +590,7 @@ func (e *Engine) sendChits(ctx context.Context, nodeID ids.NodeID, requestID uin
// Because we only return accepted state here, it's fairly likely
// that the requested height is higher than the last accepted block.
// That means that this code path is actually quite common.
e.Ctx.Log.Debug("failed fetching accepted block",
e.Ctx.Log.Debug("unable to retrieve accepted block",
zap.Stringer("nodeID", nodeID),
zap.Uint64("requestedHeight", requestedHeight),
zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
Expand Down Expand Up @@ -615,7 +618,7 @@ func (e *Engine) sendChits(ctx context.Context, nodeID ids.NodeID, requestID uin
// Because it is possible for a byzantine node to spam requests at
// old heights on a pruning network, we log this as debug. However,
// this case is unexpected to be hit by correct peers.
e.Ctx.Log.Debug("failed fetching accepted block",
e.Ctx.Log.Debug("unable to retrieve accepted block",
zap.Stringer("nodeID", nodeID),
zap.Uint64("requestedHeight", requestedHeight),
zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
Expand All @@ -630,7 +633,7 @@ func (e *Engine) sendChits(ctx context.Context, nodeID ids.NodeID, requestID uin
var ok bool
preferenceAtHeight, ok = e.Consensus.PreferenceAtHeight(requestedHeight)
if !ok {
e.Ctx.Log.Debug("failed fetching processing block",
e.Ctx.Log.Debug("processing block not found",
zap.Stringer("nodeID", nodeID),
zap.Uint64("requestedHeight", requestedHeight),
zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
Expand Down Expand Up @@ -871,6 +874,10 @@ func (e *Engine) sendQuery(
blkBytes []byte,
push bool,
) {
if e.abortDueToInsufficientConnectedStake(blkID) {
return
}

e.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", e.Validators),
)
Expand Down Expand Up @@ -916,6 +923,21 @@ func (e *Engine) sendQuery(
}
}

func (e *Engine) abortDueToInsufficientConnectedStake(blkID ids.ID) bool {
stakeConnectedRatio := e.Config.ConnectedValidators.ConnectedPercent()
minConnectedStakeToQuery := float64(e.Params.AlphaConfidence) / float64(e.Params.K)

if stakeConnectedRatio < minConnectedStakeToQuery {
e.Ctx.Log.Debug("dropped query for block",
zap.String("reason", errInsufficientStake),
zap.Stringer("blkID", blkID),
zap.Float64("ratio", stakeConnectedRatio),
)
return true
}
return false
}

// issue [blk] to consensus
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
Expand Down
39 changes: 39 additions & 0 deletions snow/engine/snowman/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/snow/consensus/snowman/snowmantest"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
"github.com/ava-labs/avalanchego/snow/engine/snowman/ancestor"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block/blocktest"
"github.com/ava-labs/avalanchego/snow/engine/snowman/getter"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)
Expand Down Expand Up @@ -3182,3 +3184,40 @@ func TestShouldIssueBlock(t *testing.T) {
})
}
}

type mockConnVDR struct {
tracker.Peers
percent float64
}

func (m *mockConnVDR) ConnectedPercent() float64 {
return m.percent
}

type logBuffer struct {
bytes.Buffer
}

func (logBuffer) Close() error {
return nil
}

func TestEngineAbortQueryWhenInPartition(t *testing.T) {
require := require.New(t)

// Buffer to record the log entries
buff := logBuffer{}

conf := DefaultConfig(t)
// Overwrite the log to record what it says
conf.Ctx.Log = logging.NewLogger("", logging.NewWrappedCore(logging.Verbo, &buff, logging.Plain.ConsoleEncoder()))
conf.Params = snowball.DefaultParameters
conf.ConnectedValidators = &mockConnVDR{percent: 0.7, Peers: conf.ConnectedValidators}

_, _, _, _, engine := setup(t, conf)

// Gossip will cause a pull query if enough stake is connected
engine.sendQuery(context.Background(), ids.ID{}, nil, false)

require.Contains(buff.String(), errInsufficientStake)
}
3 changes: 3 additions & 0 deletions snow/networking/router/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ava-labs/avalanchego/snow/networking/benchlist"
"github.com/ava-labs/avalanchego/snow/networking/handler"
"github.com/ava-labs/avalanchego/snow/networking/timeout"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/linked"
"github.com/ava-labs/avalanchego/utils/logging"
Expand Down Expand Up @@ -377,6 +378,7 @@ func (cr *ChainRouter) Shutdown(ctx context.Context) {
chainLog := chain.Context().Log
if err != nil {
chainLog.Warn("timed out while shutting down",
zap.String("stack", utils.GetStacktrace(true)),
zap.Error(err),
)
} else {
Expand Down Expand Up @@ -700,6 +702,7 @@ func (cr *ChainRouter) removeChain(ctx context.Context, chainID ids.ID) {
chainLog := chain.Context().Log
if err != nil {
chainLog.Warn("timed out while shutting down",
zap.String("stack", utils.GetStacktrace(true)),
zap.Error(err),
)
} else {
Expand Down
Loading

0 comments on commit 550ed9c

Please sign in to comment.