diff --git a/guides/automated_monitoring_alerting/.env.example b/guides/automated_monitoring_alerting/.env.example new file mode 100644 index 0000000..654c25c --- /dev/null +++ b/guides/automated_monitoring_alerting/.env.example @@ -0,0 +1,2 @@ +ZEROENTROPY_API_KEY="your api key" +SLACK_WEBHOOK_URL="your webhook url" \ No newline at end of file diff --git a/guides/automated_monitoring_alerting/README.md b/guides/automated_monitoring_alerting/README.md new file mode 100644 index 0000000..698945b --- /dev/null +++ b/guides/automated_monitoring_alerting/README.md @@ -0,0 +1,49 @@ +# Automated Monitoring & Alerting with ZeroEntropy + +This guide demonstrates how to set up an automated workflow that monitors a folder for new documents, indexes them with ZeroEntropy, and sends an alert when new, relevant content is detected based on semantic queries. + +## Overview + +- **Monitor** a folder for new or updated files (PDF, TXT, CSV, etc.) +- **Index** new documents automatically using the ZeroEntropy API +- **Query** the new content for matches to a set of important topics or keywords +- **Alert** your team via email or Slack if relevant content is found + +Some use cases for this could be: +- Keeping teams up-to-date with new, relevant information +- Automating knowledge discovery in shared drives or document repositories +- Surfacing important updates as soon as they appear + +## Prerequisites + +- Python 3.8+ +- ZeroEntropy API key ([Get yours here](https://dashboard.zeroentropy.dev)) +- `zeroentropy`, `python-dotenv`, `watchdog`, and `requests` Python packages +- (Optional) Email or Slack webhook for alerts + +## Setup Instructions + +1. **Install dependencies:** + ```bash + pip install zeroentropy python-dotenv watchdog requests + ``` +2. **Create a `.env` file** with your ZeroEntropy API key: + ```bash + ZEROENTROPY_API_KEY=your_api_key_here + ``` +3. **Configure your alerting method** in the script (email or Slack) +4. 
from zeroentropy import AsyncZeroEntropy, ConflictError
import asyncio
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import os
import base64

load_dotenv()

zclient = AsyncZeroEntropy()
# Cap concurrent API calls so large folders don't flood the service.
sem = asyncio.Semaphore(16)

# Attempts per document before giving up on transient failures.
MAX_RETRIES = 3


async def _add_with_retry(collection_name: str, path: str, content: dict, metadata: dict):
    """Add one document to the collection, retrying transient failures.

    A ConflictError means the document already exists, so retrying can
    never succeed: report it once and return None.  (The original code
    kept looping on ConflictError, printing the message three times,
    while any other error escaped on the first attempt — i.e. the retry
    loop never actually retried anything.)  Any other exception is
    retried up to MAX_RETRIES times and then re-raised so callers still
    see real failures.
    """
    async with sem:
        for attempt in range(MAX_RETRIES):
            try:
                return await zclient.documents.add(
                    collection_name=collection_name,
                    path=path,
                    content=content,
                    metadata=metadata,
                )
            except ConflictError:
                print(f"Document '{path}' already exists in collection '{collection_name}'")
                return None
            except Exception:
                # Transient error: retry; re-raise once attempts are exhausted.
                if attempt == MAX_RETRIES - 1:
                    raise
    return None


