forked from cosh/syslogToKusto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
266 lines (210 loc) · 11.4 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
using Microsoft.Extensions.Configuration;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Microsoft.ApplicationInsights.Extensibility;
using System.Diagnostics;
using System.Text;
using Kusto.Data;
using Kusto.Ingest;
using System.Net.Sockets;
using System.Net;
using System.Text.Json;
namespace syslogToKusto
{
internal class Program
{
private static readonly JsonSerializerOptions serializerOptions = new JsonSerializerOptions();
/// <summary>
/// Application entry point: loads the settings, configures logging and starts the three
/// worker tasks (UDP listener, batcher and Kusto sender), then blocks until one of them stops.
/// </summary>
/// <param name="args">Command line arguments (not used).</param>
static void Main(string[] args)
{
    Settings settings = GetSettings();
    serializerOptions.WriteIndented = false;

    using var loggerFactory = LoggerFactory.Create(builder =>
    {
        builder
            .AddFilter("Microsoft", LogLevel.Warning)
            .AddFilter("System", LogLevel.Warning)
            .AddFilter("syslogToKusto", LogLevel.Debug)
            .AddConsole();

        // Application Insights logging is only added when a connection string is configured.
        if (!String.IsNullOrWhiteSpace(settings.APPINSIGHTS_CONNECTIONSTRING))
        {
            builder.AddApplicationInsights(
                telemetryConf => { telemetryConf.ConnectionString = settings.APPINSIGHTS_CONNECTIONSTRING; },
                loggerOptions => { });
        }
    });
    ILogger logger = loggerFactory.CreateLogger<Program>();

    // The two queues connect the workers: Listen -> messages -> Batch -> ingestionJobs -> SendToKusto.
    var tasks = new List<Task>();
    var ingestionJobs = new ConcurrentQueue<IngestionJob>();
    var messages = new ConcurrentQueue<string>();
    tasks.Add(Task.Run(() => Batch(logger, settings.BatchSettings, messages, ingestionJobs)));
    tasks.Add(Task.Run(() => Listen(logger, settings.ListenPort, messages, settings.SyslogServerName)));
    tasks.Add(Task.Run(() => SendToKusto(logger, ingestionJobs, settings.Kusto)));
    logger.LogInformation($"Created {tasks.Count} tasks to work on ingestion of syslog data into kusto");

    // Every worker loops forever, so a worker that finishes has stopped unexpectedly.
    // Waiting for ALL of them would block on the remaining never-ending workers and hide
    // the failure; instead wait for the first one, report it and let the process exit.
    Task[] workers = tasks.ToArray();
    Task finished = workers[Task.WaitAny(workers)];
    if (finished.IsFaulted)
    {
        logger.LogCritical(finished.Exception, "A worker task failed; shutting down");
    }
    else
    {
        logger.LogCritical("A worker task ended unexpectedly; shutting down");
    }

    // Returning (rather than rethrowing) lets the logger factory be disposed normally.
    Environment.ExitCode = 1;
}
/// <summary>
/// Opens a UDP socket bound to <paramref name="listenPort"/> on <see cref="IPAddress.Any"/>
/// and hands it to the receive loop.
/// </summary>
/// <param name="logger">Logger used for status output.</param>
/// <param name="listenPort">Local UDP port to bind to.</param>
/// <param name="messages">Queue passed on to the receive loop.</param>
/// <param name="syslogServerName">Server name passed on to the receive loop.</param>
private static async Task Listen(ILogger logger, int listenPort, ConcurrentQueue<string> messages, string syslogServerName)
{
    using (var listener = new Socket(SocketType.Dgram, ProtocolType.Udp))
    {
        var localEndPoint = new IPEndPoint(IPAddress.Any, listenPort);
        listener.Bind(localEndPoint);

        logger.LogInformation($"Listening {listenPort}");

        // The socket stays open for as long as the receive loop is running.
        await DoReceiveAsync(logger, listener, messages, syslogServerName);
    }
}
/// <summary>
/// Receives UDP datagrams from <paramref name="udpSocket"/> forever, converts each one into a
/// JSON message via <c>CreateMessageForKusto</c> and enqueues it in <paramref name="messages"/>.
/// </summary>
/// <param name="logger">Logger used to report receive errors.</param>
/// <param name="udpSocket">Bound UDP socket to read from.</param>
/// <param name="messages">Queue that receives the serialized messages.</param>
/// <param name="syslogServerName">Server name stored in every message.</param>
private static async Task DoReceiveAsync(ILogger logger, Socket udpSocket, ConcurrentQueue<string> messages, string syslogServerName)
{
    // Large enough for any UDP datagram, so long syslog messages are no longer cut off
    // (or rejected) by a too-small receive buffer.
    const int MaxDatagramSize = 65535;

    byte[] buffer = GC.AllocateArray<byte>(length: MaxDatagramSize, pinned: true);
    Memory<byte> bufferMem = buffer.AsMemory();

    while (true)
    {
        try
        {
            var endpoint = new IPEndPoint(IPAddress.Any, 0);
            var result = await udpSocket.ReceiveFromAsync(bufferMem, SocketFlags.None, endpoint);

            // Decode directly from the backing array; only the received bytes are read,
            // so no per-datagram copy of the whole buffer is needed.
            string payload = Encoding.UTF8.GetString(buffer, 0, result.ReceivedBytes);
            messages.Enqueue(CreateMessageForKusto(payload, result, syslogServerName));
        }
        catch (Exception e)
        {
            // A failure on one datagram must not stop the listener.
            logger.LogError(e, "Error receiving message from socket");
        }
    }
}
/// <summary>
/// Builds a <c>MessageForKusto</c> from a received payload and its receive metadata and
/// returns it serialized as JSON using the class-level serializer options.
/// </summary>
/// <param name="payload">Decoded datagram text; it is trimmed before being stored.</param>
/// <param name="result">Receive result providing the byte count and the remote endpoint.</param>
/// <param name="syslogServerName">Server name stored in the message.</param>
/// <returns>The JSON representation of the message.</returns>
private static string CreateMessageForKusto(string payload, SocketReceiveFromResult result, string syslogServerName)
{
    var message = new MessageForKusto
    {
        Payload = payload.Trim(),
        ReceivedBytes = result.ReceivedBytes
    };

    // Sender details are only recorded when the remote endpoint is an IP endpoint.
    if (result.RemoteEndPoint is IPEndPoint sender)
    {
        message.RemoteEndPoint = new EndpointInfo
        {
            AddressFamily = sender.Address.AddressFamily,
            Address = sender.Address.ToString(),
            Port = sender.Port
        };
    }

    message.SyslogServerName = syslogServerName;
    return JsonSerializer.Serialize(message, serializerOptions);
}
/// <summary>
/// Collects queued syslog messages into batches forever. A batch is written to a temporary
/// file and queued as an <c>IngestionJob</c> once it holds more than
/// <c>BatchLimitNumberOfEvents</c> events or the batch time limit has elapsed.
/// </summary>
/// <param name="logger">Logger used to report created batch files.</param>
/// <param name="batchSettings">Batch limits; also attached to every created job.</param>
/// <param name="messages">Queue of serialized messages to drain.</param>
/// <param name="ingestionJobs">Queue that receives the created ingestion jobs.</param>
private static void Batch(ILogger logger,
    SettingsBatching batchSettings, ConcurrentQueue<string> messages, ConcurrentQueue<IngestionJob> ingestionJobs)
{
    // Computed in 64-bit arithmetic: with 32-bit ints the product overflows (and becomes
    // negative) for limits above 35791 minutes.
    long timeLimit = batchSettings.BatchLimitInMinutes * 60L * 1000L;
    var sw = Stopwatch.StartNew();
    StringBuilder sb = new StringBuilder();
    int eventCount = 0;

    while (true)
    {
        // Drain everything that is currently queued, one message per line.
        while (messages.TryDequeue(out var syslogEvent))
        {
            sb.AppendLine(syslogEvent);
            eventCount++;
        }

        // Nothing more to add right now; is it enough (or late enough) for an ingestion job?
        if (eventCount > batchSettings.BatchLimitNumberOfEvents || sw.ElapsedMilliseconds > timeLimit)
        {
            // The time limit can expire without any events; no file is written in that case.
            if (eventCount > 0)
            {
                string tempFile = Path.GetTempFileName();
                File.WriteAllText(tempFile, sb.ToString());
                ingestionJobs.Enqueue(new IngestionJob() { ToBeIngested = tempFile, BatchInfo = batchSettings });
                logger.LogInformation($"Created a file {tempFile} with {eventCount} events");
            }

            // Prepare for the next batch.
            sw.Restart();
            eventCount = 0;
            sb.Clear();
        }

        Thread.Sleep(1000);
    }
}
/// <summary>
/// Processes queued ingestion jobs forever: each job's file is ingested into the configured
/// Kusto database and deleted afterwards, but only when the ingestion succeeded.
/// </summary>
/// <param name="logger">Logger used for progress and error output.</param>
/// <param name="ingestionJobs">Queue of jobs to ingest.</param>
/// <param name="kusto">Cluster, credentials, database and retry settings.</param>
private static void SendToKusto(ILogger logger, ConcurrentQueue<IngestionJob> ingestionJobs, SettingsKusto kusto)
{
    // Wait time when no job is queued, so that an empty queue does not turn into a busy loop.
    const int IdlePollingDelayMs = 1000;

    var kustoConnectionStringBuilderEngine =
        new KustoConnectionStringBuilder($"https://{kusto.ClusterName}.kusto.windows.net").WithAadApplicationKeyAuthentication(
            applicationClientId: kusto.ClientId,
            applicationKey: kusto.ClientSecret,
            authority: kusto.TenantId);

    using (IKustoIngestClient client = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilderEngine))
    {
        while (true)
        {
            if (!ingestionJobs.TryDequeue(out var job))
            {
                Thread.Sleep(IdlePollingDelayMs);
                continue;
            }

            // Ingest the job's file as JSON using the table and mapping carried by the job.
            var kustoIngestionProperties = new KustoIngestionProperties(databaseName: kusto.DbName, tableName: job.BatchInfo.KustoTable);
            kustoIngestionProperties.SetAppropriateMappingReference(job.BatchInfo.MappingName, Kusto.Data.Common.DataSourceFormat.json);
            kustoIngestionProperties.Format = Kusto.Data.Common.DataSourceFormat.json;
            logger.LogDebug($"About start ingestion into table {job.BatchInfo.KustoTable} using file {job.ToBeIngested}");

            if (Ingest(logger, client, job, kustoIngestionProperties, kusto))
            {
                logger.LogInformation($"Finished ingestion into table {job.BatchInfo.KustoTable} using file {job.ToBeIngested}");
                Thread.Sleep(100);
                File.Delete(job.ToBeIngested);
                logger.LogDebug($"Deleted file {job.ToBeIngested} because of successful ingestion");
            }
            else
            {
                // Keep the file: deleting it here would silently lose the whole batch.
                logger.LogError($"Giving up ingestion into table {job.BatchInfo.KustoTable}; file {job.ToBeIngested} is kept for manual recovery");
            }

            Thread.Sleep(10000);
        }
    }
}

/// <summary>
/// Tries to ingest the job's file, making at most <c>kusto.MaxRetries</c> attempts and
/// waiting <c>kusto.MsBetweenRetries</c> milliseconds between attempts.
/// </summary>
/// <param name="logger">Logger used to report failed attempts.</param>
/// <param name="client">Ingest client to use.</param>
/// <param name="job">Job whose file is ingested.</param>
/// <param name="kustoIngestionProperties">Target database, table, mapping and format.</param>
/// <param name="kusto">Settings providing the retry limits.</param>
/// <returns><c>true</c> when an attempt succeeded; <c>false</c> when all attempts failed (or none was allowed).</returns>
static bool Ingest(ILogger logger, IKustoIngestClient client, IngestionJob job, KustoIngestionProperties kustoIngestionProperties, SettingsKusto kusto)
{
    for (int attempt = 1; attempt <= kusto.MaxRetries; attempt++)
    {
        try
        {
            client.IngestFromStorage(job.ToBeIngested, kustoIngestionProperties);
            return true;
        }
        catch (Exception e)
        {
            logger.LogError(e, $"Could not ingest {job.ToBeIngested} into table {job.BatchInfo.KustoTable}.");

            // Only wait when another attempt follows.
            if (attempt < kusto.MaxRetries)
            {
                Thread.Sleep(kusto.MsBetweenRetries);
            }
        }
    }

    return false;
}
/// <summary>
/// Loads the application settings. "appsettingsDevelopment.json" is used when that file
/// exists, otherwise "appsettings.json"; the "Settings" section is bound to
/// <c>Settings</c>, and individual values are then overridden by environment variables
/// named after the corresponding property.
/// </summary>
/// <returns>The effective settings.</returns>
private static Settings GetSettings()
{
    const string developmentConfiguration = "appsettingsDevelopment.json";
    const string configFile = "appsettings.json";

    // The development file wins whenever it is present.
    string fileUsedForConfiguration = File.Exists(developmentConfiguration)
        ? developmentConfiguration
        : configFile;

    var builder = new ConfigurationBuilder();
    builder.AddJsonFile(fileUsedForConfiguration);
    builder.AddEnvironmentVariables();
    IConfiguration config = builder.Build();

    Settings settings = config.GetRequiredSection("Settings").Get<Settings>();

    // Override any settings from environment variables, useful for Docker Container configurations

    // General
    settings.ListenPort = GetEnvironmentVariable(nameof(Settings.ListenPort), settings.ListenPort);
    settings.SyslogServerName = GetEnvironmentVariable(nameof(Settings.SyslogServerName), settings.SyslogServerName);

    // Kusto connection and retry behaviour
    settings.Kusto.ClientId = GetEnvironmentVariable(nameof(SettingsKusto.ClientId), settings.Kusto.ClientId);
    settings.Kusto.ClientSecret = GetEnvironmentVariable(nameof(SettingsKusto.ClientSecret), settings.Kusto.ClientSecret);
    settings.Kusto.ClusterName = GetEnvironmentVariable(nameof(SettingsKusto.ClusterName), settings.Kusto.ClusterName);
    settings.Kusto.TenantId = GetEnvironmentVariable(nameof(SettingsKusto.TenantId), settings.Kusto.TenantId);
    settings.Kusto.DbName = GetEnvironmentVariable(nameof(SettingsKusto.DbName), settings.Kusto.DbName);
    settings.Kusto.MaxRetries = GetEnvironmentVariable(nameof(SettingsKusto.MaxRetries), settings.Kusto.MaxRetries);
    settings.Kusto.MsBetweenRetries = GetEnvironmentVariable(nameof(SettingsKusto.MsBetweenRetries), settings.Kusto.MsBetweenRetries);

    // Batching
    settings.BatchSettings.KustoTable = GetEnvironmentVariable(nameof(SettingsBatching.KustoTable), settings.BatchSettings.KustoTable);
    settings.BatchSettings.MappingName = GetEnvironmentVariable(nameof(SettingsBatching.MappingName), settings.BatchSettings.MappingName);
    settings.BatchSettings.BatchLimitInMinutes = GetEnvironmentVariable(nameof(SettingsBatching.BatchLimitInMinutes), settings.BatchSettings.BatchLimitInMinutes);
    settings.BatchSettings.BatchLimitNumberOfEvents = GetEnvironmentVariable(nameof(SettingsBatching.BatchLimitNumberOfEvents), settings.BatchSettings.BatchLimitNumberOfEvents);

    return settings;
}
/// <summary>
/// Reads an environment variable and converts it to <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Target type of the conversion.</typeparam>
/// <param name="name">Name of the environment variable.</param>
/// <param name="defaultValue">Value returned when the variable is not set or is empty.</param>
/// <returns>The converted variable value, or <paramref name="defaultValue"/>.</returns>
/// <remarks>
/// The conversion uses the invariant culture so that the same variable value is interpreted
/// identically regardless of the host's regional settings. Conversion failures propagate as
/// the exceptions thrown by <see cref="Convert.ChangeType(object, Type, IFormatProvider)"/>.
/// </remarks>
private static T GetEnvironmentVariable<T>(string name, T defaultValue)
{
    string value = Environment.GetEnvironmentVariable(name);
    if (string.IsNullOrEmpty(value))
    {
        return defaultValue;
    }

    return (T)Convert.ChangeType(value, typeof(T), System.Globalization.CultureInfo.InvariantCulture);
}
}
}