Merge pull request #10 from jochumdev/feat/fix_big_catalog
Feat/fix big catalog
tizbac authored Dec 31, 2023
2 parents 649871b + 14bfc99 commit 8366461
Showing 6 changed files with 128 additions and 115 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1 +1,3 @@
 dist/
+proxmoxbackupgo_cli.exe
+proxmoxbackupgo.exe
1 change: 1 addition & 0 deletions go.mod
@@ -3,6 +3,7 @@ module proxmoxbackupgo
 go 1.19
 
 require (
+    github.com/cornelk/hashmap v1.0.8
     github.com/dchest/siphash v1.2.3
     github.com/gen2brain/beeep v0.0.0-20230907135156-1a38885a97fc
     github.com/getlantern/systray v1.2.2
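
The new dependency, github.com/cornelk/hashmap, is a concurrent (lock-free) hash map with a generics-based API; main.go below uses it, together with atomic counters, to make the chunk-digest bookkeeping safe for concurrent access. A minimal sketch of the access pattern: New, Set, GetOrInsert and Len all appear in the diff itself, while the Get call here is an assumption by analogy, shown only to round out the example:

    package main

    import (
        "fmt"

        "github.com/cornelk/hashmap"
    )

    func main() {
        // Chunk digest -> seen flag, mirroring knownChunks in main.go.
        knownChunks := hashmap.New[string, bool]()

        knownChunks.Set("somedigest", true)

        // Assumed lookup call: returns the stored value and whether the key existed.
        if _, ok := knownChunks.Get("somedigest"); ok {
            fmt.Println("chunk already known, skip the upload")
        }

        fmt.Printf("known chunks: %d\n", knownChunks.Len())
    }
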
2 changes: 2 additions & 0 deletions go.sum
@@ -1,5 +1,7 @@
 github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
 github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
+github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
+github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
233 changes: 121 additions & 112 deletions main.go
@@ -10,7 +10,9 @@ import (
     "hash"
     "os"
     "runtime"
+    "sync/atomic"
 
+    "github.com/cornelk/hashmap"
     "github.com/gen2brain/beeep"
     "github.com/getlantern/systray"
     "github.com/tawesoft/golib/v2/dialog"
@@ -46,9 +48,9 @@ func (c *ChunkState) Init() {
 }
 
 func main() {
-    newchunk := 0
-    reusechunk := 0
-    known_chunks_digest := make(map[string]bool)
+    var newchunk atomic.Uint64
+    var reusechunk atomic.Uint64
+    knownChunks := hashmap.New[string, bool]()
 
     // Define command-line flags
     baseURLFlag := flag.String("baseurl", "", "Base URL for the proxmox backup server, example: https://192.168.1.10:8007")
@@ -101,18 +103,20 @@ func main() {
 
     backupdir := *backupSourceDirFlag
 
-    backupdir = createVSSSnapshot(backupdir)
-
     fmt.Printf("Starting backup of %s\n", backupdir)
 
+    backupdir = createVSSSnapshot(backupdir)
+    //Remove VSS snapshot on windows, on linux for now NOP
+    defer VSSCleanup()
+
     client.Connect(false)
 
-    A := &PXARArchive{}
-    A.archivename = "backup.pxar.didx"
+    archive := &PXARArchive{}
+    archive.archivename = "backup.pxar.didx"
 
-    previous_didx := client.DownloadPreviousToBytes(A.archivename)
+    previousDidx := client.DownloadPreviousToBytes(archive.archivename)
 
-    fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previous_didx))
+    fmt.Printf("Downloaded previous DIDX: %d bytes\n", len(previousDidx))
 
     /*f2, _ := os.Create("test.didx")
     defer f2.Close()
@@ -124,188 +128,193 @@ func main() {
     we are going to upload to avoid unnecessary traffic and compression cpu usage
     */
 
-    if !bytes.HasPrefix(previous_didx, didxMagic) {
-        fmt.Printf("Previous index has wrong magic (%s)!\n", previous_didx[:8])
+    if !bytes.HasPrefix(previousDidx, didxMagic) {
+        fmt.Printf("Previous index has wrong magic (%s)!\n", previousDidx[:8])
 
     } else {
         //Header as per proxmox documentation is fixed size of 4096 bytes,
         //then offset of type uint64 and sha256 digests follow , so 40 byte each record until EOF
-        previous_didx = previous_didx[4096:]
-        for i := 0; i*40 < len(previous_didx); i += 1 {
+        previousDidx = previousDidx[4096:]
+        for i := 0; i*40 < len(previousDidx); i += 1 {
             e := DidxEntry{}
-            e.offset = binary.LittleEndian.Uint64(previous_didx[i*40 : i*40+8])
-            e.digest = previous_didx[i*40+8 : i*40+40]
+            e.offset = binary.LittleEndian.Uint64(previousDidx[i*40 : i*40+8])
+            e.digest = previousDidx[i*40+8 : i*40+40]
             shahash := hex.EncodeToString(e.digest)
             fmt.Printf("Previous: %s\n", shahash)
-            known_chunks_digest[shahash] = true
+            knownChunks.Set(shahash, true)
         }
     }
 
-    fmt.Printf("Known chunks: %d!\n", len(known_chunks_digest))
+    fmt.Printf("Known chunks: %d!\n", knownChunks.Len())
     f := &os.File{}
     if *pxarOut != "" {
         f, _ = os.Create(*pxarOut)
         defer f.Close()
     }
     /**/
 
-    PXAR_CHK := ChunkState{}
-    PXAR_CHK.Init()
+    pxarChunk := ChunkState{}
+    pxarChunk.Init()
 
-    PCAT1_CHK := ChunkState{}
-    PCAT1_CHK.Init()
+    pcat1Chunk := ChunkState{}
+    pcat1Chunk.Init()
 
-    PXAR_CHK.wrid = client.CreateDynamicIndex(A.archivename)
-    PCAT1_CHK.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")
+    pxarChunk.wrid = client.CreateDynamicIndex(archive.archivename)
+    pcat1Chunk.wrid = client.CreateDynamicIndex("catalog.pcat1.didx")
 
-    A.writeCB = func(b []byte) {
-        chunkpos := PXAR_CHK.C.Scan(b)
+    archive.writeCB = func(b []byte) {
+        chunkpos := pxarChunk.C.Scan(b)
 
-        if chunkpos > 0 { //We test if cyclic polynomial hash returned the expected value for chunk boundary
-            for chunkpos > 0 {
+        if chunkpos == 0 {
+            pxarChunk.current_chunk = append(pxarChunk.current_chunk, b...)
+        }
 
-                PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b[:chunkpos]...)
+        for chunkpos > 0 {
+            pxarChunk.current_chunk = append(pxarChunk.current_chunk, b[:chunkpos]...)
 
-                h := sha256.New()
-                h.Write(PXAR_CHK.current_chunk)
-                bindigest := h.Sum(nil)
-                shahash := hex.EncodeToString(bindigest)
+            h := sha256.New()
+            h.Write(pxarChunk.current_chunk)
+            bindigest := h.Sum(nil)
+            shahash := hex.EncodeToString(bindigest)
 
-                if !known_chunks_digest[shahash] {
-                    fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-                    newchunk++
-                    client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
-                } else {
-                    fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-                    reusechunk++
-                }
+            if _, ok := knownChunks.GetOrInsert(shahash, true); ok {
+                fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+                newchunk.Add(1)
 
-                binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
-                PXAR_CHK.chunkdigests.Write(h.Sum(nil))
+                client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
+            } else {
+                fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+                reusechunk.Add(1)
+            }
 
-                PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
-                PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
-                PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
-                PXAR_CHK.chunkcount += 1
+            binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
+            pxarChunk.chunkdigests.Write(h.Sum(nil))
 
-                PXAR_CHK.current_chunk = b[chunkpos:]
-                chunkpos = PXAR_CHK.C.Scan(b[chunkpos:])
-            }
-        } else {
-            PXAR_CHK.current_chunk = append(PXAR_CHK.current_chunk, b...)
+            pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
+            pxarChunk.assignments = append(pxarChunk.assignments, shahash)
+            pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
+            pxarChunk.chunkcount += 1
+
+            pxarChunk.current_chunk = b[chunkpos:]
+            chunkpos = pxarChunk.C.Scan(b[chunkpos:])
         }
 
         if *pxarOut != "" {
            f.Write(b)
        }
        //
    }
 
-    A.catalogWriteCB = func(b []byte) {
-        chunkpos := PCAT1_CHK.C.Scan(b)
+    archive.catalogWriteCB = func(b []byte) {
+        chunkpos := pcat1Chunk.C.Scan(b)
 
-        if chunkpos > 0 {
-            for chunkpos > 0 {
+        if chunkpos == 0 {
+            pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b...)
+        }
 
-                PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b[:chunkpos]...)
+        var lastChunkPos uint64
+        for chunkpos > 0 {
+            pcat1Chunk.current_chunk = append(pcat1Chunk.current_chunk, b[:chunkpos]...)
 
-                h := sha256.New()
-                h.Write(PCAT1_CHK.current_chunk)
-                shahash := hex.EncodeToString(h.Sum(nil))
+            h := sha256.New()
+            h.Write(pcat1Chunk.current_chunk)
+            shahash := hex.EncodeToString(h.Sum(nil))
 
-                fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk))
+            fmt.Printf("Catalog: New chunk[%s] %d bytes, pos %d\n", shahash, len(pcat1Chunk.current_chunk), chunkpos)
 
-                client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
-                binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
-                PCAT1_CHK.chunkdigests.Write(h.Sum(nil))
+            client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
+            binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
+            pcat1Chunk.chunkdigests.Write(h.Sum(nil))
 
-                PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
-                PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
-                PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
-                PCAT1_CHK.chunkcount += 1
+            pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
+            pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
+            pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
+            pcat1Chunk.chunkcount += 1
 
-                PCAT1_CHK.current_chunk = b[chunkpos:]
-                chunkpos = PCAT1_CHK.C.Scan(b[chunkpos:])
-            }
-        } else {
-            PCAT1_CHK.current_chunk = append(PCAT1_CHK.current_chunk, b...)
+            pcat1Chunk.current_chunk = b[chunkpos:]
+
+            //lastChunkPos is here so we know when pcat1Chunk.C.Scan loops from beginnning.
+            lastChunkPos = chunkpos
+            chunkpos = pcat1Chunk.C.Scan(b[chunkpos:])
+
+            if chunkpos < lastChunkPos {
+                break
+            }
         }
     }
 
     //This is the entry point of backup job which will start streaming with the PCAT and PXAR write callback
     //Data to be hashed and eventuall uploaded
 
-    A.WriteDir(backupdir, "", true)
+    archive.WriteDir(backupdir, "", true)
 
     //Here we write the remainder of data for which cyclic hash did not trigger
 
-    if len(PXAR_CHK.current_chunk) > 0 {
+    if len(pxarChunk.current_chunk) > 0 {
         h := sha256.New()
-        h.Write(PXAR_CHK.current_chunk)
+        h.Write(pxarChunk.current_chunk)
         shahash := hex.EncodeToString(h.Sum(nil))
-        binary.Write(PXAR_CHK.chunkdigests, binary.LittleEndian, (PXAR_CHK.pos + uint64(len(PXAR_CHK.current_chunk))))
-        PXAR_CHK.chunkdigests.Write(h.Sum(nil))
+        binary.Write(pxarChunk.chunkdigests, binary.LittleEndian, (pxarChunk.pos + uint64(len(pxarChunk.current_chunk))))
+        pxarChunk.chunkdigests.Write(h.Sum(nil))
 
-        if !known_chunks_digest[shahash] {
-            fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-            client.UploadCompressedChunk(PXAR_CHK.wrid, shahash, PXAR_CHK.current_chunk)
-            newchunk++
+        if _, ok := knownChunks.GetOrInsert(shahash, true); ok {
+            fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+            client.UploadCompressedChunk(pxarChunk.wrid, shahash, pxarChunk.current_chunk)
+            newchunk.Add(1)
         } else {
-            fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(PXAR_CHK.current_chunk))
-            reusechunk++
+            fmt.Printf("Reuse chunk[%s] %d bytes\n", shahash, len(pxarChunk.current_chunk))
+            reusechunk.Add(1)
         }
-        PXAR_CHK.assignments_offset = append(PXAR_CHK.assignments_offset, PXAR_CHK.pos)
-        PXAR_CHK.assignments = append(PXAR_CHK.assignments, shahash)
-        PXAR_CHK.pos += uint64(len(PXAR_CHK.current_chunk))
-        PXAR_CHK.chunkcount += 1
+        pxarChunk.assignments_offset = append(pxarChunk.assignments_offset, pxarChunk.pos)
+        pxarChunk.assignments = append(pxarChunk.assignments, shahash)
+        pxarChunk.pos += uint64(len(pxarChunk.current_chunk))
+        pxarChunk.chunkcount += 1
 
     }
 
-    if len(PCAT1_CHK.current_chunk) > 0 {
+    if len(pcat1Chunk.current_chunk) > 0 {
         h := sha256.New()
-        h.Write(PCAT1_CHK.current_chunk)
+        h.Write(pcat1Chunk.current_chunk)
         shahash := hex.EncodeToString(h.Sum(nil))
-        binary.Write(PCAT1_CHK.chunkdigests, binary.LittleEndian, (PCAT1_CHK.pos + uint64(len(PCAT1_CHK.current_chunk))))
-        PCAT1_CHK.chunkdigests.Write(h.Sum(nil))
-
-        fmt.Printf("New chunk[%s] %d bytes\n", shahash, len(PCAT1_CHK.current_chunk))
-        PCAT1_CHK.assignments_offset = append(PCAT1_CHK.assignments_offset, PCAT1_CHK.pos)
-        PCAT1_CHK.assignments = append(PCAT1_CHK.assignments, shahash)
-        PCAT1_CHK.pos += uint64(len(PCAT1_CHK.current_chunk))
-        PCAT1_CHK.chunkcount += 1
-        client.UploadCompressedChunk(PCAT1_CHK.wrid, shahash, PCAT1_CHK.current_chunk)
+        binary.Write(pcat1Chunk.chunkdigests, binary.LittleEndian, (pcat1Chunk.pos + uint64(len(pcat1Chunk.current_chunk))))
+        pcat1Chunk.chunkdigests.Write(h.Sum(nil))
+
+        fmt.Printf("Catalog: New chunk[%s] %d bytes\n", shahash, len(pcat1Chunk.current_chunk))
+        pcat1Chunk.assignments_offset = append(pcat1Chunk.assignments_offset, pcat1Chunk.pos)
+        pcat1Chunk.assignments = append(pcat1Chunk.assignments, shahash)
+        pcat1Chunk.pos += uint64(len(pcat1Chunk.current_chunk))
+        pcat1Chunk.chunkcount += 1
+        client.UploadCompressedChunk(pcat1Chunk.wrid, shahash, pcat1Chunk.current_chunk)
     }
 
     //Avoid incurring in request entity too large by chunking assignment PUT requests in blocks of at most 128 chunks
-    for k := 0; k < len(PXAR_CHK.assignments); k += 128 {
+    for k := 0; k < len(pxarChunk.assignments); k += 128 {
         k2 := k + 128
-        if k2 > len(PXAR_CHK.assignments) {
-            k2 = len(PXAR_CHK.assignments)
+        if k2 > len(pxarChunk.assignments) {
+            k2 = len(pxarChunk.assignments)
         }
-        client.AssignChunks(PXAR_CHK.wrid, PXAR_CHK.assignments[k:k2], PXAR_CHK.assignments_offset[k:k2])
+        client.AssignChunks(pxarChunk.wrid, pxarChunk.assignments[k:k2], pxarChunk.assignments_offset[k:k2])
     }
 
-    client.CloseDynamicIndex(PXAR_CHK.wrid, hex.EncodeToString(PXAR_CHK.chunkdigests.Sum(nil)), PXAR_CHK.pos, PXAR_CHK.chunkcount)
+    client.CloseDynamicIndex(pxarChunk.wrid, hex.EncodeToString(pxarChunk.chunkdigests.Sum(nil)), pxarChunk.pos, pxarChunk.chunkcount)
 
-    for k := 0; k < len(PCAT1_CHK.assignments); k += 128 {
+    for k := 0; k < len(pcat1Chunk.assignments); k += 128 {
         k2 := k + 128
-        if k2 > len(PCAT1_CHK.assignments) {
-            k2 = len(PCAT1_CHK.assignments)
+        if k2 > len(pcat1Chunk.assignments) {
+            k2 = len(pcat1Chunk.assignments)
         }
-        client.AssignChunks(PCAT1_CHK.wrid, PCAT1_CHK.assignments[k:k2], PCAT1_CHK.assignments_offset[k:k2])
+        client.AssignChunks(pcat1Chunk.wrid, pcat1Chunk.assignments[k:k2], pcat1Chunk.assignments_offset[k:k2])
     }
 
-    client.CloseDynamicIndex(PCAT1_CHK.wrid, hex.EncodeToString(PCAT1_CHK.chunkdigests.Sum(nil)), PCAT1_CHK.pos, PCAT1_CHK.chunkcount)
+    client.CloseDynamicIndex(pcat1Chunk.wrid, hex.EncodeToString(pcat1Chunk.chunkdigests.Sum(nil)), pcat1Chunk.pos, pcat1Chunk.chunkcount)
 
     client.UploadManifest()
     client.Finish()
 
-    fmt.Printf("New %d , Reused %d\n", newchunk, reusechunk)
-
-    //Remove VSS snapshot on windows, on linux for now NOP
-    VSSCleanup()
+    fmt.Printf("New %d , Reused %d\n", newchunk.Load(), reusechunk.Load())
     if runtime.GOOS == "windows" {
         systray.Quit()
-        beeep.Notify("Proxmox Backup Go", fmt.Sprintf("Backup complete\nChunks New %d , Reused %d\n", newchunk, reusechunk), "")
+        beeep.Notify("Proxmox Backup Go", fmt.Sprintf("Backup complete\nChunks New %d , Reused %d\n", newchunk.Load(), reusechunk.Load()), "")
     }
 
 }
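
The substance of the fix is in the reshaped write callbacks above: the no-boundary case becomes a plain if chunkpos == 0 fast path, the boundary-consuming loop is hoisted to the top level, and the catalog callback now records lastChunkPos and breaks once Scan returns a smaller offset than the previous round (per the in-code comment, that detects Scan wrapping back to the beginning, the failure mode on big catalogs). Below is a condensed, self-contained sketch of the loop shape, with a toy scanBoundary standing in for the rolling-hash ChunkState.C.Scan and an emit callback standing in for the hash/upload/bookkeeping steps; unlike the real code it re-slices b each iteration, so treat it as an illustration of the pattern rather than a line-for-line copy:

    package main

    import (
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "sync/atomic"
    )

    // scanBoundary returns the offset just past the next chunk boundary in b,
    // or 0 if none is found. Toy stand-in: the real code derives boundaries
    // from a cyclic polynomial rolling hash.
    func scanBoundary(b []byte) int {
        for i, c := range b {
            if c == '\n' { // toy boundary condition
                return i + 1
            }
        }
        return 0
    }

    // makeWriteCB mirrors the fixed callback shape: it keeps consuming
    // boundaries in a loop, so one large write spanning several chunks
    // emits all of them instead of stalling on the first.
    func makeWriteCB(current *[]byte, emit func(digest string, chunk []byte)) func([]byte) {
        return func(b []byte) {
            pos := scanBoundary(b)
            if pos == 0 { // no boundary yet: just buffer the data
                *current = append(*current, b...)
                return
            }
            for pos > 0 {
                *current = append(*current, b[:pos]...)
                sum := sha256.Sum256(*current)
                emit(hex.EncodeToString(sum[:]), *current)
                b = b[pos:] // re-slice so the next scan only sees the remainder
                *current = nil
                pos = scanBoundary(b)
            }
            *current = append(*current, b...) // tail waits for the next write
        }
    }

    func main() {
        var newchunk atomic.Uint64 // same counter pattern as the diff
        var current []byte
        cb := makeWriteCB(&current, func(digest string, chunk []byte) {
            newchunk.Add(1)
            fmt.Printf("chunk %s (%d bytes)\n", digest[:8], len(chunk))
        })
        cb([]byte("one\ntwo\nthree")) // two boundaries in a single write
        fmt.Printf("new chunks: %d, pending tail: %q\n", newchunk.Load(), current)
    }
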
2 changes: 1 addition & 1 deletion pbsapi.go
@@ -377,7 +377,7 @@ func (pbs *PBSClient) Connect(reader bool) {
         conn.Write([]byte("Upgrade: proxmox-backup-reader-protocol-v1\r\n"))
     }
     conn.Write([]byte("Connection: Upgrade\r\n\r\n"))
-    fmt.Println("Reading response to upgrade...\n")
+    fmt.Printf("Reading response to upgrade...\n")
     buf := make([]byte, 0)
     for !strings.HasSuffix(string(buf), "\r\n\r\n") && !strings.HasSuffix(string(buf), "\n\n") {
         //fmt.Println(buf)
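
The pbsapi.go change is about the trailing newline: fmt.Println appends its own newline after the literal \n, printing a stray blank line, while fmt.Printf writes the string verbatim. A two-line illustration:

    package main

    import "fmt"

    func main() {
        fmt.Println("Reading response to upgrade...\n") // literal \n plus Println's own newline: stray blank line
        fmt.Printf("Reading response to upgrade...\n")  // exactly one newline
    }
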
3 changes: 1 addition & 2 deletions pxar.go
@@ -284,7 +284,6 @@ func (a *PXARArchive) WriteDir(path string, dirname string, toplevel bool) Catal
     a.Flush()
 
     goodbyteitems := make([]GoodByeItem, 0)
-
     catalog_files := make([]CatalogFile, 0)
     catalog_dirs := make([]CatalogDir, 0)
 
@@ -301,8 +300,8 @@ func (a *PXARArchive) WriteDir(path string, dirname string, toplevel bool) Catal
         })
     } else {
         F := a.WriteFile(filepath.Join(path, file.Name()), file.Name())
-        catalog_files = append(catalog_files, F)
 
+        catalog_files = append(catalog_files, F)
         goodbyteitems = append(goodbyteitems, GoodByeItem{
             offset: startpos,
             hash: siphash.Hash(0x83ac3f1cfbb450db, 0xaa4f1b6879369fbd, []byte(file.Name())),
