Compare commits

..

14 commits

Author SHA1 Message Date
Utsho Dey
c8735de51e fact check from image is functional 2024-12-19 16:49:17 +06:00
Utsho Dey
7c4dd378cd fact check from image is functional 2024-12-19 16:47:18 +06:00
Utsho Dey
9298352f2e fact check from image is functional 2024-12-19 16:37:57 +06:00
Utsho Dey
a1a699f9b3 dockerfile added 2024-12-18 17:39:00 +06:00
Utsho Dey
56335cbfa7 fixed pipeline error 2024-12-18 13:16:48 +06:00
Utsho Dey
15a0061a0d fixed response 2024-12-18 13:10:03 +06:00
Utsho Dey
9be0343695 added curl command 2024-12-17 18:51:05 +06:00
Utsho Dey
f32745326b added .env 2024-12-17 18:46:38 +06:00
Utsho Dey
b79c746e15 added .env 2024-12-17 18:43:42 +06:00
Utsho Dey
019e07e1b9 added cicd modified 2024-12-17 18:34:04 +06:00
Utsho Dey
9c15f7a59c added cicd 2024-12-17 18:27:37 +06:00
Utsho Dey
954c01432b added cicd 2024-12-17 18:24:39 +06:00
Utsho Dey
49c9c9c92d added cicd 2024-12-17 18:23:13 +06:00
Utsho Dey
d59f5c884e content fact checked is functional 2024-12-17 18:05:50 +06:00
26 changed files with 1121 additions and 596 deletions

4
.flake8 Normal file
View file

@ -0,0 +1,4 @@
[flake8]
max-line-length = 100
exclude = .git,__pycache__,dist,*.egg-info,venv
extend-ignore = E203

42
.gitignore vendored
View file

@ -1,4 +1,42 @@
env # Environment
env/
.env .env
venv/
ENV/
# Python
__pycache__/
*.py[cod]
*$py.class
.Python
*.so
.pytest_cache/
.coverage
.coverage.*
coverage.xml
*.cover
htmlcov/
# IDEs and editors
.idea/
.vscode/
*.swp
*.swo
*~
# Project specific
test.py test.py
__pycache__ *.log
.pip-cache/
# Temporary files
*.tmp
.DS_Store
# Distribution / packaging
dist/
build/
*.egg-info/
# Docker
.docker/

52
.gitlab-ci.yml Normal file
View file

@ -0,0 +1,52 @@
image: python:3.10-slim
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.pip-cache"
PYTHONPATH: "$CI_PROJECT_DIR"
cache:
paths:
- .pip-cache
- venv/
stages:
- setup
- test
before_script:
- apt-get update
- apt-get install -y curl
- python --version
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
setup:
stage: setup
script:
- pip install --no-cache-dir -r requirements.txt
artifacts:
paths:
- venv/
expire_in: 1 hour
test:
stage: test
needs:
- setup
script:
# Run all tests
- pytest tests/ -v
# Start FastAPI server
- uvicorn main:app --host 0.0.0.0 --port 8000 &
# Wait for server to start
- sleep 15
# Test health endpoint
- |
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/health)
if [ "$RESPONSE" = "200" ]; then
echo "✅ Health check passed"
else
echo "❌ Health check failed with status $RESPONSE"
exit 1
fi

View file

@ -0,0 +1,8 @@
FROM python:3.12
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
ENTRYPOINT ["gunicorn", "main:app", "--workers", "4", "--timeout", "90", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

View file

@ -6,7 +6,7 @@ from app.models.ai_fact_check_models import (
AIFactCheckResponse, AIFactCheckResponse,
VerificationResult, VerificationResult,
TokenUsage, TokenUsage,
ErrorResponse ErrorResponse,
) )
from urllib.parse import urlparse from urllib.parse import urlparse
import asyncio import asyncio
@ -16,13 +16,11 @@ aifact_check_router = APIRouter()
openai_client = OpenAIClient(api_key=OPENAI_API_KEY) openai_client = OpenAIClient(api_key=OPENAI_API_KEY)
fact_checker = AIFactChecker(openai_client=openai_client) fact_checker = AIFactChecker(openai_client=openai_client)
@aifact_check_router.post( @aifact_check_router.post(
"/aicheck-facts", "/aicheck-facts",
response_model=AIFactCheckResponse, response_model=AIFactCheckResponse,
responses={ responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
400: {"model": ErrorResponse},
500: {"model": ErrorResponse}
}
) )
async def ai_fact_check(request: AIFactCheckRequest): async def ai_fact_check(request: AIFactCheckRequest):
""" """
@ -57,7 +55,7 @@ async def ai_fact_check(request: AIFactCheckRequest):
confidence="Low", confidence="Low",
evidence=f"Error checking URL: {str(result)}", evidence=f"Error checking URL: {str(result)}",
reasoning="URL processing failed", reasoning="URL processing failed",
missing_info="Could not access or process the URL" missing_info="Could not access or process the URL",
) )
continue continue
@ -66,7 +64,7 @@ async def ai_fact_check(request: AIFactCheckRequest):
confidence=result["verification_result"]["confidence"], confidence=result["verification_result"]["confidence"],
evidence=result["verification_result"]["evidence"], evidence=result["verification_result"]["evidence"],
reasoning=result["verification_result"]["reasoning"], reasoning=result["verification_result"]["reasoning"],
missing_info=result["verification_result"].get("missing_info", None) missing_info=result["verification_result"].get("missing_info", None),
) )
results[url] = verification_result results[url] = verification_result
@ -80,24 +78,22 @@ async def ai_fact_check(request: AIFactCheckRequest):
token_usage = TokenUsage( token_usage = TokenUsage(
prompt_tokens=total_prompt_tokens, prompt_tokens=total_prompt_tokens,
completion_tokens=total_completion_tokens, completion_tokens=total_completion_tokens,
total_tokens=total_tokens total_tokens=total_tokens,
) )
return AIFactCheckResponse( return AIFactCheckResponse(
query=request.content, query=request.content,
verification_result=results, verification_result=results,
sources=list(all_sources), sources=list(all_sources),
token_usage=token_usage token_usage=token_usage,
) )
except ValueError as e: except ValueError as e:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail=ErrorResponse( detail=ErrorResponse(
detail=str(e), detail=str(e), error_code="INVALID_URL", path="/aicheck-facts"
error_code="INVALID_URL", ).dict(),
path="/aicheck-facts"
).dict()
) )
except Exception as e: except Exception as e:
raise HTTPException( raise HTTPException(
@ -105,6 +101,6 @@ async def ai_fact_check(request: AIFactCheckRequest):
detail=ErrorResponse( detail=ErrorResponse(
detail=f"Error processing fact-check request: {str(e)}", detail=f"Error processing fact-check request: {str(e)}",
error_code="PROCESSING_ERROR", error_code="PROCESSING_ERROR",
path="/aicheck-facts" path="/aicheck-facts",
).dict() ).dict(),
) )

View file

