Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent concurrent ::export() calls in span processor #788

Merged
merged 6 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 147 additions & 76 deletions src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,142 +4,213 @@

namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use function assert;
use function count;
use InvalidArgumentException;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait;
use OpenTelemetry\SDK\Common\Environment\Variables as Env;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Time\ClockFactory;
use OpenTelemetry\SDK\Common\Time\ClockInterface;
use OpenTelemetry\SDK\Common\Time\StopWatch;
use OpenTelemetry\SDK\Common\Time\StopWatchFactory;
use OpenTelemetry\SDK\Common\Time\Util as TimeUtil;
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use SplQueue;
use function sprintf;
use Throwable;

class BatchSpanProcessor implements SpanProcessorInterface
{
use EnvironmentVariablesTrait;
use LogsMessagesTrait;

// Default values for the constructor's optional arguments. The two time values are the
// defaults of the $scheduledDelayMillis / $exportTimeoutMillis parameters, i.e. milliseconds.
public const DEFAULT_SCHEDULE_DELAY = 5000;
public const DEFAULT_EXPORT_TIMEOUT = 30000;
public const DEFAULT_MAX_QUEUE_SIZE = 2048;
public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;

// NOTE(review): the nullable declarations in this group are declared again further down with
// non-nullable types; they look like the removed side of the diff — confirm before relying on them.
private ?SpanExporterInterface $exporter;
private ?int $maxQueueSize;
private ?int $scheduledDelayMillis;
// @todo: Please, check if this code is needed. It creates an error in phpstan, since it's not used
/** @phpstan-ignore-next-line */
private ?int $exporterTimeoutMillis;
private ?int $maxExportBatchSize;
private bool $running = true;
private StopWatch $stopwatch;

// Exporter that receives every batch through export() and the forceFlush/shutdown requests.
private SpanExporterInterface $exporter;
// Time source; its now() value is compared against $nextScheduledRun.
private ClockInterface $clock;
// Upper bound for $queueSize; onEnd() drops spans once it is reached.
private int $maxQueueSize;
// Constructor argument $scheduledDelayMillis multiplied by 1_000_000.
private int $scheduledDelayNanos;
// Number of spans in $batch at which onEnd() moves the batch into $queue.
private int $maxExportBatchSize;
// When true, onEnd() calls flush() after every accepted span.
private bool $autoFlush;

// clock->now() + $scheduledDelayNanos taken when the current batch received its first span;
// reset to null by enqueueBatch().
private ?int $nextScheduledRun = null;
// True while the loop inside flush() is executing; used to reject nested flush() runs.
private bool $running = false;
// Incremented by flush() right before each batch is exported.
private int $batchId = 0;
// Spans accepted by onEnd() and not yet exported (pending batches plus the current batch).
private int $queueSize = 0;
/** @var list<SpanDataInterface> */
private array $queue = [];
// Spans accepted by onEnd() that enqueueBatch() has not yet moved into the batch queue.
private array $batch = [];
/** @var SplQueue<list<SpanDataInterface>> */
private SplQueue $queue;
// Pending flush requests as written by flush(): [flushId, exporter method name, cancellation, propagateResult].
/** @var SplQueue<array{int, string, ?CancellationInterface, bool}> */
private SplQueue $flush;

// Set by shutdown(); afterwards onEnd() ignores spans and forceFlush()/shutdown() return false.
private bool $closed = false;

/**
 * Validates the batching limits and initialises the empty batch queue and flush-request queue.
 *
 * NOTE(review): this span interleaves both sides of the diff (two parameter lists, two bodies)
 * together with review-page text. The documentation below describes the parameter list that
 * uses the DEFAULT_* constants as defaults and the validation that follows it — confirm which
 * side is current before extracting the code.
 *
 * @param SpanExporterInterface $exporter exporter that batches and flush requests are forwarded to
 * @param ClockInterface $clock clock whose now() is used for the scheduled-run check
 * @param int $maxQueueSize maximum number of unexported spans kept; must be greater than zero
 * @param int $scheduledDelayMillis delay in milliseconds, stored multiplied by 1_000_000; must be greater than zero
 * @param int $exportTimeoutMillis must be greater than zero; only validated, not stored by the assignments shown here
 * @param int $maxExportBatchSize spans per batch; must be greater than zero and not exceed $maxQueueSize
 * @param bool $autoFlush whether onEnd() calls flush() after each accepted span
 *
 * @throws InvalidArgumentException if any numeric argument is zero or negative, or if
 *                                  $maxExportBatchSize is larger than $maxQueueSize
 */
public function __construct(
?SpanExporterInterface $exporter,
ClockInterface $clock = null,
int $maxQueueSize = null,
int $scheduledDelayMillis = null,
int $exporterTimeoutMillis = null,
int $maxExportBatchSize = null
SpanExporterInterface $exporter,
ClockInterface $clock,
int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE,
int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY,
int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT,
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE,
bool $autoFlush = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be changed to default false before beta to be consistent with async implementations that will only export once the event loop gains control. Using default true for now to keep the scope of the PR small / to not break examples etc.

) {
$this->exporter = $exporter;
// @todo make the stopwatch a dependency rather than using the factory?
$this->stopwatch = StopWatchFactory::create($clock ?? ClockFactory::getDefault())->build();
$this->stopwatch->start();
$this->maxQueueSize = $maxQueueSize
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, self::DEFAULT_MAX_QUEUE_SIZE);
$this->scheduledDelayMillis = $scheduledDelayMillis
?: $this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, self::DEFAULT_SCHEDULE_DELAY);
$this->exporterTimeoutMillis = $exporterTimeoutMillis
?: $this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, self::DEFAULT_EXPORT_TIMEOUT);
$this->maxExportBatchSize = $maxExportBatchSize
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, self::DEFAULT_MAX_EXPORT_BATCH_SIZE);
if ($this->maxExportBatchSize > $this->maxQueueSize) {
throw new InvalidArgumentException(
sprintf('maxExportBatchSize should be smaller or equal to %s', $this->maxQueueSize)
);
// Each limit is checked on the raw argument before anything is assigned to the instance.
if ($maxQueueSize <= 0) {
throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
}
if ($scheduledDelayMillis <= 0) {
throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
}
if ($exportTimeoutMillis <= 0) {
throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
}
if ($maxExportBatchSize <= 0) {
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
}
if ($maxExportBatchSize > $maxQueueSize) {
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
}

$this->exporter = $exporter;
$this->clock = $clock;
$this->maxQueueSize = $maxQueueSize;
// Stored pre-multiplied so onEnd() can add it directly to $this->clock->now().
$this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000;
$this->maxExportBatchSize = $maxExportBatchSize;
$this->autoFlush = $autoFlush;

$this->queue = new SplQueue();
brettmc marked this conversation as resolved.
Show resolved Hide resolved
$this->flush = new SplQueue();
}

