import os
import re
import json
import time
import sys
import asyncio
import socket
import random
import logging
import warnings
import unicodedata
import email
from email.policy import default
from typing import List, Dict, Optional, Any
from urllib.parse import urlparse

# Third-party imports
import httpx
import uvicorn
import joblib
import torch
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from groq import AsyncGroq, RateLimitError, APIError
from dotenv import load_dotenv
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from playwright.async_api import async_playwright

# Local imports
import config
from models import get_ml_models, get_dl_models, FinetunedBERT
from feature_extraction import process_row

load_dotenv()
sys.path.append(os.path.join(config.BASE_DIR, 'Message_model'))

# Suppress BeautifulSoup's "input looks like a URL" warning (this call is assumed:
# 'warnings' and 'MarkupResemblesLocatorWarning' are imported above but otherwise unused)
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

# Attempt to import the local semantic model
try:
    from predict import PhishingPredictor
except ImportError:
    PhishingPredictor = None
# ==================================================================================
# 🌈 ULTRA-AESTHETIC LOGGING SETUP (VISUAL OVERHAUL)
# ==================================================================================
class UltraColorFormatter(logging.Formatter):
    # ANSI Color Codes
    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    NEON_BLUE = "\x1b[38;5;39m"
    NEON_GREEN = "\x1b[38;5;82m"
    NEON_PURPLE = "\x1b[38;5;129m"
    YELLOW = "\x1b[33m"
    ORANGE = "\x1b[38;5;208m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[31;1m"
    WHITE_BOLD = "\x1b[37;1m"
    RESET = "\x1b[0m"

    # Custom Formats based on Level
    FORMATS = {
        logging.DEBUG: GREY + " 🐞 [DEBUG] %(message)s" + RESET,
        logging.INFO: "%(message)s" + RESET,  # Info handles its own coloring in code
        logging.WARNING: ORANGE + " ⚠️ [WARNING] %(message)s" + RESET,
        logging.ERROR: RED + " ❌ [ERROR] %(message)s" + RESET,
        logging.CRITICAL: BOLD_RED + "\n🚨 [CRITICAL] %(message)s\n" + RESET
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)

logger = logging.getLogger("PhishingAPI")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(UltraColorFormatter())
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(ch)
# --- VISUAL HELPER FUNCTIONS ---
def log_section(title):
    logger.info(f"\n{UltraColorFormatter.NEON_PURPLE}┌{'─'*70}┐")
    logger.info(f"{UltraColorFormatter.NEON_PURPLE}│ {UltraColorFormatter.WHITE_BOLD}{title.center(68)}{UltraColorFormatter.NEON_PURPLE} │")
    logger.info(f"{UltraColorFormatter.NEON_PURPLE}└{'─'*70}┘{UltraColorFormatter.RESET}")

def log_step(icon, text):
    logger.info(f"{UltraColorFormatter.CYAN} {icon} {text}{UltraColorFormatter.RESET}")

def log_substep(text, value=""):
    val_str = f": {UltraColorFormatter.NEON_GREEN}{value}{UltraColorFormatter.RESET}" if value else ""
    logger.info(f"{UltraColorFormatter.GREY} ├─ {text}{val_str}")

def log_success(text):
    logger.info(f"{UltraColorFormatter.NEON_GREEN} ✅ {text}{UltraColorFormatter.RESET}")

def log_metric(label, value, warning=False):
    color = UltraColorFormatter.ORANGE if warning else UltraColorFormatter.NEON_BLUE
    logger.info(f" {color}📊 {label}: {UltraColorFormatter.WHITE_BOLD}{value}{UltraColorFormatter.RESET}")
# ==================================================================================
# --- CONFIGURATION ---
MAX_INPUT_CHARS = 4000
MAX_CONCURRENT_REQUESTS = 5
MAX_URLS_TO_ANALYZE = 15
LLM_MAX_RETRIES = 3

app = FastAPI(
    title="Phishing Detection API (Robust Ensemble)",
    description="Multilingual phishing detection using Weighted Ensemble (ML/DL) + LLM Semantic Analysis + Live Scraping",
    version="2.6.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
# --- DATA MODELS ---
class MessageInput(BaseModel):
    sender: Optional[str] = ""
    subject: Optional[str] = ""
    text: Optional[str] = ""
    metadata: Optional[Dict] = {}

class PredictionResponse(BaseModel):
    confidence: float
    reasoning: str
    highlighted_text: str
    final_decision: str
    suggestion: str
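# Illustrative request/response pair for these models (all values are made up):
#   POST body: {"sender": "[email protected]", "subject": "Verify now",
#               "text": "<html>...</html>", "metadata": {}}
#   Response:  {"confidence": 97.0, "reasoning": "...", "highlighted_text": "@@Verify now@@ ...",
#               "final_decision": "phishing", "suggestion": "Do not click."}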
# --- UTILITIES ---
class SmartAPIKeyRotator:
    def __init__(self):
        keys_str = os.environ.get('GROQ_API_KEYS', '')
        self.keys = [k.strip() for k in keys_str.split(',') if k.strip()]
        if not self.keys:
            single_key = os.environ.get('GROQ_API_KEY')
            if single_key:
                self.keys = [single_key]
        if not self.keys:
            logger.critical("CRITICAL: No GROQ_API_KEYS found in environment variables!")
        else:
            log_substep("API Key Rotator", f"Initialized with {len(self.keys)} keys")
        self.clients = [AsyncGroq(api_key=k) for k in self.keys]
        self.num_keys = len(self.clients)
        self.current_index = 0

    def get_client_and_rotate(self):
        if not self.clients:
            return None
        client = self.clients[self.current_index]
        self.current_index = (self.current_index + 1) % self.num_keys
        return client
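# Expected environment configuration for the rotator (key values are placeholders):
#   GROQ_API_KEYS="gsk_key_one,gsk_key_two"   # comma-separated pool, rotated round-robin
#   GROQ_API_KEY="gsk_single_key"             # single-key fallback when no pool is set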
# Global Model Placeholders
ml_models = {}
dl_models = {}
bert_model = None
semantic_model = None
key_rotator: Optional[SmartAPIKeyRotator] = None
ip_cache = {}
def clean_and_parse_json(text: str) -> Dict:
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Strip markdown code fences (```json ... ```) before retrying
    text = re.sub(r"```json\s*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```", "", text)
    try:
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end != -1:
            json_str = text[start:end+1]
            return json.loads(json_str)
    except Exception:
        pass
    logger.error(f"Failed to parse JSON from LLM response: {text[:50]}...")
    return {}
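# Illustrative behavior (the fenced response below is made up):
#   clean_and_parse_json('```json\n{"final_decision": "phishing"}\n```')
#   -> {'final_decision': 'phishing'}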
class EnsembleScorer:
    WEIGHTS = {'ml': 0.30, 'dl': 0.20, 'bert': 0.20, 'semantic': 0.10, 'network': 0.20}

    @staticmethod
    def calculate_technical_score(predictions: Dict, network_data: List[Dict], urls: List[str]) -> Dict:
        score_accum = 0.0
        weight_accum = 0.0
        details = []
        log_step("🧮", "Calculating Ensemble Weights")
        # ML Scores
        ml_scores = [p['raw_score'] for k, p in predictions.items() if k in ['logistic', 'svm', 'xgboost']]
        if ml_scores:
            avg_ml = np.mean(ml_scores)
            score_accum += avg_ml * EnsembleScorer.WEIGHTS['ml'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['ml']
            details.append(f"ML Consensus: {avg_ml:.2f}")
            log_substep("ML Models Consensus", f"{avg_ml:.4f} (Weight: {EnsembleScorer.WEIGHTS['ml']})")
        # DL Scores
        dl_scores = [p['raw_score'] for k, p in predictions.items() if k in ['attention_blstm', 'rcnn']]
        if dl_scores:
            avg_dl = np.mean(dl_scores)
            score_accum += avg_dl * EnsembleScorer.WEIGHTS['dl'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['dl']
            details.append(f"DL Consensus: {avg_dl:.2f}")
            log_substep("Deep Learning Consensus", f"{avg_dl:.4f} (Weight: {EnsembleScorer.WEIGHTS['dl']})")
        # BERT
        if 'bert' in predictions:
            bert_s = predictions['bert']['raw_score']
            score_accum += bert_s * EnsembleScorer.WEIGHTS['bert'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['bert']
            details.append(f"BERT Score: {bert_s:.2f}")
            log_substep("BERT Finetuned", f"{bert_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['bert']})")
        # Semantic
        if 'semantic' in predictions:
            sem_s = predictions['semantic']['raw_score']
            score_accum += sem_s * EnsembleScorer.WEIGHTS['semantic'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['semantic']
            log_substep("Semantic Analysis", f"{sem_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['semantic']})")
        # Network
        net_risk = 0.0
        net_reasons = []
        for net_info in network_data:
            if net_info.get('proxy') or net_info.get('hosting'):
                net_risk += 40
                net_reasons.append("Hosted/Proxy IP")
            org = str(net_info.get('org', '')).lower()
            isp = str(net_info.get('isp', '')).lower()
            suspicious_hosts = ['hostinger', 'namecheap', 'digitalocean', 'hetzner', 'ovh', 'flokinet']
            if any(x in org or x in isp for x in suspicious_hosts):
                net_risk += 20
                net_reasons.append(f"Cheap Cloud Provider ({org[:15]}...)")
        net_risk = min(net_risk, 100)
        score_accum += net_risk * EnsembleScorer.WEIGHTS['network']
        weight_accum += EnsembleScorer.WEIGHTS['network']
        log_substep("Network Risk Calculated", f"{net_risk:.2f} (Weight: {EnsembleScorer.WEIGHTS['network']})")
        if net_reasons:
            details.append(f"Network Penalties: {', '.join(list(set(net_reasons)))}")
        if weight_accum == 0:
            final_score = 50.0
        else:
            final_score = score_accum / weight_accum
        return {
            "score": min(max(final_score, 0), 100),
            "details": "; ".join(details),
            "network_risk": net_risk
        }
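# Worked example of the weight renormalization above (numbers are made up): if only
# the ML head produced a score (avg 0.8) and network risk is 30, then
#   score_accum  = 0.8 * 0.30 * 100 + 30 * 0.20 = 24 + 6 = 30
#   weight_accum = 0.30 + 0.20 = 0.50
#   final_score  = 30 / 0.50 = 60.0
# so missing components rescale the score instead of dragging it toward zero.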
def load_models():
    global ml_models, dl_models, bert_model, semantic_model, key_rotator
    log_section("SYSTEM STARTUP: LOADING ASSETS")
    models_dir = config.MODELS_DIR
    # Load ML
    for model_name in ['logistic', 'svm', 'xgboost']:
        try:
            path = os.path.join(models_dir, f'{model_name}.joblib')
            if os.path.exists(path):
                ml_models[model_name] = joblib.load(path)
                log_substep("ML Model Loaded", model_name)
        except Exception:
            pass
    # Load DL
    for model_name in ['attention_blstm', 'rcnn']:
        try:
            path = os.path.join(models_dir, f'{model_name}.pt')
            if os.path.exists(path):
                template = get_dl_models(input_dim=len(config.NUMERICAL_FEATURES))
                model = template[model_name]
                model.load_state_dict(torch.load(path, map_location='cpu'))
                model.eval()
                dl_models[model_name] = model
                log_substep("DL Model Loaded", model_name)
        except Exception:
            pass
    # Load BERT
    bert_path = os.path.join(config.BASE_DIR, 'finetuned_bert')
    if os.path.exists(bert_path):
        try:
            bert_model = FinetunedBERT(bert_path)
            log_substep("BERT Model", "Loaded Successfully")
        except Exception:
            pass
    # Load Semantic
    sem_path = os.path.join(config.BASE_DIR, 'Message_model', 'final_semantic_model')
    if os.path.exists(sem_path) and PhishingPredictor:
        try:
            semantic_model = PhishingPredictor(model_path=sem_path)
            log_substep("Semantic Model", "Loaded Successfully")
        except Exception:
            pass
    key_rotator = SmartAPIKeyRotator()
# --- UPDATED PARSING LOGIC ---
def extract_visible_text_and_links(raw_email: str) -> tuple:
    """
    Parse a full raw email using Python's email library and extract:
    - extracted_text (merged plain text + HTML text + metadata)
    - links (list of all URLs found anywhere)
    """
    log_step("📨", "Parsing Email MIME Structure")
    if not raw_email:
        logger.warning("Parsing received empty email input")
        return "", []
    extracted_text_parts = []
    links = set()
    # Attempt 1: Try parsing as a standard MIME email message
    try:
        msg = email.message_from_string(raw_email, policy=default)
        # Extract basic metadata if available
        metadata = {
            "from": msg.get("From", ""),
            "to": msg.get("To", ""),
            "subject": msg.get("Subject", "")
        }
        for k, v in metadata.items():
            if v:
                extracted_text_parts.append(f"{k.capitalize()}: {v}")
                log_substep(f"Metadata [{k}]", v[:50] + "..." if len(v) > 50 else v)
        part_count = 0
        for part in msg.walk():
            part_count += 1
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition") or "")
            try:
                if content_type == "text/plain":
                    text_data = part.get_payload(decode=True)
                    if text_data:
                        text_str = text_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        extracted_text_parts.append(text_str)
                        links.update(re.findall(r'https?://\S+', text_str))
                elif content_type == "text/html":
                    html_data = part.get_payload(decode=True)
                    if html_data:
                        html_str = html_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        soup = BeautifulSoup(html_str, "html.parser")
                        extracted_text_parts.append(soup.get_text(separator="\n"))
                        for a in soup.find_all("a", href=True):
                            links.add(a["href"])
                        for img in soup.find_all("img", src=True):
                            links.add(img["src"])
                elif "attachment" in content_disposition.lower() or "inline" in content_disposition.lower():
                    filename = part.get_filename()
                    if filename:
                        extracted_text_parts.append(f"[Attachment found: {filename}]")
                        log_substep("Attachment", filename)
            except Exception as e:
                logger.warning(f"Error parsing email part: {e}")
    except Exception as e:
        logger.error(f"Email Parsing Failed: {e}")
    # Combine extracted parts
    extracted_text = "\n".join(extracted_text_parts).strip()
    # --- CRITICAL FIX FOR RAW HTML PAYLOADS ---
    # If MIME parsing failed to extract text (extracted_text is empty),
    # but the input looks like HTML, force a BeautifulSoup clean.
    if not extracted_text:
        if "<html" in raw_email.lower() or "<body" in raw_email.lower() or "<div" in raw_email.lower():
            log_substep("Fallback", "Input appears to be Raw HTML, stripping tags...")
            try:
                soup = BeautifulSoup(raw_email, "html.parser")
                extracted_text = soup.get_text(separator="\n")
                # Also grab links from this raw HTML
                for a in soup.find_all("a", href=True):
                    links.add(a["href"])
                for img in soup.find_all("img", src=True):
                    links.add(img["src"])
            except Exception:
                extracted_text = raw_email
        else:
            extracted_text = raw_email
    # Final regex sweep for links (catch-all)
    links.update(re.findall(r'https?://\S+', raw_email))
    cleaned_links = []
    for link in links:
        link = link.strip().strip("<>").replace('"', "")
        if link.startswith("http://") or link.startswith("https://"):
            cleaned_links.append(link)
    log_success(f"Parsed Content. Extracted {len(cleaned_links)} unique URLs.")
    return extracted_text, cleaned_links
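# Illustrative call (the email below is made up):
#   text, urls = extract_visible_text_and_links(
#       "From: [email protected]\nSubject: Hi\nContent-Type: text/plain\n\nSee http://example.com"
#   )
#   # text -> "From: [email protected]\nSubject: Hi\nSee http://example.com"
#   # urls -> ["http://example.com"]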
async def extract_url_features(urls: List[str]) -> pd.DataFrame:
    if not urls:
        return pd.DataFrame()
    log_step("🧬", f"Extracting Features for {len(urls)} URLs")
    df = pd.DataFrame({'url': urls})
    whois_cache, ssl_cache = {}, {}
    tasks = [asyncio.to_thread(process_row, row, whois_cache, ssl_cache) for _, row in df.iterrows()]
    feature_list_raw = await asyncio.gather(*tasks, return_exceptions=True)
    feature_list = []
    for i, f in enumerate(feature_list_raw):
        if isinstance(f, Exception):
            logger.error(f"Feature extraction error on {urls[i]}: {f}")
            feature_list.append({})
        else:
            feature_list.append(f)
    log_substep("Feature Extraction", "Complete")
    return pd.concat([df, pd.DataFrame(feature_list)], axis=1)
def get_model_predictions(features_df: pd.DataFrame, message_text: str) -> Dict:
    predictions = {}
    num_feats = config.NUMERICAL_FEATURES
    cat_feats = config.CATEGORICAL_FEATURES
    if not features_df.empty:
        try:
            log_step("🤖", "Running Machine Learning Inference")
            X = features_df[num_feats + cat_feats].copy()
            X[num_feats] = X[num_feats].fillna(-1)
            X[cat_feats] = X[cat_feats].fillna('N/A')
            # ML Models
            for name, model in ml_models.items():
                try:
                    probas = model.predict_proba(X)[:, 1]
                    raw_score = float(np.max(probas))
                    predictions[name] = {'raw_score': raw_score}
                    log_substep(f"ML: {name.ljust(10)}", f"{raw_score:.4f}")
                except Exception:
                    predictions[name] = {'raw_score': 0.5}
            # DL Models
            if dl_models:
                X_num = torch.tensor(X[num_feats].values.astype(np.float32))
                with torch.no_grad():
                    for name, model in dl_models.items():
                        try:
                            out = model(X_num)
                            raw_score = float(torch.max(out).item())
                            predictions[name] = {'raw_score': raw_score}
                            log_substep(f"DL: {name.ljust(10)}", f"{raw_score:.4f}")
                        except Exception:
                            predictions[name] = {'raw_score': 0.5}
            # BERT
            if bert_model:
                try:
                    scores = bert_model.predict_proba(features_df['url'].tolist())
                    avg_score = float(np.mean([s[1] for s in scores]))
                    predictions['bert'] = {'raw_score': avg_score}
                    log_substep("BERT Inference", f"{avg_score:.4f}")
                except Exception:
                    pass
        except Exception as e:
            logger.error(f"Feature Pipeline Error: {e}")
    if semantic_model and message_text:
        try:
            log_step("🧠", "Running Semantic Text Analysis")
            res = semantic_model.predict(message_text)
            predictions['semantic'] = {'raw_score': float(res['phishing_probability'])}
            log_substep("Semantic Prob", f"{res['phishing_probability']:.4f}")
        except Exception:
            pass
    return predictions
async def get_network_data_raw(urls: List[str]) -> List[Dict]:
    data = []
    unique_hosts = set()
    for url_str in urls:
        try:
            parsed = urlparse(url_str if url_str.startswith(('http://', 'https://')) else f"http://{url_str}")
            if parsed.hostname:
                unique_hosts.add(parsed.hostname)
        except Exception:
            pass
    target_hosts = list(unique_hosts)[:5]
    log_step("🌍", f"Geo-Locating Hosts: {target_hosts}")
    async with httpx.AsyncClient(timeout=3.0) as client:
        for host in target_hosts:
            if host in ip_cache:
                data.append(ip_cache[host])
                log_substep("Cache Hit", host)
                continue
            try:
                ip = await asyncio.to_thread(socket.gethostbyname, host)
                resp = await client.get(f"http://ip-api.com/json/{ip}?fields=status,message,country,isp,org,as,proxy,hosting")
                if resp.status_code == 200:
                    geo = resp.json()
                    if geo.get('status') == 'success':
                        geo['ip'] = ip
                        geo['host'] = host
                        data.append(geo)
                        ip_cache[host] = geo
                        log_substep(f"Resolved {host}", f"{geo.get('org', 'Unknown')} [{geo.get('country', 'UNK')}]")
            except Exception:
                log_substep("Failed to resolve", host)
            await asyncio.sleep(0.2)
    return data
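# Illustrative ip-api.com payload for a resolved host (values are made up):
#   {"status": "success", "country": "Germany", "isp": "Hetzner Online GmbH",
#    "org": "Hetzner", "as": "AS24940 Hetzner Online GmbH", "proxy": false,
#    "hosting": true, "ip": "116.203.0.1", "host": "example-host.com"}
# The 'proxy'/'hosting' flags feed the +40 network penalty in EnsembleScorer.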
async def scrape_landing_page(urls: list[str]) -> dict:
    # Cap URLs to 10
    urls = urls[:10]
    results = {}

    async def scrape_single(url: str):
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
                )
                page = await context.new_page()
                try:
                    target_url = url if url.startswith(("http://", "https://")) else f"http://{url}"
                    await page.goto(target_url, timeout=10000, wait_until="domcontentloaded")
                    content = await page.content()
                    soup = BeautifulSoup(content, "html.parser")
                    for tag in soup(["script", "style", "nav", "footer", "svg", "noscript"]):
                        tag.decompose()
                    text = soup.get_text(separator=" ", strip=True)
                    text = unicodedata.normalize("NFKC", text)
                    results[url] = text[:300]
                except Exception as e:
                    results[url] = f"Error accessing page: {str(e)}"
                finally:
                    await browser.close()
        except Exception as e:
            results[url] = f"Scraping failed: {str(e)}"

    # Run all tasks concurrently
    tasks = [scrape_single(u) for u in urls]
    await asyncio.gather(*tasks)
    return results
# --- SYSTEM PROMPT ---
SYSTEM_PROMPT = """You are the 'Maverick', an elite, autonomous Cybersecurity Judge. Your sole purpose is to analyze the provided Evidence Dossier and return a JSON object.
**Core Rules:**
1. **The "One Bad Link" Rule:** If the email contains **ANY** suspicious or malicious URL, the Final Decision MUST be "phishing" (100% Confidence), even if other links are legitimate.
2. **Prioritize Ground Truth:** You must prioritize **Scraped Content** (e.g., a page asking for credentials) and **Network Data** (e.g., a Bank hosted on DigitalOcean) over the Technical Score.
3. **Override Authority:** Even if the 'Technical Ensemble Score' is low (e.g., 20/100), if you find a Critical Threat in the Scraped Data or Forensic Scan, you MUST override with a High Score (90-100).
4. **Suspicious Indicators:**
   - **Scraped Data:** Login forms on non-official domains, "Verify Identity" text, urgency.
   - **Network:** Mismatch between Sender Domain and Hosting (e.g., Microsoft email hosted on Namecheap).
   - **Forensics:** Hidden H1 tags, Typosquatting (paypa1.com), Mismatched hrefs.
5. **Confidence Score:**
   - Give a confidence score between 0 and 100 based on all the evidence and the decision being made. Scores above 50 indicate phishing; scores below 50 indicate a legitimate email.
6. **Final Decision:**
   - Based on the evidence and the confidence score, return "phishing" if the email appears to be phishing, and "legitimate" otherwise.
**8 ROBUST FEW-SHOT EXAMPLES:**
**Example 1: Phishing (Credential Harvesting - Scraped Data Override)**
**Input:**
Sender: [email protected]
Subject: Action Required: Unusual Sign-in Activity Detected
Technical Score: 35 / 100
Network Intelligence: Host: 162.241.2.1 | Org: Unified Layer (Cheap Hosting) | ISP: Bluehost | Proxy: False
Scraped Content: "Microsoft 365. Sign in to your account. Email, phone, or Skype. No account? Create one. Can't access your account? Sign-in options. Terms of Use Privacy & Cookies. © Microsoft 2025. NOTE: This page is for authorized users only."
Forensic Scan: Link: http://microsoft-online-verify.com/login.php
Message: "Microsoft Security Alert
We detected a sign-in attempt from a new device or location.
**Account:** [email protected]
**Date:** Fri, Nov 28, 2025 10:23 AM GMT
**Location:** Moscow, Russia
**IP Address:** 103.22.14.2
**Browser:** Firefox on Windows 10
If this wasn't you, your account may have been compromised. Please **verify your identity immediately** to secure your account and avoid permanent suspension.
[Secure My Account]
Thanks,
The Microsoft Account Team"
**Correct Decision:**
{{
"confidence": 99.0,
"reasoning": "CRITICAL OVERRIDE. The Scraped Data mimics a Microsoft 365 Login portal ('Sign in to your account'), but the Network Data confirms the site is hosted on 'Unified Layer/Bluehost', NOT Microsoft's official Azure infrastructure. This is a classic credential harvesting attack using a fake security alert.",
"highlighted_text": "Please @@verify your identity immediately@@ to secure your account and avoid permanent suspension. @@[Secure My Account]@@",
"final_decision": "phishing",
"suggestion": "Do not enter credentials. This is a fake login page hosted on non-Microsoft servers."
}}
**Example 2: Phishing (Hidden Malicious URL - Forensic Override)**
**Input:**
Sender: [email protected]
Subject: MANDATORY: Updated Employee Handbook & Compliance Policy 2025
Technical Score: 45 / 100
Network Intelligence: Host: docs.google.com (Google LLC)
Scraped Content: "Google Docs. Sign in. Employee Handbook 2025.pdf. You need permission. Request access. Switch accounts. Google Workspace."
Forensic Scan: CRITICAL: Found hidden URL in H1 tag: 'http://bit.ly/malware-redirect-payload'
Message: "Dear Team,
As part of our annual compliance audit (ISO 27001), all employees are required to review and sign the updated Employee Handbook for the fiscal year 2025.
Please access the document via the secure Google Docs link below:
[docs.google.com/handbook-2025](Link)
Failure to acknowledge this document by Friday may result in a temporary suspension of network access.
Regards,
HR Compliance Team
Wipro Limited"
**Correct Decision:**
{{
"confidence": 98.0,
"reasoning": "Phishing. While the visible body text points to a legitimate Google Docs URL, the email contains a hidden malicious URL ('bit.ly/malware-redirect-payload') embedded in the HTML header tags. This is a sophisticated evasion tactic designed to bypass filters while tricking the user.",
"highlighted_text": "Please access the document via the secure Google Docs link below: [docs.google.com/handbook-2025] @@(Hidden Header URL Detected)@@",
"final_decision": "phishing",
"suggestion": "Do not click. A hidden malicious payload was detected in the email structure."
}}
**Example 3: Phishing (Typosquatting & Urgency)**
**Input:**
Sender: [email protected]
Subject: URGENT: Wallet Suspended - Case ID #99283-AX
Technical Score: 88 / 100
Network Intelligence: Host: paypa1-resolution.com | Org: Namecheap Inc | ISP: Namecheap
Scraped Content: "PayPal. Security Challenge. Enter your credit card number to verify ownership. Expiration Date. CVV. Billing Address. Submit. Copyright 1999-2025 PayPal. All rights reserved."
Forensic Scan: Link: https://paypa1-resolution.com/verify-identity
Message: "Hello Customer,
Your PayPal wallet has been temporarily suspended due to suspicious transactions totaling $400.00 USD to 'Global-Tech-Solutions Ltd'.
To restore full access to your funds, you must **verify your payment method immediately**. Failure to do so within 24 hours will result in the permanent closure of your account and forfeiture of remaining balance.
[Resolve Issue Now]
Thank you for being a valued customer."
**Correct Decision:**
{{
"confidence": 99.0,
"reasoning": "Phishing. Typosquatting detected ('paypa1' instead of 'paypal'). The Scraped Data confirms the landing page asks for credit card details (CVV/Expiry), and the domain is registered via Namecheap, not PayPal's official infrastructure. High urgency and threat of 'forfeiture' are clear indicators.",
"highlighted_text": "Your PayPal wallet has been temporarily @@suspended@@. To restore full access, you must @@verify your payment method immediately@@. Failure to do so within 24 hours will result in @@permanent closure@@.",
"final_decision": "phishing",
"suggestion": "Delete immediately. This is an impersonation attack stealing financial data."
}}
**Example 4: Legitimate (Internal Corporate Email)**
**Input:**
Sender: [email protected]
Subject: Scheduled Maintenance - Server Room B - Sunday Nov 30
Technical Score: 15 / 100
Network Intelligence: Host: internal.daiict.ac.in | Org: DA-IICT | ISP: NKN (National Knowledge Network) | Proxy: False
Scraped Content: "DA-IICT Intranet Login. Authorized Personnel Only. User ID. Password. Forgot Password? IT Helpdesk Extension: 4444."
Forensic Scan: Link: https://internal.daiict.ac.in/wiki/maintenance-schedule
Message: "Dear Faculty and Staff,
The central server room (Room B) will be undergoing scheduled maintenance on Sunday, Nov 30, from 2:00 AM to 4:00 AM IST.
**Services affected:**
1. Moodle (LMS)
2. Faculty Webmail
3. Student Portal
Please ensure all work is saved before this time. For the full maintenance schedule and alternative access points, please check the wiki:
https://internal.daiict.ac.in/wiki/maintenance-schedule
Regards,
IT Services"
**Correct Decision:**
{{
"confidence": 5.0,
"reasoning": "Legitimate. The sender domain, link domain, and network infrastructure (DA-IICT/NKN) all align perfectly. The scraped content matches an internal login page. The content is informational with specific details and lacks suspicious urgency or external hosting.",
"highlighted_text": "The central server room (Room B) will be undergoing scheduled maintenance on Sunday. Please check the wiki: https://internal.daiict.ac.in/wiki/maintenance-schedule",
"final_decision": "legitimate",
"suggestion": "Safe internal communication regarding maintenance."
}}
**Example 5: Phishing (Legitimate Cloud Abuse - Google Forms)**
**Input:**
Sender: [email protected]
Subject: Job Offer: Senior Python Developer - Document Upload
Technical Score: 60 / 100
Network Intelligence: Host: forms.gle (Google LLC)
Scraped Content: "Untitled Form. Job Application - Senior Dev. Upload your Passport Front and Back. Upload SSN Card. Upload Drivers License. Submit. Never submit passwords through Google Forms."
Forensic Scan: Link: https://forms.gle/xyz123abc
Message: "Dear Candidate,
Congratulations! We are pleased to offer you the Senior Python Developer position at TechSolutions Inc.
To proceed with your background check and contract generation, we need you to upload the following documents immediately:
1. Scanned Copy of Passport
2. Social Security Number (SSN) Card
3. Driver's License
Please upload them to our secure portal here:
[Link to Google Form]
We need this by EOD today."
**Correct Decision:**
{{
"confidence": 92.0,
"reasoning": "Phishing. Although the domain is legitimate (Google Forms), the Scraped Content reveals it is soliciting highly sensitive PII (Passport/SSN). Legitimate companies do not collect SSNs via public Google Forms. The use of a generic Gmail address for a 'Recruitment Officer' is also a red flag.",
"highlighted_text": "To proceed with your background check... please @@upload the following documents immediately@@: 1. Scanned Copy of Passport 2. Social Security Number (SSN) Card. Please upload them to our secure portal here: @@[Link to Google Form]@@",
"final_decision": "phishing",
"suggestion": "Do not upload sensitive ID documents to public forms. This is likely identity theft."
}}
**Example 6: Legitimate (Transactional Alert)**
**Input:**
Sender: [email protected]
Subject: Transaction Alert: INR 5,000.00 Debited
Technical Score: 8 / 100
Network Intelligence: Host: hdfcbank.net | Org: HDFC Bank Ltd | ISP: Sify Technologies
Scraped Content: N/A (No Links)
Forensic Scan: No URLs found.
Message: "Dear Customer,
INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025.
**Info:** UPI-12345-AmazonPay
**Available Bal:** INR 42,000.00
If this transaction was not done by you, please call our 24x7 PhoneBanking number 1800-202-6161 immediately to block your card/account.
Warm Regards,
HDFC Bank"
**Correct Decision:**
{{
"confidence": 2.0,
"reasoning": "Legitimate. This is a standard text-only transactional alert. The sender domain matches HDFC Bank's official domain, and the network data confirms it. The phone number is a standard support line. There are no suspicious links.",
"highlighted_text": "INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025. Info: UPI-12345-AmazonPay.",
"final_decision": "legitimate",
"suggestion": "Safe transactional alert. No action needed unless the transaction is unrecognized."
}}
**Example 7: Phishing (CEO Fraud / BEC - No Links)**
**Input:**
Sender: [email protected] (Spoofed Name: "Elon Musk")
Subject: Urgent Task - Confidential - DO NOT IGNORE
Technical Score: 75 / 100
Network Intelligence: Host: mail.google.com (Google LLC)
Scraped Content: N/A
Forensic Scan: No URLs found.
Message: "Akshat,
I am currently in a closed-door meeting with the board of investors and cannot talk on the phone. I need a favor.
I need you to purchase 5 Apple Gift Cards ($100 each) for a client gift. It is urgent and needs to be done in the next 30 minutes. I will reimburse you personally by this evening.
Do not mention this to anyone else yet. Reply with the codes here as soon as you have them.
Elon."
**Correct Decision:**
{{
"confidence": 90.0,
"reasoning": "Phishing (BEC). Classic Business Email Compromise. The Sender is using a generic Gmail address to impersonate a C-level executive. The request involves financial urgency (Gift Cards), secrecy ('closed-door meeting', 'do not mention'), and bypasses standard procurement channels.",
"highlighted_text": "I need you to @@purchase 5 Apple Gift Cards@@ ($100 each) for a client gift. It is urgent... @@Reply with the codes here@@ as soon as you have them.",
"final_decision": "phishing",
"suggestion": "Do not reply. Verify this request with the CEO via a different, verified channel (Slack/Phone/Corporate Email)."
}}
**Example 8: Legitimate (Marketing with Trackers)**
**Input:**
Sender: [email protected]
Subject: Recommended for you: Python for Everybody Specialization
Technical Score: 20 / 100
Network Intelligence: Host: links.coursera.org | Org: Coursera Inc | ISP: Amazon.com
Scraped Content: "Coursera. Master Python. Enroll for Free. Starts Nov 29. Financial Aid available. Top Instructors. University of Michigan. 4.8 Stars (120k ratings)."
Forensic Scan: Link: https://links.coursera.org/track/click?id=12345&user=akshat
Message: "Hi Student,
Based on your interest in Data Science, we found a course you might like:
**Python for Everybody Specialization**
Offered by University of Michigan.
Start learning today and build job-ready skills.
[Enroll Now]
See you in class,
The Coursera Team
381 E. Evelyn Ave, Mountain View, CA 94041"
**Correct Decision:**
{{
"confidence": 10.0,
"reasoning": "Legitimate. Standard marketing email from a known education platform. Network data confirms the link tracking domain belongs to Coursera (hosted on AWS). Scraped content is consistent with the offer. Address matches public records.",
"highlighted_text": "Based on your interest in Data Science, we found a course you might like: Python for Everybody Specialization. [Enroll Now]",
"final_decision": "legitimate",
"suggestion": "Safe marketing email."
}}"""
async def get_groq_decision(ensemble_result: Dict, network_data: List[Dict], landing_page_text: str, cleaned_text: str, original_raw_html: str, readable_display_text: str, sender: str, subject: str):
    net_str = "No Network Data"
    if network_data:
        net_str = "\n".join([
            f"- Host: {d.get('host')} | IP: {d.get('ip')} | Org: {d.get('org')} | ISP: {d.get('isp')} | Hosting/Proxy: {d.get('hosting') or d.get('proxy')}"
            for d in network_data
        ])
    log_step("🔎", "Starting Forensic HTML Scan")
    forensic_report = []
    try:
        soup = BeautifulSoup(original_raw_html, 'html.parser')
        # A. Scan Forms
        for form in soup.find_all('form'):
            action = form.get('action')
            if action:
                forensic_report.append(f"CRITICAL: Found URL in <form action>: {action}")
        # B. Scan Images
        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                forensic_report.append(f"Found URL in <img src>: {src}")
        # C. Scan Links
        for a in soup.find_all('a'):
            href = a.get('href')
            if href:
                forensic_report.append(f"Found URL in <a href>: {href}")
        # D. Scan Raw Text (catches the hidden-H1 case)
        url_pattern = r'(?:https?://|ftp://|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:/[\w\-\._~:/?#[\]@!$&\'()*+,;=]*)?'
        all_text_urls = set(re.findall(url_pattern, original_raw_html))
        if all_text_urls:
            forensic_report.append(f"All URLs detected in raw text: {', '.join(all_text_urls)}")
    except Exception as e:
        logger.warning(f"Forensic Scan Error: {e}")
        forensic_report.append("Forensic scan failed to parse HTML structure.")
    forensic_str = "\n".join(forensic_report) if forensic_report else "No URLs found in forensic scan."
    log_substep("Forensic Scan", f"Found {len(forensic_report)} potential indicators")
    # CRITICAL: Ensure input text is truncated to fit token limits
    prompt_display_text = readable_display_text[:MAX_INPUT_CHARS]
    prompt = f"""
**ANALYSIS CONTEXT**
Sender: {sender}
Subject: {subject}
**FORENSIC URL SCAN (INTERNAL HTML ANALYSIS)**
The system scanned the raw HTML and found these URLs (hidden in tags):
{forensic_str}
**TECHNICAL INDICATORS**
Calculated Ensemble Score: {ensemble_result['score']:.2f} / 100
Key Factors: {ensemble_result['details']}
**NETWORK GROUND TRUTH**
{net_str}
**LANDING PAGE PREVIEW (Scraped Text)**
"{landing_page_text}"
**MESSAGE CONTENT (READABLE VERSION)**
"{prompt_display_text}"
**TASK:**
Analyze the "FORENSIC URL SCAN" findings.
- If ANY URL in the forensic scan is NSFW/Adult or malicious, flag as PHISHING.
- If a URL looks like a generated subdomain (e.g. 643646.me) or is unrelated to the sender, FLAG AS PHISHING immediately.
- IMPORTANT: For the 'highlighted_text' field in your JSON response, use the **MESSAGE CONTENT (READABLE VERSION)** provided above. Do NOT output raw HTML tags. Just mark suspicious parts in the readable text with @@...@@.
"""
    attempts = 0
    while attempts < LLM_MAX_RETRIES:
        try:
            client = key_rotator.get_client_and_rotate()
            if not client:
                raise Exception("No Keys")
            log_step("🚀", f"Sending LLM Request (Attempt {attempts+1}/{LLM_MAX_RETRIES})")
            completion = await client.chat.completions.create(
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt}
                ],
                model="meta-llama/llama-4-scout-17b-16e-instruct",
                temperature=0.1,
                max_tokens=4096,
                response_format={"type": "json_object"}
            )
            raw_content = completion.choices[0].message.content
            log_substep("LLM Response Received", f"Length: {len(raw_content)} chars")
            parsed_json = clean_and_parse_json(raw_content)
            if parsed_json:
                log_success("LLM Response Parsed Successfully")
                return parsed_json
            else:
                raise ValueError("Empty or Invalid JSON from LLM")
        except RateLimitError as e:
            wait_time = 2 ** (attempts + 1) + random.uniform(0, 1)
            if hasattr(e, 'headers') and 'retry-after' in e.headers:
                try:
                    wait_time = float(e.headers['retry-after']) + 1
                except (TypeError, ValueError):
                    pass
            logger.warning(f"LLM Rate Limit (429). Retrying in {wait_time:.2f}s...")
            await asyncio.sleep(wait_time)
            attempts += 1
        except Exception as e:
            logger.warning(f"LLM Attempt {attempts+1} failed: {e}")
            attempts += 1
            await asyncio.sleep(1)
    # Fallback: all retries exhausted, decide from the technical score alone
    is_phishing = ensemble_result['score'] > 50
    return {
        "confidence": ensemble_result['score'],
        "reasoning": f"LLM Unavailable after retries. Decision based purely on Technical Score ({ensemble_result['score']:.2f}).",
        "highlighted_text": readable_display_text,
        "final_decision": "phishing" if is_phishing else "legitimate",
        "suggestion": "Exercise caution. Automated analysis detected risks." if is_phishing else "Appears safe."
    }
# Startup hook (decorator assumed: it was likely lost in extraction, and this
# handler clearly runs once at boot to load models)
@app.on_event("startup")
async def startup():
    logger.info(f"\n{UltraColorFormatter.NEON_BLUE}{'='*70}")
    logger.info(f"{UltraColorFormatter.WHITE_BOLD} PHISHING DETECTION API v2.6.0 - SYSTEM STARTUP ".center(80))
    logger.info(f"{UltraColorFormatter.NEON_BLUE}{'='*70}{UltraColorFormatter.RESET}")
    load_models()
    logger.info(f"\n{UltraColorFormatter.NEON_GREEN}🚀 SYSTEM READY AND LISTENING ON PORT 8000{UltraColorFormatter.RESET}\n")
# Prediction endpoint (decorator assumed: the route path "/predict" is an
# assumption, but this handler is clearly the API's main entry point)
@app.post("/predict", response_model=PredictionResponse)
async def predict(input_data: MessageInput):
    log_section(f"NEW REQUEST: {input_data.sender}")
    if not input_data.text or not input_data.text.strip():
        logger.warning("Received empty input text.")
        return PredictionResponse(
            confidence=0.0,
            reasoning="Empty input.",
            highlighted_text="",
            final_decision="legitimate",
            suggestion="None"
        )
    async with request_semaphore:
        try:
            start_time = time.time()
            # Step 1: Parsing
            extracted_text, all_urls = extract_visible_text_and_links(input_data.text)
            # Step 2: Cleaning (strip URLs so text models see prose only)
            url_pattern_for_cleaning = r'(?:https?://|ftp://|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:/[\w\-\._~:/?#[\]@!$&\'()*+,;=]*)?'
            cleaned_text_for_models = re.sub(url_pattern_for_cleaning, '', extracted_text)
            cleaned_text_for_models = ' '.join(cleaned_text_for_models.lower().split())
            all_urls = all_urls[:MAX_URLS_TO_ANALYZE]
            if all_urls:
                log_step("🔗", f"Proceeding with {len(all_urls)} URLs")
            else:
                log_step("🚫", "No URLs Detected - Skipping Feature Extraction")
            features_df = pd.DataFrame()
            network_data_raw = []
            landing_page_text = ""
            # Step 3: Async Tasks
            if all_urls:
                log_step("⚡", "Initiating Parallel Async Tasks")
                results = await asyncio.gather(
                    extract_url_features(all_urls),
                    get_network_data_raw(all_urls),
                    scrape_landing_page(all_urls)
                )
                features_df, network_data_raw, landing_page_text = results
                if isinstance(landing_page_text, dict):
                    landing_page_text = "\n".join(f"{u}: {txt}" for u, txt in landing_page_text.items())
                else:
                    landing_page_text = str(landing_page_text)
            # Step 4: Ensemble
            predictions = await asyncio.to_thread(get_model_predictions, features_df, cleaned_text_for_models)
            ensemble_result = EnsembleScorer.calculate_technical_score(predictions, network_data_raw, all_urls)
            log_metric("Ensemble Technical Score", f"{ensemble_result['score']:.2f}/100", warning=ensemble_result['score'] > 50)
            # Step 5: LLM
            llm_result = await get_groq_decision(
                ensemble_result,
                network_data_raw,
                landing_page_text,
                cleaned_text_for_models,
                input_data.text,
                extracted_text,
                input_data.sender,
                input_data.subject
            )
            # --- LOGIC ENFORCEMENT: keep verdict and confidence consistent ---
            # 1. Normalize the decision string
            final_dec = llm_result.get('final_decision', 'legitimate').lower()
            if final_dec not in ['phishing', 'legitimate']:
                final_dec = 'legitimate'
            # 2. Get the raw confidence
            final_confidence = float(llm_result.get('confidence', ensemble_result['score']))
            # 3. Force alignment: if the verdict is phishing, the score MUST be > 50
            if final_dec == "phishing" and final_confidence <= 50:
                logger.warning(f"⚠️ Consistency Fix: Verdict is Phishing but score was {final_confidence}. Forcing to 60.0.")
                final_confidence = max(60.0, final_confidence)  # Force into the phishing range
            # If the verdict is legitimate, the score MUST be < 50
            elif final_dec == "legitimate" and final_confidence > 50:
                logger.warning(f"⚠️ Consistency Fix: Verdict is Legitimate but score was {final_confidence}. Forcing to 40.0.")
                final_confidence = min(40.0, final_confidence)  # Force into the legitimate range
            elapsed = time.time() - start_time
            log_section("REQUEST COMPLETE")
            log_metric("Execution Time", f"{elapsed:.2f}s")
            log_metric("Technical Score", f"{ensemble_result['score']:.0f}")
            decision_color = UltraColorFormatter.BOLD_RED if final_dec == "phishing" else UltraColorFormatter.NEON_GREEN
            logger.info(f" ⚖️ FINAL VERDICT: {decision_color}{final_dec.upper()}{UltraColorFormatter.RESET} (Conf: {final_confidence})")
            return PredictionResponse(
                confidence=final_confidence,
                reasoning=llm_result.get('reasoning', ensemble_result['details']),
                highlighted_text=llm_result.get('highlighted_text', extracted_text),
                final_decision=final_dec,
                suggestion=llm_result.get('suggestion', 'Check details carefully.')
            )
        except Exception as e:
            logger.error(f"CRITICAL FAILURE in Prediction Pipeline: {e}")
            import traceback
            traceback.print_exc()
            raise HTTPException(status_code=500, detail=str(e))
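# Entrypoint sketch: uvicorn is imported above but otherwise unused, and the
# startup banner references port 8000, so the module was presumably run directly.
# The host/port wiring below is an assumption, not confirmed by the source.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)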