Skip to content

Commit

Permalink
feat: some refactorings.
Browse files Browse the repository at this point in the history
  • Loading branch information
jochumdev committed Dec 28, 2023
1 parent 0af60ec commit 229ebf7
Showing 1 changed file with 97 additions and 98 deletions.
195 changes: 97 additions & 98 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ func main() {

client.Connect(false)

A := &PXARArchive{}
A.archivename = "backup.pxar.didx"
archive := &PXARArchive{}
archive.archivename = "backup.pxar.didx"

previous_didx := client.DownloadPreviousToBytes(A.archivename)
previousDidx := client.DownloadPreviousToBytes(archive.archivename)

fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previous_didx))
fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previousDidx))

/*f2, _ := os.Create("test.didx")
defer f2.Close()
Expand All @@ -128,17 +128,17 @@ func main() {
we are going to upload to avoid unnecessary traffic and compression cpu usage
*/

if !bytes.HasPrefix(previous_didx, didxMagic) {
fmt.Printf("Previous index has wrong magic (%s)!\n", previous_didx[:8])
if !bytes.HasPrefix(previousDidx, didxMagic) {
fmt.Printf("Previous index has wrong magic (%s)!\n", previousDidx[:8])

} else {
//As per the Proxmox documentation, the header has a fixed size of 4096 bytes.
//It is followed by records of a uint64 offset plus a sha256 digest, i.e. 40 bytes per record, until EOF
previous_didx = previous_didx[4096:]
for i := 0; i*40 < len(previous_didx); i += 1 {
previousDidx = previousDidx[4096:]
for i := 0; i*40 < len(previousDidx); i += 1 {
e := DidxEntry{}
e.offset = binary.LittleEndian.Uint64(previous_didx[i*40 : i*40+8])
e.digest = previous_didx[i*40+8 : i*40+40]
e.offset = binary.LittleEndian.Uint64(previousDidx[i*40 : i*40+8])
e.digest = previousDidx[i*40+8 : i*40+40]
shahash := hex.EncodeToString(e.digest)
fmt.Printf("Previous: %s\n", shahash)
knownChunks.Set(shahash, true)
Expand All @@ -153,153 +153,152 @@ func main() {
}
/**/

PXAR_CHK := ChunkState{}
PXAR_CHK.Init()
pxarChunk := ChunkState{}
pxarChunk.Init()

PCAT1_CHK := ChunkState{}
PCAT1_CHK.Init()
pcat1Chunk := ChunkState{}
pcat1Chunk.Init()

PXAR_CHK.wrid = client.CreateDynamicIndex(A.archivename)
PCAT1_CHK.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")
pxarChunk.wrid = client.CreateDynamicIndex(archive.archivename)
pcat1Chunk.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")

A.writeCB = func(b []byte) {
chunkpos := PXAR_CHK.C.Scan(b)
archive.writeCB = func(b []byte) {
chunkpos := pxarChunk.C.Scan(b)

if chunkpos > 0 { //We test if cyclic polynomial hash returned the expected value for chunk boundary
for chunkpos > 0 {
if chunkpos == 0 {
pxarChunk.current_chunk = append(pxarChunk.current_chunk, b...)
}

PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b[:chunkpos]...)
for chunkpos > 0 {
pxarChunk.current_chunk = append(pxarChunk.current_chunk, b[:chunkpos]...)

h := sha256.New()
h.Write(PXAR_CHK.current_chunk)
bindigest := h.Sum(nil)
shahash := hex.EncodeToString(bindigest)
h := sha256.New()
h.Write(pxarChunk.current_chunk)
bindigest := h.Sum(nil)
shahash := hex.EncodeToString(bindigest)

if _, ok := knownChunks.GetOrInsert(shahash, true); ok {
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
newchunk.Add(1)
if _, ok := knownChunks.GetOrInsert(shahash, true); ok {
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
newchunk.Add(1)

client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
} else {
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
reusechunk.Add(1)
}
client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
} else {
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
reusechunk.Add(1)
}

binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
PXAR_CHK.chunkdigests.Write(h.Sum(nil))
binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
pxarChunk.chunkdigests.Write(h.Sum(nil))

PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
PXAR_CHK.chunkcount += 1
pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
pxarChunk.assignments = append(pxarChunk.assignments, shahash)
pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
pxarChunk.chunkcount += 1

PXAR_CHK.current_chunk = b[chunkpos:]
chunkpos = PXAR_CHK.C.Scan(b[chunkpos:])
}
} else {
PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b...)
pxarChunk.current_chunk = b[chunkpos:]
chunkpos = pxarChunk.C.Scan(b[chunkpos:])
}

if *pxarOut != "" {
f.Write(b)
}
//
}

A.catalogWriteCB = func(b []byte) {
chunkpos := PCAT1_CHK.C.Scan(b)
archive.catalogWriteCB = func(b []byte) {
chunkpos := pcat1Chunk.C.Scan(b)

if chunkpos > 0 {
for chunkpos > 0 {
if chunkpos == 0 {
pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b...)
}

PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b[:chunkpos]...)
for chunkpos > 0 {
pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b[:chunkpos]...)

h := sha256.New()
h.Write(PCAT1_CHK.current_chunk)
shahash := hex.EncodeToString(h.Sum(nil))
h := sha256.New()
h.Write(pcat1Chunk.current_chunk)
shahash := hex.EncodeToString(h.Sum(nil))

fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(PCAT1_CHK.current_chunk), chunkpos)
fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(pcat1Chunk.current_chunk), chunkpos)

client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
PCAT1_CHK.chunkdigests.Write(h.Sum(nil))
client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
pcat1Chunk.chunkdigests.Write(h.Sum(nil))

PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
PCAT1_CHK.chunkcount += 1
pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
pcat1Chunk.chunkcount += 1

PCAT1_CHK.current_chunk = b[chunkpos:]
chunkpos = PCAT1_CHK.C.Scan(b[chunkpos:])
}
} else {
PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b...)
pcat1Chunk.current_chunk = b[chunkpos:]
chunkpos = pcat1Chunk.C.Scan(b[chunkpos:])
}
}

