119 lines
No EOL
5.2 KiB
Python
119 lines
No EOL
5.2 KiB
Python
import base64
|
|
import requests
|
|
import os
|
|
from io import BytesIO
|
|
from typing import Tuple, Optional
|
|
import logging
|
|
import aiohttp
|
|
|
|
# Module-level logger named after this module; ImageTextExtractor uses it for its error and debug output.
logger = logging.getLogger(__name__)
|
|
|
|
class ImageTextExtractor:
    """Extract text from images through the OpenAI chat-completions endpoint.

    An image is taken either from a local file path or from a remote URL,
    base64-encoded, and sent to the API embedded in a ``data:`` URL together
    with a fixed text-extraction prompt.
    """

    # Defaults match the values that were previously hard-coded in the payload.
    DEFAULT_MODEL = "gpt-4-turbo-2024-04-09"
    DEFAULT_MAX_TOKENS = 300

    _PROMPT = (
        "Extract and return only the key text from this image in the original language. "
        "Do not provide translations or explanations."
    )

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL,
                 max_tokens: int = DEFAULT_MAX_TOKENS):
        """Initialize ImageTextExtractor with OpenAI API key.

        Args:
            api_key: Key sent as a Bearer token in the ``Authorization`` header.
            model: Model name placed in the request payload.
            max_tokens: ``max_tokens`` value placed in the request payload.
        """
        self.api_key = api_key
        self.model = model
        self.max_tokens = max_tokens
        self.api_url = "https://api.openai.com/v1/chat/completions"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }

    @staticmethod
    def _format_from_path(image_path: str) -> str:
        """Return the image subtype for a local path based on its extension.

        The check is case-insensitive and recognises ``.jpg``/``.jpeg``,
        ``.gif`` and ``.webp``; every other name falls back to ``"png"``.
        """
        lowered = image_path.lower()
        if lowered.endswith((".jpg", ".jpeg")):
            return "jpeg"
        if lowered.endswith(".gif"):
            return "gif"
        if lowered.endswith(".webp"):
            return "webp"
        return "png"

    @staticmethod
    def _format_from_content_type(content_type: str) -> str:
        """Return the image subtype for a ``Content-Type`` header value.

        Anything mentioning ``jpeg``/``jpg`` maps to ``"jpeg"``; ``image/gif``
        and ``image/webp`` map to their own subtype; everything else falls
        back to ``"png"``.
        """
        lowered = content_type.lower()
        if "jpeg" in lowered or "jpg" in lowered:
            return "jpeg"
        for subtype in ("gif", "webp"):
            if f"image/{subtype}" in lowered:
                return subtype
        return "png"

    @staticmethod
    def _extract_text_from_response(result) -> Optional[str]:
        """Return the stripped text of the first choice, or None.

        None is returned when the response is not a dict, has no choices, has
        no string message content, or the content is empty after stripping.
        """
        if not isinstance(result, dict):
            return None
        choices = result.get("choices")
        if not choices:
            return None
        first = choices[0]
        message = first.get("message") if isinstance(first, dict) else None
        content = message.get("content") if isinstance(message, dict) else None
        if not isinstance(content, str):
            return None
        return content.strip() or None

    def _build_payload(self, base64_image: str, image_format: str) -> dict:
        """Build the chat-completions request body for one base64 image."""
        return {
            "model": self.model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": self._PROMPT},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/{image_format};base64,{base64_image}"
                            },
                        },
                    ],
                }
            ],
            "max_tokens": self.max_tokens,
        }

    def encode_image(self, image_path: str) -> str:
        """Encode a local image into base64.

        Args:
            image_path: Path of the file to read in binary mode.

        Returns:
            The file contents as a base64 string.

        Raises:
            Exception: If the file cannot be opened or read; the original
                error is attached as the cause.
        """
        try:
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except Exception as e:
            logger.error("Error encoding image: %s", e)
            raise Exception(f"Error encoding image: {e}") from e

    async def fetch_image_from_url(self, image_url: str) -> Tuple[str, str]:
        """Fetch an image from a URL and encode it as base64.

        Args:
            image_url: Address requested with an HTTP GET.

        Returns:
            A ``(base64_data, image_format)`` tuple, where ``image_format`` is
            derived from the response ``Content-Type`` header.

        Raises:
            ValueError: If the response is an HTML page or not an image.
            Exception: For a non-200 status, a client error, or any other
                failure; the original error is attached as the cause.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(image_url) as response:
                    if response.status != 200:
                        raise Exception(f"Failed to fetch image: Status {response.status}")

                    # Media types are case-insensitive, so compare in lower case.
                    content_type = response.headers.get('Content-Type', '').lower()
                    if "text/html" in content_type:
                        raise ValueError("The URL points to a webpage, not an image")
                    if "image" not in content_type:
                        raise ValueError("The URL does not point to a valid image")

                    image_data = await response.read()
                    image_format = self._format_from_content_type(content_type)
                    base64_image = base64.b64encode(image_data).decode('utf-8')
                    return base64_image, image_format

        except aiohttp.ClientError as e:
            logger.error("Error fetching image from URL: %s", e)
            raise Exception(f"Error fetching image from URL: {e}") from e
        except ValueError:
            # Validation errors pass through unchanged so callers can inspect them.
            raise
        except Exception as e:
            logger.error("Unexpected error processing image URL: %s", e)
            raise Exception(f"Unexpected error processing image: {e}") from e

    async def extract_text(self, image_input: str, is_url: bool = False) -> Optional[str]:
        """Extract text from an image, either from a local path or URL.

        Args:
            image_input: Local file path, or a URL when ``is_url`` is true.
            is_url: Whether ``image_input`` should be fetched over HTTP.

        Returns:
            The extracted text, or None when nothing was extracted or any
            error occurred (errors are logged, not raised).
        """
        try:
            if is_url:
                try:
                    base64_image, image_format = await self.fetch_image_from_url(image_input)
                except ValueError as e:
                    # A URL that serves a webpage is treated as "no text" without an error log.
                    if "webpage" in str(e):
                        return None
                    raise
            else:
                if not os.path.exists(image_input):
                    raise FileNotFoundError(f"Image file not found: {image_input}")
                base64_image = self.encode_image(image_input)
                image_format = self._format_from_path(image_input)

            payload = self._build_payload(base64_image, image_format)

            async with aiohttp.ClientSession() as session:
                async with session.post(self.api_url, headers=self.headers, json=payload) as response:
                    if response.status != 200:
                        error_content = await response.text()
                        logger.error(
                            "API request failed: Status %s, Response: %s",
                            response.status,
                            error_content,
                        )
                        raise Exception(f"API request failed with status {response.status}")

                    result = await response.json()
                    logger.debug("GPT-4 API Response: %s", result)
                    return self._extract_text_from_response(result)

        except (aiohttp.ClientError, ValueError, FileNotFoundError) as e:
            logger.error("Error in text extraction: %s", e)
            return None
        except Exception as e:
            logger.error("Unexpected error in text extraction: %s", e)
            return None