@ -1,20 +1,109 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
import httpx import httpx
import asyncio
import logging
from typing import Union, Optional, Dict, Any
from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY
from app.api.scrap_websites import search_websites, SearchRequest from app.api.scrap_websites import search_websites, SearchRequest
from app.services.openai_client import OpenAIClient from app.services.openai_client import OpenAIClient, AIFactChecker
from app.services.image_text_extractor import ImageTextExtractor
from app.models.ai_fact_check_models import AIFactCheckResponse
from app.models.fact_check_models import ( from app.models.fact_check_models import (
FactCheckRequest, FactCheckRequest,
FactCheckResponse, FactCheckResponse,
UnverifiedFactCheckResponse,
ErrorResponse, ErrorResponse,
Source Source,
VerdictEnum,
ConfidenceEnum
) )
from app.websites.fact_checker_website import get_all_sources from app.websites.fact_checker_website import get_all_sources
# Setup logging
logger = logging.getLogger(__name__)
fact_check_router = APIRouter() fact_check_router = APIRouter()
openai_client = OpenAIClient(OPENAI_API_KEY) openai_client = OpenAIClient(OPENAI_API_KEY)
ai_fact_checker = AIFactChecker(openai_client)
image_text_extractor = ImageTextExtractor(OPENAI_API_KEY)
async def generate_fact_report(query: str, fact_check_data: dict) -> FactCheckResponse:
async def process_url_content(url: str) -> Optional[str]:
"""Extract text content from the provided URL."""
try:
# Add await here
text = await image_text_extractor.extract_text(url, is_url=True)
if text:
logger.info(f"Successfully extracted text from URL: {text}")
else:
logger.warning(f"No text could be extracted from URL: {url}")
return text
except Exception as e:
logger.error(f"Error extracting text from URL: {str(e)}")
return None
async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Process a single fact check query."""
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL:
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="The fact-checking service is not properly configured.",
explanation="The system is missing required API configuration for fact-checking services.",
additional_context="This is a temporary system configuration issue."
)
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
fact_checker_sources = get_all_sources()
for source in fact_checker_sources:
params = {
"key": GOOGLE_API_KEY,
"query": query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10,
}
try:
response = await client.get(
GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers
)
response.raise_for_status()
json_response = response.json()
if json_response.get("claims"):
return await generate_fact_report(query, json_response)
except Exception as e:
logger.error(f"Error with source {source.domain}: {str(e)}")
continue
try:
search_request = SearchRequest(
search_text=query,
source_types=["fact_checkers"]
)
ai_response = await search_websites(search_request)
return await generate_fact_report(query, ai_response)
except Exception as e:
logger.error(f"Error in AI fact check: {str(e)}")
return await generate_fact_report(query, {
"status": "no_results",
"verification_result": {
"no_sources_found": True,
"reason": str(e)
}
})
async def generate_fact_report(query: str, fact_check_data: dict | AIFactCheckResponse) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Generate a fact check report using OpenAI based on the fact check results.""" """Generate a fact check report using OpenAI based on the fact check results."""
try: try:
base_system_prompt = """You are a professional fact-checking reporter. Your task is to create a detailed fact check report based on the provided data. Focus on accuracy, clarity, and proper citation of sources. base_system_prompt = """You are a professional fact-checking reporter. Your task is to create a detailed fact check report based on the provided data. Focus on accuracy, clarity, and proper citation of sources.
@ -23,7 +112,35 @@ Rules:
1. Include all source URLs and names in the sources list 1. Include all source URLs and names in the sources list
2. Keep the explanation focused on verifiable facts 2. Keep the explanation focused on verifiable facts
3. Include dates when available 3. Include dates when available
4. Maintain objectivity in the report""" 4. Maintain objectivity in the report
5. If no reliable sources are found, provide a clear explanation why"""
# Handle both dictionary and AIFactCheckResponse
if hasattr(fact_check_data, 'verification_result'):
# It's an AIFactCheckResponse
has_sources = bool(fact_check_data.sources)
verification_result = fact_check_data.verification_result
fact_check_data_dict = fact_check_data.dict()
else:
# It's a dictionary
has_sources = bool(fact_check_data.get("claims") or fact_check_data.get("urls_found"))
verification_result = fact_check_data.get("verification_result", {})
fact_check_data_dict = fact_check_data
# If no sources were found, return an unverified response
if not has_sources or (
isinstance(fact_check_data, dict) and
fact_check_data.get("status") == "no_results"
) or (verification_result and verification_result.get("no_sources_found")):
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="No fact-checking sources have verified this claim yet.",
explanation="Our search across reputable fact-checking websites did not find any formal verification of this claim. This doesn't mean the claim is false - just that it hasn't been formally fact-checked yet.",
additional_context="The claim may be too recent for fact-checkers to have investigated, or it may not have been widely circulated enough to warrant formal fact-checking."
)
base_user_prompt = """Generate a comprehensive fact check report in this exact JSON format: base_user_prompt = """Generate a comprehensive fact check report in this exact JSON format:
{ {
@ -39,14 +156,12 @@ Rules:
"evidence": "A concise summary of the key evidence (1-2 sentences)", "evidence": "A concise summary of the key evidence (1-2 sentences)",
"explanation": "A detailed explanation including who verified it, when it was verified, and the key findings (2-3 sentences)", "explanation": "A detailed explanation including who verified it, when it was verified, and the key findings (2-3 sentences)",
"additional_context": "Important context about the verification process, limitations, or broader implications (1-2 sentences)" "additional_context": "Important context about the verification process, limitations, or broader implications (1-2 sentences)"
} }"""
Ensure all URLs in sources are complete (including https:// if missing) and each source has both a URL and name.""" if isinstance(fact_check_data, dict) and "claims" in fact_check_data:
if "claims" in fact_check_data:
system_prompt = base_system_prompt system_prompt = base_system_prompt
user_prompt = f"""Query: {query} user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data} Fact Check Results: {fact_check_data_dict}
{base_user_prompt} {base_user_prompt}
@ -55,11 +170,10 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
2. Specify verification dates when available 2. Specify verification dates when available
3. Name the fact-checking organizations involved 3. Name the fact-checking organizations involved
4. Describe the verification process""" 4. Describe the verification process"""
else: else:
system_prompt = base_system_prompt system_prompt = base_system_prompt
user_prompt = f"""Query: {query} user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data} Fact Check Results: {fact_check_data_dict}
{base_user_prompt} {base_user_prompt}
@ -76,117 +190,180 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
) )
try: try:
# First try to parse the response directly
response_data = response["response"] response_data = response["response"]
# Clean up sources before validation if isinstance(response_data.get("sources"), list):
if isinstance(response_data.get('sources'), list):
cleaned_sources = [] cleaned_sources = []
for source in response_data['sources']: for source in response_data["sources"]:
if isinstance(source, str): if isinstance(source, str):
# Convert string sources to Source objects url = source if source.startswith("http") else f"https://{source}"
url = source if source.startswith('http') else f"https://{source}" cleaned_sources.append({"url": url, "name": source})
cleaned_sources.append({
"url": url,
"name": source
})
elif isinstance(source, dict): elif isinstance(source, dict):
# Ensure URL has proper scheme url = source.get("url", "")
url = source.get('url', '') if url and not url.startswith("http"):
if url and not url.startswith('http'): source["url"] = f"https://{url}"
source['url'] = f"https://{url}"
cleaned_sources.append(source) cleaned_sources.append(source)
response_data['sources'] = cleaned_sources response_data["sources"] = cleaned_sources
fact_check_response = FactCheckResponse(**response_data) if response_data["verdict"] == "Unverified" or not response_data.get("sources"):
return fact_check_response return UnverifiedFactCheckResponse(**response_data)
return FactCheckResponse(**response_data)
except Exception as validation_error: except Exception as validation_error:
print(f"Response validation error: {str(validation_error)}") logger.error(f"Response validation error: {str(validation_error)}")
raise HTTPException( return UnverifiedFactCheckResponse(
status_code=422, claim=query,
detail=ErrorResponse( verdict=VerdictEnum.UNVERIFIED,
detail=f"Invalid response format: {str(validation_error)}", confidence=ConfidenceEnum.LOW,
error_code="VALIDATION_ERROR", sources=[],
path="/check-facts" evidence="An error occurred while processing the fact check results.",
).dict() explanation="The system encountered an error while validating the fact check results.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
) )
except Exception as e: except Exception as e:
print(f"Error generating fact report: {str(e)}") logger.error(f"Error generating fact report: {str(e)}")
raise HTTPException( return UnverifiedFactCheckResponse(
status_code=500, claim=query,
detail=ErrorResponse( verdict=VerdictEnum.UNVERIFIED,
detail="Error generating fact report", confidence=ConfidenceEnum.LOW,
error_code="FACT_CHECK_ERROR", sources=[],
path="/check-facts" evidence="An error occurred while generating the fact check report.",
).dict() explanation="The system encountered an unexpected error while processing the fact check request.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
) )
@fact_check_router.post("/check-facts", response_model=FactCheckResponse) async def combine_fact_reports(query: str, url_text: str, query_result: Dict[str, Any], url_result: Dict[str, Any]) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Combine fact check results from query and URL into a single comprehensive report."""
try:
system_prompt = """You are a professional fact-checking reporter. Your task is to create a comprehensive fact check report by combining and analyzing multiple fact-checking results. Focus on accuracy, clarity, and proper citation of all sources.
Rules:
1. Include all source URLs and names from both result sets
2. Compare and contrast findings from different sources
3. Include dates when available
4. Note any discrepancies between sources
5. Provide a balanced, objective analysis"""
user_prompt = f"""Original Query: {query}
Extracted Text from URL: {url_text}
First Fact Check Result: {query_result}
Second Fact Check Result: {url_result}
Generate a comprehensive fact check report in this exact JSON format:
{{
"claim": "Write the exact claim being verified",
"verdict": "One of: True/False/Partially True/Unverified",
"confidence": "One of: High/Medium/Low",
"sources": [
{{
"url": "Full URL of the source",
"name": "Name of the source organization"
}}
],
"evidence": "A concise summary of the key evidence from both sources (2-3 sentences)",
"explanation": "A detailed explanation combining findings from both fact checks (3-4 sentences)",
"additional_context": "Important context about differences or similarities in findings (1-2 sentences)"
}}
The report should:
1. Combine sources from both fact checks
2. Compare findings from both analyses
3. Note any differences in conclusions
4. Provide a unified verdict based on all available information"""
response = await openai_client.generate_text_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
max_tokens=1000
)
response_data = response["response"]
# Clean up sources from both results
if isinstance(response_data.get("sources"), list):
cleaned_sources = []
for source in response_data["sources"]:
if isinstance(source, str):
url = source if source.startswith("http") else f"https://{source}"
cleaned_sources.append({"url": url, "name": source})
elif isinstance(source, dict):
url = source.get("url", "")
if url and not url.startswith("http"):
source["url"] = f"https://{url}"
cleaned_sources.append(source)
response_data["sources"] = cleaned_sources
if response_data["verdict"] == "Unverified" or not response_data.get("sources"):
return UnverifiedFactCheckResponse(**response_data)
return FactCheckResponse(**response_data)
except Exception as e:
logger.error(f"Error combining fact reports: {str(e)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="An error occurred while combining fact check reports.",
explanation="The system encountered an error while trying to combine results from multiple sources.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
@fact_check_router.post("/check-facts", response_model=Union[FactCheckResponse, UnverifiedFactCheckResponse])
async def check_facts(request: FactCheckRequest): async def check_facts(request: FactCheckRequest):
""" """
Fetch fact check results and generate a comprehensive report. Fetch fact check results and generate a comprehensive report.
Handles both query-based and URL-based fact checking.
""" """
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL: url_text = None
raise HTTPException( query_result = None
status_code=500, url_result = None
detail=ErrorResponse(
detail="Google API key or base URL is not configured", # If URL is provided, try to extract text
error_code="CONFIGURATION_ERROR", if request.url:
path="/check-facts" url_text = await process_url_content(request.url)
).dict() if not url_text and not request.query:
# Only return early if URL text extraction failed and no query provided
return UnverifiedFactCheckResponse(
claim=f"URL check requested: {request.url}",
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="Unable to extract text from the provided URL.",
explanation="The system could not process the content from the provided URL. The URL might be invalid or inaccessible.",
additional_context="Please provide a valid URL or a text query for fact-checking."
) )
headers = {"Content-Type": "application/json"} # If URL text was successfully extracted, process it
async with httpx.AsyncClient() as client: if url_text:
# Get fact checker sources from the centralized configuration logger.info(f"Processing fact check for extracted text: {url_text}")
fact_checker_sources = get_all_sources() url_result = await process_fact_check(url_text)
for source in fact_checker_sources: # Process query if provided
params = { if request.query:
"key": GOOGLE_API_KEY, query_result = await process_fact_check(request.query)
"query": request.query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10
}
try: # If both results are available, combine them
response = await client.get( if query_result and url_result and url_text:
GOOGLE_FACT_CHECK_BASE_URL, return await combine_fact_reports(request.query, url_text,
params=params, query_result.dict(), url_result.dict())
headers=headers
) # If only one result is available
response.raise_for_status() if query_result:
json_response = response.json() return query_result
if url_result:
if json_response.get("claims"): return url_result
return await generate_fact_report(request.query, json_response)
# If no valid results
except httpx.RequestError as e: return UnverifiedFactCheckResponse(
print(f"Error fetching results for site {source.domain}: {str(e)}") claim=request.query or f"URL: {request.url}",
continue verdict=VerdictEnum.UNVERIFIED,
except Exception as e: confidence=ConfidenceEnum.LOW,
print(f"Unexpected error for site {source.domain}: {str(e)}") sources=[],
continue evidence="Failed to process fact-checking request.",
explanation="The system encountered errors while processing the fact checks.",
try: additional_context="Please try again with different input or contact support if the issue persists."
search_request = SearchRequest(
search_text=request.query,
source_types=["fact_checkers"]
)
ai_response = await search_websites(search_request)
return await generate_fact_report(request.query, ai_response)
except Exception as e:
print(f"Error in AI fact check: {str(e)}")
raise HTTPException(
status_code=404,
detail=ErrorResponse(
detail="No fact check results found",
error_code="NOT_FOUND",
path="/check-facts"
).dict()
) )

View file

@ -7,7 +7,7 @@ from pydantic import BaseModel
from app.models.ai_fact_check_models import ( from app.models.ai_fact_check_models import (
AIFactCheckRequest, AIFactCheckRequest,
FactCheckSource, FactCheckSource,
SourceType SourceType,
) )
from app.websites.fact_checker_website import SOURCES, get_all_sources from app.websites.fact_checker_website import SOURCES, get_all_sources
from app.api.ai_fact_check import ai_fact_check from app.api.ai_fact_check import ai_fact_check
@ -18,10 +18,10 @@ class SearchRequest(BaseModel):
search_text: str search_text: str
source_types: List[str] = ["fact_checkers"] source_types: List[str] = ["fact_checkers"]
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,39 +38,46 @@ def get_domain_from_url(url: str) -> str:
try: try:
parsed = urlparse(url) parsed = urlparse(url)
domain = parsed.netloc.lower() domain = parsed.netloc.lower()
if domain.startswith('www.'): if domain.startswith("www."):
domain = domain[4:] domain = domain[4:]
return domain return domain
except Exception as e: except Exception as e:
logger.error(f"Error extracting domain from URL {url}: {str(e)}") logger.error(f"Error extracting domain from URL {url}: {str(e)}")
return "" return ""
def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool: def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool:
"""Check if domain matches any source with improved matching logic.""" """Check if domain matches any source with improved matching logic."""
if not domain: if not domain:
return False return False
domain = domain.lower() domain = domain.lower()
if domain.startswith('www.'): if domain.startswith("www."):
domain = domain[4:] domain = domain[4:]
for source in sources: for source in sources:
source_domain = source.domain.lower() source_domain = source.domain.lower()
if source_domain.startswith('www.'): if source_domain.startswith("www."):
source_domain = source_domain[4:] source_domain = source_domain[4:]
if domain == source_domain or domain.endswith('.' + source_domain): if domain == source_domain or domain.endswith("." + source_domain):
return True return True
return False return False
async def build_enhanced_search_query(query: str, sources: List[FactCheckSource]) -> str:
async def build_enhanced_search_query(
query: str, sources: List[FactCheckSource]
) -> str:
"""Build search query with site restrictions.""" """Build search query with site restrictions."""
site_queries = [f"site:{source.domain}" for source in sources] site_queries = [f"site:{source.domain}" for source in sources]
site_restriction = " OR ".join(site_queries) site_restriction = " OR ".join(site_queries)
return f"({query}) ({site_restriction})" return f"({query}) ({site_restriction})"
async def google_custom_search(query: str, sources: List[FactCheckSource], page: int = 1) -> Optional[Dict]:
async def google_custom_search(
query: str, sources: List[FactCheckSource], page: int = 1
) -> Optional[Dict]:
"""Perform Google Custom Search with enhanced query.""" """Perform Google Custom Search with enhanced query."""
enhanced_query = await build_enhanced_search_query(query, sources) enhanced_query = await build_enhanced_search_query(query, sources)
start_index = ((page - 1) * RESULTS_PER_PAGE) + 1 start_index = ((page - 1) * RESULTS_PER_PAGE) + 1
@ -80,7 +87,7 @@ async def google_custom_search(query: str, sources: List[FactCheckSource], page:
"cx": GOOGLE_ENGINE_ID, "cx": GOOGLE_ENGINE_ID,
"q": enhanced_query, "q": enhanced_query,
"num": RESULTS_PER_PAGE, "num": RESULTS_PER_PAGE,
"start": start_index "start": start_index,
} }
async with httpx.AsyncClient(timeout=30.0) as client: async with httpx.AsyncClient(timeout=30.0) as client:
@ -92,6 +99,7 @@ async def google_custom_search(query: str, sources: List[FactCheckSource], page:
logger.error(f"Search error: {str(e)}") logger.error(f"Search error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Search error: {str(e)}") raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
@scrap_websites_router.post("/search") @scrap_websites_router.post("/search")
async def search_websites(request: SearchRequest): async def search_websites(request: SearchRequest):
# Get the source types from the request # Get the source types from the request
@ -115,7 +123,9 @@ async def search_websites(request: SearchRequest):
if len(all_urls) >= 50: if len(all_urls) >= 50:
break break
search_response = await google_custom_search(request.search_text, selected_sources, page) search_response = await google_custom_search(
request.search_text, selected_sources, page
)
if not search_response or not search_response.get("items"): if not search_response or not search_response.get("items"):
break break
@ -132,25 +142,23 @@ async def search_websites(request: SearchRequest):
domain_results[domain] = [] domain_results[domain] = []
if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN: if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN:
domain_results[domain].append({ domain_results[domain].append(
{
"url": url, "url": url,
"title": item.get("title", ""), "title": item.get("title", ""),
"snippet": item.get("snippet", "") "snippet": item.get("snippet", ""),
}) }
)
all_urls.append(url) all_urls.append(url)
if len(all_urls) >= 50: if len(all_urls) >= 50:
break break
if not all_urls: if not all_urls:
return { return {"status": "no_results", "urls_found": 0}
"status": "no_results",
"urls_found": 0
}
fact_check_request = AIFactCheckRequest( fact_check_request = AIFactCheckRequest(
content=request.search_text, content=request.search_text, urls=all_urls[:5]
urls=all_urls[:5]
) )
return await ai_fact_check(fact_check_request) return await ai_fact_check(fact_check_request)

View file

@ -4,38 +4,46 @@ from enum import Enum
from datetime import datetime from datetime import datetime
from urllib.parse import urlparse from urllib.parse import urlparse
# Common Models # Common Models
class TokenUsage(BaseModel): class TokenUsage(BaseModel):
prompt_tokens: Optional[int] = 0 prompt_tokens: Optional[int] = 0
completion_tokens: Optional[int] = 0 completion_tokens: Optional[int] = 0
total_tokens: Optional[int] = 0 total_tokens: Optional[int] = 0
class ErrorResponse(BaseModel): class ErrorResponse(BaseModel):
detail: str detail: str
error_code: str = Field(..., description="Unique error code for this type of error") error_code: str = Field(..., description="Unique error code for this type of error")
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat()) timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
path: Optional[str] = Field(None, description="The endpoint path where error occurred") path: Optional[str] = Field(
None, description="The endpoint path where error occurred"
)
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"detail": "Error description", "detail": "Error description",
"error_code": "ERROR_CODE", "error_code": "ERROR_CODE",
"timestamp": "2024-12-09T16:49:30.905765", "timestamp": "2024-12-09T16:49:30.905765",
"path": "/check-facts" "path": "/check-facts",
} }
}) }
)
# Fact Check Models # Fact Check Models
class Publisher(BaseModel): class Publisher(BaseModel):
name: str name: str
site: Optional[str] = Field(None, description="Publisher's website") site: Optional[str] = Field(None, description="Publisher's website")
@validator('site') @validator("site")
def validate_site(cls, v): def validate_site(cls, v):
if v and not (v.startswith('http://') or v.startswith('https://')): if v and not (v.startswith("http://") or v.startswith("https://")):
return f"https://{v}" return f"https://{v}"
return v return v
class ClaimReview(BaseModel): class ClaimReview(BaseModel):
publisher: Publisher publisher: Publisher
url: Optional[HttpUrl] = None url: Optional[HttpUrl] = None
@ -44,21 +52,25 @@ class ClaimReview(BaseModel):
textualRating: Optional[str] = None textualRating: Optional[str] = None
languageCode: str = Field(default="en-US") languageCode: str = Field(default="en-US")
class Claim(BaseModel): class Claim(BaseModel):
text: str text: str
claimant: Optional[str] = None claimant: Optional[str] = None
claimDate: Optional[str] = None claimDate: Optional[str] = None
claimReview: List[ClaimReview] claimReview: List[ClaimReview]
class SourceType(str, Enum): class SourceType(str, Enum):
FACT_CHECKER = "fact_checker" FACT_CHECKER = "fact_checker"
NEWS_SITE = "news_site" NEWS_SITE = "news_site"
class FactCheckSource(BaseModel): class FactCheckSource(BaseModel):
domain: str domain: str
type: SourceType type: SourceType
priority: int = Field(default=1, ge=1, le=10) priority: int = Field(default=1, ge=1, le=10)
# Verification Models # Verification Models
class VerificationResult(BaseModel): class VerificationResult(BaseModel):
verdict: str = Field(..., description="True/False/Insufficient Information") verdict: str = Field(..., description="True/False/Insufficient Information")
@ -67,44 +79,46 @@ class VerificationResult(BaseModel):
reasoning: str reasoning: str
missing_info: Optional[str] = None missing_info: Optional[str] = None
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"verdict": "True", "verdict": "True",
"confidence": "High", "confidence": "High",
"evidence": ["Direct quote from source supporting the claim"], "evidence": ["Direct quote from source supporting the claim"],
"reasoning": "Detailed analysis of why the claim is considered true", "reasoning": "Detailed analysis of why the claim is considered true",
"missing_info": "Any caveats or limitations of the verification" "missing_info": "Any caveats or limitations of the verification",
} }
}) }
)
# Request Models # Request Models
class BaseFactCheckRequest(BaseModel): class BaseFactCheckRequest(BaseModel):
content: str = Field( content: str = Field(
..., ..., min_length=10, max_length=1000, description="The claim to be fact-checked"
min_length=10,
max_length=1000,
description="The claim to be fact-checked"
) )
@validator('content') @validator("content")
def validate_content(cls, v): def validate_content(cls, v):
if not v.strip(): if not v.strip():
raise ValueError("Content cannot be empty or just whitespace") raise ValueError("Content cannot be empty or just whitespace")
return v.strip() return v.strip()
class GoogleFactCheckRequest(BaseFactCheckRequest): class GoogleFactCheckRequest(BaseFactCheckRequest):
language: str = Field(default="en-US", pattern="^[a-z]{2}-[A-Z]{2}$") language: str = Field(default="en-US", pattern="^[a-z]{2}-[A-Z]{2}$")
max_results_per_source: int = Field(default=10, ge=1, le=50) max_results_per_source: int = Field(default=10, ge=1, le=50)
class AIFactCheckRequest(BaseFactCheckRequest): class AIFactCheckRequest(BaseFactCheckRequest):
urls: List[str] = Field( urls: List[str] = Field(
..., ...,
min_items=1, min_items=1,
max_items=5, max_items=5,
description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing" description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing",
) )
@validator('urls') @validator("urls")
def validate_urls(cls, urls): def validate_urls(cls, urls):
validated_urls = [] validated_urls = []
for url in urls: for url in urls:
@ -112,8 +126,8 @@ class AIFactCheckRequest(BaseFactCheckRequest):
raise ValueError("URL cannot be empty") raise ValueError("URL cannot be empty")
# Add https:// if no protocol specified # Add https:// if no protocol specified
if not url.startswith(('http://', 'https://')): if not url.startswith(("http://", "https://")):
url = f'https://{url}' url = f"https://{url}"
try: try:
result = urlparse(url) result = urlparse(url)
@ -125,15 +139,18 @@ class AIFactCheckRequest(BaseFactCheckRequest):
return validated_urls return validated_urls
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"content": "Indian flag was drawn in BUET campus", "content": "Indian flag was drawn in BUET campus",
"urls": [ "urls": [
"www.altnews.in/article-about-flag", "www.altnews.in/article-about-flag",
"www.another-source.com/related-news" "www.another-source.com/related-news",
] ],
} }
}) }
)
# Response Models # Response Models
class BaseFactCheckResponse(BaseModel): class BaseFactCheckResponse(BaseModel):
@ -141,17 +158,20 @@ class BaseFactCheckResponse(BaseModel):
token_usage: TokenUsage token_usage: TokenUsage
sources: List[str] sources: List[str]
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"query": "Example statement to verify", "query": "Example statement to verify",
"token_usage": { "token_usage": {
"prompt_tokens": 100, "prompt_tokens": 100,
"completion_tokens": 50, "completion_tokens": 50,
"total_tokens": 150 "total_tokens": 150,
}, },
"sources": ["source1.com", "source2.com"], "sources": ["source1.com", "source2.com"],
} }
}) }
)
class GoogleFactCheckResponse(BaseFactCheckResponse): class GoogleFactCheckResponse(BaseFactCheckResponse):
total_claims_found: int total_claims_found: int
@ -159,44 +179,51 @@ class GoogleFactCheckResponse(BaseFactCheckResponse):
verification_result: Dict[str, Any] verification_result: Dict[str, Any]
summary: Dict[str, int] summary: Dict[str, int]
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"query": "Example claim", "query": "Example claim",
"total_claims_found": 1, "total_claims_found": 1,
"results": [{ "results": [
{
"text": "Example claim text", "text": "Example claim text",
"claimant": "Source name", "claimant": "Source name",
"claimReview": [{ "claimReview": [
{
"publisher": { "publisher": {
"name": "Fact Checker", "name": "Fact Checker",
"site": "factchecker.com" "site": "factchecker.com",
}, },
"textualRating": "True" "textualRating": "True",
}] }
}], ],
}
],
"verification_result": { "verification_result": {
"verdict": "True", "verdict": "True",
"confidence": "High", "confidence": "High",
"evidence": ["Supporting evidence"], "evidence": ["Supporting evidence"],
"reasoning": "Detailed analysis" "reasoning": "Detailed analysis",
}, },
"sources": ["factchecker.com"], "sources": ["factchecker.com"],
"token_usage": { "token_usage": {
"prompt_tokens": 100, "prompt_tokens": 100,
"completion_tokens": 50, "completion_tokens": 50,
"total_tokens": 150 "total_tokens": 150,
}, },
"summary": { "summary": {"total_sources": 1, "fact_checking_sites_queried": 10},
"total_sources": 1,
"fact_checking_sites_queried": 10
} }
} }
}) )
class AIFactCheckResponse(BaseFactCheckResponse): class AIFactCheckResponse(BaseFactCheckResponse):
verification_result: Dict[str, VerificationResult] # Changed to Dict to store results per URL verification_result: Dict[
str, VerificationResult
] # Changed to Dict to store results per URL
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(
json_schema_extra={
"example": { "example": {
"query": "Indian flag was drawn in BUET campus", "query": "Indian flag was drawn in BUET campus",
"verification_result": { "verification_result": {
@ -205,24 +232,26 @@ class AIFactCheckResponse(BaseFactCheckResponse):
"confidence": "High", "confidence": "High",
"evidence": ["Supporting evidence from source 1"], "evidence": ["Supporting evidence from source 1"],
"reasoning": "Detailed analysis from source 1", "reasoning": "Detailed analysis from source 1",
"missing_info": None "missing_info": None,
}, },
"https://www.source2.com": { "https://www.source2.com": {
"verdict": "True", "verdict": "True",
"confidence": "Medium", "confidence": "Medium",
"evidence": ["Supporting evidence from source 2"], "evidence": ["Supporting evidence from source 2"],
"reasoning": "Analysis from source 2", "reasoning": "Analysis from source 2",
"missing_info": "Additional context needed" "missing_info": "Additional context needed",
} },
}, },
"sources": ["source1.com", "source2.com"], "sources": ["source1.com", "source2.com"],
"token_usage": { "token_usage": {
"prompt_tokens": 200, "prompt_tokens": 200,
"completion_tokens": 100, "completion_tokens": 100,
"total_tokens": 300 "total_tokens": 300,
},
} }
} }
}) )
# Backwards compatibility aliases # Backwards compatibility aliases
FactCheckRequest = GoogleFactCheckRequest FactCheckRequest = GoogleFactCheckRequest

View file

@ -1,54 +1,106 @@
from pydantic import BaseModel, Field, HttpUrl, validator from pydantic import BaseModel, Field, HttpUrl, validator, root_validator
from typing import List, Literal, Union from typing import List, Literal, Union, Optional
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
class VerdictEnum(str, Enum): class VerdictEnum(str, Enum):
TRUE = "True" TRUE = "True"
FALSE = "False" FALSE = "False"
PARTIALLY_TRUE = "Partially True" PARTIALLY_TRUE = "Partially True"
UNVERIFIED = "Unverified" UNVERIFIED = "Unverified"
class ConfidenceEnum(str, Enum): class ConfidenceEnum(str, Enum):
HIGH = "High" HIGH = "High"
MEDIUM = "Medium" MEDIUM = "Medium"
LOW = "Low" LOW = "Low"
class FactCheckRequest(BaseModel): class FactCheckRequest(BaseModel):
query: str = Field( query: Optional[str] = Field(
..., None,
min_length=3, min_length=3,
max_length=500, max_length=500,
description="The claim or statement to be fact-checked", description="The claim or statement to be fact-checked",
example="Did NASA confirm finding alien structures on Mars in 2024?" example="Did NASA confirm finding alien structures on Mars in 2024?",
) )
url: Optional[str] = Field(
None,
description="URL to be fact-checked",
example="https://example.com/article",
)
@root_validator(pre=True)
def validate_at_least_one(cls, values):
"""Validate that at least one of query or url is provided."""
query = values.get('query')
url = values.get('url')
if not query and not url:
raise ValueError("At least one of 'query' or 'url' must be provided")
return values
@validator('url')
def validate_url(cls, v):
"""Validate URL format if provided."""
if v is not None and len(v) < 3:
raise ValueError("URL must be at least 3 characters")
return v
class Source(BaseModel): class Source(BaseModel):
url: str url: str
name: str = "" name: str = ""
@validator('url') @validator("url")
def validate_url(cls, v): def validate_url(cls, v):
# Basic URL validation without requiring HTTP/HTTPS
if not v or len(v) < 3: if not v or len(v) < 3:
raise ValueError("URL must not be empty and must be at least 3 characters") raise ValueError("URL must not be empty and must be at least 3 characters")
return v return v
class UnverifiedFactCheckResponse(BaseModel):
claim: str = Field(
...,
min_length=10,
max_length=1000,
description="The exact claim being verified",
)
verdict: VerdictEnum = Field(..., description="The verification verdict")
confidence: ConfidenceEnum = Field(..., description="Confidence level in the verdict")
sources: List[Source] = Field(
default=[],
description="List of sources used in verification"
)
evidence: str = Field(
...,
min_length=20,
max_length=500,
description="Concise summary of key evidence",
)
explanation: str = Field(
...,
min_length=50,
max_length=1000,
description="Detailed explanation of verification findings",
)
additional_context: str = Field(
...,
min_length=20,
max_length=500,
description="Important context about the verification",
)
class FactCheckResponse(BaseModel): class FactCheckResponse(BaseModel):
claim: str = Field( claim: str = Field(
..., ...,
min_length=10, min_length=10,
max_length=1000, max_length=1000,
description="The exact claim being verified" description="The exact claim being verified",
)
verdict: VerdictEnum = Field(
...,
description="The verification verdict"
)
confidence: ConfidenceEnum = Field(
...,
description="Confidence level in the verdict"
) )
verdict: VerdictEnum = Field(..., description="The verification verdict")
confidence: ConfidenceEnum = Field(..., description="Confidence level in the verdict")
sources: List[Source] = Field( sources: List[Source] = Field(
..., ...,
min_items=1, min_items=1,
@ -58,19 +110,19 @@ class FactCheckResponse(BaseModel):
..., ...,
min_length=20, min_length=20,
max_length=500, max_length=500,
description="Concise summary of key evidence" description="Concise summary of key evidence",
) )
explanation: str = Field( explanation: str = Field(
..., ...,
min_length=50, min_length=50,
max_length=1000, max_length=1000,
description="Detailed explanation of verification findings" description="Detailed explanation of verification findings",
) )
additional_context: str = Field( additional_context: str = Field(
..., ...,
min_length=20, min_length=20,
max_length=500, max_length=500,
description="Important context about the verification" description="Important context about the verification",
) )
class Config: class Config:
@ -82,19 +134,16 @@ class FactCheckResponse(BaseModel):
"sources": [ "sources": [
{ {
"url": "https://www.nasa.gov/mars-exploration", "url": "https://www.nasa.gov/mars-exploration",
"name": "NASA Mars Exploration" "name": "NASA Mars Exploration",
},
{
"url": "https://factcheck.org/2024/mars-claims",
"name": "FactCheck.org"
} }
], ],
"evidence": "NASA has made no such announcement. Recent Mars rover images show natural rock formations.", "evidence": "NASA has made no such announcement. Recent Mars rover images show natural rock formations.",
"explanation": "Multiple fact-checking organizations investigated this claim. NASA's official communications and Mars mission reports from 2024 contain no mention of alien structures. The viral images being shared are misidentified natural geological formations.", "explanation": "Multiple fact-checking organizations investigated this claim. NASA's official communications and Mars mission reports from 2024 contain no mention of alien structures.",
"additional_context": "Similar false claims about alien structures on Mars have circulated periodically since the first Mars rovers began sending back images." "additional_context": "Similar false claims about alien structures on Mars have circulated periodically.",
} }
} }
class ErrorResponse(BaseModel): class ErrorResponse(BaseModel):
detail: str detail: str
error_code: str = Field(..., example="VALIDATION_ERROR") error_code: str = Field(..., example="VALIDATION_ERROR")

View file

@ -1,38 +1,46 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Dict from typing import List, Dict
class SearchRequest(BaseModel): class SearchRequest(BaseModel):
search_text: str search_text: str
source_types: List[str] = ["fact_checkers"] source_types: List[str] = ["fact_checkers"]
class Publisher(BaseModel): class Publisher(BaseModel):
name: str name: str
site: str site: str
class ClaimReview(BaseModel): class ClaimReview(BaseModel):
publisher: Publisher publisher: Publisher
textualRating: str textualRating: str
class Claim(BaseModel): class Claim(BaseModel):
claimReview: List[ClaimReview] claimReview: List[ClaimReview]
claimant: str claimant: str
text: str text: str
class Summary(BaseModel): class Summary(BaseModel):
fact_checking_sites_queried: int fact_checking_sites_queried: int
total_sources: int total_sources: int
class TokenUsage(BaseModel): class TokenUsage(BaseModel):
prompt_tokens: int prompt_tokens: int
completion_tokens: int completion_tokens: int
total_tokens: int total_tokens: int
class VerificationResult(BaseModel): class VerificationResult(BaseModel):
verdict: str verdict: str
confidence: str confidence: str
evidence: List[str] evidence: List[str]
reasoning: str reasoning: str
class EnhancedFactCheckResponse(BaseModel): class EnhancedFactCheckResponse(BaseModel):
query: str query: str
results: List[Claim] results: List[Claim]

View file

@ -0,0 +1,119 @@
import base64
import requests
import os
from io import BytesIO
from typing import Tuple, Optional
import logging
import aiohttp
logger = logging.getLogger(__name__)
class ImageTextExtractor:
def __init__(self, api_key: str):
"""Initialize ImageTextExtractor with OpenAI API key."""
self.api_key = api_key
self.api_url = "https://api.openai.com/v1/chat/completions"
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
def encode_image(self, image_path: str) -> str:
"""Encode a local image into base64."""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except Exception as e:
logger.error(f"Error encoding image: {str(e)}")
raise Exception(f"Error encoding image: {e}")
async def fetch_image_from_url(self, image_url: str) -> Tuple[str, str]:
"""Fetch an image from a URL and encode it as base64."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
if response.status != 200:
raise Exception(f"Failed to fetch image: Status {response.status}")
content_type = response.headers.get('Content-Type', '')
if "text/html" in content_type:
raise ValueError("The URL points to a webpage, not an image")
if "image" not in content_type:
raise ValueError("The URL does not point to a valid image")
image_data = await response.read()
image_format = "jpeg" if "jpeg" in content_type or "jpg" in content_type else "png"
base64_image = base64.b64encode(image_data).decode('utf-8')
return base64_image, image_format
except aiohttp.ClientError as e:
logger.error(f"Error fetching image from URL: {str(e)}")
raise Exception(f"Error fetching image from URL: {e}")
except ValueError as e:
raise
except Exception as e:
logger.error(f"Unexpected error processing image URL: {str(e)}")
raise Exception(f"Unexpected error processing image: {e}")
async def extract_text(self, image_input: str, is_url: bool = False) -> Optional[str]:
"""Extract text from an image, either from a local path or URL."""
try:
if is_url:
try:
base64_image, image_format = await self.fetch_image_from_url(image_input)
except ValueError as e:
if "webpage" in str(e):
return None
raise
else:
if not os.path.exists(image_input):
raise FileNotFoundError(f"Image file not found: {image_input}")
base64_image = self.encode_image(image_input)
image_format = "jpeg" if image_input.endswith(".jpg") else "png"
payload = {
"model": "gpt-4-turbo-2024-04-09", # Updated model name
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Extract and return only the key text from this image in the original language. Do not provide translations or explanations."
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/{image_format};base64,{base64_image}"
}
}
]
}
],
"max_tokens": 300
}
async with aiohttp.ClientSession() as session:
async with session.post(self.api_url, headers=self.headers, json=payload) as response:
if response.status != 200:
error_content = await response.text()
logger.error(f"API request failed: Status {response.status}, Response: {error_content}")
raise Exception(f"API request failed with status {response.status}")
result = await response.json()
logger.debug(f"GPT-4 API Response: {result}")
if 'choices' in result and len(result['choices']) > 0:
extracted_text = result['choices'][0]['message']['content'].strip()
if extracted_text:
return extracted_text
return None
except (aiohttp.ClientError, ValueError, FileNotFoundError) as e:
logger.error(f"Error in text extraction: {str(e)}")
return None
except Exception as e:
logger.error(f"Unexpected error in text extraction: {str(e)}")
return None
return None