/** @inheritDoc */
public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void
{
// Intentionally a no-op: this processor only collects spans in onEnd().
}

/**
* Adds the ended span to the current batch, unless the processor is closed, the span is not
* sampled, or the queue is already full.
*
* A full batch is moved into the batch queue, and with auto-flush enabled flush() is called
* afterwards.
*
* NOTE(review): this body interleaves both sides of the diff; the lines using
* $this->exporter === null, queueReachedLimit(), bufferReachedExportLimit() and
* enoughTimeHasPassed() look like the removed implementation — confirm.
*
* @inheritDoc
*/
public function onEnd(ReadableSpanInterface $span): void
{
if (null === $this->exporter) {
if ($this->closed) {
return;
}

if (!$this->running) {
if (!$span->getContext()->isSampled()) {
return;
}

if ($span->getContext()->isSampled() && !$this->queueReachedLimit()) {
$this->queue[] = $span->toSpanData();
// Queue is full: return without recording the span.
if ($this->queueSize === $this->maxQueueSize) {
return;
}

if ($this->bufferReachedExportLimit() || $this->enoughTimeHasPassed()) {
$this->forceFlush();
$this->queueSize++;
$this->batch[] = $span->toSpanData();
// Only set when currently null, i.e. for the first span after enqueueBatch() reset it.
$this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos;

if (count($this->batch) === $this->maxExportBatchSize) {
$this->enqueueBatch();
}
if ($this->autoFlush) {
// Called without a flush method: no flush request is enqueued, flush() only runs its export loop.
$this->flush();
}
}

/**
 * Returns false if the processor is already closed; otherwise delegates to flush() with this
 * method's name (__FUNCTION__), which flush() later invokes on the exporter.
 *
 * NOTE(review): this body interleaves both sides of the diff; the block that calls
 * export()/await() directly and resets the stopwatch looks like the removed implementation — confirm.
 *
 * @inheritDoc
 */
public function forceFlush(?CancellationInterface $cancellation = null): bool
{
if (!$this->running || $this->exporter === null) {
return true;
if ($this->closed) {
return false;
}

$this->exporter->export($this->queue)->await();
$this->queue = [];
$this->stopwatch->reset();
$this->exporter->forceFlush();

return true;
return $this->flush(__FUNCTION__, $cancellation);
}

/**
 * Returns false if the processor is already closed; otherwise marks it closed and delegates to
 * flush() with this method's name (__FUNCTION__), which flush() later invokes on the exporter.
 *
 * NOTE(review): this body interleaves both sides of the diff; the lines that test and clear
 * $this->running and call $this->exporter->shutdown() directly look like the removed
 * implementation — confirm.
 *
 * @inheritDoc
 */
public function shutdown(?CancellationInterface $cancellation = null): bool
{
if (!$this->running) {
return true;
if ($this->closed) {
return false;
}

if (null !== $this->exporter && $this->forceFlush()) {
$this->exporter->shutdown();
}
$this->running = false;
// Set before flushing so that onEnd() ignores spans that end from now on.
$this->closed = true;

return true;
return $this->flush(__FUNCTION__, $cancellation);
}

protected function bufferReachedExportLimit(): bool
/**
 * Exports pending batches and runs queued exporter flush requests, with at most one export
 * loop active at a time.
 *
 * When $flushMethod is given, a request is enqueued and tagged with the id that $batchId will
 * have once every batch pending right now (queued batches plus the non-empty current batch)
 * has been exported. If the loop is already running, the call returns false straight away and
 * leaves the request to the running loop.
 *
 * @param string|null $flushMethod name of the exporter method to invoke (callers pass __FUNCTION__),
 *                                 or null to only export batches
 * @param CancellationInterface|null $cancellation forwarded unchanged to the exporter method
 *
 * @return bool false if another flush() run is in progress; otherwise the result of the last
 *              exporter call whose request was enqueued while no run was in progress, or true
 *              if there was none
 *
 * @throws Throwable the exception from such an exporter call, rethrown after the loop ended
 */
private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool
{
// NOTE(review): the next line looks like residue of the removed bufferReachedExportLimit() body
// in this diff rendering — confirm and drop it when extracting the new file.
return count($this->queue) >= $this->maxExportBatchSize;
if ($flushMethod !== null) {
// (int) (bool) $this->batch adds 1 when the current batch holds at least one span.
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
// Fourth element: true only if this call will drive the loop itself (nothing running yet).
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]);
}

if ($this->running) {
return false;
}

$success = true;
$exception = null;
$this->running = true;

try {
for (;;) {
// Run every flush request whose required batches have all been exported.
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
[, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue();

try {
$result = $this->exporter->$flushMethod($cancellation);
if ($propagateResult) {
$success = $result;
}
} catch (Throwable $e) {
if ($propagateResult) {
// Kept for the rethrow after the loop; skip the log call below.
$exception = $e;

continue;
}
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
}
}

if (!$this->shouldFlush()) {
break;
}

// Nothing queued yet: promote the partially filled batch so it can be exported.
if ($this->queue->isEmpty()) {
$this->enqueueBatch();
}
$batchSize = count($this->queue->bottom());
$this->batchId++;

try {
$this->exporter->export($this->queue->dequeue())->await();
} catch (Throwable $e) {
self::logError('Unhandled export error', ['exception' => $e]);
} finally {
// The batch's spans no longer count towards $queueSize, whether or not the export failed.
$this->queueSize -= $batchSize;
}
}
} finally {
$this->running = false;
}

if ($exception !== null) {
throw $exception;
}

return $success;
}

protected function queueReachedLimit(): bool
/**
 * Tells the flush() loop whether another batch has to be exported.
 *
 * True when a flush request is still pending, or — with auto-flush enabled — when a complete
 * batch is queued or the clock has passed the scheduled run time of the current batch.
 */
private function shouldFlush(): bool
{
// NOTE(review): the next line looks like residue of the removed queueReachedLimit() body
// in this diff rendering — confirm and drop it when extracting the new file.
return count($this->queue) >= $this->maxQueueSize;
return !$this->flush->isEmpty()
|| $this->autoFlush && !$this->queue->isEmpty()
|| $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
}

protected function enoughTimeHasPassed(): bool
/**
 * Moves the current batch into the batch queue, starts a new empty batch and clears the
 * scheduled run time. Callers must ensure the current batch is not empty (asserted).
 */
private function enqueueBatch(): void
{
// NOTE(review): the next line looks like residue of the removed enoughTimeHasPassed() body
// in this diff rendering — confirm and drop it when extracting the new file.
return TimeUtil::millisToNanos((int) $this->scheduledDelayMillis) < $this->stopwatch->getLastElapsedTime();
assert($this->batch !== []);

$this->queue->enqueue($this->batch);
$this->batch = [];
// onEnd() sets a new run time when the next span starts a fresh batch.
$this->nextScheduledRun = null;
}
}
Loading