Skip to content

Commit

Permalink
perf(opentelemetry): Bucket spans for cleanup (#14154)
Browse files Browse the repository at this point in the history
Co-authored-by: Luca Forstner <[email protected]>
  • Loading branch information
fmorett and lforst authored Nov 27, 2024
1 parent 3e7969f commit a4138e9
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 99 deletions.
35 changes: 18 additions & 17 deletions packages/node/test/integration/transactions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -577,17 +577,9 @@ describe('Integration | Transactions', () => {
throw new Error('No exporter found, aborting test...');
}

let innerSpan1Id: string | undefined;
let innerSpan2Id: string | undefined;

void Sentry.startSpan({ name: 'test name' }, async () => {
const subSpan = Sentry.startInactiveSpan({ name: 'inner span 1' });
innerSpan1Id = subSpan.spanContext().spanId;
subSpan.end();

Sentry.startSpan({ name: 'inner span 2' }, innerSpan => {
innerSpan2Id = innerSpan.spanContext().spanId;
});
Sentry.startInactiveSpan({ name: 'inner span 1' }).end();
Sentry.startInactiveSpan({ name: 'inner span 2' }).end();

// Pretend this is pending for 10 minutes
await new Promise(resolve => setTimeout(resolve, 10 * 60 * 1000));
Expand All @@ -596,7 +588,13 @@ describe('Integration | Transactions', () => {
jest.advanceTimersByTime(1);

// Child-spans have been added to the exporter, but they are pending since they are waiting for their parent
expect(exporter['_finishedSpans'].length).toBe(2);
const finishedSpans1 = [];
exporter['_finishedSpanBuckets'].forEach((bucket: any) => {
if (bucket) {
finishedSpans1.push(...bucket.spans);
}
});
expect(finishedSpans1.length).toBe(2);
expect(beforeSendTransaction).toHaveBeenCalledTimes(0);

// Now wait for 5 mins
Expand All @@ -608,18 +606,21 @@ describe('Integration | Transactions', () => {
jest.advanceTimersByTime(1);

// Old spans have been cleared away
expect(exporter['_finishedSpans'].length).toBe(0);
const finishedSpans2 = [];
exporter['_finishedSpanBuckets'].forEach((bucket: any) => {
if (bucket) {
finishedSpans2.push(...bucket.spans);
}
});
expect(finishedSpans2.length).toBe(0);

// Called once for the 'other span'
expect(beforeSendTransaction).toHaveBeenCalledTimes(1);

expect(logs).toEqual(
expect.arrayContaining([
'SpanExporter has 1 unsent spans remaining',
'SpanExporter has 2 unsent spans remaining',
'SpanExporter exported 1 spans, 2 unsent spans remaining',
`SpanExporter dropping span inner span 1 (${innerSpan1Id}) because it is pending for more than 5 minutes.`,
`SpanExporter dropping span inner span 2 (${innerSpan2Id}) because it is pending for more than 5 minutes.`,
'SpanExporter dropped 2 spans because they were pending for more than 300 seconds.',
'SpanExporter exported 1 spans, 0 unsent spans remaining',
]),
);
});
Expand Down
162 changes: 100 additions & 62 deletions packages/opentelemetry/src/spanExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,60 +35,121 @@ type SpanNodeCompleted = SpanNode & { span: ReadableSpan };
const MAX_SPAN_COUNT = 1000;
const DEFAULT_TIMEOUT = 300; // 5 min

interface FinishedSpanBucket {
timestampInS: number;
spans: Set<ReadableSpan>;
}

/**
* A Sentry-specific exporter that converts OpenTelemetry Spans to Sentry Spans & Transactions.
*/
export class SentrySpanExporter {
private _flushTimeout: ReturnType<typeof setTimeout> | undefined;
private _finishedSpans: ReadableSpan[];
private _timeout: number;

public constructor(options?: { timeout?: number }) {
this._finishedSpans = [];
this._timeout = options?.timeout || DEFAULT_TIMEOUT;
/*
* A quick explanation on the buckets: We do bucketing of finished spans for efficiency. This span exporter is
* accumulating spans until a root span is encountered and then it flushes all the spans that are descendants of that
* root span. Because it is totally in the realm of possibilities that root spans are never finished, and we don't
* want to accumulate spans indefinitely in memory, we need to periodically evacuate spans. Naively we could simply
* store the spans in an array and each time a new span comes in we could iterate through the entire array and
* evacuate all spans that have an end-timestamp that is older than our limit. This could get quite expensive because
* we would have to iterate a potentially large number of spans every time we evacuate. We want to avoid these large
* bursts of computation.
*
* Instead we go for a bucketing approach and put spans into buckets, based on what second
* (modulo the time limit) the span was put into the exporter. With buckets, when we decide to evacuate, we can
* iterate through the bucket entries instead, which have an upper bound of items, making the evacuation much more
* efficient. Cleaning up also becomes much more efficient since it simply involves de-referencing a bucket within the
* bucket array, and letting garbage collection take care of the rest.
*/
private _finishedSpanBuckets: (FinishedSpanBucket | undefined)[];
private _finishedSpanBucketSize: number;
private _spansToBucketEntry: WeakMap<ReadableSpan, FinishedSpanBucket>;
private _lastCleanupTimestampInS: number;

public constructor(options?: {
/** Lower bound of time in seconds until spans that are buffered but have not been sent as part of a transaction get cleared from memory. */
timeout?: number;
}) {
this._finishedSpanBucketSize = options?.timeout || DEFAULT_TIMEOUT;
this._finishedSpanBuckets = new Array(this._finishedSpanBucketSize).fill(undefined);
this._lastCleanupTimestampInS = Math.floor(Date.now() / 1000);
this._spansToBucketEntry = new WeakMap();
}

/** Export a single span. */
public export(span: ReadableSpan): void {
this._finishedSpans.push(span);

// If the span has a local parent ID, we don't need to export anything just yet
if (getLocalParentId(span)) {
const openSpanCount = this._finishedSpans.length;
DEBUG_BUILD && logger.log(`SpanExporter has ${openSpanCount} unsent spans remaining`);
this._cleanupOldSpans();
return;
const currentTimestampInS = Math.floor(Date.now() / 1000);

if (this._lastCleanupTimestampInS !== currentTimestampInS) {
let droppedSpanCount = 0;
this._finishedSpanBuckets.forEach((bucket, i) => {
if (bucket && bucket.timestampInS <= currentTimestampInS - this._finishedSpanBucketSize) {
droppedSpanCount += bucket.spans.size;
this._finishedSpanBuckets[i] = undefined;
}
});
if (droppedSpanCount > 0) {
DEBUG_BUILD &&
logger.log(
`SpanExporter dropped ${droppedSpanCount} spans because they were pending for more than ${this._finishedSpanBucketSize} seconds.`,
);
}
this._lastCleanupTimestampInS = currentTimestampInS;
}

this._clearTimeout();

// If we got a parent span, we try to send the span tree
// Wait a tick for this, to ensure we avoid race conditions
this._flushTimeout = setTimeout(() => {
this.flush();
}, 1);
const currentBucketIndex = currentTimestampInS % this._finishedSpanBucketSize;
const currentBucket = this._finishedSpanBuckets[currentBucketIndex] || {
timestampInS: currentTimestampInS,
spans: new Set(),
};
this._finishedSpanBuckets[currentBucketIndex] = currentBucket;
currentBucket.spans.add(span);
this._spansToBucketEntry.set(span, currentBucket);

// If the span doesn't have a local parent ID (it's a root span), we're gonna flush all the ended spans
if (!getLocalParentId(span)) {
this._clearTimeout();

// If we got a parent span, we try to send the span tree
// Wait a tick for this, to ensure we avoid race conditions
this._flushTimeout = setTimeout(() => {
this.flush();
}, 1);
}
}

/** Try to flush any pending spans immediately. */
public flush(): void {
this._clearTimeout();

const openSpanCount = this._finishedSpans.length;
const finishedSpans: ReadableSpan[] = [];
this._finishedSpanBuckets.forEach(bucket => {
if (bucket) {
finishedSpans.push(...bucket.spans);
}
});

const sentSpans = maybeSend(finishedSpans);

const remainingSpans = maybeSend(this._finishedSpans);
const sentSpanCount = sentSpans.size;

const remainingOpenSpanCount = remainingSpans.length;
const sentSpanCount = openSpanCount - remainingOpenSpanCount;
const remainingOpenSpanCount = finishedSpans.length - sentSpanCount;

DEBUG_BUILD &&
logger.log(`SpanExporter exported ${sentSpanCount} spans, ${remainingOpenSpanCount} unsent spans remaining`);

this._cleanupOldSpans(remainingSpans);
sentSpans.forEach(span => {
const bucketEntry = this._spansToBucketEntry.get(span);
if (bucketEntry) {
bucketEntry.spans.delete(span);
}
});
}

/** Clear the exporter. */
public clear(): void {
this._finishedSpans = [];
this._finishedSpanBuckets = this._finishedSpanBuckets.fill(undefined);
this._clearTimeout();
}

Expand All @@ -99,52 +160,33 @@ export class SentrySpanExporter {
this._flushTimeout = undefined;
}
}

/**
* Remove any span that is older than 5min.
* We do this to avoid leaking memory.
*/
private _cleanupOldSpans(spans = this._finishedSpans): void {
const currentTimeSeconds = Date.now() / 1000;
this._finishedSpans = spans.filter(span => {
const shouldDrop = shouldCleanupSpan(span, currentTimeSeconds, this._timeout);
DEBUG_BUILD &&
shouldDrop &&
logger.log(
`SpanExporter dropping span ${span.name} (${
span.spanContext().spanId
}) because it is pending for more than 5 minutes.`,
);
return !shouldDrop;
});
}
}

/**
* Send the given spans, but only if they are part of a finished transaction.
*
* Returns the unsent spans.
* Returns the sent spans.
* Spans remain unsent when their parent span is not yet finished.
* This will happen regularly, as child spans are generally finished before their parents.
* But it _could_ also happen because, for whatever reason, a parent span was lost.
* In this case, we'll eventually need to clean this up.
*/
function maybeSend(spans: ReadableSpan[]): ReadableSpan[] {
function maybeSend(spans: ReadableSpan[]): Set<ReadableSpan> {
const grouped = groupSpansWithParents(spans);
const remaining = new Set(grouped);
const sentSpans = new Set<ReadableSpan>();

const rootNodes = getCompletedRootNodes(grouped);

rootNodes.forEach(root => {
remaining.delete(root);
const span = root.span;
sentSpans.add(span);
const transactionEvent = createTransactionForOtelSpan(span);

// We'll recursively add all the child spans to this array
const spans = transactionEvent.spans || [];

root.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});

// spans.sort() mutates the array, but we do not use this anymore after this point
Expand All @@ -162,9 +204,7 @@ function maybeSend(spans: ReadableSpan[]): ReadableSpan[] {
captureEvent(transactionEvent);
});

return Array.from(remaining)
.map(node => node.span)
.filter((span): span is ReadableSpan => !!span);
return sentSpans;
}

function nodeIsCompletedRootNode(node: SpanNode): node is SpanNodeCompleted {
Expand All @@ -175,11 +215,6 @@ function getCompletedRootNodes(nodes: SpanNode[]): SpanNodeCompleted[] {
return nodes.filter(nodeIsCompletedRootNode);
}

function shouldCleanupSpan(span: ReadableSpan, currentTimeSeconds: number, maxStartTimeOffsetSeconds: number): boolean {
const cutoff = currentTimeSeconds - maxStartTimeOffsetSeconds;
return spanTimeInputToSeconds(span.startTime) < cutoff;
}

function parseSpan(span: ReadableSpan): { op?: string; origin?: SpanOrigin; source?: TransactionSource } {
const attributes = span.attributes;

Expand Down Expand Up @@ -260,16 +295,19 @@ function createTransactionForOtelSpan(span: ReadableSpan): TransactionEvent {
return transactionEvent;
}

function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remaining: Set<SpanNode>): void {
remaining.delete(node);
function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], sentSpans: Set<ReadableSpan>): void {
const span = node.span;

if (span) {
sentSpans.add(span);
}

const shouldDrop = !span;

// If this span should be dropped, we still want to create spans for the children of this
if (shouldDrop) {
node.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});
return;
}
Expand Down Expand Up @@ -308,7 +346,7 @@ function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remai
spans.push(spanJSON);

node.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});
}

Expand Down
Loading

0 comments on commit a4138e9

Please sign in to comment.