-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathwal.go
430 lines (384 loc) · 12.1 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package raftwal
import (
"bytes"
"crypto/aes"
"encoding/binary"
"sort"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
// errNotFound is returned by seekEntry when the first entry at or after the
// requested raft index exists but carries a different (higher) index, i.e. the
// requested index itself is not present in the log.
var errNotFound = errors.New("Unable to find raft entry")
// wal represents the entire entry log. It consists of one or more logFile
// objects: zero or more older files kept in files, plus the file currently
// being written, kept in current. This object is not lock protected; it is used
// by DiskStorage, which has a lock protecting the calls to this object.
type wal struct {
	// files is the list of previous log files, ordered in ascending order by the
	// first index in each file. It does NOT contain the file currently being
	// written; that file is held in current and is appended to files only when
	// it is rotated.
	files []*logFile
	// current is the log file that new entries are written to.
	current *logFile
	// nextEntryIdx is the slot in current where the next entry will be written.
	// When this value reaches maxNumEntries the current file is rotated.
	nextEntryIdx int
	// dir is the directory to use to store files.
	dir string
}
// allEntries returns the entries whose raft index lies in [lo, hi).
//
// Reading starts at the slot slotGe reports for lo (or slot 0 of that file if
// lo precedes it) and walks forward through l.files in order, finishing with
// the current file. An empty slot (index 0) ends the scan of its file. Entries
// are accumulated until adding the next one would push the running total of
// entry sizes past maxSize; the first entry found is always included.
func (l *wal) allEntries(lo, hi, maxSize uint64) []raftpb.Entry {
	var (
		out   []raftpb.Entry
		total uint64
	)
	fidx, slot := l.slotGe(lo)
	if slot < 0 {
		// lo is below everything in that file: start from its first slot.
		slot = 0
	}
	lf := l.getEntryFile(fidx)

	for {
		if slot >= maxNumEntries {
			// This file is exhausted. The current file (-1) is the last one.
			if fidx == -1 {
				return out
			}
			fidx++
			if fidx >= len(l.files) {
				// Past the previous files: continue with the current file.
				fidx = -1
			}
			lf = l.getEntryFile(fidx)
			x.AssertTrue(lf != nil)
			slot = 0
		}

		re := lf.GetRaftEntry(slot)
		if re.Index >= hi {
			return out
		}
		if re.Index == 0 {
			// Empty slot: nothing else in this file. Force a move to the next one.
			slot = maxNumEntries
			continue
		}

		total += uint64(re.Size())
		if len(out) > 0 && total > maxSize {
			return out
		}
		out = append(out, re)
		slot++
	}
}
// AddEntries adds the entries to the log. If the log already has an entry with
// the same index as entries[0], that entry and every entry after it (including
// any later files) are removed before the new entries are written.
//
// Each entry's Data is copied into the current file right after the previous
// entry's data (or at logFileOffset for the first entry of a file). When an
// encryption key is configured, the data is first passed through
// y.XORBlockStream with the current file's data key and an IV derived from the
// data offset. The file is rotated whenever nextEntryIdx reaches maxNumEntries.
func (l *wal) AddEntries(entries []raftpb.Entry) error {
	if len(entries) == 0 {
		return nil
	}
	fidx, eidx := l.slotGe(entries[0].Index)

	// slotGe found the slot where entries[0] belongs. Any entry there or after
	// it must be removed so the new entries replace the conflicting suffix.
	if eidx >= 0 {
		if fidx == -1 {
			// The slot is in the current file. Only the slots written after it
			// need clearing; later slots are already zero.
			if l.nextEntryIdx > eidx {
				z.ZeroOut(l.current.Data, entrySize*eidx, entrySize*l.nextEntryIdx)
			}
		} else {
			// The slot is in one of the previous files. The logic must:
			// 1. Delete all the files after the one in which the slot was found.
			// 2. Zero out the entry slots from eidx onwards in that file.
			// 3. Make that file current and trim it (and later ones) from files.
			x.AssertTrue(fidx < len(l.files))
			// Full slice expression so the append allocates a new array instead
			// of writing into l.files' backing array.
			n := len(l.files)
			extra := append(l.files[fidx+1:n:n], l.current)
			l.current = l.files[fidx]

			for _, ef := range extra {
				glog.V(2).Infof("Deleting extra file: %d\n", ef.fid)
				if err := ef.delete(); err != nil {
					glog.Errorf("deleting file: %s. error: %v\n", ef.Fd.Name(), err)
				}
			}
			// Only clear the entry-slot region [eidx, maxNumEntries). Bytes between
			// the last slot and logFileOffset are not entries and are left intact.
			if eidx < maxNumEntries {
				z.ZeroOut(l.current.Data, entrySize*eidx, entrySize*maxNumEntries)
			}
			l.files = l.files[:fidx]
		}
		l.nextEntryIdx = eidx
	}

	// Look at the previous entry to find the offset at which to start writing
	// the Data field of the first new entry.
	var offset int
	if prev := l.nextEntryIdx - 1; prev >= 0 {
		// Data for the next entry starts right after the previous entry's data.
		e := l.current.getEntry(prev)
		offset = int(e.DataOffset())
		offset += sliceSize(l.current.Data, offset)
	} else {
		// At the start of the file: data begins at logFileOffset.
		offset = logFileOffset
	}

	for _, re := range entries {
		if l.nextEntryIdx >= maxNumEntries {
			if err := l.rotate(re.Index); err != nil {
				return err
			}
			l.nextEntryIdx, offset = 0, logFileOffset
		}

		// If encryption is enabled then encrypt the data. re is a copy, so the
		// caller's entries are not modified.
		if x.WorkerConfig.EncryptionKey != nil {
			var ebuf bytes.Buffer
			curr := l.current
			if err := y.XORBlockStream(
				&ebuf, re.Data, curr.dataKey.Data, curr.generateIV(uint64(offset))); err != nil {
				return err
			}
			re.Data = ebuf.Bytes()
		}

		// Allocate slice for the data and copy bytes.
		destBuf, next := l.current.AllocateSlice(len(re.Data), offset)
		x.AssertTrue(copy(destBuf, re.Data) == len(re.Data))

		// Write the entry at the given slot.
		buf := l.current.getEntry(l.nextEntryIdx)
		marshalEntry(buf, re.Term, re.Index, uint64(offset), uint64(re.Type))

		// Update values for the next entry.
		offset = next
		l.nextEntryIdx++
	}
	return nil
}
// generateIV builds the aes.BlockSize-byte IV used for data stored at offset in
// this log file. The first baseIVsize bytes are the file's base IV and the
// remaining bytes start with the big-endian encoding of offset.
func (lf *logFile) generateIV(offset uint64) []byte {
	iv := make([]byte, aes.BlockSize)
	prefix, suffix := iv[:baseIVsize], iv[baseIVsize:]

	// The base IV must fill the prefix completely.
	copied := copy(prefix, lf.baseIV)
	y.AssertTrue(baseIVsize == copied)

	binary.BigEndian.PutUint64(suffix, offset)
	return iv
}
// firstIndex returns the raft index stored in slot 0 of the oldest file: the
// first of l.files, or the current file when there are no previous files. A nil
// wal yields 0. If that slot is empty (index 0), it returns 1, because raft
// expects the first index to always be greater than zero.
func (l *wal) firstIndex() uint64 {
	if l == nil {
		return 0
	}
	oldest := l.current
	if len(l.files) > 0 {
		oldest = l.files[0]
	}
	if fi := oldest.getEntry(0).Index(); fi != 0 {
		return fi
	}
	return 1
}
// LastIndex returns the raft index of the last entry in the log.
//
// If the current file has any written slots, the last one is used. Otherwise
// the previous files are checked from newest to oldest, and the first whose
// last entry has a non-zero index wins. If none does, it returns 0.
func (l *wal) LastIndex() uint64 {
	if last := l.nextEntryIdx - 1; last >= 0 {
		return l.current.getEntry(last).Index()
	}
	for i := len(l.files) - 1; i >= 0; i-- {
		if idx := l.files[i].lastEntry().Index(); idx > 0 {
			return idx
		}
	}
	return 0
}
// getEntryFile maps a file index (as produced by slotGe) to its logFile.
// -1 selects the current file, which is not stored in l.files. An index at or
// beyond len(l.files) yields nil.
func (l *wal) getEntryFile(fidx int) *logFile {
	switch {
	case fidx == -1:
		return l.current
	case fidx < len(l.files):
		return l.files[fidx]
	default:
		return nil
	}
}
// slotGe locates raftIndex in the WAL. It returns the file index and the slot in
// that file holding the first entry with an index >= raftIndex.
//
// A file index of -1 refers to the current file. A slot of -1 means raftIndex
// is lower than anything present in the WAL, so it cannot be found.
func (l *wal) slotGe(raftIndex uint64) (int, int) {
	// Try the current file first.
	if slot := l.current.slotGe(raftIndex); slot >= 0 {
		return -1, slot
	}

	n := len(l.files)
	if n == 0 {
		// No previous files: the best we can say is "before the current file".
		return -1, -1
	}

	// First previous file whose first index is >= raftIndex.
	i := sort.Search(n, func(j int) bool {
		return l.files[j].firstIndex() >= raftIndex
	})
	switch {
	case i < n && l.files[i].firstIndex() == raftIndex:
		// raftIndex is exactly the first entry of file i.
		return i, 0
	case i > 0:
		// Step back to the file whose first index is below raftIndex.
		i--
	}
	return i, l.files[i].slotGe(raftIndex)
}
// seekEntry looks up the entry whose index is exactly raftIndex.
//
// raftIndex 0 yields emptyEntry with no error. If raftIndex is below everything
// in the log, it returns raft.ErrCompacted. If the log has not reached
// raftIndex, it returns raft.ErrUnavailable. If the first entry at or after
// raftIndex has a different index, it returns errNotFound. Every error case
// also returns emptyEntry.
func (l *wal) seekEntry(raftIndex uint64) (entry, error) {
	if raftIndex == 0 {
		return emptyEntry, nil
	}

	fidx, off := l.slotGe(raftIndex)
	switch {
	case off == -1:
		// Already processed and compacted out of the log.
		return emptyEntry, raft.ErrCompacted
	case off >= maxNumEntries:
		// The log has not advanced past raftIndex.
		return emptyEntry, raft.ErrUnavailable
	}

	ent := l.getEntryFile(fidx).getEntry(off)
	switch ent.Index() {
	case 0:
		// Empty slot: the log has not advanced past raftIndex.
		return emptyEntry, raft.ErrUnavailable
	case raftIndex:
		return ent, nil
	default:
		return emptyEntry, errNotFound
	}
}
// Term returns the term of the entry with raft index idx, together with any
// error seekEntry reported for that lookup. On error the term is taken from the
// emptyEntry that seekEntry returns.
func (l *wal) Term(idx uint64) (uint64, error) {
	found, err := l.seekEntry(idx)
	term := found.Term()
	return term, err
}
// deleteBefore removes from l.files, and deletes from disk, every previous log
// file that comes before the file containing raftIndex.
//
// If raftIndex is in the current file, all previous files are deleted. Nothing
// happens when raftIndex is below everything in the WAL. Deletion failures are
// logged rather than returned.
func (l *wal) deleteBefore(raftIndex uint64) {
	fidx, off := l.slotGe(raftIndex)
	if off < 0 || fidx >= len(l.files) {
		return
	}

	var stale []*logFile
	if fidx == -1 {
		// raftIndex is in the current file: every previous file is stale.
		stale, l.files = l.files, l.files[:0]
	} else {
		stale, l.files = l.files[:fidx], l.files[fidx:]
	}

	for _, lf := range stale {
		if err := lf.delete(); err != nil {
			glog.Errorf("while deleting file: %s, err: %v\n", lf.Fd.Name(), err)
		}
	}
}
// reset deletes all the previous log files and clears every entry slot of the
// current log file, leaving the WAL empty with nextEntryIdx at 0.
//
// Only the entry-slot region [0, entrySize*maxNumEntries) of the current file is
// zeroed. Bytes between the last slot and logFileOffset are not entries and are
// left untouched. If a delete fails, the files already deleted are dropped from
// l.files before the error is returned, so a retry does not revisit them.
func (l *wal) reset() error {
	for i, ef := range l.files {
		if err := ef.delete(); err != nil {
			l.files = l.files[i:]
			return errors.Wrapf(err, "while deleting %s", ef.Fd.Name())
		}
	}
	l.files = l.files[:0]
	z.ZeroOut(l.current.Data, 0, entrySize*maxNumEntries)
	l.nextEntryIdx = 0
	return nil
}
// rotate moves the current logFile to the end of l.files and opens a new logFile
// as current. The new file's fid is one greater than the largest fid among the
// current and previous files.
//
// The old current file is synced in a background goroutine; a sync failure is
// logged because nobody waits on it. The firstIndex argument is currently
// unused.
func (l *wal) rotate(firstIndex uint64) error {
	// Select the name for the new file based on the names of the existing files.
	nextFid := l.current.fid
	x.AssertTrue(nextFid > 0)
	for _, ef := range l.files {
		if ef.fid > nextFid {
			nextFid = ef.fid
		}
	}
	nextFid++

	// Trigger a sync in the background. The file is passed explicitly so the
	// goroutine is unaffected by l.current being reassigned below.
	go func(lf *logFile) {
		if err := lf.Sync(); err != nil {
			glog.Errorf("while syncing log file %d: %v", lf.fid, err)
		}
	}(l.current)

	ef, err := openLogFile(l.dir, nextFid)
	if err != nil {
		return errors.Wrapf(err, "while creating a new entry file")
	}

	// Move the existing current file to the end of the list of files and
	// update the current file to the file that was just created.
	l.files = append(l.files, l.current)
	l.current = ef
	return nil
}
// openWal opens the WAL stored in dir.
//
// Log files returned by getLogFiles whose first index is 0 hold no entries and
// are deleted. The last remaining file becomes the current file, with
// nextEntryIdx at its first empty slot. The others are kept in files in the
// order getLogFiles returned them (presumably ascending fid; verify against
// getLogFiles). If no file remains, a new log file is created with a fid one
// greater than the largest fid seen. On any error the returned *wal is nil.
func openWal(dir string) (*wal, error) {
	e := &wal{
		dir: dir,
	}
	files, err := getLogFiles(dir)
	if err != nil {
		return nil, err
	}

	// Filter in place, deleting files that contain no entries.
	out := files[:0]
	var nextFid int64
	for _, ef := range files {
		if nextFid < ef.fid {
			nextFid = ef.fid
		}
		if ef.firstIndex() == 0 {
			if err := ef.delete(); err != nil {
				return nil, err
			}
		} else {
			out = append(out, ef)
		}
	}
	e.files = out

	if sz := len(e.files); sz > 0 {
		e.current = e.files[sz-1]
		e.nextEntryIdx = e.current.firstEmptySlot()
		e.files = e.files[:sz-1]
		return e, nil
	}

	// No files found. Create a new file.
	nextFid++
	ef, err := openLogFile(dir, nextFid)
	if err != nil {
		// Don't hand back a wal whose current file is nil.
		return nil, err
	}
	e.current = ef
	return e, nil
}