Files
crawl4zeroerr/zeroerr_crawler/base_crawler.py

700 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
基础爬虫类
提供通用的爬取逻辑
"""
import os
import time
import copy
import re
import requests
from bs4 import BeautifulSoup
import markdownify
from docx import Document
from docx.shared import Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from urllib.parse import urljoin
from abc import ABC, abstractmethod
from .config import BASE_URL, HEADERS, REQUEST_DELAY, OUTPUT_DIR
from .utils import ensure_dir, download_image, safe_filename, make_absolute_url
class BaseCrawler(ABC):
"""
基础爬虫类
提供通用的页面获取、内容提取、文档生成功能
"""
def __init__(self, task_config: dict):
"""
初始化爬虫
Args:
task_config: 任务配置字典
"""
self.config = task_config
self.name = task_config.get("name", "未命名")
self.session = requests.Session()
self.session.headers.update(HEADERS)
# 输出目录(支持自定义)
if "output_dir" in task_config:
output_dir_name = task_config["output_dir"]
else:
output_dir_name = self.name
self.output_dir = os.path.join(OUTPUT_DIR, safe_filename(output_dir_name))
self.images_dir = os.path.join(self.output_dir, "images")
ensure_dir(self.output_dir)
ensure_dir(self.images_dir)
def fetch_page(self, url: str) -> BeautifulSoup | None:
"""
获取页面内容
Args:
url: 页面URL
Returns:
BeautifulSoup 对象,失败返回 None
"""
try:
response = self.session.get(url, timeout=30)
response.encoding = 'utf-8'
return BeautifulSoup(response.text, 'html.parser')
except Exception as e:
print(f" 获取页面失败: {url} - {e}")
return None
def get_links_from_index(self, index_url: str) -> list[str]:
"""
从索引页提取子页面链接
Args:
index_url: 索引页URL
Returns:
链接列表
"""
full_url = make_absolute_url(BASE_URL, index_url)
print(f"正在从索引页提取链接: {full_url}")
soup = self.fetch_page(full_url)
if not soup:
return []
link_pattern = self.config.get("link_pattern", "")
link_suffix = self.config.get("link_suffix", ".html")
exclude_patterns = self.config.get("exclude_patterns", [])
links = []
for a in soup.find_all('a', href=True):
href = a['href']
# 检查是否匹配模式
if link_pattern and link_pattern not in href:
continue
if link_suffix and not href.endswith(link_suffix):
continue
# 检查是否需要排除
excluded = False
for pattern in exclude_patterns:
if pattern in href:
excluded = True
break
if excluded:
continue
# 转为绝对URL
full_link = make_absolute_url(full_url, href)
if full_link not in links:
links.append(full_link)
print(f"共发现 {len(links)} 个页面链接")
return links
def extract_title(self, soup: BeautifulSoup, url: str) -> str:
"""
提取页面标题
Args:
soup: BeautifulSoup 对象
url: 页面URL用于生成默认标题
Returns:
标题文本
"""
selector = self.config.get("title_selector", "h1")
index = self.config.get("title_index", 0)
# 优先从配置的选择器提取
tags = soup.find_all(selector)
if tags and len(tags) > index:
title = tags[index].get_text(strip=True)
if title:
return title
elif tags:
title = tags[0].get_text(strip=True)
if title:
return title
# 尝试从页面 title 标签提取
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text(strip=True)
# 移除网站名称后缀(如 " - 零差云控"
if ' - ' in title:
title = title.split(' - ')[0].strip()
if title and title.lower() not in ['about-us', 'contact-us', 'join-us']:
return title
# 尝试从 h1 标签提取(即使不在配置的选择器中)
h1_tags = soup.find_all('h1')
for h1 in h1_tags:
title = h1.get_text(strip=True)
# 跳过网站名称
if title and '零差云控' not in title and '零误差' not in title:
return title
# 最后使用URL最后一部分作为标题但进行美化
url_part = url.split('/')[-1].replace('.html', '')
# 将连字符替换为空格,并首字母大写
if '-' in url_part:
url_part = ' '.join(word.capitalize() for word in url_part.split('-'))
return url_part
def extract_content(self, soup: BeautifulSoup) -> BeautifulSoup | None:
"""
提取页面主内容
Args:
soup: BeautifulSoup 对象
Returns:
内容区域的 BeautifulSoup 对象,未找到返回 None
"""
selector = self.config.get("content_selector", "")
# 支持多个选择器,用逗号分隔
selectors = [s.strip() for s in selector.split(',')]
# 收集所有匹配的内容
all_contents = []
for sel in selectors:
if '.' in sel:
# class 选择器
tag, class_name = sel.split('.', 1)
tag = tag if tag else 'div'
# 使用 find 只匹配第一个元素,避免重复
content = soup.find(tag, class_=class_name)
else:
content = soup.find(sel)
if content:
all_contents.append(content)
# 如果没有找到任何内容
if not all_contents:
return None
# 如果只找到一个,直接返回
if len(all_contents) == 1:
return all_contents[0]
# 去重:移除嵌套或重复的内容块
unique_contents = []
seen_texts = set() # 用于记录已见过的文本内容
for content in all_contents:
is_duplicate = False
content_text = content.get_text(strip=True)
# 跳过空内容
if not content_text:
continue
# 检查是否被其他内容块包含(是其他块的子元素)
for other in all_contents:
if content is other:
continue
# 检查当前内容是否是另一个内容块的子元素
parent = content.find_parent()
while parent:
if parent is other:
is_duplicate = True
break
parent = parent.find_parent()
if is_duplicate:
break
# 如果内容文本完全相同,只保留第一个
if not is_duplicate and content_text in seen_texts:
is_duplicate = True
if not is_duplicate:
unique_contents.append(content)
seen_texts.add(content_text)
# 如果去重后只剩一个,直接返回
if len(unique_contents) == 1:
return unique_contents[0]
# 合并多个内容区域到一个容器
combined = soup.new_tag('div')
for content in unique_contents:
# 深拷贝内容以避免从原DOM中移除
combined.append(copy.deepcopy(content))
return combined
def clean_content(self, content: BeautifulSoup) -> BeautifulSoup:
"""
清洗内容,移除无用元素
Args:
content: 内容区域
Returns:
清洗后的内容
"""
# 移除 script 和 style 标签
for tag in content(['script', 'style']):
tag.decompose()
# 移除导航链接、空链接、锚点链接
for a in content.find_all('a', href=True):
href = a.get('href', '')
# 移除空链接、锚点链接、JavaScript 链接
if not href or href.startswith('#') or href.startswith('javascript:'):
# 保留链接文本,移除链接标签
a.unwrap()
# 移除导航菜单中的链接(通常包含特定 class 或 id
elif a.find_parent(['nav', 'menu', 'navigation']):
a.decompose()
# 移除空的 div、span 等标签(只包含空白字符)
for tag in content.find_all(['div', 'span', 'p']):
text = tag.get_text(strip=True)
if not text and not tag.find_all(['img', 'table']):
# 如果没有文本内容且没有图片/表格,移除
tag.decompose()
# 移除注释
from bs4 import Comment
for comment in content.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# 清理多余的空白字符
for tag in content.find_all(['p', 'div', 'span']):
if tag.string:
# 清理段落内的多余空白
tag.string = ' '.join(tag.string.split())
return content
def process_images(self, content: BeautifulSoup, page_url: str) -> list[tuple[str, str]]:
"""
处理内容中的图片,下载到本地
Args:
content: 内容区域
page_url: 页面URL用于解析相对路径
Returns:
图片信息列表 [(原URL, 本地路径), ...]
"""
images_info = []
for img in content.find_all('img'):
src = img.get('src')
if not src:
continue
# 转为绝对URL
full_url = make_absolute_url(page_url, src)
# 下载图片
local_path = download_image(full_url, self.images_dir)
if local_path:
images_info.append((full_url, local_path))
# 更新 img 标签的 src 为本地相对路径
img['src'] = os.path.relpath(local_path, self.output_dir).replace('\\', '/')
else:
# 下载失败保留原URL
img['src'] = full_url
return images_info
def content_to_markdown(self, content: BeautifulSoup) -> str:
"""
将内容转换为 Markdown
Args:
content: 内容区域
Returns:
Markdown 文本
"""
return markdownify.markdownify(str(content), heading_style="ATX")
def add_content_to_docx(self, doc: Document, content: BeautifulSoup, output_dir: str):
"""
将内容添加到 Word 文档
Args:
doc: Document 对象
content: 内容区域
output_dir: 输出目录(用于解析图片路径)
"""
# 按文档顺序处理元素,保持列表的连续性
for element in content.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'img', 'li', 'ul', 'ol', 'table']):
if element.name == 'img':
src = element.get('src', '')
# 尝试获取本地图片路径
if not src.startswith('http'):
local_path = os.path.join(output_dir, src)
else:
local_path = src
if os.path.exists(local_path):
try:
# 图片前添加空行
doc.add_paragraph()
doc.add_picture(local_path, width=Inches(5))
doc.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.CENTER
# 图片后添加空行
doc.add_paragraph()
except Exception as e:
print(f" Word插入图片失败: {local_path} - {e}")
elif element.name.startswith('h'):
text = element.get_text(strip=True)
if text:
# HTML h1-h6 直接映射到 Word Heading 1-6
# 限制在 1-9 范围内Word 支持的最大标题级别)
level = int(element.name[1])
doc.add_heading(text, level=min(level, 9))
elif element.name in ['ul', 'ol']:
# 列表容器,跳过(列表项会单独处理)
continue
elif element.name == 'li':
text = element.get_text(strip=True)
if text:
# 检查父元素是 ul 还是 ol
parent = element.find_parent(['ul', 'ol'])
is_ordered = parent and parent.name == 'ol'
# 使用列表样式
if is_ordered:
doc.add_paragraph(text, style='List Number')
else:
doc.add_paragraph(text, style='List Bullet')
elif element.name == 'table':
# 处理表格,创建 Word 表格结构(便于 doc2md.py 解析)
self._add_table_to_docx(doc, element)
elif element.name == 'p':
text = element.get_text(strip=True)
if text:
# 跳过空段落和只包含空白字符的段落
if text.strip():
# 检查是否是列表项(某些网站用 p 标签包裹列表项)
parent = element.find_parent(['ul', 'ol'])
if parent:
is_ordered = parent.name == 'ol'
if is_ordered:
doc.add_paragraph(text, style='List Number')
else:
doc.add_paragraph(text, style='List Bullet')
else:
doc.add_paragraph(text)
def crawl_page(self, url: str) -> dict | None:
"""
爬取单个页面
Args:
url: 页面URL
Returns:
页面数据字典,失败返回 None
"""
soup = self.fetch_page(url)
if not soup:
return None
# 提取标题
title = self.extract_title(soup, url)
# 提取内容
content = self.extract_content(soup)
if not content:
print(f" 警告: 页面未找到主内容区域: {url}")
return None
# 清洗内容
content = self.clean_content(content)
# 处理图片
images = self.process_images(content, url)
# 转换为 Markdown
markdown = self.content_to_markdown(content)
return {
"url": url,
"title": title,
"content": content,
"markdown": markdown,
"images": images,
}
def save_single_page(self, page_data: dict):
"""
保存单个页面为独立的 md 和 docx 文件
Args:
page_data: 页面数据字典
"""
title = page_data["title"]
safe_title = safe_filename(title)
# 保存 Markdown
md_path = os.path.join(self.output_dir, f"{safe_title}.md")
md_content = f"# {title}\n\n"
md_content += f"**原文链接**: {page_data['url']}\n\n"
md_content += page_data["markdown"]
with open(md_path, "w", encoding="utf-8") as f:
f.write(md_content)
# 保存 Word
docx_path = os.path.join(self.output_dir, f"{safe_title}.docx")
doc = Document()
doc.add_heading(title, 0)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page_data['url']}").italic = True
self.add_content_to_docx(doc, page_data["content"], self.output_dir)
doc.save(docx_path)
def save_combined_documents(self, all_pages: list[dict]):
"""
将所有页面汇总保存为一个 md 和 docx 文件
如果文件已存在会追加内容并去重基于URL
Args:
all_pages: 所有页面数据列表
"""
if not all_pages:
return
# 确定汇总文件名(使用输出目录名,而不是任务名)
output_dir_name = os.path.basename(self.output_dir)
safe_name = safe_filename(output_dir_name)
md_path = os.path.join(self.output_dir, f"{safe_name}_汇总.md")
docx_path = os.path.join(self.output_dir, f"{safe_name}_汇总.docx")
# === 处理 Markdown ===
existing_urls = set()
existing_content = ""
# 如果文件已存在读取现有内容并提取已存在的URL
if os.path.exists(md_path):
with open(md_path, "r", encoding="utf-8") as f:
existing_content = f.read()
# 提取已存在的URL用于去重
url_pattern = r'\*\*原文链接\*\*: (https?://[^\s\n]+)'
existing_urls = set(re.findall(url_pattern, existing_content))
# 过滤掉已存在的页面基于URL去重
new_pages = [page for page in all_pages if page['url'] not in existing_urls]
if not new_pages and existing_content:
print(f" 所有页面已存在,无需更新: {md_path}")
return
# 生成新内容
new_md_content = ""
for page in new_pages:
new_md_content += f"## {page['title']}\n\n"
new_md_content += f"**原文链接**: {page['url']}\n\n"
new_md_content += page["markdown"]
new_md_content += "\n\n---\n\n"
# 追加或创建文件
if existing_content:
# 追加模式:在现有内容后追加新内容
combined_md = existing_content.rstrip() + "\n\n" + new_md_content
print(f" 追加 {len(new_pages)} 篇新内容到现有文档")
else:
# 新建模式:创建新文档
combined_md = f"# {output_dir_name}全集\n\n" + new_md_content
with open(md_path, "w", encoding="utf-8") as f:
f.write(combined_md)
print(f" 汇总 Markdown: {md_path}")
# === 处理 Word 文档 ===
if os.path.exists(docx_path):
# 如果Word文档已存在重新生成因为python-docx不支持追加
doc = Document(docx_path)
# 提取已存在的URL
existing_doc_urls = set()
for para in doc.paragraphs:
if para.runs and "原文链接:" in para.text:
url_match = re.search(r'原文链接: (https?://[^\s\n]+)', para.text)
if url_match:
existing_doc_urls.add(url_match.group(1))
# 过滤新页面
new_pages_for_doc = [page for page in all_pages if page['url'] not in existing_doc_urls]
if new_pages_for_doc:
# 添加新内容
for page in new_pages_for_doc:
doc.add_heading(page["title"], level=1)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page['url']}").italic = True
self.add_content_to_docx(doc, page["content"], self.output_dir)
doc.add_page_break()
doc.save(docx_path)
print(f" 追加 {len(new_pages_for_doc)} 篇新内容到 Word 文档")
else:
print(f" Word 文档无需更新: {docx_path}")
else:
# 新建Word文档
doc = Document()
doc.add_heading(f'{output_dir_name}全集', level=1)
for page in all_pages:
doc.add_heading(page["title"], level=1)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page['url']}").italic = True
self.add_content_to_docx(doc, page["content"], self.output_dir)
doc.add_page_break()
doc.save(docx_path)
print(f" 汇总 Word: {docx_path}")
def run(self):
"""
执行爬取任务
"""
print(f"\n{'='*60}")
print(f"开始爬取: {self.name}")
print(f"{'='*60}")
# 获取页面链接
if "static_pages" in self.config:
# 静态页面列表
links = [make_absolute_url(BASE_URL, p) for p in self.config["static_pages"]]
elif "index_url" in self.config:
# 从索引页提取
links = self.get_links_from_index(self.config["index_url"])
else:
print("错误: 配置中未指定 static_pages 或 index_url")
return
if not links:
print("未获取到链接,跳过此任务")
return
# 爬取每个页面
all_pages = [] # 存储所有成功爬取的页面数据
for i, url in enumerate(links):
print(f"[{i+1}/{len(links)}] 正在抓取: {url}")
page_data = self.crawl_page(url)
if page_data:
all_pages.append(page_data)
# 请求延迟
time.sleep(REQUEST_DELAY)
# 生成汇总文档
if all_pages:
print(f"\n正在生成汇总文档(共 {len(all_pages)} 篇)...")
self.save_combined_documents(all_pages)
print(f"\n{self.name} 爬取完成!成功: {len(all_pages)}/{len(links)}")
print(f"输出目录: {self.output_dir}")
def _add_table_to_docx(self, doc: Document, table_element: BeautifulSoup):
"""
将 HTML 表格添加到 Word 文档
Args:
doc: Document 对象
table_element: 表格元素
"""
rows = table_element.find_all('tr')
if not rows:
return
# 获取最大列数(考虑 colspan
max_cols = 0
for row in rows:
cells = row.find_all(['td', 'th'])
col_count = 0
for cell in cells:
colspan = int(cell.get('colspan', 1))
col_count += colspan
max_cols = max(max_cols, col_count)
if max_cols == 0:
return
# 创建 Word 表格
try:
word_table = doc.add_table(rows=len(rows), cols=max_cols)
word_table.style = 'Table Grid'
for i, row in enumerate(rows):
cells = row.find_all(['td', 'th'])
col_idx = 0
for cell in cells:
if col_idx >= max_cols:
break
text = cell.get_text(strip=True)
colspan = int(cell.get('colspan', 1))
rowspan = int(cell.get('rowspan', 1))
# 设置单元格文本
word_table.rows[i].cells[col_idx].text = text
# 处理合并单元格python-docx 的合并需要特殊处理)
# 注意python-docx 的合并单元格功能有限,这里先简单处理
if colspan > 1 or rowspan > 1:
# 对于合并单元格python-docx 需要手动合并
# 这里先标记,后续可以改进
pass
col_idx += colspan
except Exception as e:
# 如果表格创建失败,降级为文本
print(f" 表格创建失败,降级为文本: {e}")
for row in rows:
cells = row.find_all(['td', 'th'])
row_text = ' | '.join([cell.get_text(strip=True) for cell in cells])
if row_text.strip():
doc.add_paragraph(row_text)
class StandardCrawler(BaseCrawler):
"""
标准爬虫类
适用于大多数页面类型
"""
pass