View file

@ -1,4 +1,3 @@
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_transformers import BeautifulSoupTransformer from langchain_community.document_transformers import BeautifulSoupTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document from langchain_core.documents import Document
@ -7,6 +6,9 @@ import numpy as np
import logging as logger import logging as logger
import openai import openai
import json import json
import aiohttp
from bs4 import BeautifulSoup
class OpenAIClient: class OpenAIClient:
def __init__(self, api_key: str): def __init__(self, api_key: str):
@ -15,7 +17,9 @@ class OpenAIClient:
""" """
openai.api_key = api_key openai.api_key = api_key
async def generate_text_response(self, system_prompt: str, user_prompt: str, max_tokens: int) -> dict: async def generate_text_response(
self, system_prompt: str, user_prompt: str, max_tokens: int
) -> dict:
""" """
Generate a response using OpenAI's chat completion API. Generate a response using OpenAI's chat completion API.
""" """
@ -24,19 +28,19 @@ class OpenAIClient:
model="gpt-4", model="gpt-4",
messages=[ messages=[
{"role": "system", "content": system_prompt}, {"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt} {"role": "user", "content": user_prompt},
], ],
max_tokens=max_tokens max_tokens=max_tokens,
) )
content = response['choices'][0]['message']['content'] content = response["choices"][0]["message"]["content"]
# Parse the JSON string into a dictionary # Parse the JSON string into a dictionary
parsed_content = json.loads(content) parsed_content = json.loads(content)
return { return {
"response": parsed_content, # Now returns a dictionary instead of string "response": parsed_content, # Now returns a dictionary instead of string
"prompt_tokens": response['usage']['prompt_tokens'], "prompt_tokens": response["usage"]["prompt_tokens"],
"completion_tokens": response['usage']['completion_tokens'], "completion_tokens": response["usage"]["completion_tokens"],
"total_tokens": response['usage']['total_tokens'] "total_tokens": response["usage"]["total_tokens"],
} }
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise Exception(f"Failed to parse OpenAI response as JSON: {str(e)}") raise Exception(f"Failed to parse OpenAI response as JSON: {str(e)}")
@ -49,14 +53,14 @@ class OpenAIClient:
""" """
try: try:
response = openai.Embedding.create( response = openai.Embedding.create(
input=texts, input=texts, model="text-embedding-ada-002"
model="text-embedding-ada-002"
) )
embeddings = [data['embedding'] for data in response['data']] embeddings = [data["embedding"] for data in response["data"]]
return embeddings return embeddings
except Exception as e: except Exception as e:
raise Exception(f"OpenAI embedding error: {str(e)}") raise Exception(f"OpenAI embedding error: {str(e)}")
class AIFactChecker: class AIFactChecker:
def __init__(self, openai_client: OpenAIClient): def __init__(self, openai_client: OpenAIClient):
"""Initialize the fact checker with OpenAI client.""" """Initialize the fact checker with OpenAI client."""
@ -65,20 +69,36 @@ class AIFactChecker:
chunk_size=1000, chunk_size=1000,
chunk_overlap=200, chunk_overlap=200,
length_function=len, length_function=len,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""] separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
) )
async def scrape_webpage(self, url: str) -> List[Document]: async def scrape_webpage(self, url: str) -> List[Document]:
"""Scrape webpage content using LangChain's AsyncHtmlLoader.""" """Scrape webpage content without saving HTML files."""
try: try:
loader = AsyncHtmlLoader([url]) async with aiohttp.ClientSession() as session:
docs = await loader.aload() async with session.get(url) as response:
if response.status != 200:
raise Exception(
f"Failed to fetch URL: {url}, status: {response.status}"
)
bs_transformer = BeautifulSoupTransformer() html_content = await response.text()
docs_transformed = bs_transformer.transform_documents(docs)
docs_chunks = self.text_splitter.split_documents(docs_transformed)
logger.info(f"Successfully scraped webpage | chunks={len(docs_chunks)}") # Parse HTML with BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
# Create a Document with the parsed content
doc = Document(
page_content=soup.get_text(separator="\n", strip=True),
metadata={"source": url},
)
# Split into chunks
docs_chunks = self.text_splitter.split_documents([doc])
logger.info(
f"Successfully scraped webpage | chunks={len(docs_chunks)}"
)
return docs_chunks return docs_chunks
except Exception as e: except Exception as e:
@ -89,7 +109,7 @@ class AIFactChecker:
self, self,
query_embedding: List[float], query_embedding: List[float],
doc_embeddings: List[List[float]], doc_embeddings: List[List[float]],
docs: List[Document] docs: List[Document],
) -> List[Document]: ) -> List[Document]:
"""Find most relevant document chunks using cosine similarity.""" """Find most relevant document chunks using cosine similarity."""
try: try:
@ -107,7 +127,9 @@ class AIFactChecker:
logger.error(f"Error finding relevant chunks | error={str(e)}") logger.error(f"Error finding relevant chunks | error={str(e)}")
raise raise
async def verify_fact(self, query: str, relevant_docs: List[Document]) -> Dict[str, Any]: async def verify_fact(
self, query: str, relevant_docs: List[Document]
) -> Dict[str, Any]:
"""Verify fact using OpenAI's API with context from relevant documents.""" """Verify fact using OpenAI's API with context from relevant documents."""
try: try:
context = "\n\n".join([doc.page_content for doc in relevant_docs]) context = "\n\n".join([doc.page_content for doc in relevant_docs])
@ -132,12 +154,17 @@ class AIFactChecker:
Analyze the statement based on the provided context and return your response in the specified JSON format.""" Analyze the statement based on the provided context and return your response in the specified JSON format."""
response = await self.openai_client.generate_text_response( response = await self.openai_client.generate_text_response(
system_prompt=system_prompt, system_prompt=system_prompt, user_prompt=user_prompt, max_tokens=800
user_prompt=user_prompt,
max_tokens=800
) )
sources = list(set([doc.metadata.get('source', 'Unknown source') for doc in relevant_docs])) sources = list(
set(
[
doc.metadata.get("source", "Unknown source")
for doc in relevant_docs
]
)
)
return { return {
"verification_result": response["response"], # This is now a dictionary "verification_result": response["response"], # This is now a dictionary
@ -145,8 +172,8 @@ class AIFactChecker:
"token_usage": { "token_usage": {
"prompt_tokens": response["prompt_tokens"], "prompt_tokens": response["prompt_tokens"],
"completion_tokens": response["completion_tokens"], "completion_tokens": response["completion_tokens"],
"total_tokens": response["total_tokens"] "total_tokens": response["total_tokens"],
} },
} }
except Exception as e: except Exception as e:
@ -162,7 +189,9 @@ class AIFactChecker:
doc_embeddings = self.openai_client.get_embeddings(doc_texts) doc_embeddings = self.openai_client.get_embeddings(doc_texts)
query_embedding = self.openai_client.get_embeddings([query]) query_embedding = self.openai_client.get_embeddings([query])
relevant_docs = self.find_relevant_chunks(query_embedding[0], doc_embeddings, docs) relevant_docs = self.find_relevant_chunks(
query_embedding[0], doc_embeddings, docs
)
verification_result = await self.verify_fact(query, relevant_docs) verification_result = await self.verify_fact(query, relevant_docs)
return verification_result return verification_result

