# comic_book_downloader/main.py
import os
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import io


def get_html(url, file_name, max_retries=3):
    # Mount an HTTPAdapter so transient connection errors are retried
    # automatically at the transport level.
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    try:
        response = session.get(url)
        response.raise_for_status()
        return response.text
    except Exception as e:
        print(f"Error occurred while fetching HTML from {url}: {e}")
        raise Exception(f"{file_name}, failed to fetch page HTML")


def download_image(img_url, max_retries=5):
    """
    Download image content from the given image URL.

    Args:
        img_url (str): URL of the image.
        max_retries (int, optional): Maximum number of retries on failure.
            Defaults to 5.

    Returns:
        bytes or None: The binary image data on success, or None if the
        download fails.

    Notes:
        This function downloads the image file by sending an HTTP request
        via the `requests` library. On success it returns the image
        content as bytes. On failure it retries up to `max_retries`
        times, printing an error message between attempts to indicate
        progress. If all retries are exhausted without success, it prints
        a failure message and returns None.

    Example:
        ```
        image_url = "https://example.com/image.jpg"
        image_data = download_image(image_url)
        if image_data:
            # process the image data...
        else:
            print("Unable to download the image, download failed.")
        ```
    """
    for retry in range(max_retries):
        try:
            with requests.get(img_url, stream=True) as response:
                response.raise_for_status()
                return response.content
        except Exception as e:
            if retry < max_retries - 1:
                print(
                    f"Failed to download image, retrying ({retry+1}/{max_retries})..."
                )
            else:
                print("Failed to download image after multiple retries, skipping.")
    return None
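

# Note: even with stream=True, `response.content` above buffers the whole
# body in memory before returning it. A minimal alternative sketch (same
# requests API) that reads the body in chunks, should very large images
# ever become a concern:
#
#     with requests.get(img_url, stream=True) as response:
#         response.raise_for_status()
#         chunks = []
#         for chunk in response.iter_content(chunk_size=8192):
#             chunks.append(chunk)
#         data = b"".join(chunks)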


def get_img_urls(html_content):
    soup = BeautifulSoup(html_content, "html.parser")
    img_tags = soup.find("div", class_="reading-content").find_all("img")
    img_urls = []
    for img_tag in img_tags:
        img_url = img_tag.attrs["data-src"]
        img_urls.append(img_url)
    return img_urls
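

# The scraper assumes every <img> inside div.reading-content carries a
# lazy-loading "data-src" attribute. A hedged sketch of a fallback for
# pages that use a plain "src" instead (the attribute layout is an
# assumption about the target site, not something this code confirms):
#
#     img_url = img_tag.get("data-src") or img_tag.get("src")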


def create_img_obj_list(img_url_list, file_name):
    img_obj_list = []
    for url in img_url_list:
        obj = {"file_name": file_name, "url": url, "data": None}
        img_obj_list.append(obj)
    return img_obj_list


def download_images_to_img_obj(img_obj):
    url = img_obj["url"]
    data = download_image(url)
    if data is None:
        file_name = img_obj["file_name"]
        print(f"{file_name}, image download failed")
        raise Exception(f"{file_name}, image download failed")
    img_obj["data"] = data


def batch_download_images_to_img_obj_list(img_obj_list):
    """
    Create a thread pool with ThreadPoolExecutor and call
    download_images_to_img_obj on every image object in img_obj_list.

    Args:
        img_obj_list (list): List of image objects, each holding the
            image's URL, data, and related info.

    Returns:
        None
    """
    with ThreadPoolExecutor() as executor:
        # Wrap map() in list() so worker exceptions are re-raised here
        # instead of being silently discarded with the unconsumed iterator.
        list(executor.map(download_images_to_img_obj, img_obj_list))
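

# An alternative sketch using submit()/as_completed() from the same stdlib
# module, which surfaces each worker's exception individually rather than
# stopping at the first failure. A design option, not what this script
# currently does:
#
#     from concurrent.futures import as_completed
#
#     with ThreadPoolExecutor() as executor:
#         futures = [executor.submit(download_images_to_img_obj, obj)
#                    for obj in img_obj_list]
#         for future in as_completed(futures):
#             future.result()  # re-raises the worker's exception, if any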


def concatenate_images_vertically(img_obj_list):
    """
    Concatenate the images vertically into one long image.
    """
    try:
        # Compute the width and total height of the concatenated image.
        max_width = max(
            Image.open(io.BytesIO(img_obj["data"])).width for img_obj in img_obj_list
        )
        total_height = sum(
            Image.open(io.BytesIO(img_obj["data"])).height for img_obj in img_obj_list
        )
        # Create a new blank long image.
        long_image = Image.new("RGB", (max_width, total_height), color=(255, 255, 255))
        # Paste each image onto the canvas, stacking vertically.
        y_offset = 0
        for img_obj in img_obj_list:
            img = Image.open(io.BytesIO(img_obj["data"]))
            img_width, img_height = img.size
            x_offset = (max_width - img_width) // 2  # center horizontally
            long_image.paste(img, (x_offset, y_offset))
            y_offset += img_height
        return long_image
    except Exception as e:
        file_name = img_obj_list[0]["file_name"]
        print(f"{file_name}, failed to concatenate images: {e}")
        return None
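

# The function above decodes every image three times (width pass, height
# pass, paste pass). A minimal sketch of the same stacking that decodes
# each image once; purely an optimization option, behavior-equivalent for
# valid inputs:
#
#     images = [Image.open(io.BytesIO(obj["data"])) for obj in img_obj_list]
#     max_width = max(img.width for img in images)
#     total_height = sum(img.height for img in images)
#     long_image = Image.new("RGB", (max_width, total_height), (255, 255, 255))
#     y = 0
#     for img in images:
#         long_image.paste(img, ((max_width - img.width) // 2, y))
#         y += img.height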


def pre_batch_task(lines):
    """
    Process one batch of task lines.
    """
    for line in lines:
        line = line.strip()  # strip leading/trailing whitespace
        if line:
            # Parse out the output file name and the page URL.
            file_name, _, url = line.partition(" - ")
            print(f"{file_name}, starting download")
            html_content = get_html(url, file_name)
            img_url_list = get_img_urls(html_content)
            img_obj_list = create_img_obj_list(img_url_list, file_name)
            batch_download_images_to_img_obj_list(img_obj_list)
            long_image = concatenate_images_vertically(img_obj_list)  # stack vertically
            if long_image is None:  # concatenation failed; skip saving
                continue
            long_image.save(f"imgs/{file_name}.png")  # save the long image locally
            print(f"{file_name}, done!!")


def read_lines_from_file(task_file):
    """
    Read all lines from a file and return them as a list.

    Args:
        task_file (str): Name of the file to read.

    Returns:
        lines (list): List containing every line in the file.
    """
    with open(task_file, "r", encoding="utf-8") as file:
        lines = file.readlines()
    return lines


def process_lines_in_batches(lines, batch_size):
    """
    Split the lines into batches of the given size and process the
    batches in parallel on a thread pool.

    Args:
        lines (list): List containing all task lines.
        batch_size (int): Number of lines handled per batch.
    """
    # Create a thread pool with ThreadPoolExecutor.
    with ThreadPoolExecutor() as executor:
        # Submit the lines in batches of batch_size.
        for i in range(0, len(lines), batch_size):
            batch_lines = lines[i : i + batch_size]
            executor.submit(pre_batch_task, batch_lines)


if __name__ == "__main__":
    task_file = "input.txt"
    batch_size = 3  # number of lines handled per thread
    os.makedirs("imgs", exist_ok=True)  # ensure the output directory exists
    lines = read_lines_from_file(task_file)
    process_lines_in_batches(lines, batch_size)
    print("finish, program done...")