diff --git a/.gitignore b/.gitignore
index 849ddff..8efed37 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
 dist/
+proxmoxbackupgo_cli.exe
+proxmoxbackupgo.exe
diff --git a/go.mod b/go.mod
index dba0978..1f06c01 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module proxmoxbackupgo
 go 1.19
 
 require (
+	github.com/cornelk/hashmap v1.0.8
 	github.com/dchest/siphash v1.2.3
 	github.com/gen2brain/beeep v0.0.0-20230907135156-1a38885a97fc
 	github.com/getlantern/systray v1.2.2
diff --git a/go.sum b/go.sum
index fdc6810..b853152 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
 github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
+github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
+github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
diff --git a/main.go b/main.go
index ac1c2d5..ae56a97 100644
--- a/main.go
+++ b/main.go
@@ -10,7 +10,9 @@ import (
 	"hash"
 	"os"
 	"runtime"
+	"sync/atomic"
 
+	"github.com/cornelk/hashmap"
 	"github.com/gen2brain/beeep"
 	"github.com/getlantern/systray"
 	"github.com/tawesoft/golib/v2/dialog"
@@ -46,9 +48,9 @@ func (c *ChunkState) Init() {
 }
 
 func main() {
-	newchunk := 0
-	reusechunk := 0
-	known_chunks_digest := make(map[string]bool)
+	var newchunk atomic.Uint64
+	var reusechunk atomic.Uint64
+	knownChunks := hashmap.New[string, bool]()
 
 	// Define command-line flags
 	baseURLFlag := flag.String("baseurl", "", "Base URL for the proxmox backup server, example: https://192.168.1.10:8007")
@@ -101,18 +103,20 @@ func main() {
 
 	backupdir := *backupSourceDirFlag
 
-	backupdir = createVSSSnapshot(backupdir)
-
 	fmt.Printf("Starting backup of %s\n", backupdir)
 
+	backupdir = createVSSSnapshot(backupdir)
+	//Remove VSS snapshot on windows, on linux for now NOP
+	defer VSSCleanup()
+
 	client.Connect(false)
 
-	A := &PXARArchive{}
-	A.archivename = "backup.pxar.didx"
+	archive := &PXARArchive{}
+	archive.archivename = "backup.pxar.didx"
 
-	previous_didx := client.DownloadPreviousToBytes(A.archivename)
+	previousDidx := client.DownloadPreviousToBytes(archive.archivename)
 
-	fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previous_didx))
+	fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previousDidx))
 
 	/*f2, _ := os.Create("test.didx")
 	defer f2.Close()
@@ -124,24 +128,24 @@ func main() {
 		we are going to upload to avoid unnecessary traffic and compression cpu usage
 	*/
 
