Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

periodically save check progress to verified_chunks #662

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ACKNOWLEDGEMENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ Duplicacy is based on the following open source projects:
|https://github.com/pcwizz/xattr | BSD-2-Clause |
|https://github.com/minio/blake2b-simd | Apache-2.0 |
|https://github.com/go-ole/go-ole | MIT |
https://github.com/ncw/swift | MIT |
|https://github.com/ncw/swift | MIT |
85 changes: 52 additions & 33 deletions src/duplicacy_snapshotmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"sort"
"strconv"
"strings"
"text/tabwriter"
"time"
"sync"
"sync/atomic"
"text/tabwriter"
"time"

"github.com/aryann/difflib"
)
Expand Down Expand Up @@ -191,7 +191,7 @@ type SnapshotManager struct {
fileChunk *Chunk
snapshotCache *FileStorage

chunkOperator *ChunkOperator
chunkOperator *ChunkOperator
}

// CreateSnapshotManager creates a snapshot manager
Expand Down Expand Up @@ -738,7 +738,7 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
totalFileSize := int64(0)
lastChunk := 0

snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry)bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry) bool {
if file.IsFile() {
totalFiles++
totalFileSize += file.Size
Expand All @@ -753,7 +753,7 @@ func (manager *SnapshotManager) ListSnapshots(snapshotID string, revisionsToList
return true
})

snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry)bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry) bool {
if file.IsFile() {
LOG_INFO("SNAPSHOT_FILE", "%s", file.String(maxSizeDigits))
}
Expand Down Expand Up @@ -908,7 +908,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
_, exist, _, err := manager.storage.FindChunk(0, chunkID, false)
if err != nil {
LOG_WARN("SNAPSHOT_VALIDATE", "Failed to check the existence of chunk %s: %v",
chunkID, err)
chunkID, err)
} else if exist {
LOG_INFO("SNAPSHOT_VALIDATE", "Chunk %s is confirmed to exist", chunkID)
continue
Expand Down Expand Up @@ -1031,9 +1031,10 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
if err != nil {
LOG_WARN("SNAPSHOT_VERIFY", "Failed to save the verified chunks file: %v", err)
} else {
LOG_INFO("SNAPSHOT_VERIFY", "Added %d chunks to the list of verified chunks", len(verifiedChunks) - numberOfVerifiedChunks)
LOG_INFO("SNAPSHOT_VERIFY", "Added %d chunks to the list of verified chunks", len(verifiedChunks)-numberOfVerifiedChunks)
}
}
numberOfVerifiedChunks = len(verifiedChunks)
}
}
defer saveVerifiedChunks()
Expand Down Expand Up @@ -1065,6 +1066,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
totalChunks := int64(len(chunkHashes))

chunkChannel := make(chan int, threads)
periodicalVerifiedChunksWriteDone := make(chan bool)
var wg sync.WaitGroup

wg.Add(threads)
Expand All @@ -1073,7 +1075,7 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe
defer CatchLogException()

for {
chunkIndex, ok := <- chunkChannel
chunkIndex, ok := <-chunkChannel
if !ok {
wg.Done()
return
Expand All @@ -1093,21 +1095,37 @@ func (manager *SnapshotManager) CheckSnapshots(snapshotID string, revisionsToChe

elapsedTime := time.Now().Sub(startTime).Seconds()
speed := int64(float64(downloadedChunkSize) / elapsedTime)
remainingTime := int64(float64(totalChunks - downloadedChunks) / float64(downloadedChunks) * elapsedTime)
remainingTime := int64(float64(totalChunks-downloadedChunks) / float64(downloadedChunks) * elapsedTime)
percentage := float64(downloadedChunks) / float64(totalChunks) * 100.0
LOG_INFO("VERIFY_PROGRESS", "Verified chunk %s (%d/%d), %sB/s %s %.1f%%",
chunkID, downloadedChunks, totalChunks, PrettySize(speed), PrettyTime(remainingTime), percentage)
chunkID, downloadedChunks, totalChunks, PrettySize(speed), PrettyTime(remainingTime), percentage)

manager.config.PutChunk(chunk)
}
} ()
}()
}

// Start a goroutine that periodically saves the updated verified_chunks file
go func() {
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ticker.C:
wg.Add(1)
saveVerifiedChunks()
wg.Done()
case <-periodicalVerifiedChunksWriteDone:
return
}
}
}()

for chunkIndex := range chunkHashes {
chunkChannel <- chunkIndex
}

close(chunkChannel)
periodicalVerifiedChunksWriteDone <- true
wg.Wait()
manager.chunkOperator.WaitForCompletion()

Expand Down Expand Up @@ -1289,10 +1307,10 @@ func (manager *SnapshotManager) PrintSnapshot(snapshot *Snapshot) bool {
}

