Skip to content

Commit

Permalink
Handle exporter exceptions to prevent termination of worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevay committed Aug 9, 2022
1 parent 61d6af9 commit de53b81
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 39 deletions.
112 changes: 73 additions & 39 deletions src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use function assert;
use Closure;
use function count;
use InvalidArgumentException;
use OpenTelemetry\Context\Context;
Expand All @@ -16,10 +15,16 @@
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use SplQueue;
use function sprintf;
use Throwable;

class BatchSpanProcessor implements SpanProcessorInterface
class BatchSpanProcessor implements SpanProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;

public const DEFAULT_SCHEDULE_DELAY = 5000;
public const DEFAULT_EXPORT_TIMEOUT = 30000;
public const DEFAULT_MAX_QUEUE_SIZE = 2048;
Expand All @@ -40,7 +45,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
private array $batch = [];
/** @var SplQueue<list<SpanDataInterface>> */
private SplQueue $queue;
/** @var SplQueue<array{int, Closure}> */
/** @var SplQueue<array{int, string, ?CancellationInterface, bool}> */
private SplQueue $flush;

private bool $closed = false;
Expand Down Expand Up @@ -105,72 +110,108 @@ public function onEnd(ReadableSpanInterface $span): void
if (count($this->batch) === $this->maxExportBatchSize) {
$this->enqueueBatch();
}
if ($this->autoFlush && !$this->running && $this->shouldFlush()) {
if ($this->autoFlush) {
$this->flush();
}
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
if ($this->closed) {
return $this->flush->isEmpty();
return false;
}

return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation));
return $this->flush(__FUNCTION__, $cancellation);
}

public function shutdown(?CancellationInterface $cancellation = null): bool
{
if ($this->closed) {
return $this->flush->isEmpty();
return false;
}

$this->closed = true;

return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation));
return $this->flush(__FUNCTION__, $cancellation);
}

private function flush(?Closure $forceFlush = null): bool
private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool
{
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
if ($flushMethod !== null) {
if ($this->flush->isEmpty() && $this->queue->isEmpty() && !$this->batch) {
return $this->exporter->$flushMethod($cancellation);
}

if ($forceFlush !== null) {
$this->flush->enqueue([$flushId, $forceFlush]);
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]);
}

$this->processFlushTasks();
if ($this->running) {
return false;
}

if (!$this->running) {
$this->running = true;
$success = true;
$exception = null;
$this->running = true;

try {
while ($this->shouldFlush()) {
if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
$batchSize = count($this->queue->bottom());
$this->batchId++;
try {
for (;;) {
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
[, $flushMethod, $cancellation, $caller] = $this->flush->dequeue();

try {
$this->exporter->export($this->queue->dequeue());
} finally {
$this->queueSize -= $batchSize;
$result = $this->exporter->$flushMethod($cancellation);
if ($caller) {
$success = $result;
}
} catch (Throwable $e) {
if ($caller) {
$exception = $e;

continue;
}
if ($this->logger !== null) {
$this->logger->error(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
}
}
}

if (!$this->shouldFlush()) {
break;
}

if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
$batchSize = count($this->queue->bottom());
$this->batchId++;

try {
$this->exporter->export($this->queue->dequeue());
} catch (Throwable $e) {
if ($this->logger !== null) {
$this->logger->error('Unhandled export error', ['exception' => $e]);
}
$this->processFlushTasks();
} finally {
$this->queueSize -= $batchSize;
}
} finally {
$this->running = false;
}
} finally {
$this->running = false;
}

if ($exception !== null) {
throw $exception;
}

return $this->batchId >= $flushId;
return $success;
}

private function shouldFlush(): bool
{
return !$this->queue->isEmpty()
|| !$this->flush->isEmpty()
|| $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
return !$this->flush->isEmpty()
|| $this->autoFlush && !$this->queue->isEmpty()
|| $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
}

private function enqueueBatch(): void
Expand All @@ -181,11 +222,4 @@ private function enqueueBatch(): void
$this->batch = [];
$this->nextScheduledRun = null;
}

private function processFlushTasks(): void
{
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
$this->flush->dequeue()[1]();
}
}
}
88 changes: 88 additions & 0 deletions tests/Unit/SDK/Trace/SpanProcessor/BatchSpanProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\Tests\Unit\SDK\Trace\SpanProcessor;

use InvalidArgumentException;
use LogicException;
use Mockery;
use Mockery\Adapter\Phpunit\MockeryTestCase;
use OpenTelemetry\API\Trace as API;
Expand All @@ -15,7 +16,9 @@
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use OpenTelemetry\Tests\Unit\SDK\Util\TestClock;
use Psr\Log\LoggerInterface;

/**
* @covers \OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor
Expand Down Expand Up @@ -346,6 +349,91 @@ public function test_shutdown_shutdowns_exporter(): void
$processor->shutdown();
}

public function test_throwing_exporter_export(): void
{
$exporter = $this->createMock(SpanExporterInterface::class);
$exporter->method('forceFlush')->willReturn(true);
$exporter->method('export')->willThrowException(new LogicException());

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('error');

$processor = new BatchSpanProcessor($exporter, $this->testClock);
$processor->setLogger($logger);

$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);

$processor->forceFlush();
}

public function test_throwing_exporter_flush(): void
{
$exporter = $this->createMock(SpanExporterInterface::class);
$exporter->method('forceFlush')->willThrowException(new LogicException());

$this->expectException(LogicException::class);

$processor = new BatchSpanProcessor($exporter, $this->testClock);
$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);

$processor->forceFlush();
}

public function test_throwing_exporter_flush_cannot_rethrow_in_original_caller_logs_error(): void
{
$exporter = $this->createMock(SpanExporterInterface::class);
$exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) {
/** @var SpanProcessorInterface $processor */
$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);

return $processor->shutdown();
});
$exporter->method('shutdown')->willThrowException(new LogicException());

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('error');

$processor = new BatchSpanProcessor($exporter, $this->testClock);
$processor->setLogger($logger);

$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);

$processor->forceFlush();
}

public function test_throwing_exporter_flush_rethrows_in_original_caller(): void
{
$exporter = $this->createMock(SpanExporterInterface::class);
$exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) {
/** @var SpanProcessorInterface $processor */
$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);
$processor->shutdown();

throw new LogicException();
});
$exporter->expects($this->once())->method('shutdown');

$this->expectException(LogicException::class);

$processor = new BatchSpanProcessor($exporter, $this->testClock);

$span = $this->createSampledSpanMock();
$processor->onStart($span, Context::getCurrent());
$processor->onEnd($span);

$processor->forceFlush();
}

public function test_span_processor_throws_on_invalid_max_queue_size(): void
{
$this->expectException(InvalidArgumentException::class);
Expand Down

0 comments on commit de53b81

Please sign in to comment.