Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize graph object buffering and flushing #395

Merged
merged 26 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b9ef640
Optimize graph object buffering and flushing
austinkelleher Dec 13, 2020
319f1d2
Improve tests for FileSystemGraphObjectStore
austinkelleher Dec 14, 2020
27949a9
Throw if the two graph object maps get out of sync
austinkelleher Dec 16, 2020
6db9779
Initial continuous upload support
austinkelleher Dec 15, 2020
8d22caf
More tests around continuous uploads and various improvements
austinkelleher Dec 16, 2020
8196339
Additional test for FileSystemGraphObjectStore callbacks
austinkelleher Dec 16, 2020
1271491
Mark step as a failure if uploading fails in a step
austinkelleher Dec 16, 2020
b05beb8
Export relevant functions and types from uploader
austinkelleher Dec 16, 2020
5c0ad98
Remove old comment, update test descriptions.
austinkelleher Dec 16, 2020
b1c0804
Share graph object creation test utils across tests and cleanup
austinkelleher Dec 16, 2020
43e57a9
Fix typo in test function
austinkelleher Dec 16, 2020
2b1052c
Add tests for job state upload calls
austinkelleher Dec 16, 2020
47843b7
Fix test function names
austinkelleher Dec 16, 2020
ca68b8a
Test assertion improvements
austinkelleher Dec 16, 2020
06b3394
Only write prettified files to the file system on local collection
austinkelleher Dec 16, 2020
55bb7b5
Change prettyFile to prettifyFiles
austinkelleher Dec 16, 2020
2059ac5
Merge pull request #399 from JupiterOne/1849-unpretty-local-files
austinkelleher Dec 16, 2020
a627688
Merge pull request #398 from JupiterOne/1765-continuous-upload-tests
austinkelleher Dec 16, 2020
6a2c2bf
Merge pull request #397 from JupiterOne/1848-test-cleanup
austinkelleher Dec 16, 2020
b2b44c3
Merge pull request #396 from JupiterOne/1765-continuous-uploads
austinkelleher Dec 16, 2020
aba2ed6
Improve uploader queue to respect queue size instead of waiting for idle
austinkelleher Dec 18, 2020
cfbada4
Change queue size check to >= for safety
austinkelleher Dec 18, 2020
e3d379a
Merge pull request #400 from JupiterOne/1896-queue-size
austinkelleher Dec 18, 2020
cd586e1
Used shared graph object utilities and update CHANGELOG.md
austinkelleher Dec 18, 2020
d370104
Add version to CHANGELOG.md
austinkelleher Dec 18, 2020
3cb5160
Publish
austinkelleher Dec 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions packages/integration-sdk-runtime/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { BucketMap, FileSystemGraphObjectStore } from '../index';
import { FileSystemGraphObjectStore } from '../index';

