Added Perplexity for fact checking

This commit is contained in:
smfahim25 2025-04-09 17:02:56 +06:00
parent afe5c1d576
commit f7b2d77ce4
9 changed files with 225 additions and 44 deletions

View file

@ -1,9 +1,12 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
import httpx
import asyncio import asyncio
import logging import logging
import httpx
import json
import re
from typing import Union, Optional, Dict, Any from typing import Union, Optional, Dict, Any
from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY from datetime import datetime
from app.config import OPENAI_API_KEY,PERPLEXITY_API_KEY
from app.api.scrap_websites import search_websites, SearchRequest from app.api.scrap_websites import search_websites, SearchRequest
from app.services.openai_client import OpenAIClient, AIFactChecker from app.services.openai_client import OpenAIClient, AIFactChecker
from app.services.image_text_extractor import ImageTextExtractor from app.services.image_text_extractor import ImageTextExtractor
@ -12,12 +15,10 @@ from app.models.fact_check_models import (
FactCheckRequest, FactCheckRequest,
FactCheckResponse, FactCheckResponse,
UnverifiedFactCheckResponse, UnverifiedFactCheckResponse,
ErrorResponse,
Source, Source,
VerdictEnum, VerdictEnum,
ConfidenceEnum ConfidenceEnum
) )
from app.websites.fact_checker_website import get_all_sources
# Setup logging # Setup logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,10 +43,11 @@ async def process_url_content(url: str) -> Optional[str]:
logger.error(f"Error extracting text from URL: {str(e)}") logger.error(f"Error extracting text from URL: {str(e)}")
return None return None
# Assuming the enums and models like FactCheckResponse, VerdictEnum, etc., are already imported
async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]: async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Process a single fact check query.""" if not PERPLEXITY_API_KEY:
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL: logger.error("Perplexity API key not configured")
return UnverifiedFactCheckResponse( return UnverifiedFactCheckResponse(
claim=query, claim=query,
verdict=VerdictEnum.UNVERIFIED, verdict=VerdictEnum.UNVERIFIED,
@ -56,51 +58,229 @@ async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedF
additional_context="This is a temporary system configuration issue." additional_context="This is a temporary system configuration issue."
) )
headers = {"Content-Type": "application/json"} url = "https://api.perplexity.ai/chat/completions"
async with httpx.AsyncClient() as client: headers = {
fact_checker_sources = get_all_sources() "accept": "application/json",
"content-type": "application/json",
"Authorization": f"Bearer {PERPLEXITY_API_KEY}"
}
for source in fact_checker_sources: payload = {
params = { "model": "sonar",
"key": GOOGLE_API_KEY, "messages": [
"query": query, {
"languageCode": "en-US", "role": "system",
"reviewPublisherSiteFilter": source.domain, "content": (
"pageSize": 10, "You are a precise fact checker. Analyze the following claim and determine if it's true, false, or partially true. "
"Provide a clear verdict, confidence level (HIGH, MEDIUM, LOW), and cite reliable sources. "
"Format your response as JSON with fields: verdict, confidence, sources (array of URLs), "
"evidence (key facts as a string), and explanation (detailed reasoning as a string)."
)
},
{
"role": "user",
"content": f"Fact check this claim: {query}"
}
]
} }
try: try:
response = await client.get( async with httpx.AsyncClient(timeout=30) as client:
GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers response = await client.post(url, headers=headers, json=payload)
)
response.raise_for_status() response.raise_for_status()
json_response = response.json() result = response.json()
perplexity_response = result["choices"][0]["message"]["content"]
if json_response.get("claims"):
return await generate_fact_report(query, json_response)
except Exception as e:
logger.error(f"Error with source {source.domain}: {str(e)}")
continue
# Attempt to extract JSON
try: try:
search_request = SearchRequest( parsed_data = json.loads(perplexity_response)
search_text=query, except json.JSONDecodeError:
source_types=["fact_checkers"] match = re.search(r'\{.*\}', perplexity_response, re.DOTALL)
if match:
parsed_data = json.loads(match.group(0))
else:
parsed_data = extract_fact_check_info(perplexity_response)
verdict_mapping = {
"true": VerdictEnum.TRUE,
"false": VerdictEnum.FALSE,
"partially true": VerdictEnum.PARTIALLY_TRUE,
"partially false": VerdictEnum.PARTIALLY_TRUE,
"unverified": VerdictEnum.UNVERIFIED
}
confidence_mapping = {
"high": ConfidenceEnum.HIGH,
"medium": ConfidenceEnum.MEDIUM,
"low": ConfidenceEnum.LOW
}
raw_verdict = parsed_data.get("verdict", "").lower()
verdict = verdict_mapping.get(raw_verdict, VerdictEnum.UNVERIFIED)
raw_confidence = parsed_data.get("confidence", "").lower()
confidence = confidence_mapping.get(raw_confidence, ConfidenceEnum.MEDIUM)
sources = [
Source(
url=url,
domain=extract_domain(url),
title=f"Source from {extract_domain(url)}",
publisher=extract_domain(url),
date_published=None,
snippet="Source cited by Perplexity AI"
)
for url in parsed_data.get("sources", [])
]
# Convert evidence to string if it's not already
evidence = parsed_data.get("evidence", "")
if isinstance(evidence, dict):
# Convert dictionary evidence to string format
evidence_str = ""
for key, value in evidence.items():
evidence_str += f"{key}: {value}\n"
evidence = evidence_str.strip()
# Convert explanation to string if it's not already
explanation = parsed_data.get("explanation", "")
if isinstance(explanation, dict):
explanation_str = ""
for key, value in explanation.items():
explanation_str += f"{key}: {value}\n"
explanation = explanation_str.strip()
return FactCheckResponse(
claim=query,
verdict=verdict,
confidence=confidence,
sources=sources,
evidence=evidence,
explanation=explanation,
additional_context=f"Fact checked using PlanPost AI on {datetime.now().strftime('%Y-%m-%d')}"
) )
ai_response = await search_websites(search_request)
return await generate_fact_report(query, ai_response)
except Exception as e: except Exception as e:
logger.error(f"Error in AI fact check: {str(e)}") logger.error(f"Fact check error: {str(e)}")
return await generate_fact_report(query, { return UnverifiedFactCheckResponse(
"status": "no_results", claim=query,
"verification_result": { verdict=VerdictEnum.UNVERIFIED,
"no_sources_found": True, confidence=ConfidenceEnum.LOW,
"reason": str(e) sources=[],
evidence=str(e),
explanation="Failed to contact Perplexity AI or parse its response.",
additional_context="Possible API issue or malformed response."
)
def extract_domain(url: str) -> str:
    """Return the network-location (domain) part of *url*.

    Args:
        url: The URL to extract the domain from.

    Returns:
        The domain name, or "unknown" when the URL has no netloc or
        cannot be parsed at all.
    """
    from urllib.parse import urlparse

    try:
        netloc = urlparse(url).netloc
    except Exception as exc:
        logger.warning(f"Failed to extract domain from URL {url}: {str(exc)}")
        return "unknown"
    # An empty netloc (e.g. a scheme-less string) is treated as unparseable.
    return netloc if netloc else "unknown"
def extract_fact_check_info(text_response: str) -> Dict[str, Any]:
    """Extract fact-checking information from a text response when JSON parsing fails.

    Fallback parser: scans the model's free-form text for a verdict,
    confidence level, source URLs, evidence, and explanation.

    Args:
        text_response: The text response from Perplexity AI

    Returns:
        A dictionary with keys "verdict", "confidence", "sources",
        "evidence", and "explanation" (defaults are used for anything
        that cannot be found in the text).
    """
    import re  # local import keeps this fallback helper self-contained

    # Defaults returned when nothing recognizable is found in the text.
    result = {
        "verdict": "unverified",
        "confidence": "medium",
        "sources": [],
        "evidence": "",
        "explanation": ""
    }

    # Try to extract verdict with more comprehensive pattern matching;
    # the first pattern that matches wins.
    verdict_patterns = [
        r'verdict[:\s]+(true|false|partially true|partially false|inconclusive|unverified)',
        r'(true|false|partially true|partially false|inconclusive|unverified)[:\s]+verdict',
        r'claim is (true|false|partially true|partially false|inconclusive|unverified)',
        r'statement is (true|false|partially true|partially false|inconclusive|unverified)'
    ]
    for pattern in verdict_patterns:
        verdict_match = re.search(pattern, text_response.lower(), re.IGNORECASE)
        if verdict_match:
            result["verdict"] = verdict_match.group(1)
            break

    # Try to extract confidence with multiple patterns.
    confidence_patterns = [
        r'confidence[:\s]+(high|medium|low)',
        r'(high|medium|low)[:\s]+confidence',
        r'confidence level[:\s]+(high|medium|low)',
        r'(high|medium|low)[:\s]+confidence level'
    ]
    for pattern in confidence_patterns:
        confidence_match = re.search(pattern, text_response.lower(), re.IGNORECASE)
        if confidence_match:
            result["confidence"] = confidence_match.group(1)
            break

    # Try to extract URLs as sources - more robust pattern.
    urls = re.findall(r'https?://[^\s"\'\]\)]+', text_response)
    # Filter out any malformed URLs with a basic sanity check.
    valid_urls = []
    for url in urls:
        if '.' in url and len(url) > 10:  # Basic validation
            valid_urls.append(url)
    result["sources"] = valid_urls

    # Try to extract evidence and explanation with multiple patterns.
    evidence_patterns = [
        r'evidence[:\s]+(.*?)(?=explanation|\Z)',
        r'key facts[:\s]+(.*?)(?=explanation|\Z)',
        r'facts[:\s]+(.*?)(?=explanation|\Z)'
    ]
    for pattern in evidence_patterns:
        evidence_match = re.search(pattern, text_response, re.IGNORECASE | re.DOTALL)
        if evidence_match:
            result["evidence"] = evidence_match.group(1).strip()
            break

    explanation_patterns = [
        r'explanation[:\s]+(.*?)(?=\Z)',
        r'reasoning[:\s]+(.*?)(?=\Z)',
        r'analysis[:\s]+(.*?)(?=\Z)'
    ]
    for pattern in explanation_patterns:
        explanation_match = re.search(pattern, text_response, re.IGNORECASE | re.DOTALL)
        if explanation_match:
            result["explanation"] = explanation_match.group(1).strip()
            break

    # If no structured information found, use the whole response as evidence
    # and supply a canned explanation so callers always get both fields.
    if not result["evidence"] and not result["explanation"]:
        result["evidence"] = text_response
        result["explanation"] = "The fact-checking service provided information about this claim but did not structure it in the expected format. The full response has been included as evidence for you to review."

    return result
async def generate_fact_report(query: str, fact_check_data: dict | AIFactCheckResponse) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]: async def generate_fact_report(query: str, fact_check_data: dict | AIFactCheckResponse) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:

View file

@ -7,6 +7,7 @@ GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
GOOGLE_FACT_CHECK_BASE_URL = os.environ["GOOGLE_FACT_CHECK_BASE_URL"] GOOGLE_FACT_CHECK_BASE_URL = os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"] GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"]
GOOGLE_SEARCH_URL = os.environ["GOOGLE_SEARCH_URL"] GOOGLE_SEARCH_URL = os.environ["GOOGLE_SEARCH_URL"]
PERPLEXITY_API_KEY= os.environ["PERPLEXITY_API_KEY"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"] OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
FRONTEND_URL = os.environ["FRONTEND_URL"] FRONTEND_URL = os.environ["FRONTEND_URL"]

View file

@ -43,7 +43,7 @@ mdurl==0.1.2
multidict==6.1.0 multidict==6.1.0
mypy-extensions==1.0.0 mypy-extensions==1.0.0
numpy==1.26.4 numpy==1.26.4
openai==0.28.0 openai==1.23.6
orjson==3.10.12 orjson==3.10.12
packaging==24.2 packaging==24.2
pathspec==0.12.1 pathspec==0.12.1