//This is the entry point of the backup job, which starts streaming data through the PCAT and PXAR write callbacks
//to be hashed and eventually uploaded

A.WriteDir(backupdir, "", true)
archive.WriteDir(backupdir, "", true)

//Here we write the remaining data for which the cyclic hash did not trigger a chunk boundary

if len(PXAR_CHK.current_chunk) > 0 {
if len(pxarChunk.current_chunk) > 0 {
h := sha256.New()
h.Write(PXAR_CHK.current_chunk)
h.Write(pxarChunk.current_chunk)
shahash := hex.EncodeToString(h.Sum(nil))
binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
PXAR_CHK.chunkdigests.Write(h.Sum(nil))
binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
pxarChunk.chunkdigests.Write(h.Sum(nil))

if _, ok := knownChunks.GetOrInsert(shahash, true); ok {
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
newchunk.Add(1)
} else {
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
reusechunk.Add(1)
}
PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
PXAR_CHK.chunkcount += 1
pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
pxarChunk.assignments = append(pxarChunk.assignments, shahash)
pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
pxarChunk.chunkcount += 1

}

if len(PCAT1_CHK.current_chunk) > 0 {
if len(pcat1Chunk.current_chunk) > 0 {
h := sha256.New()
h.Write(PCAT1_CHK.current_chunk)
h.Write(pcat1Chunk.current_chunk)
shahash := hex.EncodeToString(h.Sum(nil))
binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
PCAT1_CHK.chunkdigests.Write(h.Sum(nil))

fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk))
PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
PCAT1_CHK.chunkcount += 1
client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
pcat1Chunk.chunkdigests.Write(h.Sum(nil))

fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(pcat1Chunk.current_chunk))
pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
pcat1Chunk.chunkcount += 1
client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
}

//Avoid "request entity too large" errors by splitting assignment PUT requests into batches of at most 128 chunks
for k := 0; k < len(PXAR_CHK.assignments); k += 128 {
for k := 0; k < len(pxarChunk.assignments); k += 128 {
k2 := k + 128
if k2 > len(PXAR_CHK.assignments) {
k2 = len(PXAR_CHK.assignments)
if k2 > len(pxarChunk.assignments) {
k2 = len(pxarChunk.assignments)
}
client.AssignChunks(PXAR_CHK.wrid, PXAR_CHK.assignments[k:k2], PXAR_CHK.assignments_offset[k:k2])
client.AssignChunks(pxarChunk.wrid, pxarChunk.assignments[k:k2], pxarChunk.assignments_offset[k:k2])
}

client.CloseDynamicIndex(PXAR_CHK.wrid, hex.EncodeToString(PXAR_CHK.chunkdigests.Sum(nil)), PXAR_CHK.pos, PXAR_CHK.chunkcount)
client.CloseDynamicIndex(pxarChunk.wrid, hex.EncodeToString(pxarChunk.chunkdigests.Sum(nil)), pxarChunk.pos, pxarChunk.chunkcount)

for k := 0; k < len(PCAT1_CHK.assignments); k += 128 {
for k := 0; k < len(pcat1Chunk.assignments); k += 128 {
k2 := k + 128
if k2 > len(PCAT1_CHK.assignments) {
k2 = len(PCAT1_CHK.assignments)
if k2 > len(pcat1Chunk.assignments) {
k2 = len(pcat1Chunk.assignments)
}
client.AssignChunks(PCAT1_CHK.wrid, PCAT1_CHK.assignments[k:k2], PCAT1_CHK.assignments_offset[k:k2])
client.AssignChunks(pcat1Chunk.wrid, pcat1Chunk.assignments[k:k2], pcat1Chunk.assignments_offset[k:k2])
}

client.CloseDynamicIndex(PCAT1_CHK.wrid, hex.EncodeToString(PCAT1_CHK.chunkdigests.Sum(nil)), PCAT1_CHK.pos, PCAT1_CHK.chunkcount)
client.CloseDynamicIndex(pcat1Chunk.wrid, hex.EncodeToString(pcat1Chunk.chunkdigests.Sum(nil)), pcat1Chunk.pos, pcat1Chunk.chunkcount)

client.UploadManifest()
client.Finish()
Expand Down

0 comments on commit 229ebf7

Please sign in to comment.