-
Notifications
You must be signed in to change notification settings - Fork 0
/
metadata.py
148 lines (127 loc) · 6.18 KB
/
metadata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import asyncio
import hashlib
import os
from io import BytesIO
from ssl import SSLError
from typing import Any, Dict, Optional

import aiohttp
from bs4 import BeautifulSoup
from PIL import Image
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from webdriver_manager.firefox import GeckoDriverManager
# In-process cache mapping url -> {'image_url': ..., 'description': ...};
# lives for the process lifetime only (no eviction, cleared on restart).
metadata_cache: Dict[str, Dict[str, Any]] = {}

# Directory where resized preview images and screenshots are written.
IMAGE_DIR = "static/images"
# exist_ok=True avoids the check-then-create race of the
# os.path.exists() + os.makedirs() idiom.
os.makedirs(IMAGE_DIR, exist_ok=True)
async def fetch_metadata(url: str) -> Dict[str, Any]:
    """Fetch link-preview metadata (image + description) for *url*.

    Results are memoized in the module-level ``metadata_cache``. When the
    page's og:image is missing or too small, a headless-Firefox screenshot
    is used instead; ``placeholder.jpg`` is the final fallback.

    Returns a dict with keys ``image_url`` (path under /static/images) and
    ``description``. Network/SSL errors are reported in the returned
    description rather than raised.
    """
    if url in metadata_cache:
        return metadata_cache[url]
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(security): ssl=False disables certificate verification so
            # previews work for sites with broken certs; acceptable only
            # because nothing sensitive is sent.
            async with session.get(url, timeout=10, ssl=False) as response:
                if response.status != 200:
                    raise aiohttp.ClientResponseError(
                        response.request_info,
                        response.history,
                        status=response.status,
                        message=f"HTTP error {response.status}",
                    )
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                og_image = soup.find('meta', property='og:image')
                og_description = soup.find('meta', property='og:description')
                # .get() guards against a <meta> tag that has no content
                # attribute, which plain indexing would turn into a KeyError.
                image_url = og_image.get('content') if og_image else None
                description = og_description.get('content', '') if og_description else ''
                # Truncate long descriptions at a word boundary.
                if len(description) > 280:
                    description = description[:280].rsplit(' ', 1)[0] + '...'
                # Base URL (after redirects) for resolving relative image paths.
                base_url = f"{response.url.scheme}://{response.url.host}"
                image_filename = await download_and_resize_image(image_url, base_url) if image_url else None
                # Missing or tiny og:image -> render the page and screenshot it.
                if not image_filename or is_image_too_small(image_filename):
                    image_filename = await capture_screenshot(url)
                # Final fallback when even the screenshot path failed.
                if not image_filename:
                    image_filename = 'placeholder.jpg'
                metadata = {
                    'image_url': f"/static/images/{image_filename}",
                    'description': description,
                }
                metadata_cache[url] = metadata
                return metadata
    except (aiohttp.ClientError, asyncio.TimeoutError, SSLError) as e:
        print(f"Error fetching metadata for {url}: {e}")
        return {'image_url': '/static/images/placeholder.jpg', 'description': f"Error fetching metadata: {str(e)}"}
    except Exception as e:
        print(f"Unexpected error fetching metadata for {url}: {e}")
        return {'image_url': '/static/images/placeholder.jpg', 'description': f"Unexpected error: {str(e)}"}
async def download_and_resize_image(image_url: str, base_url: Optional[str] = None) -> Optional[str]:
    """Download *image_url*, shrink to at most 640px wide, and save as JPEG.

    *base_url* (scheme://host) is used to resolve site-relative paths.
    Returns the filename written inside IMAGE_DIR, or None on any failure
    (non-200 response, undecodable image, network error).
    """
    try:
        if image_url.startswith('//'):
            # Protocol-relative URL: complete the scheme instead of wrongly
            # treating it as a site-relative path.
            image_url = f"https:{image_url}"
        elif image_url.startswith('/') and base_url:
            image_url = f"{base_url.rstrip('/')}{image_url}"
        async with aiohttp.ClientSession() as session:
            # ssl=False for consistency with fetch_metadata: an image on the
            # same broken-cert site should not fail after the page succeeded.
            async with session.get(image_url, timeout=10, ssl=False) as response:
                if response.status != 200:
                    return None
                image_data = await response.read()
                image = Image.open(BytesIO(image_data))
                # JPEG cannot store alpha/palette modes; normalize to RGB.
                image = image.convert('RGB')
                max_width = 640
                if image.width > max_width:
                    ratio = max_width / float(image.width)
                    height = int(float(image.height) * ratio)
                    image = image.resize((max_width, height), Image.LANCZOS)
                # Hash of the URL gives a stable, filesystem-safe filename.
                image_hash = hashlib.md5(image_url.encode('utf-8')).hexdigest()
                image_filename = f"{image_hash}.jpg"
                image.save(os.path.join(IMAGE_DIR, image_filename), "JPEG")
                return image_filename
    except Exception as e:
        print(f"Error downloading image {image_url}: {e}")
        return None
def is_image_too_small(image_filename: str) -> bool:
    """Return True if the saved image is under 80x80 px — or unreadable.

    Tiny og:images (favicons, tracking pixels) make poor previews, so the
    caller falls back to a screenshot. A missing or corrupt file is treated
    the same way instead of raising and aborting the whole metadata fetch.
    """
    image_path = os.path.join(IMAGE_DIR, image_filename)
    try:
        with Image.open(image_path) as img:
            return img.width < 80 or img.height < 80
    except OSError:
        # Unreadable file behaves as "too small" so a screenshot is taken.
        return True
async def capture_screenshot(url: str) -> str:
    """Render *url* in headless Firefox and save a <=640px-wide JPEG.

    Returns the saved filename inside IMAGE_DIR, or delegates to
    fallback_image_capture() when the browser path fails (e.g. geckodriver
    unavailable). The webdriver is always quit, even on error.
    """
    try:
        options = Options()
        options.add_argument("--headless")
        service = Service(GeckoDriverManager().install())
        driver = webdriver.Firefox(service=service, options=options)
        try:
            # Size the window *before* navigating so the page lays out at the
            # intended viewport rather than the small headless default.
            driver.set_window_size(1280, 1024)
            driver.get(url)
            screenshot = driver.get_screenshot_as_png()
            image = Image.open(BytesIO(screenshot))
            image = image.convert('RGB')
            # Resize to max width of 640px, preserving aspect ratio.
            max_width = 640
            if image.width > max_width:
                ratio = max_width / float(image.width)
                height = int(float(image.height) * ratio)
                image = image.resize((max_width, height), Image.LANCZOS)
            image_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
            image_filename = f"{image_hash}_screenshot.jpg"
            image.save(os.path.join(IMAGE_DIR, image_filename), "JPEG")
            return image_filename
        finally:
            driver.quit()
    except Exception as e:
        print(f"Error capturing screenshot for {url}: {e}")
        return await fallback_image_capture(url)
async def fallback_image_capture(url: str) -> str:
    """Last-resort image capture: re-fetch the page and use its og:image.

    Unlike the other helpers, this always returns a usable filename —
    'placeholder.jpg' on any failure — so the caller never receives None
    from this path.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    og_image = soup.find('meta', property='og:image')
                    if og_image and og_image.get('content'):
                        filename = await download_and_resize_image(og_image['content'])
                        # Bug fix: a failed download returns None; fall
                        # through to the placeholder instead of propagating it.
                        if filename:
                            return filename
    except Exception as e:
        print(f"Error in fallback image capture for {url}: {e}")
    return 'placeholder.jpg'