Skip to content

Commit

Permalink
Prevent concurrent ::export() calls in span processor (#788)
Browse files Browse the repository at this point in the history
* Prevent concurrent `::export()` calls in batch span processor
* Prevent concurrent `::export()` calls in simple span processor
* Allow disabling auto-flush in batch processor `::onEnd()`
* Include in-flight batches in queue limit
* Handle exporter exceptions to prevent termination of worker
* Use `LogsMessagesTrait`
  • Loading branch information
Nevay authored Aug 24, 2022
1 parent 74eda3d commit 2457c76
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 162 deletions.
223 changes: 147 additions & 76 deletions src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,142 +4,213 @@

namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use function assert;
use function count;
use InvalidArgumentException;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait;
use OpenTelemetry\SDK\Common\Environment\Variables as Env;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Time\ClockFactory;
use OpenTelemetry\SDK\Common\Time\ClockInterface;
use OpenTelemetry\SDK\Common\Time\StopWatch;
use OpenTelemetry\SDK\Common\Time\StopWatchFactory;
use OpenTelemetry\SDK\Common\Time\Util as TimeUtil;
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use SplQueue;
use function sprintf;
use Throwable;

class BatchSpanProcessor implements SpanProcessorInterface
{
use EnvironmentVariablesTrait;
use LogsMessagesTrait;

public const DEFAULT_SCHEDULE_DELAY = 5000;
public const DEFAULT_EXPORT_TIMEOUT = 30000;
public const DEFAULT_MAX_QUEUE_SIZE = 2048;
public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;

private ?SpanExporterInterface $exporter;
private ?int $maxQueueSize;
private ?int $scheduledDelayMillis;
// @todo: Please, check if this code is needed. It creates an error in phpstan, since it's not used
/** @phpstan-ignore-next-line */
private ?int $exporterTimeoutMillis;
private ?int $maxExportBatchSize;
private bool $running = true;
private StopWatch $stopwatch;

private SpanExporterInterface $exporter;
private ClockInterface $clock;
private int $maxQueueSize;
private int $scheduledDelayNanos;
private int $maxExportBatchSize;
private bool $autoFlush;

private ?int $nextScheduledRun = null;
private bool $running = false;
private int $batchId = 0;
private int $queueSize = 0;
/** @var list<SpanDataInterface> */
private array $queue = [];
private array $batch = [];
/** @var SplQueue<list<SpanDataInterface>> */
private SplQueue $queue;
/** @var SplQueue<array{int, string, ?CancellationInterface, bool}> */
private SplQueue $flush;

private bool $closed = false;

public function __construct(
?SpanExporterInterface $exporter,
ClockInterface $clock = null,
int $maxQueueSize = null,
int $scheduledDelayMillis = null,
int $exporterTimeoutMillis = null,
int $maxExportBatchSize = null
SpanExporterInterface $exporter,
ClockInterface $clock,
int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE,
int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY,
int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT,
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE,
bool $autoFlush = true
) {
$this->exporter = $exporter;
// @todo make the stopwatch a dependency rather than using the factory?
$this->stopwatch = StopWatchFactory::create($clock ?? ClockFactory::getDefault())->build();
$this->stopwatch->start();
$this->maxQueueSize = $maxQueueSize
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, self::DEFAULT_MAX_QUEUE_SIZE);
$this->scheduledDelayMillis = $scheduledDelayMillis
?: $this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, self::DEFAULT_SCHEDULE_DELAY);
$this->exporterTimeoutMillis = $exporterTimeoutMillis
?: $this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, self::DEFAULT_EXPORT_TIMEOUT);
$this->maxExportBatchSize = $maxExportBatchSize
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, self::DEFAULT_MAX_EXPORT_BATCH_SIZE);
if ($this->maxExportBatchSize > $this->maxQueueSize) {
throw new InvalidArgumentException(
sprintf('maxExportBatchSize should be smaller or equal to %s', $this->maxQueueSize)
);
if ($maxQueueSize <= 0) {
throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
}
if ($scheduledDelayMillis <= 0) {
throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
}
if ($exportTimeoutMillis <= 0) {
throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
}
if ($maxExportBatchSize <= 0) {
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
}
if ($maxExportBatchSize > $maxQueueSize) {
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
}

$this->exporter = $exporter;
$this->clock = $clock;
$this->maxQueueSize = $maxQueueSize;
$this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000;
$this->maxExportBatchSize = $maxExportBatchSize;
$this->autoFlush = $autoFlush;

$this->queue = new SplQueue();
$this->flush = new SplQueue();
}

