Merge branch 'dev-local' into 'dev'

Dev local

See merge request planpostai/fact-checker-backend!3
This commit is contained in:
Utsho Dey 2024-12-19 11:41:59 +00:00
commit afe5c1d576
7 changed files with 377 additions and 73 deletions

View file

@ -1,9 +1,13 @@
from fastapi import APIRouter, HTTPException
import httpx
from typing import Union
import asyncio
import logging
from typing import Union, Optional, Dict, Any
from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY
from app.api.scrap_websites import search_websites, SearchRequest
from app.services.openai_client import OpenAIClient
from app.services.openai_client import OpenAIClient, AIFactChecker
from app.services.image_text_extractor import ImageTextExtractor
from app.models.ai_fact_check_models import AIFactCheckResponse
from app.models.fact_check_models import (
FactCheckRequest,
FactCheckResponse,
@ -15,11 +19,91 @@ from app.models.fact_check_models import (
)
from app.websites.fact_checker_website import get_all_sources
# Setup logging
logger = logging.getLogger(__name__)
fact_check_router = APIRouter()
openai_client = OpenAIClient(OPENAI_API_KEY)
ai_fact_checker = AIFactChecker(openai_client)
image_text_extractor = ImageTextExtractor(OPENAI_API_KEY)
async def generate_fact_report(query: str, fact_check_data: dict) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
async def process_url_content(url: str) -> Optional[str]:
"""Extract text content from the provided URL."""
try:
# Add await here
text = await image_text_extractor.extract_text(url, is_url=True)
if text:
logger.info(f"Successfully extracted text from URL: {text}")
else:
logger.warning(f"No text could be extracted from URL: {url}")
return text
except Exception as e:
logger.error(f"Error extracting text from URL: {str(e)}")
return None
async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Process a single fact check query."""
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL:
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="The fact-checking service is not properly configured.",
explanation="The system is missing required API configuration for fact-checking services.",
additional_context="This is a temporary system configuration issue."
)
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
fact_checker_sources = get_all_sources()
for source in fact_checker_sources:
params = {
"key": GOOGLE_API_KEY,
"query": query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10,
}
try:
response = await client.get(
GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers
)
response.raise_for_status()
json_response = response.json()
if json_response.get("claims"):
return await generate_fact_report(query, json_response)
except Exception as e:
logger.error(f"Error with source {source.domain}: {str(e)}")
continue
try:
search_request = SearchRequest(
search_text=query,
source_types=["fact_checkers"]
)
ai_response = await search_websites(search_request)
return await generate_fact_report(query, ai_response)
except Exception as e:
logger.error(f"Error in AI fact check: {str(e)}")
return await generate_fact_report(query, {
"status": "no_results",
"verification_result": {
"no_sources_found": True,
"reason": str(e)
}
})
async def generate_fact_report(query: str, fact_check_data: dict | AIFactCheckResponse) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Generate a fact check report using OpenAI based on the fact check results."""
try:
base_system_prompt = """You are a professional fact-checking reporter. Your task is to create a detailed fact check report based on the provided data. Focus on accuracy, clarity, and proper citation of sources.
@ -31,12 +115,23 @@ Rules:
4. Maintain objectivity in the report
5. If no reliable sources are found, provide a clear explanation why"""
# Handle both dictionary and AIFactCheckResponse
if hasattr(fact_check_data, 'verification_result'):
# It's an AIFactCheckResponse
has_sources = bool(fact_check_data.sources)
verification_result = fact_check_data.verification_result
fact_check_data_dict = fact_check_data.dict()
else:
# It's a dictionary
has_sources = bool(fact_check_data.get("claims") or fact_check_data.get("urls_found"))
verification_result = fact_check_data.get("verification_result", {})
fact_check_data_dict = fact_check_data
# If no sources were found, return an unverified response
if not fact_check_data.get("claims") and (
not fact_check_data.get("urls_found") or
fact_check_data.get("status") == "no_results" or
fact_check_data.get("verification_result", {}).get("no_sources_found")
):
if not has_sources or (
isinstance(fact_check_data, dict) and
fact_check_data.get("status") == "no_results"
) or (verification_result and verification_result.get("no_sources_found")):
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
@ -63,10 +158,10 @@ Rules:
"additional_context": "Important context about the verification process, limitations, or broader implications (1-2 sentences)"
}"""
if "claims" in fact_check_data:
if isinstance(fact_check_data, dict) and "claims" in fact_check_data:
system_prompt = base_system_prompt
user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data}
Fact Check Results: {fact_check_data_dict}
{base_user_prompt}
@ -75,11 +170,10 @@ Rules:
2. Specify verification dates when available
3. Name the fact-checking organizations involved
4. Describe the verification process"""
else:
system_prompt = base_system_prompt
user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data}
Fact Check Results: {fact_check_data_dict}
{base_user_prompt}
@ -116,7 +210,7 @@ Rules:
return FactCheckResponse(**response_data)
except Exception as validation_error:
print(f"Response validation error: {str(validation_error)}")
logger.error(f"Response validation error: {str(validation_error)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
@ -128,7 +222,7 @@ Rules:
)
except Exception as e:
print(f"Error generating fact report: {str(e)}")
logger.error(f"Error generating fact report: {str(e)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
@ -138,69 +232,138 @@ Rules:
explanation="The system encountered an unexpected error while processing the fact check request.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
async def combine_fact_reports(query: str, url_text: str, query_result: Dict[str, Any], url_result: Dict[str, Any]) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Combine fact check results from query and URL into a single comprehensive report."""
try:
system_prompt = """You are a professional fact-checking reporter. Your task is to create a comprehensive fact check report by combining and analyzing multiple fact-checking results. Focus on accuracy, clarity, and proper citation of all sources.
Rules:
1. Include all source URLs and names from both result sets
2. Compare and contrast findings from different sources
3. Include dates when available
4. Note any discrepancies between sources
5. Provide a balanced, objective analysis"""
user_prompt = f"""Original Query: {query}
Extracted Text from URL: {url_text}
First Fact Check Result: {query_result}
Second Fact Check Result: {url_result}
Generate a comprehensive fact check report in this exact JSON format:
{{
"claim": "Write the exact claim being verified",
"verdict": "One of: True/False/Partially True/Unverified",
"confidence": "One of: High/Medium/Low",
"sources": [
{{
"url": "Full URL of the source",
"name": "Name of the source organization"
}}
],
"evidence": "A concise summary of the key evidence from both sources (2-3 sentences)",
"explanation": "A detailed explanation combining findings from both fact checks (3-4 sentences)",
"additional_context": "Important context about differences or similarities in findings (1-2 sentences)"
}}
The report should:
1. Combine sources from both fact checks
2. Compare findings from both analyses
3. Note any differences in conclusions
4. Provide a unified verdict based on all available information"""
response = await openai_client.generate_text_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
max_tokens=1000
)
response_data = response["response"]
# Clean up sources from both results
if isinstance(response_data.get("sources"), list):
cleaned_sources = []
for source in response_data["sources"]:
if isinstance(source, str):
url = source if source.startswith("http") else f"https://{source}"
cleaned_sources.append({"url": url, "name": source})
elif isinstance(source, dict):
url = source.get("url", "")
if url and not url.startswith("http"):
source["url"] = f"https://{url}"
cleaned_sources.append(source)
response_data["sources"] = cleaned_sources
if response_data["verdict"] == "Unverified" or not response_data.get("sources"):
return UnverifiedFactCheckResponse(**response_data)
return FactCheckResponse(**response_data)
except Exception as e:
logger.error(f"Error combining fact reports: {str(e)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="An error occurred while combining fact check reports.",
explanation="The system encountered an error while trying to combine results from multiple sources.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
@fact_check_router.post("/check-facts", response_model=Union[FactCheckResponse, UnverifiedFactCheckResponse])
async def check_facts(request: FactCheckRequest):
"""
Fetch fact check results and generate a comprehensive report.
Handles both query-based and URL-based fact checking.
"""
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL:
return UnverifiedFactCheckResponse(
claim=request.query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="The fact-checking service is not properly configured.",
explanation="The system is missing required API configuration for fact-checking services.",
additional_context="This is a temporary system configuration issue."
)
url_text = None
query_result = None
url_result = None
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
fact_checker_sources = get_all_sources()
for source in fact_checker_sources:
params = {
"key": GOOGLE_API_KEY,
"query": request.query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10,
}
try:
response = await client.get(
GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers
)
response.raise_for_status()
json_response = response.json()
if json_response.get("claims"):
return await generate_fact_report(request.query, json_response)
except httpx.RequestError as e:
print(f"Error fetching results for site {source.domain}: {str(e)}")
continue
except Exception as e:
print(f"Unexpected error for site {source.domain}: {str(e)}")
continue
try:
search_request = SearchRequest(
search_text=request.query,
source_types=["fact_checkers"]
# If URL is provided, try to extract text
if request.url:
url_text = await process_url_content(request.url)
if not url_text and not request.query:
# Only return early if URL text extraction failed and no query provided
return UnverifiedFactCheckResponse(
claim=f"URL check requested: {request.url}",
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="Unable to extract text from the provided URL.",
explanation="The system could not process the content from the provided URL. The URL might be invalid or inaccessible.",
additional_context="Please provide a valid URL or a text query for fact-checking."
)
# If URL text was successfully extracted, process it
if url_text:
logger.info(f"Processing fact check for extracted text: {url_text}")
url_result = await process_fact_check(url_text)
ai_response = await search_websites(search_request)
return await generate_fact_report(request.query, ai_response)
# Process query if provided
if request.query:
query_result = await process_fact_check(request.query)
except Exception as e:
print(f"Error in AI fact check: {str(e)}")
return await generate_fact_report(request.query, {
"status": "no_results",
"verification_result": {
"no_sources_found": True,
"reason": str(e)
}
})
# If both results are available, combine them
if query_result and url_result and url_text:
return await combine_fact_reports(request.query, url_text,
query_result.dict(), url_result.dict())
# If only one result is available
if query_result:
return query_result
if url_result:
return url_result
# If no valid results
return UnverifiedFactCheckResponse(
claim=request.query or f"URL: {request.url}",
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="Failed to process fact-checking request.",
explanation="The system encountered errors while processing the fact checks.",
additional_context="Please try again with different input or contact support if the issue persists."
)

View file

@ -1,5 +1,5 @@
from pydantic import BaseModel, Field, HttpUrl, validator
from typing import List, Literal, Union
from pydantic import BaseModel, Field, HttpUrl, validator, root_validator
from typing import List, Literal, Union, Optional
from datetime import datetime
from enum import Enum
@ -18,13 +18,34 @@ class ConfidenceEnum(str, Enum):
class FactCheckRequest(BaseModel):
query: str = Field(
...,
query: Optional[str] = Field(
None,
min_length=3,
max_length=500,
description="The claim or statement to be fact-checked",
example="Did NASA confirm finding alien structures on Mars in 2024?",
)
url: Optional[str] = Field(
None,
description="URL to be fact-checked",
example="https://example.com/article",
)
@root_validator(pre=True)
def validate_at_least_one(cls, values):
"""Validate that at least one of query or url is provided."""
query = values.get('query')
url = values.get('url')
if not query and not url:
raise ValueError("At least one of 'query' or 'url' must be provided")
return values
@validator('url')
def validate_url(cls, v):
"""Validate URL format if provided."""
if v is not None and len(v) < 3:
raise ValueError("URL must be at least 3 characters")
return v
class Source(BaseModel):

View file

@ -0,0 +1,119 @@
import base64
import requests
import os
from io import BytesIO
from typing import Tuple, Optional
import logging
import aiohttp
logger = logging.getLogger(__name__)
class ImageTextExtractor:
def __init__(self, api_key: str):
"""Initialize ImageTextExtractor with OpenAI API key."""
self.api_key = api_key
self.api_url = "https://api.openai.com/v1/chat/completions"
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
def encode_image(self, image_path: str) -> str:
"""Encode a local image into base64."""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except Exception as e:
logger.error(f"Error encoding image: {str(e)}")
raise Exception(f"Error encoding image: {e}")
async def fetch_image_from_url(self, image_url: str) -> Tuple[str, str]:
"""Fetch an image from a URL and encode it as base64."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
if response.status != 200:
raise Exception(f"Failed to fetch image: Status {response.status}")
content_type = response.headers.get('Content-Type', '')
if "text/html" in content_type:
raise ValueError("The URL points to a webpage, not an image")
if "image" not in content_type:
raise ValueError("The URL does not point to a valid image")
image_data = await response.read()
image_format = "jpeg" if "jpeg" in content_type or "jpg" in content_type else "png"
base64_image = base64.b64encode(image_data).decode('utf-8')
return base64_image, image_format
except aiohttp.ClientError as e:
logger.error(f"Error fetching image from URL: {str(e)}")
raise Exception(f"Error fetching image from URL: {e}")
except ValueError as e:
raise
except Exception as e:
logger.error(f"Unexpected error processing image URL: {str(e)}")
raise Exception(f"Unexpected error processing image: {e}")
async def extract_text(self, image_input: str, is_url: bool = False) -> Optional[str]:
"""Extract text from an image, either from a local path or URL."""
try:
if is_url:
try:
base64_image, image_format = await self.fetch_image_from_url(image_input)
except ValueError as e:
if "webpage" in str(e):
return None
raise
else:
if not os.path.exists(image_input):
raise FileNotFoundError(f"Image file not found: {image_input}")
base64_image = self.encode_image(image_input)
image_format = "jpeg" if image_input.endswith(".jpg") else "png"
payload = {
"model": "gpt-4-turbo-2024-04-09", # Updated model name
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Extract and return only the key text from this image in the original language. Do not provide translations or explanations."
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/{image_format};base64,{base64_image}"
}
}
]
}
],
"max_tokens": 300
}
async with aiohttp.ClientSession() as session:
async with session.post(self.api_url, headers=self.headers, json=payload) as response:
if response.status != 200:
error_content = await response.text()
logger.error(f"API request failed: Status {response.status}, Response: {error_content}")
raise Exception(f"API request failed with status {response.status}")
result = await response.json()
logger.debug(f"GPT-4 API Response: {result}")
if 'choices' in result and len(result['choices']) > 0:
extracted_text = result['choices'][0]['message']['content'].strip()
if extracted_text:
return extracted_text
return None
except (aiohttp.ClientError, ValueError, FileNotFoundError) as e:
logger.error(f"Error in text extraction: {str(e)}")
return None
except Exception as e:
logger.error(f"Unexpected error in text extraction: {str(e)}")
return None
return None

View file

@ -47,6 +47,7 @@ openai==0.28.0
orjson==3.10.12
packaging==24.2
pathspec==0.12.1
pillow==11.0.0
platformdirs==4.3.6
pluggy==1.5.0
propcache==0.2.1