-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumeRabbitMQHostedService.cs
175 lines (137 loc) · 7.14 KB
/
ConsumeRabbitMQHostedService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*************************************************************************************************************************
* DEVELOPMENT BY : NURMAN HARIYANTO - PT.LSKK & PPTIK *
* VERSION : 2 *
* TYPE APPLICATION : WORKER *
* DESCRIPTION : GET DATA FROM MQTT (OUTPUT DEVICE) CHECK TO DB RULES AND SEND BACK (INPUT DEVICE) IF DATA EXIST *
*************************************************************************************************************************/
namespace worker_smarthome_cloud_server {
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Data.Sqlite;
using System;
public class ConsumeRabbitMQHostedService: BackgroundService {
private readonly ILogger _logger;
private IConnection _connection;
private IModel _channel;
private static string RMQHost = ConfigurationManager.AppSettings["RMQHost"];
private static string RMQVHost = ConfigurationManager.AppSettings["RMQVHost"];
private static string RMQUsername = ConfigurationManager.AppSettings["RMQUsername"];
private static string RMQPassword = ConfigurationManager.AppSettings["RMQPassword"];
private static string RMQQueue = ConfigurationManager.AppSettings["RMQQueue"];
private static string RMQExc = ConfigurationManager.AppSettings["RMQExc"];
private static string RMQPubRoutingKey = ConfigurationManager.AppSettings["RMQPubRoutingKey"];
private static string DBPath = ConfigurationManager.AppSettings["DBPath"];
private static string InputGuid = "";
private static string ValueInput = "";
private static string OutputGuid = "";
private static string ValueOutput = "";
private static string MessageSend = "";
private static string DeviceName = "";
public ConsumeRabbitMQHostedService(ILoggerFactory loggerFactory) {
this._logger = loggerFactory.CreateLogger < ConsumeRabbitMQHostedService > ();
InitRabbitMQ();
}
private void InitRabbitMQ() {
var factory = new ConnectionFactory {
HostName = RMQHost, VirtualHost = RMQVHost, UserName = RMQUsername, Password = RMQPassword
};
// create connection
_connection = factory.CreateConnection();
// create channel
_channel = _connection.CreateModel();
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken) {
stoppingToken.ThrowIfCancellationRequested();
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (ch, ea) => {
// received message
var body = ea.Body.ToArray();
var content = System.Text.Encoding.UTF8.GetString(body);
// handle the received message
HandleMessageToDB(content);
_channel.BasicAck(ea.DeliveryTag, true);
};
consumer.Shutdown += OnConsumerShutdown;
_channel.BasicConsume(RMQQueue, false, consumer);
return Task.CompletedTask;
}
private void HandleMessageToDB(string content) {
var connectionStringBuilder = new SqliteConnectionStringBuilder();
connectionStringBuilder.DataSource = DBPath;
var connectionDB = new SqliteConnection(connectionStringBuilder.ConnectionString);
//just print this message
// _logger.LogInformation($"consumer received {content}");
//And splite message to Query Parameters
string[] dataParsing = content.Split('#');
foreach(var datas in dataParsing) {
//System.Console.WriteLine($"{datas}>");
InputGuid = dataParsing[0];
ValueInput = dataParsing[1];
}
DateTime now = DateTime.Now;
String TimeStamp = now.ToString();
//_logger.LogInformation($"palyoad received at {TimeStamp}");
connectionDB.Open();
var selectCmd = connectionDB.CreateCommand();
selectCmd.CommandText = "SELECT * FROM Rules WHERE inputguid=@Guidinput AND inputvalue=@Valueinput";
selectCmd.Parameters.AddWithValue("@Guidinput", InputGuid);
selectCmd.Parameters.AddWithValue("@Valueinput", ValueInput);
using(var reader = selectCmd.ExecuteReader()) {
while (reader.Read()) {
OutputGuid = reader.GetString(3);
ValueOutput = reader.GetString(4);
MessageSend = OutputGuid + "#" + ValueOutput;
_channel.BasicPublish(exchange: RMQExc,
routingKey: RMQPubRoutingKey,
basicProperties: null,
body: Encoding.UTF8.GetBytes(MessageSend)
);
}
}
var selectRegistrationCmd = connectionDB.CreateCommand();
selectRegistrationCmd.CommandText = "SELECT * FROM Registration WHERE guid=@Guidinput";
selectRegistrationCmd.Parameters.AddWithValue("@Guidinput", InputGuid);
using(var reader = selectRegistrationCmd.ExecuteReader()) {
while (reader.Read()) {
DeviceName = reader.GetString(5);
}
}
using(var transaction = connectionDB.BeginTransaction()) {
var insertCmd = connectionDB.CreateCommand();
// // _logger.LogInformation($"Try insert data to DB ...");
insertCmd.CommandText = "insert INTO Log (inputguid,inputname,inputvalue,time)VALUES(@inputguid,@devicename,@valueinput,@timestamp)";
insertCmd.Parameters.AddWithValue("@inputguid", InputGuid);
insertCmd.Parameters.AddWithValue("@devicename", DeviceName);
insertCmd.Parameters.AddWithValue("@valueinput", ValueInput);
insertCmd.Parameters.AddWithValue("@timestamp", TimeStamp);
// // _logger.LogInformation($"ESuccess Get Data from payload..");
insertCmd.ExecuteNonQuery();
// // _logger.LogInformation($"Execute Command Insert..");
transaction.Commit();
// }
// _logger.LogInformation($"success insert data to DB");
// connectionDB.Close();
}
connectionDB.Close();
// _logger.LogInformation("Sucess Send Data");
}
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) {
_logger.LogInformation($"connection shut down {e.ReplyText}");
}
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) {
_logger.LogInformation($"consumer shutdown {e.ReplyText}");
}
public override void Dispose() {
_channel.Close();
_connection.Close();
base.Dispose();
}
}
}