async def index_document(document_path: str, collection_name: str):
    """Index a single file into a ZeroEntropy collection.

    CSV files are split row-by-row (one document per row, path suffixed
    with the row index).  PDFs are sent base64-encoded with type "auto"
    so the service can OCR them.  Plain text files are sent verbatim.

    Returns the last API response, or None for unsupported types /
    already-existing documents.  Raises FileNotFoundError if the file
    does not exist.
    """
    if not os.path.exists(document_path):
        raise FileNotFoundError(f"File {document_path} not found")

    # .lower() so .PDF / .TXT / .CSV are handled too.
    ext = os.path.splitext(document_path)[1].lower()
    response = None

    if ext == ".csv":
        # Index each row as its own document so queries can hit single rows.
        with open(document_path) as f:
            rows = f.readlines()
        for i, row in enumerate(rows):
            response = await _add_with_retry(
                collection_name,
                f"{document_path}_{i}",
                {"type": "text", "text": row},
                {"type": "csv"},
            )
    elif ext == ".pdf":
        # For PDFs we must specify type "auto" so the service OCRs the pages.
        with open(document_path, "rb") as f:
            pdf_base64 = base64.b64encode(f.read()).decode("utf-8")
        response = await _add_with_retry(
            collection_name,
            document_path,
            {"type": "auto", "base64_data": pdf_base64},
            {"type": "pdf"},
        )
    elif ext == ".txt":
        # Plain text: no OCR needed.
        with open(document_path, "r", encoding="utf-8") as f:
            text = f.read()
        response = await _add_with_retry(
            collection_name,
            document_path,
            {"type": "text", "text": text},
            {"type": "text"},
        )
    else:
        print(f"Unsupported file type: {ext}")
        return None
    return response


async def main():
    """Index every file found in DATA_DIR into COLLECTION_NAME."""
    DATA_DIR = "./data"
    COLLECTION_NAME = "default"

    documents_path = [os.path.join(DATA_DIR, file) for file in os.listdir(DATA_DIR)]
    # Creating an existing collection raises ConflictError — that's fine.
    try:
        await zclient.collections.add(collection_name=COLLECTION_NAME)
    except ConflictError:
        print(f"Collection '{COLLECTION_NAME}' already exists")
    print(f"Indexing {len(documents_path)} documents in collection '{COLLECTION_NAME}'")
    await tqdm.gather(
        *[index_document(p, COLLECTION_NAME) for p in documents_path],
        desc="Indexing Documents",
    )
    print(f"Indexing completed for collection '{COLLECTION_NAME}'")


if __name__ == "__main__":
    asyncio.run(main())
# ---- monitor_and_alert.py ----
import os
import time
import threading
import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from zeroentropy import ZeroEntropy, ConflictError
from slack_sdk.webhook import WebhookClient
from index import index_document
from query import query_collection
from dotenv import load_dotenv
load_dotenv()


# --- Configuration ---
FOLDER_TO_WATCH = './example'  # Folder to monitor
COLLECTION_NAME = 'example'
# Add queries depending on the use case
QUERIES = [
    'security vulnerability'
]
# For now the implementation supports slack alerts.
ALERT_METHOD = 'slack'
# Minimum relevance score before an alert is fired.
SCORE_THRESHOLD = 0.2

SLACK_WEBHOOK_URL = os.getenv('SLACK_WEBHOOK_URL')
API_KEY = os.getenv('ZEROENTROPY_API_KEY')


zclient = ZeroEntropy(api_key=API_KEY)


def send_slack_alert(message):
    """Post *message* to the configured Slack incoming webhook."""
    if not SLACK_WEBHOOK_URL:
        # Missing config: report it instead of crashing the worker thread.
        print("SLACK_WEBHOOK_URL is not set; skipping Slack alert")
        return
    webhook = WebhookClient(SLACK_WEBHOOK_URL)
    response = webhook.send(text=message)
    if response.status_code != 200:
        print(f"Failed to send Slack alert: {response.status_code}")
    else:
        print(f"Slack alert sent successfully: {response.status_code}")


def _result_score(result):
    """Extract a relevance score from a mixed query result.

    query_collection returns CSV rows as plain dicts (score under the
    'score' key) followed by snippet objects (score as an attribute);
    normalize both, returning None when no score is present.
    """
    if isinstance(result, dict):
        return result.get('score')
    return getattr(result, 'score', None)


