-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathtkrzw_dbm_ulog.h
342 lines (300 loc) · 10.5 KB
/
tkrzw_dbm_ulog.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*************************************************************************************************
* DBM update logger implementations
*
* Copyright 2020 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*************************************************************************************************/
#ifndef _TKRZW_DBM_ULOG_H
#define _TKRZW_DBM_ULOG_H
#include <deque>
#include <string>
#include <string_view>
#include <cinttypes>
#include "tkrzw_dbm.h"
#include "tkrzw_lib_common.h"
#include "tkrzw_message_queue.h"
#include "tkrzw_thread_util.h"
namespace tkrzw {
/**
* DBM update logger to store logs into a string deque.
* @details Each update is recorded as one string held in an in-memory std::deque. Stored logs
* can be counted with GetSize, drained from either end with PopFront and PopBack, or discarded
* with Clear. The method bodies are defined outside of this header.
* NOTE(review): a SpinMutex member is declared to guard the deque; confirm in the implementation
* that every accessor takes it before relying on concurrent use.
*/
class DBMUpdateLoggerStrDeque final : public DBM::UpdateLogger {
public:
/**
* Constructor.
* @param delim The delimiter put between fields in a log. It defaults to a tab character.
*/
explicit DBMUpdateLoggerStrDeque(const std::string& delim = "\t");
/**
* Writes a log for modifying an existing record or adding a new record.
* @param key The key of the record.
* @param value The new value of the record.
* @return The result status.
*/
Status WriteSet(std::string_view key, std::string_view value) override;
/**
* Writes a log for removing an existing record.
* @param key The key of the record.
* @return The result status.
*/
Status WriteRemove(std::string_view key) override;
/**
* Writes a log for removing all records.
* @return The result status.
*/
Status WriteClear() override;
/**
* Gets the number of logs.
* @return The number of logs currently stored in the queue.
*/
int64_t GetSize();
/**
* Gets the first log in the queue and removes it.
* @param text The pointer to a string object to store the result. If it is nullptr,
* assignment is not done.
* @return True on success or false if there's no log in the queue.
*/
bool PopFront(std::string* text);
/**
* Gets the last log in the queue and removes it.
* @param text The pointer to a string object to store the result. If it is nullptr,
* assignment is not done.
* @return True on success or false if there's no log in the queue.
*/
bool PopBack(std::string* text);
/**
* Removes all logs.
*/
void Clear();
private:
/** The delimiter string put between fields in a log. */
std::string delim_;
/** Log strings, one element per written log. */
std::deque<std::string> logs_;
/** Mutex to guard the logs. */
SpinMutex mutex_;
};
/**
* DBM update logger to replicate updates in another DBM.
* @details The logger keeps a raw, non-owning pointer to the destination DBM. The method bodies
* are defined outside of this header.
* NOTE(review): because the ownership is not taken, the destination DBM presumably has to
* outlive this logger; confirm against the callers.
*/
class DBMUpdateLoggerDBM final : public DBM::UpdateLogger {
public:
/**
* Constructor.
* @param dbm A DBM object to store logs in. The ownership is not taken.
*/
explicit DBMUpdateLoggerDBM(DBM* dbm);
/**
* Writes a log for modifying an existing record or adding a new record.
* @param key The key of the record.
* @param value The new value of the record.
* @return The result status.
*/
Status WriteSet(std::string_view key, std::string_view value) override;
/**
* Writes a log for removing an existing record.
* @param key The key of the record.
* @return The result status.
*/
Status WriteRemove(std::string_view key) override;
/**
* Writes a log for removing all records.
* @return The result status.
*/
Status WriteClear() override;
/**
* Synchronizes the metadata and content to the file system.
* @param hard True to do physical synchronization with the hardware or false to do only
* logical synchronization with the file system.
* @return The result status.
*/
Status Synchronize(bool hard) override;
private:
/** A DBM object to store logs in. It is not owned by this logger. */
DBM* dbm_;
};
/**
 * Update logger adapter for the second shard and later.
 * @details Set and remove operations are forwarded to the wrapped logger, whereas clear
 * operations are dropped and reported as successful.  The wrapped logger is referenced through
 * a raw pointer and is not owned by this adapter.
 * NOTE(review): clear is presumably dropped so that clearing a sharded database is logged only
 * once, by the logger of the first shard; confirm against the sharding code.
 */
class DBMUpdateLoggerSecondShard final : public DBM::UpdateLogger {
 public:
  /**
   * Default constructor.
   * @details No logger is attached.  Until SetUpdateLogger is called with a non-null logger,
   * WriteSet and WriteRemove fail with PRECONDITION_ERROR.
   */
  DBMUpdateLoggerSecondShard() : ulog_(nullptr) {}

