Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/NoWAL during OldBodies #6227

Merged
merged 9 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using FluentAssertions;
using Nethermind.Blockchain.Blocks;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test;
using Nethermind.Core.Test.Builders;
using Nethermind.Serialization.Rlp;
Expand Down Expand Up @@ -33,6 +34,19 @@ public void Test_can_insert_get_and_remove_blocks(bool cached)
store.Get(block.Number, block.Hash!, cached).Should().BeNull();
}

[Test]
public void Test_insert_would_pass_in_writeflag()
{
TestMemDb db = new TestMemDb();
BlockStore store = new BlockStore(db);

Block block = Build.A.Block.WithNumber(1).TestObject;
store.Insert(block, WriteFlags.DisableWAL);

byte[] key = Bytes.Concat(block.Number.ToBigEndianByteArray(), block.Hash!.BytesToArray());
db.KeyWasWrittenWithFlags(key, WriteFlags.DisableWAL);
}

[TestCase(true)]
[TestCase(false)]
public void Test_can_get_block_that_was_stored_with_hash(bool cached)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he
}

public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None)
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None)
{
bool skipCanAcceptNewBlocks = (insertBlockOptions & BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks) != 0;
if (!CanAcceptNewBlocks)
Expand All @@ -293,7 +293,7 @@ public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBloc
throw new InvalidOperationException("Genesis block should not be inserted.");
}

_blockStore.Insert(block);
_blockStore.Insert(block, writeFlags: blockWriteFlags);
_headerStore.InsertBlockNumber(block.Hash, block.Number);

bool saveHeader = (insertBlockOptions & BlockTreeInsertBlockOptions.SaveHeader) != 0;
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void SetMetadata(byte[] key, byte[] value)
return _blockDb.Get(key);
}

public void Insert(Block block)
public void Insert(Block block, WriteFlags writeFlags = WriteFlags.None)
{
if (block.Hash is null)
{
Expand All @@ -53,7 +53,7 @@ public void Insert(Block block)
// by avoiding encoding back to RLP here (allocations measured on a sample 3M blocks Goerli fast sync
using NettyRlpStream newRlp = _blockDecoder.EncodeToNewNettyStream(block);

_blockDb.Set(block.Number, block.Hash, newRlp.AsSpan());
_blockDb.Set(block.Number, block.Hash, newRlp.AsSpan(), writeFlags);
}

private static void GetBlockNumPrefixedKey(long blockNumber, Hash256 blockHash, Span<byte> output)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Nethermind.Blockchain.Blocks;
/// </summary>
public interface IBlockStore
{
void Insert(Block block);
void Insert(Block block, WriteFlags writeFlags = WriteFlags.None);
void Delete(long blockNumber, Hash256 blockHash);
Block? Get(long blockNumber, Hash256 blockHash, bool shouldCache = true);
ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash);
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/IBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public interface IBlockTree : IBlockFinder
/// <param name="block">Block to add</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None);
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags bodiesWriteFlags = WriteFlags.None);

void UpdateHeadBlock(Hash256 blockHash);

Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task Accept(IBlockTreeVisitor blockTreeVisitor, CancellationToken c
public ChainLevelInfo FindLevel(long number) => _wrapped.FindLevel(number);
public BlockInfo FindCanonicalBlockInfo(long blockNumber) => _wrapped.FindCanonicalBlockInfo(blockNumber);

public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None) =>
public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None) =>
throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");

public void Insert(IEnumerable<Block> blocks) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");
Expand Down
5 changes: 3 additions & 2 deletions src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class TestMemDb : MemDb, ITunableDb
public Func<byte[], byte[]>? ReadFunc { get; set; }
public Action<byte[]>? RemoveFunc { get; set; }

public bool WasFlushed { get; set; }
public bool WasFlushed => FlushCount > 0;
public int FlushCount { get; set; } = 0;

