Skip to content

Commit

Permalink
Use RetryingExporter in HTTP log exporter
Browse files Browse the repository at this point in the history
This unifies the implementation of the OTLP exporters and the HTTP log
exporter.

Next step is to consolidate the remaining HTTP exporters.

Fixes open-telemetry#4043.
  • Loading branch information
LarsMichelsen committed Sep 7, 2024
1 parent 3e3a700 commit 0cc4f5b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
from io import BytesIO
from os import environ
from typing import Dict, Optional, Sequence
from time import sleep

import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
RetryingExporter,
)
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.sdk.environment_variables import (
Expand Down Expand Up @@ -124,6 +124,7 @@ def __init__(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False
self._exporter = RetryingExporter(self._export, LogExportResult)

def _export(self, serialized_data: bytes):
data = serialized_data
Expand All @@ -135,14 +136,31 @@ def _export(self, serialized_data: bytes):
elif self._compression == Compression.Deflate:
data = zlib.compress(serialized_data)

return self._session.post(
resp = self._session.post(
url=self._endpoint,
data=data,
verify=self._certificate_file,
timeout=self._timeout,
cert=self._client_cert,
)

if resp.ok:
return LogExportResult.SUCCESS

if self._retryable(resp):
_logger.warning(
"Transient error %s encountered while exporting logs batch.",
resp.reason,
)
raise RetryableExportError(None)

_logger.error(
"Failed to export logs batch code: %s, reason: %s",
resp.status_code,
resp.text,
)
return LogExportResult.FAILURE

@staticmethod
def _retryable(resp: requests.Response) -> bool:
if resp.status_code == 408:
Expand All @@ -159,34 +177,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
return LogExportResult.FAILURE

serialized_data = encode_logs(batch).SerializeToString()

for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE

resp = self._export(serialized_data)
# pylint: disable=no-else-return
if resp.ok:
return LogExportResult.SUCCESS
elif self._retryable(resp):
_logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
resp.reason,
delay,
)
sleep(delay)
continue
else:
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
resp.status_code,
resp.text,
)
return LogExportResult.FAILURE
return LogExportResult.FAILURE
return self._exporter.export_with_retry(serialized_data)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def test_exported_log_without_span_id(self):
self.fail("No log records found")

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
@patch("opentelemetry.exporter.otlp.proto.common.exporter.sleep")
def test_exponential_backoff(self, mock_sleep):
# return a retryable error
responses.add(
Expand Down Expand Up @@ -358,12 +358,14 @@ def _get_sdk_log_data() -> List[LogData]:

return [log1, log2, log3, log4]

@patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True))
def test_2xx_status_code(self, mock_otlp_metric_exporter):
def test_2xx_status_code(self):
"""
Test that any HTTP 2XX code returns a successful result
"""

self.assertEqual(
OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS
OTLPLogExporter(
session=Mock(**{"post.return_value": Mock(ok=True)})
).export(MagicMock()),
LogExportResult.SUCCESS,
)

0 comments on commit 0cc4f5b

Please sign in to comment.