Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request | |
| from fastapi.responses import HTMLResponse | |
| import threading | |
| import asyncio | |
| import mysql.connector | |
| import json | |
| import logging | |
| import pandas as pd | |
| from llama_cpp import Llama | |
| from transformers import pipeline | |
| import os | |
# ASGI application object; handed to uvicorn when this file runs as a script.
app = FastAPI()

# Logging setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Database configuration, read from the `db` environment variable.
# NOTE(review): the variable is assumed to hold a JSON object whose members
# are the keyword arguments for mysql.connector.connect() — confirm against
# the deployed secret's actual format.
def _parse_db_config(raw):
    """Convert the raw `db` environment value into connect() keyword arguments.

    Args:
        raw: Text of the environment variable, or None when it is unset.

    Returns:
        A dict of connection settings. An empty dict is returned when the
        value is missing, is not valid JSON, or is not a JSON object, so the
        result can always be expanded with ``**``.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError:
        # Do not echo the raw value: it may contain credentials.
        logging.getLogger(__name__).error(
            "Environment variable 'db' is not valid JSON; using empty DB config."
        )
        return {}
    if not isinstance(parsed, dict):
        logging.getLogger(__name__).error(
            "Environment variable 'db' must be a JSON object; using empty DB config."
        )
        return {}
    return parsed


DB_CONFIG = _parse_db_config(os.getenv('db'))

# System prompt for LLM (None when the `prompt` environment variable is unset)
prompt = os.getenv('prompt')
# Function to insert extracted shipment details into MySQL database
def insert_data(extracted_details):
    """Insert one record of extracted shipment details into `shipment_details`.

    Args:
        extracted_details: Mapping of field name to extracted value. Only the
            eight shipment columns listed below are read; other keys are
            ignored.

    Returns:
        None. Nothing is inserted when every shipment field is None or "".
        Database and other errors are logged rather than raised.
    """
    required_fields = ['origin', 'destination', 'expected_shipment_datetime',
                       'types_of_service', 'warehouse', 'description',
                       'quantities', 'carrier_details']
    sql = """
        INSERT INTO shipment_details (
            origin, destination, expected_shipment_datetime, types_of_service,
            warehouse, description, quantities, carrier_details
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    mydb = None
    cursor = None
    try:
        # Skip insertion if all required fields are empty.  Checked before
        # connecting so an empty record never opens a database connection.
        if all(extracted_details.get(field) in [None, ""] for field in required_fields):
            logger.info("Skipping insertion: All extracted values are empty.")
            return

        # Same order as the column list in the INSERT statement.
        values = tuple(extracted_details.get(field) for field in required_fields)

        mydb = mysql.connector.connect(**DB_CONFIG)
        cursor = mydb.cursor()
        cursor.execute(sql, values)
        mydb.commit()
        logger.info("Data inserted successfully.")
    except mysql.connector.Error as db_err:
        logger.error(f"Database error: {db_err}")
    except Exception as ex:
        logger.error(f"Error inserting data: {ex}")
    finally:
        # Release the cursor and connection on every path (success, skip,
        # or failure) so repeated calls do not accumulate open connections.
        for resource in (cursor, mydb):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_err:
                logger.warning(f"Error closing database resource: {close_err}")
# Model instance shared across calls so the weights are loaded once per
# process instead of on every polling pass.
_llm = None


def _get_llm():
    """Return the shared Llama model, loading it on first use."""
    global _llm
    if _llm is None:
        logger.info("Loading Llama model...")
        _llm = Llama.from_pretrained(
            repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
            filename="Phi-3-mini-4k-instruct-fp16.gguf", n_ctx=2048
        )
        logger.info("Llama model loaded.")
    return _llm


def _parse_model_output(text):
    """Extract the first JSON object embedded in the model's reply.

    Args:
        text: Raw completion text returned by the model.

    Returns:
        A dict whose keys are lower-cased with spaces replaced by
        underscores, or None when no JSON object can be decoded.
    """
    start = text.find('{')
    if start == -1:
        return None
    tail = text[start:]
    decoder = json.JSONDecoder()
    # Try the text as-is first; only if that fails retry with single quotes
    # swapped for double quotes, so valid JSON containing apostrophes is not
    # corrupted by the substitution.
    for candidate in (tail, tail.replace("'", '"')):
        try:
            # raw_decode stops at the end of the first complete JSON value,
            # so nested objects and trailing text are both handled.
            parsed, _ = decoder.raw_decode(candidate)
        except ValueError:
            continue
        if isinstance(parsed, dict):
            return {key.lower().replace(" ", "_"): value
                    for key, value in parsed.items()}
    return None


# Function to read and process emails
def read_email():
    """Run each email body in ./emails.csv through the model and store results.

    For every value in the CSV's `Body` column the model is prompted with the
    system prompt plus the body, the JSON object in its reply is parsed, the
    placeholder metadata keys are added, and the record is passed to
    insert_data(). A reply that cannot be parsed is logged and skipped so the
    remaining emails are still processed.
    """
    llm = _get_llm()
    logger.info("Reading emails from CSV...")
    df = pd.read_csv('./emails.csv')
    for i in df['Body']:
        logger.info(f"Processing email: {i}")
        output = llm(
            f"<|system|>\n{prompt}<|end|><|user|>\n{i}<|end|>\n<|assistant|>",
            max_tokens=256,
            stop=["<|end|>"],
            echo=False)
        logger.info("Extracting details...")
        t = output['choices'][0]['text']
        # The logger needs a %s placeholder to include the extra argument.
        logger.info('the model output : \n%s', t)
        extracted_details = _parse_model_output(t)
        if extracted_details is None:
            logger.error("Could not parse model output as JSON; skipping email.")
            continue
        # Add meta data placeholders
        meta_data = {
            'sender': None,
            'receiver': None,
            'cc': None,
            'bcc': None,
            'subject': None
        }
        extracted_details.update(meta_data)
        logger.info(f"Full extracted data: {extracted_details}")
        insert_data(extracted_details)
# Global flag that controls the email processing loop: the loop keeps
# iterating while it is True, and the start/stop handlers set and clear it.
running = False
# HTML content for the web interface.  The page has Start/Stop buttons that
# POST to /start and /stop and write the response text into the status span.
# The "{{ status }}" placeholder is replaced with the current state by home()
# before the page is returned.
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Email Processing</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; }
h1 { color: #333; }
button {
padding: 10px 20px;
margin: 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
}
button.stop { background-color: #f44336; }
#status { font-weight: bold; }
</style>
<script>
async function startLoop() {
const response = await fetch('/start', { method: 'POST' });
const result = await response.text();
document.getElementById("status").innerHTML = result;
}
async function stopLoop() {
const response = await fetch('/stop', { method: 'POST' });
const result = await response.text();
document.getElementById("status").innerHTML = result;
}
</script>
</head>
<body>
<h1>Email Processing Status: <span id="status">{{ status }}</span></h1>
<button onclick="startLoop()">Start</button>
<button class="stop" onclick="stopLoop()">Stop</button>
</body>
</html>
"""
# Function to process emails in a loop asynchronously
async def email_processing_loop():
    """Repeatedly run read_email() until the `running` flag is cleared.

    Each pass runs read_email() in the event loop's default executor so the
    blocking model and database work does not stall the server, then waits
    10 seconds. A failing pass is logged and the loop carries on, so the
    `running` flag never claims an active loop after the task has died.
    """
    global running
    logger.info("Starting email processing loop...")
    loop = asyncio.get_running_loop()
    while running:
        logger.info("Processing emails...")
        try:
            # read_email() is synchronous and long-running; keep it off the
            # event loop thread so other requests are still served.
            await loop.run_in_executor(None, read_email)
        except Exception:
            logger.exception("Email processing pass failed.")
        await asyncio.sleep(10)  # Non-blocking delay for the loop
# Endpoint to display the current email processor status
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the control page with the current loop state filled in.

    Returns:
        An HTMLResponse (status 200) containing `html_content` with the
        "{{ status }}" placeholder replaced by "Running" or "Stopped".
    """
    status = "Running" if running else "Stopped"
    page = html_content.replace("{{ status }}", status)
    return HTMLResponse(content=page, status_code=200)
# Handle to the background loop task.  Keeping a reference prevents the task
# from being garbage-collected while it runs and lets the start handler see
# whether an earlier loop is still alive.
_email_loop_task = None


# Endpoint to start the email processing loop
@app.post("/start", response_class=HTMLResponse)
async def start_email_loop():
    """Set the `running` flag and make sure exactly one loop task exists.

    Returns:
        "Running" when the loop was switched on by this call, otherwise
        "Already running". The text is sent as-is (not JSON-encoded) because
        the page inserts the response body directly into the status element.
    """
    global running, _email_loop_task
    if running:
        return "Already running"
    running = True
    # A loop that was asked to stop may still be inside its sleep; it will
    # see the flag set again and simply continue, so only spawn a new task
    # when no earlier one is still active.
    if _email_loop_task is None or _email_loop_task.done():
        _email_loop_task = asyncio.ensure_future(email_processing_loop())
    logger.info("Email processing loop started.")
    return "Running"
# Endpoint to stop the email processing loop
@app.post("/stop", response_class=HTMLResponse)
async def stop_email_loop():
    """Clear the `running` flag so the processing loop exits at its next check.

    Returns:
        "Stopped" when the flag was cleared by this call, otherwise
        "Already stopped". The text is sent as-is (not JSON-encoded) because
        the page inserts the response body directly into the status element.
    """
    global running
    if not running:
        return "Already stopped"
    running = False
    logger.info("Email processing loop stopped.")
    return "Stopped"
if __name__ == "__main__":
    # Script entry point: serve `app` with uvicorn on every interface,
    # port 8000.  uvicorn is imported here, so it is only needed when the
    # file is run directly.
    logger.info("Starting FastAPI server...")
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)