[MethodImpl(MethodImplOptions.Synchronized)]
public override byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
Expand Down Expand Up @@ -107,6 +108,6 @@ public override IWriteBatch StartWriteBatch()

public override void Flush()
{
WasFlushed = true;
FlushCount++;
}
}
5 changes: 2 additions & 3 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ public Span<byte> GetSpan(ReadOnlySpan<byte> key)
return _rocksDb.GetSpan(key, _columnFamily);
}

public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags writeFlags = WriteFlags.None)
{
_mainDb.UpdateWriteMetrics();
_rocksDb.Put(key, value, _columnFamily, _mainDb.WriteOptions);
_mainDb.SetWithColumnFamily(key, _columnFamily, value, writeFlags);
}

public void DangerousReleaseMemory(in Span<byte> span) => _rocksDb.DangerousReleaseMemory(span);
Expand Down
23 changes: 4 additions & 19 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
SetWithColumnFamily(key, null, value, flags);
}

internal void SetWithColumnFamily(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf, byte[]? value, WriteFlags flags = WriteFlags.None)
internal void SetWithColumnFamily(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf, ReadOnlySpan<byte> value, WriteFlags flags = WriteFlags.None)
{
if (_isDisposing)
{
Expand All @@ -520,7 +520,7 @@ internal void SetWithColumnFamily(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf

try
{
if (value is null)
if (value.IsNull())
{
_db.Remove(key, cf, WriteFlagsToWriteOptions(flags));
}
Expand Down Expand Up @@ -596,24 +596,9 @@ public Span<byte> GetSpan(ReadOnlySpan<byte> key)
}
}

public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags writeFlags)
{
if (_isDisposing)
{
throw new ObjectDisposedException($"Attempted to write form a disposed database {Name}");
}

UpdateWriteMetrics();

try
{
_db.Put(key, value, null, WriteOptions);
}
catch (RocksDbSharpException e)
{
CreateMarkerIfCorrupt(e);
throw;
}
SetWithColumnFamily(key, null, value, writeFlags);
}

public void DangerousReleaseMemory(in Span<byte> span)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public Span<byte> GetSpan(ReadOnlySpan<byte> key)
return Get(key);
}

public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags writeFlags)
{
Set(key, value.ToArray());
Set(key, value.ToArray(), writeFlags);
}

public void DangerousReleaseMemory(in Span<byte> span)
Expand Down
10 changes: 5 additions & 5 deletions src/Nethermind/Nethermind.Db/DbExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public static void Set(this IDb db, Hash256 key, Span<byte> value)
}
}

public static void Set(this IDb db, long blockNumber, Hash256 key, Span<byte> value)
public static void Set(this IDb db, long blockNumber, Hash256 key, Span<byte> value, WriteFlags writeFlags = WriteFlags.None)
{
Span<byte> blockNumberPrefixedKey = stackalloc byte[40];
GetBlockNumPrefixedKey(blockNumber, key, blockNumberPrefixedKey);
db.Set(blockNumberPrefixedKey, value);
db.Set(blockNumberPrefixedKey, value, writeFlags);
}

private static void GetBlockNumPrefixedKey(long blockNumber, ValueHash256 blockHash, Span<byte> output)
Expand All @@ -74,15 +74,15 @@ public static void Set(this IDb db, in ValueHash256 key, Span<byte> value)
}
}

public static void Set(this IDb db, ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public static void Set(this IDb db, ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags writeFlags = WriteFlags.None)
{
if (db is IDbWithSpan dbWithSpan)
{
dbWithSpan.PutSpan(key, value);
dbWithSpan.PutSpan(key, value, flags: writeFlags);
}
else
{
db[key] = value.ToArray();
db.Set(key, value.ToArray(), flags: writeFlags);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/IDbWithSpan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using Nethermind.Core;
using Nethermind.Core.Buffers;
using Nethermind.Core.Extensions;

Expand All @@ -16,7 +17,7 @@ public interface IDbWithSpan : IDb
/// <param name="key"></param>
/// <returns>Can return null or empty Span on missing key</returns>
Span<byte> GetSpan(ReadOnlySpan<byte> key);
void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value);
void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags flags = WriteFlags.None);
void DangerousReleaseMemory(in Span<byte> span);
MemoryManager<byte>? GetOwnedMemory(ReadOnlySpan<byte> key)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Db/MemDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ public virtual Span<byte> GetSpan(ReadOnlySpan<byte> key)
return Get(key).AsSpan();
}