View file

@ -1,7 +1,12 @@
from typing import Dict, List from typing import Dict, List
import requests import requests
from fastapi import HTTPException from fastapi import HTTPException
from app.models.ai_fact_check_models import FactCheckSource, ErrorResponse, FactCheckRequest, SourceType from app.models.ai_fact_check_models import (
FactCheckSource,
ErrorResponse,
FactCheckRequest,
SourceType,
)
# Sources configuration with validation # Sources configuration with validation
SOURCES = { SOURCES = {
@ -113,7 +118,7 @@ SOURCES = {
"thejournal.ie/factcheck", "thejournal.ie/factcheck",
"journalistsresource.org", "journalistsresource.org",
"metafact.io", "metafact.io",
"reporterslab.org/fact-checking" "reporterslab.org/fact-checking",
] ]
], ],
"news_sites": [ "news_sites": [
@ -133,16 +138,14 @@ SOURCES = {
"www.risingbd.com/english", "www.risingbd.com/english",
"www.dailyindustry.news", "www.dailyindustry.news",
"www.bangladeshpost.net", "www.bangladeshpost.net",
"www.daily-bangladesh.com/english" "www.daily-bangladesh.com/english",
]
] ]
],
} }
async def fetch_fact_checks( async def fetch_fact_checks(
api_key: str, api_key: str, base_url: str, query: str, site: FactCheckSource
base_url: str,
query: str,
site: FactCheckSource
) -> Dict: ) -> Dict:
""" """
Fetch fact checks from a specific site using the Google Fact Check API Fetch fact checks from a specific site using the Google Fact Check API
@ -156,7 +159,7 @@ async def fetch_fact_checks(
"query": query, "query": query,
"languageCode": "en-US", "languageCode": "en-US",
"reviewPublisherSiteFilter": site.domain, "reviewPublisherSiteFilter": site.domain,
"pageSize": 10 "pageSize": 10,
} }
response = requests.get(base_url, params=params) response = requests.get(base_url, params=params)
@ -168,19 +171,18 @@ async def fetch_fact_checks(
detail=ErrorResponse( detail=ErrorResponse(
detail=f"Error fetching from {site.domain}: {str(e)}", detail=f"Error fetching from {site.domain}: {str(e)}",
error_code="FACT_CHECK_SERVICE_ERROR", error_code="FACT_CHECK_SERVICE_ERROR",
path="/check-facts" path="/check-facts",
).dict() ).dict(),
) )
except ValueError as e: except ValueError as e:
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=ErrorResponse( detail=ErrorResponse(
detail=str(e), detail=str(e), error_code="CONFIGURATION_ERROR", path="/check-facts"
error_code="CONFIGURATION_ERROR", ).dict(),
path="/check-facts"
).dict()
) )
def get_all_sources() -> List[FactCheckSource]: def get_all_sources() -> List[FactCheckSource]:
""" """
Get all sources sorted by priority Get all sources sorted by priority

