Skip to content

Commit

Permalink
Free memory on request finish (#240)
Browse files Browse the repository at this point in the history
* refactor(asyncloader): free memory on request finish

* test(responsecache): test free behavior

* style(asyncloader): cleanup code to check error

* style(responsecache): add explanatory comment

* fix(asyncloader): only free when there are blocks left to free
  • Loading branch information
hannahhoward authored Oct 7, 2021
1 parent 9c5c74c commit e389abd
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 21 deletions.
14 changes: 10 additions & 4 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,22 @@ func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) {
al.stateLk.Lock()
defer al.stateLk.Unlock()
responseCache := al.responseCache
aq, ok := al.requestQueues[requestID]
if ok {
al.alternateQueues[aq].responseCache.FinishRequest(requestID)
responseCache = al.alternateQueues[aq].responseCache
delete(al.requestQueues, requestID)
return
}
al.responseCache.FinishRequest(requestID)
toFree := responseCache.FinishRequest(requestID)
if toFree > 0 {
err := al.allocator.ReleaseBlockMemory(p, toFree)
if err != nil {
log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
}
}
}

func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestRegisterUnregister(t *testing.T) {
err = asyncLoader.UnregisterPersistenceOption("other")
require.EqualError(t, err, "cannot unregister while requests are in progress")
asyncLoader.CompleteResponsesFor(requestID2)
asyncLoader.CleanupRequest(requestID2)
asyncLoader.CleanupRequest(p, requestID2)
err = asyncLoader.UnregisterPersistenceOption("other")
require.NoError(t, err)

Expand Down
17 changes: 12 additions & 5 deletions requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var log = logging.Logger("graphsync")
// UnverifiedBlockStore is an interface for storing blocks
// as they come in and removing them as they are verified
type UnverifiedBlockStore interface {
PruneBlocks(func(ipld.Link) bool)
PruneBlocks(func(ipld.Link, uint64) bool)
PruneBlock(ipld.Link)
VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error)
AddUnverifiedBlock(ipld.Link, []byte)
Expand All @@ -42,15 +42,22 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
}

// FinishRequest indicate there is no more need to track blocks tied to this
// response
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
// response. It returns the total number of bytes in blocks that were being
// tracked but are no longer in memory
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
rc.responseCacheLk.Lock()
rc.linkTracker.FinishRequest(requestID)

rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link) bool {
return rc.linkTracker.BlockRefCount(link) == 0
toFree := uint64(0)
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
if shouldPrune {
toFree += amt
}
return shouldPrune
})
rc.responseCacheLk.Unlock()
return toFree
}

// AttemptLoad attempts to laod the given block from the cache
Expand Down
10 changes: 6 additions & 4 deletions requestmanager/asyncloader/responsecache/responsecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []by
ubs.inMemoryBlocks[lnk] = data
}

func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
for link := range ubs.inMemoryBlocks {
if shouldPrune(link) {
func (ubs *fakeUnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) {
for link, data := range ubs.inMemoryBlocks {
if shouldPrune(link, uint64(len(data))) {
delete(ubs.inMemoryBlocks, link)
}
}
Expand Down Expand Up @@ -134,14 +134,16 @@ func TestResponseCacheManagingLinks(t *testing.T) {
require.NoError(t, err)
require.Nil(t, data, "no data should be returned for unknown block")

responseCache.FinishRequest(requestID1)
toFree := responseCache.FinishRequest(requestID1)
// should remove only block 0, since it now has no refering outstanding requests
require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
require.Equal(t, toFree, uint64(len(blks[0].RawData())))

responseCache.FinishRequest(requestID2)
// should remove last block since are no remaining references
require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
require.Equal(t, toFree, uint64(len(blks[3].RawData())))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte)

// PruneBlocks removes blocks from the unverified store without committing them,
// if the passed in function returns true for the given link
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) {
for link, data := range ubs.inMemoryBlocks {
if shouldPrune(link) {
if shouldPrune(link, uint64(len(data))) {
delete(ubs.inMemoryBlocks, link)
ubs.dataSize = ubs.dataSize - uint64(len(data))
}
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type AsyncLoader interface {
blks []blocks.Block)
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID graphsync.RequestID)
CleanupRequest(requestID graphsync.RequestID)
CleanupRequest(p peer.ID, requestID graphsync.RequestID)
}

// RequestManager tracks outgoing requests and processes incoming reponses
Expand Down
4 changes: 2 additions & 2 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func TestPauseResume(t *testing.T) {
// verify no further responses come through
time.Sleep(100 * time.Millisecond)
testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
td.fal.CleanupRequest(rr.gsr.ID())
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
Expand Down Expand Up @@ -893,7 +893,7 @@ func TestPauseResumeExternal(t *testing.T) {
// verify no further responses come through
time.Sleep(100 * time.Millisecond)
testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
td.fal.CleanupRequest(rr.gsr.ID())
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
rm.connManager.Unprotect(ipr.p, requestID.Tag())
delete(rm.inProgressRequestStatuses, requestID)
ipr.cancelFn()
rm.asyncLoader.CleanupRequest(requestID)
rm.asyncLoader.CleanupRequest(ipr.p, requestID)
if ipr.traverser != nil {
ipr.traverserCancel()
ipr.traverser.Shutdown(rm.ctx)
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/testloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (fal *FakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID)

// CleanupRequest simulates the effect of cleaning up the request by removing any response channels
// for the request
func (fal *FakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
func (fal *FakeAsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID) {
fal.responseChannelsLk.Lock()
for key := range fal.responseChannels {
if key.requestID == requestID {
Expand Down

0 comments on commit e389abd

Please sign in to comment.