def index_and_alert(filepath):
    """Index *filepath*, run the configured queries, and alert on matches.

    This runs on a worker thread spawned by the watchdog handler, so it
    can call asyncio.run() directly.  (The original spawned a second
    thread here on top of the handler's thread — redundant.)
    """
    ext = os.path.splitext(filepath)[1].lower()
    if ext not in ('.txt', '.pdf', '.csv'):
        print(f'Skipping unsupported file type: {filepath}')
        return

    async def pipeline():
        try:
            await index_document(filepath, COLLECTION_NAME)
            print(f'Indexed: {filepath}')
            for query in QUERIES:
                results = await query_collection(COLLECTION_NAME, query, top_k_csv=1, top_k_txt=1)
                # Check every result, not just results[0]: the original
                # only looked at the first entry, so a high-scoring
                # snippet could be masked by a CSV dict ranked ahead of it.
                for result in results:
                    score = _result_score(result)
                    if score is not None and score > SCORE_THRESHOLD:
                        alert_msg = f'Relevant content found for query "{query}" in file {filepath}.'
                        send_slack_alert(alert_msg)
                        break  # one alert per query is enough
        except Exception as e:
            # Best-effort: a bad file must not kill the watcher.
            print(f'Error indexing or querying {filepath}: {e}')

    asyncio.run(pipeline())


class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler: index and query files as they appear or change.

    NOTE(review): editors often emit several on_modified events per
    save; a debounce may be needed in practice — confirm with real usage.
    """

    def on_created(self, event):
        if not event.is_directory:
            threading.Thread(target=index_and_alert, args=(event.src_path,)).start()

    def on_modified(self, event):
        if not event.is_directory:
            threading.Thread(target=index_and_alert, args=(event.src_path,)).start()


def main():
    """Ensure the collection exists, then watch FOLDER_TO_WATCH forever."""
    try:
        zclient.collections.add(collection_name=COLLECTION_NAME)
    except ConflictError:
        pass  # collection already exists — expected on restart
    except Exception as e:
        # The original swallowed every error here; surface other
        # failures but keep startup best-effort.
        print(f'Could not ensure collection exists: {e}')

    # Start monitoring
    event_handler = NewFileHandler()
    observer = Observer()
    observer.schedule(event_handler, FOLDER_TO_WATCH, recursive=False)
    observer.start()
    print(f'Watching folder: {FOLDER_TO_WATCH}')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == '__main__':
    main()


# ---- query.py ----
from zeroentropy import AsyncZeroEntropy
import asyncio
from dotenv import load_dotenv

load_dotenv()

zclient = AsyncZeroEntropy()


async def query_collection(collection_name: str, query: str, top_k_csv: int = 5, top_k_txt: int = 10) -> list:
    """Run *query* against CSV-row documents and text snippets.

    Returns a list whose first entries are dicts (keys: path, content,
    score, metadata) for the top *top_k_csv* CSV rows, followed by the
    raw snippet result objects for non-CSV documents.  (The original was
    annotated ``-> None`` despite returning this list — fixed.)
    """
    # Top CSV rows: each row was indexed as its own document.
    response_csv = await zclient.queries.top_documents(
        collection_name=collection_name,
        k=top_k_csv,
        query=query,
        filter={"type": {"$eq": "csv"}},
    )
    # Snippets from everything that is not a CSV row.
    response_txt = await zclient.queries.top_snippets(
        collection_name=collection_name,
        k=top_k_txt,
        query=query,
        # Controls snippet length (~200 chars when True vs ~2000 when False).
        precise_responses=True,
        filter={"type": {"$ne": "csv"}},
    )
    # top_documents does not include content, so fetch it per result.
    final_response = []
    for result in response_csv.results:
        document_content = await zclient.documents.get_info(
            collection_name=collection_name,
            path=result.path,
            include_content=True,
        )
        final_response.append({
            "path": result.path,
            "content": document_content.document.content,
            "score": result.score,
            "metadata": result.metadata,
        })

    # Combine the CSV rows with the text snippets.
    return final_response + response_txt.results


async def main():
    """Demo: run a test query against the default collection."""
    COLLECTION_NAME = "default"
    query = "This is a test query"
    response = await query_collection(COLLECTION_NAME, query, top_k_csv=5, top_k_txt=10)
    for i, result in enumerate(response):
        print(f"Result {i+1}:")
        print(result)
        print("\n")


if __name__ == "__main__":
    asyncio.run(main())