Skip to content

Commit

Permalink
Fix flushing stalls under pressure
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Jun 22, 2023
1 parent de98961 commit 6664103
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
2 changes: 2 additions & 0 deletions internal/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ func (inode *Inode) SendDelete() {
if err != nil {
log.Errorf("Failed to delete object %v: %v", key, err)
inode.mu.Unlock()
inode.fs.WakeupFlusher()
return
}
forget := false
Expand Down Expand Up @@ -1315,6 +1316,7 @@ func (dir *Inode) SendMkDir() {
dir.recordFlushError(err)
if err != nil {
log.Errorf("Failed to create directory object %v: %v", key, err)
dir.fs.WakeupFlusher()
return
}
if dir.CacheState == ST_CREATED || dir.CacheState == ST_MODIFIED {
Expand Down
8 changes: 4 additions & 4 deletions internal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,8 @@ func (inode *Inode) GetMultiReader(offset uint64, size uint64) (reader *MultiRea
func (inode *Inode) recordFlushError(err error) {
inode.flushError = err
inode.flushErrorTime = time.Now()
// The original idea was to schedule retry only if err != nil
// However, current version unblocks flushing in case of bugs, so... okay. Let it be
inode.fs.ScheduleRetryFlush()
}

Expand Down Expand Up @@ -1557,7 +1559,6 @@ func (inode *Inode) SendUpload() bool {
partLocked := false
partEvicted := false
partZero := false
hasEvictedParts := false
canComplete := true
processPart := func() bool {
// Don't flush parts that are being currently flushed
Expand Down Expand Up @@ -1591,8 +1592,6 @@ func (inode *Inode) SendUpload() bool {
return true
}
}
} else if partDirty && partEvicted {
hasEvictedParts = true
}
return false
}
Expand Down Expand Up @@ -1633,7 +1632,7 @@ func (inode *Inode) SendUpload() bool {
}
}
if canComplete && (inode.fileHandles == 0 || inode.forceFlush ||
atomic.LoadInt32(&inode.fs.wantFree) > 0 && hasEvictedParts) {
atomic.LoadInt32(&inode.fs.wantFree) > 0) {
// Complete the multipart upload
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, 1)
Expand All @@ -1645,6 +1644,7 @@ func (inode *Inode) SendUpload() bool {
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
}()
initiated = true
}

return initiated
Expand Down

0 comments on commit 6664103

Please sign in to comment.