-	if !bytes.HasPrefix(previous_didx, didxMagic) {
-		fmt.Printf("Previous index has wrong magic (%s)!\n", previous_didx[:8])
+	if !bytes.HasPrefix(previousDidx, didxMagic) {
+		fmt.Printf("Previous index has wrong magic (%.8s)!\n", previousDidx) // %.8s truncates safely; slicing [:8] panics on a short index
 
 	} else {
 		//Header as per proxmox documentation is fixed size of 4096 bytes,
 		//then offset of type uint64 and sha256 digests follow , so 40 byte each record until EOF
-		previous_didx = previous_didx[4096:]
-		for i := 0; i*40 < len(previous_didx); i += 1 {
+		previousDidx = previousDidx[4096:]
+		for i := 0; i*40 < len(previousDidx); i += 1 {
 			e := DidxEntry{}
-			e.offset = binary.LittleEndian.Uint64(previous_didx[i*40 : i*40+8])
-			e.digest = previous_didx[i*40+8 : i*40+40]
+			e.offset = binary.LittleEndian.Uint64(previousDidx[i*40 : i*40+8])
+			e.digest = previousDidx[i*40+8 : i*40+40]
 			shahash := hex.EncodeToString(e.digest)
 			fmt.Printf("Previous: %s\n", shahash)
-			known_chunks_digest[shahash] = true
+			knownChunks.Set(shahash, true)
 		}
 	}
 
-	fmt.Printf("Known chunks: %d!\n", len(known_chunks_digest))
+	fmt.Printf("Known chunks: %d!\n", knownChunks.Len())
 	f := &os.File{}
 	if *pxarOut != "" {
 		f, _ = os.Create(*pxarOut)
@@ -149,163 +153,170 @@ func main() {
 	}
 	/**/
 
-	PXAR_CHK := ChunkState{}
-	PXAR_CHK.Init()
+	pxarChunk := ChunkState{}
+	pxarChunk.Init()
 
-	PCAT1_CHK := ChunkState{}
-	PCAT1_CHK.Init()
+	pcat1Chunk := ChunkState{}
+	pcat1Chunk.Init()
 
-	PXAR_CHK.wrid = client.CreateDynamicIndex(A.archivename)
-	PCAT1_CHK.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")
+	pxarChunk.wrid = client.CreateDynamicIndex(archive.archivename)
+	pcat1Chunk.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")
 
-	A.writeCB = func(b []byte) {
-		chunkpos := PXAR_CHK.C.Scan(b)
+	archive.writeCB = func(b []byte) {
+		chunkpos := pxarChunk.C.Scan(b)
 
-		if chunkpos > 0 { //We test if cyclic polynomial hash returned the expected value for chunk boundary
-			for chunkpos > 0 {
+		if chunkpos == 0 {
+			pxarChunk.current_chunk = append(pxarChunk.current_chunk, b...)
+		}
 
-				PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b[:chunkpos]...)
+		for chunkpos > 0 {
+			pxarChunk.current_chunk = append(pxarChunk.current_chunk, b[:chunkpos]...)
 
-				h := sha256.New()
-				h.Write(PXAR_CHK.current_chunk)
-				bindigest := h.Sum(nil)
-				shahash := hex.EncodeToString(bindigest)
+			h := sha256.New()
+			h.Write(pxarChunk.current_chunk)
+			bindigest := h.Sum(nil)
+			shahash := hex.EncodeToString(bindigest)
 
-				if !known_chunks_digest[shahash] {
-					fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-					newchunk++
-					client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
-				} else {
-					fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-					reusechunk++
-				}
+			// GetOrInsert reports loaded=true when the digest was already known, so only !loaded chunks are new and need uploading.
+			if _, loaded := knownChunks.GetOrInsert(shahash, true); !loaded {
+				fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+				newchunk.Add(1)
 
-				binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
-				PXAR_CHK.chunkdigests.Write(h.Sum(nil))
+				client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
+			} else {
+				fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+				reusechunk.Add(1)
+			}
 
-				PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
-				PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
-				PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
-				PXAR_CHK.chunkcount += 1
+			binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
+			pxarChunk.chunkdigests.Write(h.Sum(nil))
 
-				PXAR_CHK.current_chunk = b[chunkpos:]
-				chunkpos = PXAR_CHK.C.Scan(b[chunkpos:])
-			}
-		} else {
-			PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b...)
+			pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
+			pxarChunk.assignments = append(pxarChunk.assignments, shahash)
+			pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
+			pxarChunk.chunkcount += 1
+
+			pxarChunk.current_chunk = b[chunkpos:]
+			chunkpos = pxarChunk.C.Scan(b[chunkpos:])
 		}
+
 		if *pxarOut != "" {
 			f.Write(b)
 		}
 		//
 	}
 
-	A.catalogWriteCB = func(b []byte) {
-		chunkpos := PCAT1_CHK.C.Scan(b)
+	archive.catalogWriteCB = func(b []byte) {
+		chunkpos := pcat1Chunk.C.Scan(b)
+
+		if chunkpos == 0 {
+			pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b...)
+		}
+
+		var lastChunkPos uint64
+		for chunkpos > 0 {
+			pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b[:chunkpos]...)
 
-		if chunkpos > 0 {
-			for chunkpos > 0 {
+			h := sha256.New()
+			h.Write(pcat1Chunk.current_chunk)
+			shahash := hex.EncodeToString(h.Sum(nil))
 
-				PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b[:chunkpos]...)
+			fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(pcat1Chunk.current_chunk), chunkpos)
 
-				h := sha256.New()
-				h.Write(PCAT1_CHK.current_chunk)
-				shahash := hex.EncodeToString(h.Sum(nil))
+			client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
+			binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
+			pcat1Chunk.chunkdigests.Write(h.Sum(nil))
 
-				fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk))
+			pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
+			pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
+			pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
+			pcat1Chunk.chunkcount += 1
 
-				client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
-				binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
-				PCAT1_CHK.chunkdigests.Write(h.Sum(nil))
+			pcat1Chunk.current_chunk = b[chunkpos:]
 
-				PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
-				PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
-				PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
-				PCAT1_CHK.chunkcount += 1
+			//lastChunkPos is here so we know when pcat1Chunk.C.Scan loops from beginning.
+			lastChunkPos = chunkpos
+			chunkpos = pcat1Chunk.C.Scan(b[chunkpos:])
 
-				PCAT1_CHK.current_chunk = b[chunkpos:]
-				chunkpos = PCAT1_CHK.C.Scan(b[chunkpos:])
+			if chunkpos < lastChunkPos {
+				break
 			}
-		} else {
-			PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b...)
 		}
 	}
 
 	//This is the entry point of backup job which will start streaming with the PCAT and PXAR write callback
 	//Data to be hashed and eventuall uploaded
