Skip to content

Commit

Permalink
Allow disabling auto-flush in batch processor ::onEnd()
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevay committed Aug 4, 2022
1 parent d4233b0 commit af668ad
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
private int $maxBatchSize;
private int $scheduledDelayNanos;
private int $maxExportBatchSize;
private bool $autoFlush;

private ?int $nextScheduledRun = null;
private bool $running = false;
Expand All @@ -51,7 +52,8 @@ public function __construct(
int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE,
int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY,
int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT,
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE,
bool $autoFlush = true
) {
if ($maxQueueSize <= 0) {
throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
Expand All @@ -75,6 +77,7 @@ public function __construct(
$this->maxBatchSize = $maxQueueSize % $maxExportBatchSize;
$this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000;
$this->maxExportBatchSize = $maxExportBatchSize;
$this->autoFlush = $autoFlush;

$this->queue = new SplQueue();
$this->flush = new SplQueue();
Expand Down Expand Up @@ -102,9 +105,9 @@ public function onEnd(ReadableSpanInterface $span): void

if (count($this->batch) === $this->maxExportBatchSize) {
$this->enqueueBatch();
}
if ($this->autoFlush && !$this->running && $this->shouldFlush()) {
$this->flush();
} elseif ($this->clock->now() > $this->nextScheduledRun) {
$this->flush(static fn () => null);
}
}

Expand All @@ -130,36 +133,39 @@ public function shutdown(?CancellationInterface $cancellation = null): bool

private function flush(?Closure $forceFlush = null): bool
{
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;

if ($forceFlush !== null) {
$this->flush->enqueue([
$this->batchId + $this->queue->count() + (int) (bool) $this->batch,
$forceFlush,
]);
$this->flush->enqueue([$flushId, $forceFlush]);
}

if ($this->running) {
return false;
}
$this->processFlushTasks();

$this->running = true;
if (!$this->running) {
$this->running = true;

try {
$this->processFlushTasks();
while (!$this->queue->isEmpty() || !$this->flush->isEmpty()) {
if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
while (!$this->queue->isEmpty()) {
try {
while ($this->shouldFlush()) {
if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
$this->batchId++;
$this->exporter->export($this->queue->dequeue());
$this->processFlushTasks();
}
} finally {
$this->running = false;
}
} finally {
$this->running = false;
}

return true;
return $this->batchId >= $flushId;
}

private function shouldFlush(): bool
{
return !$this->queue->isEmpty()
|| !$this->flush->isEmpty()
|| $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
}

private function enqueueBatch(): void
Expand Down

0 comments on commit af668ad

Please sign in to comment.