all: use atomic type added in go1.19 #26820

Closed · wants to merge 1 commit
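Every call site below follows the same mechanical rewrite: a plain integer field driven through sync/atomic's package-level functions becomes one of the typed atomics added in Go 1.19 (atomic.Bool, atomic.Int64, atomic.Uint32, atomic.Uint64). The type makes atomic access the only possible access, the zero value is ready to use, and Add returns the new value just as atomic.AddUint32 did. A minimal before/after sketch of the pattern, using a hypothetical counter rather than code from this diff:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        // Before: a bare integer plus free functions. Nothing stops a
        // caller from reading or writing the variable non-atomically.
        var oldHits uint32
        atomic.AddUint32(&oldHits, 1)
        _ = atomic.LoadUint32(&oldHits)

        // After (Go 1.19+): the type enforces atomic access and
        // documents intent at the declaration site.
        var hits atomic.Uint32
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                hits.Add(1) // replaces atomic.AddUint32(&hits, 1)
            }()
        }
        wg.Wait()
        fmt.Println(hits.Load()) // replaces atomic.LoadUint32(&hits)
    }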
6 changes: 3 additions & 3 deletions accounts/keystore/keystore_test.go
@@ -397,19 +397,19 @@ func TestImportRace(t *testing.T) {
t.Fatalf("failed to export account: %v", acc)
}
_, ks2 := tmpKeyStore(t, true)
- var atom uint32
+ var atom atomic.Uint32
var wg sync.WaitGroup
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
if _, err := ks2.Import(json, "new", "new"); err != nil {
- atomic.AddUint32(&atom, 1)
+ atom.Add(1)
}
}()
}
wg.Wait()
- if atom != 1 {
+ if atom.Load() != 1 {
t.Errorf("Import is racy")
}
}
12 changes: 6 additions & 6 deletions accounts/usbwallet/hub.go
@@ -63,9 +63,9 @@ type Hub struct {
stateLock sync.RWMutex // Protects the internals of the hub from racey access

// TODO(karalabe): remove if hotplug lands on Windows
- commsPend int // Number of operations blocking enumeration
- commsLock sync.Mutex // Lock protecting the pending counter and enumeration
- enumFails uint32 // Number of times enumeration has failed
+ commsPend int // Number of operations blocking enumeration
+ commsLock sync.Mutex // Lock protecting the pending counter and enumeration
+ enumFails atomic.Uint32 // Number of times enumeration has failed
}

// NewLedgerHub creates a new hardware wallet manager for Ledger devices.
@@ -151,7 +151,7 @@ func (hub *Hub) refreshWallets() {
return
}
// If USB enumeration is continually failing, don't keep trying indefinitely
- if atomic.LoadUint32(&hub.enumFails) > 2 {
+ if hub.enumFails.Load() > 2 {
return
}
// Retrieve the current list of USB wallet devices
@@ -172,7 +172,7 @@
}
infos, err := usb.Enumerate(hub.vendorID, 0)
if err != nil {
- failcount := atomic.AddUint32(&hub.enumFails, 1)
+ failcount := hub.enumFails.Add(1)
if runtime.GOOS == "linux" {
// See rationale before the enumeration why this is needed and only on Linux.
hub.commsLock.Unlock()
@@ -181,7 +181,7 @@
"vendor", hub.vendorID, "failcount", failcount, "err", err)
return
}
- atomic.StoreUint32(&hub.enumFails, 0)
+ hub.enumFails.Store(0)

for _, info := range infos {
for _, id := range hub.productIDs {
6 changes: 3 additions & 3 deletions cmd/geth/attach_test.go
@@ -61,7 +61,7 @@ func TestRemoteDbWithHeaders(t *testing.T) {
}

func testReceiveHeaders(t *testing.T, ln net.Listener, gethArgs ...string) {
- var ok uint32
+ var ok atomic.Bool
server := &http.Server{
Addr: "localhost:0",
Handler: &testHandler{func(w http.ResponseWriter, r *http.Request) {
@@ -72,12 +72,12 @@ func testReceiveHeaders(t *testing.T, ln net.Listener, gethArgs ...string) {
if have, want := r.Header.Get("second"), "two"; have != want {
t.Fatalf("missing header, have %v want %v", have, want)
}
- atomic.StoreUint32(&ok, 1)
+ ok.Store(true)
}}}
go server.Serve(ln)
defer server.Close()
runGeth(t, gethArgs...).WaitExit()
- if atomic.LoadUint32(&ok) != 1 {
+ if !ok.Load() {
t.Fatal("Test fail, expected invocation to succeed")
}
}
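Where a uint32 served as a 0/1 flag, as in testReceiveHeaders above, the replacement is atomic.Bool, which removes the magic numbers: StoreUint32(&ok, 1) becomes ok.Store(true) and the != 1 check becomes !ok.Load(). A reduced sketch of the flag idiom, with hypothetical names rather than this test's code:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var ok atomic.Bool // zero value is false, no initialization needed

        done := make(chan struct{})
        go func() {
            defer close(done)
            ok.Store(true) // the worker marks success
        }()
        <-done

        if !ok.Load() { // reads as plain English, no sentinel values
            fmt.Println("handler never ran")
            return
        }
        fmt.Println("handler ran")
    }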
14 changes: 7 additions & 7 deletions cmd/geth/chaincmd.go
@@ -261,16 +261,16 @@ func importChain(ctx *cli.Context) error {
defer db.Close()

// Start periodically gathering memory profiles
- var peakMemAlloc, peakMemSys uint64
+ var peakMemAlloc, peakMemSys atomic.Uint64
go func() {
stats := new(runtime.MemStats)
for {
runtime.ReadMemStats(stats)
- if atomic.LoadUint64(&peakMemAlloc) < stats.Alloc {
- atomic.StoreUint64(&peakMemAlloc, stats.Alloc)
+ if peakMemAlloc.Load() < stats.Alloc {
+ peakMemAlloc.Store(stats.Alloc)
}
- if atomic.LoadUint64(&peakMemSys) < stats.Sys {
- atomic.StoreUint64(&peakMemSys, stats.Sys)
+ if peakMemSys.Load() < stats.Sys {
+ peakMemSys.Store(stats.Sys)
}
time.Sleep(5 * time.Second)
}
@@ -303,8 +303,8 @@ func importChain(ctx *cli.Context) error {
mem := new(runtime.MemStats)
runtime.ReadMemStats(mem)

fmt.Printf("Object memory: %.3f MB current, %.3f MB peak\n", float64(mem.Alloc)/1024/1024, float64(atomic.LoadUint64(&peakMemAlloc))/1024/1024)
fmt.Printf("System memory: %.3f MB current, %.3f MB peak\n", float64(mem.Sys)/1024/1024, float64(atomic.LoadUint64(&peakMemSys))/1024/1024)
fmt.Printf("Object memory: %.3f MB current, %.3f MB peak\n", float64(mem.Alloc)/1024/1024, float64(peakMemAlloc.Load())/1024/1024)
fmt.Printf("System memory: %.3f MB current, %.3f MB peak\n", float64(mem.Sys)/1024/1024, float64(peakMemSys.Load())/1024/1024)
fmt.Printf("Allocations: %.3f million\n", float64(mem.Mallocs)/1000000)
fmt.Printf("GC pause: %v\n\n", time.Duration(mem.PauseTotalNs))

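One caveat on the hunk above: Load followed by Store is not a single atomic update. It is safe in importChain because the one sampling goroutine is the only writer. If several goroutines could race to record a peak, the usual pattern is a CompareAndSwap loop; a sketch under that assumption, with a hypothetical helper that is not part of this change:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // updatePeak raises *peak to at least v even with many concurrent
    // callers, retrying until the CAS succeeds or v turns out to be stale.
    func updatePeak(peak *atomic.Uint64, v uint64) {
        for {
            cur := peak.Load()
            if v <= cur {
                return // recorded peak is already at least v
            }
            if peak.CompareAndSwap(cur, v) {
                return
            }
            // Another writer moved the peak between Load and CAS; retry.
        }
    }

    func main() {
        var peak atomic.Uint64
        updatePeak(&peak, 42)
        fmt.Println(peak.Load()) // 42
    }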
4 changes: 2 additions & 2 deletions cmd/geth/les_test.go
@@ -107,10 +107,10 @@ func ipcEndpoint(ipcPath, datadir string) string {
// but windows require pipes to sit in "\\.\pipe\". Therefore, to run several
// nodes simultaneously, we need to distinguish between them, which we do by
// the pipe filename instead of folder.
- var nextIPC = uint32(0)
+ var nextIPC atomic.Uint32

func startGethWithIpc(t *testing.T, name string, args ...string) *gethrpc {
- ipcName := fmt.Sprintf("geth-%d.ipc", atomic.AddUint32(&nextIPC, 1))
+ ipcName := fmt.Sprintf("geth-%d.ipc", nextIPC.Add(1))
args = append([]string{"--networkid=42", "--port=0", "--authrpc.port", "0", "--ipcpath", ipcName}, args...)
t.Logf("Starting %v with rpc: %v", name, args)

12 changes: 6 additions & 6 deletions consensus/ethash/algorithm.go
@@ -163,7 +163,7 @@ func generateCache(dest []uint32, epoch uint64, seed []byte) {
rows := int(size) / hashBytes

// Start a monitoring goroutine to report progress on low end devices
- var progress uint32
+ var progress atomic.Uint32

done := make(chan struct{})
defer close(done)
@@ -174,7 +174,7 @@ func generateCache(dest []uint32, epoch uint64, seed []byte) {
case <-done:
return
case <-time.After(3 * time.Second):
logger.Info("Generating ethash verification cache", "percentage", atomic.LoadUint32(&progress)*100/uint32(rows)/(cacheRounds+1), "elapsed", common.PrettyDuration(time.Since(start)))
logger.Info("Generating ethash verification cache", "percentage", progress.Load()*100/uint32(rows)/(cacheRounds+1), "elapsed", common.PrettyDuration(time.Since(start)))
}
}
}()
@@ -185,7 +185,7 @@ func generateCache(dest []uint32, epoch uint64, seed []byte) {
keccak512(cache, seed)
for offset := uint64(hashBytes); offset < size; offset += hashBytes {
keccak512(cache[offset:], cache[offset-hashBytes:offset])
- atomic.AddUint32(&progress, 1)
+ progress.Add(1)
}
// Use a low-round version of randmemohash
temp := make([]byte, hashBytes)
@@ -200,7 +200,7 @@
bitutil.XORBytes(temp, cache[srcOff:srcOff+hashBytes], cache[xorOff:xorOff+hashBytes])
keccak512(cache[dstOff:], temp)

- atomic.AddUint32(&progress, 1)
+ progress.Add(1)
}
}
// Swap the byte order on big endian systems and return
@@ -299,7 +299,7 @@ func generateDataset(dest []uint32, epoch uint64, cache []uint32) {
var pend sync.WaitGroup
pend.Add(threads)

- var progress uint64
+ var progress atomic.Uint64
for i := 0; i < threads; i++ {
go func(id int) {
defer pend.Done()
@@ -323,7 +323,7 @@
}
copy(dataset[index*hashBytes:], item)

- if status := atomic.AddUint64(&progress, 1); status%percent == 0 {
+ if status := progress.Add(1); status%percent == 0 {
logger.Info("Generating DAG in progress", "percentage", (status*100)/(size/hashBytes), "elapsed", common.PrettyDuration(time.Since(start)))
}
}
16 changes: 8 additions & 8 deletions consensus/ethash/ethash.go
@@ -308,12 +308,12 @@ func (c *cache) finalizer() {

// dataset wraps an ethash dataset with some metadata to allow easier concurrent use.
type dataset struct {
- epoch uint64 // Epoch for which this cache is relevant
- dump *os.File // File descriptor of the memory mapped cache
- mmap mmap.MMap // Memory map itself to unmap before releasing
- dataset []uint32 // The actual cache data content
- once sync.Once // Ensures the cache is generated only once
- done uint32 // Atomic flag to determine generation status
+ epoch uint64 // Epoch for which this cache is relevant
+ dump *os.File // File descriptor of the memory mapped cache
+ mmap mmap.MMap // Memory map itself to unmap before releasing
+ dataset []uint32 // The actual cache data content
+ once sync.Once // Ensures the cache is generated only once
+ done atomic.Bool // Atomic flag to determine generation status
}

// newDataset creates a new ethash mining dataset and returns it as a plain Go
@@ -326,7 +326,7 @@ func newDataset(epoch uint64) *dataset {
func (d *dataset) generate(dir string, limit int, lock bool, test bool) {
d.once.Do(func() {
// Mark the dataset generated after we're done. This is needed for remote
- defer atomic.StoreUint32(&d.done, 1)
+ defer d.done.Store(true)

csize := cacheSize(d.epoch*epochLength + 1)
dsize := datasetSize(d.epoch*epochLength + 1)
@@ -390,7 +390,7 @@ func (d *dataset) generate(dir string, limit int, lock bool, test bool) {
// or not (it may not have been started at all). This is useful for remote miners
// to default to verification caches instead of blocking on DAG generations.
func (d *dataset) generated() bool {
- return atomic.LoadUint32(&d.done) == 1
+ return d.done.Load()
}

// finalizer closes any file handlers and memory maps open.
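The once/done pairing in the dataset struct is a common idiom: sync.Once guarantees the generation work runs exactly once, while the atomic.Bool lets remote miners poll completion without blocking on the Once. A reduced sketch of the idiom with hypothetical names:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    type resource struct {
        once sync.Once
        done atomic.Bool
    }

    // generate runs the expensive initialization exactly once, then flips
    // the flag so that pollers can observe completion.
    func (r *resource) generate() {
        r.once.Do(func() {
            defer r.done.Store(true) // marked even if the work returns early
            // ... expensive work here ...
        })
    }

    // generated reports completion without blocking; calling once.Do here
    // instead would wait for (or perform) the work itself.
    func (r *resource) generated() bool {
        return r.done.Load()
    }

    func main() {
        r := new(resource)
        fmt.Println(r.generated()) // false
        r.generate()
        fmt.Println(r.generated()) // true
    }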
31 changes: 16 additions & 15 deletions core/blockchain.go
@@ -174,7 +174,7 @@ type BlockChain struct {
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
- flushInterval int64 // Time interval (processing time) after which to flush a state
+ flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

@@ -215,8 +215,8 @@ type BlockChain struct {

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
- running int32 // 0 if chain is running, 1 when stopped
- procInterrupt int32 // interrupt signaler for block processing
+ running atomic.Bool // false if chain is running, true when stopped
+ procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
@@ -260,7 +260,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
- flushInterval: int64(cacheConfig.TrieTimeLimit),
+ flushInterval: atomic.Int64{},
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
Expand All @@ -273,6 +273,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
engine: engine,
vmConfig: vmConfig,
}
+ bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.forker = NewForkChoice(bc, shouldPreserve)
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
@@ -916,7 +917,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// This method has been exposed to allow tests to stop the blockchain while simulating
// a crash.
func (bc *BlockChain) stopWithoutSaving() {
- if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
+ if !bc.running.CompareAndSwap(false, true) {
return
}

@@ -998,12 +999,12 @@ func (bc *BlockChain) Stop() {
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
- atomic.StoreInt32(&bc.procInterrupt, 1)
+ bc.procInterrupt.Store(true)
}

// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
- return atomic.LoadInt32(&bc.procInterrupt) == 1
+ return bc.procInterrupt.Load()
}

func (bc *BlockChain) procFutureBlocks() {
@@ -1382,7 +1383,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory
- flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
+ flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
// If the header is missing (canonical chain behind), we're reorging a low
@@ -1735,16 +1736,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
- var followupInterrupt uint32
+ var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)

- go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
+ go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *atomic.Bool) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
- if atomic.LoadUint32(interrupt) == 1 {
+ if interrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), followup, throwaway, &followupInterrupt)
@@ -1756,7 +1757,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
- atomic.StoreUint32(&followupInterrupt, 1)
+ followupInterrupt.Store(true)
return it.index, err
}

@@ -1777,7 +1778,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
substart = time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
- atomic.StoreUint32(&followupInterrupt, 1)
+ followupInterrupt.Store(true)
return it.index, err
}
proctime := time.Since(start)
@@ -1796,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
}
- atomic.StoreUint32(&followupInterrupt, 1)
+ followupInterrupt.Store(true)
if err != nil {
return it.index, err
}
@@ -2496,5 +2497,5 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
// The interval is in terms of block processing time, not wall clock.
// It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
- atomic.StoreInt64(&bc.flushInterval, int64(interval))
+ bc.flushInterval.Store(int64(interval))
}
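SetTrieFlushInterval shows the Duration-in-Int64 idiom: time.Duration is defined as an int64, so it round-trips through an atomic.Int64 with explicit conversions on both sides. A sketch of just that mechanism, using a hypothetical type:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    type flusher struct {
        interval atomic.Int64 // holds a time.Duration as its raw int64 value
    }

    func (f *flusher) SetInterval(d time.Duration) {
        f.interval.Store(int64(d)) // Duration -> int64 on the way in
    }

    func (f *flusher) Interval() time.Duration {
        return time.Duration(f.interval.Load()) // int64 -> Duration on the way out
    }

    func main() {
        var f flusher
        f.SetInterval(5 * time.Minute)
        fmt.Println(f.Interval()) // 5m0s
    }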
6 changes: 3 additions & 3 deletions core/bloombits/matcher.go
@@ -83,7 +83,7 @@ type Matcher struct {
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries

- running uint32 // Atomic flag whether a session is live or not
+ running atomic.Bool // Atomic flag whether a session is live or not
}

// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
@@ -146,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) {
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
- if atomic.SwapUint32(&m.running, 1) == 1 {
+ if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
- defer atomic.StoreUint32(&m.running, 0)
+ defer m.running.Store(false)

// Initiate a new matching round
session := &MatcherSession{
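The Swap in Start is a compact admission guard: the first caller flips running from false to true and proceeds, every concurrent caller observes true and backs off, and the deferred Store releases the flag when the session ends. A sketch of the idiom with hypothetical names:

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
    )

    type session struct {
        running atomic.Bool
    }

    // start admits exactly one live session at a time.
    func (s *session) start() error {
        // Swap returns the previous value: true means another session
        // already holds the flag, so refuse to start a second one.
        if s.running.Swap(true) {
            return errors.New("session already running")
        }
        defer s.running.Store(false) // release when the session finishes
        // ... session work ...
        return nil
    }

    func main() {
        var s session
        fmt.Println(s.start()) // <nil>
    }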