Skip to content

Commit

Permalink
feat: Add LangfuseConnector secure key management and serialization (#…
Browse files Browse the repository at this point in the history
…1287)

* LangfuseConnector: add secret_key and public_key init params

* Update tests

* Linting

* Add serde test

* Lint

* PR feedback

* PR feedback
  • Loading branch information
vblagoje authored Jan 17, 2025
1 parent c33e81c commit 7e598d8
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Dict, Optional

from haystack import component, logging, tracing
from haystack import component, default_from_dict, default_to_dict, logging, tracing
from haystack.utils import Secret, deserialize_secrets_inplace

from haystack_integrations.tracing.langfuse import LangfuseTracer
from langfuse import Langfuse
Expand Down Expand Up @@ -94,7 +95,13 @@ async def shutdown_event():
"""

def __init__(self, name: str, public: bool = False):
def __init__(
self,
name: str,
public: bool = False,
public_key: Optional[Secret] = Secret.from_env_var("LANGFUSE_PUBLIC_KEY"), # noqa: B008
secret_key: Optional[Secret] = Secret.from_env_var("LANGFUSE_SECRET_KEY"), # noqa: B008
):
"""
Initialize the LangfuseConnector component.
Expand All @@ -103,9 +110,21 @@ def __init__(self, name: str, public: bool = False):
:param public: Whether the tracing data should be public or private. If set to `True`, the tracing data will be
publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private and
only accessible to the Langfuse account owner. The default is `False`.
:param public_key: The Langfuse public key. Defaults to reading from LANGFUSE_PUBLIC_KEY environment variable.
:param secret_key: The Langfuse secret key. Defaults to reading from LANGFUSE_SECRET_KEY environment variable.
"""
self.name = name
self.tracer = LangfuseTracer(tracer=Langfuse(), name=name, public=public)
self.public = public
self.secret_key = secret_key
self.public_key = public_key
self.tracer = LangfuseTracer(
tracer=Langfuse(
secret_key=secret_key.resolve_value() if secret_key else None,
public_key=public_key.resolve_value() if public_key else None,
),
name=name,
public=public,
)
tracing.enable_tracing(self.tracer)

@component.output_types(name=str, trace_url=str)
Expand All @@ -126,3 +145,28 @@ def run(self, invocation_context: Optional[Dict[str, Any]] = None):
invocation_context=invocation_context,
)
return {"name": self.name, "trace_url": self.tracer.get_trace_url()}

def to_dict(self) -> Dict[str, Any]:
"""
Serialize this component to a dictionary.
:returns: The serialized component as a dictionary.
"""
return default_to_dict(
self,
name=self.name,
public=self.public,
secret_key=self.secret_key.to_dict() if self.secret_key else None,
public_key=self.public_key.to_dict() if self.public_key else None,
)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "LangfuseConnector":
"""
Deserialize this component from a dictionary.
:param data: The dictionary representation of this component.
:returns: The deserialized component instance.
"""
deserialize_secrets_inplace(data["init_parameters"], keys=["secret_key", "public_key"])
return default_from_dict(cls, data)
84 changes: 76 additions & 8 deletions integrations/langfuse/tests/test_tracing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import random
import time
from urllib.parse import urlparse

Expand All @@ -9,6 +8,7 @@
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret
from requests.auth import HTTPBasicAuth

from haystack_integrations.components.connectors.langfuse import LangfuseConnector
Expand All @@ -19,6 +19,36 @@
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"


@pytest.fixture
def pipeline_with_env_vars(llm_class, expected_trace):
"""Pipeline factory using environment variables for Langfuse authentication"""
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector(name=f"Chat example - {expected_trace}", public=True))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", llm_class())
pipe.connect("prompt_builder.prompt", "llm.messages")
return pipe


@pytest.fixture
def pipeline_with_secrets(llm_class, expected_trace):
"""Pipeline factory using Secret objects for Langfuse authentication"""
pipe = Pipeline()
pipe.add_component(
"tracer",
LangfuseConnector(
name=f"Chat example - {expected_trace}",
public=True,
secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
),
)
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", llm_class())
pipe.connect("prompt_builder.prompt", "llm.messages")
return pipe


@pytest.mark.integration
@pytest.mark.parametrize(
"llm_class, env_var, expected_trace",
Expand All @@ -28,16 +58,12 @@
(CohereChatGenerator, "COHERE_API_KEY", "Cohere"),
],
)
def test_tracing_integration(llm_class, env_var, expected_trace):
@pytest.mark.parametrize("pipeline_fixture", ["pipeline_with_env_vars", "pipeline_with_secrets"])
def test_tracing_integration(llm_class, env_var, expected_trace, pipeline_fixture, request):
if not all([os.environ.get("LANGFUSE_SECRET_KEY"), os.environ.get("LANGFUSE_PUBLIC_KEY"), os.environ.get(env_var)]):
pytest.skip(f"Missing required environment variables: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or {env_var}")

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector(name=f"Chat example - {expected_trace}", public=True))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", llm_class())
pipe.connect("prompt_builder.prompt", "llm.messages")

pipe = request.getfixturevalue(pipeline_fixture)
messages = [
ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
ChatMessage.from_user("Tell me about {{location}}"),
Expand Down Expand Up @@ -77,3 +103,45 @@ def test_tracing_integration(llm_class, env_var, expected_trace):
# check if the trace contains the expected user_id
assert "user_42" in str(res.content)
break


def test_pipeline_serialization(monkeypatch):
"""Test that a pipeline with secrets can be properly serialized and deserialized"""

# Set test env vars
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "secret")
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "public")
monkeypatch.setenv("OPENAI_API_KEY", "openai_api_key")

# Create pipeline with OpenAI LLM
pipe = Pipeline()
pipe.add_component(
"tracer",
LangfuseConnector(
name="Chat example - OpenAI",
public=True,
secret_key=Secret.from_env_var("LANGFUSE_SECRET_KEY"),
public_key=Secret.from_env_var("LANGFUSE_PUBLIC_KEY"),
),
)
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator())
pipe.connect("prompt_builder.prompt", "llm.messages")

# Serialize
serialized = pipe.to_dict()

# Check serialized secrets
tracer_params = serialized["components"]["tracer"]["init_parameters"]
assert isinstance(tracer_params["secret_key"], dict)
assert tracer_params["secret_key"]["type"] == "env_var"
assert tracer_params["secret_key"]["env_vars"] == ["LANGFUSE_SECRET_KEY"]
assert isinstance(tracer_params["public_key"], dict)
assert tracer_params["public_key"]["type"] == "env_var"
assert tracer_params["public_key"]["env_vars"] == ["LANGFUSE_PUBLIC_KEY"]

# Deserialize
new_pipe = Pipeline.from_dict(serialized)

# Verify pipeline is the same
assert new_pipe == pipe

0 comments on commit 7e598d8

Please sign in to comment.