Skip to content

Commit

Permalink
Fixing connection usage (#3131)
Browse files Browse the repository at this point in the history
Fixes #3096
  • Loading branch information
sebastienros authored Feb 3, 2019
1 parent 7c5c773 commit abbc806
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 145 deletions.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ nav:
- Forms: OrchardCore.Modules/OrchardCore.Forms/README.md
- Admin Menu: OrchardCore.Modules/OrchardCore.AdminMenu/README.md
- Core Modules:
- Data: OrchardCore/OrchardCore.Data/README.md
- Dynamic Cache: OrchardCore.Modules/OrchardCore.DynamicCache/README.md
- GraphQL: OrchardCore.Modules/OrchardCore.Apis.GraphQL/README.md
- Localization: OrchardCore.Modules/OrchardCore.Localization/README.md
Expand Down
173 changes: 104 additions & 69 deletions src/OrchardCore.Modules/OrchardCore.Indexing/IndexingTaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,44 @@
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using OrchardCore.Modules;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrchardCore.ContentManagement;
using OrchardCore.Data;
using OrchardCore.DeferredTasks;
using OrchardCore.Environment.Shell;
using OrchardCore.Modules;
using YesSql;

namespace OrchardCore.Indexing.Services
{
public class IndexingTaskManager : IIndexingTaskManager, IDisposable
/// <summary>
/// This is a scoped service that enlists tasks to be stored in the database.
/// It enlists a final task using the <see cref="IDeferredTaskEngine"/> such
/// that multiple calls to <see cref="CreateTaskAsync"/> can be done without incurring
/// a SQL query on every single one.
/// </summary>
public class IndexingTaskManager : IIndexingTaskManager
{
private readonly IClock _clock;
private readonly ISession _session;
private readonly IDeferredTaskEngine _deferredTaskEngine;
private readonly IDbConnectionAccessor _dbConnectionAccessor;
private readonly string _tablePrefix;
private readonly List<IndexingTask> _tasksQueue = new List<IndexingTask>();

public IndexingTaskManager(
ISession session,
IClock clock,
ShellSettings shellSettings,
IDeferredTaskEngine deferredTaskEngine,
IDbConnectionAccessor dbConnectionAccessor,
ILogger<IndexingTaskManager> logger)
{
_session = session;
_clock = clock;
_deferredTaskEngine = deferredTaskEngine;
_dbConnectionAccessor = dbConnectionAccessor;
Logger = logger;

_tablePrefix = session.Store.Configuration.TablePrefix;
_tablePrefix = shellSettings["TablePrefix"];
}

public ILogger Logger { get; set; }
Expand Down Expand Up @@ -54,103 +68,124 @@ public Task CreateTaskAsync(ContentItem contentItem, IndexingTaskTypes type)

lock (_tasksQueue)
{
if (_tasksQueue.Count == 0)
{
_deferredTaskEngine.AddTask(context => FlushAsync(context, _tasksQueue));
}

_tasksQueue.Add(indexingTask);
}

return Task.CompletedTask;
}

public void Dispose()
private static async Task FlushAsync(DeferredTaskContext context, IEnumerable<IndexingTask> tasks)
{
FlushAsync().Wait();
}
var localQueue = new List<IndexingTask>(tasks);

private async Task FlushAsync()
{
List<IndexingTask> localQueue;
var serviceProvider = context.ServiceProvider;

lock (_tasksQueue)
{
localQueue = new List<IndexingTask>(_tasksQueue);
}
var dbConnectionAccessor = serviceProvider.GetService<IDbConnectionAccessor>();
var shellSettings = serviceProvider.GetService<ShellSettings>();
var logger = serviceProvider.GetService<ILogger<IndexingTaskManager>>();
var tablePrefix = shellSettings["TablePrefix"];

var contentItemIds = new HashSet<string>();

if (!localQueue.Any())
// Remove duplicate tasks, only keep the last one
for (var i = localQueue.Count; i > 0; i--)
{
return;
var task = localQueue[i - 1];

if (contentItemIds.Contains(task.ContentItemId))
{
localQueue.RemoveAt(i - 1);
}
else
{
contentItemIds.Add(task.ContentItemId);
}
}

var transaction = await _session.DemandAsync();
var dialect = SqlDialectFactory.For(transaction.Connection);
// At this point, content items ids should be unique in localQueue
var ids = localQueue.Select(x => x.ContentItemId).ToArray();
var table = $"{tablePrefix}{nameof(IndexingTask)}";

try
using (var connection = dbConnectionAccessor.CreateConnection())
{
var contentItemIds = new HashSet<string>();
await connection.OpenAsync();

// Remove duplicate tasks, only keep the last one
for (var i = localQueue.Count; i > 0; i--)
using (var transaction = connection.BeginTransaction())
{
var task = localQueue[i - 1];
var dialect = SqlDialectFactory.For(transaction.Connection);

if (contentItemIds.Contains(task.ContentItemId))
try
{
localQueue.RemoveAt(i - 1);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Updating indexing tasks: {String.Join(", ", tasks.Select(x => x.ContentItemId))}");
}

// Page delete statements to prevent the limits from IN sql statements
var pageSize = 100;

var deleteCmd = $"delete from {dialect.QuoteForTableName(table)} where {dialect.QuoteForColumnName("ContentItemId")} {dialect.InOperator("@Ids")};";

do
{
var pageOfIds = ids.Take(pageSize).ToArray();

if (pageOfIds.Any())
{
await transaction.Connection.ExecuteAsync(deleteCmd, new { Ids = pageOfIds }, transaction);
ids = ids.Skip(pageSize).ToArray();
}

} while (ids.Any());

var insertCmd = $"insert into {dialect.QuoteForTableName(table)} ({dialect.QuoteForColumnName("CreatedUtc")}, {dialect.QuoteForColumnName("ContentItemId")}, {dialect.QuoteForColumnName("Type")}) values (@CreatedUtc, @ContentItemId, @Type);";
await transaction.Connection.ExecuteAsync(insertCmd, localQueue, transaction);

transaction.Commit();
}
else
catch (Exception e)
{
contentItemIds.Add(task.ContentItemId);
logger.LogError(e, "An error occured while updating indexing tasks");
throw;
}
}

// At this point, content items ids should be unique in _taskQueue
var ids = localQueue.Select(x => x.ContentItemId).ToArray();
var table = $"{_tablePrefix}{nameof(IndexingTask)}";

var deleteCmd = $"delete from {dialect.QuoteForTableName(table)} where {dialect.QuoteForColumnName("ContentItemId")} {dialect.InOperator("@Ids")};";
await transaction.Connection.ExecuteAsync(deleteCmd, new { Ids = ids }, transaction);

var insertCmd = $"insert into {dialect.QuoteForTableName(table)} ({dialect.QuoteForColumnName("CreatedUtc")}, {dialect.QuoteForColumnName("ContentItemId")}, {dialect.QuoteForColumnName("Type")}) values (@CreatedUtc, @ContentItemId, @Type);";
await transaction.Connection.ExecuteAsync(insertCmd, _tasksQueue, transaction);
}
catch(Exception e)
{
_session.Cancel();
Logger.LogError("An error occured while updating indexing tasks", e);
throw;
}

_tasksQueue.Clear();
}

public async Task<IEnumerable<IndexingTask>> GetIndexingTasksAsync(int afterTaskId, int count)
{
await FlushAsync();

var transaction = await _session.DemandAsync();

try
using (var connection = _dbConnectionAccessor.CreateConnection())
{
var dialect = SqlDialectFactory.For(transaction.Connection);
var sqlBuilder = dialect.CreateBuilder(_tablePrefix);
await connection.OpenAsync();

sqlBuilder.Select();
sqlBuilder.Table(nameof(IndexingTask));
sqlBuilder.Selector("*");

if (count > 0)
try
{
sqlBuilder.Take(count.ToString());
}
var dialect = SqlDialectFactory.For(connection);
var sqlBuilder = dialect.CreateBuilder(_tablePrefix);

sqlBuilder.WhereAlso($"{dialect.QuoteForColumnName("Id")} > @Id");
sqlBuilder.Select();
sqlBuilder.Table(nameof(IndexingTask));
sqlBuilder.Selector("*");

return await transaction.Connection.QueryAsync<IndexingTask>(sqlBuilder.ToSqlString(), new { Id = afterTaskId }, transaction);
}
catch (Exception e)
{
_session.Cancel();
if (count > 0)
{
sqlBuilder.Take(count.ToString());
}

sqlBuilder.WhereAlso($"{dialect.QuoteForColumnName("Id")} > @Id");

Logger.LogError(e, "An error occured while reading indexing tasks");
throw;
return await connection.QueryAsync<IndexingTask>(sqlBuilder.ToSqlString(), new { Id = afterTaskId });
}
catch (Exception e)
{
Logger.LogError(e, "An error occured while reading indexing tasks");
throw;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using System;
using System.Data;
using System.Threading.Tasks;
using System.Data.Common;

namespace OrchardCore.Data.Abstractions
namespace OrchardCore.Data
{
public interface IDbConnectionAccessor
{
Task<IDbConnection> GetConnectionAsync();
DbConnection CreateConnection();
}
}
39 changes: 3 additions & 36 deletions src/OrchardCore/OrchardCore.Data/DbConnectionAccessor.cs
Original file line number Diff line number Diff line change
@@ -1,54 +1,21 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;
using OrchardCore.Data.Abstractions;
using YesSql;

namespace OrchardCore.Data
{
public class DbConnectionAccessor : IDbConnectionAccessor, IDisposable
public class DbConnectionAccessor : IDbConnectionAccessor
{
private readonly IStore _store;
private DbConnection _connection;
private bool _disposed = false;

public DbConnectionAccessor(IStore store)
{
_store = store ?? throw new ArgumentNullException(nameof(store));
}

public async Task<IDbConnection> GetConnectionAsync()
public DbConnection CreateConnection()
{
if (_connection == null)
{
_connection = _store.Configuration.ConnectionFactory.CreateConnection() as DbConnection;
}

if (_connection.State == ConnectionState.Closed)
{
await _connection?.OpenAsync();
}

return _connection;
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_connection.Dispose();
}
}
_disposed = true;
return _store.Configuration.ConnectionFactory.CreateConnection();
}
}
}
14 changes: 1 addition & 13 deletions src/OrchardCore/OrchardCore.Data/OrchardCoreBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Options;
using OrchardCore.Data;
using OrchardCore.Data.Abstractions;
using OrchardCore.Data.Migration;
using OrchardCore.Environment.Shell;
using OrchardCore.Environment.Shell.Models;
Expand All @@ -18,7 +17,6 @@
using YesSql.Provider.PostgreSql;
using YesSql.Provider.Sqlite;
using YesSql.Provider.SqlServer;
using YesSql.Services;

namespace Microsoft.Extensions.DependencyInjection
{
Expand Down Expand Up @@ -128,17 +126,7 @@ public static OrchardCoreBuilder AddDataAccess(this OrchardCoreBuilder builder)
return session;
});
services.AddScoped<IDbConnectionAccessor>(sp =>
{
var store = sp.GetService<IStore>();
if (store == null)
{
return null;
}
return new DbConnectionAccessor(store);
});
services.AddTransient<IDbConnectionAccessor, DbConnectionAccessor>();
});

return builder;
Expand Down
Loading

0 comments on commit abbc806

Please sign in to comment.