Added Workers system for standalone tasks
### Added
- New Workers system for task-specific data extraction
  - Base Worker class in core module
  - PricingResearchWorker implementation
  - Plugin integration (Serper, Jina, LiteLLM)
- Automated pricing data extraction capabilities
  - Plan detection
  - Feature extraction
  - Price analysis
  - Subscriber limit detection

### Changed
- Enhanced LiteLLM integration for structured data
- Improved content extraction accuracy
- Standardized worker output format

### Documentation
- Added workers.md documentation
- Updated plugin integration guides
- Added pricing research examples

This commit introduces a new Workers system that simplifies complex data extraction tasks by combining multiple plugins into focused, single-purpose executors. The initial implementation includes a PricingResearchWorker that can extract structured pricing data from any SaaS website.
tomaslau committed Nov 10, 2024
1 parent 6c4672e commit 2d52ccc
Showing 7 changed files with 412 additions and 0 deletions.
26 changes: 26 additions & 0 deletions README.md
@@ -221,3 +221,29 @@ If you encounter issues:
## Credits

Standing on the shoulders of open-source giants, built with ☕️ and dedication by a marketer who codes.

## Workers

Workers are standalone task executors that combine multiple plugins for specific data extraction needs. Perfect for automated research and monitoring tasks.

### Available Workers

- **PricingResearchWorker**: Extracts structured pricing data from any SaaS website
```python
import asyncio
import json

from pynions.workers import PricingResearchWorker

async def analyze_pricing():
    worker = PricingResearchWorker()
    result = await worker.execute({"domain": "example.com"})
    print(json.dumps(result, indent=2))

asyncio.run(analyze_pricing())
```

### Features

- 🎯 Task-specific implementations
- 🔄 Automated data extraction
- 📊 Structured output
- 🛠 Plugin integration
- ⚡ Efficient processing

See [Workers Documentation](docs/workers.md) for more details.
26 changes: 26 additions & 0 deletions docs/changelog.md
@@ -6,6 +6,32 @@ summary: "Updates, bug fixes and improvements."
kind: "detailed"
---

## v0.2.22 - Nov 10, 2024

### Added

- New Workers system for standalone data extraction tasks
  - Added base Worker class in core module
  - Added PricingResearchWorker for automated pricing analysis
  - Integrated with existing plugins (Serper, Jina, LiteLLM)
- Automated pricing data extraction capabilities
  - Accurate plan detection
  - Feature extraction
  - Price point analysis
  - Subscriber limit detection

### Changed

- Enhanced LiteLLM integration for structured data extraction
- Improved content extraction accuracy in Jina plugin
- Standardized worker output format

### Documentation

- Added workers documentation and examples
- Updated plugin integration guides
- Added pricing research examples

## v0.2.21 - Nov 10, 2024

### Changed
103 changes: 103 additions & 0 deletions docs/workers.md
@@ -0,0 +1,103 @@
---
title: "Workers"
publishedAt: "2024-11-10"
updatedAt: "2024-11-10"
summary: "Standalone task executors that combine multiple plugins for specific data extraction needs."
kind: "detailed"
---

## Overview
Workers are specialized task executors that combine multiple plugins to perform specific data extraction and analysis tasks. Unlike workflows that chain multiple steps together, workers are focused on single, well-defined tasks that require coordination between multiple plugins.

## Features
- 🎯 Task-specific implementations
- 🔄 Automated data extraction
- 📊 Structured output
- 🛠 Plugin integration
- ⚡ Efficient processing

## Available Workers

### PricingResearchWorker
Extracts structured pricing data from any SaaS website by combining:
1. **Serper Web Search**: Finds pricing pages
2. **Jina AI Reader**: Extracts clean content
3. **LiteLLM**: Analyzes and structures pricing data

#### Usage

```python
import asyncio
import json

from pynions.workers import PricingResearchWorker

async def analyze_pricing():
    worker = PricingResearchWorker()
    result = await worker.execute({"domain": "example.com"})
    print(json.dumps(result, indent=2))

asyncio.run(analyze_pricing())
```


#### Output Structure

```json
{
  "domain": "example.com",
  "source": "https://example.com/pricing",
  "pricing": {
    "plans": ["plan names"],
    "pricing": {
      "plan_name": {
        "monthly_price": 0.0,
        "annual_price": 0.0,
        "features": ["feature list"],
        "limits": {"limit_type": "limit value"}
      }
    },
    "currency": "USD"
  }
}
```
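A short sketch of walking this structure once a result is returned. Field names follow the documented shape above; the concrete plan names and prices are hypothetical placeholder values, not real output:

```python
import json

# Hypothetical result in the documented shape (values are illustrative)
result = {
    "domain": "example.com",
    "source": "https://example.com/pricing",
    "pricing": {
        "plans": ["Starter", "Pro"],
        "pricing": {
            "Starter": {
                "monthly_price": 19.0,
                "annual_price": 190.0,
                "features": ["1 project"],
                "limits": {"subscribers": "1,000"},
            },
            "Pro": {
                "monthly_price": 49.0,
                "annual_price": 490.0,
                "features": ["Unlimited projects"],
                "limits": {"subscribers": "10,000"},
            },
        },
        "currency": "USD",
    },
}

# Plan names in "plans" index into the nested "pricing" map
for plan in result["pricing"]["plans"]:
    details = result["pricing"]["pricing"].get(plan, {})
    print(f"{plan}: {details.get('monthly_price')} {result['pricing']['currency']}/mo")
```

Note the nesting: the outer `pricing` key holds the extracted structure, and its inner `pricing` map is keyed by plan name.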


## Creating Custom Workers

1. Inherit from the base `Worker` class:

```python
from typing import Any, Dict

from pynions.core import Worker


class CustomWorker(Worker):
    def __init__(self):
        super().__init__()
        # Initialize required plugins (Plugin1/Plugin2 are placeholders)
        self.plugin1 = Plugin1()
        self.plugin2 = Plugin2()

    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implement your worker logic
        pass
```


## Best Practices

1. **Plugin Integration**
- Initialize plugins in constructor
- Handle plugin errors gracefully
- Validate plugin responses

2. **Data Processing**
- Use structured input/output
- Validate extracted data
- Clean and normalize output

3. **Error Handling**
- Handle network timeouts
- Validate input parameters
- Provide meaningful error messages

4. **Performance**
- Minimize API calls
- Process only required data
- Use efficient data structures
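The practices above can be sketched together in one minimal worker. This is a self-contained illustration, not pynions code: `DomainWorker` and its `_fetch` helper are hypothetical stand-ins for a real plugin-backed worker, and the 10-second timeout is an arbitrary example value:

```python
import asyncio
from typing import Any, Dict


class DomainWorker:
    """Illustrative worker showing input validation, timeouts, and output cleanup."""

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        # Validate input parameters before doing any network work
        return isinstance(input_data.get("domain"), str) and "." in input_data["domain"]

    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        if not self.validate_input(input_data):
            # Meaningful error message instead of failing deep inside a plugin call
            raise ValueError(f"Expected a 'domain' string, got: {input_data!r}")
        try:
            # Guard slow plugin calls against network timeouts
            raw = await asyncio.wait_for(self._fetch(input_data["domain"]), timeout=10)
        except asyncio.TimeoutError:
            return {"domain": input_data["domain"], "error": "timeout"}
        # Clean and normalize the output before returning it
        return {"domain": input_data["domain"], "title": raw.strip().title()}

    async def _fetch(self, domain: str) -> str:
        # Placeholder for a real plugin call (e.g. content extraction)
        await asyncio.sleep(0)
        return "  example pricing page  "


result = asyncio.run(DomainWorker().execute({"domain": "example.com"}))
print(result)
```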

## Common Issues
- API rate limits
- Content extraction failures
- Data validation errors
- Network timeouts
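For API rate limits and other transient failures, a retry with exponential backoff usually resolves the issue. A hedged sketch: `RateLimitError`, the attempt count, and the delay values are illustrative, not part of pynions or any plugin API:

```python
import asyncio
import random
from typing import Any, Awaitable, Callable


class RateLimitError(Exception):
    """Illustrative error type for a 429-style response."""


async def with_retries(
    call: Callable[[], Awaitable[Any]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> Any:
    # Retry rate-limited calls with exponential backoff plus jitter
    for attempt in range(attempts):
        try:
            return await call()
        except RateLimitError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            await asyncio.sleep(base_delay * (2 ** attempt) + random.random() * base_delay)


calls = {"n": 0}

async def flaky_search():
    # Simulated plugin call: fails twice with a rate limit, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return {"organic": [{"link": "https://example.com/pricing"}]}

result = asyncio.run(with_retries(flaky_search))
print(result["organic"][0]["link"])
```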

Need help? Check our [Debugging Guide](debugging.md) for solutions.
2 changes: 2 additions & 0 deletions pynions/core/__init__.py
@@ -4,11 +4,13 @@
from .workflow import Workflow, WorkflowStep
from .config import Config
from .datastore import DataStore
from .worker import Worker

__all__ = [
    "Plugin",
    "Workflow",
    "WorkflowStep",
    "Config",
    "DataStore",
    "Worker",
]
24 changes: 24 additions & 0 deletions pynions/core/worker.py
@@ -0,0 +1,24 @@
from abc import ABC, abstractmethod
from typing import Dict, Any
import logging


class Worker(ABC):
    """Base class for all Pynions workers"""

    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the worker's task"""
        pass

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Validate input data"""
        return True  # Override in subclasses

    def validate_output(self, output: Dict[str, Any]) -> bool:
        """Validate output data"""
        return True  # Override in subclasses
101 changes: 101 additions & 0 deletions pynions/workers/pricing_research_worker.py
@@ -0,0 +1,101 @@
import asyncio
import json
from typing import Dict, Any
from pynions.core import Worker
from pynions.plugins.serper import SerperWebSearch
from pynions.plugins.jina import JinaAIReader
from pynions.plugins.litellm_plugin import LiteLLM


class PricingResearchWorker(Worker):
    """Worker for extracting pricing data from a website"""

    def __init__(self):
        super().__init__()  # sets up self.config and self.logger from the base Worker
        self.serper = SerperWebSearch({"max_results": 1})
        self.jina = JinaAIReader()
        self.llm = LiteLLM(
            {
                "model": "gpt-4o-mini",
                "temperature": 0.1,
                "max_tokens": 1000,
            }
        )

    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract and structure pricing data from a domain"""
        domain = input_data["domain"]
        print(f"\n🔍 Analyzing pricing for {domain}")

        try:
            # Get pricing page URL
            search_result = await self.serper.execute(
                {"query": f"site:{domain} pricing"}
            )
            if not search_result.get("organic"):
                return None

            url = search_result["organic"][0]["link"]
            print(f"📄 Found pricing page: {url}")

            # Extract content
            content = await self.jina.execute({"url": url})
            if not content or not content.get("content"):
                return None

            print(f"✅ Extracted {len(content['content'])} characters")

            # Analyze with LLM - using full content
            response = await self.llm.execute(
                {
                    "messages": [
                        {
                            "role": "system",
                            "content": """You are a precise pricing data extractor. Your task is to extract EXACT pricing information from websites.

Instructions:
1. Only include information that is explicitly stated in the content
2. Use exact prices, features, and limits as shown
3. Do not make assumptions or fill in missing data
4. If a value is not found, exclude it from the output

Output format:
{
    "plans": ["exact plan names found"],
    "pricing": {
        "plan_name": {
            "monthly_price": exact_number_from_content,
            "annual_price": exact_number_from_content,
            "features": ["exact feature text"],
            "limits": {"exact limit name": "exact limit value"}
        }
    },
    "currency": "exact currency code found"
}""",
                        },
                        {
                            "role": "user",
                            "content": f"Extract the pricing structure from this content. Only include explicitly stated information:\n\n{content['content']}",
                        },
                    ]
                }
            )

            # Parse response
            pricing_data = json.loads(response["content"])
            return {"domain": domain, "source": url, "pricing": pricing_data}

        except Exception as e:
            print(f"❌ Error: {str(e)}")
            return None


# Test
if __name__ == "__main__":

    async def test():
        worker = PricingResearchWorker()
        result = await worker.execute({"domain": "rewardful.com"})
        if result:
            print("\nPricing Data:")
            print(json.dumps(result, indent=2))

    asyncio.run(test())