Revived the OpenAI / Pinecone embedding automation plugin module.
	deleted:    Auto-Embedder/PinEbed.py
	renamed:    Auto-Embedder/.env.template -> OP-Stack-Automation/.env.template
	new file:   OP-Stack-Automation/README.md
	new file:   OP-Stack-Automation/pinembed.py
Daethyra committed Sep 18, 2023
1 parent b48a246 commit cc3d32e
Showing 4 changed files with 206 additions and 60 deletions.
60 changes: 0 additions & 60 deletions Auto-Embedder/PinEbed.py

This file was deleted.

File renamed without changes.
62 changes: 62 additions & 0 deletions OP-Stack-Automation/README.md
@@ -0,0 +1,62 @@
# Automate the OP stack with `pinembed`

##### *A plugin for automating the retrieval of text embeddings from OpenAI and storing them in Pinecone.*

## Overview

This document outlines the recent updates made to a Python module designed for automating the retrieval of text embeddings from OpenAI and storing them in Pinecone. If you're new to this, think of text embeddings as numerical representations of textual data, and Pinecone as a storage service for these embeddings.

The key enhancements are rate limiting (mechanisms that control how quickly API requests are sent) and a more modular code organization. Together these changes make the module robust against API limitations and approachable for developers of all levels.

## Table of Contents

- [Automate the OP stack with `pinembed`](#automate-the-op-stack-with-pinembed)
- [A plugin for automating the retrieval of text embeddings from OpenAI and storing them in Pinecone.](#a-plugin-for-automating-the-retrieval-of-text-embeddings-from-openai-and-storing-them-in-pinecone)
- [Overview](#overview)
- [Table of Contents](#table-of-contents)
- [What Changed?](#what-changed)
- [Intended Usage and Capabilities](#intended-usage-and-capabilities)
- [Who Should Use This?](#who-should-use-this)
- [What Can It Do?](#what-can-it-do)
- [Introduction to Rate Limiting](#introduction-to-rate-limiting)
- [What is Rate Limiting?](#what-is-rate-limiting)
- [How is it Implemented?](#how-is-it-implemented)
- [Modular Code Organization and Configuration](#modular-code-organization-and-configuration)
- [What is Modular Code?](#what-is-modular-code)
- [Glossary](#glossary)

### What Changed?

The original implementation had a single `PineconeHandler` class that handled both OpenAI and Pinecone functionality. The updated version introduces a separate `OpenAIHandler` class, isolating the OpenAI-specific logic. In addition, environment variables are now managed more cleanly through `dotenv`.
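
For reference, here is a minimal `.env` sketch using the variable names that `pinembed.py` reads (the renamed `.env.template` in this commit is the canonical reference; all values below are placeholders):

```
OPENAI_API_KEY=your-openai-key
PINECONE_API_KEY=your-pinecone-key
PINECONE_ENVIRONMENT=us-central1-gcp
PINEDEX=your-index-name
MODEL=text-embedding-ada-002
```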

### Intended Usage and Capabilities

#### Who Should Use This?

This module is designed to be user-friendly and robust, suitable for both beginners and experienced developers.

#### What Can It Do?

It can handle high volumes of text data, transform each entry into an embedding via the OpenAI API, and store the result in Pinecone, all without exceeding either service's API limits. It employs `asyncio` for efficient asynchronous operation.
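
As a sketch of intended usage (assuming `pinembed.py` is importable and your `.env` is configured), the pipeline can be driven like this:

```python
import asyncio
from pinembed import EmbeddingManager

async def main() -> None:
    manager = EmbeddingManager()  # reads API keys and settings from .env
    docs = [{"id": "doc-1", "text": "Pinecone stores embedding vectors."}]
    # Each entry is embedded via OpenAI, then upserted into Pinecone.
    await asyncio.gather(*(manager.process_data(doc) for doc in docs))

asyncio.run(main())
```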

### Introduction to Rate Limiting

#### What is Rate Limiting?

Rate limiting is the practice of controlling the number of requests sent to an API within a given time frame. This is important to ensure that we don't overwhelm the API service.

#### How is it Implemented?

The module now incorporates a `RateLimiter` class, applied as a decorator to the API-calling coroutines. It paces requests so the module stays within the API limits of both OpenAI (3,500 requests per minute) and Pinecone (100 vectors per upsert request).
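
For illustration, this is how the decorator is applied, mirroring the pattern in `pinembed.py` (the wrapped coroutine here is a stand-in for any function that issues an API request):

```python
from pinembed import RateLimiter

# At most 3,500 calls within any rolling 60-second window (OpenAI's limit).
openai_limiter = RateLimiter(max_calls=3500, period=60)

@openai_limiter
async def call_api(text: str):
    ...  # any coroutine that calls an external API
```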

### Modular Code Organization and Configuration

#### What is Modular Code?

Modular code means that the code is organized into separate sections or 'modules,' each handling a specific functionality. This makes the code easier to understand, test, and maintain.
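
In practice this means each concern can be used on its own; a short sketch assuming the classes defined in `pinembed.py`:

```python
from pinembed import OpenAIHandler, PineconeHandler

openai_handler = OpenAIHandler()      # embedding generation only
pinecone_handler = PineconeHandler()  # vector storage only
```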

### Glossary

- **API (Application Programming Interface)**: A set of rules that allows different software entities to communicate with each other.
- **Embedding**: A set of numerical values that represent the features of textual data.
- **Upsert**: A database operation that inserts rows into a database table if they do not already exist, or updates them if they do.
144 changes: 144 additions & 0 deletions OP-Stack-Automation/pinembed.py
@@ -0,0 +1,144 @@
# Importing required libraries
import asyncio
import functools
import logging
import os
from asyncio import gather, run
from datetime import datetime, timedelta
from typing import Any, Dict, Tuple

import openai
import pinecone
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configure rate limiting functionality
class RateLimiter:
    """Decorator that throttles an async function to `max_calls` per rolling `period` seconds."""

    def __init__(self, max_calls: int, period: int):
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    def __call__(self, func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_time = datetime.now()
            # Drop timestamps that have aged out of the rolling window.
            self.calls = [call for call in self.calls if current_time - call < timedelta(seconds=self.period)]

            if len(self.calls) < self.max_calls:
                self.calls.append(current_time)
                return await func(*args, **kwargs)
            else:
                # Window is full: sleep until the oldest call expires, then proceed.
                sleep_time = (self.calls[0] + timedelta(seconds=self.period)) - current_time
                await asyncio.sleep(sleep_time.total_seconds())
                self.calls.pop(0)
                self.calls.append(datetime.now())
                return await func(*args, **kwargs)

        return wrapper

# OpenAI Rate Limiter: 3500 RPM
openai_limiter = RateLimiter(max_calls=3500, period=60)

# Pinecone Rate Limiter: 100 vectors per request (Assuming 1 request takes 1 second)
pinecone_limiter = RateLimiter(max_calls=100, period=1)


class OpenAIHandler:
    """Handles text embedding generation using OpenAI's API."""

    def __init__(self) -> None:
        """Initialize OpenAI API using environment variables."""
        load_dotenv()
        self.api_key = os.getenv('OPENAI_API_KEY')
        self.model_engine = os.getenv('MODEL', 'text-embedding-ada-002')
        openai.api_key = self.api_key
        logger.info(f"OpenAI API initialized with model {self.model_engine}.")

    # Applying to OpenAI API calls
    @openai_limiter
    async def generate_embedding(self, text: str) -> Tuple[str, Any]:
        """Generate text embedding.

        Parameters:
            text (str): The text to generate the embedding for.

        Returns:
            Tuple[str, Any]: The text and its corresponding embedding.
        """
        try:
            response = openai.Embedding.create(
                model=self.model_engine,
                input=[text]
            )
            if 'data' in response:
                return text, response['data'][0]['embedding']
            else:
                logger.error(f"Unexpected response format: {response}")
                return text, None
        except Exception as e:
            logger.error(f"Error generating embedding for text: {e}")
            return text, None


class PineconeHandler:
    """Handles embedding storage in Pinecone."""

    def __init__(self) -> None:
        """Initialize Pinecone using environment variables."""
        load_dotenv()
        self.api_key = os.getenv('PINECONE_API_KEY')
        self.environment = os.getenv('PINECONE_ENVIRONMENT', 'us-central1-gcp')
        self.index_name = os.getenv('PINEDEX', 'default_index_name')
        # Pass the environment along with the key; pinecone.init requires both.
        pinecone.init(api_key=self.api_key, environment=self.environment)
        self.index = pinecone.Index(index_name=self.index_name)
        logger.info(f"Pinecone initialized with index {self.index_name}.")

    # Applying to Pinecone upserts
    @pinecone_limiter
    async def store_embedding(self, data_id: str, embedding: Any, text: str) -> None:
        """Store the embedding vector in Pinecone.

        Parameters:
            data_id (str): The data ID for the embedding.
            embedding (Any): The embedding vector.
            text (str): The original text.
        """
        try:
            if embedding is not None:
                self.index.upsert(vectors=[(data_id, embedding, {'text': text})])
                logger.info(f"Embedding for data ID {data_id} stored in Pinecone.")
            else:
                logger.warning(f"Null embedding for data ID {data_id}. Skipping storage.")
        except Exception as e:
            logger.error(f"Error storing embedding for data ID {data_id}: {e}")


class EmbeddingManager:
    """Manages the process of generating and storing text embeddings."""

    def __init__(self) -> None:
        """Initialize OpenAI and Pinecone handlers."""
        self.openai_handler = OpenAIHandler()
        self.pinecone_handler = PineconeHandler()

    async def process_data(self, data: Dict[str, Any]) -> None:
        """Process a data entry to generate embeddings and store in Pinecone.

        Parameters:
            data (Dict[str, Any]): The data in JSON format.
        """
        text = data['text']
        data_id = data['id']
        text, embedding = await self.openai_handler.generate_embedding(text=text)
        await self.pinecone_handler.store_embedding(data_id=data_id, embedding=embedding, text=text)


async def main() -> None:
    """Embed the sample data concurrently and store the results."""
    # Initialize the EmbeddingManager
    embedding_manager = EmbeddingManager()

    # Sample data | Replace with your own test data
    sample_data = [{'id': '1', 'text': 'Hello world'}, {'id': '2', 'text': 'How are you?'}]

    # Process the sample data; gather must run inside an event loop,
    # so it is awaited here rather than called at module level.
    await gather(*(embedding_manager.process_data(data) for data in sample_data))


if __name__ == "__main__":
    run(main())
