added cicd modified

This commit is contained in:
Utsho Dey 2024-12-17 18:34:04 +06:00
parent 9c15f7a59c
commit 019e07e1b9
16 changed files with 481 additions and 430 deletions


@@ -11,7 +11,6 @@ cache:
 stages:
   - setup
-  - lint
   - test

 before_script:
@@ -29,14 +28,6 @@ setup:
     - venv/
   expire_in: 1 hour

-lint:
-  stage: lint
-  needs:
-    - setup
-  script:
-    - black --check app/ main.py tests/
-    - flake8 app/ main.py tests/ --max-line-length=100
-
 test:
   stage: test
   needs:
@@ -47,7 +38,7 @@ test:
     # Start FastAPI server
     - uvicorn main:app --host 0.0.0.0 --port 8000 &
     # Wait for server to start
-    - sleep 10
+    - sleep 15
     # Test health endpoint
     - |
       RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/health)

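The test stage above waits a fixed number of seconds before curling /health. A minimal Python sketch of the same readiness check, polling instead of sleeping (assumes the httpx package is available; the /health endpoint and the expected 200 status come from the diff, the retry loop itself is illustrative):

    import time
    import httpx

    def wait_for_health(base_url: str = "http://localhost:8000", timeout: float = 30.0) -> None:
        """Poll /health until it returns HTTP 200 or the timeout expires."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if httpx.get(f"{base_url}/health").status_code == 200:
                    return
            except httpx.TransportError:
                pass  # server not accepting connections yet
            time.sleep(1)
        raise RuntimeError("FastAPI server did not become healthy in time")

    if __name__ == "__main__":
        wait_for_health()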

@@ -6,7 +6,7 @@ from app.models.ai_fact_check_models import (
     AIFactCheckResponse,
     VerificationResult,
     TokenUsage,
-    ErrorResponse
+    ErrorResponse,
 )
 from urllib.parse import urlparse
 import asyncio
@@ -16,13 +16,11 @@ aifact_check_router = APIRouter()
 openai_client = OpenAIClient(api_key=OPENAI_API_KEY)
 fact_checker = AIFactChecker(openai_client=openai_client)


 @aifact_check_router.post(
     "/aicheck-facts",
     response_model=AIFactCheckResponse,
-    responses={
-        400: {"model": ErrorResponse},
-        500: {"model": ErrorResponse}
-    }
+    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
 )
 async def ai_fact_check(request: AIFactCheckRequest):
     """
@@ -40,14 +38,14 @@ async def ai_fact_check(request: AIFactCheckRequest):
         total_prompt_tokens = 0
         total_completion_tokens = 0
         total_tokens = 0

         # Process all URLs concurrently
         tasks = [
             fact_checker.check_fact(url=url, query=request.content)
             for url in request.urls
         ]
         fact_check_results = await asyncio.gather(*tasks, return_exceptions=True)

         # Process results
         for url, result in zip(request.urls, fact_check_results):
             if isinstance(result, Exception):
@@ -57,21 +55,21 @@ async def ai_fact_check(request: AIFactCheckRequest):
                     confidence="Low",
                     evidence=f"Error checking URL: {str(result)}",
                     reasoning="URL processing failed",
-                    missing_info="Could not access or process the URL"
+                    missing_info="Could not access or process the URL",
                 )
                 continue

             verification_result = VerificationResult(
                 verdict=result["verification_result"]["verdict"],
                 confidence=result["verification_result"]["confidence"],
                 evidence=result["verification_result"]["evidence"],
                 reasoning=result["verification_result"]["reasoning"],
-                missing_info=result["verification_result"].get("missing_info", None)
+                missing_info=result["verification_result"].get("missing_info", None),
             )

             results[url] = verification_result
             all_sources.update(result["sources"])

             # Accumulate token usage
             total_prompt_tokens += result["token_usage"]["prompt_tokens"]
             total_completion_tokens += result["token_usage"]["completion_tokens"]
@@ -80,24 +78,22 @@ async def ai_fact_check(request: AIFactCheckRequest):
         token_usage = TokenUsage(
             prompt_tokens=total_prompt_tokens,
             completion_tokens=total_completion_tokens,
-            total_tokens=total_tokens
+            total_tokens=total_tokens,
         )

         return AIFactCheckResponse(
             query=request.content,
             verification_result=results,
             sources=list(all_sources),
-            token_usage=token_usage
+            token_usage=token_usage,
         )

     except ValueError as e:
         raise HTTPException(
             status_code=400,
             detail=ErrorResponse(
-                detail=str(e),
-                error_code="INVALID_URL",
-                path="/aicheck-facts"
-            ).dict()
+                detail=str(e), error_code="INVALID_URL", path="/aicheck-facts"
+            ).dict(),
         )
     except Exception as e:
         raise HTTPException(
@@ -105,6 +101,6 @@ async def ai_fact_check(request: AIFactCheckRequest):
             detail=ErrorResponse(
                 detail=f"Error processing fact-check request: {str(e)}",
                 error_code="PROCESSING_ERROR",
-                path="/aicheck-facts"
-            ).dict()
+                path="/aicheck-facts",
+            ).dict(),
         )

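For reference, a minimal client-side sketch of exercising the /aicheck-facts endpoint shown above (assumes the API is running locally on port 8000 and that httpx is installed; the field names follow AIFactCheckRequest and AIFactCheckResponse from this diff):

    import asyncio
    import httpx

    async def main() -> None:
        payload = {
            "content": "Indian flag was drawn in BUET campus",
            "urls": ["www.altnews.in/article-about-flag"],
        }
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post("http://localhost:8000/aicheck-facts", json=payload)
            resp.raise_for_status()
            data = resp.json()
        # verification_result is keyed by URL, one VerificationResult per source
        for url, result in data["verification_result"].items():
            print(url, result["verdict"], result["confidence"])

    asyncio.run(main())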

@@ -4,16 +4,17 @@ from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY
 from app.api.scrap_websites import search_websites, SearchRequest
 from app.services.openai_client import OpenAIClient
 from app.models.fact_check_models import (
     FactCheckRequest,
     FactCheckResponse,
     ErrorResponse,
-    Source
+    Source,
 )
 from app.websites.fact_checker_website import get_all_sources

 fact_check_router = APIRouter()
 openai_client = OpenAIClient(OPENAI_API_KEY)


 async def generate_fact_report(query: str, fact_check_data: dict) -> FactCheckResponse:
     """Generate a fact check report using OpenAI based on the fact check results."""
     try:
@@ -55,7 +56,7 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
 2. Specify verification dates when available
 3. Name the fact-checking organizations involved
 4. Describe the verification process"""

         else:
             system_prompt = base_system_prompt
             user_prompt = f"""Query: {query}
@@ -70,37 +71,34 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
 4. Note any conflicting information between sources"""

         response = await openai_client.generate_text_response(
-            system_prompt=system_prompt,
-            user_prompt=user_prompt,
-            max_tokens=1000
+            system_prompt=system_prompt, user_prompt=user_prompt, max_tokens=1000
         )

         try:
             # First try to parse the response directly
             response_data = response["response"]

             # Clean up sources before validation
-            if isinstance(response_data.get('sources'), list):
+            if isinstance(response_data.get("sources"), list):
                 cleaned_sources = []
-                for source in response_data['sources']:
+                for source in response_data["sources"]:
                     if isinstance(source, str):
                         # Convert string sources to Source objects
-                        url = source if source.startswith('http') else f"https://{source}"
-                        cleaned_sources.append({
-                            "url": url,
-                            "name": source
-                        })
+                        url = (
+                            source if source.startswith("http") else f"https://{source}"
+                        )
+                        cleaned_sources.append({"url": url, "name": source})
                     elif isinstance(source, dict):
                         # Ensure URL has proper scheme
-                        url = source.get('url', '')
-                        if url and not url.startswith('http'):
-                            source['url'] = f"https://{url}"
+                        url = source.get("url", "")
+                        if url and not url.startswith("http"):
+                            source["url"] = f"https://{url}"
                         cleaned_sources.append(source)

-                response_data['sources'] = cleaned_sources
+                response_data["sources"] = cleaned_sources

             fact_check_response = FactCheckResponse(**response_data)
             return fact_check_response

         except Exception as validation_error:
             print(f"Response validation error: {str(validation_error)}")
             raise HTTPException(
@@ -108,10 +106,10 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
                 detail=ErrorResponse(
                     detail=f"Invalid response format: {str(validation_error)}",
                     error_code="VALIDATION_ERROR",
-                    path="/check-facts"
-                ).dict()
+                    path="/check-facts",
+                ).dict(),
             )
     except Exception as e:
         print(f"Error generating fact report: {str(e)}")
         raise HTTPException(
@@ -119,10 +117,11 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
             detail=ErrorResponse(
                 detail="Error generating fact report",
                 error_code="FACT_CHECK_ERROR",
-                path="/check-facts"
-            ).dict()
+                path="/check-facts",
+            ).dict(),
         )


 @fact_check_router.post("/check-facts", response_model=FactCheckResponse)
 async def check_facts(request: FactCheckRequest):
     """
@@ -134,52 +133,49 @@ async def check_facts(request: FactCheckRequest):
             detail=ErrorResponse(
                 detail="Google API key or base URL is not configured",
                 error_code="CONFIGURATION_ERROR",
-                path="/check-facts"
-            ).dict()
+                path="/check-facts",
+            ).dict(),
         )

     headers = {"Content-Type": "application/json"}

     async with httpx.AsyncClient() as client:
         # Get fact checker sources from the centralized configuration
         fact_checker_sources = get_all_sources()

         for source in fact_checker_sources:
             params = {
                 "key": GOOGLE_API_KEY,
                 "query": request.query,
                 "languageCode": "en-US",
                 "reviewPublisherSiteFilter": source.domain,
-                "pageSize": 10
+                "pageSize": 10,
             }

             try:
                 response = await client.get(
-                    GOOGLE_FACT_CHECK_BASE_URL,
-                    params=params,
-                    headers=headers
+                    GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers
                 )
                 response.raise_for_status()
                 json_response = response.json()

                 if json_response.get("claims"):
                     return await generate_fact_report(request.query, json_response)

             except httpx.RequestError as e:
                 print(f"Error fetching results for site {source.domain}: {str(e)}")
                 continue
             except Exception as e:
                 print(f"Unexpected error for site {source.domain}: {str(e)}")
                 continue

     try:
         search_request = SearchRequest(
-            search_text=request.query,
-            source_types=["fact_checkers"]
+            search_text=request.query, source_types=["fact_checkers"]
         )
         ai_response = await search_websites(search_request)
         return await generate_fact_report(request.query, ai_response)
     except Exception as e:
         print(f"Error in AI fact check: {str(e)}")
         raise HTTPException(
@@ -187,6 +183,6 @@ async def check_facts(request: FactCheckRequest):
             detail=ErrorResponse(
                 detail="No fact check results found",
                 error_code="NOT_FOUND",
-                path="/check-facts"
-            ).dict()
+                path="/check-facts",
+            ).dict(),
         )


@@ -7,7 +7,7 @@ from pydantic import BaseModel
 from app.models.ai_fact_check_models import (
     AIFactCheckRequest,
     FactCheckSource,
-    SourceType
+    SourceType,
 )
 from app.websites.fact_checker_website import SOURCES, get_all_sources
 from app.api.ai_fact_check import ai_fact_check
@@ -18,10 +18,10 @@ class SearchRequest(BaseModel):
     search_text: str
     source_types: List[str] = ["fact_checkers"]


 # Configure logging
 logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
 )
 logger = logging.getLogger(__name__)
@@ -38,51 +38,58 @@ def get_domain_from_url(url: str) -> str:
     try:
         parsed = urlparse(url)
         domain = parsed.netloc.lower()
-        if domain.startswith('www.'):
+        if domain.startswith("www."):
             domain = domain[4:]
         return domain
     except Exception as e:
         logger.error(f"Error extracting domain from URL {url}: {str(e)}")
         return ""


 def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool:
     """Check if domain matches any source with improved matching logic."""
     if not domain:
         return False

     domain = domain.lower()
-    if domain.startswith('www.'):
+    if domain.startswith("www."):
         domain = domain[4:]

     for source in sources:
         source_domain = source.domain.lower()
-        if source_domain.startswith('www.'):
+        if source_domain.startswith("www."):
             source_domain = source_domain[4:]

-        if domain == source_domain or domain.endswith('.' + source_domain):
+        if domain == source_domain or domain.endswith("." + source_domain):
             return True

     return False


-async def build_enhanced_search_query(query: str, sources: List[FactCheckSource]) -> str:
+async def build_enhanced_search_query(
+    query: str, sources: List[FactCheckSource]
+) -> str:
     """Build search query with site restrictions."""
     site_queries = [f"site:{source.domain}" for source in sources]
     site_restriction = " OR ".join(site_queries)
     return f"({query}) ({site_restriction})"


-async def google_custom_search(query: str, sources: List[FactCheckSource], page: int = 1) -> Optional[Dict]:
+async def google_custom_search(
+    query: str, sources: List[FactCheckSource], page: int = 1
+) -> Optional[Dict]:
     """Perform Google Custom Search with enhanced query."""
     enhanced_query = await build_enhanced_search_query(query, sources)
     start_index = ((page - 1) * RESULTS_PER_PAGE) + 1

     params = {
         "key": GOOGLE_API_KEY,
         "cx": GOOGLE_ENGINE_ID,
         "q": enhanced_query,
         "num": RESULTS_PER_PAGE,
-        "start": start_index
+        "start": start_index,
     }

     async with httpx.AsyncClient(timeout=30.0) as client:
         try:
             response = await client.get(GOOGLE_SEARCH_URL, params=params)
@@ -92,69 +99,70 @@ async def google_custom_search(query: str, sources: List[FactCheckSource], page:
             logger.error(f"Search error: {str(e)}")
             raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")


 @scrap_websites_router.post("/search")
 async def search_websites(request: SearchRequest):
     # Get the source types from the request
     source_types = request.source_types if request.source_types else ["fact_checkers"]

     # Get sources based on requested types
     selected_sources = []
     for source_type in source_types:
         if source_type in SOURCES:
             selected_sources.extend(SOURCES[source_type])

     # If no valid sources found, use fact checkers as default
     if not selected_sources:
         selected_sources = SOURCES["fact_checkers"]

     all_urls = []
     domain_results = {}

     try:
         for page in range(1, MAX_PAGES + 1):
             if len(all_urls) >= 50:
                 break

-            search_response = await google_custom_search(request.search_text, selected_sources, page)
+            search_response = await google_custom_search(
+                request.search_text, selected_sources, page
+            )

             if not search_response or not search_response.get("items"):
                 break

             for item in search_response.get("items", []):
                 url = item.get("link")
                 if not url:
                     continue

                 domain = get_domain_from_url(url)
                 if is_valid_source_domain(domain, selected_sources):
                     if domain not in domain_results:
                         domain_results[domain] = []

                     if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN:
-                        domain_results[domain].append({
-                            "url": url,
-                            "title": item.get("title", ""),
-                            "snippet": item.get("snippet", "")
-                        })
+                        domain_results[domain].append(
+                            {
+                                "url": url,
+                                "title": item.get("title", ""),
+                                "snippet": item.get("snippet", ""),
+                            }
+                        )
                         all_urls.append(url)

             if len(all_urls) >= 50:
                 break

         if not all_urls:
-            return {
-                "status": "no_results",
-                "urls_found": 0
-            }
+            return {"status": "no_results", "urls_found": 0}

         fact_check_request = AIFactCheckRequest(
-            content=request.search_text,
-            urls=all_urls[:5]
+            content=request.search_text, urls=all_urls[:5]
         )

         return await ai_fact_check(fact_check_request)

     except Exception as e:
         logger.error(f"Error during search/fact-check process: {str(e)}")
         raise HTTPException(status_code=500, detail=str(e))


@@ -4,9 +4,9 @@ from dotenv import load_dotenv
 load_dotenv()

 GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
-GOOGLE_FACT_CHECK_BASE_URL= os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
+GOOGLE_FACT_CHECK_BASE_URL = os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
 GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"]
 GOOGLE_SEARCH_URL = os.environ["GOOGLE_SEARCH_URL"]
 OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
 FRONTEND_URL = os.environ["FRONTEND_URL"]


@@ -4,38 +4,46 @@ from enum import Enum
 from datetime import datetime
 from urllib.parse import urlparse


 # Common Models
 class TokenUsage(BaseModel):
     prompt_tokens: Optional[int] = 0
     completion_tokens: Optional[int] = 0
     total_tokens: Optional[int] = 0


 class ErrorResponse(BaseModel):
     detail: str
     error_code: str = Field(..., description="Unique error code for this type of error")
     timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
-    path: Optional[str] = Field(None, description="The endpoint path where error occurred")
+    path: Optional[str] = Field(
+        None, description="The endpoint path where error occurred"
+    )

-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "detail": "Error description",
-            "error_code": "ERROR_CODE",
-            "timestamp": "2024-12-09T16:49:30.905765",
-            "path": "/check-facts"
-        }
-    })
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "detail": "Error description",
+                "error_code": "ERROR_CODE",
+                "timestamp": "2024-12-09T16:49:30.905765",
+                "path": "/check-facts",
+            }
+        }
+    )


 # Fact Check Models
 class Publisher(BaseModel):
     name: str
     site: Optional[str] = Field(None, description="Publisher's website")

-    @validator('site')
+    @validator("site")
     def validate_site(cls, v):
-        if v and not (v.startswith('http://') or v.startswith('https://')):
+        if v and not (v.startswith("http://") or v.startswith("https://")):
             return f"https://{v}"
         return v


 class ClaimReview(BaseModel):
     publisher: Publisher
     url: Optional[HttpUrl] = None
@@ -44,21 +52,25 @@ class ClaimReview(BaseModel):
     textualRating: Optional[str] = None
     languageCode: str = Field(default="en-US")


 class Claim(BaseModel):
     text: str
     claimant: Optional[str] = None
     claimDate: Optional[str] = None
     claimReview: List[ClaimReview]


 class SourceType(str, Enum):
     FACT_CHECKER = "fact_checker"
     NEWS_SITE = "news_site"


 class FactCheckSource(BaseModel):
     domain: str
     type: SourceType
     priority: int = Field(default=1, ge=1, le=10)


 # Verification Models
 class VerificationResult(BaseModel):
     verdict: str = Field(..., description="True/False/Insufficient Information")
@@ -67,54 +79,56 @@ class VerificationResult(BaseModel):
     reasoning: str
     missing_info: Optional[str] = None

-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "verdict": "True",
-            "confidence": "High",
-            "evidence": ["Direct quote from source supporting the claim"],
-            "reasoning": "Detailed analysis of why the claim is considered true",
-            "missing_info": "Any caveats or limitations of the verification"
-        }
-    })
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "verdict": "True",
+                "confidence": "High",
+                "evidence": ["Direct quote from source supporting the claim"],
+                "reasoning": "Detailed analysis of why the claim is considered true",
+                "missing_info": "Any caveats or limitations of the verification",
+            }
+        }
+    )


 # Request Models
 class BaseFactCheckRequest(BaseModel):
     content: str = Field(
-        ...,
-        min_length=10,
-        max_length=1000,
-        description="The claim to be fact-checked"
+        ..., min_length=10, max_length=1000, description="The claim to be fact-checked"
     )

-    @validator('content')
+    @validator("content")
     def validate_content(cls, v):
         if not v.strip():
             raise ValueError("Content cannot be empty or just whitespace")
         return v.strip()


 class GoogleFactCheckRequest(BaseFactCheckRequest):
     language: str = Field(default="en-US", pattern="^[a-z]{2}-[A-Z]{2}$")
     max_results_per_source: int = Field(default=10, ge=1, le=50)


 class AIFactCheckRequest(BaseFactCheckRequest):
     urls: List[str] = Field(
         ...,
         min_items=1,
         max_items=5,
-        description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing"
+        description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing",
     )

-    @validator('urls')
+    @validator("urls")
     def validate_urls(cls, urls):
         validated_urls = []
         for url in urls:
             if not url.strip():
                 raise ValueError("URL cannot be empty")

             # Add https:// if no protocol specified
-            if not url.startswith(('http://', 'https://')):
-                url = f'https://{url}'
+            if not url.startswith(("http://", "https://")):
+                url = f"https://{url}"

             try:
                 result = urlparse(url)
                 if not result.netloc:
@@ -122,18 +136,21 @@ class AIFactCheckRequest(BaseFactCheckRequest):
                 validated_urls.append(url)
             except Exception as e:
                 raise ValueError(f"Invalid URL {url}: {str(e)}")

         return validated_urls

-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "content": "Indian flag was drawn in BUET campus",
-            "urls": [
-                "www.altnews.in/article-about-flag",
-                "www.another-source.com/related-news"
-            ]
-        }
-    })
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "content": "Indian flag was drawn in BUET campus",
+                "urls": [
+                    "www.altnews.in/article-about-flag",
+                    "www.another-source.com/related-news",
+                ],
+            }
+        }
+    )


 # Response Models
 class BaseFactCheckResponse(BaseModel):
@@ -141,17 +158,20 @@ class BaseFactCheckResponse(BaseModel):
     token_usage: TokenUsage
     sources: List[str]

-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "query": "Example statement to verify",
-            "token_usage": {
-                "prompt_tokens": 100,
-                "completion_tokens": 50,
-                "total_tokens": 150
-            },
-            "sources": ["source1.com", "source2.com"],
-        }
-    })
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "query": "Example statement to verify",
+                "token_usage": {
+                    "prompt_tokens": 100,
+                    "completion_tokens": 50,
+                    "total_tokens": 150,
+                },
+                "sources": ["source1.com", "source2.com"],
+            }
+        }
+    )


 class GoogleFactCheckResponse(BaseFactCheckResponse):
     total_claims_found: int
@@ -159,71 +179,80 @@ class GoogleFactCheckResponse(BaseFactCheckResponse):
     verification_result: Dict[str, Any]
     summary: Dict[str, int]

-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "query": "Example claim",
-            "total_claims_found": 1,
-            "results": [{
-                "text": "Example claim text",
-                "claimant": "Source name",
-                "claimReview": [{
-                    "publisher": {
-                        "name": "Fact Checker",
-                        "site": "factchecker.com"
-                    },
-                    "textualRating": "True"
-                }]
-            }],
-            "verification_result": {
-                "verdict": "True",
-                "confidence": "High",
-                "evidence": ["Supporting evidence"],
-                "reasoning": "Detailed analysis"
-            },
-            "sources": ["factchecker.com"],
-            "token_usage": {
-                "prompt_tokens": 100,
-                "completion_tokens": 50,
-                "total_tokens": 150
-            },
-            "summary": {
-                "total_sources": 1,
-                "fact_checking_sites_queried": 10
-            }
-        }
-    })
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "query": "Example claim",
+                "total_claims_found": 1,
+                "results": [
+                    {
+                        "text": "Example claim text",
+                        "claimant": "Source name",
+                        "claimReview": [
+                            {
+                                "publisher": {
+                                    "name": "Fact Checker",
+                                    "site": "factchecker.com",
+                                },
+                                "textualRating": "True",
+                            }
+                        ],
+                    }
+                ],
+                "verification_result": {
+                    "verdict": "True",
+                    "confidence": "High",
+                    "evidence": ["Supporting evidence"],
+                    "reasoning": "Detailed analysis",
+                },
+                "sources": ["factchecker.com"],
+                "token_usage": {
+                    "prompt_tokens": 100,
+                    "completion_tokens": 50,
+                    "total_tokens": 150,
+                },
+                "summary": {"total_sources": 1, "fact_checking_sites_queried": 10},
+            }
+        }
+    )


 class AIFactCheckResponse(BaseFactCheckResponse):
-    verification_result: Dict[str, VerificationResult] # Changed to Dict to store results per URL
-
-    model_config = ConfigDict(json_schema_extra={
-        "example": {
-            "query": "Indian flag was drawn in BUET campus",
-            "verification_result": {
-                "https://www.source1.com": {
-                    "verdict": "True",
-                    "confidence": "High",
-                    "evidence": ["Supporting evidence from source 1"],
-                    "reasoning": "Detailed analysis from source 1",
-                    "missing_info": None
-                },
-                "https://www.source2.com": {
-                    "verdict": "True",
-                    "confidence": "Medium",
-                    "evidence": ["Supporting evidence from source 2"],
-                    "reasoning": "Analysis from source 2",
-                    "missing_info": "Additional context needed"
-                }
-            },
-            "sources": ["source1.com", "source2.com"],
-            "token_usage": {
-                "prompt_tokens": 200,
-                "completion_tokens": 100,
-                "total_tokens": 300
-            }
-        }
-    })
+    verification_result: Dict[
+        str, VerificationResult
+    ]  # Changed to Dict to store results per URL
+
+    model_config = ConfigDict(
+        json_schema_extra={
+            "example": {
+                "query": "Indian flag was drawn in BUET campus",
+                "verification_result": {
+                    "https://www.source1.com": {
+                        "verdict": "True",
+                        "confidence": "High",
+                        "evidence": ["Supporting evidence from source 1"],
+                        "reasoning": "Detailed analysis from source 1",
+                        "missing_info": None,
+                    },
+                    "https://www.source2.com": {
+                        "verdict": "True",
+                        "confidence": "Medium",
+                        "evidence": ["Supporting evidence from source 2"],
+                        "reasoning": "Analysis from source 2",
+                        "missing_info": "Additional context needed",
+                    },
+                },
+                "sources": ["source1.com", "source2.com"],
+                "token_usage": {
+                    "prompt_tokens": 200,
+                    "completion_tokens": 100,
+                    "total_tokens": 300,
+                },
+            }
+        }
+    )


 # Backwards compatibility aliases
 FactCheckRequest = GoogleFactCheckRequest
 FactCheckResponse = GoogleFactCheckResponse

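A small usage sketch of the request model above, showing how the urls validator normalises bare domains by prefixing https:// (the values are the ones used in the model's own example; running it assumes the app package from this repository is importable):

    from app.models.ai_fact_check_models import AIFactCheckRequest

    request = AIFactCheckRequest(
        content="Indian flag was drawn in BUET campus",
        urls=["www.altnews.in/article-about-flag", "https://www.another-source.com/related-news"],
    )
    # The validator leaves full URLs alone and prefixes the rest with https://
    print(request.urls)
    # ['https://www.altnews.in/article-about-flag', 'https://www.another-source.com/related-news']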

@@ -3,74 +3,73 @@ from typing import List, Literal, Union
 from datetime import datetime
 from enum import Enum


 class VerdictEnum(str, Enum):
     TRUE = "True"
     FALSE = "False"
     PARTIALLY_TRUE = "Partially True"
     UNVERIFIED = "Unverified"


 class ConfidenceEnum(str, Enum):
     HIGH = "High"
     MEDIUM = "Medium"
     LOW = "Low"


 class FactCheckRequest(BaseModel):
     query: str = Field(
         ...,
         min_length=3,
         max_length=500,
         description="The claim or statement to be fact-checked",
-        example="Did NASA confirm finding alien structures on Mars in 2024?"
+        example="Did NASA confirm finding alien structures on Mars in 2024?",
     )


 class Source(BaseModel):
     url: str
     name: str = ""

-    @validator('url')
+    @validator("url")
     def validate_url(cls, v):
         # Basic URL validation without requiring HTTP/HTTPS
         if not v or len(v) < 3:
             raise ValueError("URL must not be empty and must be at least 3 characters")
         return v


 class FactCheckResponse(BaseModel):
     claim: str = Field(
         ...,
         min_length=10,
         max_length=1000,
-        description="The exact claim being verified"
-    )
-    verdict: VerdictEnum = Field(
-        ...,
-        description="The verification verdict"
+        description="The exact claim being verified",
     )
+    verdict: VerdictEnum = Field(..., description="The verification verdict")
     confidence: ConfidenceEnum = Field(
-        ...,
-        description="Confidence level in the verdict"
+        ..., description="Confidence level in the verdict"
     )
     sources: List[Source] = Field(
-        ...,
-        min_items=1,
-        description="List of sources used in verification"
+        ..., min_items=1, description="List of sources used in verification"
     )
     evidence: str = Field(
         ...,
         min_length=20,
         max_length=500,
-        description="Concise summary of key evidence"
+        description="Concise summary of key evidence",
     )
     explanation: str = Field(
         ...,
         min_length=50,
         max_length=1000,
-        description="Detailed explanation of verification findings"
+        description="Detailed explanation of verification findings",
     )
     additional_context: str = Field(
         ...,
         min_length=20,
         max_length=500,
-        description="Important context about the verification"
+        description="Important context about the verification",
     )

     class Config:
@@ -82,20 +81,21 @@ class FactCheckResponse(BaseModel):
                 "sources": [
                     {
                         "url": "https://www.nasa.gov/mars-exploration",
-                        "name": "NASA Mars Exploration"
+                        "name": "NASA Mars Exploration",
                     },
                     {
                         "url": "https://factcheck.org/2024/mars-claims",
-                        "name": "FactCheck.org"
-                    }
+                        "name": "FactCheck.org",
+                    },
                 ],
                 "evidence": "NASA has made no such announcement. Recent Mars rover images show natural rock formations.",
                 "explanation": "Multiple fact-checking organizations investigated this claim. NASA's official communications and Mars mission reports from 2024 contain no mention of alien structures. The viral images being shared are misidentified natural geological formations.",
-                "additional_context": "Similar false claims about alien structures on Mars have circulated periodically since the first Mars rovers began sending back images."
+                "additional_context": "Similar false claims about alien structures on Mars have circulated periodically since the first Mars rovers began sending back images.",
             }
         }


 class ErrorResponse(BaseModel):
     detail: str
     error_code: str = Field(..., example="VALIDATION_ERROR")
     path: str = Field(..., example="/check-facts")


@@ -1,38 +1,46 @@
 from pydantic import BaseModel
 from typing import List, Dict


 class SearchRequest(BaseModel):
     search_text: str
     source_types: List[str] = ["fact_checkers"]


 class Publisher(BaseModel):
     name: str
     site: str


 class ClaimReview(BaseModel):
     publisher: Publisher
     textualRating: str


 class Claim(BaseModel):
     claimReview: List[ClaimReview]
     claimant: str
     text: str


 class Summary(BaseModel):
     fact_checking_sites_queried: int
     total_sources: int


 class TokenUsage(BaseModel):
     prompt_tokens: int
     completion_tokens: int
     total_tokens: int


 class VerificationResult(BaseModel):
     verdict: str
     confidence: str
     evidence: List[str]
     reasoning: str


 class EnhancedFactCheckResponse(BaseModel):
     query: str
     results: List[Claim]
@@ -40,4 +48,4 @@ class EnhancedFactCheckResponse(BaseModel):
     summary: Summary
     token_usage: Dict[str, int]
     total_claims_found: int
     verification_result: VerificationResult


@@ -9,6 +9,7 @@ import json
 import aiohttp
 from bs4 import BeautifulSoup


 class OpenAIClient:
     def __init__(self, api_key: str):
         """
@@ -16,7 +17,9 @@ class OpenAIClient:
         """
         openai.api_key = api_key

-    async def generate_text_response(self, system_prompt: str, user_prompt: str, max_tokens: int) -> dict:
+    async def generate_text_response(
+        self, system_prompt: str, user_prompt: str, max_tokens: int
+    ) -> dict:
         """
         Generate a response using OpenAI's chat completion API.
         """
@@ -25,19 +28,19 @@ class OpenAIClient:
                 model="gpt-4",
                 messages=[
                     {"role": "system", "content": system_prompt},
-                    {"role": "user", "content": user_prompt}
+                    {"role": "user", "content": user_prompt},
                 ],
-                max_tokens=max_tokens
+                max_tokens=max_tokens,
             )

-            content = response['choices'][0]['message']['content']
+            content = response["choices"][0]["message"]["content"]

             # Parse the JSON string into a dictionary
             parsed_content = json.loads(content)

             return {
                 "response": parsed_content,  # Now returns a dictionary instead of string
-                "prompt_tokens": response['usage']['prompt_tokens'],
-                "completion_tokens": response['usage']['completion_tokens'],
-                "total_tokens": response['usage']['total_tokens']
+                "prompt_tokens": response["usage"]["prompt_tokens"],
+                "completion_tokens": response["usage"]["completion_tokens"],
+                "total_tokens": response["usage"]["total_tokens"],
             }
         except json.JSONDecodeError as e:
             raise Exception(f"Failed to parse OpenAI response as JSON: {str(e)}")
@@ -50,14 +53,14 @@ class OpenAIClient:
         """
         try:
             response = openai.Embedding.create(
-                input=texts,
-                model="text-embedding-ada-002"
+                input=texts, model="text-embedding-ada-002"
             )
-            embeddings = [data['embedding'] for data in response['data']]
+            embeddings = [data["embedding"] for data in response["data"]]
             return embeddings
         except Exception as e:
             raise Exception(f"OpenAI embedding error: {str(e)}")


 class AIFactChecker:
     def __init__(self, openai_client: OpenAIClient):
         """Initialize the fact checker with OpenAI client."""
@@ -66,65 +69,71 @@ class AIFactChecker:
             chunk_size=1000,
             chunk_overlap=200,
             length_function=len,
-            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
+            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
         )

     async def scrape_webpage(self, url: str) -> List[Document]:
         """Scrape webpage content without saving HTML files."""
         try:
             async with aiohttp.ClientSession() as session:
                 async with session.get(url) as response:
                     if response.status != 200:
-                        raise Exception(f"Failed to fetch URL: {url}, status: {response.status}")
+                        raise Exception(
+                            f"Failed to fetch URL: {url}, status: {response.status}"
+                        )
                     html_content = await response.text()

             # Parse HTML with BeautifulSoup
-            soup = BeautifulSoup(html_content, 'html.parser')
+            soup = BeautifulSoup(html_content, "html.parser")

             # Create a Document with the parsed content
             doc = Document(
-                page_content=soup.get_text(separator='\n', strip=True),
-                metadata={"source": url}
+                page_content=soup.get_text(separator="\n", strip=True),
+                metadata={"source": url},
             )

             # Split into chunks
             docs_chunks = self.text_splitter.split_documents([doc])

-            logger.info(f"Successfully scraped webpage | chunks={len(docs_chunks)}")
+            logger.info(
+                f"Successfully scraped webpage | chunks={len(docs_chunks)}"
+            )
             return docs_chunks

         except Exception as e:
             logger.error(f"Error scraping webpage | url={url} | error={str(e)}")
             raise

     def find_relevant_chunks(
         self,
         query_embedding: List[float],
         doc_embeddings: List[List[float]],
-        docs: List[Document]
+        docs: List[Document],
     ) -> List[Document]:
         """Find most relevant document chunks using cosine similarity."""
         try:
             query_array = np.array(query_embedding)
             chunks_array = np.array(doc_embeddings)

             similarities = np.dot(chunks_array, query_array) / (
                 np.linalg.norm(chunks_array, axis=1) * np.linalg.norm(query_array)
             )

             top_indices = np.argsort(similarities)[-5:][::-1]
             return [docs[i] for i in top_indices]

         except Exception as e:
             logger.error(f"Error finding relevant chunks | error={str(e)}")
             raise

-    async def verify_fact(self, query: str, relevant_docs: List[Document]) -> Dict[str, Any]:
+    async def verify_fact(
+        self, query: str, relevant_docs: List[Document]
+    ) -> Dict[str, Any]:
         """Verify fact using OpenAI's API with context from relevant documents."""
         try:
             context = "\n\n".join([doc.page_content for doc in relevant_docs])

             system_prompt = """You are a professional fact-checking assistant. Analyze the provided context
 and determine if the given statement is true, false, or if there isn't enough information.
@@ -136,32 +145,37 @@ class AIFactChecker:
             "reasoning": "Your detailed analysis and reasoning",
             "missing_info": "Any important missing information (if applicable)"
             }"""

             user_prompt = f"""Context:
 {context}

 Statement to verify: "{query}"

 Analyze the statement based on the provided context and return your response in the specified JSON format."""

             response = await self.openai_client.generate_text_response(
-                system_prompt=system_prompt,
-                user_prompt=user_prompt,
-                max_tokens=800
+                system_prompt=system_prompt, user_prompt=user_prompt, max_tokens=800
             )

-            sources = list(set([doc.metadata.get('source', 'Unknown source') for doc in relevant_docs]))
+            sources = list(
+                set(
+                    [
+                        doc.metadata.get("source", "Unknown source")
+                        for doc in relevant_docs
+                    ]
+                )
+            )

             return {
                 "verification_result": response["response"],  # This is now a dictionary
                 "sources": sources,
                 "token_usage": {
                     "prompt_tokens": response["prompt_tokens"],
                     "completion_tokens": response["completion_tokens"],
-                    "total_tokens": response["total_tokens"]
-                }
+                    "total_tokens": response["total_tokens"],
+                },
             }

         except Exception as e:
             logger.error(f"Error verifying fact | error={str(e)}")
             raise
@@ -170,16 +184,18 @@ class AIFactChecker:
         """Main method to check a fact against a webpage."""
         try:
             docs = await self.scrape_webpage(url)

             doc_texts = [doc.page_content for doc in docs]
             doc_embeddings = self.openai_client.get_embeddings(doc_texts)
             query_embedding = self.openai_client.get_embeddings([query])

-            relevant_docs = self.find_relevant_chunks(query_embedding[0], doc_embeddings, docs)
+            relevant_docs = self.find_relevant_chunks(
+                query_embedding[0], doc_embeddings, docs
+            )
             verification_result = await self.verify_fact(query, relevant_docs)

             return verification_result

         except Exception as e:
             logger.error(f"Error checking fact | error={str(e)}")
             raise

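find_relevant_chunks above ranks chunks by cosine similarity between the query embedding and each chunk embedding. A self-contained sketch of just that ranking step with plain numpy (toy vectors for illustration; the real code feeds in OpenAI embeddings and returns the documents rather than indices):

    import numpy as np

    def top_k_by_cosine(query_embedding, doc_embeddings, k: int = 5) -> list[int]:
        """Return the indices of the k chunks most similar to the query."""
        query = np.array(query_embedding)
        chunks = np.array(doc_embeddings)
        similarities = chunks @ query / (
            np.linalg.norm(chunks, axis=1) * np.linalg.norm(query)
        )
        return [int(i) for i in np.argsort(similarities)[-k:][::-1]]

    # Toy example: the second vector points the same way as the query.
    print(top_k_by_cosine([1.0, 0.0], [[0.0, 1.0], [2.0, 0.1], [-1.0, 0.0]], k=2))  # [1, 0]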

@@ -1,120 +1,125 @@
 from typing import Dict, List
 import requests
 from fastapi import HTTPException
-from app.models.ai_fact_check_models import FactCheckSource, ErrorResponse, FactCheckRequest, SourceType
+from app.models.ai_fact_check_models import (
+    FactCheckSource,
+    ErrorResponse,
+    FactCheckRequest,
+    SourceType,
+)

 # Sources configuration with validation
 SOURCES = {
     "fact_checkers": [
         FactCheckSource(domain=domain, type=SourceType.FACT_CHECKER, priority=1)
         for domain in [
             "snopes.com",
             "politifact.com",
             "factcheck.org",
             "reuters.com/fact-check",
             "apnews.com/hub/ap-fact-check",
             "bbc.com/news/reality_check",
             "fullfact.org",
             "afp.com/fact-check",
             "truthorfiction.com",
             "leadstories.com",
             "checkyourfact.com",
             "washingtonpost.com/news/fact-checker",
             "factcheck.kz",
             "poynter.org/ifcn",
             "factcheckeu.info",
             "africacheck.org",
             "thequint.com/webqoof",
             "altnews.in",
             "facta.news",
             "factcheckni.org",
             "mythdetector.ge",
             "verificado.mx",
             "euvsdisinfo.eu",
             "factcheck.afp.com",
             "newtral.es",
             "maldita.es",
             "faktograf.hr",
             "demagog.org.pl",
             "factnameh.com",
             "faktiskt.se",
             "teyit.org",
             "factly.in",
             "boom.live",
             "stopfake.org",
             "factcheck.ge",
             "factcheck.kg",
             "factcheck.uz",
             "factcheck.tj",
             "factcheck.az",
             "factcheck.am",
             "factcheck.md",
             "verafiles.org",
             "rappler.com/fact-check",
             "vera.com.gt",
             "chequeado.com",
             "aosfatos.org",
             "lasillavacia.com/detector-mentiras",
             "colombiacheck.com",
             "ecuadorchequea.com",
             "elsurti.com/checado",
             "verificat.cat",
             "mafindo.or.id",
             "tempo.co/cek-fakta",
             "factcheck.mk",
             "raskrinkavanje.ba",
             "faktograf.hr",
             "demagog.cz",
             "faktabaari.fi",
             "correctiv.org",
             "mimikama.at",
             "factcheck.vlaanderen",
             "factuel.afp.com",
             "nieuwscheckers.nl",
             "faktisk.no",
             "tjekdet.dk",
             "ellinikahoaxes.gr",
             "faktograf.id",
             "stopfake.kz",
             "pesacheck.org",
             "dubawa.org",
             "namibiafactcheck.org.na",
             "zimfact.org",
             "ghanafact.com",
             "factspace.africa",
             "factcrescendo.com",
             "vishvasnews.com",
             "factcheck.lk",
             "newschecker.in",
             "boomlive.in",
             "digiteye.in",
             "indiatoday.in/fact-check",
             "factcrescendo.com",
             "piyasa.com/fact-check",
             "taiwanese.facts.news",
             "taiwanfactcheck.com",
             "mygopen.com",
             "tfc-taiwan.org.tw",
             "cofacts.tw",
             "rumor.taipei",
             "fact.qq.com",
             "factcheck.afp.com/list",
             "acfta.org",
             "crosscheck.firstdraftnews.org",
             "healthfeedback.org",
             "climatefeedback.org",
             "sciencefeedback.co",
             "factcheck.aap.com.au",
             "emergent.info",
             "hoax-slayer.net",
             "truthorfiction.com",
             "factcheck.media",
             "mediawise.org",
             "thejournal.ie/factcheck",
             "journalistsresource.org",
             "metafact.io",
-            "reporterslab.org/fact-checking"
+            "reporterslab.org/fact-checking",
         ]
     ],
     "news_sites": [
         FactCheckSource(domain=domain, type=SourceType.NEWS_SITE, priority=2)
@@ -133,16 +138,14 @@ SOURCES = {
             "www.risingbd.com/english",
             "www.dailyindustry.news",
             "www.bangladeshpost.net",
-            "www.daily-bangladesh.com/english"
+            "www.daily-bangladesh.com/english",
         ]
-    ]
+    ],
 }


 async def fetch_fact_checks(
-    api_key: str,
-    base_url: str,
-    query: str,
-    site: FactCheckSource
+    api_key: str, base_url: str, query: str, site: FactCheckSource
 ) -> Dict:
     """
     Fetch fact checks from a specific site using the Google Fact Check API
@@ -156,9 +159,9 @@ async def fetch_fact_checks(
             "query": query,
             "languageCode": "en-US",
             "reviewPublisherSiteFilter": site.domain,
-            "pageSize": 10
+            "pageSize": 10,
         }

         response = requests.get(base_url, params=params)
         response.raise_for_status()
         return response.json()
@@ -168,23 +171,22 @@ async def fetch_fact_checks(
             detail=ErrorResponse(
                 detail=f"Error fetching from {site.domain}: {str(e)}",
                 error_code="FACT_CHECK_SERVICE_ERROR",
-                path="/check-facts"
-            ).dict()
+                path="/check-facts",
+            ).dict(),
         )
     except ValueError as e:
         raise HTTPException(
             status_code=500,
             detail=ErrorResponse(
-                detail=str(e),
-                error_code="CONFIGURATION_ERROR",
-                path="/check-facts"
-            ).dict()
+                detail=str(e), error_code="CONFIGURATION_ERROR", path="/check-facts"
+            ).dict(),
         )


 def get_all_sources() -> List[FactCheckSource]:
     """
     Get all sources sorted by priority
     """
     # all_sources = SOURCES["fact_checkers"] + SOURCES["news_sites"]
     all_sources = SOURCES["fact_checkers"]
     return sorted(all_sources, key=lambda x: x.priority)

main.py

@@ -7,9 +7,7 @@ from app.config import FRONTEND_URL
 # Initialize FastAPI app
 app = FastAPI(
-    title="Your API Title",
-    description="Your API Description",
-    version="1.0.0"
+    title="Your API Title", description="Your API Description", version="1.0.0"
 )

 # CORS configuration
@@ -30,16 +28,19 @@ app.add_middleware(
     allow_headers=["*"],
 )


 # Basic root endpoint
 @app.get("/")
 async def root():
     return {"message": "Welcome to your FastAPI application"}


 # Health check endpoint
 @app.get("/health")
 async def health_check():
     return {"status": "healthy"}


 app.include_router(fact_check_router, prefix="")
 app.include_router(aifact_check_router, prefix="")
 app.include_router(scrap_websites_router, prefix="")
@@ -50,4 +51,5 @@ app.include_router(scrap_websites_router, prefix="")
 if __name__ == "__main__":
     import uvicorn

     uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)


@@ -3,16 +3,19 @@ from main import app
 client = TestClient(app)


 def test_root_endpoint():
     response = client.get("/")
     assert response.status_code == 200
     assert response.json() == {"message": "Welcome to your FastAPI application"}


 def test_health_endpoint():
     response = client.get("/health")
     assert response.status_code == 200
     assert response.json() == {"status": "healthy"}


 def test_cors_headers():
     response = client.get("/", headers={"Origin": "http://localhost:5173"})
     assert response.headers["access-control-allow-origin"] == "http://localhost:5173"