5
docker-compose.yaml Normal file
View file

@ -0,0 +1,5 @@
services:
backend:
build: .
container_name: backend-service
restart: always

BIN
images-test.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

21
main.py
View file

@ -7,25 +7,14 @@ from app.config import FRONTEND_URL
# Initialize FastAPI app # Initialize FastAPI app
app = FastAPI( app = FastAPI(
title="Your API Title", title="Your API Title", description="Your API Description", version="1.0.0"
description="Your API Description",
version="1.0.0"
) )
# CORS configuration # CORS configuration
origins = [
FRONTEND_URL,
"http://localhost",
"http://localhost:5173",
"http://0.0.0.0",
"http://0.0.0.0:5173",
]
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=origins, allow_origins=["*"], # Only wildcard
allow_credentials=True, allow_credentials=False, # Changed to False to work with wildcard
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
) )
@ -44,10 +33,6 @@ app.include_router(fact_check_router, prefix="")
app.include_router(aifact_check_router, prefix="") app.include_router(aifact_check_router, prefix="")
app.include_router(scrap_websites_router, prefix="") app.include_router(scrap_websites_router, prefix="")
# Include routers (uncomment and modify as needed)
# from routes import some_router
# app.include_router(some_router, prefix="/your-prefix", tags=["your-tag"])
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

