-
Notifications
You must be signed in to change notification settings - Fork 0
/
LogsEventsProcessingWorker.cs
161 lines (143 loc) · 5.73 KB
/
LogsEventsProcessingWorker.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Poc.LambdaExtension.Logging
{
/// <summary>
/// Background worker that waits for the local Logs API endpoint to respond,
/// runs the extension event loop, and drains queued log payloads into S3
/// on every invoke and on shutdown.
/// </summary>
/// <remarks>
/// The Kinesis Firehose and Kinesis Data Streams clients are injected and stored
/// but are not used by the current implementation; only S3 delivery is active.
/// </remarks>
public class LogsEventsProcessingWorker
    : BackgroundService
{
    /// <summary>Pause between two consecutive health probes of the Logs API endpoint.</summary>
    private static readonly TimeSpan LogsApiRetryDelay = TimeSpan.FromMilliseconds(100);

    private readonly ILogger<LogsEventsProcessingWorker> _logger;
    private readonly HttpClient _httpClient;
    private readonly ExtensionClient _extensionClient;
    private readonly LoggingApiClient _loggingApiClient;
    private readonly ConcurrentQueue<string> _logsQueue;
    private readonly IAmazonS3 _s3Client;
    private readonly IAmazonKinesisFirehose _kinesisFirehoseClient;
    private readonly IAmazonKinesis _kinesisClient;
    private readonly string _bucketName;
    private readonly string _bucketKeyPrefix;
    private readonly string _functionName;

    /// <summary>
    /// Creates the worker and captures its collaborators.
    /// </summary>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <param name="httpClient">HTTP client used to probe the local Logs API endpoint.</param>
    /// <param name="extensionClient">Client whose <c>ProcessEvents</c> loop drives this worker.</param>
    /// <param name="loggingApiClient">Client used to subscribe once the init callback fires.</param>
    /// <param name="s3Client">S3 client that receives every dequeued payload.</param>
    /// <param name="kinesisFirehoseClient">Stored but currently unused.</param>
    /// <param name="kinesisClient">Stored but currently unused.</param>
    /// <param name="logsQueue">Queue this worker drains; each item is uploaded as one S3 object.</param>
    public LogsEventsProcessingWorker(
        ILogger<LogsEventsProcessingWorker> logger,
        HttpClient httpClient,
        ExtensionClient extensionClient,
        LoggingApiClient loggingApiClient,
        IAmazonS3 s3Client,
        IAmazonKinesisFirehose kinesisFirehoseClient,
        IAmazonKinesis kinesisClient,
        ConcurrentQueue<string> logsQueue
    )
    {
        _logger = logger;
        _httpClient = httpClient;
        _extensionClient = extensionClient;
        _loggingApiClient = loggingApiClient;
        _s3Client = s3Client;
        _kinesisFirehoseClient = kinesisFirehoseClient;
        _kinesisClient = kinesisClient;
        _logsQueue = logsQueue;
        _bucketName = "zanfranceschi";
        _bucketKeyPrefix = "logs";
        // Falls back to a placeholder when the variable is not set.
        _functionName = Environment.GetEnvironmentVariable("AWS_LAMBDA_FUNCTION_NAME") ?? "sem-nome";
    }

    /// <summary>
    /// Waits for the Logs API endpoint, then hands control to the extension event loop.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is stopping; aborts the start-up wait.</param>
    /// <returns>A task that completes when the extension event loop returns.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await EnsureLogsApiIsRunning(stoppingToken);

        await _extensionClient.ProcessEvents(
            onInit: async agentId =>
            {
                await _loggingApiClient.Subscribe(agentId);
                _logger.LogInformation("logging api subscribed with agentId '{AgentId}'", agentId);
            },
            onInvoke: async payload =>
            {
                await ProcessLogsEvents();
            },
            onShutdown: payload =>
            {
                _logger.LogInformation("shutdown: {Payload}", payload);
                // Final drain: deliberately not tied to the stopping token so that
                // already-queued payloads are still uploaded during shutdown.
                return ProcessLogsEvents();
            });
    }

    /// <summary>
    /// Polls the local Logs API endpoint until it answers with a success status code.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait; defaults to no cancellation.</param>
    /// <returns>A task that completes once the endpoint reports success.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled before the endpoint is healthy.
    /// </exception>
    private async Task EnsureLogsApiIsRunning(CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Checking for Logs API service...");
        var healthUrl = $"http://127.0.0.1:{Configs.AGENT_LOGSAPI_PORT}/logging";

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                // Dispose each response so its content/connection is released between probes.
                using var response = await _httpClient.GetAsync(healthUrl, cancellationToken);
                if (response.IsSuccessStatusCode)
                {
                    _logger.LogInformation("Logs API service is ok.");
                    return;
                }

                var responseBody = await response.Content.ReadAsStringAsync();
                _logger.LogInformation("Logs API service is not ok: {ResponseBody}", responseBody);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Host is stopping: propagate instead of logging and retrying.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error checking Logs API");
            }

            // Back off briefly so a not-yet-listening endpoint is not hammered in a tight loop.
            await Task.Delay(LogsApiRetryDelay, cancellationToken);
        }
    }

    /// <summary>
    /// Drains the logs queue, uploading each payload as its own S3 object.
    /// A failed upload is logged and the payload is dropped; draining continues with the next item.
    /// </summary>
    /// <returns>A task that completes when the queue is observed empty.</returns>
    private async Task ProcessLogsEvents()
    {
        while (_logsQueue.TryDequeue(out var logsEventPayload))
        {
            try
            {
                _logger.LogInformation("executing ProcessLogsEvents with payload {Payload}", logsEventPayload);

                // Only the S3 sink is active. Kinesis Firehose / Kinesis Data Streams
                // delivery (stream "logs-s3") was prototyped here earlier and is not wired in.
                var request = new PutObjectRequest
                {
                    BucketName = _bucketName,
                    ContentBody = logsEventPayload,
                    Key = BuildObjectKey(DateTime.UtcNow)
                };

                await _s3Client.PutObjectAsync(request);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error executing LogsCallback");
            }
        }
    }

    /// <summary>
    /// Builds the S3 object key <c>{prefix}/{function}/{yyyy-MM}/{GUID}</c> for one payload.
    /// </summary>
    /// <param name="timestampUtc">Timestamp whose year and month select the key's month segment.</param>
    /// <returns>A key ending in a freshly generated upper-case, dash-less GUID.</returns>
    private string BuildObjectKey(DateTime timestampUtc)
    {
        // Invariant culture keeps the month segment Gregorian and ASCII regardless of host culture.
        var month = timestampUtc.ToString("yyyy-MM", CultureInfo.InvariantCulture);
        var objectId = Guid.NewGuid().ToString("N").ToUpperInvariant();
        return $"{_bucketKeyPrefix}/{_functionName}/{month}/{objectId}";
    }
}
}