diff --git a/1_零差云控官网爬虫方案.md b/1_零差云控官网爬虫方案.md index 66172d0..71eb7c7 100644 --- a/1_零差云控官网爬虫方案.md +++ b/1_零差云控官网爬虫方案.md @@ -16,6 +16,7 @@ crawl_0131(1)/ │ ├── base_crawler.py # 基础爬虫类 │ ├── product_crawler.py # 产品页专用爬虫(处理 eRob、eCoder、配件) │ ├── extract_abstract.py # 摘要提取模块(使用大模型生成文档摘要) +│ ├── post_process.py # Word 文档后处理模块(优化连续标题) │ └── utils.py # 工具函数 └── output/ # 输出目录 ``` @@ -70,9 +71,13 @@ python main.py - 使用 `ProductCrawler` 处理产品页面(机器人关节、编码器、配件) - 支持多种页面布局和内容选择器 - 自动去重标题,优化 Word 文档格式 +- **层级处理**:Markdown 和 Word 采用相同的层级处理规则,确保文档结构一致 + - 页面内容中的 h1 自动降级为二级标题,确保层级结构清晰 + - Word 文档生成后自动进行后处理,优化连续标题 - **摘要提取**:`extract_abstract.py` 模块使用大模型(OpenAI API)为每个分类的文档集合生成摘要 - 面向客户售前咨询场景,生成100-200字的简洁摘要 - 自动生成相关链接列表 + - 摘要前自动添加索引页链接 - 摘要失败时自动降级为仅生成链接列表 ## 待处理项目 diff --git a/zeroerr_crawler/base_crawler.py b/zeroerr_crawler/base_crawler.py index 7952e03..56ddd1a 100644 --- a/zeroerr_crawler/base_crawler.py +++ b/zeroerr_crawler/base_crawler.py @@ -19,6 +19,7 @@ from abc import ABC, abstractmethod from .config import BASE_URL, HEADERS, REQUEST_DELAY, OUTPUT_DIR from .utils import ensure_dir, download_image, safe_filename, make_absolute_url from .extract_abstract import generate_abstract +from .post_process import post_process_docx_headings class BaseCrawler(ABC): @@ -354,11 +355,11 @@ class BaseCrawler(ABC): Returns: Markdown 文本 """ + # 创建内容的副本,避免修改原始内容 + content_copy = BeautifulSoup(str(content), 'html.parser') + # 如果提供了页面标题,检查并移除内容中与标题重复的标签 if page_title: - # 创建内容的副本,避免修改原始内容 - content_copy = BeautifulSoup(str(content), 'html.parser') - # 移除与标题完全相同的第一个h1 first_h1 = content_copy.find('h1') if first_h1: @@ -384,9 +385,13 @@ class BaseCrawler(ABC): if h2_text == product_name: h2.decompose() break # 只移除第一个匹配的 - - return markdownify.markdownify(str(content_copy), heading_style="ATX") - return markdownify.markdownify(str(content), heading_style="ATX") + + # 页面内容中的 h1 降级为 h2(与 Word 文档处理一致) + # 因为页面标题已经是二级标题(##),所以内容中的 h1 应该降级为二级标题 + for h1 in content_copy.find_all('h1'): + h1.name = 'h2' + + return markdownify.markdownify(str(content_copy), heading_style="ATX") def add_content_to_docx(self, doc: Document, content: BeautifulSoup, output_dir: str, page_title: str = None): """ @@ -431,10 +436,17 @@ class BaseCrawler(ABC): elif element.name.startswith('h'): text = element.get_text(strip=True) if text: - # HTML h1-h6 直接映射到 Word Heading 1-6 - # 限制在 1-9 范围内(Word 支持的最大标题级别) - level = int(element.name[1]) - doc.add_heading(text, level=min(level, 9)) + # 对于页面内容中的标题,h1 转换为 Heading 2,h2-h6 保持原层级 + # 因为页面标题已经是 Heading 1,所以内容中的 h1 应该降级为 Heading 2 + original_level = int(element.name[1]) + if original_level == 1: + # 页面内容中的 h1 转换为 Heading 2 + word_level = 2 + print(f" 标题层级转换: h1 '{text}' → Heading 2") + else: + # h2-h6 保持原层级(h2→Heading 2, h3→Heading 3, ...) + word_level = original_level + doc.add_heading(text, level=min(word_level, 9)) elif element.name in ['ul', 'ol']: # 列表容器,跳过(列表项会单独处理) @@ -613,16 +625,21 @@ class BaseCrawler(ABC): # 合并所有页面(已存在的 + 新添加的),用于生成摘要 all_pages_for_abstract = existing_pages + all_pages + # 获取索引页URL(如果存在) + index_url_full = None + if "index_url" in self.config: + index_url_full = make_absolute_url(BASE_URL, self.config["index_url"]) + # 生成摘要(新建文档时生成,追加新内容时也重新生成,确保包含所有URL) abstract = None if not existing_content: # 新建文档:使用当前爬取的页面生成摘要 print(f" 正在生成文档摘要...") - abstract = generate_abstract(all_pages, output_dir_name) + abstract = generate_abstract(all_pages, output_dir_name, index_url_full) else: # 追加模式:重新生成摘要,包含所有页面(已存在的 + 新添加的) print(f" 正在重新生成文档摘要(包含所有 {len(all_pages_for_abstract)} 篇)...") - abstract = generate_abstract(all_pages_for_abstract, output_dir_name) + abstract = generate_abstract(all_pages_for_abstract, output_dir_name, index_url_full) # 追加或创建文件 if existing_content: @@ -697,6 +714,8 @@ class BaseCrawler(ABC): doc.add_page_break() doc.save(docx_path) print(f" 追加 {len(new_pages_for_doc)} 篇新内容到 Word 文档") + # 后处理:优化连续标题 + post_process_docx_headings(docx_path) else: print(f" Word 文档无需更新: {docx_path}") else: @@ -730,6 +749,8 @@ class BaseCrawler(ABC): doc.save(docx_path) print(f" 汇总 Word: {docx_path}") + # 后处理:优化连续标题 + post_process_docx_headings(docx_path) def run(self): """ diff --git a/zeroerr_crawler/extract_abstract.py b/zeroerr_crawler/extract_abstract.py index be3be0d..f73ffae 100644 --- a/zeroerr_crawler/extract_abstract.py +++ b/zeroerr_crawler/extract_abstract.py @@ -11,13 +11,14 @@ API_KEY = "sk-LX1g8KkG61S6eUaVD567C0C187D4452c90F9E6985cDf3586" MODEL = "Yiming" -def generate_abstract(all_pages: list[dict], category_name: str) -> str: +def generate_abstract(all_pages: list[dict], category_name: str, index_url: str = None) -> str: """ 使用大模型生成文档摘要 Args: all_pages: 所有页面数据列表,每个元素包含 'title', 'url', 'markdown' 等字段 category_name: 文档类别名称(如"应用案例") + index_url: 索引页完整URL(可选),如果提供则会在摘要前添加原文链接 Returns: 摘要文本(Markdown格式),包含摘要内容和链接列表 @@ -75,8 +76,11 @@ def generate_abstract(all_pages: list[dict], category_name: str) -> str: url = page.get('url', '') links_section += f"{i}. [{title}]({url})\n" - # 组合摘要和链接 - result = f"{abstract_text}{links_section}" + # 组合摘要和链接,如果提供了索引页URL,则在摘要前添加原文链接 + if index_url: + result = f"原文链接: {index_url}\n\n{abstract_text}{links_section}" + else: + result = f"{abstract_text}{links_section}" return result @@ -88,4 +92,8 @@ def generate_abstract(all_pages: list[dict], category_name: str) -> str: title = page.get('title', '未命名') url = page.get('url', '') links_section += f"{i}. [{title}]({url})\n" + + # 如果提供了索引页URL,在链接列表前添加原文链接 + if index_url: + return f"原文链接: {index_url}{links_section}" return links_section \ No newline at end of file diff --git a/zeroerr_crawler/post_process.py b/zeroerr_crawler/post_process.py new file mode 100644 index 0000000..f006522 --- /dev/null +++ b/zeroerr_crawler/post_process.py @@ -0,0 +1,164 @@ +""" +Word 文档后处理模块 +优化生成的 Word 文档格式 +""" + +import re +from docx import Document + + +def post_process_docx_headings(docx_path: str): + """ + 后处理 Word 文档:优化相同层级的连续标题 + + 规则: + 1. 如果两个相同层级的连续标题之间没有文字内容和图片 + 2. 如果一个标题包含另一个,则保留较长的 + 3. 如果不包含,则合并为一个标题 + 4. 如果中间有图片,不合并 + + Args: + docx_path: Word 文档路径 + """ + try: + doc = Document(docx_path) + paragraphs = doc.paragraphs + + # 找到所有标题段落及其索引 + heading_indices = [] + for i, para in enumerate(paragraphs): + if para.style.name.startswith('Heading'): + # 提取标题级别(Heading 1 -> 1, Heading 2 -> 2, ...) + level_match = re.search(r'Heading\s+(\d+)', para.style.name) + if level_match: + level = int(level_match.group(1)) + text = para.text.strip() + if text: # 只处理非空标题 + heading_indices.append({ + 'index': i, + 'level': level, + 'text': text, + 'paragraph': para + }) + + if len(heading_indices) < 2: + return # 至少需要两个标题才能合并 + + # 需要删除的段落索引 + to_remove = set() + # 需要修改的段落(合并标题) + to_modify = {} + + i = 0 + while i < len(heading_indices) - 1: + current = heading_indices[i] + next_heading = heading_indices[i + 1] + + # 只处理相同层级的连续标题 + if current['level'] == next_heading['level']: + # 检查两个标题之间是否有文字内容或图片 + start_idx = current['index'] + 1 + end_idx = next_heading['index'] + + has_content = False + for j in range(start_idx, end_idx): + para = paragraphs[j] + # 如果遇到其他标题,说明不是连续的 + if para.style.name.startswith('Heading'): + has_content = True + break + + # 检查是否有图片(通过检查段落中的 drawing 元素) + has_image = False + try: + # 方法1: 检查段落 XML 中是否包含 drawing 标签 + if hasattr(para, '_element'): + para_xml = para._element.xml if hasattr(para._element, 'xml') else str(para._element) + if 'drawing' in para_xml.lower(): + has_image = True + + # 方法2: 检查段落中的运行(runs)是否有图片 + if not has_image and hasattr(para, 'runs'): + for run in para.runs: + if hasattr(run, '_element'): + try: + run_xml = run._element.xml if hasattr(run._element, 'xml') else str(run._element) + if 'drawing' in run_xml.lower(): + has_image = True + break + except: + pass + except Exception: + # 如果检查失败,保守处理:假设有内容,不合并 + pass + + if has_image: + has_content = True + break + + # 检查是否有文字内容(非标题段落) + text = para.text.strip() + if text: + has_content = True + break + + # 如果中间没有文字内容,需要处理 + if not has_content: + current_text = current['text'] + next_text = next_heading['text'] + + # 判断包含关系(较短的标题是否包含在较长的标题中) + if len(current_text) <= len(next_text): + # 当前标题较短,检查是否包含在下一个标题中 + if current_text in next_text: + # 当前标题包含在下一个标题中,保留较长的(下一个) + to_remove.add(current['index']) + print(f" 标题优化: 删除 '{current_text}'(包含在 '{next_text}' 中)") + i += 1 # 跳过下一个标题,继续检查 + continue + else: + # 下一个标题较短,检查是否包含在当前标题中 + if next_text in current_text: + # 下一个标题包含在当前标题中,保留较长的(当前) + to_remove.add(next_heading['index']) + print(f" 标题优化: 删除 '{next_text}'(包含在 '{current_text}' 中)") + i += 1 # 继续检查当前标题与下一个标题 + continue + + # 不包含,合并标题 + merged_text = f"{current_text} {next_text}" + to_modify[current['index']] = merged_text + to_remove.add(next_heading['index']) + print(f" 标题优化: 合并 '{current_text}' 和 '{next_text}' → '{merged_text}'") + # 更新当前标题文本,以便继续检查与下一个标题的关系 + current['text'] = merged_text + i += 1 # 跳过下一个标题,但继续用合并后的标题检查 + continue + + i += 1 + + # 应用修改 + if to_remove or to_modify: + # 修改合并的标题 + for idx, merged_text in to_modify.items(): + para = paragraphs[idx] + para.clear() + para.add_run(merged_text) + + # 删除需要移除的标题(清空内容并改为普通段落) + for idx in sorted(to_remove, reverse=True): + para = paragraphs[idx] + # 清空段落内容 + para.clear() + # 改为普通段落样式(避免保留标题样式) + para.style = doc.styles['Normal'] + + # 保存文档 + doc.save(docx_path) + total_changes = len(to_remove) + len(to_modify) + print(f" 标题优化完成: 处理了 {total_changes} 个标题(删除 {len(to_remove)} 个,合并 {len(to_modify)} 个)") + + except Exception as e: + print(f" 警告: 标题后处理失败: {e}") + # 失败不影响原始文档,继续执行 + diff --git a/zeroerr_crawler/product_crawler.py b/zeroerr_crawler/product_crawler.py index 79d7405..6a4b862 100644 --- a/zeroerr_crawler/product_crawler.py +++ b/zeroerr_crawler/product_crawler.py @@ -162,10 +162,17 @@ class ProductCrawler(BaseCrawler): elif element.name.startswith('h'): text = element.get_text(strip=True) if text and '零差云控' not in text: - # HTML h1-h6 直接映射到 Word Heading 1-6 - # 限制在 1-9 范围内(Word 支持的最大标题级别) - level = int(element.name[1]) - doc.add_heading(text, level=min(level, 9)) + # 对于页面内容中的标题,h1 转换为 Heading 2,h2-h6 保持原层级 + # 因为页面标题已经是 Heading 1,所以内容中的 h1 应该降级为 Heading 2 + original_level = int(element.name[1]) + if original_level == 1: + # 页面内容中的 h1 转换为 Heading 2 + word_level = 2 + print(f" 标题层级转换: h1 '{text}' → Heading 2") + else: + # h2-h6 保持原层级(h2→Heading 2, h3→Heading 3, ...) + word_level = original_level + doc.add_heading(text, level=min(word_level, 9)) elif element.name == 'table': # 处理表格