diff --git a/app/__pycache__/config.cpython-312.pyc b/app/__pycache__/config.cpython-312.pyc index 91b0688..22a68c5 100644 Binary files a/app/__pycache__/config.cpython-312.pyc and b/app/__pycache__/config.cpython-312.pyc differ diff --git a/app/api/__pycache__/fact_check.cpython-312.pyc b/app/api/__pycache__/fact_check.cpython-312.pyc index 7304a38..98c2526 100644 Binary files a/app/api/__pycache__/fact_check.cpython-312.pyc and b/app/api/__pycache__/fact_check.cpython-312.pyc differ diff --git a/app/api/fact_check.py b/app/api/fact_check.py index c5f494b..432f0de 100644 --- a/app/api/fact_check.py +++ b/app/api/fact_check.py @@ -2,7 +2,7 @@ from fastapi import APIRouter, HTTPException import json from datetime import datetime from typing import Dict, List - +import httpx from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL from app.models.fact_check_models import ( GoogleFactCheckRequest as FactCheckRequest, @@ -12,7 +12,6 @@ from app.models.fact_check_models import ( TokenUsage ) from app.websites.fact_checker_website import fetch_fact_checks, get_all_sources -from app.api.scrap_websites import SearchRequest, search_websites fact_check_router = APIRouter() @@ -22,6 +21,39 @@ class CustomJSONEncoder(json.JSONEncoder): return obj.isoformat() return super().default(obj) +async def validate_api_key(): + """Validate the Google API key with a test request""" + async with httpx.AsyncClient() as client: + try: + test_url = f"{GOOGLE_FACT_CHECK_BASE_URL}claims:search" + params = { + "key": GOOGLE_API_KEY, + "query": "test", + "languageCode": "en-US", + "pageSize": 1 + } + response = await client.get(test_url, params=params) + response.raise_for_status() + return True + except httpx.HTTPStatusError as e: + if e.response.status_code == 403: + raise HTTPException( + status_code=503, + detail=ErrorResponse( + detail="Invalid or expired API key", + error_code="INVALID_API_KEY", + path="/check-facts" + ).dict() + ) + raise HTTPException( + status_code=503, + detail=ErrorResponse( + detail=f"API validation failed: {str(e)}", + error_code="API_VALIDATION_ERROR", + path="/check-facts" + ).dict() + ) + @fact_check_router.post( "/check-facts", response_model=FactCheckResponse, @@ -34,7 +66,7 @@ class CustomJSONEncoder(json.JSONEncoder): ) async def check_facts(request: FactCheckRequest) -> FactCheckResponse: """ - Check facts using multiple fact-checking sources and fallback to web search + Check facts using multiple fact-checking sources """ all_results = [] verified_results = [] @@ -50,10 +82,14 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse: ).dict() ) + # Validate API key before proceeding + await validate_api_key() + # Get all sources in priority order all_sources = get_all_sources() all_sources_list = [] # To store source URLs contexts_used = [] # To store context snippets + failed_sources = [] # Track failed sources for source in all_sources: try: @@ -78,75 +114,39 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse: if "textualRating" in review: contexts_used.append(review["textualRating"]) - except HTTPException: + except HTTPException as http_err: + failed_sources.append({ + "source": source.domain, + "error": str(http_err.detail) + }) continue except Exception as e: - # Log the error but continue with other sources - print(f"Error processing {source.domain}: {str(e)}") + failed_sources.append({ + "source": source.domain, + "error": str(e) + }) continue - # If no results found, try searching websites - if not all_results: - try: - # Create search request - search_request = SearchRequest( - search_text=request.content, - source_types=["fact_checkers"] - ) - - # Perform website search - search_response = await search_websites(search_request) - - # If AI fact check results are available, use them - if search_response.ai_fact_check_result: - # Create a claim from AI fact check result - ai_claim = { - "text": request.content, - "claimant": "AI Analysis", - "claimDate": datetime.now().isoformat(), - "claimReview": [{ - "publisher": { - "name": "AI Fact Checker", - "site": "ai-fact-check" - }, - "textualRating": search_response.ai_fact_check_result.verification_result["verdict"], - "title": "AI Fact Check Analysis", - "reviewDate": datetime.now().isoformat(), - "url": "" - }] - } - - validated_claim = Claim(**ai_claim).dict() - all_results.append(validated_claim) - - # Add sources and contexts - all_sources_list.extend(search_response.results.keys()) - if search_response.ai_fact_check_result.verification_result["evidence"]: - contexts_used.extend(search_response.ai_fact_check_result.verification_result["evidence"]) - - except Exception as e: - print(f"Error during website search: {str(e)}") - - # If still no results found after searching websites - if not all_results: + # Return partial results if some sources failed but we have data + if all_results: + verification_result = { + "verdict": "Partial Results Available" if failed_sources else "Complete Results", + "confidence": "Medium" if failed_sources else "High", + "evidence": contexts_used, + "reasoning": "Based on available fact checks", + "missing_info": f"{len(failed_sources)} sources failed" if failed_sources else None + } + else: raise HTTPException( status_code=404, detail=ErrorResponse( - detail="No fact check results found", + detail="No fact check results found. Failed sources: " + + ", ".join([f"{f['source']}: {f['error']}" for f in failed_sources]), error_code="NO_RESULTS_FOUND", path="/check-facts" ).dict() ) - # Prepare the verification result - verification_result = { - "verdict": "Insufficient Information", # Default verdict - "confidence": "Low", - "evidence": contexts_used, - "reasoning": "Based on available fact checks and web search results", - "missing_info": "Additional verification may be needed" - } - # Create token usage information token_usage = TokenUsage( prompt_tokens=0, @@ -161,10 +161,12 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse: results=all_results, verification_result=verification_result, sources=list(set(all_sources_list)), + context_used=contexts_used, token_usage=token_usage, summary={ "total_sources": len(set(all_sources_list)), - "fact_checking_sites_queried": len(all_sources) + "fact_checking_sites_queried": len(all_sources), + "failed_sources": failed_sources } ) diff --git a/app/api/scrap_websites.py b/app/api/scrap_websites.py index 93fb31a..0dd584c 100644 --- a/app/api/scrap_websites.py +++ b/app/api/scrap_websites.py @@ -1,309 +1,342 @@ from fastapi import APIRouter, HTTPException -from pydantic import BaseModel -from typing import List, Dict, Optional -from urllib.parse import urlencode, urlparse -import urllib.parse -import numpy as np -from time import sleep +import httpx import logging -import requests -from bs4 import BeautifulSoup -import re +from urllib.parse import urlparse +import json from app.services.openai_client import OpenAIClient -from app.config import OPENAI_API_KEY +from app.config import OPENAI_API_KEY, GOOGLE_API_KEY, GOOGLE_ENGINE_ID from app.websites.fact_checker_website import SOURCES, get_all_sources from app.api.ai_fact_check import ai_fact_check +from typing import List, Dict, Optional +from pydantic import BaseModel from app.models.fact_check_models import ( - AIFactCheckRequest, - AIFactCheckResponse, - VerificationResult, - TokenUsage + AIFactCheckRequest, + FactCheckSource, + SourceType ) +# Define Pydantic models +class Publisher(BaseModel): + name: str + site: str + +class ClaimReview(BaseModel): + publisher: Publisher + textualRating: str + +class Claim(BaseModel): + claimReview: List[ClaimReview] + claimant: str + text: str + +class Summary(BaseModel): + fact_checking_sites_queried: int + total_sources: int + +class VerificationResult(BaseModel): + verdict: str + confidence: str + evidence: List[str] + reasoning: str + fact_check_type: str + +class SearchRequest(BaseModel): + search_text: str + source_types: List[str] + +class EnhancedFactCheckResponse(BaseModel): + query: str + results: List[Dict] + sources: List + summary: Summary + token_usage: Dict[str, int] + total_claims_found: int + verification_result: VerificationResult + # Configure logging logging.basicConfig( - level=logging.INFO, + level=logging.INFO, # Changed back to INFO from DEBUG format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) scrap_websites_router = APIRouter() -# Configuration for scraping -MAX_RETRIES = 2 -RETRY_DELAY = 2 +# Constants +RESULTS_PER_PAGE = 10 +MAX_PAGES = 5 +MAX_URLS_PER_DOMAIN = 5 +GOOGLE_SEARCH_URL = "https://www.googleapis.com/customsearch/v1" -class SearchRequest(BaseModel): - search_text: str - source_types: List[str] = ["fact_checkers"] - -class UrlSimilarityInfo(BaseModel): - url: str - similarity: float - extracted_text: str - -class SearchResponse(BaseModel): - results: Dict[str, List[str]] - error_messages: Dict[str, str] - ai_fact_check_result: Optional[Dict] = None - -def extract_url_text(url: str) -> str: - """Extract and process meaningful text from URL path with improved cleaning""" - logger.debug(f"Extracting text from URL: {url}") +def get_domain_from_url(url: str) -> str: + """Extract domain from URL with improved handling.""" try: - parsed = urllib.parse.urlparse(url) - path = parsed.path - path = path.replace('.html', '').replace('/index', '').replace('.php', '') - segments = [seg for seg in path.split('/') if seg] - cleaned_segments = [] - for segment in segments: - segment = segment.replace('-', ' ').replace('_', ' ') - if not (segment.replace(' ', '').isdigit() or - all(part.isdigit() for part in segment.split() if part)): - cleaned_segments.append(segment) - - common_words = { - 'www', 'live', 'news', 'intl', 'index', 'world', 'us', 'uk', - 'updates', 'update', 'latest', 'breaking', 'new', 'article' - } - - text = ' '.join(cleaned_segments) - words = [word.lower() for word in text.split() - if word.lower() not in common_words and len(word) > 1] - - result = ' '.join(words) - logger.debug(f"Extracted text: {result}") - return result + parsed = urlparse(url) + domain = parsed.netloc.lower() + # Remove 'www.' if present + if domain.startswith('www.'): + domain = domain[4:] + return domain except Exception as e: - logger.error(f"Error extracting text from URL {url}: {str(e)}") - return '' + logger.error(f"Error extracting domain from URL {url}: {str(e)}") + return "" -def extract_search_results(html_content): - """Extract URLs using multiple selectors and patterns""" - soup = BeautifulSoup(html_content, 'html.parser') - urls = set() # Using set to avoid duplicates +def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool: + """Check if domain matches any source with improved matching logic.""" + if not domain: + return False - # Multiple CSS selectors to try - selectors = [ - 'div.g div.yuRUbf > a', # Main result links - 'div.g a.l', # Alternative link format - 'div.rc a', # Another possible format - 'div[class*="g"] > div > div > div > a', # Broader match - 'a[href^="http"]' # Any http link - ] + domain = domain.lower() + if domain.startswith('www.'): + domain = domain[4:] - for selector in selectors: - try: - elements = soup.select(selector) - for element in elements: - url = element.get('href') - if url and url.startswith('http') and not url.startswith('https://www.google.com'): - urls.add(url) - except Exception as e: - logger.debug(f"Error with selector {selector}: {str(e)}") + for source in sources: + source_domain = source.domain.lower() + if source_domain.startswith('www.'): + source_domain = source_domain[4:] + + # Check exact match + if domain == source_domain: + logger.debug(f"Exact domain match found: {domain} = {source_domain}") + return True + + # Check if domain ends with source domain + if domain.endswith('.' + source_domain): + logger.debug(f"Subdomain match found: {domain} ends with {source_domain}") + return True - # Also try finding URLs in the raw HTML using regex - url_pattern = r'href="(https?://[^"]+)"' - raw_urls = re.findall(url_pattern, html_content) - for url in raw_urls: - if not url.startswith('https://www.google.com'): - urls.add(url) - - return list(urls) + logger.debug(f"No match found for domain: {domain}") + return False -def google_search_scraper(search_text: str, site_domain: str, retry_count: int = 0) -> List[str]: - """Scrape Google search results with multiple query formats""" - logger.info(f"Searching for '{search_text}' on domain: {site_domain}") +async def build_enhanced_search_query(query: str, sources: List[FactCheckSource]) -> str: + """Build search query with site restrictions.""" + site_queries = [f"site:{source.domain}" for source in sources] + site_restriction = " OR ".join(site_queries) + enhanced_query = f"({query}) ({site_restriction})" + logger.debug(f"Enhanced search query: {enhanced_query}") + return enhanced_query + +async def google_custom_search(query: str, sources: List[FactCheckSource], page: int = 1) -> Optional[Dict]: + """Perform Google Custom Search with enhanced query.""" + enhanced_query = await build_enhanced_search_query(query, sources) + start_index = ((page - 1) * RESULTS_PER_PAGE) + 1 - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.5', - 'Referer': 'https://www.google.com/', - 'DNT': '1' + params = { + "key": GOOGLE_API_KEY, + "cx": GOOGLE_ENGINE_ID, + "q": enhanced_query, + "num": RESULTS_PER_PAGE, + "start": start_index } - # Try different query formats - query_formats = [ - f"{search_text} site:{site_domain}", - f"site:{site_domain} {search_text}", - f"\"{search_text}\" site:{site_domain}" - ] - - all_urls = set() - - for query in query_formats: + async with httpx.AsyncClient(timeout=30.0) as client: try: - google_url = f"https://www.google.com/search?q={urlencode({'q': query})}" - logger.debug(f"Trying query format: {query}") + logger.info(f"Making API request to Google Custom Search with params: {params}") + response = await client.get(GOOGLE_SEARCH_URL, params=params) + response.raise_for_status() - response = requests.get(google_url, headers=headers) + data = response.json() - if response.status_code == 200: - urls = extract_search_results(response.text) - domain_urls = [url for url in urls if site_domain in urlparse(url).netloc] - all_urls.update(domain_urls) - else: - logger.warning(f"Received status code {response.status_code} for query format: {query}") + search_info = data.get('searchInformation', {}) + logger.info(f"Search info: Total results: {search_info.get('totalResults', 0)}, " + f"Time taken: {search_info.get('searchTime', 0)}s") - sleep(2) # Delay between requests + if 'error' in data: + error_details = data['error'] + logger.error(f"API Error: {error_details}") + raise HTTPException( + status_code=response.status_code, + detail=f"Google API Error: {error_details.get('message')}" + ) + + return data except Exception as e: - logger.error(f"Error processing query format '{query}': {str(e)}") - if retry_count < MAX_RETRIES: - sleep(RETRY_DELAY) - return google_search_scraper(search_text, site_domain, retry_count + 1) + logger.error(f"Search error: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Search error: {str(e)}") + +async def analyze_fact_check_results(openai_client: OpenAIClient, original_response: Dict) -> Dict: + """Analyze fact check results using OpenAI to generate a consolidated verdict.""" - valid_urls = list(all_urls) - logger.info(f"Found {len(valid_urls)} unique URLs for domain: {site_domain}") - return valid_urls[:5] # Return up to 5 URLs - -def calculate_similarity(query_embedding: List[float], url_embedding: List[float]) -> float: - """Calculate cosine similarity between two embeddings""" - query_array = np.array(query_embedding) - url_array = np.array(url_embedding) + # Extract verification results from sources + verification_results = [] + for url, result in original_response.get('verification_result', {}).items(): + verification_results.append(f""" + Source: {url} + Verdict: {result.get('verdict')} + Confidence: {result.get('confidence')} + Evidence: {result.get('evidence')} + Reasoning: {result.get('reasoning')} + """) - similarity = np.dot(url_array, query_array) / ( - np.linalg.norm(url_array) * np.linalg.norm(query_array) - ) - return float(similarity) + system_prompt = """You are a professional fact-checking analyzer. Your task is to analyze multiple fact-checking results + and provide a consolidated verdict. Respond with a valid JSON object containing your analysis.""" + + user_prompt = f""" + Analyze these fact-checking results and provide a final verdict. + + Query: {original_response.get('query', '')} + + Fact Check Results: + {'\n'.join(verification_results)}""" + try: + logger.info("Generating AI analysis of fact check results") + response = await openai_client.generate_text_response( + system_prompt=system_prompt, + user_prompt=user_prompt, + max_tokens=2000 + ) + + # Create the enhanced result structure + enhanced_result = { + "query": original_response.get('query', ''), + "results": [ + { + "claimReview": [ + { + "publisher": { + "name": source, + "site": source + }, + "textualRating": result.get('verdict', '') + } for source in original_response.get('sources', []) + ], + "claimant": "source", + "text": original_response.get('query', '') + } + ], + "sources": original_response.get('sources', []), + "summary": { + "fact_checking_sites_queried": len(original_response.get('sources', [])), + "total_sources": len(original_response.get('verification_result', {})) + }, + "verification_result": { + "verdict": next(iter(original_response.get('verification_result', {}).values()), {}).get('verdict', ''), + "confidence": next(iter(original_response.get('verification_result', {}).values()), {}).get('confidence', ''), + "evidence": [next(iter(original_response.get('verification_result', {}).values()), {}).get('evidence', '')], + "reasoning": next(iter(original_response.get('verification_result', {}).values()), {}).get('reasoning', ''), + "fact_check_type": "ai fact checker" + }, + "token_usage": original_response.get('token_usage', { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0 + }) + } + + enhanced_result["total_claims_found"] = len(enhanced_result.get("results", [])) + + logger.info("Successfully generated AI analysis") + return enhanced_result -@scrap_websites_router.post("/search", response_model=SearchResponse) + except Exception as e: + logger.error(f"Error in OpenAI analysis: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error in fact check analysis: {str(e)}") +@scrap_websites_router.post("/search", response_model=EnhancedFactCheckResponse) async def search_websites(request: SearchRequest): logger.info(f"Starting search with query: {request.search_text}") logger.info(f"Source types requested: {request.source_types}") - results = {} - error_messages = {} - - # Initialize OpenAI client - logger.debug("Initializing OpenAI client") - openai_client = OpenAIClient(OPENAI_API_KEY) - - # Get domains based on requested source types - domains = [] + # Get sources for requested types + selected_sources = [] for source_type in request.source_types: if source_type in SOURCES: - domains.extend([source.domain for source in SOURCES[source_type]]) + selected_sources.extend(SOURCES[source_type]) - if not domains: - logger.warning("No valid source types provided. Using all available domains.") - domains = [source.domain for source in get_all_sources()] + if not selected_sources: + logger.warning("No valid source types provided. Using all available sources.") + selected_sources = get_all_sources() - logger.info(f"Processing {len(domains)} domains") + logger.info(f"Selected sources: {[source.domain for source in selected_sources]}") - # Enhance search text with key terms - search_context = request.search_text - logger.debug("Getting query embedding from OpenAI") - query_embedding = openai_client.get_embeddings([search_context])[0] + # Initialize collections for URLs + all_urls = [] + domain_results = {} - # Higher similarity threshold for better filtering - SIMILARITY_THRESHOLD = 0.75 - MAX_URLS_PER_DOMAIN = 2 # Adjusted to ensure total stays under 5 - TOTAL_MAX_URLS = 5 # Maximum URLs allowed for AIFactCheckRequest - - total_urls_collected = 0 - for domain in domains[:3]: # Limit to 3 domains for testing - if total_urls_collected >= TOTAL_MAX_URLS: - break + try: + # Search and collect URLs + for page in range(1, MAX_PAGES + 1): + if len(all_urls) >= 50: + logger.info("Reached maximum URL limit of 50") + break - logger.info(f"Processing domain: {domain}") - try: - urls = google_search_scraper(request.search_text, domain) - valid_urls = [] + logger.info(f"Fetching page {page} of search results") + search_response = await google_custom_search(request.search_text, selected_sources, page) - logger.debug(f"Found {len(urls)} URLs for domain {domain}") + if not search_response or not search_response.get("items"): + logger.warning(f"No results found on page {page}") + break - for url in urls: - if len(valid_urls) >= MAX_URLS_PER_DOMAIN or total_urls_collected >= TOTAL_MAX_URLS: - break - - url_text = extract_url_text(url) - - if not url_text: - logger.debug(f"No meaningful text extracted from URL: {url}") + for item in search_response.get("items", []): + url = item.get("link") + if not url: continue - logger.debug("Getting URL embedding from OpenAI") - url_embedding = openai_client.get_embeddings([url_text])[0] - similarity = calculate_similarity(query_embedding, url_embedding) + domain = get_domain_from_url(url) + logger.debug(f"Processing URL: {url} with domain: {domain}") - logger.debug(f"Similarity score for {url}: {similarity}") + if is_valid_source_domain(domain, selected_sources): + if domain not in domain_results: + domain_results[domain] = [] + + if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN: + domain_results[domain].append({ + "url": url, + "title": item.get("title", ""), + "snippet": item.get("snippet", "") + }) + all_urls.append(url) + else: + logger.debug(f"Skipping URL {url} - domain not in allowed list") - if similarity >= SIMILARITY_THRESHOLD: - valid_urls.append(url) - total_urls_collected += 1 - - results[domain] = valid_urls - logger.info(f"Successfully processed domain {domain}. Found {len(valid_urls)} valid URLs") - - except HTTPException as e: - logger.error(f"HTTP Exception for domain {domain}: {str(e.detail)}") - error_messages[domain] = str(e.detail) - except Exception as e: - logger.error(f"Unexpected error for domain {domain}: {str(e)}") - error_messages[domain] = f"Unexpected error for {domain}: {str(e)}" + if len(all_urls) >= 50: + break - sleep(1) # Add delay between processing different domains - - logger.info("Search completed") - logger.debug(f"Results found for {len(results)} domains") - logger.debug(f"Errors encountered for {len(error_messages)} domains") - - # Collect all valid URLs from results - all_valid_urls = [] - for domain_urls in results.values(): - all_valid_urls.extend(domain_urls) - - logger.info(f"Total valid URLs collected: {len(all_valid_urls)}") - - # Create request body for AI fact check - if all_valid_urls: + logger.info(f"Total URLs collected: {len(all_urls)}") + + if not all_urls: + return EnhancedFactCheckResponse( + query=request.search_text, + results=[], + sources=[], + summary=Summary( + fact_checking_sites_queried=len(selected_sources), + total_sources=0 + ), + token_usage={ + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0 + }, + total_claims_found=0, + verification_result=VerificationResult( + verdict="Insufficient Evidence", + confidence="Low", + evidence=["No relevant sources found"], + reasoning="No fact-checking sources were found for this claim", + fact_check_type="ai fact checker" + ) + ) + + # Perform fact check with collected URLs fact_check_request = AIFactCheckRequest( content=request.search_text, - urls=all_valid_urls[:TOTAL_MAX_URLS] # Ensure we don't exceed the limit + urls=all_urls[:5] # Limit to 5 URLs ) - logger.info("Calling AI fact check service") - try: - ai_response = await ai_fact_check(fact_check_request) - logger.info("AI fact check completed successfully") - - # Format AI fact check response - formatted_response = { - "query": ai_response.query, - "token_usage": { - "prompt_tokens": ai_response.token_usage.prompt_tokens, - "completion_tokens": ai_response.token_usage.completion_tokens, - "total_tokens": ai_response.token_usage.total_tokens - }, - "sources": ai_response.sources, - "verification_result": { - url: { - "verdict": result.verdict, - "confidence": result.confidence, - "evidence": result.evidence, - "reasoning": result.reasoning, - "missing_info": result.missing_info - } for url, result in ai_response.verification_result.items() - } - } - - # Return response with AI fact check results - return SearchResponse( - results=results, - error_messages=error_messages, - ai_fact_check_result=formatted_response - ) - - except Exception as e: - logger.error(f"Error during AI fact check: {str(e)}") - error_messages["ai_fact_check"] = f"Error during fact checking: {str(e)}" - - # Return response without AI fact check if no valid URLs or error occurred - return SearchResponse( - results=results, - error_messages=error_messages, - ai_fact_check_result=None - ) \ No newline at end of file + logger.info(f"Performing fact check with {len(fact_check_request.urls)} URLs") + fact_check_response = await ai_fact_check(fact_check_request) + + # Get enhanced analysis + openai_client = OpenAIClient(OPENAI_API_KEY) + enhanced_response = await analyze_fact_check_results( + openai_client, + fact_check_response.dict() + ) + + return EnhancedFactCheckResponse(**enhanced_response) + + except Exception as e: + logger.error(f"Error during search/fact-check process: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/app/config.py b/app/config.py index a13fd4d..8b60dd0 100644 --- a/app/config.py +++ b/app/config.py @@ -5,6 +5,7 @@ load_dotenv() GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"] GOOGLE_FACT_CHECK_BASE_URL= os.environ["GOOGLE_FACT_CHECK_BASE_URL"] +GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"] OPENAI_API_KEY = os.environ["OPENAI_API_KEY"] FRONTEND_URL = os.environ["FRONTEND_URL"] \ No newline at end of file diff --git a/app/models/__pycache__/fact_check_models.cpython-312.pyc b/app/models/__pycache__/fact_check_models.cpython-312.pyc index 6df8e42..694cd33 100644 Binary files a/app/models/__pycache__/fact_check_models.cpython-312.pyc and b/app/models/__pycache__/fact_check_models.cpython-312.pyc differ diff --git a/app/models/scrap_websites_models.py b/app/models/scrap_websites_models.py new file mode 100644 index 0000000..1c629c5 --- /dev/null +++ b/app/models/scrap_websites_models.py @@ -0,0 +1,43 @@ +from pydantic import BaseModel +from typing import List, Dict + +class SearchRequest(BaseModel): + search_text: str + source_types: List[str] = ["fact_checkers"] + +class Publisher(BaseModel): + name: str + site: str + +class ClaimReview(BaseModel): + publisher: Publisher + textualRating: str + +class Claim(BaseModel): + claimReview: List[ClaimReview] + claimant: str + text: str + +class Summary(BaseModel): + fact_checking_sites_queried: int + total_sources: int + +class TokenUsage(BaseModel): + prompt_tokens: int + completion_tokens: int + total_tokens: int + +class VerificationResult(BaseModel): + verdict: str + confidence: str + evidence: List[str] + reasoning: str + +class EnhancedFactCheckResponse(BaseModel): + query: str + results: List[Claim] + sources: List[str] + summary: Summary + token_usage: Dict[str, int] + total_claims_found: int + verification_result: VerificationResult \ No newline at end of file