import io
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from PIL import Image
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

from utils import ArrayUtil

# Connect/read timeout (seconds) for every HTTP request.  The original code
# had no timeout at all, so a stalled server could hang a worker thread
# forever.
REQUEST_TIMEOUT = (10, 60)


def get_html(url, max_retries=3):
    """Fetch a page's HTML, retrying transient connection failures.

    Args:
        url: Page URL to fetch.
        max_retries: Retry count handed to the mounted HTTPAdapter.

    Returns:
        The response body as text.

    Raises:
        Exception: when the page cannot be fetched after all retries.
    """
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    try:
        response = session.get(url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.text
    except Exception as e:
        print(f"Error occurred while fetching HTML from {url}: {e}")
        raise Exception("获取网页html失败")


def download_image(img_url, max_retries=5):
    """Download one image with a manual retry loop.

    Args:
        img_url: Direct URL of the image.
        max_retries: Total number of attempts before giving up.

    Returns:
        The raw image bytes, or None if every attempt failed.
    """
    for retry in range(max_retries):
        try:
            with requests.get(
                img_url, stream=True, timeout=REQUEST_TIMEOUT
            ) as response:
                response.raise_for_status()
                return response.content
        except Exception:
            if retry < max_retries - 1:
                print(
                    f"Failed to download image, retrying ({retry+1}/{max_retries})..."
                )
            else:
                print("Failed to download image after multiple retries, skipping.")
                return None


def get_img_urls(html_content):
    """Extract lazy-loaded image URLs (the `data-src` attribute) from the
    page's `div.reading-content` container."""
    soup = BeautifulSoup(html_content, "html.parser")
    img_tags = soup.find("div", class_="reading-content").find_all("img")
    return [img_tag.attrs["data-src"] for img_tag in img_tags]


def create_img_obj_list(img_url_list):
    """Wrap each URL in a dict with an empty 'data' slot that
    set_img_obj_data later fills with the downloaded bytes."""
    return [{"url": url, "data": None} for url in img_url_list]


def set_img_obj_data(img_obj):
    """Download img_obj['url'] and store the bytes in img_obj['data'].

    Raises:
        Exception: if the download ultimately failed (data is None).
    """
    data = download_image(img_obj["url"])
    if data is None:
        raise Exception("下载图片失败")
    img_obj["data"] = data


def save_images_to_directory(img_obj_list, directory_path):
    """Write every downloaded image to directory_path as image_<idx><ext>.

    Errors are reported but not raised, matching the script's best-effort
    style.
    """
    try:
        # Create the target directory if it does not exist yet.
        os.makedirs(directory_path, exist_ok=True)
        for idx, img_obj in enumerate(img_obj_list):
            # BUG FIX: take the extension from the URL *path* so a query
            # string ("...jpg?token=abc") does not leak into the filename.
            extension = os.path.splitext(urlparse(img_obj["url"]).path)[1]
            file_path = os.path.join(directory_path, f"image_{idx}{extension}")
            with open(file_path, "wb") as file:
                file.write(img_obj["data"])
        print("图片保存成功!")
    except Exception as e:
        print(f"图片保存失败:{e}")


def generate_pdf_from_images(img_obj_list, output_file):
    """Render each image on its own letter-sized PDF page, scaled to fit
    while preserving aspect ratio."""
    try:
        c = canvas.Canvas(output_file, pagesize=letter)
        for img_obj in img_obj_list:
            img = Image.open(io.BytesIO(img_obj["data"]))
            img_width, img_height = img.size
            pdf_width, pdf_height = letter
            scale = min(pdf_width / img_width, pdf_height / img_height)
            new_width, new_height = int(img_width * scale), int(img_height * scale)
            # BUG FIX: Image.ANTIALIAS was removed in Pillow 10;
            # Image.LANCZOS is the long-standing equivalent name.
            img = img.resize((new_width, new_height), Image.LANCZOS)
            c.drawInlineImage(img, 0, 0, width=new_width, height=new_height)
            # Start a fresh page for the next image.
            c.showPage()
        c.save()
        print("PDF 生成成功!")
    except Exception as e:
        print(f"PDF 生成失败:{e}")


def concatenate_images_vertically(img_obj_list, output_file):
    """Stack all images into one tall image, centered horizontally, and
    save it to output_file.

    Returns:
        None (both on success and failure; errors are printed).
    """
    try:
        # Decode each image once and reuse it for sizing and pasting
        # (the original decoded every image three times).
        images = [
            Image.open(io.BytesIO(img_obj["data"])) for img_obj in img_obj_list
        ]
        max_width = max(img.width for img in images)
        total_height = sum(img.height for img in images)
        long_image = Image.new(
            "RGB", (max_width, total_height), color=(255, 255, 255)
        )
        y_offset = 0
        for img in images:
            x_offset = (max_width - img.width) // 2  # center horizontally
            long_image.paste(img, (x_offset, y_offset))
            y_offset += img.height
        # BUG FIX: the save call was commented out, so the function built
        # the long image and then silently discarded it.
        long_image.save(output_file)
    except Exception as e:
        print(f"拼接图片失败:{e}")
        return None


def process_batch(lines):
    """Process a batch of "name - url" input lines: fetch the page, download
    its images concurrently, then save and concatenate them."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Each input line is "<output name> - <chapter url>".
        file_name, _, url = line.partition(" - ")
        html_content = get_html(url)
        img_obj_list = create_img_obj_list(get_img_urls(html_content))
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(set_img_obj_data, img_obj)
                for img_obj in img_obj_list
            ]
            # BUG FIX: the original used executor.map() and never consumed
            # its results, so download exceptions were silently dropped.
            # Collect each result and report failures individually.
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    print(f"图片下载失败:{e}")
        # Drop images that never downloaded instead of crashing on
        # data=None further down the pipeline.
        img_obj_list = [obj for obj in img_obj_list if obj["data"] is not None]
        save_images_to_directory(img_obj_list, directory_path="imgs")
        concatenate_images_vertically(
            img_obj_list, output_file=f"imgs/{file_name}.pdf"
        )


if __name__ == "__main__":
    input_path = "input.txt"
    batch_size = 3  # lines handled by each worker task

    with open(input_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_batch, lines[i : i + batch_size])
            for i in range(0, len(lines), batch_size)
        ]
        # BUG FIX: collect results so exceptions raised inside a batch are
        # reported instead of vanishing inside the executor.
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"批处理失败:{e}")