Skip to content

Commit

Permalink
Add a storage backend for Storj
Browse files Browse the repository at this point in the history
The url format is storj://satellite/bucket/path.  You can get the
satellite along with the api access key when requesting an Access
Grant of type API Access.
  • Loading branch information
gilbertchen committed Sep 30, 2022
1 parent cde660e commit 6009f64
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/duplicacy_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,23 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor
}
SavePassword(preference, "fabric_token", token)
return smeStorage
} else if matched[1] == "storj" {
satellite := matched[2] + matched[3]
bucket := matched[5]
storageDir := ""
index := strings.Index(bucket, "/")
if index >= 0 {
storageDir = bucket[index + 1:]
bucket = bucket[:index]
}
apiKey := GetPassword(preference, "storj_key", "Enter the API access key:", true, resetPassword)
passphrase := GetPassword(preference, "storj_passphrase", "Enter the passphrase:", true, resetPassword)
storjStorage, err := CreateStorjStorage(satellite, apiKey, passphrase, bucket, storageDir, threads)
if err != nil {
LOG_ERROR("STORAGE_CREATE", "Failed to load the Storj storage at %s: %v", storageURL, err)
return nil
}
return storjStorage
} else {
LOG_ERROR("STORAGE_CREATE", "The storage type '%s' is not supported", matched[1])
return nil
Expand Down
7 changes: 7 additions & 0 deletions src/duplicacy_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func loadStorage(localStoragePath string, threads int) (Storage, error) {
}
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
} else if *testStorageName == "storj" {
storage, err := CreateStorjStorage(config["satellite"], config["key"], config["passphrase"], config["bucket"], config["storage_path"], threads)
if err != nil {
return nil, err
}
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, err
}

return nil, fmt.Errorf("Invalid storage named: %s", *testStorageName)
Expand Down
184 changes: 184 additions & 0 deletions src/duplicacy_storjstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright (c) Acrosync LLC. All rights reserved.
// Free for personal use and commercial trial
// Commercial use requires per-user licenses available from https://duplicacy.com

package duplicacy

import (
"fmt"
"io"
"context"

"storj.io/uplink"
)

// StorjStorage is a storage backend for Storj.
type StorjStorage struct {
StorageBase

project *uplink.Project
bucket string
storageDir string
numberOfThreads int
}

// CreateStorjStorage creates a Storj storage.
func CreateStorjStorage(satellite string, apiKey string, passphrase string,
bucket string, storageDir string, threads int) (storage *StorjStorage, err error) {

ctx := context.Background()
access, err := uplink.RequestAccessWithPassphrase(ctx, satellite, apiKey, passphrase)
if err != nil {
return nil, fmt.Errorf("cannot request the access grant: %v", err)
}

project, err := uplink.OpenProject(ctx, access)
if err != nil {
return nil, fmt.Errorf("cannot open the project: %v", err)
}

_, err = project.StatBucket(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("cannot found the bucket: %v", err)
}

if storageDir != "" && storageDir[len(storageDir) - 1] != '/' {
storageDir += "/"
}

storage = &StorjStorage {
project: project,
bucket: bucket,
storageDir: storageDir,
numberOfThreads: threads,
}

storage.DerivedStorage = storage
storage.SetDefaultNestingLevels([]int{2, 3}, 2)
return storage, nil
}

// ListFiles return the list of files and subdirectories under 'dir' (non-recursively).
func (storage *StorjStorage) ListFiles(threadIndex int, dir string) (
files []string, sizes []int64, err error) {

fullPath := storage.storageDir + dir
if fullPath != "" && fullPath[len(fullPath) - 1] != '/' {
fullPath += "/"
}

options := uplink.ListObjectsOptions {
Prefix: fullPath,
System: true, // request SystemMetadata which includes ContentLength
}
objects := storage.project.ListObjects(context.Background(), storage.bucket, &options)
for objects.Next() {
if objects.Err() != nil {
return nil, nil, objects.Err()
}
item := objects.Item()
name := item.Key[len(fullPath):]
size := item.System.ContentLength
if item.IsPrefix {
if name != "" && name[len(name) - 1] != '/' {
name += "/"
size = 0
}
}
files = append(files, name)
sizes = append(sizes, size)
}

return files, sizes, nil
}

// DeleteFile deletes the file or directory at 'filePath'.
func (storage *StorjStorage) DeleteFile(threadIndex int, filePath string) (err error) {

_, err = storage.project.DeleteObject(context.Background(), storage.bucket,
storage.storageDir + filePath)
return err
}

// MoveFile renames the file.
func (storage *StorjStorage) MoveFile(threadIndex int, from string, to string) (err error) {
err = storage.project.MoveObject(context.Background(), storage.bucket,
storage.storageDir + from, storage.bucket, storage.storageDir + to, nil)

return err
}

// CreateDirectory creates a new directory.
func (storage *StorjStorage) CreateDirectory(threadIndex int, dir string) (err error) {
return nil
}

// GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *StorjStorage) GetFileInfo(threadIndex int, filePath string) (
exist bool, isDir bool, size int64, err error) {
info, err := storage.project.StatObject(context.Background(), storage.bucket,
storage.storageDir + filePath)

if info == nil {
return false, false, 0, nil
} else if err != nil {
return false, false, 0, err
} else {
return true, info.IsPrefix, info.System.ContentLength, nil
}
}

// DownloadFile reads the file at 'filePath' into the chunk.
func (storage *StorjStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
file, err := storage.project.DownloadObject(context.Background(), storage.bucket,
storage.storageDir + filePath, nil)
if err != nil {
return err
}
defer file.Close()

if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil {
return err
}

return nil
}

// UploadFile writes 'content' to the file at 'filePath'
func (storage *StorjStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) {

file, err := storage.project.UploadObject(context.Background(), storage.bucket,
storage.storageDir + filePath, nil)
if err != nil {
return err
}

reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
return err
}

err = file.Commit()
if err != nil {
return err
}

return nil
}

// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
// managing snapshots.
func (storage *StorjStorage) IsCacheNeeded() bool { return true }

// If the 'MoveFile' method is implemented.
func (storage *StorjStorage) IsMoveFileImplemented() bool { return true }

// If the storage can guarantee strong consistency.
func (storage *StorjStorage) IsStrongConsistent() bool { return false }

// If the storage supports fast listing of files names.
func (storage *StorjStorage) IsFastListing() bool { return true }

// Enable the test mode.
func (storage *StorjStorage) EnableTestMode() {}

2 comments on commit 6009f64

@gilbertchen
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Duplicacy Forum. There might be relevant details there:

https://forum.duplicacy.com/t/storj-storage-backend-added/6738/1

@gilbertchen
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Duplicacy Forum. There might be relevant details there:

https://forum.duplicacy.com/t/here-is-an-article-i-wrote-for-using-the-cli-to-backup-to-storj-cloud-beta-re-writing-sections/6896/18

Please sign in to comment.