Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract HTTP ingestion API interactions and refactor #160

Merged
merged 7 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sample/Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ public static void Main()
Log.Logger = new LoggerConfiguration()
.MinimumLevel.ControlledBy(levelSwitch)
.WriteTo.Console()
.WriteTo.Seq("http://localhost:5341",
controlLevelSwitch: levelSwitch)
.WriteTo.Seq("http://localhost:5341", controlLevelSwitch: levelSwitch)
.CreateLogger();

Log.Information("Sample starting up");
Expand Down
29 changes: 16 additions & 13 deletions src/Serilog.Sinks.Seq/SeqLoggerConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
using Serilog.Sinks.Seq;
using System.Net.Http;
using Serilog.Sinks.PeriodicBatching;
using Serilog.Sinks.Seq.Batched;
using Serilog.Sinks.Seq.Audit;
using Serilog.Sinks.Seq.Http;

#if DURABLE
using Serilog.Sinks.Seq.Durable;
Expand All @@ -32,8 +34,12 @@ namespace Serilog
/// </summary>
public static class SeqLoggerConfigurationExtensions
{
const int DefaultBatchPostingLimit = 1000;
static readonly TimeSpan DefaultPeriod = TimeSpan.FromSeconds(2);
const int DefaultQueueSizeLimit = 100000;

/// <summary>
/// Adds a sink that writes log events to a <a href="https://getseq.net">Seq</a> server.
/// Write log events to a <a href="https://datalust.co/seq">Seq</a> server.
/// </summary>
/// <param name="loggerSinkConfiguration">The logger configuration.</param>
/// <param name="serverUrl">The base URL of the Seq server that log events will be written to.</param>
Expand Down Expand Up @@ -67,7 +73,7 @@ public static LoggerConfiguration Seq(
this LoggerSinkConfiguration loggerSinkConfiguration,
string serverUrl,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum,
int batchPostingLimit = SeqSink.DefaultBatchPostingLimit,
int batchPostingLimit = DefaultBatchPostingLimit,
TimeSpan? period = null,
string? apiKey = null,
string? bufferBaseFilename = null,
Expand All @@ -76,7 +82,7 @@ public static LoggerConfiguration Seq(
LoggingLevelSwitch? controlLevelSwitch = null,
HttpMessageHandler? messageHandler = null,
long? retainedInvalidPayloadsLimitBytes = null,
int queueSizeLimit = SeqSink.DefaultQueueSizeLimit)
int queueSizeLimit = DefaultQueueSizeLimit)
{
if (loggerSinkConfiguration == null) throw new ArgumentNullException(nameof(loggerSinkConfiguration));
if (serverUrl == null) throw new ArgumentNullException(nameof(serverUrl));
Expand All @@ -85,19 +91,17 @@ public static LoggerConfiguration Seq(
if (queueSizeLimit < 0)
throw new ArgumentOutOfRangeException(nameof(queueSizeLimit), "Queue size limit must be non-zero.");

var defaultedPeriod = period ?? SeqSink.DefaultPeriod;
var defaultedPeriod = period ?? DefaultPeriod;
var controlledSwitch = new ControlledLevelSwitch(controlLevelSwitch);

ILogEventSink sink;

if (bufferBaseFilename == null)
{
var batchedSink = new SeqSink(
serverUrl,
apiKey,
var batchedSink = new BatchedSeqSink(
new SeqIngestionApiClient(serverUrl, apiKey, messageHandler),
eventBodyLimitBytes,
controlledSwitch,
messageHandler);
controlledSwitch);

var options = new PeriodicBatchingSinkOptions
{
Expand Down Expand Up @@ -134,7 +138,7 @@ public static LoggerConfiguration Seq(
}

/// <summary>
/// Adds a sink that writes audit log events to a <a href="https://getseq.net">Seq</a> server. Auditing writes are
/// Write audit log events to a <a href="https://datalust.co/seq">Seq</a> server. Auditing writes are
/// synchronous and non-batched; any failures will propagate to the caller immediately as exceptions.
/// </summary>
/// <param name="loggerAuditSinkConfiguration">The logger configuration.</param>
Expand All @@ -155,9 +159,8 @@ public static LoggerConfiguration Seq(
if (loggerAuditSinkConfiguration == null) throw new ArgumentNullException(nameof(loggerAuditSinkConfiguration));
if (serverUrl == null) throw new ArgumentNullException(nameof(serverUrl));

return loggerAuditSinkConfiguration.Sink(
new SeqAuditSink(serverUrl, apiKey, messageHandler),
restrictedToMinimumLevel);
var ingestionApi = new SeqIngestionApiClient(serverUrl, apiKey, messageHandler);
return loggerAuditSinkConfiguration.Sink(new SeqAuditSink(ingestionApi), restrictedToMinimumLevel);
}
}
}
30 changes: 9 additions & 21 deletions src/Serilog.Sinks.Seq/Sinks/Seq/Audit/SeqAuditSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,32 @@

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Formatting.Compact;
using Serilog.Formatting.Json;
using Serilog.Sinks.Seq.Http;

namespace Serilog.Sinks.Seq.Audit
{
/// <summary>
/// An <see cref="ILogEventSink"/> that synchronously propagates all <see cref="Emit"/> failures as exceptions.
/// </summary>
sealed class SeqAuditSink : ILogEventSink, IDisposable
{
readonly string? _apiKey;
readonly HttpClient _httpClient;
readonly SeqIngestionApi _ingestionApi;

static readonly JsonValueFormatter JsonValueFormatter = new("$type");

public SeqAuditSink(
string serverUrl,
string? apiKey,
HttpMessageHandler? messageHandler)
public SeqAuditSink(SeqIngestionApi ingestionApi)
{
if (serverUrl == null) throw new ArgumentNullException(nameof(serverUrl));
_httpClient = messageHandler != null ? new HttpClient(messageHandler) : new HttpClient();
_httpClient.BaseAddress = new Uri(SeqApi.NormalizeServerBaseAddress(serverUrl));
_apiKey = apiKey;
_ingestionApi = ingestionApi ?? throw new ArgumentNullException(nameof(ingestionApi));
}

public void Dispose()
{
_httpClient.Dispose();
_ingestionApi.Dispose();
}

public void Emit(LogEvent logEvent)
Expand All @@ -60,13 +54,7 @@ async Task EmitAsync(LogEvent logEvent)
var payload = new StringWriter();
CompactJsonFormatter.FormatEvent(logEvent, payload, JsonValueFormatter);

var content = new StringContent(payload.ToString(), Encoding.UTF8, SeqApi.CompactLogEventFormatMimeType);
if (!string.IsNullOrWhiteSpace(_apiKey))
content.Headers.Add(SeqApi.ApiKeyHeaderName, _apiKey);

var result = await _httpClient.PostAsync(SeqApi.BulkUploadResource, content).ConfigureAwait(false);
if (!result.IsSuccessStatusCode)
throw new LoggingFailedException($"Received failed result {result.StatusCode} when posting events to Seq.");
await _ingestionApi.IngestAsync(payload.ToString()).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Serilog.Sinks.Seq Copyright 2014-2019 Serilog Contributors
// Copyright © Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,48 +16,39 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Sinks.PeriodicBatching;
using Serilog.Sinks.Seq.Http;

namespace Serilog.Sinks.Seq
namespace Serilog.Sinks.Seq.Batched
{
class SeqSink : IBatchedLogEventSink, IDisposable
/// <summary>
/// The default Seq sink, for use in combination with <see cref="PeriodicBatchingSink"/>.
/// </summary>
sealed class BatchedSeqSink : IBatchedLogEventSink, IDisposable
{
public const int DefaultBatchPostingLimit = 1000;
public static readonly TimeSpan DefaultPeriod = TimeSpan.FromSeconds(2);
public const int DefaultQueueSizeLimit = 100000;

static readonly TimeSpan RequiredLevelCheckInterval = TimeSpan.FromMinutes(2);

readonly string? _apiKey;
readonly ConstrainedBufferedFormatter _formatter;
readonly HttpClient _httpClient;
readonly SeqIngestionApi _ingestionApi;

DateTime _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval);
readonly ControlledLevelSwitch _controlledSwitch;

public SeqSink(
string serverUrl,
string? apiKey,
public BatchedSeqSink(
SeqIngestionApi ingestionApi,
long? eventBodyLimitBytes,
ControlledLevelSwitch controlledSwitch,
HttpMessageHandler? messageHandler)
ControlledLevelSwitch controlledSwitch)
{
if (serverUrl == null) throw new ArgumentNullException(nameof(serverUrl));
_controlledSwitch = controlledSwitch ?? throw new ArgumentNullException(nameof(controlledSwitch));
_apiKey = apiKey;
_httpClient = messageHandler != null ? new HttpClient(messageHandler) : new HttpClient();
_httpClient.BaseAddress = new Uri(SeqApi.NormalizeServerBaseAddress(serverUrl));
_formatter = new ConstrainedBufferedFormatter(eventBodyLimitBytes);
_ingestionApi = ingestionApi ?? throw new ArgumentNullException(nameof(ingestionApi));
}

public void Dispose()
{
_httpClient.Dispose();
_ingestionApi.Dispose();
}

// The sink must emit at least one event on startup, and the server be
Expand All @@ -81,16 +72,11 @@ public async Task EmitBatchAsync(IEnumerable<LogEvent> events)
_formatter.Format(evt, payload);
}

var content = new StringContent(payload.ToString(), Encoding.UTF8, SeqApi.CompactLogEventFormatMimeType);
if (!string.IsNullOrWhiteSpace(_apiKey))
content.Headers.Add(SeqApi.ApiKeyHeaderName, _apiKey);

var result = await _httpClient.PostAsync(SeqApi.BulkUploadResource, content).ConfigureAwait(false);
if (!result.IsSuccessStatusCode)
throw new LoggingFailedException($"Received failed result {result.StatusCode} when posting events to Seq");
var clefPayload = payload.ToString();

var minimumAcceptedLevel = await _ingestionApi.IngestAsync(clefPayload);

var returned = await result.Content.ReadAsStringAsync();
_controlledSwitch.Update(SeqApi.ReadEventInputResult(returned));
_controlledSwitch.Update(minimumAcceptedLevel);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ namespace Serilog.Sinks.Seq
/// Wraps a <see cref="CompactJsonFormatter" /> to suppress formatting errors and apply the event body size
/// limit, if any. Placeholder events are logged when an event is unable to be written itself.
/// </summary>
class ConstrainedBufferedFormatter : ITextFormatter
sealed class ConstrainedBufferedFormatter : ITextFormatter
{
static readonly int NewLineByteCount = Encoding.UTF8.GetByteCount(Environment.NewLine);

readonly long? _eventBodyLimitBytes;
readonly CompactJsonFormatter _jsonFormatter = new CompactJsonFormatter(new JsonValueFormatter("$type"));
readonly CompactJsonFormatter _jsonFormatter = new(new JsonValueFormatter("$type"));

public ConstrainedBufferedFormatter(long? eventBodyLimitBytes)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Serilog.Sinks.Seq/Sinks/Seq/ControlledLevelSwitch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Serilog.Sinks.Seq
/// timer thread. An exception is <see cref="IsIncluded(LogEvent)"/>, which may be called
/// concurrently but performs no synchronization.
/// </summary>
class ControlledLevelSwitch
sealed class ControlledLevelSwitch
{
// If non-null, then background level checks will be performed; set either through the constructor
// or in response to a level specification from the server. Never set to null after being made non-null.
Expand Down
7 changes: 3 additions & 4 deletions src/Serilog.Sinks.Seq/Sinks/Seq/Durable/DurableSeqSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
using Serilog.Events;
using System.Net.Http;
using System.Text;
using Serilog.Sinks.Seq.Http;

namespace Serilog.Sinks.Seq.Durable
{
class DurableSeqSink : ILogEventSink, IDisposable
sealed class DurableSeqSink : ILogEventSink, IDisposable
{
readonly HttpLogShipper _shipper;
readonly Logger _sink;
Expand All @@ -46,13 +47,11 @@ public DurableSeqSink(

_shipper = new HttpLogShipper(
fileSet,
serverUrl,
apiKey,
new SeqIngestionApiClient(serverUrl, apiKey, messageHandler),
batchPostingLimit,
period,
eventBodyLimitBytes,
controlledSwitch,
messageHandler,
retainedInvalidPayloadsLimitBytes,
bufferSizeLimitBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

using System;

namespace Serilog.Sinks.Seq
namespace Serilog.Sinks.Seq.Durable
{
/// <summary>
/// Based on the BatchedConnectionStatus class from <see cref="Serilog.Sinks.PeriodicBatching.PeriodicBatchingSink"/>.
/// </summary>
class ExponentialBackoffConnectionSchedule
sealed class ExponentialBackoffConnectionSchedule
{
static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5);
static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10);
Expand Down
2 changes: 1 addition & 1 deletion src/Serilog.Sinks.Seq/Sinks/Seq/Durable/FileSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace Serilog.Sinks.Seq.Durable
{
class FileSet
sealed class FileSet
{
readonly string _bookmarkFilename;
readonly string _candidateSearchPath;
Expand Down
Loading