View file

@ -1,6 +1,88 @@
certifi==2024.8.30 aiofiles==24.1.0
aiohappyeyeballs==2.4.4
aiohttp==3.11.10
aiosignal==1.3.2
annotated-types==0.7.0
anyio==4.7.0
attrs==24.3.0
beautifulsoup4==4.12.3
black==24.10.0
certifi==2024.12.14
charset-normalizer==3.4.0 charset-normalizer==3.4.0
click==8.1.7
dataclasses-json==0.6.7
dnspython==2.7.0
email_validator==2.2.0
fastapi==0.115.6
fastapi-cli==0.0.7
flake8==7.1.1
frozenlist==1.5.0
greenlet==3.1.1
gunicorn==23.0.0
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10 idna==3.10
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.4
jsonpatch==1.33
jsonpointer==3.0.0
langchain==0.3.12
langchain-community==0.3.12
langchain-core==0.3.25
langchain-text-splitters==0.3.3
langsmith==0.2.3
markdown-it-py==3.0.0
MarkupSafe==3.0.2
marshmallow==3.23.1
mccabe==0.7.0
mdurl==0.1.2
multidict==6.1.0
mypy-extensions==1.0.0
numpy==1.26.4
openai==0.28.0
orjson==3.10.12
packaging==24.2
pathspec==0.12.1
pillow==11.0.0
platformdirs==4.3.6
pluggy==1.5.0
propcache==0.2.1
pycodestyle==2.12.1
pydantic==2.10.3
pydantic-extra-types==2.10.1
pydantic-settings==2.7.0
pydantic_core==2.27.1
pyflakes==3.2.0
Pygments==2.18.0
pytest==8.3.4
python-dateutil==2.9.0.post0
python-dotenv==1.0.1 python-dotenv==1.0.1
python-json-logger==3.2.1
python-multipart==0.0.20
PyYAML==6.0.2
requests==2.32.3 requests==2.32.3
requests-toolbelt==1.0.0
rich==13.9.4
rich-toolkit==0.12.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
soupsieve==2.6
SQLAlchemy==2.0.36
starlette==0.41.3
tenacity==9.0.0
tqdm==4.67.1
typer==0.15.1
typing-inspect==0.9.0
typing_extensions==4.12.2
ujson==5.10.0
urllib3==2.2.3 urllib3==2.2.3
uvicorn==0.34.0
uvloop==0.21.0
watchfiles==1.0.3
websockets==14.1
yarl==1.18.3

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

18
tests/test_main.py Normal file
View file

@ -0,0 +1,18 @@
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_root_endpoint():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Welcome to your FastAPI application"}
def test_health_endpoint():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "healthy"}
def test_cors_headers():
response = client.get("/", headers={"Origin": "http://localhost:5173"})
assert response.headers["access-control-allow-origin"] == "*"