  /**
   * Constructor.
   * @param ulog The logger to do actual logging.  The ownership is not taken.
   */
  explicit DBMUpdateLoggerSecondShard(DBM::UpdateLogger* ulog) : ulog_(ulog) {}

  /**
   * Set the update logger to do actual logging.
   * @param ulog The update logger to do actual logging.  The ownership is not taken.
   */
  void SetUpdateLogger(DBM::UpdateLogger* ulog) {
    ulog_ = ulog;
  }

  /**
   * Writes a log for modifying an existing record or adding a new record.
   * @param key The key of the record.
   * @param value The new value of the record.
   * @return The result status of the wrapped logger, or PRECONDITION_ERROR if no logger is
   * attached.
   */
  Status WriteSet(std::string_view key, std::string_view value) override {
    // A default-constructed adapter has no logger; report it instead of dereferencing null.
    if (ulog_ == nullptr) {
      return Status(Status::PRECONDITION_ERROR, "no update logger is set");
    }
    return ulog_->WriteSet(key, value);
  }

  /**
   * Writes a log for removing an existing record.
   * @param key The key of the record.
   * @return The result status of the wrapped logger, or PRECONDITION_ERROR if no logger is
   * attached.
   */
  Status WriteRemove(std::string_view key) override {
    // A default-constructed adapter has no logger; report it instead of dereferencing null.
    if (ulog_ == nullptr) {
      return Status(Status::PRECONDITION_ERROR, "no update logger is set");
    }
    return ulog_->WriteRemove(key);
  }

  /**
   * Writes a log for removing all records.
   * @return The result status, which is always success.
   * @details This does no operation and never touches the wrapped logger.
   */
  Status WriteClear() override {
    return Status(Status::SUCCESS);
  }

