148 lines
4.6 KiB
Python
148 lines
4.6 KiB
Python
import os
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from PIL import Image
|
|
import io
|
|
|
|
|
|
def get_html(url, file_name, max_retries=3):
|
|
session = requests.Session()
|
|
adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
|
|
session.mount("http://", adapter)
|
|
session.mount("https://", adapter)
|
|
|
|
try:
|
|
response = session.get(url)
|
|
response.raise_for_status()
|
|
return response.text
|
|
except Exception as e:
|
|
print(f"Error occurred while fetching HTML from {url}: {e}")
|
|
raise Exception(f"{file_name}, 获取网页html失败")
|
|
|
|
|
|
def download_image(img_url, max_retries=5):
|
|
for retry in range(max_retries):
|
|
try:
|
|
with requests.get(img_url, stream=True) as response:
|
|
response.raise_for_status()
|
|
return response.content
|
|
except Exception as e:
|
|
if retry < max_retries - 1:
|
|
print(
|
|
f"Failed to download image, retrying ({retry+1}/{max_retries})..."
|
|
)
|
|
else:
|
|
print("Failed to download image after multiple retries, skipping.")
|
|
return None
|
|
|
|
|
|
def get_img_urls(html_content):
|
|
soup = BeautifulSoup(html_content, "html.parser")
|
|
img_tags = soup.find("div", class_="reading-content").find_all("img")
|
|
|
|
img_urls = []
|
|
for img_tag in img_tags:
|
|
img_url = img_tag.attrs["data-src"]
|
|
img_urls.append(img_url)
|
|
return img_urls
|
|
|
|
|
|
def create_img_obj_list(img_url_list, file_name):
|
|
img_obj_list = []
|
|
for url in img_url_list:
|
|
obj = dict()
|
|
obj["file_name"] = file_name
|
|
obj["url"] = url
|
|
obj["data"] = None
|
|
img_obj_list.append(obj)
|
|
|
|
return img_obj_list
|
|
|
|
|
|
def set_img_obj_data(img_obj):
|
|
url = img_obj["url"]
|
|
data = download_image(url)
|
|
if data is None:
|
|
file_name = img_obj["file_name"]
|
|
raise Exception(f"{file_name}, 下载图片失败")
|
|
img_obj["data"] = data
|
|
|
|
|
|
def batch_download_images_to_img_obj_list(img_obj_list):
|
|
"""
|
|
使用 ThreadPoolExecutor 创建线程池,对 img_obj_list 中的每个图片对象调用 set_img_obj_data 函数。
|
|
|
|
Args:
|
|
img_obj_list (list): 图片对象列表,每个对象包含图片的数据等信息。
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
with ThreadPoolExecutor() as executor:
|
|
executor.map(set_img_obj_data, img_obj_list)
|
|
|
|
|
|
def concatenate_images_vertically(img_obj_list):
|
|
"""
|
|
垂直拼接长图片
|
|
"""
|
|
try:
|
|
# 计算拼接后的长图宽度和总高度
|
|
max_width = max(
|
|
Image.open(io.BytesIO(img_obj["data"])).width for img_obj in img_obj_list
|
|
)
|
|
total_height = sum(
|
|
Image.open(io.BytesIO(img_obj["data"])).height for img_obj in img_obj_list
|
|
)
|
|
|
|
# 创建一张新的长图
|
|
long_image = Image.new("RGB", (max_width, total_height), color=(255, 255, 255))
|
|
|
|
# 依次将图片在垂直方向上拼接起来
|
|
y_offset = 0
|
|
for img_obj in img_obj_list:
|
|
img = Image.open(io.BytesIO(img_obj["data"]))
|
|
img_width, img_height = img.size
|
|
x_offset = (max_width - img_width) // 2 # 居中拼接
|
|
long_image.paste(img, (x_offset, y_offset))
|
|
y_offset += img_height
|
|
|
|
return long_image
|
|
|
|
except Exception as e:
|
|
file_name = img_obj_list[0]["file_name"]
|
|
print(f"{file_name}, 拼接图片失败:{e}")
|
|
return None
|
|
|
|
|
|
def process_batch(lines):
|
|
for line in lines:
|
|
line = line.strip() # 去掉每行开头和结尾的空白字符
|
|
if line:
|
|
# 解析出 HTML 文件名和 URL 地址
|
|
file_name, _, url = line.partition(" - ")
|
|
html_content = get_html(url, file_name)
|
|
img_url_list = get_img_urls(html_content)
|
|
img_obj_list = create_img_obj_list(img_url_list, file_name)
|
|
batch_download_images_to_img_obj_list(img_obj_list)
|
|
long_image = concatenate_images_vertically(img_obj_list) # 垂直拼接长图片
|
|
long_image.save(f"imgs/{file_name}.png") # 保存长图到本地
|
|
|
|
|
|
if __name__ == "__main__":
|
|
task_file = "input.txt"
|
|
batch_size = 3 # 每个线程处理的行数
|
|
|
|
with open(task_file, "r", encoding="utf-8") as file:
|
|
lines = file.readlines()
|
|
|
|
# 使用 ThreadPoolExecutor 创建线程池
|
|
with ThreadPoolExecutor() as executor:
|
|
# 按照 batch_size 将行分批次处理
|
|
for i in range(0, len(lines), batch_size):
|
|
batch_lines = lines[i : i + batch_size]
|
|
executor.submit(process_batch, batch_lines)
|
|
|
|
print("finish, 程序结束...")
|