# crawl4zeroerr/zeroerr_crawler/base_crawler.py
"""
基础爬虫类
提供通用的爬取逻辑
"""
import os
import time
import copy
import re
import requests
from bs4 import BeautifulSoup
import markdownify
from docx import Document
from docx.shared import Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from urllib.parse import urljoin
from abc import ABC, abstractmethod
from .config import BASE_URL, HEADERS, REQUEST_DELAY, OUTPUT_DIR
from .utils import ensure_dir, download_image, safe_filename, make_absolute_url
from .extract_abstract import generate_abstract
from .post_process import post_process_docx_headings
class BaseCrawler(ABC):
"""
基础爬虫类
提供通用的页面获取、内容提取、文档生成功能
"""
def __init__(self, task_config: dict):
"""
初始化爬虫
Args:
task_config: 任务配置字典
"""
self.config = task_config
self.name = task_config.get("name", "未命名")
self.session = requests.Session()
self.session.headers.update(HEADERS)
# Output directory (can be customized via "output_dir")
if "output_dir" in task_config:
output_dir_name = task_config["output_dir"]
else:
output_dir_name = self.name
self.output_dir = os.path.join(OUTPUT_DIR, safe_filename(output_dir_name))
self.images_dir = os.path.join(self.output_dir, "images")
ensure_dir(self.output_dir)
ensure_dir(self.images_dir)
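# A hedged example of the task_config dict consumed above and throughout this class.
# The keys mirror what the code reads (name, output_dir, index_url / static_pages,
# link_pattern, link_suffix, exclude_patterns, title_selector, title_index,
# content_selector); the concrete values below are illustrative only, not taken
# from the project's real task definitions.
#
# EXAMPLE_TASK_CONFIG = {
#     "name": "新闻中心",                    # task name, also the default output dir name
#     "output_dir": "news",                  # optional override for the output directory
#     "index_url": "/news/",                 # crawl the links found on this index page...
#     # "static_pages": ["/about-us.html"],  # ...or crawl a fixed list of pages instead
#     "link_pattern": "/news/",              # href must contain this substring
#     "link_suffix": ".html",                # href must end with this suffix
#     "exclude_patterns": ["index"],         # skip hrefs containing any of these
#     "title_selector": "h1, h2",            # comma-separated selectors, tried in order
#     "title_index": 0,                      # which matched heading to use
#     "content_selector": "div.article-content, div.news-detail",  # "tag.class" selectors
# }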
def fetch_page(self, url: str) -> BeautifulSoup | None:
"""
获取页面内容
Args:
url: 页面URL
Returns:
BeautifulSoup 对象,失败返回 None
"""
try:
response = self.session.get(url, timeout=30)
response.encoding = 'utf-8'
return BeautifulSoup(response.text, 'html.parser')
except Exception as e:
print(f" 获取页面失败: {url} - {e}")
return None
def get_links_from_index(self, index_url: str) -> list[str]:
"""
从索引页提取子页面链接
Args:
index_url: 索引页URL
Returns:
链接列表
"""
full_url = make_absolute_url(BASE_URL, index_url)
print(f"正在从索引页提取链接: {full_url}")
soup = self.fetch_page(full_url)
if not soup:
return []
link_pattern = self.config.get("link_pattern", "")
link_suffix = self.config.get("link_suffix", ".html")
exclude_patterns = self.config.get("exclude_patterns", [])
links = []
for a in soup.find_all('a', href=True):
href = a['href']
# Check whether the href matches the configured pattern
if link_pattern and link_pattern not in href:
continue
if link_suffix and not href.endswith(link_suffix):
continue
# Check the exclusion patterns
excluded = False
for pattern in exclude_patterns:
if pattern in href:
excluded = True
break
if excluded:
continue
# Convert to an absolute URL
full_link = make_absolute_url(full_url, href)
if full_link not in links:
links.append(full_link)
print(f"共发现 {len(links)} 个页面链接")
return links
def extract_title(self, soup: BeautifulSoup, url: str) -> str:
"""
提取页面标题
Args:
soup: BeautifulSoup 对象
url: 页面URL用于生成默认标题
Returns:
标题文本
"""
selector = self.config.get("title_selector", "h1")
index = self.config.get("title_index", 0)
# Multiple selectors may be given, separated by commas (same approach as extract_content)
selectors = [s.strip() for s in selector.split(',')]
# Collect all matching tags
all_tags = []
for sel in selectors:
# find_all() treats each selector as a tag name (e.g. "h1", "h2", "title");
# selectors that are not plain tag names simply match nothing here
found_tags = soup.find_all(sel)
all_tags.extend(found_tags)
# Prefer the configured selector and index
if all_tags and len(all_tags) > index:
title = all_tags[index].get_text(strip=True)
if title:
return title
elif all_tags:
title = all_tags[0].get_text(strip=True)
if title:
return title
# Try the page <title> tag
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text(strip=True)
# Strip the site-name suffix (e.g. " - 零差云控")
if ' - ' in title:
title = title.split(' - ')[0].strip()
if title and title.lower() not in ['about-us', 'contact-us', 'join-us']:
return title
# Try h1 tags (even if they are not among the configured selectors)
h1_tags = soup.find_all('h1')
for h1 in h1_tags:
title = h1.get_text(strip=True)
# Skip the site name
if title and '零差云控' not in title and '零误差' not in title:
return title
# Finally, fall back to the last URL segment as the title, prettified
url_part = url.split('/')[-1].replace('.html', '')
# Replace hyphens with spaces and capitalize each word
if '-' in url_part:
url_part = ' '.join(word.capitalize() for word in url_part.split('-'))
return url_part
def extract_content(self, soup: BeautifulSoup) -> BeautifulSoup | None:
"""
提取页面主内容
Args:
soup: BeautifulSoup 对象
Returns:
内容区域的 BeautifulSoup 对象,未找到返回 None
"""
selector = self.config.get("content_selector", "")
# Multiple selectors may be given, separated by commas
selectors = [s.strip() for s in selector.split(',')]
# Collect all matching content blocks
all_contents = []
for sel in selectors:
if '.' in sel:
# Class selector ("tag.class")
tag, class_name = sel.split('.', 1)
tag = tag if tag else 'div'
# Use find() to match only the first element and avoid duplicates
content = soup.find(tag, class_=class_name)
else:
content = soup.find(sel)
if content:
all_contents.append(content)
# Nothing found
if not all_contents:
return None
# Only one match: return it directly
if len(all_contents) == 1:
return all_contents[0]
# Deduplicate: drop nested or repeated content blocks
unique_contents = []
seen_texts = set()  # text contents already seen
for content in all_contents:
is_duplicate = False
content_text = content.get_text(strip=True)
# Skip empty blocks
if not content_text:
continue
# Check whether this block is contained in another block (i.e. is its descendant)
for other in all_contents:
if content is other:
continue
# Walk up the parents to see whether this block sits inside the other one
parent = content.find_parent()
while parent:
if parent is other:
is_duplicate = True
break
parent = parent.find_parent()
if is_duplicate:
break
# If the text is identical, keep only the first occurrence
if not is_duplicate and content_text in seen_texts:
is_duplicate = True
if not is_duplicate:
unique_contents.append(content)
seen_texts.add(content_text)
# If only one block remains after deduplication, return it
if len(unique_contents) == 1:
return unique_contents[0]
# Merge the remaining content areas into one container
combined = soup.new_tag('div')
for content in unique_contents:
# Deep-copy so the block is not detached from the original DOM
combined.append(copy.deepcopy(content))
return combined
def clean_content(self, content: BeautifulSoup) -> BeautifulSoup:
"""
清洗内容,移除无用元素
Args:
content: 内容区域
Returns:
清洗后的内容
"""
# Remove script and style tags
for tag in content(['script', 'style']):
tag.decompose()
# Remove navigation links, empty links, and anchor links
for a in content.find_all('a', href=True):
href = a.get('href', '')
# Empty, anchor, or javascript: links
if not href or href.startswith('#') or href.startswith('javascript:'):
# Keep the link text, drop the <a> tag
a.unwrap()
# Remove links inside navigation containers (nav/menu elements)
elif a.find_parent(['nav', 'menu', 'navigation']):
a.decompose()
# Remove empty div/span/p tags (whitespace only)
for tag in content.find_all(['div', 'span', 'p']):
text = tag.get_text(strip=True)
if not text and not tag.find_all(['img', 'table']):
# No text and no images/tables: remove it
tag.decompose()
# Remove HTML comments
from bs4 import Comment
for comment in content.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# Collapse redundant whitespace
for tag in content.find_all(['p', 'div', 'span']):
if tag.string:
# Collapse extra whitespace inside the text node
tag.string = ' '.join(tag.string.split())
return content
def process_images(self, content: BeautifulSoup, page_url: str) -> list[tuple[str, str]]:
"""
处理内容中的图片,下载到本地
Args:
content: 内容区域
page_url: 页面URL用于解析相对路径
Returns:
图片信息列表 [(原URL, 本地路径), ...]
"""
images_info = []
for img in content.find_all('img'):
src = img.get('src')
if not src:
continue
# Convert to an absolute URL
full_url = make_absolute_url(page_url, src)
# Download the image
local_path = download_image(full_url, self.images_dir)
if local_path:
images_info.append((full_url, local_path))
# Point the img src at the local relative path
img['src'] = os.path.relpath(local_path, self.output_dir).replace('\\', '/')
else:
# Download failed: keep the original URL
img['src'] = full_url
return images_info
def content_to_markdown(self, content: BeautifulSoup, page_title: str = None) -> str:
"""
将内容转换为 Markdown
Args:
content: 内容区域
page_title: 页面标题如果提供会移除内容中与标题重复的第一个h1/h2标签
Returns:
Markdown 文本
"""
# Work on a copy so the original content is not modified
content_copy = BeautifulSoup(str(content), 'html.parser')
# If a page title is given, drop headings in the content that duplicate it
if page_title:
# Remove the first h1 that exactly matches the title
first_h1 = content_copy.find('h1')
if first_h1:
h1_text = first_h1.get_text(strip=True)
if h1_text == page_title:
first_h1.decompose()
# Remove the first h2 that exactly matches the title
first_h2 = content_copy.find('h2')
if first_h2:
h2_text = first_h2.get_text(strip=True)
if h2_text == page_title:
first_h2.decompose()
# If the title carries a "型号:" (model) prefix, also remove the h2 that contains only the product name,
# e.g. title "型号:eCoder11" with an "eCoder11" h2 in the content
if '型号:' in page_title or '型号:' in page_title:
product_name = page_title.replace('型号:', '').replace('型号:', '').strip()
if product_name:
# Find the first h2 that contains only the product name
for h2 in content_copy.find_all('h2'):
h2_text = h2.get_text(strip=True)
if h2_text == product_name:
h2.decompose()
break  # remove only the first match
# Demote h1 in the page content to h2 (consistent with the Word handling):
# the page title is already a level-2 heading (##), so an in-content h1 must drop to level 2
for h1 in content_copy.find_all('h1'):
h1.name = 'h2'
return markdownify.markdownify(str(content_copy), heading_style="ATX")
def add_content_to_docx(self, doc: Document, content: BeautifulSoup, output_dir: str, page_title: str = None):
"""
将内容添加到 Word 文档
Args:
doc: Document 对象
content: 内容区域
output_dir: 输出目录(用于解析图片路径)
page_title: 页面标题如果提供会跳过内容中与标题重复的第一个h1标签
"""
# If a page title is given, work on a copy and remove the duplicate h1
if page_title:
content = BeautifulSoup(str(content), 'html.parser')
first_h1 = content.find('h1')
if first_h1:
h1_text = first_h1.get_text(strip=True)
if h1_text == page_title:
first_h1.decompose()  # drop the duplicate heading
# Process elements in document order so lists stay contiguous
for element in content.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'img', 'li', 'ul', 'ol', 'table']):
if element.name == 'img':
src = element.get('src', '')
# Resolve the local image path
if not src.startswith('http'):
local_path = os.path.join(output_dir, src)
else:
local_path = src
if os.path.exists(local_path):
try:
# Blank line before the image
doc.add_paragraph()
doc.add_picture(local_path, width=Inches(5))
doc.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.CENTER
# Blank line after the image
doc.add_paragraph()
except Exception as e:
print(f" Word插入图片失败: {local_path} - {e}")
elif element.name.startswith('h'):
text = element.get_text(strip=True)
if text:
# Headings in the page content: h1 becomes Heading 2, h2-h6 keep their level,
# because the page title is already Heading 1, so an in-content h1 must be demoted
original_level = int(element.name[1])
if original_level == 1:
# In-content h1 becomes Heading 2
word_level = 2
print(f" 标题层级转换: h1 '{text}' → Heading 2")
else:
# h2-h6 keep their level: h2 → Heading 2, h3 → Heading 3, ...
word_level = original_level
doc.add_heading(text, level=min(word_level, 9))
elif element.name in ['ul', 'ol']:
# List containers are skipped (their items are handled individually)
continue
elif element.name == 'li':
text = element.get_text(strip=True)
if text:
# Check whether the parent is ul or ol
parent = element.find_parent(['ul', 'ol'])
is_ordered = parent and parent.name == 'ol'
# Use list styles
if is_ordered:
doc.add_paragraph(text, style='List Number')
else:
doc.add_paragraph(text, style='List Bullet')
elif element.name == 'table':
# Build a proper Word table structure (easier for doc2md.py to parse)
self._add_table_to_docx(doc, element)
elif element.name == 'p':
text = element.get_text(strip=True)
if text:
# Skip empty or whitespace-only paragraphs
if text.strip():
# Some sites wrap list items in <p> tags
parent = element.find_parent(['ul', 'ol'])
if parent:
is_ordered = parent.name == 'ol'
if is_ordered:
doc.add_paragraph(text, style='List Number')
else:
doc.add_paragraph(text, style='List Bullet')
else:
doc.add_paragraph(text)
def crawl_page(self, url: str) -> dict | None:
"""
爬取单个页面
Args:
url: 页面URL
Returns:
页面数据字典,失败返回 None
"""
soup = self.fetch_page(url)
if not soup:
return None
# Extract the title
title = self.extract_title(soup, url)
# Extract the main content
content = self.extract_content(soup)
if not content:
print(f" 警告: 页面未找到主内容区域: {url}")
return None
# Clean the content
content = self.clean_content(content)
# Process images
images = self.process_images(content, url)
# Convert to Markdown (pass the title so the duplicate h1 is removed)
markdown = self.content_to_markdown(content, title)
return {
"url": url,
"title": title,
"content": content,
"markdown": markdown,
"images": images,
}
def save_single_page(self, page_data: dict):
"""
保存单个页面为独立的 md 和 docx 文件
Args:
page_data: 页面数据字典
"""
title = page_data["title"]
safe_title = safe_filename(title)
# Save Markdown
md_path = os.path.join(self.output_dir, f"{safe_title}.md")
md_content = f"# {title}\n\n"
md_content += f"**原文链接**: {page_data['url']}\n\n"
md_content += page_data["markdown"]
with open(md_path, "w", encoding="utf-8") as f:
f.write(md_content)
# Save Word
docx_path = os.path.join(self.output_dir, f"{safe_title}.docx")
doc = Document()
doc.add_heading(title, 0)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page_data['url']}").italic = True
self.add_content_to_docx(doc, page_data["content"], self.output_dir, title)
doc.save(docx_path)
def save_combined_documents(self, all_pages: list[dict]):
"""
将所有页面汇总保存为一个 md 和 docx 文件
如果文件已存在会追加内容并去重基于URL
Args:
all_pages: 所有页面数据列表
"""
if not all_pages:
return
# Determine the combined file name (use the output directory name, not the task name)
output_dir_name = os.path.basename(self.output_dir)
safe_name = safe_filename(output_dir_name)
md_path = os.path.join(self.output_dir, f"{safe_name}_汇总.md")
docx_path = os.path.join(self.output_dir, f"{safe_name}_汇总.docx")
# === Markdown ===
existing_urls = set()
existing_content = ""
existing_pages = []  # pages already in the file (used to regenerate the abstract)
# If the file already exists, read it and collect the existing URLs and page info
if os.path.exists(md_path):
with open(md_path, "r", encoding="utf-8") as f:
existing_content = f.read()
# Extract the existing URLs for deduplication
url_pattern = r'\*\*原文链接\*\*: (https?://[^\s\n]+)'
existing_urls = set(re.findall(url_pattern, existing_content))
# Extract the existing pages (title, URL and a content snippet) to regenerate the abstract.
# Expected block format: ## title\n\n**原文链接**: URL\n\ncontent...
# Split the document into per-page blocks
page_blocks = re.split(r'\n\n---\n\n', existing_content)
for block in page_blocks:
# Match the page title and URL
title_match = re.search(r'^##\s+([^\n]+)', block, re.MULTILINE)
url_match = re.search(r'\*\*原文链接\*\*:\s+(https?://[^\s\n]+)', block)
if title_match and url_match:
title = title_match.group(1).strip()
url = url_match.group(1).strip()
# Extract the content part (skip the title and URL lines)
content_start = url_match.end()
markdown_content = block[content_start:].strip()
# Keep only the first 500 characters as a preview
markdown_preview = markdown_content[:500] if len(markdown_content) > 500 else markdown_content
existing_pages.append({
'title': title,
'url': url,
'markdown': markdown_preview
})
# Drop pages that already exist (deduplicate by URL)
new_pages = [page for page in all_pages if page['url'] not in existing_urls]
if not new_pages and existing_content:
print(f" 所有页面已存在,无需更新: {md_path}")
return
# Build the new content
new_md_content = ""
for page in new_pages:
new_md_content += f"## {page['title']}\n\n"
new_md_content += f"**原文链接**: {page['url']}\n\n"
new_md_content += page["markdown"]
new_md_content += "\n\n---\n\n"
# Combine all pages (existing + newly crawled) for abstract generation
all_pages_for_abstract = existing_pages + all_pages
# Get the full index page URL, if configured
index_url_full = None
if "index_url" in self.config:
index_url_full = make_absolute_url(BASE_URL, self.config["index_url"])
# Generate the abstract: when creating a new document, and again when appending, so it covers all URLs
abstract = None
if not existing_content:
# New document: build the abstract from the pages just crawled
print(f" 正在生成文档摘要...")
abstract = generate_abstract(all_pages, output_dir_name, index_url_full)
else:
# Append mode: regenerate the abstract so it covers all pages (existing + new)
print(f" 正在重新生成文档摘要(包含所有 {len(all_pages_for_abstract)} 篇)...")
abstract = generate_abstract(all_pages_for_abstract, output_dir_name, index_url_full)
# Append to or create the file
if existing_content:
# Append mode: update the abstract section, then append the new content after the existing body.
# The abstract is the text between the title line and the first "---" separator.
title_pattern = r'^#\s+.*?全集\s*\n\n'
separator_pattern = r'\n\n---\n\n'
# Find the first separator after the title
title_match = re.search(title_pattern, existing_content, re.MULTILINE)
if title_match:
title_end = title_match.end()
# Find the first separator
separator_match = re.search(separator_pattern, existing_content[title_end:])
if separator_match:
# Replace the abstract section
separator_start = title_end + separator_match.start()
separator_end = title_end + separator_match.end()
# Keep the title and the separator, replace what sits in between
combined_md = existing_content[:title_end]
if abstract:
combined_md += abstract
combined_md += existing_content[separator_end:]
# Append the new content
combined_md = combined_md.rstrip() + "\n\n" + new_md_content
else:
# No separator found, so there is probably no abstract yet: insert the abstract, then append
combined_md = existing_content[:title_end]
if abstract:
combined_md += abstract + "\n\n---\n\n"
combined_md += existing_content[title_end:].lstrip()
combined_md = combined_md.rstrip() + "\n\n" + new_md_content
else:
# No title found: unexpected format, just append
combined_md = existing_content.rstrip() + "\n\n" + new_md_content
print(f" 追加 {len(new_pages)} 篇新内容到现有文档,并更新摘要")
else:
# Create mode: build a new document
# Document layout: title + abstract + body
combined_md = f"# {output_dir_name}全集\n\n"
if abstract:
combined_md += f"{abstract}\n\n---\n\n"
combined_md += new_md_content
with open(md_path, "w", encoding="utf-8") as f:
f.write(combined_md)
print(f" 汇总 Markdown: {md_path}")
# === Word document ===
if os.path.exists(docx_path):
# The Word document already exists: load it, add the new pages, and save it again (python-docx cannot append in place)
doc = Document(docx_path)
# Collect the existing URLs
existing_doc_urls = set()
for para in doc.paragraphs:
if para.runs and "原文链接:" in para.text:
url_match = re.search(r'原文链接: (https?://[^\s\n]+)', para.text)
if url_match:
existing_doc_urls.add(url_match.group(1))
# Filter to the new pages
new_pages_for_doc = [page for page in all_pages if page['url'] not in existing_doc_urls]
if new_pages_for_doc:
# Add the new content
for page in new_pages_for_doc:
doc.add_heading(page["title"], level=1)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page['url']}").italic = True
self.add_content_to_docx(doc, page["content"], self.output_dir, page["title"])
doc.add_page_break()
doc.save(docx_path)
print(f" 追加 {len(new_pages_for_doc)} 篇新内容到 Word 文档")
# Post-process: tidy consecutive headings
post_process_docx_headings(docx_path)
else:
print(f" Word 文档无需更新: {docx_path}")
else:
# Create a new Word document
doc = Document()
doc.add_heading(f'{output_dir_name}全集', level=1)
# Add the abstract (only on creation; reuse the abstract generated for the Markdown)
if not existing_content and abstract:
# Convert the Markdown abstract to plain text for Word:
# turn links [text](URL) into "text (URL)"
abstract_text = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\1 (\2)', abstract)
# Strip Markdown bold markers
abstract_text = abstract_text.replace('**', '')
# Add the abstract paragraphs
for line in abstract_text.split('\n'):
if line.strip():
doc.add_paragraph(line.strip())
else:
doc.add_paragraph()  # blank line
doc.add_paragraph()  # blank line
doc.add_paragraph("-" * 50)  # separator line
doc.add_paragraph()  # blank line
for page in all_pages:
doc.add_heading(page["title"], level=1)
p = doc.add_paragraph()
p.add_run(f"原文链接: {page['url']}").italic = True
self.add_content_to_docx(doc, page["content"], self.output_dir, page["title"])
doc.add_page_break()
doc.save(docx_path)
print(f" 汇总 Word: {docx_path}")
# Post-process: tidy consecutive headings
post_process_docx_headings(docx_path)
def run(self):
"""
执行爬取任务
"""
print(f"\n{'='*60}")
print(f"开始爬取: {self.name}")
print(f"{'='*60}")
# Collect the page links
if "static_pages" in self.config:
# Fixed list of static pages
links = [make_absolute_url(BASE_URL, p) for p in self.config["static_pages"]]
elif "index_url" in self.config:
# Extract links from the index page
links = self.get_links_from_index(self.config["index_url"])
else:
print("错误: 配置中未指定 static_pages 或 index_url")
return
if not links:
print("未获取到链接,跳过此任务")
return
# Crawl each page
all_pages = []  # data for every successfully crawled page
for i, url in enumerate(links):
print(f"[{i+1}/{len(links)}] 正在抓取: {url}")
page_data = self.crawl_page(url)
if page_data:
all_pages.append(page_data)
# Delay between requests
time.sleep(REQUEST_DELAY)
# Generate the combined documents
if all_pages:
print(f"\n正在生成汇总文档(共 {len(all_pages)} 篇)...")
self.save_combined_documents(all_pages)
print(f"\n{self.name} 爬取完成!成功: {len(all_pages)}/{len(links)}")
print(f"输出目录: {self.output_dir}")
def _add_table_to_docx(self, doc: Document, table_element: BeautifulSoup):
"""
将 HTML 表格添加到 Word 文档
Args:
doc: Document 对象
table_element: 表格元素
"""
rows = table_element.find_all('tr')
if not rows:
return
# Determine the maximum column count (accounting for colspan)
max_cols = 0
for row in rows:
cells = row.find_all(['td', 'th'])
col_count = 0
for cell in cells:
colspan = int(cell.get('colspan', 1))
col_count += colspan
max_cols = max(max_cols, col_count)
if max_cols == 0:
return
# Create the Word table
try:
word_table = doc.add_table(rows=len(rows), cols=max_cols)
word_table.style = 'Table Grid'
for i, row in enumerate(rows):
cells = row.find_all(['td', 'th'])
col_idx = 0
for cell in cells:
if col_idx >= max_cols:
break
text = cell.get_text(strip=True)
colspan = int(cell.get('colspan', 1))
rowspan = int(cell.get('rowspan', 1))
# Set the cell text
word_table.rows[i].cells[col_idx].text = text
# Merged cells: python-docx requires explicit merging and its support is limited,
# so colspan/rowspan are not reproduced here
if colspan > 1 or rowspan > 1:
# Merging would have to be done manually in python-docx;
# left as a no-op for now (see the sketch after this method)
pass
col_idx += colspan
except Exception as e:
# If table creation fails, fall back to plain text rows
print(f" 表格创建失败,降级为文本: {e}")
for row in rows:
cells = row.find_all(['td', 'th'])
row_text = ' | '.join([cell.get_text(strip=True) for cell in cells])
if row_text.strip():
doc.add_paragraph(row_text)
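# Hedged sketch (not wired into the crawler): one way the colspan case above could be
# handled, using python-docx's Table.cell() and _Cell.merge(). The helper name and its
# placement are hypothetical; nothing calls it yet.
def merge_colspan_cells(word_table, row_idx: int, col_idx: int, colspan: int):
    """Merge `colspan` horizontally adjacent cells of a python-docx table, starting at (row_idx, col_idx)."""
    if colspan <= 1:
        return
    start_cell = word_table.cell(row_idx, col_idx)
    end_cell = word_table.cell(row_idx, col_idx + colspan - 1)
    start_cell.merge(end_cell)  # merges the rectangular region spanned by the two cells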
class StandardCrawler(BaseCrawler):
"""
标准爬虫类
适用于大多数页面类型
"""
pass
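# Minimal usage sketch (hedged): how these classes are presumably driven. Because this
# module uses relative imports, run it as a package module, e.g.
#   python -m crawl4zeroerr.zeroerr_crawler.base_crawler
# The config values below are illustrative only, not real task definitions.
if __name__ == "__main__":
    demo_task = {
        "name": "demo",
        "static_pages": ["/about-us.html"],   # hypothetical path under BASE_URL
        "title_selector": "h1",
        "content_selector": "div.content",    # hypothetical content container class
    }
    StandardCrawler(demo_task).run()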