 public:
  /**
   * The internal update logger.  It is not owned and may be nullptr.
   * The member stays public so that existing code accessing it directly keeps compiling.
   */
  DBM::UpdateLogger* ulog_;
};
/**
* DBM update logger with a message queue.
* @details Update operations are written as messages to a MessageQueue which is not owned by
* the logger. Static helpers are provided to parse such messages and to apply them to a DBM.
* The method bodies are defined outside of this header.
*/
class DBMUpdateLoggerMQ final : public DBM::UpdateLogger {
public:
/**
* Enumeration for operation types.
*/
enum OpType : int32_t {
/** Invalid operation. */
OP_VOID = 0,
/** To modify or add a record. */
OP_SET = 1,
/** To remove a record. */
OP_REMOVE = 2,
/** To remove all records. */
OP_CLEAR = 3,
};
/**
* Common structure of an update log.
* @details The key and the value are non-owning views. When the structure is filled by
* ParseUpdateLog, they are valid only as long as the parsed message is.
*/
struct UpdateLog {
/** The operation type. */
OpType op_type;
/** The server ID. */
int32_t server_id;
/** The DBM index. */
int32_t dbm_index;
/** The key of the record. */
std::string_view key;
/** The value of the record. */
std::string_view value;
};
/**
* Constructor.
* @param mq The message queue object to store update logs. The ownership is not taken.
* @param server_id The server ID of the process.
* @param dbm_index The index of the DBM on the server.
* @param fixed_timestamp If not negative, the timestamp is fixed to the value.
*/
explicit DBMUpdateLoggerMQ(MessageQueue* mq, int32_t server_id = 0, int32_t dbm_index = 0,
int64_t fixed_timestamp = -1);
/**
* Writes a log for modifying an existing record or adding a new record.
* @param key The key of the record.
* @param value The new value of the record.
* @return The result status.
*/
Status WriteSet(std::string_view key, std::string_view value) override;
/**
* Writes a log for removing an existing record.
* @param key The key of the record.
* @return The result status.
*/
Status WriteRemove(std::string_view key) override;
/**
* Writes a log for removing all records.
* @return The result status.
*/
Status WriteClear() override;
/**
* Synchronizes the metadata and content to the file system.
* @param hard True to do physical synchronization with the hardware or false to do only
* logical synchronization with the file system.
* @return The result status.
*/
Status Synchronize(bool hard) override;
/**
* Overwrites the server ID of the current thread.
* @param server_id The server ID of the process. If it is negative, the thread local setting
* is undone. If it is INT32MIN, logging of the current thread is disabled.
* @details This affects logging of only the current thread regardless of the logger instance.
*/
static void OverwriteThreadServerID(int32_t server_id);
/**
* Parses an update log message.
* @param message The update log message.
* @param op The pointer to the update log object to store the result. The life duration of
* the key and the value fields is the same as the given message.
* @return The result status.
*/
static Status ParseUpdateLog(std::string_view message, UpdateLog* op);
/**
* Applies the operation in an update log to a database.
* @param dbm The DBM object of the database.
* @param message The update log message.
* @param server_id The server ID to focus on. A negative applies a filter which ignores the
* message if the server ID matches the absolute value. Zero or a positive applies a filter
* which adopts the message if the server ID matches the value.
* @param dbm_index The DBM index to focus on. A negative applies a filter which ignores the
* message if the DBM index matches the absolute value. Zero or a positive applies a filter
* which adopts the message if the DBM index matches the value.
* @return The result status. If the log is ignored due to the filter, INFEASIBLE_ERROR is
* returned.
* @details NOTE(review): the default filter values of INT32MIN + 1 are negative, so by the rule
* above they would reject only an ID equal to their absolute value, which presumably means that
* no practical filtering is done by default; confirm against the implementation.
*/
static Status ApplyUpdateLog(
DBM* dbm, std::string_view message,
int32_t server_id = INT32MIN + 1, int32_t dbm_index = INT32MIN + 1);
/**
* Applies the operations in the message queue files.
* @param dbm The DBM object of the database.
* @param prefix The prefix for the message queue file names.
* @param min_timestamp The minimum timestamp in milliseconds of messages to read.
* @param server_id The server ID to focus on. A negative applies a filter which ignores the
* message if the server ID matches the absolute value. Zero or a positive applies a filter
* which adopts the message if the server ID matches the value.
* @param dbm_index The DBM index to focus on. A negative applies a filter which ignores the
* message if the DBM index matches the absolute value. Zero or a positive applies a filter
* which adopts the message if the DBM index matches the value.
* @return The result status.
*/
static Status ApplyUpdateLogFromFiles(
DBM* dbm, const std::string& prefix, double min_timestamp = 0,
int32_t server_id = INT32MIN + 1, int32_t dbm_index = INT32MIN + 1);
private:
/** The thread local server ID, set through OverwriteThreadServerID. */
static thread_local int32_t thread_local_server_id_;
/** The message queue. It is not owned by this logger. */
MessageQueue* mq_;
/** The server ID of the process. */
int32_t server_id_;
/** The index of the DBM on the server. */
int32_t dbm_index_;
/** The fixed timestamp. A negative value means that the timestamp is not fixed. */
int64_t fixed_timestamp_;
};
} // namespace tkrzw
#endif // _TKRZW_DBM_ULOG_H
// END OF FILE