
Commit

Pushin hella P
	new file:   .github/.gitignore
	new file:   HuggingFace/audio_transcription/MicrophoneTranscription/__init__.py
	new file:   HuggingFace/audio_transcription/MicrophoneTranscription/requirements.txt
	renamed:    HuggingFace/audio_transcription/at1.py -> HuggingFace/audio_transcription/MicrophoneTranscription/transcribe_microphone.py
	new file:   HuggingFace/audio_transcription/__pycache__/at1.cpython-38.pyc
	deleted:    HuggingFace/audio_transcription/requirements.txt
	new file:   HuggingFace/audio_transcription/test/__init__.py
	new file:   HuggingFace/audio_transcription/test/test_at1.py
	modified:   LangChain/Retrieval-Agents/qa_local_docs.py
	modified:   LangChain/Retrieval-Agents/stateful_chatbot.py
Daethyra committed Oct 10, 2023
1 parent b334f32 commit f571451
Showing 10 changed files with 235 additions and 69 deletions.
Empty file added .github/.gitignore
Empty file added HuggingFace/audio_transcription/MicrophoneTranscription/__init__.py
4 changes: 4 additions & 0 deletions HuggingFace/audio_transcription/MicrophoneTranscription/requirements.txt
@@ -0,0 +1,4 @@
pyaudio
numpy
torch
transformers
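
The new requirements file would typically be installed from the repository root before running the transcriber; the command below is a usage sketch, not part of the commit:

    pip install -r HuggingFace/audio_transcription/MicrophoneTranscription/requirements.txt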
HuggingFace/audio_transcription/MicrophoneTranscription/transcribe_microphone.py (renamed from HuggingFace/audio_transcription/at1.py)
@@ -8,6 +8,7 @@
import torch
from transformers import pipeline
from collections import deque
import sys

class RealTimeASR:
"""
@@ -20,6 +21,7 @@ def __init__(self):
model="openai/whisper-large-v2",
chunk_length_s=30,
device=self.device,
return_timestamps=True
)
self.transcription_cache = deque(maxlen=100)
self.sliding_window = np.array([])
@@ -33,17 +35,31 @@ def initialize_audio(self):
frames_per_buffer=1024)

def capture_and_transcribe(self):
"""
Continuously captures audio from the microphone, concatenates it to a sliding window, and transcribes the audio
using the ASR pipeline. If the sliding window is longer than 30 seconds, the pipeline is run on the first 30 seconds
of audio and the sliding window is shifted by 5 seconds. If there is a transcription in the cache, it is printed to
stdout.
Returns:
None
"""
while True:
# Capture audio from the microphone
audio_data = np.frombuffer(self.stream.read(1024), dtype=np.int16)

# Concatenate the audio data to the sliding window
self.sliding_window = np.concatenate((self.sliding_window, audio_data))

# If the sliding window is longer than 30 seconds, transcribe the first 30 seconds and shift the sliding window
if len(self.sliding_window) >= 16000 * 30:
transcription = self.asr_pipeline(self.sliding_window[:16000 * 30])
self.transcription_cache.append(transcription["text"])
self.sliding_window = self.sliding_window[16000 * 5:]

# If there is a transcription in the cache, print it to stdout
if len(self.transcription_cache) > 0:
print(self.transcription_cache.pop())
print(self.transcription_cache.pop(), file=sys.stdout, flush=True)

def close_stream(self):
self.stream.stop_stream()
@@ -58,4 +74,4 @@ def close_stream(self):
except KeyboardInterrupt:
print("Stopping transcription.")
finally:
        asr_app.close_stream()
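
For reference, the 30-second window and 5-second hop used by capture_and_transcribe can be sketched in isolation. This is a minimal illustration, not part of the commit: the helper names are ours, and the 16 kHz sample rate is an assumption matching the hard-coded 16000 in the method above.

    import numpy as np
    from typing import Tuple

    SAMPLE_RATE = 16000        # assumed microphone/Whisper sample rate (Hz)
    WINDOW_S, HOP_S = 30, 5    # 30 s window, advanced 5 s at a time

    def window_ready(buffer: np.ndarray) -> bool:
        # True once the buffer holds at least one full 30 s window.
        return len(buffer) >= SAMPLE_RATE * WINDOW_S

    def take_window(buffer: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        # Return the first 30 s of audio plus the buffer shifted by 5 s,
        # so consecutive windows overlap by 25 s.
        window = buffer[: SAMPLE_RATE * WINDOW_S]
        remainder = buffer[SAMPLE_RATE * HOP_S:]
        return window, remainder

    # Example: 35 s of silence yields one full window and a 30 s remainder.
    buf = np.zeros(SAMPLE_RATE * 35, dtype=np.int16)
    if window_ready(buf):
        window, buf = take_window(buf)
        assert len(window) == SAMPLE_RATE * WINDOW_S
        assert len(buf) == SAMPLE_RATE * 30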
Binary file HuggingFace/audio_transcription/__pycache__/at1.cpython-38.pyc not shown.
5 changes: 0 additions & 5 deletions HuggingFace/audio_transcription/requirements.txt

This file was deleted.

Empty file added HuggingFace/audio_transcription/test/__init__.py
96 changes: 96 additions & 0 deletions HuggingFace/audio_transcription/test/test_at1.py
@@ -0,0 +1,96 @@
import unittest
import numpy as np
from io import StringIO
from contextlib import redirect_stdout

from ..MicrophoneTranscription.transcribe_microphone import RealTimeASR

class TestRealTimeASR(unittest.TestCase):
"""
This class contains unit tests for the RealTimeASR class.
"""
def setUp(self):
"""
This method sets up the test environment before each test case is run.
"""
self.asr_app = RealTimeASR()
self.asr_app.initialize_audio()

def test_sliding_window(self):
"""
This method tests that the sliding window is correctly updated with new audio data.
"""
audio_data = np.ones(16000, dtype=np.int16)
self.asr_app.sliding_window = np.array([])
self.asr_app.sliding_window = np.concatenate((self.asr_app.sliding_window, audio_data))
self.assertEqual(len(self.asr_app.sliding_window), 16000)

def test_transcription_cache(self):
"""
This method tests that the transcription cache is correctly updated with new transcriptions.
"""
transcription = {"text": "hello world"}
self.asr_app.transcription_cache.append(transcription["text"])
self.assertEqual(len(self.asr_app.transcription_cache), 1)
self.assertEqual(self.asr_app.transcription_cache[0], "hello world")

def test_capture_and_transcribe(self):
"""
This method tests that the capture_and_transcribe method correctly transcribes audio.
"""
audio_data = np.ones(16000 * 30, dtype=np.int16)
self.asr_app.sliding_window = np.array([])
self.asr_app.sliding_window = np.concatenate((self.asr_app.sliding_window, audio_data))
with redirect_stdout(StringIO()):
self.asr_app.capture_and_transcribe()
self.assertEqual(len(self.asr_app.transcription_cache), 1)
self.assertTrue(isinstance(self.asr_app.transcription_cache[0], str))

def test_close_stream(self):
"""
This method tests that the stream is closed correctly.
"""
self.asr_app.close_stream()
self.assertTrue(self.asr_app.stream.is_stopped())
self.assertTrue(self.asr_app.stream.is_closed())

def test_device(self):
"""
This method tests that the device is correctly set.
"""
self.assertTrue(self.asr_app.device in ["cuda:0", "cpu"])

def test_asr_pipeline(self):
"""
This method tests that the ASR pipeline is correctly set.
"""
        self.assertTrue(callable(self.asr_app.asr_pipeline))

def test_sliding_window_shift(self):
"""
This method tests that the sliding window is correctly shifted.
"""
audio_data = np.ones(16000 * 30, dtype=np.int16)
self.asr_app.sliding_window = np.array([])
self.asr_app.sliding_window = np.concatenate((self.asr_app.sliding_window, audio_data))
with redirect_stdout(StringIO()):
self.asr_app.capture_and_transcribe()
self.assertEqual(len(self.asr_app.transcription_cache), 1)
self.assertTrue(isinstance(self.asr_app.transcription_cache[0], str))
self.assertEqual(len(self.asr_app.sliding_window), 16000 * 5)

def tearDown(self):
"""
This method tears down the test environment after each test case is run.
"""
self.asr_app.close_stream()


if __name__ == '__main__':
unittest.main()
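
Because test_at1.py reaches RealTimeASR through a package-relative import, the suite is meant to be run as a package rather than as a script. Assuming each directory on the path is importable as a package (i.e. carries an __init__.py), something like the following would work from the repository root, whereas executing the file directly would break the relative import:

    python -m unittest HuggingFace.audio_transcription.test.test_at1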
70 changes: 39 additions & 31 deletions LangChain/Retrieval-Agents/qa_local_docs.py
@@ -1,20 +1,21 @@
import os
import glob
from typing import Generator, List, Tuple
from dotenv import load_dotenv
from retrying import retry
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI as OpenAILLM
from langchain.chains.question_answering import load_qa_chain
from langchain.vectorstores import cosine_similarity

# Define the retrying decorator for specific functions
def retry_if_value_error(exception):
def retry_if_value_error(exception: Exception) -> bool:
"""Return True if we should retry (in this case when it's a ValueError), False otherwise"""
return isinstance(exception, ValueError)

def retry_if_file_not_found_error(exception):
def retry_if_file_not_found_error(exception: Exception) -> bool:
"""Return True if we should retry (in this case when it's a FileNotFoundError), False otherwise"""
return isinstance(exception, FileNotFoundError)

@@ -33,13 +34,13 @@ class PDFProcessor:
Methods
-------
get_user_query(prompt="Please enter your query: "):
get_user_query(prompt: str = "Please enter your query: ") -> str:
Get query from the user.
load_pdfs_from_directory(directory_path='data/'):
load_pdfs_from_directory(directory_path: str = 'data/') -> List[List[str]]:
Load PDFs from a specified directory.
_load_and_split_document(file_path, chunk_size=2000, chunk_overlap=0):
_load_and_split_document(file_path: str, chunk_size: int = 2000, chunk_overlap: int = 0) -> List[str]:
Load and split a single document.
perform_similarity_search(docsearch, query):
perform_similarity_search(documents: List[List[str]], query: str, num_results: int = 10) -> List[Tuple[float, str]]:
Perform similarity search on documents.
"""

@@ -66,7 +67,7 @@ def _initialize_reusable_objects(self):
self.llm = OpenAILLM(temperature=0, openai_api_key=self.OPENAI_API_KEY)

@staticmethod
def get_user_query(prompt="Please enter your query: "):
def get_user_query(prompt: str = "Please enter your query: ") -> str:
"""
Get user input for a query.
@@ -79,15 +80,15 @@ def get_user_query(prompt="Please enter your query: "):
return input(prompt)

@retry(retry_on_exception=retry_if_file_not_found_error, stop_max_attempt_number=3)
def load_pdfs_from_directory(self, directory_path='data/') -> Generator: # <--- Configure directory path HERE <---
def load_pdfs_from_directory(self, directory_path: str = 'data/') -> List[List[str]]:
"""
Load all PDF files from a given directory lazily using a generator.
Load all PDF files from a given directory.
Parameters:
directory_path (str): Directory path to load PDFs from.
Yields:
list: List of text chunks from a loaded PDF.
Returns:
List[List[str]]: List of text chunks from loaded PDFs.
"""
try:
if not os.path.exists(directory_path):
@@ -97,13 +98,15 @@ def load_pdfs_from_directory(self, directory_path='data/') -> Generator: # <---
if not pdf_files:
raise FileNotFoundError(f"No PDF files found in the directory {directory_path}.")

texts = []
for pdf_file in pdf_files:
yield self._load_and_split_document(pdf_file)
texts.extend(self._load_and_split_document(pdf_file))
return texts
except FileNotFoundError as fe:
print(f"FileNotFoundError encountered: {fe}")
raise

def _load_and_split_document(self, file_path, chunk_size=2000, chunk_overlap=0):
def _load_and_split_document(self, file_path: str, chunk_size: int = 2000, chunk_overlap: int = 0) -> List[str]:
"""
Load and split a PDF document into text chunks.
@@ -113,7 +116,7 @@ def _load_and_split_document(self, file_path, chunk_size=2000, chunk_overlap=0):
chunk_overlap (int): Overlapping characters between chunks.
Returns:
list: List of text chunks.
List[str]: List of text chunks.
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file {file_path} does not exist.")
@@ -122,20 +125,30 @@ def _load_and_split_document(self, file_path, chunk_size=2000, chunk_overlap=0):
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
return text_splitter.split_documents(data)

def perform_similarity_search(self, docsearch, query):
def perform_similarity_search(self, documents: List[List[str]], query: str, num_results: int = 10) -> List[Tuple[float, str]]:
"""
Perform similarity search on documents based on a query.
Parameters:
docsearch (Chroma): Chroma object containing document vectors.
documents (List[List[str]]): List of documents to search.
query (str): User query for similarity search.
num_results (int): Number of results to return.
Returns:
list: List of similar documents or chunks.
List[Tuple[float, str]]: List of tuples containing similarity score and document or chunk.
"""
if not query:
raise ValueError("Query should not be empty.")
return docsearch.similarity_search(query)
try:
if not query:
raise ValueError("Query should not be empty.")
results = []
for document in documents:
similarity_score = cosine_similarity(document, query)
results.append((similarity_score, document))
results = sorted(results, key=lambda x: x[0], reverse=True)[:num_results]
return results
except Exception as e:
print(f"An error occurred: {e}")
raise

if __name__ == "__main__":
try:
@@ -147,19 +160,14 @@ def perform_similarity_search(self, docsearch, query):
num_docs = len(texts)
print(f'Loaded {num_docs} document(s).')

# Create a Chroma object for document similarity search
docsearch = Chroma.from_documents(texts, pdf_processor.embeddings)

# Load a QA chain
chain = load_qa_chain(pdf_processor.llm, chain_type="stuff")

# Get user query for similarity search
query = pdf_processor.get_user_query()

# Perform similarity search based on the query
result = pdf_processor.perform_similarity_search(docsearch, query)
results = pdf_processor.perform_similarity_search(texts, query)

# Run the QA chain on the result
chain.run(input_documents=result, question=query)
# Print the results
for i, result in enumerate(results):
print(f"{i+1}. Similarity score: {result[0]}, Document: {result[1]}")
except Exception as e:
        print(f"An error occurred: {e}")
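
For reference, cosine similarity is defined over vectors, so a scoring step like the one above typically embeds the documents and the query first. Below is a minimal numpy sketch of that ranking, not part of the commit; the embed callable (for example OpenAIEmbeddings().embed_query) and the helper name are illustrative assumptions.

    import numpy as np
    from typing import Callable, List, Tuple

    def rank_by_cosine(texts: List[str], query: str,
                       embed: Callable[[str], List[float]],
                       num_results: int = 10) -> List[Tuple[float, str]]:
        # Embed the query once, embed each text, and rank texts by cosine similarity.
        q = np.asarray(embed(query), dtype=float)
        scored = []
        for text in texts:
            v = np.asarray(embed(text), dtype=float)
            score = float(np.dot(q, v) / (np.linalg.norm(q) * np.linalg.norm(v)))
            scored.append((score, text))
        return sorted(scored, key=lambda pair: pair[0], reverse=True)[:num_results]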
Diff for LangChain/Retrieval-Agents/stateful_chatbot.py not loaded.