describe('#storage', () => {
test('should expose BucketMap', () => {
expect(BucketMap).not.toEqual(undefined);
});

test('should expose FileSystemGraphObjectStore', () => {
expect(FileSystemGraphObjectStore).not.toEqual(undefined);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import waitForExpect from 'wait-for-expect';

import {
Entity,
GraphObjectStore,
IntegrationError,
IntegrationExecutionContext,
IntegrationInstance,
Expand Down Expand Up @@ -132,13 +133,14 @@ describe('executeStepDependencyGraph', () => {
async function executeSteps(
steps: IntegrationStep[],
stepStartStates: StepStartStates = getDefaultStepStartStates(steps),
graphObjectStore: GraphObjectStore = new FileSystemGraphObjectStore(),
) {
return executeStepDependencyGraph({
executionContext,
inputGraph: buildStepDependencyGraph(steps),
stepStartStates,
duplicateKeyTracker: new DuplicateKeyTracker(),
graphObjectStore: new FileSystemGraphObjectStore(),
graphObjectStore,
});
}

Expand Down Expand Up @@ -423,20 +425,27 @@ describe('executeStepDependencyGraph', () => {
}
});

test('should perform a flush of the jobState after a step was executed', async () => {
let jobStateFlushSpy;

await executeSteps([
test('should perform a flush of the jobState after execution completed', async () => {
const steps: IntegrationStep[] = [
{
id: 'a',
name: 'a',
entities: [],
relationships: [],
executionHandler: ({ jobState }) => {
jobStateFlushSpy = jest.spyOn(jobState, 'flush');
executionHandler: () => {
return Promise.resolve();
},
},
]);
];

const graphObjectStore = new FileSystemGraphObjectStore();
const jobStateFlushSpy = jest.spyOn(graphObjectStore, 'flush');

await executeSteps(
steps,
getDefaultStepStartStates(steps),
graphObjectStore,
);

expect(jobStateFlushSpy).toHaveBeenCalledTimes(1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ export function executeStepDependencyGraph<
status = StepResultStatus.FAILURE;
}

await context.jobState.flush();
updateStepResultStatus(stepId, status, typeTracker);
enqueueLeafSteps();
}
Expand All @@ -294,6 +293,7 @@ export function executeStepDependencyGraph<

void promiseQueue
.onIdle()
.then(() => graphObjectStore.flush())
.then(() => resolve([...stepResultsMap.values()]))
.catch(reject);
});
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,49 @@ import {
GraphObjectStore,
} from '@jupiterone/integration-sdk-core';

import { BucketMap } from './BucketMap';
import { flushDataToDisk } from './flushDataToDisk';
import {
iterateEntityTypeIndex,
iterateRelationshipTypeIndex,
} from './indices';
import { InMemoryGraphObjectStore } from '../memory';

export const GRAPH_OBJECT_BUFFER_THRESHOLD = 500; // arbitrarily selected, subject to tuning
export const DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD = 500;

// it is important that this value is set to 1
// to ensure that only one operation can be performed at a time.
const BINARY_SEMAPHORE_CONCURRENCY = 1;

export interface FileSystemGraphObjectStoreParams {
/**
* The maximum number of graph objects that this store can buffer in memory
* before writing to disk. Machines with more memory should consider bumping
* this value up.
*
* Default: 500
*/
graphObjectBufferThreshold: number;
}

export class FileSystemGraphObjectStore implements GraphObjectStore {
semaphore: Sema;
entityStorageMap: BucketMap<Entity>;
relationshipStorageMap: BucketMap<Relationship>;
private readonly semaphore: Sema;
private readonly localGraphObjectStore = new InMemoryGraphObjectStore();
private readonly graphObjectBufferThreshold: number;

constructor() {
this.entityStorageMap = new BucketMap();
this.relationshipStorageMap = new BucketMap();
constructor(params?: FileSystemGraphObjectStoreParams) {
this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY);
this.graphObjectBufferThreshold =
params?.graphObjectBufferThreshold ||
DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD;
}

async addEntities(storageDirectoryPath: string, newEntities: Entity[]) {
this.entityStorageMap.add(storageDirectoryPath, newEntities);
this.localGraphObjectStore.addEntities(storageDirectoryPath, newEntities);

if (this.entityStorageMap.totalItemCount >= GRAPH_OBJECT_BUFFER_THRESHOLD) {
if (
this.localGraphObjectStore.getTotalEntityItemCount() >=
this.graphObjectBufferThreshold
austinkelleher marked this conversation as resolved.
Show resolved Hide resolved
) {
await this.flushEntitiesToDisk();
}
}
Expand All @@ -48,18 +63,27 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
storageDirectoryPath: string,
newRelationships: Relationship[],
) {
this.relationshipStorageMap.add(storageDirectoryPath, newRelationships);
this.localGraphObjectStore.addRelationships(
storageDirectoryPath,
newRelationships,
);

if (
this.relationshipStorageMap.totalItemCount >=
GRAPH_OBJECT_BUFFER_THRESHOLD
this.localGraphObjectStore.getTotalRelationshipItemCount() >=
this.graphObjectBufferThreshold
) {
await this.flushRelationshipsToDisk();
}
}

async getEntity({ _key, _type }: GraphObjectLookupKey): Promise<Entity> {
Copy link
Contributor

@mknoedel mknoedel Dec 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought: what information is typically gotten from previous entities? My first thought would be that it would usually only be foreign keys, which is likely just an id. Is it worth considering when we flush InMemoryGraphObjectStore, we maintain a map similar to the implementation of DuplicateKeyTracker. That way we could likely store much more than 500 entities and relationships for most of the use-cases of getEntity.
Maybe we could get lucky and reduce roundtrips to the disk a good amount of the time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are definitely lots of additional caching improvements that we can make!

await this.flushEntitiesToDisk();
const bufferedEntity = this.localGraphObjectStore.findEntity(_key);

if (bufferedEntity) {
// This entity has not yet been flushed to disk
return bufferedEntity;
}

const entities: Entity[] = [];

await this.iterateEntities({ _type }, async (e) => {
Expand Down Expand Up @@ -87,7 +111,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
) {
await this.flushEntitiesToDisk();
await this.localGraphObjectStore.iterateEntities(filter, iteratee);

await iterateEntityTypeIndex({
type: filter._type,
Expand All @@ -99,7 +123,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
filter: GraphObjectFilter,
iteratee: GraphObjectIteratee<T>,
) {
await this.flushRelationshipsToDisk();
await this.localGraphObjectStore.iterateRelationships(filter, iteratee);

await iterateRelationshipTypeIndex({
type: filter._type,
Expand All @@ -116,32 +140,35 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {

async flushEntitiesToDisk() {
await this.lockOperation(() =>
pMap([...this.entityStorageMap.keys()], (storageDirectoryPath) => {
const entities = this.entityStorageMap.get(storageDirectoryPath) ?? [];
this.entityStorageMap.delete(storageDirectoryPath);

return flushDataToDisk({
storageDirectoryPath,
collectionType: 'entities',
data: entities,
});
}),
pMap(
this.localGraphObjectStore.collectEntitiesByStep(),
async ([stepId, entities]) => {
await flushDataToDisk({
storageDirectoryPath: stepId,
collectionType: 'entities',
data: entities,
});

this.localGraphObjectStore.flushEntities(entities);
},
),
);
}

async flushRelationshipsToDisk() {
await this.lockOperation(() =>
pMap([...this.relationshipStorageMap.keys()], (storageDirectoryPath) => {
const relationships =
this.relationshipStorageMap.get(storageDirectoryPath) ?? [];
this.relationshipStorageMap.delete(storageDirectoryPath);

return flushDataToDisk({
storageDirectoryPath,
collectionType: 'relationships',
data: relationships,
});
}),
pMap(
this.localGraphObjectStore.collectRelationshipsByStep(),
async ([stepId, relationships]) => {
await flushDataToDisk({
storageDirectoryPath: stepId,
collectionType: 'relationships',
data: relationships,
});

this.localGraphObjectStore.flushRelationships(relationships);
},
),
);
}

Expand All @@ -166,7 +193,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore {
* and prematurely end, causing the next step to start up before the
* data it depends on is present on disk.
*/
async lockOperation(operation: () => Promise<any>) {
private async lockOperation<T>(operation: () => Promise<T>) {
austinkelleher marked this conversation as resolved.
Show resolved Hide resolved
await this.semaphore.acquire();
try {
await operation();
Expand Down

This file was deleted.

Loading