/**
* @inheritDoc
*/
public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void
{
}

/**
* @inheritDoc
*/
public function onEnd(ReadableSpanInterface $span): void
{
if (null === $this->exporter) {
if ($this->closed) {
return;
}

if (!$this->running) {
if (!$span->getContext()->isSampled()) {
return;
}

if ($span->getContext()->isSampled() && !$this->queueReachedLimit()) {
$this->queue[] = $span->toSpanData();
if ($this->queueSize === $this->maxQueueSize) {
return;
}

if ($this->bufferReachedExportLimit() || $this->enoughTimeHasPassed()) {
$this->forceFlush();
$this->queueSize++;
$this->batch[] = $span->toSpanData();
$this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos;

if (count($this->batch) === $this->maxExportBatchSize) {
$this->enqueueBatch();
}
if ($this->autoFlush) {
$this->flush();
}
}

/** @inheritDoc */
public function forceFlush(?CancellationInterface $cancellation = null): bool
{
if (!$this->running || $this->exporter === null) {
return true;
if ($this->closed) {
return false;
}

$this->exporter->export($this->queue)->await();
$this->queue = [];
$this->stopwatch->reset();
$this->exporter->forceFlush();

return true;
return $this->flush(__FUNCTION__, $cancellation);
}

/** @inheritDoc */
public function shutdown(?CancellationInterface $cancellation = null): bool
{
if (!$this->running) {
return true;
if ($this->closed) {
return false;
}

if (null !== $this->exporter && $this->forceFlush()) {
$this->exporter->shutdown();
}
$this->running = false;
$this->closed = true;

return true;
return $this->flush(__FUNCTION__, $cancellation);
}

protected function bufferReachedExportLimit(): bool
private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool
{
return count($this->queue) >= $this->maxExportBatchSize;
if ($flushMethod !== null) {
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]);
}

if ($this->running) {
return false;
}

$success = true;
$exception = null;
$this->running = true;

try {
for (;;) {
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
[, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue();

try {
$result = $this->exporter->$flushMethod($cancellation);
if ($propagateResult) {
$success = $result;
}
} catch (Throwable $e) {
if ($propagateResult) {
$exception = $e;

continue;
}
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
}
}

if (!$this->shouldFlush()) {
break;
}

if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
$batchSize = count($this->queue->bottom());
$this->batchId++;

try {
$this->exporter->export($this->queue->dequeue())->await();
} catch (Throwable $e) {
self::logError('Unhandled export error', ['exception' => $e]);
} finally {
$this->queueSize -= $batchSize;
}
}
} finally {
$this->running = false;
}

if ($exception !== null) {
throw $exception;
}

return $success;
}

protected function queueReachedLimit(): bool
private function shouldFlush(): bool
{
return count($this->queue) >= $this->maxQueueSize;
return !$this->flush->isEmpty()
|| $this->autoFlush && !$this->queue->isEmpty()
|| $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
}

protected function enoughTimeHasPassed(): bool
private function enqueueBatch(): void
{
return TimeUtil::millisToNanos((int) $this->scheduledDelayMillis) < $this->stopwatch->getLastElapsedTime();
assert($this->batch !== []);

$this->queue->enqueue($this->batch);
$this->batch = [];
$this->nextScheduledRun = null;
}
}
Loading

0 comments on commit 2457c76

Please sign in to comment.