Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(upload): check space left in allocation before uploading #600

Merged
merged 6 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev/blobber/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func RegisterHandlers(r *mux.Router, m mock.ResponseMap) {
r.HandleFunc("/v1/writemarker/lock/{allocation}", mock.WithResponse(m)).Methods(http.MethodDelete)
r.HandleFunc("/v1/hashnode/root/{allocation}", mock.WithResponse(m)).Methods(http.MethodGet)

r.HandleFunc("/v1/file/meta/{allocation}", mock.WithResponse(m)).Methods(http.MethodPost)

r.NotFoundHandler = Handle404(m)
}

Expand Down
3 changes: 3 additions & 0 deletions sdks/blobber/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ const (

// EndpointRootHashnode api endpoint of getting root hashnode of an allocation
EndpointRootHashnode = "/v1/hashnode/root/"

// EndpointFileMeta api endpoint of file meta
EndpointFileMeta = "/v1/file/meta/"
)
36 changes: 36 additions & 0 deletions zboxcore/sdk/allocation_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestAllocation_UpdateFile(t *testing.T) {
Tx: "TestAllocation_UpdateFile",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}
setupMockAllocation(t, a)

Expand All @@ -57,6 +58,10 @@ func TestAllocation_UpdateFile(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+a.Tx] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)
defer server.Close()
Expand All @@ -83,6 +88,7 @@ func TestAllocation_UploadFile(t *testing.T) {
Tx: "TestAllocation_UploadFile",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}

resp := &WMLockResult{
Expand All @@ -95,6 +101,10 @@ func TestAllocation_UploadFile(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+a.Tx] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)

Expand Down Expand Up @@ -146,6 +156,11 @@ func TestAllocation_UpdateFileWithThumbnail(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+"TestAllocation_UpdateFileWithThumbnail"] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)
defer server.Close()

Expand All @@ -158,6 +173,7 @@ func TestAllocation_UpdateFileWithThumbnail(t *testing.T) {
Tx: "TestAllocation_UpdateFileWithThumbnail",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}

