import os import requests from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor from PIL import Image import io def get_html(url, file_name, max_retries=3): session = requests.Session() adapter = requests.adapters.HTTPAdapter(max_retries=max_retries) session.mount("http://", adapter) session.mount("https://", adapter) try: response = session.get(url) response.raise_for_status() return response.text except Exception as e: print(f"Error occurred while fetching HTML from {url}: {e}") raise Exception(f"{file_name}, 获取网页html失败") def download_image(img_url, max_retries=5): for retry in range(max_retries): try: with requests.get(img_url, stream=True) as response: response.raise_for_status() return response.content except Exception as e: if retry < max_retries - 1: print( f"Failed to download image, retrying ({retry+1}/{max_retries})..." ) else: print("Failed to download image after multiple retries, skipping.") return None def get_img_urls(html_content): soup = BeautifulSoup(html_content, "html.parser") img_tags = soup.find("div", class_="reading-content").find_all("img") img_urls = [] for img_tag in img_tags: img_url = img_tag.attrs["data-src"] img_urls.append(img_url) return img_urls def create_img_obj_list(img_url_list, file_name): img_obj_list = [] for url in img_url_list: obj = dict() obj["file_name"] = file_name obj["url"] = url obj["data"] = None img_obj_list.append(obj) return img_obj_list def set_img_obj_data(img_obj): url = img_obj["url"] data = download_image(url) if data is None: file_name = img_obj["file_name"] raise Exception(f"{file_name}, 下载图片失败") img_obj["data"] = data def concatenate_images_vertically(img_obj_list, output_file): """ 垂直拼接图片,并保存到本地目录 """ try: # 计算拼接后的长图宽度和总高度 max_width = max( Image.open(io.BytesIO(img_obj["data"])).width for img_obj in img_obj_list ) total_height = sum( Image.open(io.BytesIO(img_obj["data"])).height for img_obj in img_obj_list ) # 创建一张新的长图 long_image = Image.new("RGB", (max_width, total_height), color=(255, 255, 255)) # 依次将图片在垂直方向上拼接起来 y_offset = 0 for img_obj in img_obj_list: img = Image.open(io.BytesIO(img_obj["data"])) img_width, img_height = img.size x_offset = (max_width - img_width) // 2 # 居中拼接 long_image.paste(img, (x_offset, y_offset)) y_offset += img_height # 保存拼接后的长图到本地 long_image.save(output_file) except Exception as e: file_name = img_obj_list[0]["file_name"] print(f"{file_name}, 拼接图片失败:{e}") return None def process_batch(lines): for line in lines: line = line.strip() # 去掉每行开头和结尾的空白字符 if line: # 解析出 HTML 文件名和 URL 地址 file_name, _, url = line.partition(" - ") html_content = get_html(url, file_name) img_url_list = get_img_urls(html_content) img_obj_list = create_img_obj_list(img_url_list, file_name) # 使用 ThreadPoolExecutor 创建线程池 with ThreadPoolExecutor() as executor: # 多线程处理图片下载和替换 executor.map(set_img_obj_data, img_obj_list) concatenate_images_vertically( img_obj_list, output_file=f"imgs/{file_name}.png" ) if __name__ == "__main__": task_file = "input.txt" batch_size = 3 # 每个线程处理的行数 with open(task_file, "r", encoding="utf-8") as file: lines = file.readlines() # 使用 ThreadPoolExecutor 创建线程池 with ThreadPoolExecutor() as executor: # 按照 batch_size 将行分批次处理 for i in range(0, len(lines), batch_size): batch_lines = lines[i : i + batch_size] executor.submit(process_batch, batch_lines) print("finish, 程序结束...")