diff --git a/main.go b/main.go index 50808a7..30b05ad 100644 --- a/main.go +++ b/main.go @@ -111,12 +111,12 @@ func main() { client.Connect(false) - A := &PXARArchive{} - A.archivename = "backup.pxar.didx" + archive := &PXARArchive{} + archive.archivename = "backup.pxar.didx" - previous_didx := client.DownloadPreviousToBytes(A.archivename) + previousDidx := client.DownloadPreviousToBytes(archive.archivename) - fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previous_didx)) + fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previousDidx)) /*f2, _ := os.Create("test.didx") defer f2.Close() @@ -128,17 +128,17 @@ func main() { we are going to upload to avoid unnecessary traffic and compression cpu usage */ - if !bytes.HasPrefix(previous_didx, didxMagic) { - fmt.Printf("Previous index has wrong magic (%s)!\n", previous_didx[:8]) + if !bytes.HasPrefix(previousDidx, didxMagic) { + fmt.Printf("Previous index has wrong magic (%s)!\n", previousDidx[:8]) } else { //Header as per proxmox documentation is fixed size of 4096 bytes, //then offset of type uint64 and sha256 digests follow , so 40 byte each record until EOF - previous_didx = previous_didx[4096:] - for i := 0; i*40 < len(previous_didx); i += 1 { + previousDidx = previousDidx[4096:] + for i := 0; i*40 < len(previousDidx); i += 1 { e := DidxEntry{} - e.offset = binary.LittleEndian.Uint64(previous_didx[i*40 : i*40+8]) - e.digest = previous_didx[i*40+8 : i*40+40] + e.offset = binary.LittleEndian.Uint64(previousDidx[i*40 : i*40+8]) + e.digest = previousDidx[i*40+8 : i*40+40] shahash := hex.EncodeToString(e.digest) fmt.Printf("Previous: %s\n", shahash) knownChunks.Set(shahash, true) @@ -153,153 +153,152 @@ func main() { } /**/ - PXAR_CHK := ChunkState{} - PXAR_CHK.Init() + pxarChunk := ChunkState{} + pxarChunk.Init() - PCAT1_CHK := ChunkState{} - PCAT1_CHK.Init() + pcat1Chunk := ChunkState{} + pcat1Chunk.Init() - PXAR_CHK.wrid = client.CreateDynamicIndex(A.archivename) - PCAT1_CHK.wrid = client.CreateDynamicIndex("catalog.pcat1.didx") + pxarChunk.wrid = client.CreateDynamicIndex(archive.archivename) + pcat1Chunk.wrid = client.CreateDynamicIndex("catalog.pcat1.didx") - A.writeCB = func(b []byte) { - chunkpos := PXAR_CHK.C.Scan(b) + archive.writeCB = func(b []byte) { + chunkpos := pxarChunk.C.Scan(b) - if chunkpos > 0 { //We test if cyclic polynomial hash returned the expected value for chunk boundary - for chunkpos > 0 { + if chunkpos == 0 { + pxarChunk.current_chunk = append(pxarChunk.current_chunk, b...) + } - PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b[:chunkpos]...) + for chunkpos > 0 { + pxarChunk.current_chunk = append(pxarChunk.current_chunk, b[:chunkpos]...) - h := sha256.New() - h.Write(PXAR_CHK.current_chunk) - bindigest := h.Sum(nil) - shahash := hex.EncodeToString(bindigest) + h := sha256.New() + h.Write(pxarChunk.current_chunk) + bindigest := h.Sum(nil) + shahash := hex.EncodeToString(bindigest) - if _, ok := knownChunks.GetOrInsert(shahash, true); ok { - fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk)) - newchunk.Add(1) + if _, ok := knownChunks.GetOrInsert(shahash, true); ok { + fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk)) + newchunk.Add(1) - client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk) - } else { - fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk)) - reusechunk.Add(1) - } + client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk) + } else { + fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk)) + reusechunk.Add(1) + } - binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk)))) - PXAR_CHK.chunkdigests.Write(h.Sum(nil)) + binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk)))) + pxarChunk.chunkdigests.Write(h.Sum(nil)) - PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos) - PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash) - PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk)) - PXAR_CHK.chunkcount += 1 + pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos) + pxarChunk.assignments = append(pxarChunk.assignments, shahash) + pxarChunk.pos += uint64(len(pxarChunk.current_chunk)) + pxarChunk.chunkcount += 1 - PXAR_CHK.current_chunk = b[chunkpos:] - chunkpos = PXAR_CHK.C.Scan(b[chunkpos:]) - } - } else { - PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b...) + pxarChunk.current_chunk = b[chunkpos:] + chunkpos = pxarChunk.C.Scan(b[chunkpos:]) } + if *pxarOut != "" { f.Write(b) } // } - A.catalogWriteCB = func(b []byte) { - chunkpos := PCAT1_CHK.C.Scan(b) + archive.catalogWriteCB = func(b []byte) { + chunkpos := pcat1Chunk.C.Scan(b) - if chunkpos > 0 { - for chunkpos > 0 { + if chunkpos == 0 { + pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b...) + } - PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b[:chunkpos]...) + for chunkpos > 0 { + pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b[:chunkpos]...) - h := sha256.New() - h.Write(PCAT1_CHK.current_chunk) - shahash := hex.EncodeToString(h.Sum(nil)) + h := sha256.New() + h.Write(pcat1Chunk.current_chunk) + shahash := hex.EncodeToString(h.Sum(nil)) - fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(PCAT1_CHK.current_chunk), chunkpos) + fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(pcat1Chunk.current_chunk), chunkpos) - client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk) - binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk)))) - PCAT1_CHK.chunkdigests.Write(h.Sum(nil)) + client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk) + binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk)))) + pcat1Chunk.chunkdigests.Write(h.Sum(nil)) - PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos) - PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash) - PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk)) - PCAT1_CHK.chunkcount += 1 + pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos) + pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash) + pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk)) + pcat1Chunk.chunkcount += 1 - PCAT1_CHK.current_chunk = b[chunkpos:] - chunkpos = PCAT1_CHK.C.Scan(b[chunkpos:]) - } - } else { - PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b...) + pcat1Chunk.current_chunk = b[chunkpos:] + chunkpos = pcat1Chunk.C.Scan(b[chunkpos:]) } } //This is the entry point of backup job which will start streaming with the PCAT and PXAR write callback //Data to be hashed and eventuall uploaded - A.WriteDir(backupdir, "", true) + archive.WriteDir(backupdir, "", true) //Here we write the remainder of data for which cyclic hash did not trigger - if len(PXAR_CHK.current_chunk) > 0 { + if len(pxarChunk.current_chunk) > 0 { h := sha256.New() - h.Write(PXAR_CHK.current_chunk) + h.Write(pxarChunk.current_chunk) shahash := hex.EncodeToString(h.Sum(nil)) - binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk)))) - PXAR_CHK.chunkdigests.Write(h.Sum(nil)) + binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk)))) + pxarChunk.chunkdigests.Write(h.Sum(nil)) if _, ok := knownChunks.GetOrInsert(shahash, true); ok { - fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk)) - client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk) + fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk)) + client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk) newchunk.Add(1) } else { - fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk)) + fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk)) reusechunk.Add(1) } - PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos) - PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash) - PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk)) - PXAR_CHK.chunkcount += 1 + pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos) + pxarChunk.assignments = append(pxarChunk.assignments, shahash) + pxarChunk.pos += uint64(len(pxarChunk.current_chunk)) + pxarChunk.chunkcount += 1 } - if len(PCAT1_CHK.current_chunk) > 0 { + if len(pcat1Chunk.current_chunk) > 0 { h := sha256.New() - h.Write(PCAT1_CHK.current_chunk) + h.Write(pcat1Chunk.current_chunk) shahash := hex.EncodeToString(h.Sum(nil)) - binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk)))) - PCAT1_CHK.chunkdigests.Write(h.Sum(nil)) - - fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk)) - PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos) - PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash) - PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk)) - PCAT1_CHK.chunkcount += 1 - client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk) + binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk)))) + pcat1Chunk.chunkdigests.Write(h.Sum(nil)) + + fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(pcat1Chunk.current_chunk)) + pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos) + pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash) + pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk)) + pcat1Chunk.chunkcount += 1 + client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk) } //Avoid incurring in request entity too large by chunking assignment PUT requests in blocks of at most 128 chunks - for k := 0; k < len(PXAR_CHK.assignments); k += 128 { + for k := 0; k < len(pxarChunk.assignments); k += 128 { k2 := k + 128 - if k2 > len(PXAR_CHK.assignments) { - k2 = len(PXAR_CHK.assignments) + if k2 > len(pxarChunk.assignments) { + k2 = len(pxarChunk.assignments) } - client.AssignChunks(PXAR_CHK.wrid, PXAR_CHK.assignments[k:k2], PXAR_CHK.assignments_offset[k:k2]) + client.AssignChunks(pxarChunk.wrid, pxarChunk.assignments[k:k2], pxarChunk.assignments_offset[k:k2]) } - client.CloseDynamicIndex(PXAR_CHK.wrid, hex.EncodeToString(PXAR_CHK.chunkdigests.Sum(nil)), PXAR_CHK.pos, PXAR_CHK.chunkcount) + client.CloseDynamicIndex(pxarChunk.wrid, hex.EncodeToString(pxarChunk.chunkdigests.Sum(nil)), pxarChunk.pos, pxarChunk.chunkcount) - for k := 0; k < len(PCAT1_CHK.assignments); k += 128 { + for k := 0; k < len(pcat1Chunk.assignments); k += 128 { k2 := k + 128 - if k2 > len(PCAT1_CHK.assignments) { - k2 = len(PCAT1_CHK.assignments) + if k2 > len(pcat1Chunk.assignments) { + k2 = len(pcat1Chunk.assignments) } - client.AssignChunks(PCAT1_CHK.wrid, PCAT1_CHK.assignments[k:k2], PCAT1_CHK.assignments_offset[k:k2]) + client.AssignChunks(pcat1Chunk.wrid, pcat1Chunk.assignments[k:k2], pcat1Chunk.assignments_offset[k:k2]) } - client.CloseDynamicIndex(PCAT1_CHK.wrid, hex.EncodeToString(PCAT1_CHK.chunkdigests.Sum(nil)), PCAT1_CHK.pos, PCAT1_CHK.chunkcount) + client.CloseDynamicIndex(pcat1Chunk.wrid, hex.EncodeToString(pcat1Chunk.chunkdigests.Sum(nil)), pcat1Chunk.pos, pcat1Chunk.chunkcount) client.UploadManifest() client.Finish()