"""文档解析:PDF / DOCX / TXT / MD / CSV / XLSX""" import os import io import base64 import httpx from config import SILICONFLOW_API_KEY, SILICONFLOW_BASE_URL async def download_file(url: str, local_path: str) -> str: """从 COS 预签名 URL 下载文件到本地""" async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client: resp = await client.get(url) resp.raise_for_status() os.makedirs(os.path.dirname(local_path), exist_ok=True) with open(local_path, "wb") as f: f.write(resp.content) return local_path def parse_txt(file_path: str) -> str: with open(file_path, "r", encoding="utf-8", errors="replace") as f: return f.read() def parse_markdown(file_path: str) -> str: return parse_txt(file_path) def parse_docx(file_path: str) -> str: from docx import Document doc = Document(file_path) return "\n\n".join(p.text for p in doc.paragraphs if p.text.strip()) def parse_pdf_text(file_path: str) -> str: """用 PyMuPDF 提取 PDF 文本层""" import fitz doc = fitz.open(file_path) pages = [] for page in doc: text = page.get_text() if text.strip(): pages.append(text) doc.close() return "\n\n".join(pages) def pdf_needs_ocr(file_path: str) -> bool: """判断 PDF 是否需要 OCR(文本层为空或极少文字)""" import fitz doc = fitz.open(file_path) total_len = sum(len(page.get_text().strip()) for page in doc) doc.close() # 平均每页少于 50 字符 → 扫描件 page_count = max(doc.page_count if hasattr(doc, 'page_count') else len(doc), 1) return (total_len / page_count) < 50 async def ocr_with_siliconflow(image_bytes: bytes) -> str: """用硅基流动多模态模型做 OCR / 图文识别""" b64 = base64.b64encode(image_bytes).decode() async with httpx.AsyncClient(timeout=60) as client: resp = await client.post( f"{SILICONFLOW_BASE_URL}/chat/completions", headers={"Authorization": f"Bearer {SILICONFLOW_API_KEY}"}, json={ "model": "Qwen/Qwen3-VL-32B-Instruct", "messages": [{ "role": "user", "content": [ {"type": "text", "text": "请识别并提取这张图片中的所有文字内容。如果有表格,请用 Markdown 表格格式输出。不要添加任何解释。"}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}, ], }], "max_tokens": 4096, }, ) data = resp.json() return data["choices"][0]["message"]["content"] async def parse_image_with_ocr(file_path: str) -> str: """对图片进行 OCR""" with open(file_path, "rb") as f: image_bytes = f.read() return await ocr_with_siliconflow(image_bytes) def parse_csv(file_path: str) -> str: import pandas as pd df = pd.read_csv(file_path) return df.to_markdown(index=False) def parse_xlsx(file_path: str) -> str: import pandas as pd df = pd.read_excel(file_path) return df.to_markdown(index=False) async def parse_document(file_path: str, mime_type: str) -> str: """根据文件类型路由到合适的解析器""" ext = os.path.splitext(file_path)[1].lower() if ext in (".txt",): return parse_txt(file_path) elif ext in (".md", ".markdown"): return parse_markdown(file_path) elif ext in (".docx",): return parse_docx(file_path) elif ext in (".csv",): return parse_csv(file_path) elif ext in (".xlsx",): return parse_xlsx(file_path) elif ext in (".pdf",): if pdf_needs_ocr(file_path): # 扫描件——先尝试文本提取,空则走多模态 text = parse_pdf_text(file_path) if len(text.strip()) < 100: # 全扫描件,逐页 OCR import fitz doc = fitz.open(file_path) results = [] for i, page in enumerate(doc): pix = page.get_pixmap(dpi=150) img_bytes = pix.tobytes("png") page_text = await ocr_with_siliconflow(img_bytes) results.append(page_text) doc.close() return "\n\n".join(results) return text return parse_pdf_text(file_path) elif ext in (".png", ".jpg", ".jpeg", ".webp", ".heic", ".bmp"): return await parse_image_with_ocr(file_path) else: raise ValueError(f"不支持的文件类型: {ext}")