// Don't print the ending bracket
fmt.Printf("%s", string(description[:len(description) - 2]))
fmt.Printf("%s", string(description[:len(description)-2]))
fmt.Printf(",\n \"files\": [\n")
isFirstFile := true
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func (file *Entry) bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry) bool {

fileDescription, _ := json.MarshalIndent(file.convertToObject(false), "", " ")

Expand Down Expand Up @@ -1322,7 +1340,7 @@ func (manager *SnapshotManager) VerifySnapshot(snapshot *Snapshot) bool {
}

files := make([]*Entry, 0)
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func (file *Entry) bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(file *Entry) bool {
if file.IsFile() && file.Size != 0 {
file.Attributes = nil
files = append(files, file)
Expand Down Expand Up @@ -1426,7 +1444,7 @@ func (manager *SnapshotManager) RetrieveFile(snapshot *Snapshot, file *Entry, la
func (manager *SnapshotManager) FindFile(snapshot *Snapshot, filePath string, suppressError bool) *Entry {

var found *Entry
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func (entry *Entry) bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(entry *Entry) bool {
if entry.Path == filePath {
found = entry
return false
Expand Down Expand Up @@ -1479,8 +1497,8 @@ func (manager *SnapshotManager) PrintFile(snapshotID string, revision int, path

file := manager.FindFile(snapshot, path, false)
if !manager.RetrieveFile(snapshot, file, nil, func(chunk []byte) {
fmt.Printf("%s", chunk)
}) {
fmt.Printf("%s", chunk)
}) {
LOG_ERROR("SNAPSHOT_RETRIEVE", "File %s is corrupted in snapshot %s at revision %d",
path, snapshot.ID, snapshot.Revision)
return false
Expand All @@ -1500,7 +1518,7 @@ func (manager *SnapshotManager) Diff(top string, snapshotID string, revisions []
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
} ()
}()

var leftSnapshot *Snapshot
var rightSnapshot *Snapshot
Expand All @@ -1517,10 +1535,10 @@ func (manager *SnapshotManager) Diff(top string, snapshotID string, revisions []
go func() {
defer CatchLogException()
rightSnapshot.ListLocalFiles(top, nobackupFile, filtersFile, excludeByAttribute, localListingChannel, nil, nil)
} ()
}()

for entry := range localListingChannel {
entry.Attributes = nil // attributes are not compared
entry.Attributes = nil // attributes are not compared
rightSnapshotFiles = append(rightSnapshotFiles, entry)
}

Expand Down Expand Up @@ -1725,7 +1743,7 @@ func (manager *SnapshotManager) ShowHistory(top string, snapshotID string, revis
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
} ()
}()

var err error

Expand Down Expand Up @@ -1821,15 +1839,16 @@ func (manager *SnapshotManager) resurrectChunk(fossilPath string, chunkID string

// PruneSnapshots deletes snapshots by revisions, tags, or a retention policy. The main idea is two-step
// fossil collection.
// 1. Delete snapshots specified by revision, retention policy, with a tag. Find any resulting unreferenced
// chunks, and mark them as fossils (by renaming). After that, create a fossil collection file containing
// fossils collected during current run, and temporary files encountered. Also in the file is the latest
// revision for each snapshot id. Save this file to a local directory.
//
// 2. On next run, check if there is any new revision for each snapshot, or if the latest revision is too
// old, for instance, more than 7 days. This step is to identify snapshots that were being created while
// step 1 is in progress. For each fossil referenced by any of these snapshots, move them back to the
// normal chunk directory.
// 1. Delete snapshots specified by revision, retention policy, with a tag. Find any resulting unreferenced
// chunks, and mark them as fossils (by renaming). After that, create a fossil collection file containing
// fossils collected during current run, and temporary files encountered. Also in the file is the latest
// revision for each snapshot id. Save this file to a local directory.
//
// 2. On next run, check if there is any new revision for each snapshot, or if the latest revision is too
// old, for instance, more than 7 days. This step is to identify snapshots that were being created while
// step 1 is in progress. For each fossil referenced by any of these snapshots, move them back to the
// normal chunk directory.
//
// Note that a snapshot being created when step 2 is in progress may reference a fossil. To avoid this
// problem, never remove the latest revision (unless exclusive is true), and only cache chunks referenced
Expand All @@ -1853,7 +1872,7 @@ func (manager *SnapshotManager) PruneSnapshots(selfID string, snapshotID string,
defer func() {
manager.chunkOperator.Stop()
manager.chunkOperator = nil
} ()
}()

prefPath := GetDuplicacyPreferencePath()
logDir := path.Join(prefPath, "logs")
Expand Down Expand Up @@ -2544,7 +2563,7 @@ func (manager *SnapshotManager) CheckSnapshot(snapshot *Snapshot) (err error) {
numberOfChunks, len(snapshot.ChunkLengths))
}

snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func (entry *Entry) bool {
snapshot.ListRemoteFiles(manager.config, manager.chunkOperator, func(entry *Entry) bool {

if lastEntry != nil && lastEntry.Compare(entry) >= 0 && !strings.Contains(lastEntry.Path, "\ufffd") {
err = fmt.Errorf("The entry %s appears before the entry %s", lastEntry.Path, entry.Path)
Expand Down Expand Up @@ -2598,7 +2617,7 @@ func (manager *SnapshotManager) CheckSnapshot(snapshot *Snapshot) (err error) {
if entry.Size != fileSize {
err = fmt.Errorf("The file %s has a size of %d but the total size of chunks is %d",
entry.Path, entry.Size, fileSize)
return false
return false
}

return true
Expand Down Expand Up @@ -2647,7 +2666,7 @@ func (manager *SnapshotManager) DownloadFile(path string, derivationKey string)
err = manager.storage.UploadFile(0, path, newChunk.GetBytes())
if err != nil {
LOG_WARN("DOWNLOAD_REWRITE", "Failed to re-uploaded the file %s: %v", path, err)
} else{
} else {
LOG_INFO("DOWNLOAD_REWRITE", "The file %s has been re-uploaded", path)
}
}
Expand Down