Dev #1

Merged
utshodey merged 5 commits from dev into master 2024-12-17 11:33:44 +00:00
7 changed files with 390 additions and 311 deletions
Showing only changes of commit 790d58402a

View file

@@ -2,7 +2,7 @@ from fastapi import APIRouter, HTTPException
import json
from datetime import datetime
from typing import Dict, List
import httpx
from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL
from app.models.fact_check_models import (
GoogleFactCheckRequest as FactCheckRequest,
@@ -12,7 +12,6 @@ from app.models.fact_check_models import (
TokenUsage
)
from app.websites.fact_checker_website import fetch_fact_checks, get_all_sources
from app.api.scrap_websites import SearchRequest, search_websites
fact_check_router = APIRouter()
@@ -22,6 +21,39 @@ class CustomJSONEncoder(json.JSONEncoder):
return obj.isoformat()
return super().default(obj)
async def validate_api_key():
    """Validate the Google API key with a test request"""
    async with httpx.AsyncClient() as client:
        try:
            test_url = f"{GOOGLE_FACT_CHECK_BASE_URL}claims:search"
            params = {
                "key": GOOGLE_API_KEY,
                "query": "test",
                "languageCode": "en-US",
                "pageSize": 1
            }
            response = await client.get(test_url, params=params)
            response.raise_for_status()
            return True
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 403:
                raise HTTPException(
                    status_code=503,
                    detail=ErrorResponse(
                        detail="Invalid or expired API key",
                        error_code="INVALID_API_KEY",
                        path="/check-facts"
                    ).dict()
                )
            raise HTTPException(
                status_code=503,
                detail=ErrorResponse(
                    detail=f"API validation failed: {str(e)}",
                    error_code="API_VALIDATION_ERROR",
                    path="/check-facts"
                ).dict()
            )
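
Reviewer note: the new validator can be smoke-tested on its own. A minimal sketch, assuming the router lives in app.api.fact_check (the file path is not shown in this diff) and the Google credentials are set in the environment:

import asyncio
from app.api.fact_check import validate_api_key  # assumed module path

if __name__ == "__main__":
    # Prints True when the key passes the one-result test query; otherwise the
    # helper raises HTTPException(503) as defined above.
    print(asyncio.run(validate_api_key()))
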
@fact_check_router.post(
"/check-facts",
response_model=FactCheckResponse,
@@ -34,7 +66,7 @@ class CustomJSONEncoder(json.JSONEncoder):
)
async def check_facts(request: FactCheckRequest) -> FactCheckResponse:
"""
Check facts using multiple fact-checking sources and fallback to web search
Check facts using multiple fact-checking sources
"""
all_results = []
verified_results = []
@@ -50,10 +82,14 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse:
).dict()
)
# Validate API key before proceeding
await validate_api_key()
# Get all sources in priority order
all_sources = get_all_sources()
all_sources_list = [] # To store source URLs
contexts_used = [] # To store context snippets
failed_sources = [] # Track failed sources
for source in all_sources:
try:
@@ -78,75 +114,39 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse:
if "textualRating" in review:
contexts_used.append(review["textualRating"])
except HTTPException:
except HTTPException as http_err:
failed_sources.append({
"source": source.domain,
"error": str(http_err.detail)
})
continue
except Exception as e:
# Log the error but continue with other sources
print(f"Error processing {source.domain}: {str(e)}")
failed_sources.append({
"source": source.domain,
"error": str(e)
})
continue
# If no results found, try searching websites
if not all_results:
try:
# Create search request
search_request = SearchRequest(
search_text=request.content,
source_types=["fact_checkers"]
)
# Perform website search
search_response = await search_websites(search_request)
# If AI fact check results are available, use them
if search_response.ai_fact_check_result:
# Create a claim from AI fact check result
ai_claim = {
"text": request.content,
"claimant": "AI Analysis",
"claimDate": datetime.now().isoformat(),
"claimReview": [{
"publisher": {
"name": "AI Fact Checker",
"site": "ai-fact-check"
},
"textualRating": search_response.ai_fact_check_result.verification_result["verdict"],
"title": "AI Fact Check Analysis",
"reviewDate": datetime.now().isoformat(),
"url": ""
}]
}
validated_claim = Claim(**ai_claim).dict()
all_results.append(validated_claim)
# Add sources and contexts
all_sources_list.extend(search_response.results.keys())
if search_response.ai_fact_check_result.verification_result["evidence"]:
contexts_used.extend(search_response.ai_fact_check_result.verification_result["evidence"])
except Exception as e:
print(f"Error during website search: {str(e)}")
# If still no results found after searching websites
if not all_results:
# Return partial results if some sources failed but we have data
if all_results:
verification_result = {
"verdict": "Partial Results Available" if failed_sources else "Complete Results",
"confidence": "Medium" if failed_sources else "High",
"evidence": contexts_used,
"reasoning": "Based on available fact checks",
"missing_info": f"{len(failed_sources)} sources failed" if failed_sources else None
}
else:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
detail="No fact check results found",
detail="No fact check results found. Failed sources: " +
", ".join([f"{f['source']}: {f['error']}" for f in failed_sources]),
error_code="NO_RESULTS_FOUND",
path="/check-facts"
).dict()
)
# Prepare the verification result
verification_result = {
"verdict": "Insufficient Information", # Default verdict
"confidence": "Low",
"evidence": contexts_used,
"reasoning": "Based on available fact checks and web search results",
"missing_info": "Additional verification may be needed"
}
# Create token usage information
token_usage = TokenUsage(
prompt_tokens=0,
@@ -161,10 +161,12 @@ async def check_facts(request: FactCheckRequest) -> FactCheckResponse:
results=all_results,
verification_result=verification_result,
sources=list(set(all_sources_list)),
context_used=contexts_used,
token_usage=token_usage,
summary={
"total_sources": len(set(all_sources_list)),
"fact_checking_sites_queried": len(all_sources)
"fact_checking_sites_queried": len(all_sources),
"failed_sources": failed_sources
}
)
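
Reviewer note: to see the new failed_sources reporting end to end, a client call might look like the sketch below. The base URL and router prefix are assumptions, and GoogleFactCheckRequest may require fields beyond content (only request.content is read above).

import httpx

resp = httpx.post(
    "http://localhost:8000/check-facts",
    json={"content": "The moon landing was staged."},
    timeout=60.0,
)
body = resp.json()
print(body["verification_result"]["verdict"])
print(body["summary"]["failed_sources"])  # new field added by this commit
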

View file

@@ -1,309 +1,342 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
from urllib.parse import urlencode, urlparse
import urllib.parse
import numpy as np
from time import sleep
import httpx
import logging
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urlparse
import json
from app.services.openai_client import OpenAIClient
from app.config import OPENAI_API_KEY
from app.config import OPENAI_API_KEY, GOOGLE_API_KEY, GOOGLE_ENGINE_ID
from app.websites.fact_checker_website import SOURCES, get_all_sources
from app.api.ai_fact_check import ai_fact_check
from typing import List, Dict, Optional
from pydantic import BaseModel
from app.models.fact_check_models import (
AIFactCheckRequest,
AIFactCheckResponse,
VerificationResult,
TokenUsage
FactCheckSource,
SourceType
)
# Define Pydantic models
class Publisher(BaseModel):
name: str
site: str
class ClaimReview(BaseModel):
publisher: Publisher
textualRating: str
class Claim(BaseModel):
claimReview: List[ClaimReview]
claimant: str
text: str
class Summary(BaseModel):
fact_checking_sites_queried: int
total_sources: int
class VerificationResult(BaseModel):
verdict: str
confidence: str
evidence: List[str]
reasoning: str
fact_check_type: str
class SearchRequest(BaseModel):
search_text: str
source_types: List[str]
class EnhancedFactCheckResponse(BaseModel):
query: str
results: List[Dict]
sources: List
summary: Summary
token_usage: Dict[str, int]
total_claims_found: int
verification_result: VerificationResult
# Configure logging
logging.basicConfig(
level=logging.INFO,
level=logging.INFO, # Changed back to INFO from DEBUG
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
scrap_websites_router = APIRouter()
# Configuration for scraping
MAX_RETRIES = 2
RETRY_DELAY = 2
# Constants
RESULTS_PER_PAGE = 10
MAX_PAGES = 5
MAX_URLS_PER_DOMAIN = 5
GOOGLE_SEARCH_URL = "https://www.googleapis.com/customsearch/v1"
class SearchRequest(BaseModel):
search_text: str
source_types: List[str] = ["fact_checkers"]
class UrlSimilarityInfo(BaseModel):
url: str
similarity: float
extracted_text: str
class SearchResponse(BaseModel):
results: Dict[str, List[str]]
error_messages: Dict[str, str]
ai_fact_check_result: Optional[Dict] = None
def extract_url_text(url: str) -> str:
"""Extract and process meaningful text from URL path with improved cleaning"""
logger.debug(f"Extracting text from URL: {url}")
def get_domain_from_url(url: str) -> str:
"""Extract domain from URL with improved handling."""
try:
parsed = urllib.parse.urlparse(url)
path = parsed.path
path = path.replace('.html', '').replace('/index', '').replace('.php', '')
segments = [seg for seg in path.split('/') if seg]
cleaned_segments = []
for segment in segments:
segment = segment.replace('-', ' ').replace('_', ' ')
if not (segment.replace(' ', '').isdigit() or
all(part.isdigit() for part in segment.split() if part)):
cleaned_segments.append(segment)
common_words = {
'www', 'live', 'news', 'intl', 'index', 'world', 'us', 'uk',
'updates', 'update', 'latest', 'breaking', 'new', 'article'
}
text = ' '.join(cleaned_segments)
words = [word.lower() for word in text.split()
if word.lower() not in common_words and len(word) > 1]
result = ' '.join(words)
logger.debug(f"Extracted text: {result}")
return result
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove 'www.' if present
if domain.startswith('www.'):
domain = domain[4:]
return domain
except Exception as e:
logger.error(f"Error extracting text from URL {url}: {str(e)}")
return ''
logger.error(f"Error extracting domain from URL {url}: {str(e)}")
return ""
def extract_search_results(html_content):
"""Extract URLs using multiple selectors and patterns"""
soup = BeautifulSoup(html_content, 'html.parser')
urls = set() # Using set to avoid duplicates
def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool:
"""Check if domain matches any source with improved matching logic."""
if not domain:
return False
# Multiple CSS selectors to try
selectors = [
'div.g div.yuRUbf > a', # Main result links
'div.g a.l', # Alternative link format
'div.rc a', # Another possible format
'div[class*="g"] > div > div > div > a', # Broader match
'a[href^="http"]' # Any http link
]
domain = domain.lower()
if domain.startswith('www.'):
domain = domain[4:]
for selector in selectors:
try:
elements = soup.select(selector)
for element in elements:
url = element.get('href')
if url and url.startswith('http') and not url.startswith('https://www.google.com'):
urls.add(url)
except Exception as e:
logger.debug(f"Error with selector {selector}: {str(e)}")
for source in sources:
source_domain = source.domain.lower()
if source_domain.startswith('www.'):
source_domain = source_domain[4:]
# Also try finding URLs in the raw HTML using regex
url_pattern = r'href="(https?://[^"]+)"'
raw_urls = re.findall(url_pattern, html_content)
for url in raw_urls:
if not url.startswith('https://www.google.com'):
urls.add(url)
# Check exact match
if domain == source_domain:
logger.debug(f"Exact domain match found: {domain} = {source_domain}")
return True
return list(urls)
# Check if domain ends with source domain
if domain.endswith('.' + source_domain):
logger.debug(f"Subdomain match found: {domain} ends with {source_domain}")
return True
def google_search_scraper(search_text: str, site_domain: str, retry_count: int = 0) -> List[str]:
"""Scrape Google search results with multiple query formats"""
logger.info(f"Searching for '{search_text}' on domain: {site_domain}")
logger.debug(f"No match found for domain: {domain}")
return False
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': 'https://www.google.com/',
'DNT': '1'
async def build_enhanced_search_query(query: str, sources: List[FactCheckSource]) -> str:
"""Build search query with site restrictions."""
site_queries = [f"site:{source.domain}" for source in sources]
site_restriction = " OR ".join(site_queries)
enhanced_query = f"({query}) ({site_restriction})"
logger.debug(f"Enhanced search query: {enhanced_query}")
return enhanced_query
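
Reviewer note: a small sketch of the site restriction with a hypothetical claim and domains (SimpleNamespace stands in for FactCheckSource, whose full constructor is not shown here; importing app.api.scrap_websites assumes the package and its environment variables are available):

import asyncio
from types import SimpleNamespace
from app.api.scrap_websites import build_enhanced_search_query

sources = [SimpleNamespace(domain="snopes.com"), SimpleNamespace(domain="factcheck.org")]
query = asyncio.run(build_enhanced_search_query("the moon landing was staged", sources))
print(query)  # (the moon landing was staged) (site:snopes.com OR site:factcheck.org)
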
async def google_custom_search(query: str, sources: List[FactCheckSource], page: int = 1) -> Optional[Dict]:
"""Perform Google Custom Search with enhanced query."""
enhanced_query = await build_enhanced_search_query(query, sources)
start_index = ((page - 1) * RESULTS_PER_PAGE) + 1
params = {
"key": GOOGLE_API_KEY,
"cx": GOOGLE_ENGINE_ID,
"q": enhanced_query,
"num": RESULTS_PER_PAGE,
"start": start_index
}
# Try different query formats
query_formats = [
f"{search_text} site:{site_domain}",
f"site:{site_domain} {search_text}",
f"\"{search_text}\" site:{site_domain}"
]
all_urls = set()
for query in query_formats:
async with httpx.AsyncClient(timeout=30.0) as client:
try:
google_url = f"https://www.google.com/search?q={urlencode({'q': query})}"
logger.debug(f"Trying query format: {query}")
logger.info(f"Making API request to Google Custom Search with params: {params}")
response = await client.get(GOOGLE_SEARCH_URL, params=params)
response.raise_for_status()
response = requests.get(google_url, headers=headers)
data = response.json()
if response.status_code == 200:
urls = extract_search_results(response.text)
domain_urls = [url for url in urls if site_domain in urlparse(url).netloc]
all_urls.update(domain_urls)
else:
logger.warning(f"Received status code {response.status_code} for query format: {query}")
search_info = data.get('searchInformation', {})
logger.info(f"Search info: Total results: {search_info.get('totalResults', 0)}, "
f"Time taken: {search_info.get('searchTime', 0)}s")
sleep(2) # Delay between requests
if 'error' in data:
error_details = data['error']
logger.error(f"API Error: {error_details}")
raise HTTPException(
status_code=response.status_code,
detail=f"Google API Error: {error_details.get('message')}"
)
return data
except Exception as e:
logger.error(f"Error processing query format '{query}': {str(e)}")
if retry_count < MAX_RETRIES:
sleep(RETRY_DELAY)
return google_search_scraper(search_text, site_domain, retry_count + 1)
logger.error(f"Search error: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
valid_urls = list(all_urls)
logger.info(f"Found {len(valid_urls)} unique URLs for domain: {site_domain}")
return valid_urls[:5] # Return up to 5 URLs
async def analyze_fact_check_results(openai_client: OpenAIClient, original_response: Dict) -> Dict:
"""Analyze fact check results using OpenAI to generate a consolidated verdict."""
def calculate_similarity(query_embedding: List[float], url_embedding: List[float]) -> float:
"""Calculate cosine similarity between two embeddings"""
query_array = np.array(query_embedding)
url_array = np.array(url_embedding)
# Extract verification results from sources
verification_results = []
for url, result in original_response.get('verification_result', {}).items():
verification_results.append(f"""
Source: {url}
Verdict: {result.get('verdict')}
Confidence: {result.get('confidence')}
Evidence: {result.get('evidence')}
Reasoning: {result.get('reasoning')}
""")
similarity = np.dot(url_array, query_array) / (
np.linalg.norm(url_array) * np.linalg.norm(query_array)
)
return float(similarity)
system_prompt = """You are a professional fact-checking analyzer. Your task is to analyze multiple fact-checking results
and provide a consolidated verdict. Respond with a valid JSON object containing your analysis."""
user_prompt = f"""
Analyze these fact-checking results and provide a final verdict.
@scrap_websites_router.post("/search", response_model=SearchResponse)
Query: {original_response.get('query', '')}
Fact Check Results:
{'\n'.join(verification_results)}"""
try:
logger.info("Generating AI analysis of fact check results")
response = await openai_client.generate_text_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
max_tokens=2000
)
# Create the enhanced result structure
enhanced_result = {
"query": original_response.get('query', ''),
"results": [
{
"claimReview": [
{
"publisher": {
"name": source,
"site": source
},
"textualRating": result.get('verdict', '')
} for source in original_response.get('sources', [])
],
"claimant": "source",
"text": original_response.get('query', '')
}
],
"sources": original_response.get('sources', []),
"summary": {
"fact_checking_sites_queried": len(original_response.get('sources', [])),
"total_sources": len(original_response.get('verification_result', {}))
},
"verification_result": {
"verdict": next(iter(original_response.get('verification_result', {}).values()), {}).get('verdict', ''),
"confidence": next(iter(original_response.get('verification_result', {}).values()), {}).get('confidence', ''),
"evidence": [next(iter(original_response.get('verification_result', {}).values()), {}).get('evidence', '')],
"reasoning": next(iter(original_response.get('verification_result', {}).values()), {}).get('reasoning', ''),
"fact_check_type": "ai fact checker"
},
"token_usage": original_response.get('token_usage', {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
})
}
enhanced_result["total_claims_found"] = len(enhanced_result.get("results", []))
logger.info("Successfully generated AI analysis")
return enhanced_result
except Exception as e:
logger.error(f"Error in OpenAI analysis: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error in fact check analysis: {str(e)}")
@scrap_websites_router.post("/search", response_model=EnhancedFactCheckResponse)
async def search_websites(request: SearchRequest):
logger.info(f"Starting search with query: {request.search_text}")
logger.info(f"Source types requested: {request.source_types}")
results = {}
error_messages = {}
# Initialize OpenAI client
logger.debug("Initializing OpenAI client")
openai_client = OpenAIClient(OPENAI_API_KEY)
# Get domains based on requested source types
domains = []
# Get sources for requested types
selected_sources = []
for source_type in request.source_types:
if source_type in SOURCES:
domains.extend([source.domain for source in SOURCES[source_type]])
selected_sources.extend(SOURCES[source_type])
if not domains:
logger.warning("No valid source types provided. Using all available domains.")
domains = [source.domain for source in get_all_sources()]
if not selected_sources:
logger.warning("No valid source types provided. Using all available sources.")
selected_sources = get_all_sources()
logger.info(f"Processing {len(domains)} domains")
logger.info(f"Selected sources: {[source.domain for source in selected_sources]}")
# Enhance search text with key terms
search_context = request.search_text
logger.debug("Getting query embedding from OpenAI")
query_embedding = openai_client.get_embeddings([search_context])[0]
# Initialize collections for URLs
all_urls = []
domain_results = {}
# Higher similarity threshold for better filtering
SIMILARITY_THRESHOLD = 0.75
MAX_URLS_PER_DOMAIN = 2 # Adjusted to ensure total stays under 5
TOTAL_MAX_URLS = 5 # Maximum URLs allowed for AIFactCheckRequest
try:
# Search and collect URLs
for page in range(1, MAX_PAGES + 1):
if len(all_urls) >= 50:
logger.info("Reached maximum URL limit of 50")
break
total_urls_collected = 0
for domain in domains[:3]: # Limit to 3 domains for testing
if total_urls_collected >= TOTAL_MAX_URLS:
break
logger.info(f"Fetching page {page} of search results")
search_response = await google_custom_search(request.search_text, selected_sources, page)
logger.info(f"Processing domain: {domain}")
try:
urls = google_search_scraper(request.search_text, domain)
valid_urls = []
if not search_response or not search_response.get("items"):
logger.warning(f"No results found on page {page}")
break
logger.debug(f"Found {len(urls)} URLs for domain {domain}")
for url in urls:
if len(valid_urls) >= MAX_URLS_PER_DOMAIN or total_urls_collected >= TOTAL_MAX_URLS:
break
url_text = extract_url_text(url)
if not url_text:
logger.debug(f"No meaningful text extracted from URL: {url}")
for item in search_response.get("items", []):
url = item.get("link")
if not url:
continue
logger.debug("Getting URL embedding from OpenAI")
url_embedding = openai_client.get_embeddings([url_text])[0]
similarity = calculate_similarity(query_embedding, url_embedding)
domain = get_domain_from_url(url)
logger.debug(f"Processing URL: {url} with domain: {domain}")
logger.debug(f"Similarity score for {url}: {similarity}")
if is_valid_source_domain(domain, selected_sources):
if domain not in domain_results:
domain_results[domain] = []
if similarity >= SIMILARITY_THRESHOLD:
valid_urls.append(url)
total_urls_collected += 1
if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN:
domain_results[domain].append({
"url": url,
"title": item.get("title", ""),
"snippet": item.get("snippet", "")
})
all_urls.append(url)
else:
logger.debug(f"Skipping URL {url} - domain not in allowed list")
results[domain] = valid_urls
logger.info(f"Successfully processed domain {domain}. Found {len(valid_urls)} valid URLs")
if len(all_urls) >= 50:
break
except HTTPException as e:
logger.error(f"HTTP Exception for domain {domain}: {str(e.detail)}")
error_messages[domain] = str(e.detail)
except Exception as e:
logger.error(f"Unexpected error for domain {domain}: {str(e)}")
error_messages[domain] = f"Unexpected error for {domain}: {str(e)}"
logger.info(f"Total URLs collected: {len(all_urls)}")
sleep(1) # Add delay between processing different domains
logger.info("Search completed")
logger.debug(f"Results found for {len(results)} domains")
logger.debug(f"Errors encountered for {len(error_messages)} domains")
# Collect all valid URLs from results
all_valid_urls = []
for domain_urls in results.values():
all_valid_urls.extend(domain_urls)
logger.info(f"Total valid URLs collected: {len(all_valid_urls)}")
# Create request body for AI fact check
if all_valid_urls:
fact_check_request = AIFactCheckRequest(
content=request.search_text,
urls=all_valid_urls[:TOTAL_MAX_URLS] # Ensure we don't exceed the limit
)
logger.info("Calling AI fact check service")
try:
ai_response = await ai_fact_check(fact_check_request)
logger.info("AI fact check completed successfully")
# Format AI fact check response
formatted_response = {
"query": ai_response.query,
"token_usage": {
"prompt_tokens": ai_response.token_usage.prompt_tokens,
"completion_tokens": ai_response.token_usage.completion_tokens,
"total_tokens": ai_response.token_usage.total_tokens
if not all_urls:
return EnhancedFactCheckResponse(
query=request.search_text,
results=[],
sources=[],
summary=Summary(
fact_checking_sites_queried=len(selected_sources),
total_sources=0
),
token_usage={
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
},
"sources": ai_response.sources,
"verification_result": {
url: {
"verdict": result.verdict,
"confidence": result.confidence,
"evidence": result.evidence,
"reasoning": result.reasoning,
"missing_info": result.missing_info
} for url, result in ai_response.verification_result.items()
}
}
# Return response with AI fact check results
return SearchResponse(
results=results,
error_messages=error_messages,
ai_fact_check_result=formatted_response
total_claims_found=0,
verification_result=VerificationResult(
verdict="Insufficient Evidence",
confidence="Low",
evidence=["No relevant sources found"],
reasoning="No fact-checking sources were found for this claim",
fact_check_type="ai fact checker"
)
)
except Exception as e:
logger.error(f"Error during AI fact check: {str(e)}")
error_messages["ai_fact_check"] = f"Error during fact checking: {str(e)}"
# Perform fact check with collected URLs
fact_check_request = AIFactCheckRequest(
content=request.search_text,
urls=all_urls[:5] # Limit to 5 URLs
)
# Return response without AI fact check if no valid URLs or error occurred
return SearchResponse(
results=results,
error_messages=error_messages,
ai_fact_check_result=None
)
logger.info(f"Performing fact check with {len(fact_check_request.urls)} URLs")
fact_check_response = await ai_fact_check(fact_check_request)
# Get enhanced analysis
openai_client = OpenAIClient(OPENAI_API_KEY)
enhanced_response = await analyze_fact_check_results(
openai_client,
fact_check_response.dict()
)
return EnhancedFactCheckResponse(**enhanced_response)
except Exception as e:
logger.error(f"Error during search/fact-check process: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
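
Reviewer note: with the rewrite, /search returns an EnhancedFactCheckResponse instead of the old SearchResponse. A hedged usage sketch (base URL and router prefix assumed; a configured Custom Search engine and OpenAI key are required):

import httpx

resp = httpx.post(
    "http://localhost:8000/search",
    json={"search_text": "The moon landing was staged."},  # source_types defaults to ["fact_checkers"]
    timeout=120.0,
)
data = resp.json()
print(data["verification_result"]["verdict"], data["verification_result"]["confidence"])
print(data["summary"], data["total_claims_found"])
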

View file

@@ -5,6 +5,7 @@ load_dotenv()
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
GOOGLE_FACT_CHECK_BASE_URL= os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
FRONTEND_URL = os.environ["FRONTEND_URL"]
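
Reviewer note: because these settings are read with os.environ[...], a missing variable fails at import time with a bare KeyError. An optional pre-flight check along these lines (not part of this commit) gives a clearer error; the list covers only the variables visible in this hunk.

import os

REQUIRED_VARS = [
    "GOOGLE_API_KEY",
    "GOOGLE_FACT_CHECK_BASE_URL",
    "GOOGLE_ENGINE_ID",
    "OPENAI_API_KEY",
    "FRONTEND_URL",
]

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
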

View file

@@ -0,0 +1,43 @@
from pydantic import BaseModel
from typing import List, Dict

class SearchRequest(BaseModel):
    search_text: str
    source_types: List[str] = ["fact_checkers"]

class Publisher(BaseModel):
    name: str
    site: str

class ClaimReview(BaseModel):
    publisher: Publisher
    textualRating: str

class Claim(BaseModel):
    claimReview: List[ClaimReview]
    claimant: str
    text: str

class Summary(BaseModel):
    fact_checking_sites_queried: int
    total_sources: int

class TokenUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class VerificationResult(BaseModel):
    verdict: str
    confidence: str
    evidence: List[str]
    reasoning: str

class EnhancedFactCheckResponse(BaseModel):
    query: str
    results: List[Claim]
    sources: List[str]
    summary: Summary
    token_usage: Dict[str, int]
    total_claims_found: int
    verification_result: VerificationResult
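
Reviewer note: a minimal construction of the response model defined above, using illustrative values only (imports are omitted because the module path for this new file is not shown in the diff):

response = EnhancedFactCheckResponse(
    query="example claim",
    results=[
        Claim(
            claimReview=[
                ClaimReview(
                    publisher=Publisher(name="Example Fact Checker", site="example.org"),
                    textualRating="False",
                )
            ],
            claimant="example claimant",
            text="example claim",
        )
    ],
    sources=["example.org"],
    summary=Summary(fact_checking_sites_queried=1, total_sources=1),
    token_usage={"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
    total_claims_found=1,
    verification_result=VerificationResult(
        verdict="False",
        confidence="High",
        evidence=["example evidence"],
        reasoning="example reasoning",
    ),
)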