if teardown := setupMockFileAndReferencePathResult(t, a.ID, mockLocalPath); teardown != nil {
Expand Down Expand Up @@ -204,6 +220,7 @@ func TestAllocation_UploadFileWithThumbnail(t *testing.T) {
Tx: "TestAllocation_UploadFileWithThumbnail",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}

resp := &WMLockResult{
Expand All @@ -216,6 +233,11 @@ func TestAllocation_UploadFileWithThumbnail(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+a.Tx] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)
defer server.Close()

Expand All @@ -241,6 +263,7 @@ func TestAllocation_EncryptAndUpdateFile(t *testing.T) {
Tx: "TestAllocation_EncryptAndUpdateFile",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}

if teardown := setupMockFileAndReferencePathResult(t, a.Tx, mockLocalPath); teardown != nil {
Expand All @@ -257,6 +280,11 @@ func TestAllocation_EncryptAndUpdateFile(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+a.Tx] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)
defer server.Close()

Expand Down Expand Up @@ -284,6 +312,7 @@ func TestAllocation_EncryptAndUploadFile(t *testing.T) {
Tx: "TestAllocation_EncryptAndUploadFile",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
}

resp := &WMLockResult{
Expand All @@ -296,6 +325,11 @@ func TestAllocation_EncryptAndUploadFile(t *testing.T) {
StatusCode: http.StatusOK,
Body: respBuf,
}
respMap[http.MethodPost+":"+blobber.EndpointFileMeta+a.Tx] = devmock.Response{
StatusCode: http.StatusOK,
Body: []byte("{\"actual_file_size\":1}"),
}

server := dev.NewBlobberServer(respMap)
defer server.Close()

Expand Down Expand Up @@ -367,6 +401,7 @@ func TestAllocation_EncryptAndUploadFileWithThumbnail(t *testing.T) {
Tx: "TestAllocation_EncryptAndUploadFileWithThumbnail",
ParityShards: 2,
DataShards: 2,
Size: 2 * GB,
ctx: context.TODO(),
}

Expand Down Expand Up @@ -783,6 +818,7 @@ func TestAllocation_RepairFile(t *testing.T) {
a := &Allocation{
ParityShards: tt.numBlobbers / 2,
DataShards: tt.numBlobbers / 2,
Size: 2 * GB,
}
a.uploadChan = make(chan *UploadRequest, 10)
a.downloadChan = make(chan *DownloadRequest, 10)
Expand Down
41 changes: 32 additions & 9 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ var (
return coreEncryption.Hash(left + right)
}

ErrInvalidChunkSize = errors.New("chunk: chunk size is too small. it must greater than 272 if file is uploaded with encryption")
ErrInvalidChunkSize = errors.New("chunk: chunk size is too small. it must greater than 272 if file is uploaded with encryption")
ErrNoEnoughSpaceLeftInAllocation = errors.New("alloc: no enough space left in allocation")
)

// DefaultChunkSize default chunk size for file and thumbnail
Expand Down Expand Up @@ -90,6 +91,25 @@ func CreateChunkedUpload(
return nil, thrown.Throw(constants.ErrInvalidParameter, "allocationObj")
}

opCode := OpUpload
spaceLeft := allocationObj.Size
if allocationObj.Stats != nil {
spaceLeft -= allocationObj.Stats.UsedSize
}

if isUpdate {
f, err := allocationObj.GetFileMeta(fileMeta.RemotePath)
if err != nil {
return nil, err
}
spaceLeft += f.ActualFileSize
opCode = OpUpdate
}

if fileMeta.ActualSize > spaceLeft {
return nil, ErrNoEnoughSpaceLeftInAllocation
}

consensus := Consensus{
mu: &sync.RWMutex{},
consensusThresh: allocationObj.consensusThreshold,
Expand All @@ -98,6 +118,7 @@ func CreateChunkedUpload(

uploadMask := zboxutil.NewUint128(1).Lsh(uint64(len(allocationObj.Blobbers))).Sub64(1)
if isRepair {
opCode = OpUpdate
found, repairRequired, _, err := allocationObj.RepairRequired(fileMeta.RemotePath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -129,6 +150,7 @@ func CreateChunkedUpload(
commitTimeOut: DefaultUploadTimeOut,
maskMu: &sync.Mutex{},
ctx: allocationObj.ctx,
opCode: opCode,
}

if isUpdate {
Expand Down Expand Up @@ -287,6 +309,7 @@ type ChunkedUpload struct {
// isRepair identifies if upload is repair operation
isRepair bool

opCode int
uploadTimeOut time.Duration
commitTimeOut time.Duration
maskMu *sync.Mutex
Expand Down Expand Up @@ -377,7 +400,7 @@ func (su *ChunkedUpload) createEncscheme() encryption.EncryptionScheme {
func (su *ChunkedUpload) Start() error {

if su.statusCallback != nil {
su.statusCallback.Started(su.allocationObj.ID, su.fileMeta.RemotePath, OpUpload, int(su.fileMeta.ActualSize)+int(su.fileMeta.ActualThumbnailSize))
su.statusCallback.Started(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(su.fileMeta.ActualSize)+int(su.fileMeta.ActualThumbnailSize))
}

for {
Expand All @@ -387,7 +410,7 @@ func (su *ChunkedUpload) Start() error {
// chunk, err := su.chunkReader.Next()
if err != nil {
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, OpUpload, err)
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, su.opCode, err)
}
return err
}
Expand All @@ -400,7 +423,7 @@ func (su *ChunkedUpload) Start() error {
su.fileMeta.ActualHash, err = su.fileHasher.GetFileHash()
if err != nil {
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, OpUpload, err)
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, su.opCode, err)
}
return err
}
Expand All @@ -416,7 +439,7 @@ func (su *ChunkedUpload) Start() error {
err = su.processUpload(chunks.chunkStartIndex, chunks.chunkEndIndex, chunks.fileShards, chunks.thumbnailShards, chunks.isFinal, chunks.totalReadSize)
if err != nil {
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, OpUpload, err)
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, su.opCode, err)
}
return err
}
Expand All @@ -429,7 +452,7 @@ func (su *ChunkedUpload) Start() error {
su.saveProgress()

if su.statusCallback != nil {
su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, OpUpload, int(su.progress.UploadLength), nil)
su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(su.progress.UploadLength), nil)
}
}

Expand All @@ -455,7 +478,7 @@ func (su *ChunkedUpload) Start() error {

if err != nil {
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, OpUpload, err)
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.Path, su.opCode, err)
}
return err
}
Expand Down Expand Up @@ -618,13 +641,13 @@ func (su *ChunkedUpload) processCommit() error {
su.allocationObj.deleteFile(su.fileMeta.RemotePath, su.consensus.getConsensus(), su.consensus.getConsensus()) //nolint
}
if su.statusCallback != nil {
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.RemotePath, OpUpload, err)
su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, err)
}
return err
}

if su.statusCallback != nil {
su.statusCallback.Completed(su.allocationObj.ID, su.fileMeta.RemotePath, su.fileMeta.RemoteName, su.fileMeta.MimeType, int(su.progress.UploadLength), OpUpload)
su.statusCallback.Completed(su.allocationObj.ID, su.fileMeta.RemotePath, su.fileMeta.RemoteName, su.fileMeta.MimeType, int(su.progress.UploadLength), su.opCode)
}

return nil
Expand Down