fact-checker-backend/app/api/scrap_websites.py

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Dict
import requests
from bs4 import BeautifulSoup
import urllib.parse
import numpy as np
from app.services.openai_client import OpenAIClient
from app.config import OPENAI_API_KEY

scrap_websites_router = APIRouter()


class SearchRequest(BaseModel):
    search_text: str
    site_domains: List[str]


class UrlSimilarityInfo(BaseModel):
    url: str
    similarity: float
    extracted_text: str


class SearchResponse(BaseModel):
    results: Dict[str, List[str]]
    error_messages: Dict[str, str]
    url_similarities: Dict[str, List[UrlSimilarityInfo]]

def extract_url_text(url: str) -> str:
    """Extract and process meaningful text from URL path with improved cleaning"""
    try:
        # Parse the URL and get the path
        parsed = urllib.parse.urlparse(url)
        path = parsed.path

        # Remove common URL parts and file extensions
        path = path.replace('.html', '').replace('/index', '').replace('.php', '')

        # Split path into segments
        segments = [seg for seg in path.split('/') if seg]

        # Remove dates and numbers
        cleaned_segments = []
        for segment in segments:
            # Replace hyphens and underscores with spaces
            segment = segment.replace('-', ' ').replace('_', ' ')
            # Filter out segments that are just dates or numbers
            if not (segment.replace(' ', '').isdigit() or
                    all(part.isdigit() for part in segment.split() if part)):
                cleaned_segments.append(segment)

        # Remove very common words that don't add meaning
        common_words = {
            'www', 'live', 'news', 'intl', 'index', 'world', 'us', 'uk',
            'updates', 'update', 'latest', 'breaking', 'new', 'article'
        }

        # Join segments and split into words
        text = ' '.join(cleaned_segments)
        words = [word.lower() for word in text.split()
                 if word.lower() not in common_words and len(word) > 1]

        return ' '.join(words)
    except Exception:
        return ''
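

# Illustrative example (hypothetical URL, not taken from the original code):
#   extract_url_text("https://www.cnn.com/2024/05/01/world/gaza-aid-convoy-strike/index.html")
#   -> "gaza aid convoy strike"
# The date segments, the "/index.html" suffix, and common words such as "world" are all dropped.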


def google_search_scraper(search_text: str, site_domain: str) -> List[str]:
    # Build the query with Google's site: operator; it is left unquoted so it acts
    # as a domain filter rather than being matched as a literal phrase
    query = f"{search_text} site:{site_domain}"
    encoded_query = urllib.parse.quote(query)
    base_url = "https://www.google.com/search"
    url = f"{base_url}?q={encoded_query}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    try:
        # Timeout keeps the endpoint from hanging indefinitely on a slow response
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        # Google currently wraps organic results in div.g; this markup changes frequently
        search_results = soup.find_all('div', class_='g')

        urls = []
        for result in search_results[:5]:
            link = result.find('a')
            if link and 'href' in link.attrs:
                url = link['href']
                if url.startswith('http'):
                    urls.append(url)

        return urls[:5]
    except requests.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error scraping {site_domain}: {str(e)}")
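

# Illustrative call (hypothetical values): google_search_scraper("aid convoy strike", "cnn.com")
# fetches https://www.google.com/search?q=aid%20convoy%20strike%20site%3Acnn.com and returns
# at most five absolute result URLs; a failed request surfaces as an HTTP 500 to the caller.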


def calculate_similarity(query_embedding: List[float], url_embedding: List[float]) -> float:
    """Cosine similarity between the query embedding and a URL-text embedding."""
    query_array = np.array(query_embedding)
    url_array = np.array(url_embedding)

    similarity = np.dot(url_array, query_array) / (
        np.linalg.norm(url_array) * np.linalg.norm(query_array)
    )
    return float(similarity)
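

# Quick sanity check (illustrative values): identical vectors score 1.0 and orthogonal
# vectors score 0.0, e.g. calculate_similarity([1.0, 0.0], [1.0, 0.0]) == 1.0 while
# calculate_similarity([1.0, 0.0], [0.0, 1.0]) == 0.0.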


@scrap_websites_router.post("/search", response_model=SearchResponse)
async def search_websites(request: SearchRequest):
    results = {}
    error_messages = {}
    url_similarities = {}

    # Initialize OpenAI client
    openai_client = OpenAIClient(OPENAI_API_KEY)

    # Embed the search text once; it is compared against every extracted URL text
    search_context = request.search_text
    query_embedding = openai_client.get_embeddings([search_context])[0]

    # Higher similarity threshold for better filtering
    SIMILARITY_THRESHOLD = 0.75

    for domain in request.site_domains:
        try:
            urls = google_search_scraper(request.search_text, domain)
            url_sims = []
            valid_urls = []

            for url in urls:
                url_text = extract_url_text(url)

                # Skip URLs with no meaningful text extracted
                if not url_text:
                    continue

                url_embedding = openai_client.get_embeddings([url_text])[0]
                similarity = calculate_similarity(query_embedding, url_embedding)

                url_sims.append(UrlSimilarityInfo(
                    url=url,
                    similarity=similarity,
                    extracted_text=url_text
                ))

                if similarity >= SIMILARITY_THRESHOLD:
                    valid_urls.append(url)

            results[domain] = valid_urls
            url_similarities[domain] = sorted(url_sims,
                                              key=lambda x: x.similarity,
                                              reverse=True)
        except HTTPException as e:
            error_messages[domain] = str(e.detail)
        except Exception as e:
            error_messages[domain] = f"Unexpected error for {domain}: {str(e)}"

    return SearchResponse(
        results=results,
        error_messages=error_messages,
        url_similarities=url_similarities
    )
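

# Illustrative usage (hypothetical payload; the actual path depends on how
# scrap_websites_router is mounted in the FastAPI app):
#
#   POST /search
#   {
#       "search_text": "flood relief funds misuse report",
#       "site_domains": ["bbc.com", "reuters.com"]
#   }
#
# The response lists, per domain, the result URLs whose extracted path text scores at or
# above SIMILARITY_THRESHOLD, alongside per-URL similarity details in "url_similarities".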