from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
from urllib.parse import urlencode, urlparse
import numpy as np
from time import sleep
import logging
import requests
from bs4 import BeautifulSoup
import re
from app.services.openai_client import OpenAIClient
from app.config import OPENAI_API_KEY
from app.websites.fact_checker_website import SOURCES, get_all_sources
from app.api.ai_fact_check import ai_fact_check
from app.models.fact_check_models import (
    AIFactCheckRequest,
    AIFactCheckResponse,
    VerificationResult,
    TokenUsage
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

scrap_websites_router = APIRouter()

# Configuration for scraping
MAX_RETRIES = 2
RETRY_DELAY = 2


class SearchRequest(BaseModel):
    search_text: str
    source_types: List[str] = ["fact_checkers"]


class UrlSimilarityInfo(BaseModel):
    url: str
    similarity: float
    extracted_text: str


class SearchResponse(BaseModel):
    results: Dict[str, List[str]]
    error_messages: Dict[str, str]
    ai_fact_check_result: Optional[Dict] = None


def extract_url_text(url: str) -> str:
    """Extract and process meaningful text from a URL path with improved cleaning."""
    logger.debug(f"Extracting text from URL: {url}")
    try:
        parsed = urlparse(url)
        path = parsed.path
        path = path.replace('.html', '').replace('/index', '').replace('.php', '')
        segments = [seg for seg in path.split('/') if seg]

        # Drop purely numeric segments (dates, article IDs, etc.)
        cleaned_segments = []
        for segment in segments:
            segment = segment.replace('-', ' ').replace('_', ' ')
            if not (segment.replace(' ', '').isdigit() or
                    all(part.isdigit() for part in segment.split() if part)):
                cleaned_segments.append(segment)

        common_words = {
            'www', 'live', 'news', 'intl', 'index', 'world', 'us', 'uk',
            'updates', 'update', 'latest', 'breaking', 'new', 'article'
        }

        text = ' '.join(cleaned_segments)
        words = [word.lower() for word in text.split()
                 if word.lower() not in common_words and len(word) > 1]

        result = ' '.join(words)
        logger.debug(f"Extracted text: {result}")
        return result
    except Exception as e:
        logger.error(f"Error extracting text from URL {url}: {str(e)}")
        return ''


def extract_search_results(html_content):
    """Extract URLs using multiple selectors and patterns."""
    soup = BeautifulSoup(html_content, 'html.parser')
    urls = set()  # Using a set to avoid duplicates

    # Multiple CSS selectors to try
    selectors = [
        'div.g div.yuRUbf > a',                   # Main result links
        'div.g a.l',                              # Alternative link format
        'div.rc a',                               # Another possible format
        'div[class*="g"] > div > div > div > a',  # Broader match
        'a[href^="http"]'                         # Any http link
    ]

    for selector in selectors:
        try:
            elements = soup.select(selector)
            for element in elements:
                url = element.get('href')
                if url and url.startswith('http') and not url.startswith('https://www.google.com'):
                    urls.add(url)
        except Exception as e:
            logger.debug(f"Error with selector {selector}: {str(e)}")

    # Also try finding URLs in the raw HTML using a regex
    url_pattern = r'href="(https?://[^"]+)"'
    raw_urls = re.findall(url_pattern, html_content)
    for url in raw_urls:
        if not url.startswith('https://www.google.com'):
            urls.add(url)

    return list(urls)


def google_search_scraper(search_text: str, site_domain: str, retry_count: int = 0) -> List[str]:
    """Scrape Google search results using multiple query formats."""
    logger.info(f"Searching for '{search_text}' on domain: {site_domain}")

    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Referer': 'https://www.google.com/',
        'DNT': '1'
    }

    # Try different query formats
    query_formats = [
        f"{search_text} site:{site_domain}",
        f"site:{site_domain} {search_text}",
        f"\"{search_text}\" site:{site_domain}"
    ]

    all_urls = set()

    for query in query_formats:
        try:
            # urlencode({'q': query}) already yields "q=<encoded query>",
            # so no extra "q=" prefix is added to the URL.
            google_url = f"https://www.google.com/search?{urlencode({'q': query})}"
            logger.debug(f"Trying query format: {query}")

            response = requests.get(google_url, headers=headers, timeout=10)

            if response.status_code == 200:
                urls = extract_search_results(response.text)
                domain_urls = [url for url in urls if site_domain in urlparse(url).netloc]
                all_urls.update(domain_urls)
            else:
                logger.warning(f"Received status code {response.status_code} for query format: {query}")

            sleep(2)  # Delay between requests

        except Exception as e:
            logger.error(f"Error processing query format '{query}': {str(e)}")
            if retry_count < MAX_RETRIES:
                sleep(RETRY_DELAY)
                return google_search_scraper(search_text, site_domain, retry_count + 1)

    valid_urls = list(all_urls)
    logger.info(f"Found {len(valid_urls)} unique URLs for domain: {site_domain}")

    return valid_urls[:5]  # Return up to 5 URLs


def calculate_similarity(query_embedding: List[float], url_embedding: List[float]) -> float:
    """Calculate cosine similarity between two embeddings."""
    query_array = np.array(query_embedding)
    url_array = np.array(url_embedding)

    similarity = np.dot(url_array, query_array) / (
        np.linalg.norm(url_array) * np.linalg.norm(query_array)
    )
    return float(similarity)


@scrap_websites_router.post("/search", response_model=SearchResponse)
async def search_websites(request: SearchRequest):
    logger.info(f"Starting search with query: {request.search_text}")
    logger.info(f"Source types requested: {request.source_types}")

    results = {}
    error_messages = {}

    # Initialize OpenAI client
    logger.debug("Initializing OpenAI client")
    openai_client = OpenAIClient(OPENAI_API_KEY)

    # Get domains based on requested source types
    domains = []
    for source_type in request.source_types:
        if source_type in SOURCES:
            domains.extend([source.domain for source in SOURCES[source_type]])

    if not domains:
        logger.warning("No valid source types provided. Using all available domains.")
        domains = [source.domain for source in get_all_sources()]

    logger.info(f"Processing {len(domains)} domains")

    # Enhance search text with key terms
    search_context = request.search_text
    logger.debug("Getting query embedding from OpenAI")
    query_embedding = openai_client.get_embeddings([search_context])[0]

    # Higher similarity threshold for better filtering
    SIMILARITY_THRESHOLD = 0.75
    MAX_URLS_PER_DOMAIN = 2  # Adjusted to keep the total under 5
    TOTAL_MAX_URLS = 5       # Maximum URLs allowed for AIFactCheckRequest

    total_urls_collected = 0

    for domain in domains[:3]:  # Limit to 3 domains for testing
        if total_urls_collected >= TOTAL_MAX_URLS:
            break

        logger.info(f"Processing domain: {domain}")
        try:
            urls = google_search_scraper(request.search_text, domain)
            valid_urls = []

            logger.debug(f"Found {len(urls)} URLs for domain {domain}")

            for url in urls:
                if len(valid_urls) >= MAX_URLS_PER_DOMAIN or total_urls_collected >= TOTAL_MAX_URLS:
                    break

                url_text = extract_url_text(url)
                if not url_text:
                    logger.debug(f"No meaningful text extracted from URL: {url}")
                    continue

                logger.debug("Getting URL embedding from OpenAI")
                url_embedding = openai_client.get_embeddings([url_text])[0]
                similarity = calculate_similarity(query_embedding, url_embedding)

                logger.debug(f"Similarity score for {url}: {similarity}")

                if similarity >= SIMILARITY_THRESHOLD:
                    valid_urls.append(url)
                    total_urls_collected += 1

            results[domain] = valid_urls
            logger.info(f"Successfully processed domain {domain}. Found {len(valid_urls)} valid URLs")

        except HTTPException as e:
            logger.error(f"HTTP Exception for domain {domain}: {str(e.detail)}")
            error_messages[domain] = str(e.detail)
        except Exception as e:
            logger.error(f"Unexpected error for domain {domain}: {str(e)}")
            error_messages[domain] = f"Unexpected error for {domain}: {str(e)}"

        sleep(1)  # Delay between processing different domains

    logger.info("Search completed")
    logger.debug(f"Results found for {len(results)} domains")
    logger.debug(f"Errors encountered for {len(error_messages)} domains")

    # Collect all valid URLs from the results
    all_valid_urls = []
    for domain_urls in results.values():
        all_valid_urls.extend(domain_urls)

    logger.info(f"Total valid URLs collected: {len(all_valid_urls)}")

    # Create the request body for the AI fact check
    if all_valid_urls:
        fact_check_request = AIFactCheckRequest(
            content=request.search_text,
            urls=all_valid_urls[:TOTAL_MAX_URLS]  # Ensure we don't exceed the limit
        )

        logger.info("Calling AI fact check service")
        try:
            ai_response = await ai_fact_check(fact_check_request)
            logger.info("AI fact check completed successfully")

            # Format the AI fact check response
            formatted_response = {
                "query": ai_response.query,
                "token_usage": {
                    "prompt_tokens": ai_response.token_usage.prompt_tokens,
                    "completion_tokens": ai_response.token_usage.completion_tokens,
                    "total_tokens": ai_response.token_usage.total_tokens
                },
                "sources": ai_response.sources,
                "verification_result": {
                    url: {
                        "verdict": result.verdict,
                        "confidence": result.confidence,
                        "evidence": result.evidence,
                        "reasoning": result.reasoning,
                        "missing_info": result.missing_info
                    }
                    for url, result in ai_response.verification_result.items()
                }
            }

            # Return the response with AI fact check results
            return SearchResponse(
                results=results,
                error_messages=error_messages,
                ai_fact_check_result=formatted_response
            )

        except Exception as e:
            logger.error(f"Error during AI fact check: {str(e)}")
            error_messages["ai_fact_check"] = f"Error during fact checking: {str(e)}"

    # Return the response without AI fact check results if no valid URLs were found or an error occurred
    return SearchResponse(
        results=results,
        error_messages=error_messages,
        ai_fact_check_result=None
    )
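

# Usage sketch (an assumption, not part of the original module): one way to
# exercise the /search endpoint locally is to mount the router on a FastAPI
# app and call it with FastAPI's TestClient. The example payload below is
# hypothetical; a real request would still hit Google and the OpenAI API.
if __name__ == "__main__":
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    demo_app = FastAPI()
    demo_app.include_router(scrap_websites_router)

    demo_client = TestClient(demo_app)
    demo_response = demo_client.post(
        "/search",
        json={
            "search_text": "example claim to verify",
            "source_types": ["fact_checkers"],
        },
    )
    print(demo_response.status_code)
    print(demo_response.json())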