public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags writeFlags)
{
Set(key, value.ToArray());
Set(key, value.ToArray(), writeFlags);
}

public void DangerousReleaseMemory(in Span<byte> span)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ public virtual void ClearTempChanges()
}

public Span<byte> GetSpan(ReadOnlySpan<byte> key) => _memDb.Get(key).AsSpan();
public void PutSpan(ReadOnlySpan<byte> keyBytes, ReadOnlySpan<byte> value)
public void PutSpan(ReadOnlySpan<byte> keyBytes, ReadOnlySpan<byte> value, WriteFlags writeFlags = WriteFlags.None)
{
if (!_createInMemWriteStore)
{
throw new InvalidOperationException($"This {nameof(ReadOnlyDb)} did not expect any writes.");
}

_memDb.Set(keyBytes, value.ToArray());
_memDb.Set(keyBytes, value.ToArray(), writeFlags);
}

public void DangerousReleaseMemory(in Span<byte> span) { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Test;
using Nethermind.Core.Test.Builders;
using Nethermind.Logging;
using Nethermind.Synchronization.FastBlocks;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.Reporting;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Synchronization.Test.FastBlocks;

public class BodiesSyncFeedTests
{
[Test]
public async Task ShouldCallFlushPeriodically()
{
BlockTree syncingFromBlockTree = Build.A.BlockTree()
.OfChainLength(100)
.TestObject;

TestMemDb blocksDb = new TestMemDb();
BlockTree syncingTooBlockTree = Build.A.BlockTree()
.WithBlocksDb(blocksDb)
.TestObject;

for (int i = 1; i < 100; i++)
{
Block block = syncingFromBlockTree.FindBlock(i, BlockTreeLookupOptions.None)!;
syncingTooBlockTree.Insert(block.Header);
}

Block pivot = syncingFromBlockTree.FindBlock(99, BlockTreeLookupOptions.None)!;

SyncConfig syncConfig = new SyncConfig()
{
FastSync = true,
PivotHash = pivot.Hash!.ToString(),
PivotNumber = pivot.Number.ToString(),
AncientBodiesBarrier = 0,
FastBlocks = true,
DownloadBodiesInFastSync = true,
};

BodiesSyncFeed syncFeed = new BodiesSyncFeed(
Substitute.For<ISyncModeSelector>(),
syncingTooBlockTree,
Substitute.For<ISyncPeerPool>(),
syncConfig,
new NullSyncReport(),
blocksDb,
LimboLogs.Instance,
flushDbInterval: 10
);

syncFeed.InitializeFeed();
BodiesSyncBatch req = (await syncFeed.PrepareRequest())!;
blocksDb.FlushCount.Should().Be(1);

async Task HandleAndPrepareNextRequest()
{
req.Response = new OwnedBlockBodies(req.Infos.Take(8).Select((info) =>
syncingFromBlockTree.FindBlock(info!.BlockNumber, BlockTreeLookupOptions.None)!.Body).ToArray());

syncFeed.HandleResponse(req);
req = (await syncFeed.PrepareRequest())!;
}

await HandleAndPrepareNextRequest();
blocksDb.FlushCount.Should().Be(1);

await HandleAndPrepareNextRequest();
blocksDb.FlushCount.Should().Be(2);

await HandleAndPrepareNextRequest();
blocksDb.FlushCount.Should().Be(2);

await HandleAndPrepareNextRequest();
blocksDb.FlushCount.Should().Be(3);
}
}
Loading