Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add custom Langfuse span handling support #1313

Merged
merged 10 commits into from
Jan 28, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import httpx
from haystack import component, default_from_dict, default_to_dict, logging, tracing
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.utils.base_serialization import deserialize_class_instance, serialize_class_instance

from haystack_integrations.tracing.langfuse import LangfuseTracer
from haystack_integrations.tracing.langfuse import LangfuseTracer, SpanHandler
from langfuse import Langfuse

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -94,6 +95,24 @@ async def shutdown_event():
print(response["tracer"]["trace_url"])
```

For advanced use cases, you can also customize how spans are created and processed by
providing a custom SpanHandler. This allows you to add custom metrics, set warning levels,
or attach additional metadata to your Langfuse traces:

```python
from haystack_integrations.tracing.langfuse import DefaultSpanHandler, LangfuseSpan
from typing import Optional

class CustomSpanHandler(DefaultSpanHandler):

def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
# Custom span handling logic, customize Langfuse spans however it fits you
# see DefaultSpanHandler for how we create and process spans by default
pass

connector = LangfuseConnector(span_handler=CustomSpanHandler())
```

"""

def __init__(
Expand All @@ -103,6 +122,7 @@ def __init__(
public_key: Optional[Secret] = Secret.from_env_var("LANGFUSE_PUBLIC_KEY"), # noqa: B008
secret_key: Optional[Secret] = Secret.from_env_var("LANGFUSE_SECRET_KEY"), # noqa: B008
httpx_client: Optional[httpx.Client] = None,
span_handler: Optional[SpanHandler] = None,
):
"""
Initialize the LangfuseConnector component.
Expand All @@ -117,11 +137,16 @@ def __init__(
:param httpx_client: Optional custom httpx.Client instance to use for Langfuse API calls. Note that when
deserializing a pipeline from YAML, any custom client is discarded and Langfuse will create its own default
client, since HTTPX clients cannot be serialized.
:param span_handler: Optional custom handler for processing spans. If None, uses DefaultSpanHandler.
The span handler controls how spans are created and processed, allowing customization of span types
based on component types and additional processing after spans are yielded. See SpanHandler class for
details on implementing custom handlers.
"""
self.name = name
self.public = public
self.secret_key = secret_key
self.public_key = public_key
self.span_handler = span_handler
self.tracer = LangfuseTracer(
tracer=Langfuse(
secret_key=secret_key.resolve_value() if secret_key else None,
Expand All @@ -130,6 +155,7 @@ def __init__(
),
name=name,
public=public,
span_handler=span_handler,
)
tracing.enable_tracing(self.tracer)

Expand Down Expand Up @@ -158,13 +184,15 @@ def to_dict(self) -> Dict[str, Any]:

:returns: The serialized component as a dictionary.
"""
span_handler = serialize_class_instance(self.span_handler) if self.span_handler else None
return default_to_dict(
self,
name=self.name,
public=self.public,
secret_key=self.secret_key.to_dict() if self.secret_key else None,
public_key=self.public_key.to_dict() if self.public_key else None,
# Note: httpx_client is not serialized as it's not serializable
span_handler=span_handler,
)

@classmethod
Expand All @@ -175,5 +203,9 @@ def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector":
:param data: The dictionary representation of this component.
:returns: The deserialized component instance.
"""
deserialize_secrets_inplace(data["init_parameters"], keys=["secret_key", "public_key"])
init_params = data["init_parameters"]
deserialize_secrets_inplace(init_params, keys=["secret_key", "public_key"])
init_params["span_handler"] = (
deserialize_class_instance(init_params["span_handler"]) if init_params["span_handler"] else None
)
return default_from_dict(cls, data)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
from .tracer import LangfuseTracer
from .tracer import DefaultSpanHandler, LangfuseSpan, LangfuseTracer, SpanContext, SpanHandler

__all__ = ["LangfuseTracer"]
__all__ = ["DefaultSpanHandler", "LangfuseSpan", "LangfuseTracer", "SpanContext", "SpanHandler"]
Loading
Loading