-	A.WriteDir(backupdir, "", true)
+	archive.WriteDir(backupdir, "", true)
 
 	//Here we write the remainder of data for which cyclic hash did not trigger
-	if len(PXAR_CHK.current_chunk) > 0 {
+	if len(pxarChunk.current_chunk) > 0 {
 		h := sha256.New()
-		h.Write(PXAR_CHK.current_chunk)
+		h.Write(pxarChunk.current_chunk)
 		shahash := hex.EncodeToString(h.Sum(nil))
 
-		binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
-		PXAR_CHK.chunkdigests.Write(h.Sum(nil))
+		binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
+		pxarChunk.chunkdigests.Write(h.Sum(nil))
 
-		if !known_chunks_digest[shahash] {
-			fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-			client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
-			newchunk++
+		// Same rule as in the streaming callback: loaded=true means the server already has this digest.
+		if _, loaded := knownChunks.GetOrInsert(shahash, true); !loaded {
+			fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+			client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
+			newchunk.Add(1)
 		} else {
-			fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-			reusechunk++
+			fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+			reusechunk.Add(1)
 		}
 
-		PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
-		PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
-		PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
-		PXAR_CHK.chunkcount += 1
+		pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
+		pxarChunk.assignments = append(pxarChunk.assignments, shahash)
+		pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
+		pxarChunk.chunkcount += 1
 	}
 
-	if len(PCAT1_CHK.current_chunk) > 0 {
+	if len(pcat1Chunk.current_chunk) > 0 {
 		h := sha256.New()
-		h.Write(PCAT1_CHK.current_chunk)
+		h.Write(pcat1Chunk.current_chunk)
 		shahash := hex.EncodeToString(h.Sum(nil))
 
-		binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
-		PCAT1_CHK.chunkdigests.Write(h.Sum(nil))
-
-		fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk))
-		PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
-		PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
-		PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
-		PCAT1_CHK.chunkcount += 1
-		client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
+		binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
+		pcat1Chunk.chunkdigests.Write(h.Sum(nil))
+
+		fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(pcat1Chunk.current_chunk))
+		pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
+		pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
+		pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
+		pcat1Chunk.chunkcount += 1
+		client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
 	}
 
 	//Avoid incurring in request entity too large by chunking assignment PUT requests in blocks of at most 128 chunks
