Skip to content
This repository has been archived by the owner on Jul 7, 2024. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrise committed May 1, 2022
1 parent 4178ad0 commit 3fbf73c
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 65 deletions.
2 changes: 1 addition & 1 deletion refs/core
Submodule core updated from 1c8195 to fa94fc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Omnius.Axis.Engines.Internal.Models;
using Omnius.Axis.Models;

namespace Omnius.Axis.Engines.Internal.Entities;

internal record CachedNodeLocationEntity
{
public NodeLocationEntity? Value { get; set; }

public DateTime CreationTime { get; set; }

public DateTime LastConnectionTime { get; set; }

public static CachedNodeLocationEntity Import(CachedNodeLocation value)
{
return new CachedNodeLocationEntity()
{
Value = NodeLocationEntity.Import(value.Value),
CreationTime = value.CreationTime,
LastConnectionTime = value.LastConnectionTime,
};
}

public CachedNodeLocation Export()
{
return new CachedNodeLocation(this.Value?.Export() ?? NodeLocation.Empty, this.CreationTime, this.LastConnectionTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Omnius.Axis.Models;

namespace Omnius.Axis.Engines.Internal.Models;

internal record CachedNodeLocation
{
public CachedNodeLocation(NodeLocation value, DateTime creationTime, DateTime lastConnectionTime)
{
this.Value = value;
this.CreationTime = creationTime;
this.LastConnectionTime = lastConnectionTime;
}

public NodeLocation Value { get; }

public DateTime CreationTime { get; }

public DateTime LastConnectionTime { get; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using LiteDB;
using Omnius.Axis.Engines.Internal.Entities;
using Omnius.Axis.Models;
using Omnius.Axis.Utils;
using Omnius.Core;
using Omnius.Core.Helpers;

Expand All @@ -17,7 +18,7 @@ public NodeFinderRepository(string dirPath)
_database = new LiteDatabase(Path.Combine(dirPath, "lite.db"));
_database.UtcDate = true;

this.NodeLocations = new NodeLocationRepository(_database);
this.NodeLocations = new CachedNodeLocationRepository(_database);
}

protected override void OnDispose(bool disposing)
Expand All @@ -30,17 +31,17 @@ public async ValueTask MigrateAsync(CancellationToken cancellationToken = defaul
await this.NodeLocations.MigrateAsync(cancellationToken);
}

public NodeLocationRepository NodeLocations { get; }
public CachedNodeLocationRepository NodeLocations { get; }

public sealed class NodeLocationRepository
public sealed class CachedNodeLocationRepository
{
private const string CollectionName = "node_locations";
private const string CollectionName = "cached_node_locations";

private readonly LiteDatabase _database;

private readonly object _lockObject = new();

public NodeLocationRepository(LiteDatabase database)
public CachedNodeLocationRepository(LiteDatabase database)
{
_database = database;
}
Expand All @@ -49,31 +50,87 @@ internal async ValueTask MigrateAsync(CancellationToken cancellationToken = defa
{
lock (_lockObject)
{
if (_database.GetDocumentVersion(CollectionName) <= 0)
{
var col = this.GetCollection();
col.EnsureIndex(x => x.Value, true);
}

_database.SetDocumentVersion(CollectionName, 1);
}
}

private ILiteCollection<NodeLocationEntity> GetCollection()
private ILiteCollection<CachedNodeLocationEntity> GetCollection()
{
var col = _database.GetCollection<NodeLocationEntity>(CollectionName);
var col = _database.GetCollection<CachedNodeLocationEntity>(CollectionName);
return col;
}

public IEnumerable<NodeLocation> Load()
public IEnumerable<NodeLocation> FindAll()
{
lock (_lockObject)
{
var col = this.GetCollection();
return col.FindAll().Select(n => n.Export()).ToArray();
return col.FindAll().Where(n => n.Value != null).Select(n => n.Value!.Export()).ToArray();
}
}

public void Save(IEnumerable<NodeLocation> nodeLocations)
public bool TryInsert(NodeLocation value, DateTime creationTime)
{
lock (_lockObject)
{
var itemEntity = new CachedNodeLocationEntity()
{
Value = NodeLocationEntity.Import(value),
CreationTime = creationTime,
LastConnectionTime = DateTime.MinValue
};

var col = this.GetCollection();
col.DeleteAll();
col.InsertBulk(nodeLocations.Select(n => NodeLocationEntity.Import(n)));

if (col.Exists(n => n.Value == itemEntity.Value)) return false;

col.Insert(itemEntity);
return true;
}
}

public void Upsert(NodeLocation value, DateTime creationTime, DateTime lastConnectionTime)
{
lock (_lockObject)
{
var itemEntity = new CachedNodeLocationEntity()
{
Value = NodeLocationEntity.Import(value),
CreationTime = creationTime,
LastConnectionTime = lastConnectionTime
};

var col = this.GetCollection();

_database.BeginTrans();

col.DeleteMany(n => n.Value == itemEntity.Value);
col.Insert(itemEntity);

_database.Commit();
}
}

public void TrimExcess(int capacity)
{
lock (_lockObject)
{
var col = this.GetCollection();

_database.BeginTrans();

foreach (var extra in col.FindAll().OrderBy(n => n.CreationTime).OrderByDescending(n => n.LastConnectionTime).Skip(capacity).ToArray())
{
col.DeleteMany(n => n.Value == extra.Value);
}

_database.Commit();
}
}
}
Expand Down
47 changes: 5 additions & 42 deletions src/Omnius.Axis.Engines/Implementations/NodeFinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public sealed partial class NodeFinder : AsyncDisposableBase, INodeFinder

private readonly byte[] _myId;

private readonly LinkedList<NodeLocation> _cloudNodeLocations = new();
private readonly VolatileHashSet<OmniAddress> _connectedAddressSet;
private ImmutableHashSet<SessionStatus> _sessionStatusSet = ImmutableHashSet<SessionStatus>.Empty;

Expand Down Expand Up @@ -78,7 +77,7 @@ private NodeFinder(ISessionConnector sessionConnector, ISessionAccepter sessionA

_events = new Events(_getPushContentCluesFuncPipe.Listener, _getWantContentCluesFuncPipe.Listener);

_connectedAddressSet = new VolatileHashSet<OmniAddress>(TimeSpan.FromMinutes(3), TimeSpan.FromSeconds(30), _batchActionDispatcher);
_connectedAddressSet = new VolatileHashSet<OmniAddress>(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(10), _batchActionDispatcher);

_receivedPushContentLocationMap = new VolatileListDictionary<ContentClue, NodeLocation>(TimeSpan.FromMinutes(30), TimeSpan.FromSeconds(30), _batchActionDispatcher);
_receivedGiveContentLocationMap = new VolatileListDictionary<ContentClue, NodeLocation>(TimeSpan.FromMinutes(30), TimeSpan.FromSeconds(30), _batchActionDispatcher);
Expand All @@ -88,11 +87,6 @@ private async ValueTask InitAsync(CancellationToken cancellationToken = default)
{
await _nodeFinderRepo.MigrateAsync(cancellationToken);

foreach (var nodeLocation in _nodeFinderRepo.NodeLocations.Load())
{
_cloudNodeLocations.AddLast(nodeLocation);
}

_connectLoopTask = this.ConnectLoopAsync(_cancellationTokenSource.Token);
_acceptLoopTask = this.AcceptLoopAsync(_cancellationTokenSource.Token);
_sendLoopTask = this.SendLoopAsync(_cancellationTokenSource.Token);
Expand Down Expand Up @@ -155,7 +149,7 @@ public async ValueTask<IEnumerable<NodeLocation>> GetCloudNodeLocationsAsync(Can
{
lock (_lockObject)
{
return _cloudNodeLocations.ToArray();
return _nodeFinderRepo.NodeLocations.FindAll();
}
}

Expand All @@ -165,9 +159,7 @@ public async ValueTask AddCloudNodeLocationsAsync(IEnumerable<NodeLocation> node
{
foreach (var nodeLocation in nodeLocations)
{
if (_cloudNodeLocations.Count >= 2048) return;
if (_cloudNodeLocations.Any(n => n.Addresses.Any(m => nodeLocation.Addresses.Contains(m)))) continue;
_cloudNodeLocations.AddLast(nodeLocation);
_nodeFinderRepo.NodeLocations.TryInsert(nodeLocation, DateTime.UtcNow);
}
}
}
Expand All @@ -193,21 +185,7 @@ private void RefreshCloudNodeLocation(NodeLocation nodeLocation)
{
lock (_lockObject)
{
_cloudNodeLocations.RemoveAll(n => n.Addresses.Any(m => nodeLocation.Addresses.Contains(m)));
_cloudNodeLocations.AddFirst(nodeLocation);
}
}

private bool RemoveCloudNodeLocation(NodeLocation nodeLocation)
{
lock (_lockObject)
{
if (_cloudNodeLocations.Count >= 1024)
{
_cloudNodeLocations.Remove(nodeLocation);
}

return false;
_nodeFinderRepo.NodeLocations.Upsert(nodeLocation, DateTime.UtcNow, DateTime.UtcNow);
}
}

Expand Down Expand Up @@ -259,7 +237,6 @@ private async ValueTask<bool> TryConnectAsync(NodeLocation nodeLocation, Cancell
return true;
}

this.RemoveCloudNodeLocation(nodeLocation);
return false;
}
catch (OperationCanceledException e)
Expand All @@ -276,7 +253,7 @@ private async ValueTask<bool> TryConnectAsync(NodeLocation nodeLocation, Cancell

private async ValueTask<IEnumerable<NodeLocation>> FindNodeLocationsForConnecting(CancellationToken cancellationToken = default)
{
var nodeLocations = _cloudNodeLocations.ToArray();
var nodeLocations = _nodeFinderRepo.NodeLocations.FindAll().ToList();
_random.Shuffle(nodeLocations);

var ignoredAddressSet = await this.GetIgnoredAddressSet(cancellationToken);
Expand Down Expand Up @@ -550,11 +527,6 @@ private async Task ComputeLoopAsync(CancellationToken cancellationToken)

await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken).ConfigureAwait(false);

if (saveCloudNodeLocationsStopwatch.TryRestartIfElapsedOrStopped(TimeSpan.FromMinutes(5)))
{
await this.SaveCloudNodeLocationsAsync(cancellationToken);
}

if (!trimDeadSessionsStopwatch.TryRestartIfElapsedOrStopped(TimeSpan.FromMinutes(1)))
{
await this.TrimDeadSessionsAsync(cancellationToken);
Expand All @@ -576,15 +548,6 @@ private async Task ComputeLoopAsync(CancellationToken cancellationToken)
}
}

private async ValueTask SaveCloudNodeLocationsAsync(CancellationToken cancellationToken = default)
{
var results = new List<NodeLocation>();
results.AddRange(_sessionStatusSet.Select(n => n.NodeLocation));
results.AddRange(_cloudNodeLocations);

_nodeFinderRepo.NodeLocations.Save(results);
}

private async ValueTask TrimDeadSessionsAsync(CancellationToken cancellationToken = default)
{
var now = DateTime.UtcNow;
Expand Down
11 changes: 1 addition & 10 deletions src/Omnius.Axis.Models/_RocketPack/NodeLocation.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
using System.Text;

namespace Omnius.Axis.Models;

public sealed partial class NodeLocation
{
public override string ToString()
{
var sb = new StringBuilder();

foreach (var a in this.Addresses)
{
sb.AppendLine(a.ToString());
}

return sb.ToString();
return string.Join(",", this.Addresses);
}
}

0 comments on commit 3fbf73c

Please sign in to comment.