Compare commits


14 commits

Author  SHA1  Message  Date
Utsho Dey  c8735de51e  fact check from image is functional  2024-12-19 16:49:17 +06:00
Utsho Dey  7c4dd378cd  fact check from image is functional  2024-12-19 16:47:18 +06:00
Utsho Dey  9298352f2e  fact check from image is functional  2024-12-19 16:37:57 +06:00
Utsho Dey  a1a699f9b3  dockerfile added  2024-12-18 17:39:00 +06:00
Utsho Dey  56335cbfa7  fixed pipeline error  2024-12-18 13:16:48 +06:00
Utsho Dey  15a0061a0d  fixed response  2024-12-18 13:10:03 +06:00
Utsho Dey  9be0343695  added curl command  2024-12-17 18:51:05 +06:00
Utsho Dey  f32745326b  added .env  2024-12-17 18:46:38 +06:00
Utsho Dey  b79c746e15  added .env  2024-12-17 18:43:42 +06:00
Utsho Dey  019e07e1b9  added cicd modified  2024-12-17 18:34:04 +06:00
Utsho Dey  9c15f7a59c  added cicd  2024-12-17 18:27:37 +06:00
Utsho Dey  954c01432b  added cicd  2024-12-17 18:24:39 +06:00
Utsho Dey  49c9c9c92d  added cicd  2024-12-17 18:23:13 +06:00
Utsho Dey  d59f5c884e  content fact checked is functional  2024-12-17 18:05:50 +06:00
26 changed files with 1121 additions and 596 deletions

.flake8 (new file, 4 lines)

@@ -0,0 +1,4 @@
[flake8]
max-line-length = 100
exclude = .git,__pycache__,dist,*.egg-info,venv
extend-ignore = E203

.gitignore (vendored, 42 lines changed)

@@ -1,4 +1,42 @@
env
# Environment
env/
.env
venv/
ENV/
# Python
__pycache__/
*.py[cod]
*$py.class
.Python
*.so
.pytest_cache/
.coverage
.coverage.*
coverage.xml
*.cover
htmlcov/
# IDEs and editors
.idea/
.vscode/
*.swp
*.swo
*~
# Project specific
test.py
__pycache__
*.log
.pip-cache/
# Temporary files
*.tmp
.DS_Store
# Distribution / packaging
dist/
build/
*.egg-info/
# Docker
.docker/

.gitlab-ci.yml (new file, 52 lines)

@@ -0,0 +1,52 @@
image: python:3.10-slim
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.pip-cache"
PYTHONPATH: "$CI_PROJECT_DIR"
cache:
paths:
- .pip-cache
- venv/
stages:
- setup
- test
before_script:
- apt-get update
- apt-get install -y curl
- python --version
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
setup:
stage: setup
script:
- pip install --no-cache-dir -r requirements.txt
artifacts:
paths:
- venv/
expire_in: 1 hour
test:
stage: test
needs:
- setup
script:
# Run all tests
- pytest tests/ -v
# Start FastAPI server
- uvicorn main:app --host 0.0.0.0 --port 8000 &
# Wait for server to start
- sleep 15
# Test health endpoint
- |
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/health)
if [ "$RESPONSE" = "200" ]; then
echo "✅ Health check passed"
else
echo "❌ Health check failed with status $RESPONSE"
exit 1
fi
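For local debugging, the same probe can be scripted in Python; a minimal sketch, assuming the server is already listening on localhost:8000 (httpx is pinned in requirements.txt):

import httpx

# Mirror the CI curl step: GET /health and require a 200 response.
response = httpx.get("http://localhost:8000/health", timeout=10)
if response.status_code == 200:
    print("Health check passed:", response.json())
else:
    raise SystemExit(f"Health check failed with status {response.status_code}")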

Dockerfile (new file, 8 lines)

@@ -0,0 +1,8 @@
FROM python:3.12
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
ENTRYPOINT ["gunicorn", "main:app", "--workers", "4", "--timeout", "90", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

app/api/ai_fact_check.py

@@ -6,7 +6,7 @@ from app.models.ai_fact_check_models import (
AIFactCheckResponse,
VerificationResult,
TokenUsage,
ErrorResponse
ErrorResponse,
)
from urllib.parse import urlparse
import asyncio
@@ -16,13 +16,11 @@ aifact_check_router = APIRouter()
openai_client = OpenAIClient(api_key=OPENAI_API_KEY)
fact_checker = AIFactChecker(openai_client=openai_client)
@aifact_check_router.post(
"/aicheck-facts",
response_model=AIFactCheckResponse,
responses={
400: {"model": ErrorResponse},
500: {"model": ErrorResponse}
}
responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
)
async def ai_fact_check(request: AIFactCheckRequest):
"""
@@ -57,7 +55,7 @@ async def ai_fact_check(request: AIFactCheckRequest):
confidence="Low",
evidence=f"Error checking URL: {str(result)}",
reasoning="URL processing failed",
missing_info="Could not access or process the URL"
missing_info="Could not access or process the URL",
)
continue
@@ -66,7 +64,7 @@ async def ai_fact_check(request: AIFactCheckRequest):
confidence=result["verification_result"]["confidence"],
evidence=result["verification_result"]["evidence"],
reasoning=result["verification_result"]["reasoning"],
missing_info=result["verification_result"].get("missing_info", None)
missing_info=result["verification_result"].get("missing_info", None),
)
results[url] = verification_result
@@ -80,24 +78,22 @@ async def ai_fact_check(request: AIFactCheckRequest):
token_usage = TokenUsage(
prompt_tokens=total_prompt_tokens,
completion_tokens=total_completion_tokens,
total_tokens=total_tokens
total_tokens=total_tokens,
)
return AIFactCheckResponse(
query=request.content,
verification_result=results,
sources=list(all_sources),
token_usage=token_usage
token_usage=token_usage,
)
except ValueError as e:
raise HTTPException(
status_code=400,
detail=ErrorResponse(
detail=str(e),
error_code="INVALID_URL",
path="/aicheck-facts"
).dict()
detail=str(e), error_code="INVALID_URL", path="/aicheck-facts"
).dict(),
)
except Exception as e:
raise HTTPException(
@@ -105,6 +101,6 @@ async def ai_fact_check(request: AIFactCheckRequest):
detail=ErrorResponse(
detail=f"Error processing fact-check request: {str(e)}",
error_code="PROCESSING_ERROR",
path="/aicheck-facts"
).dict()
path="/aicheck-facts",
).dict(),
)
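For reference, a client call to this endpoint could look like the sketch below; the localhost address is an assumption, and the payload values are taken from the AIFactCheckRequest schema example:

import httpx

payload = {
    "content": "Indian flag was drawn in BUET campus",
    "urls": ["www.altnews.in/article-about-flag"],  # the model's validator prefixes https://
}
# POST to the route registered above; scraping and verification can take a while.
response = httpx.post("http://localhost:8000/aicheck-facts", json=payload, timeout=120)
print(response.json())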

app/api/fact_check.py

@@ -1,20 +1,109 @@
from fastapi import APIRouter, HTTPException
import httpx
import asyncio
import logging
from typing import Union, Optional, Dict, Any
from app.config import GOOGLE_API_KEY, GOOGLE_FACT_CHECK_BASE_URL, OPENAI_API_KEY
from app.api.scrap_websites import search_websites, SearchRequest
from app.services.openai_client import OpenAIClient
from app.services.openai_client import OpenAIClient, AIFactChecker
from app.services.image_text_extractor import ImageTextExtractor
from app.models.ai_fact_check_models import AIFactCheckResponse
from app.models.fact_check_models import (
FactCheckRequest,
FactCheckResponse,
UnverifiedFactCheckResponse,
ErrorResponse,
Source
Source,
VerdictEnum,
ConfidenceEnum
)
from app.websites.fact_checker_website import get_all_sources
# Setup logging
logger = logging.getLogger(__name__)
fact_check_router = APIRouter()
openai_client = OpenAIClient(OPENAI_API_KEY)
ai_fact_checker = AIFactChecker(openai_client)
image_text_extractor = ImageTextExtractor(OPENAI_API_KEY)
async def generate_fact_report(query: str, fact_check_data: dict) -> FactCheckResponse:
async def process_url_content(url: str) -> Optional[str]:
"""Extract text content from the provided URL."""
try:
# Add await here
text = await image_text_extractor.extract_text(url, is_url=True)
if text:
logger.info(f"Successfully extracted text from URL: {text}")
else:
logger.warning(f"No text could be extracted from URL: {url}")
return text
except Exception as e:
logger.error(f"Error extracting text from URL: {str(e)}")
return None
async def process_fact_check(query: str) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Process a single fact check query."""
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL:
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="The fact-checking service is not properly configured.",
explanation="The system is missing required API configuration for fact-checking services.",
additional_context="This is a temporary system configuration issue."
)
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
fact_checker_sources = get_all_sources()
for source in fact_checker_sources:
params = {
"key": GOOGLE_API_KEY,
"query": query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10,
}
try:
response = await client.get(
GOOGLE_FACT_CHECK_BASE_URL, params=params, headers=headers
)
response.raise_for_status()
json_response = response.json()
if json_response.get("claims"):
return await generate_fact_report(query, json_response)
except Exception as e:
logger.error(f"Error with source {source.domain}: {str(e)}")
continue
try:
search_request = SearchRequest(
search_text=query,
source_types=["fact_checkers"]
)
ai_response = await search_websites(search_request)
return await generate_fact_report(query, ai_response)
except Exception as e:
logger.error(f"Error in AI fact check: {str(e)}")
return await generate_fact_report(query, {
"status": "no_results",
"verification_result": {
"no_sources_found": True,
"reason": str(e)
}
})
async def generate_fact_report(query: str, fact_check_data: dict | AIFactCheckResponse) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Generate a fact check report using OpenAI based on the fact check results."""
try:
base_system_prompt = """You are a professional fact-checking reporter. Your task is to create a detailed fact check report based on the provided data. Focus on accuracy, clarity, and proper citation of sources.
@@ -23,7 +112,35 @@ Rules:
1. Include all source URLs and names in the sources list
2. Keep the explanation focused on verifiable facts
3. Include dates when available
4. Maintain objectivity in the report"""
4. Maintain objectivity in the report
5. If no reliable sources are found, provide a clear explanation why"""
# Handle both dictionary and AIFactCheckResponse
if hasattr(fact_check_data, 'verification_result'):
# It's an AIFactCheckResponse
has_sources = bool(fact_check_data.sources)
verification_result = fact_check_data.verification_result
fact_check_data_dict = fact_check_data.dict()
else:
# It's a dictionary
has_sources = bool(fact_check_data.get("claims") or fact_check_data.get("urls_found"))
verification_result = fact_check_data.get("verification_result", {})
fact_check_data_dict = fact_check_data
# If no sources were found, return an unverified response
if not has_sources or (
isinstance(fact_check_data, dict) and
fact_check_data.get("status") == "no_results"
) or (verification_result and verification_result.get("no_sources_found")):
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="No fact-checking sources have verified this claim yet.",
explanation="Our search across reputable fact-checking websites did not find any formal verification of this claim. This doesn't mean the claim is false - just that it hasn't been formally fact-checked yet.",
additional_context="The claim may be too recent for fact-checkers to have investigated, or it may not have been widely circulated enough to warrant formal fact-checking."
)
base_user_prompt = """Generate a comprehensive fact check report in this exact JSON format:
{
@@ -39,14 +156,12 @@ Rules:
"evidence": "A concise summary of the key evidence (1-2 sentences)",
"explanation": "A detailed explanation including who verified it, when it was verified, and the key findings (2-3 sentences)",
"additional_context": "Important context about the verification process, limitations, or broader implications (1-2 sentences)"
}
}"""
Ensure all URLs in sources are complete (including https:// if missing) and each source has both a URL and name."""
if "claims" in fact_check_data:
if isinstance(fact_check_data, dict) and "claims" in fact_check_data:
system_prompt = base_system_prompt
user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data}
Fact Check Results: {fact_check_data_dict}
{base_user_prompt}
@@ -55,11 +170,10 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
2. Specify verification dates when available
3. Name the fact-checking organizations involved
4. Describe the verification process"""
else:
system_prompt = base_system_prompt
user_prompt = f"""Query: {query}
Fact Check Results: {fact_check_data}
Fact Check Results: {fact_check_data_dict}
{base_user_prompt}
@@ -76,117 +190,180 @@ Ensure all URLs in sources are complete (including https:// if missing) and each
)
try:
# First try to parse the response directly
response_data = response["response"]
# Clean up sources before validation
if isinstance(response_data.get('sources'), list):
if isinstance(response_data.get("sources"), list):
cleaned_sources = []
for source in response_data['sources']:
for source in response_data["sources"]:
if isinstance(source, str):
# Convert string sources to Source objects
url = source if source.startswith('http') else f"https://{source}"
cleaned_sources.append({
"url": url,
"name": source
})
url = source if source.startswith("http") else f"https://{source}"
cleaned_sources.append({"url": url, "name": source})
elif isinstance(source, dict):
# Ensure URL has proper scheme
url = source.get('url', '')
if url and not url.startswith('http'):
source['url'] = f"https://{url}"
url = source.get("url", "")
if url and not url.startswith("http"):
source["url"] = f"https://{url}"
cleaned_sources.append(source)
response_data['sources'] = cleaned_sources
response_data["sources"] = cleaned_sources
fact_check_response = FactCheckResponse(**response_data)
return fact_check_response
if response_data["verdict"] == "Unverified" or not response_data.get("sources"):
return UnverifiedFactCheckResponse(**response_data)
return FactCheckResponse(**response_data)
except Exception as validation_error:
print(f"Response validation error: {str(validation_error)}")
raise HTTPException(
status_code=422,
detail=ErrorResponse(
detail=f"Invalid response format: {str(validation_error)}",
error_code="VALIDATION_ERROR",
path="/check-facts"
).dict()
logger.error(f"Response validation error: {str(validation_error)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="An error occurred while processing the fact check results.",
explanation="The system encountered an error while validating the fact check results.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
except Exception as e:
print(f"Error generating fact report: {str(e)}")
raise HTTPException(
status_code=500,
detail=ErrorResponse(
detail="Error generating fact report",
error_code="FACT_CHECK_ERROR",
path="/check-facts"
).dict()
logger.error(f"Error generating fact report: {str(e)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="An error occurred while generating the fact check report.",
explanation="The system encountered an unexpected error while processing the fact check request.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
@fact_check_router.post("/check-facts", response_model=FactCheckResponse)
async def combine_fact_reports(query: str, url_text: str, query_result: Dict[str, Any], url_result: Dict[str, Any]) -> Union[FactCheckResponse, UnverifiedFactCheckResponse]:
"""Combine fact check results from query and URL into a single comprehensive report."""
try:
system_prompt = """You are a professional fact-checking reporter. Your task is to create a comprehensive fact check report by combining and analyzing multiple fact-checking results. Focus on accuracy, clarity, and proper citation of all sources.
Rules:
1. Include all source URLs and names from both result sets
2. Compare and contrast findings from different sources
3. Include dates when available
4. Note any discrepancies between sources
5. Provide a balanced, objective analysis"""
user_prompt = f"""Original Query: {query}
Extracted Text from URL: {url_text}
First Fact Check Result: {query_result}
Second Fact Check Result: {url_result}
Generate a comprehensive fact check report in this exact JSON format:
{{
"claim": "Write the exact claim being verified",
"verdict": "One of: True/False/Partially True/Unverified",
"confidence": "One of: High/Medium/Low",
"sources": [
{{
"url": "Full URL of the source",
"name": "Name of the source organization"
}}
],
"evidence": "A concise summary of the key evidence from both sources (2-3 sentences)",
"explanation": "A detailed explanation combining findings from both fact checks (3-4 sentences)",
"additional_context": "Important context about differences or similarities in findings (1-2 sentences)"
}}
The report should:
1. Combine sources from both fact checks
2. Compare findings from both analyses
3. Note any differences in conclusions
4. Provide a unified verdict based on all available information"""
response = await openai_client.generate_text_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
max_tokens=1000
)
response_data = response["response"]
# Clean up sources from both results
if isinstance(response_data.get("sources"), list):
cleaned_sources = []
for source in response_data["sources"]:
if isinstance(source, str):
url = source if source.startswith("http") else f"https://{source}"
cleaned_sources.append({"url": url, "name": source})
elif isinstance(source, dict):
url = source.get("url", "")
if url and not url.startswith("http"):
source["url"] = f"https://{url}"
cleaned_sources.append(source)
response_data["sources"] = cleaned_sources
if response_data["verdict"] == "Unverified" or not response_data.get("sources"):
return UnverifiedFactCheckResponse(**response_data)
return FactCheckResponse(**response_data)
except Exception as e:
logger.error(f"Error combining fact reports: {str(e)}")
return UnverifiedFactCheckResponse(
claim=query,
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="An error occurred while combining fact check reports.",
explanation="The system encountered an error while trying to combine results from multiple sources.",
additional_context="This is a technical error and does not reflect on the truthfulness of the claim."
)
@fact_check_router.post("/check-facts", response_model=Union[FactCheckResponse, UnverifiedFactCheckResponse])
async def check_facts(request: FactCheckRequest):
"""
Fetch fact check results and generate a comprehensive report.
Handles both query-based and URL-based fact checking.
"""
if not GOOGLE_API_KEY or not GOOGLE_FACT_CHECK_BASE_URL:
raise HTTPException(
status_code=500,
detail=ErrorResponse(
detail="Google API key or base URL is not configured",
error_code="CONFIGURATION_ERROR",
path="/check-facts"
).dict()
url_text = None
query_result = None
url_result = None
# If URL is provided, try to extract text
if request.url:
url_text = await process_url_content(request.url)
if not url_text and not request.query:
# Only return early if URL text extraction failed and no query provided
return UnverifiedFactCheckResponse(
claim=f"URL check requested: {request.url}",
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="Unable to extract text from the provided URL.",
explanation="The system could not process the content from the provided URL. The URL might be invalid or inaccessible.",
additional_context="Please provide a valid URL or a text query for fact-checking."
)
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
# Get fact checker sources from the centralized configuration
fact_checker_sources = get_all_sources()
# If URL text was successfully extracted, process it
if url_text:
logger.info(f"Processing fact check for extracted text: {url_text}")
url_result = await process_fact_check(url_text)
for source in fact_checker_sources:
params = {
"key": GOOGLE_API_KEY,
"query": request.query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": source.domain,
"pageSize": 10
}
# Process query if provided
if request.query:
query_result = await process_fact_check(request.query)
try:
response = await client.get(
GOOGLE_FACT_CHECK_BASE_URL,
params=params,
headers=headers
)
response.raise_for_status()
json_response = response.json()
if json_response.get("claims"):
return await generate_fact_report(request.query, json_response)
except httpx.RequestError as e:
print(f"Error fetching results for site {source.domain}: {str(e)}")
continue
except Exception as e:
print(f"Unexpected error for site {source.domain}: {str(e)}")
continue
try:
search_request = SearchRequest(
search_text=request.query,
source_types=["fact_checkers"]
)
ai_response = await search_websites(search_request)
return await generate_fact_report(request.query, ai_response)
except Exception as e:
print(f"Error in AI fact check: {str(e)}")
raise HTTPException(
status_code=404,
detail=ErrorResponse(
detail="No fact check results found",
error_code="NOT_FOUND",
path="/check-facts"
).dict()
# If both results are available, combine them
if query_result and url_result and url_text:
return await combine_fact_reports(request.query, url_text,
query_result.dict(), url_result.dict())
# If only one result is available
if query_result:
return query_result
if url_result:
return url_result
# If no valid results
return UnverifiedFactCheckResponse(
claim=request.query or f"URL: {request.url}",
verdict=VerdictEnum.UNVERIFIED,
confidence=ConfidenceEnum.LOW,
sources=[],
evidence="Failed to process fact-checking request.",
explanation="The system encountered errors while processing the fact checks.",
additional_context="Please try again with different input or contact support if the issue persists."
)
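A client sketch for the reworked endpoint, again assuming a local server; query and url are each optional, but the request model rejects a payload that supplies neither:

import httpx

# Text-only fact check; add a "url" field to also extract and verify text from an image or article.
payload = {"query": "Did NASA confirm finding alien structures on Mars in 2024?"}
response = httpx.post("http://localhost:8000/check-facts", json=payload, timeout=120)
report = response.json()
print(report["verdict"], report["confidence"])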

app/api/scrap_websites.py

@@ -7,7 +7,7 @@ from pydantic import BaseModel
from app.models.ai_fact_check_models import (
AIFactCheckRequest,
FactCheckSource,
SourceType
SourceType,
)
from app.websites.fact_checker_website import SOURCES, get_all_sources
from app.api.ai_fact_check import ai_fact_check
@@ -18,10 +18,10 @@ class SearchRequest(BaseModel):
search_text: str
source_types: List[str] = ["fact_checkers"]
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@@ -38,39 +38,46 @@ def get_domain_from_url(url: str) -> str:
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
if domain.startswith('www.'):
if domain.startswith("www."):
domain = domain[4:]
return domain
except Exception as e:
logger.error(f"Error extracting domain from URL {url}: {str(e)}")
return ""
def is_valid_source_domain(domain: str, sources: List[FactCheckSource]) -> bool:
"""Check if domain matches any source with improved matching logic."""
if not domain:
return False
domain = domain.lower()
if domain.startswith('www.'):
if domain.startswith("www."):
domain = domain[4:]
for source in sources:
source_domain = source.domain.lower()
if source_domain.startswith('www.'):
if source_domain.startswith("www."):
source_domain = source_domain[4:]
if domain == source_domain or domain.endswith('.' + source_domain):
if domain == source_domain or domain.endswith("." + source_domain):
return True
return False
async def build_enhanced_search_query(query: str, sources: List[FactCheckSource]) -> str:
async def build_enhanced_search_query(
query: str, sources: List[FactCheckSource]
) -> str:
"""Build search query with site restrictions."""
site_queries = [f"site:{source.domain}" for source in sources]
site_restriction = " OR ".join(site_queries)
return f"({query}) ({site_restriction})"
async def google_custom_search(query: str, sources: List[FactCheckSource], page: int = 1) -> Optional[Dict]:
async def google_custom_search(
query: str, sources: List[FactCheckSource], page: int = 1
) -> Optional[Dict]:
"""Perform Google Custom Search with enhanced query."""
enhanced_query = await build_enhanced_search_query(query, sources)
start_index = ((page - 1) * RESULTS_PER_PAGE) + 1
@@ -80,7 +87,7 @@ async def google_custom_search(query: str, sources: List[FactCheckSource], page:
"cx": GOOGLE_ENGINE_ID,
"q": enhanced_query,
"num": RESULTS_PER_PAGE,
"start": start_index
"start": start_index,
}
async with httpx.AsyncClient(timeout=30.0) as client:
@@ -92,6 +99,7 @@ async def google_custom_search(query: str, sources: List[FactCheckSource], page:
logger.error(f"Search error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")
@scrap_websites_router.post("/search")
async def search_websites(request: SearchRequest):
# Get the source types from the request
@@ -115,7 +123,9 @@ async def search_websites(request: SearchRequest):
if len(all_urls) >= 50:
break
search_response = await google_custom_search(request.search_text, selected_sources, page)
search_response = await google_custom_search(
request.search_text, selected_sources, page
)
if not search_response or not search_response.get("items"):
break
@@ -132,25 +142,23 @@ async def search_websites(request: SearchRequest):
domain_results[domain] = []
if len(domain_results[domain]) < MAX_URLS_PER_DOMAIN:
domain_results[domain].append({
domain_results[domain].append(
{
"url": url,
"title": item.get("title", ""),
"snippet": item.get("snippet", "")
})
"snippet": item.get("snippet", ""),
}
)
all_urls.append(url)
if len(all_urls) >= 50:
break
if not all_urls:
return {
"status": "no_results",
"urls_found": 0
}
return {"status": "no_results", "urls_found": 0}
fact_check_request = AIFactCheckRequest(
content=request.search_text,
urls=all_urls[:5]
content=request.search_text, urls=all_urls[:5]
)
return await ai_fact_check(fact_check_request)
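A usage sketch for this route, assuming a local server; search_text is required and source_types defaults to ["fact_checkers"]:

import httpx

# Search the configured fact-checker domains, then AI-check the top URLs found.
payload = {"search_text": "Indian flag was drawn in BUET campus", "source_types": ["fact_checkers"]}
response = httpx.post("http://localhost:8000/search", json=payload, timeout=120)
print(response.json())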

app/config.py

@@ -4,7 +4,7 @@ from dotenv import load_dotenv
load_dotenv()
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
GOOGLE_FACT_CHECK_BASE_URL= os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
GOOGLE_FACT_CHECK_BASE_URL = os.environ["GOOGLE_FACT_CHECK_BASE_URL"]
GOOGLE_ENGINE_ID = os.environ["GOOGLE_ENGINE_ID"]
GOOGLE_SEARCH_URL = os.environ["GOOGLE_SEARCH_URL"]

app/models/ai_fact_check_models.py

@@ -4,38 +4,46 @@ from enum import Enum
from datetime import datetime
from urllib.parse import urlparse
# Common Models
class TokenUsage(BaseModel):
prompt_tokens: Optional[int] = 0
completion_tokens: Optional[int] = 0
total_tokens: Optional[int] = 0
class ErrorResponse(BaseModel):
detail: str
error_code: str = Field(..., description="Unique error code for this type of error")
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
path: Optional[str] = Field(None, description="The endpoint path where error occurred")
path: Optional[str] = Field(
None, description="The endpoint path where error occurred"
)
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"detail": "Error description",
"error_code": "ERROR_CODE",
"timestamp": "2024-12-09T16:49:30.905765",
"path": "/check-facts"
"path": "/check-facts",
}
})
}
)
# Fact Check Models
class Publisher(BaseModel):
name: str
site: Optional[str] = Field(None, description="Publisher's website")
@validator('site')
@validator("site")
def validate_site(cls, v):
if v and not (v.startswith('http://') or v.startswith('https://')):
if v and not (v.startswith("http://") or v.startswith("https://")):
return f"https://{v}"
return v
class ClaimReview(BaseModel):
publisher: Publisher
url: Optional[HttpUrl] = None
@@ -44,21 +52,25 @@ class ClaimReview(BaseModel):
textualRating: Optional[str] = None
languageCode: str = Field(default="en-US")
class Claim(BaseModel):
text: str
claimant: Optional[str] = None
claimDate: Optional[str] = None
claimReview: List[ClaimReview]
class SourceType(str, Enum):
FACT_CHECKER = "fact_checker"
NEWS_SITE = "news_site"
class FactCheckSource(BaseModel):
domain: str
type: SourceType
priority: int = Field(default=1, ge=1, le=10)
# Verification Models
class VerificationResult(BaseModel):
verdict: str = Field(..., description="True/False/Insufficient Information")
@@ -67,44 +79,46 @@ class VerificationResult(BaseModel):
reasoning: str
missing_info: Optional[str] = None
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"verdict": "True",
"confidence": "High",
"evidence": ["Direct quote from source supporting the claim"],
"reasoning": "Detailed analysis of why the claim is considered true",
"missing_info": "Any caveats or limitations of the verification"
"missing_info": "Any caveats or limitations of the verification",
}
})
}
)
# Request Models
class BaseFactCheckRequest(BaseModel):
content: str = Field(
...,
min_length=10,
max_length=1000,
description="The claim to be fact-checked"
..., min_length=10, max_length=1000, description="The claim to be fact-checked"
)
@validator('content')
@validator("content")
def validate_content(cls, v):
if not v.strip():
raise ValueError("Content cannot be empty or just whitespace")
return v.strip()
class GoogleFactCheckRequest(BaseFactCheckRequest):
language: str = Field(default="en-US", pattern="^[a-z]{2}-[A-Z]{2}$")
max_results_per_source: int = Field(default=10, ge=1, le=50)
class AIFactCheckRequest(BaseFactCheckRequest):
urls: List[str] = Field(
...,
min_items=1,
max_items=5,
description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing"
description="List of URLs to check the content against. URLs will be prefixed with https:// if protocol is missing",
)
@validator('urls')
@validator("urls")
def validate_urls(cls, urls):
validated_urls = []
for url in urls:
@@ -112,8 +126,8 @@ class AIFactCheckRequest(BaseFactCheckRequest):
raise ValueError("URL cannot be empty")
# Add https:// if no protocol specified
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
try:
result = urlparse(url)
@@ -125,15 +139,18 @@ class AIFactCheckRequest(BaseFactCheckRequest):
return validated_urls
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"content": "Indian flag was drawn in BUET campus",
"urls": [
"www.altnews.in/article-about-flag",
"www.another-source.com/related-news"
]
"www.another-source.com/related-news",
],
}
})
}
)
# Response Models
class BaseFactCheckResponse(BaseModel):
@@ -141,17 +158,20 @@ class BaseFactCheckResponse(BaseModel):
token_usage: TokenUsage
sources: List[str]
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"query": "Example statement to verify",
"token_usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
"total_tokens": 150
"total_tokens": 150,
},
"sources": ["source1.com", "source2.com"],
}
})
}
)
class GoogleFactCheckResponse(BaseFactCheckResponse):
total_claims_found: int
@@ -159,44 +179,51 @@ class GoogleFactCheckResponse(BaseFactCheckResponse):
verification_result: Dict[str, Any]
summary: Dict[str, int]
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"query": "Example claim",
"total_claims_found": 1,
"results": [{
"results": [
{
"text": "Example claim text",
"claimant": "Source name",
"claimReview": [{
"claimReview": [
{
"publisher": {
"name": "Fact Checker",
"site": "factchecker.com"
"site": "factchecker.com",
},
"textualRating": "True"
}]
}],
"textualRating": "True",
}
],
}
],
"verification_result": {
"verdict": "True",
"confidence": "High",
"evidence": ["Supporting evidence"],
"reasoning": "Detailed analysis"
"reasoning": "Detailed analysis",
},
"sources": ["factchecker.com"],
"token_usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
"total_tokens": 150
"total_tokens": 150,
},
"summary": {
"total_sources": 1,
"fact_checking_sites_queried": 10
"summary": {"total_sources": 1, "fact_checking_sites_queried": 10},
}
}
})
)
class AIFactCheckResponse(BaseFactCheckResponse):
verification_result: Dict[str, VerificationResult] # Changed to Dict to store results per URL
verification_result: Dict[
str, VerificationResult
] # Changed to Dict to store results per URL
model_config = ConfigDict(json_schema_extra={
model_config = ConfigDict(
json_schema_extra={
"example": {
"query": "Indian flag was drawn in BUET campus",
"verification_result": {
@@ -205,24 +232,26 @@ class AIFactCheckResponse(BaseFactCheckResponse):
"confidence": "High",
"evidence": ["Supporting evidence from source 1"],
"reasoning": "Detailed analysis from source 1",
"missing_info": None
"missing_info": None,
},
"https://www.source2.com": {
"verdict": "True",
"confidence": "Medium",
"evidence": ["Supporting evidence from source 2"],
"reasoning": "Analysis from source 2",
"missing_info": "Additional context needed"
}
"missing_info": "Additional context needed",
},
},
"sources": ["source1.com", "source2.com"],
"token_usage": {
"prompt_tokens": 200,
"completion_tokens": 100,
"total_tokens": 300
"total_tokens": 300,
},
}
}
})
)
# Backwards compatibility aliases
FactCheckRequest = GoogleFactCheckRequest
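The urls validator can be exercised directly; a minimal sketch using the schema's own example values:

from app.models.ai_fact_check_models import AIFactCheckRequest

request = AIFactCheckRequest(
    content="Indian flag was drawn in BUET campus",
    urls=["www.altnews.in/article-about-flag"],  # no protocol given
)
print(request.urls)  # ['https://www.altnews.in/article-about-flag']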

app/models/fact_check_models.py

@@ -1,54 +1,106 @@
from pydantic import BaseModel, Field, HttpUrl, validator
from typing import List, Literal, Union
from pydantic import BaseModel, Field, HttpUrl, validator, root_validator
from typing import List, Literal, Union, Optional
from datetime import datetime
from enum import Enum
class VerdictEnum(str, Enum):
TRUE = "True"
FALSE = "False"
PARTIALLY_TRUE = "Partially True"
UNVERIFIED = "Unverified"
class ConfidenceEnum(str, Enum):
HIGH = "High"
MEDIUM = "Medium"
LOW = "Low"
class FactCheckRequest(BaseModel):
query: str = Field(
...,
query: Optional[str] = Field(
None,
min_length=3,
max_length=500,
description="The claim or statement to be fact-checked",
example="Did NASA confirm finding alien structures on Mars in 2024?"
example="Did NASA confirm finding alien structures on Mars in 2024?",
)
url: Optional[str] = Field(
None,
description="URL to be fact-checked",
example="https://example.com/article",
)
@root_validator(pre=True)
def validate_at_least_one(cls, values):
"""Validate that at least one of query or url is provided."""
query = values.get('query')
url = values.get('url')
if not query and not url:
raise ValueError("At least one of 'query' or 'url' must be provided")
return values
@validator('url')
def validate_url(cls, v):
"""Validate URL format if provided."""
if v is not None and len(v) < 3:
raise ValueError("URL must be at least 3 characters")
return v
class Source(BaseModel):
url: str
name: str = ""
@validator('url')
@validator("url")
def validate_url(cls, v):
# Basic URL validation without requiring HTTP/HTTPS
if not v or len(v) < 3:
raise ValueError("URL must not be empty and must be at least 3 characters")
return v
class UnverifiedFactCheckResponse(BaseModel):
claim: str = Field(
...,
min_length=10,
max_length=1000,
description="The exact claim being verified",
)
verdict: VerdictEnum = Field(..., description="The verification verdict")
confidence: ConfidenceEnum = Field(..., description="Confidence level in the verdict")
sources: List[Source] = Field(
default=[],
description="List of sources used in verification"
)
evidence: str = Field(
...,
min_length=20,
max_length=500,
description="Concise summary of key evidence",
)
explanation: str = Field(
...,
min_length=50,
max_length=1000,
description="Detailed explanation of verification findings",
)
additional_context: str = Field(
...,
min_length=20,
max_length=500,
description="Important context about the verification",
)
class FactCheckResponse(BaseModel):
claim: str = Field(
...,
min_length=10,
max_length=1000,
description="The exact claim being verified"
)
verdict: VerdictEnum = Field(
...,
description="The verification verdict"
)
confidence: ConfidenceEnum = Field(
...,
description="Confidence level in the verdict"
description="The exact claim being verified",
)
verdict: VerdictEnum = Field(..., description="The verification verdict")
confidence: ConfidenceEnum = Field(..., description="Confidence level in the verdict")
sources: List[Source] = Field(
...,
min_items=1,
@@ -58,19 +110,19 @@ class FactCheckResponse(BaseModel):
...,
min_length=20,
max_length=500,
description="Concise summary of key evidence"
description="Concise summary of key evidence",
)
explanation: str = Field(
...,
min_length=50,
max_length=1000,
description="Detailed explanation of verification findings"
description="Detailed explanation of verification findings",
)
additional_context: str = Field(
...,
min_length=20,
max_length=500,
description="Important context about the verification"
description="Important context about the verification",
)
class Config:
@@ -82,19 +134,16 @@ class FactCheckResponse(BaseModel):
"sources": [
{
"url": "https://www.nasa.gov/mars-exploration",
"name": "NASA Mars Exploration"
},
{
"url": "https://factcheck.org/2024/mars-claims",
"name": "FactCheck.org"
"name": "NASA Mars Exploration",
}
],
"evidence": "NASA has made no such announcement. Recent Mars rover images show natural rock formations.",
"explanation": "Multiple fact-checking organizations investigated this claim. NASA's official communications and Mars mission reports from 2024 contain no mention of alien structures. The viral images being shared are misidentified natural geological formations.",
"additional_context": "Similar false claims about alien structures on Mars have circulated periodically since the first Mars rovers began sending back images."
"explanation": "Multiple fact-checking organizations investigated this claim. NASA's official communications and Mars mission reports from 2024 contain no mention of alien structures.",
"additional_context": "Similar false claims about alien structures on Mars have circulated periodically.",
}
}
class ErrorResponse(BaseModel):
detail: str
error_code: str = Field(..., example="VALIDATION_ERROR")
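The new root_validator makes query and url individually optional but jointly required; a small sketch of that behavior:

from pydantic import ValidationError
from app.models.fact_check_models import FactCheckRequest

FactCheckRequest(query="Did NASA confirm finding alien structures on Mars in 2024?")  # valid
FactCheckRequest(url="https://example.com/article")  # valid
try:
    FactCheckRequest()  # neither field set
except ValidationError as exc:
    print(exc)  # "At least one of 'query' or 'url' must be provided"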


@@ -1,38 +1,46 @@
from pydantic import BaseModel
from typing import List, Dict
class SearchRequest(BaseModel):
search_text: str
source_types: List[str] = ["fact_checkers"]
class Publisher(BaseModel):
name: str
site: str
class ClaimReview(BaseModel):
publisher: Publisher
textualRating: str
class Claim(BaseModel):
claimReview: List[ClaimReview]
claimant: str
text: str
class Summary(BaseModel):
fact_checking_sites_queried: int
total_sources: int
class TokenUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
class VerificationResult(BaseModel):
verdict: str
confidence: str
evidence: List[str]
reasoning: str
class EnhancedFactCheckResponse(BaseModel):
query: str
results: List[Claim]

app/services/image_text_extractor.py

@@ -0,0 +1,119 @@
import base64
import requests
import os
from io import BytesIO
from typing import Tuple, Optional
import logging
import aiohttp
logger = logging.getLogger(__name__)
class ImageTextExtractor:
def __init__(self, api_key: str):
"""Initialize ImageTextExtractor with OpenAI API key."""
self.api_key = api_key
self.api_url = "https://api.openai.com/v1/chat/completions"
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
def encode_image(self, image_path: str) -> str:
"""Encode a local image into base64."""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except Exception as e:
logger.error(f"Error encoding image: {str(e)}")
raise Exception(f"Error encoding image: {e}")
async def fetch_image_from_url(self, image_url: str) -> Tuple[str, str]:
"""Fetch an image from a URL and encode it as base64."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
if response.status != 200:
raise Exception(f"Failed to fetch image: Status {response.status}")
content_type = response.headers.get('Content-Type', '')
if "text/html" in content_type:
raise ValueError("The URL points to a webpage, not an image")
if "image" not in content_type:
raise ValueError("The URL does not point to a valid image")
image_data = await response.read()
image_format = "jpeg" if "jpeg" in content_type or "jpg" in content_type else "png"
base64_image = base64.b64encode(image_data).decode('utf-8')
return base64_image, image_format
except aiohttp.ClientError as e:
logger.error(f"Error fetching image from URL: {str(e)}")
raise Exception(f"Error fetching image from URL: {e}")
except ValueError as e:
raise
except Exception as e:
logger.error(f"Unexpected error processing image URL: {str(e)}")
raise Exception(f"Unexpected error processing image: {e}")
async def extract_text(self, image_input: str, is_url: bool = False) -> Optional[str]:
"""Extract text from an image, either from a local path or URL."""
try:
if is_url:
try:
base64_image, image_format = await self.fetch_image_from_url(image_input)
except ValueError as e:
if "webpage" in str(e):
return None
raise
else:
if not os.path.exists(image_input):
raise FileNotFoundError(f"Image file not found: {image_input}")
base64_image = self.encode_image(image_input)
image_format = "jpeg" if image_input.endswith(".jpg") else "png"
payload = {
"model": "gpt-4-turbo-2024-04-09", # Updated model name
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Extract and return only the key text from this image in the original language. Do not provide translations or explanations."
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/{image_format};base64,{base64_image}"
}
}
]
}
],
"max_tokens": 300
}
async with aiohttp.ClientSession() as session:
async with session.post(self.api_url, headers=self.headers, json=payload) as response:
if response.status != 200:
error_content = await response.text()
logger.error(f"API request failed: Status {response.status}, Response: {error_content}")
raise Exception(f"API request failed with status {response.status}")
result = await response.json()
logger.debug(f"GPT-4 API Response: {result}")
if 'choices' in result and len(result['choices']) > 0:
extracted_text = result['choices'][0]['message']['content'].strip()
if extracted_text:
return extracted_text
return None
except (aiohttp.ClientError, ValueError, FileNotFoundError) as e:
logger.error(f"Error in text extraction: {str(e)}")
return None
except Exception as e:
logger.error(f"Unexpected error in text extraction: {str(e)}")
return None
return None
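A usage sketch for the extractor; the image URL here is a placeholder, and a valid OPENAI_API_KEY with vision-model access is assumed:

import asyncio
from app.config import OPENAI_API_KEY
from app.services.image_text_extractor import ImageTextExtractor

async def main():
    extractor = ImageTextExtractor(OPENAI_API_KEY)
    # is_url=True downloads and base64-encodes the remote image before calling the API.
    text = await extractor.extract_text("https://example.com/images-test.jpg", is_url=True)
    print(text or "No text extracted")

asyncio.run(main())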

app/services/openai_client.py

@@ -1,4 +1,3 @@
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_transformers import BeautifulSoupTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
@@ -7,6 +6,9 @@ import numpy as np
import logging as logger
import openai
import json
import aiohttp
from bs4 import BeautifulSoup
class OpenAIClient:
def __init__(self, api_key: str):
@@ -15,7 +17,9 @@ class OpenAIClient:
"""
openai.api_key = api_key
async def generate_text_response(self, system_prompt: str, user_prompt: str, max_tokens: int) -> dict:
async def generate_text_response(
self, system_prompt: str, user_prompt: str, max_tokens: int
) -> dict:
"""
Generate a response using OpenAI's chat completion API.
"""
@@ -24,19 +28,19 @@ class OpenAIClient:
model="gpt-4",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
{"role": "user", "content": user_prompt},
],
max_tokens=max_tokens
max_tokens=max_tokens,
)
content = response['choices'][0]['message']['content']
content = response["choices"][0]["message"]["content"]
# Parse the JSON string into a dictionary
parsed_content = json.loads(content)
return {
"response": parsed_content, # Now returns a dictionary instead of string
"prompt_tokens": response['usage']['prompt_tokens'],
"completion_tokens": response['usage']['completion_tokens'],
"total_tokens": response['usage']['total_tokens']
"prompt_tokens": response["usage"]["prompt_tokens"],
"completion_tokens": response["usage"]["completion_tokens"],
"total_tokens": response["usage"]["total_tokens"],
}
except json.JSONDecodeError as e:
raise Exception(f"Failed to parse OpenAI response as JSON: {str(e)}")
@@ -49,14 +53,14 @@
"""
try:
response = openai.Embedding.create(
input=texts,
model="text-embedding-ada-002"
input=texts, model="text-embedding-ada-002"
)
embeddings = [data['embedding'] for data in response['data']]
embeddings = [data["embedding"] for data in response["data"]]
return embeddings
except Exception as e:
raise Exception(f"OpenAI embedding error: {str(e)}")
class AIFactChecker:
def __init__(self, openai_client: OpenAIClient):
"""Initialize the fact checker with OpenAI client."""
@@ -65,20 +69,36 @@
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
)
async def scrape_webpage(self, url: str) -> List[Document]:
"""Scrape webpage content using LangChain's AsyncHtmlLoader."""
"""Scrape webpage content without saving HTML files."""
try:
loader = AsyncHtmlLoader([url])
docs = await loader.aload()
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(
f"Failed to fetch URL: {url}, status: {response.status}"
)
bs_transformer = BeautifulSoupTransformer()
docs_transformed = bs_transformer.transform_documents(docs)
docs_chunks = self.text_splitter.split_documents(docs_transformed)
html_content = await response.text()
logger.info(f"Successfully scraped webpage | chunks={len(docs_chunks)}")
# Parse HTML with BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
# Create a Document with the parsed content
doc = Document(
page_content=soup.get_text(separator="\n", strip=True),
metadata={"source": url},
)
# Split into chunks
docs_chunks = self.text_splitter.split_documents([doc])
logger.info(
f"Successfully scraped webpage | chunks={len(docs_chunks)}"
)
return docs_chunks
except Exception as e:
@@ -89,7 +109,7 @@ class AIFactChecker:
self,
query_embedding: List[float],
doc_embeddings: List[List[float]],
docs: List[Document]
docs: List[Document],
) -> List[Document]:
"""Find most relevant document chunks using cosine similarity."""
try:
@@ -107,7 +127,9 @@
logger.error(f"Error finding relevant chunks | error={str(e)}")
raise
async def verify_fact(self, query: str, relevant_docs: List[Document]) -> Dict[str, Any]:
async def verify_fact(
self, query: str, relevant_docs: List[Document]
) -> Dict[str, Any]:
"""Verify fact using OpenAI's API with context from relevant documents."""
try:
context = "\n\n".join([doc.page_content for doc in relevant_docs])
@@ -132,12 +154,17 @@
Analyze the statement based on the provided context and return your response in the specified JSON format."""
response = await self.openai_client.generate_text_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
max_tokens=800
system_prompt=system_prompt, user_prompt=user_prompt, max_tokens=800
)
sources = list(set([doc.metadata.get('source', 'Unknown source') for doc in relevant_docs]))
sources = list(
set(
[
doc.metadata.get("source", "Unknown source")
for doc in relevant_docs
]
)
)
return {
"verification_result": response["response"], # This is now a dictionary
@@ -145,8 +172,8 @@
"token_usage": {
"prompt_tokens": response["prompt_tokens"],
"completion_tokens": response["completion_tokens"],
"total_tokens": response["total_tokens"]
}
"total_tokens": response["total_tokens"],
},
}
except Exception as e:
@@ -162,7 +189,9 @@
doc_embeddings = self.openai_client.get_embeddings(doc_texts)
query_embedding = self.openai_client.get_embeddings([query])
relevant_docs = self.find_relevant_chunks(query_embedding[0], doc_embeddings, docs)
relevant_docs = self.find_relevant_chunks(
query_embedding[0], doc_embeddings, docs
)
verification_result = await self.verify_fact(query, relevant_docs)
return verification_result
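A sketch of driving the scrape-and-verify steps directly; the target URL is a placeholder, and passing every chunk straight to verify_fact skips the embedding-based ranking that the full pipeline applies via find_relevant_chunks:

import asyncio
from app.config import OPENAI_API_KEY
from app.services.openai_client import OpenAIClient, AIFactChecker

async def main():
    checker = AIFactChecker(OpenAIClient(OPENAI_API_KEY))
    # Fetch the page, strip HTML with BeautifulSoup, and split the text into chunks.
    docs = await checker.scrape_webpage("https://example.com/article")  # placeholder URL
    result = await checker.verify_fact("Indian flag was drawn in BUET campus", docs)
    print(result["verification_result"])

asyncio.run(main())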

app/websites/fact_checker_website.py

@@ -1,7 +1,12 @@
from typing import Dict, List
import requests
from fastapi import HTTPException
from app.models.ai_fact_check_models import FactCheckSource, ErrorResponse, FactCheckRequest, SourceType
from app.models.ai_fact_check_models import (
FactCheckSource,
ErrorResponse,
FactCheckRequest,
SourceType,
)
# Sources configuration with validation
SOURCES = {
@@ -113,8 +118,8 @@ SOURCES = {
"thejournal.ie/factcheck",
"journalistsresource.org",
"metafact.io",
"reporterslab.org/fact-checking"
]
"reporterslab.org/fact-checking",
]
],
"news_sites": [
FactCheckSource(domain=domain, type=SourceType.NEWS_SITE, priority=2)
@@ -133,16 +138,14 @@ SOURCES = {
"www.risingbd.com/english",
"www.dailyindustry.news",
"www.bangladeshpost.net",
"www.daily-bangladesh.com/english"
]
"www.daily-bangladesh.com/english",
]
],
}
async def fetch_fact_checks(
api_key: str,
base_url: str,
query: str,
site: FactCheckSource
api_key: str, base_url: str, query: str, site: FactCheckSource
) -> Dict:
"""
Fetch fact checks from a specific site using the Google Fact Check API
@@ -156,7 +159,7 @@ async def fetch_fact_checks(
"query": query,
"languageCode": "en-US",
"reviewPublisherSiteFilter": site.domain,
"pageSize": 10
"pageSize": 10,
}
response = requests.get(base_url, params=params)
@@ -168,19 +171,18 @@ async def fetch_fact_checks(
detail=ErrorResponse(
detail=f"Error fetching from {site.domain}: {str(e)}",
error_code="FACT_CHECK_SERVICE_ERROR",
path="/check-facts"
).dict()
path="/check-facts",
).dict(),
)
except ValueError as e:
raise HTTPException(
status_code=500,
detail=ErrorResponse(
detail=str(e),
error_code="CONFIGURATION_ERROR",
path="/check-facts"
).dict()
detail=str(e), error_code="CONFIGURATION_ERROR", path="/check-facts"
).dict(),
)
def get_all_sources() -> List[FactCheckSource]:
"""
Get all sources sorted by priority
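A quick sketch of consuming the source registry; per the docstring, priority-1 fact checkers should sort ahead of priority-2 news sites:

from app.websites.fact_checker_website import get_all_sources

for source in get_all_sources()[:5]:
    # Each entry is a FactCheckSource with domain, type, and priority.
    print(source.priority, source.type, source.domain)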

docker-compose.yaml (new file, 5 lines)

@@ -0,0 +1,5 @@
services:
backend:
build: .
container_name: backend-service
restart: always

images-test.jpg (new binary file, 11 KiB, not shown)

main.py (21 lines changed)

@@ -7,25 +7,14 @@ from app.config import FRONTEND_URL
# Initialize FastAPI app
app = FastAPI(
title="Your API Title",
description="Your API Description",
version="1.0.0"
title="Your API Title", description="Your API Description", version="1.0.0"
)
# CORS configuration
origins = [
FRONTEND_URL,
"http://localhost",
"http://localhost:5173",
"http://0.0.0.0",
"http://0.0.0.0:5173",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_origins=["*"], # Only wildcard
allow_credentials=False, # Changed to False to work with wildcard
allow_methods=["*"],
allow_headers=["*"],
)
@@ -44,10 +33,6 @@ app.include_router(fact_check_router, prefix="")
app.include_router(aifact_check_router, prefix="")
app.include_router(scrap_websites_router, prefix="")
# Include routers (uncomment and modify as needed)
# from routes import some_router
# app.include_router(some_router, prefix="/your-prefix", tags=["your-tag"])
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

requirements.txt

@@ -1,6 +1,88 @@
certifi==2024.8.30
aiofiles==24.1.0
aiohappyeyeballs==2.4.4
aiohttp==3.11.10
aiosignal==1.3.2
annotated-types==0.7.0
anyio==4.7.0
attrs==24.3.0
beautifulsoup4==4.12.3
black==24.10.0
certifi==2024.12.14
charset-normalizer==3.4.0
click==8.1.7
dataclasses-json==0.6.7
dnspython==2.7.0
email_validator==2.2.0
fastapi==0.115.6
fastapi-cli==0.0.7
flake8==7.1.1
frozenlist==1.5.0
greenlet==3.1.1
gunicorn==23.0.0
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.4
jsonpatch==1.33
jsonpointer==3.0.0
langchain==0.3.12
langchain-community==0.3.12
langchain-core==0.3.25
langchain-text-splitters==0.3.3
langsmith==0.2.3
markdown-it-py==3.0.0
MarkupSafe==3.0.2
marshmallow==3.23.1
mccabe==0.7.0
mdurl==0.1.2
multidict==6.1.0
mypy-extensions==1.0.0
numpy==1.26.4
openai==0.28.0
orjson==3.10.12
packaging==24.2
pathspec==0.12.1
pillow==11.0.0
platformdirs==4.3.6
pluggy==1.5.0
propcache==0.2.1
pycodestyle==2.12.1
pydantic==2.10.3
pydantic-extra-types==2.10.1
pydantic-settings==2.7.0
pydantic_core==2.27.1
pyflakes==3.2.0
Pygments==2.18.0
pytest==8.3.4
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-json-logger==3.2.1
python-multipart==0.0.20
PyYAML==6.0.2
requests==2.32.3
requests-toolbelt==1.0.0
rich==13.9.4
rich-toolkit==0.12.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
soupsieve==2.6
SQLAlchemy==2.0.36
starlette==0.41.3
tenacity==9.0.0
tqdm==4.67.1
typer==0.15.1
typing-inspect==0.9.0
typing_extensions==4.12.2
ujson==5.10.0
urllib3==2.2.3
uvicorn==0.34.0
uvloop==0.21.0
watchfiles==1.0.3
websockets==14.1
yarl==1.18.3

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

tests/test_main.py (new file, 18 lines)

@@ -0,0 +1,18 @@
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_root_endpoint():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Welcome to your FastAPI application"}
def test_health_endpoint():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "healthy"}
def test_cors_headers():
response = client.get("/", headers={"Origin": "http://localhost:5173"})
assert response.headers["access-control-allow-origin"] == "*"