-	for k := 0; k < len(PXAR_CHK.assignments); k += 128 {
+	for k := 0; k < len(pxarChunk.assignments); k += 128 {
 		k2 := k + 128
-		if k2 > len(PXAR_CHK.assignments) {
-			k2 = len(PXAR_CHK.assignments)
+		if k2 > len(pxarChunk.assignments) {
+			k2 = len(pxarChunk.assignments)
 		}
-		client.AssignChunks(PXAR_CHK.wrid, PXAR_CHK.assignments[k:k2], PXAR_CHK.assignments_offset[k:k2])
+		client.AssignChunks(pxarChunk.wrid, pxarChunk.assignments[k:k2], pxarChunk.assignments_offset[k:k2])
 	}
 
-	client.CloseDynamicIndex(PXAR_CHK.wrid, hex.EncodeToString(PXAR_CHK.chunkdigests.Sum(nil)), PXAR_CHK.pos, PXAR_CHK.chunkcount)
+	client.CloseDynamicIndex(pxarChunk.wrid, hex.EncodeToString(pxarChunk.chunkdigests.Sum(nil)), pxarChunk.pos, pxarChunk.chunkcount)
 
-	for k := 0; k < len(PCAT1_CHK.assignments); k += 128 {
+	for k := 0; k < len(pcat1Chunk.assignments); k += 128 {
 		k2 := k + 128
-		if k2 > len(PCAT1_CHK.assignments) {
-			k2 = len(PCAT1_CHK.assignments)
+		if k2 > len(pcat1Chunk.assignments) {
+			k2 = len(pcat1Chunk.assignments)
 		}
-		client.AssignChunks(PCAT1_CHK.wrid, PCAT1_CHK.assignments[k:k2], PCAT1_CHK.assignments_offset[k:k2])
+		client.AssignChunks(pcat1Chunk.wrid, pcat1Chunk.assignments[k:k2], pcat1Chunk.assignments_offset[k:k2])
 	}
 
-	client.CloseDynamicIndex(PCAT1_CHK.wrid, hex.EncodeToString(PCAT1_CHK.chunkdigests.Sum(nil)), PCAT1_CHK.pos, PCAT1_CHK.chunkcount)
+	client.CloseDynamicIndex(pcat1Chunk.wrid, hex.EncodeToString(pcat1Chunk.chunkdigests.Sum(nil)), pcat1Chunk.pos, pcat1Chunk.chunkcount)
 
 	client.UploadManifest()
 	client.Finish()
 
-	fmt.Printf("New %d , Reused %d\n", newchunk, reusechunk)
-
-	//Remove VSS snapshot on windows, on linux for now NOP
-	VSSCleanup()
+	fmt.Printf("New %d , Reused %d\n", newchunk.Load(), reusechunk.Load())
 
 	if runtime.GOOS == "windows" {
 		systray.Quit()
-		beeep.Notify("Proxmox Backup Go", fmt.Sprintf("Backup complete\nChunks New %d , Reused %d\n", newchunk, reusechunk), "")
+		beeep.Notify("Proxmox Backup Go", fmt.Sprintf("Backup complete\nChunks New %d , Reused %d\n", newchunk.Load(), reusechunk.Load()), "")
 	}
 }
diff --git a/pbsapi.go b/pbsapi.go
index e91509d..0fceb9a 100644
--- a/pbsapi.go
+++ b/pbsapi.go
@@ -377,7 +377,7 @@ func (pbs *PBSClient) Connect(reader bool) {
 		conn.Write([]byte("Upgrade: proxmox-backup-reader-protocol-v1\r\n"))
 	}
 	conn.Write([]byte("Connection: Upgrade\r\n\r\n"))
-	fmt.Println("Reading response to upgrade...\n")
+	fmt.Printf("Reading response to upgrade...\n")
 	buf := make([]byte, 0)
 	for !strings.HasSuffix(string(buf), "\r\n\r\n") && !strings.HasSuffix(string(buf), "\n\n") {
 		//fmt.Println(buf)
diff --git a/pxar.go b/pxar.go
index 2270c9b..3239253 100644
--- a/pxar.go
+++ b/pxar.go
@@ -284,7 +284,6 @@ func (a *PXARArchive) WriteDir(path string, dirname string, toplevel bool) Catal
 	a.Flush()
 
 	goodbyteitems := make([]GoodByeItem, 0)
-
 	catalog_files := make([]CatalogFile, 0)
 	catalog_dirs := make([]CatalogDir, 0)
 
@@ -301,8 +300,8 @@ func (a *PXARArchive) WriteDir(path string, dirname string, toplevel bool) Catal
 			})
 		} else {
 			F := a.WriteFile(filepath.Join(path, file.Name()), file.Name())
-			catalog_files = append(catalog_files, F)
 
+			catalog_files = append(catalog_files, F)
 			goodbyteitems = append(goodbyteitems, GoodByeItem{
 				offset: startpos,
 				hash:   siphash.Hash(0x83ac3f1cfbb450db, 0xaa4f1b6879369fbd, []byte(file.Name())),