Files
tech/epub-parser/app.py
2026-02-02 00:05:24 +03:00

1179 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Сервис для парсинга EPUB файлов с извлечением глав и их названий
"""
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import ebooklib
from ebooklib import epub
from bs4 import BeautifulSoup
import io
import os
import tempfile
import base64
import json as json_module
import re
import logging
import uuid as uuid_module
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Union, Any
from pydantic import BaseModel
import psycopg2
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Импорт для подсчета токенов
try:
import tiktoken
TIKTOKEN_AVAILABLE = True
except ImportError:
TIKTOKEN_AVAILABLE = False
logger.warning("tiktoken не установлен, будет использован приблизительный расчет токенов")
# Константы для магических чисел
# Длины текста
MIN_TEXT_LENGTH_FOR_TITLE_EXTRACTION: int = 10
TITLE_SEARCH_PREFIX_LENGTH: int = 500
MIN_TITLE_LENGTH: int = 5
MAX_TITLE_LENGTH_SHORT: int = 100
MAX_TITLE_LENGTH_MEDIUM: int = 150
MAX_TITLE_LENGTH_LONG: int = 200
MIN_TITLE_LENGTH_STRICT: int = 3
# Длины для оценки заголовков
IDEAL_TITLE_LENGTH_MIN: int = 20
IDEAL_TITLE_LENGTH_MAX: int = 100
GOOD_TITLE_LENGTH_MIN: int = 10
GOOD_TITLE_LENGTH_MAX: int = 150
ACCEPTABLE_TITLE_LENGTH_MIN: int = 5
ACCEPTABLE_TITLE_LENGTH_MAX: int = 200
INFORMATIVE_TITLE_LENGTH: int = 30
MIN_TITLE_LENGTH_FOR_PENALTY: int = 10
# Оценки заголовков
MIN_HEADING_SCORE_THRESHOLD: float = 40.0
MIN_H2_SCORE_THRESHOLD: float = 30.0
SCORE_LENGTH_IDEAL: float = 30.0
SCORE_LENGTH_GOOD: float = 20.0
SCORE_LENGTH_ACCEPTABLE: float = 10.0
SCORE_H1: float = 25.0
SCORE_H2: float = 20.0
SCORE_H3: float = 15.0
SCORE_H4: float = 10.0
SCORE_H5_PLUS: float = 5.0
SCORE_POSITION_FIRST: float = 20.0
SCORE_POSITION_SECOND: float = 15.0
SCORE_POSITION_THIRD: float = 10.0
SCORE_POSITION_TOP5: float = 5.0
SCORE_QUESTION_MARK: float = 15.0
SCORE_LONG_TITLE: float = 10.0
SCORE_CHAPTER_CLASS: float = 10.0
SCORE_NOT_IN_TOC_PENALTY: float = -5.0
SCORE_SHORT_TITLE_PENALTY: float = -10.0
SCORE_GENERIC_TITLE_PENALTY: float = -5.0
# Кодировка
MIN_ENCODING_CONFIDENCE: float = 0.7
# Сноски
MAX_FOOTNOTE_LENGTH: int = 500
FOOTNOTE_COMPARISON_LENGTH: int = 100
# Файлы
MIN_BINARY_FILE_SIZE: int = 100
EPUB_SIGNATURE: bytes = b'PK'
# Позиции заголовков
MAX_POSITION_FOR_BONUS: int = 5
app = FastAPI(title="EPUB Parser API", version="1.0.0")
# Добавляем CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def clean_text_for_vectorization(text: str) -> str:
"""Очищает текст от символов, которые могут мешать векторизации.
Args:
text: Исходный текст для очистки.
Returns:
Очищенный текст без проблемных символов.
"""
if not text:
return text
# Удаляем HTML теги (более тщательно)
text = re.sub(r'<[^>]+>', '', text)
# Удаляем soft hyphens и другие проблемные символы
# \xad - soft hyphen (мягкий перенос)
# \u200b-\u200f - zero-width spaces
# \ufeff - BOM
# \u0600-\u0605 - арабские символы форматирования
problematic_chars = [
'\xad', # Soft hyphen
'\ufeff', # BOM
'\u200b', # Zero-width space
'\u200c', # Zero-width non-joiner
'\u200d', # Zero-width joiner
'\u200e', # Left-to-right mark
'\u200f', # Right-to-left mark
'\u2060', # Word joiner
'\u2028', # Line separator
'\u2029', # Paragraph separator
]
for char in problematic_chars:
text = text.replace(char, '')
# Удаляем другие непечатаемые символы (кроме пробелов, табуляции, переносов)
cleaned = []
for char in text:
if char.isprintable() or char in ['\n', '\r', '\t', ' ']:
cleaned.append(char)
# Заменяем непечатаемые символы на пробел
elif ord(char) < 32:
continue # Пропускаем контрольные символы
else:
# Для других непечатаемых символов проверяем, не являются ли они проблемными
code = ord(char)
if 0x0600 <= code <= 0x0605: # Арабские символы форматирования
continue
elif 0x2000 <= code <= 0x200F: # Различные пробелы
cleaned.append(' ')
elif 0x2028 <= code <= 0x202F: # Разделители строк/параграфов
cleaned.append(' ')
else:
# Оставляем остальные символы (могут быть валидными Unicode)
cleaned.append(char)
text = ''.join(cleaned)
# Нормализуем пробелы (множественные пробелы -> один)
text = re.sub(r' +', ' ', text)
# Нормализуем переносы строк (множественные -> один)
text = re.sub(r'\n{3,}', '\n\n', text)
# Удаляем пробелы в начале и конце строк
lines = [line.strip() for line in text.split('\n')]
text = '\n'.join(line for line in lines if line)
return text.strip()
def count_tokens(text: str, model: str = "gpt-4") -> int:
"""Подсчитывает количество токенов в тексте для указанной модели ИИ.
Использует tiktoken для точного подсчета токенов OpenAI моделей.
Если tiktoken недоступен, использует приблизительный расчет:
1 токен ≈ 4 символа для английского текста
1 токен ≈ 1.5 символа для русского текста (среднее значение)
Args:
text: Текст для подсчета токенов.
model: Название модели ИИ (по умолчанию "gpt-4").
Returns:
Количество токенов в тексте.
"""
if not text:
return 0
if TIKTOKEN_AVAILABLE:
try:
# Используем кодировку для указанной модели
# Для gpt-4 и gpt-3.5-turbo используется cl100k_base
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
except Exception as e:
logger.warning(f"Ошибка при подсчете токенов через tiktoken: {e}, используем приблизительный расчет")
# Приблизительный расчет: учитываем смешанный текст (русский + английский)
# Для русского текста: ~1.5 символа на токен
# Для английского текста: ~4 символа на токен
# Используем среднее значение ~2.5 символа на токен
return int(len(text) / 2.5)
def calculate_chapter_tokens(chapter_data: Dict[str, Any]) -> int:
"""Подсчитывает общее количество токенов для главы со всеми метаданными.
Учитывает:
- Текст главы
- Название главы
- Текст всех сносок
- Метаданные (chapterId, chapterNumber, filePath)
Args:
chapter_data: Словарь с данными главы.
Returns:
Общее количество токенов для главы.
"""
total_tokens = 0
# Текст главы
chapter_text = chapter_data.get('text', '')
if chapter_text:
total_tokens += count_tokens(chapter_text)
# Название главы
chapter_title = chapter_data.get('chapterTitle', '')
if chapter_title:
total_tokens += count_tokens(chapter_title)
# Сноски
footnotes = chapter_data.get('footnotes', [])
for footnote in footnotes:
footnote_text = footnote.get('text', '')
if footnote_text:
total_tokens += count_tokens(footnote_text)
# Метаданные (chapterId, chapterNumber, filePath)
# Формируем строку с метаданными для подсчета
metadata_parts = []
if chapter_data.get('chapterId'):
metadata_parts.append(f"ID: {chapter_data['chapterId']}")
if chapter_data.get('chapterNumber'):
metadata_parts.append(f"Номер: {chapter_data['chapterNumber']}")
if chapter_data.get('filePath'):
metadata_parts.append(f"Файл: {chapter_data['filePath']}")
if metadata_parts:
metadata_text = " ".join(metadata_parts)
total_tokens += count_tokens(metadata_text)
return total_tokens
def clean_title(title: str) -> str:
"""Очищает название главы от номеров и лишних символов.
Args:
title: Исходное название главы.
Returns:
Очищенное название без номеров и лишних пробелов.
"""
if not title:
return title
# Убираем начальные номера (например, "20 Обратная сторона..." -> "Обратная сторона...")
title = title.strip()
# Убираем номера в начале строки (1-3 цифры, возможно с точкой или пробелом)
title = re.sub(r'^\d{1,3}[.\s]+', '', title)
# Убираем "Ch1" и подобные префиксы
title = re.sub(r'^Ch\d+\s*', '', title)
# Убираем множественные пробелы
title = re.sub(r'\s+', ' ', title).strip()
return title
def extract_title_from_text(text: str) -> Optional[str]:
"""Извлекает название главы из текста, если оно там есть.
Ищет паттерны типа "Глава N Название" в начале текста.
Args:
text: Текст главы для анализа.
Returns:
Название главы или None, если название не найдено.
"""
if not text or len(text) < MIN_TEXT_LENGTH_FOR_TITLE_EXTRACTION:
return None
# Ищем паттерны типа "Глава 3 Название" в первых N символах
first_part = text[:TITLE_SEARCH_PREFIX_LENGTH].strip()
# Паттерн: "Глава N Название" или "Глава N. Название"
patterns = [
rf'Глава\s+\d+\s+([А-ЯЁ][^.!?]{{{MIN_TITLE_LENGTH},{MAX_TITLE_LENGTH_SHORT}}}?)(?:[.!?]|$)',
rf'Глава\s+\d+\.\s+([А-ЯЁ][^.!?]{{{MIN_TITLE_LENGTH},{MAX_TITLE_LENGTH_SHORT}}}?)(?:[.!?]|$)',
rf'Глава\s+\d+:\s+([А-ЯЁ][^.!?]{{{MIN_TITLE_LENGTH},{MAX_TITLE_LENGTH_SHORT}}}?)(?:[.!?]|$)',
rf'Глава\s+\d+\s+«([^»]{{{MIN_TITLE_LENGTH},{MAX_TITLE_LENGTH_SHORT}}}?)»',
rf'Глава\s+\d+\.\s+«([^»]{{{MIN_TITLE_LENGTH},{MAX_TITLE_LENGTH_SHORT}}}?)»',
]
for pattern in patterns:
match = re.search(pattern, first_part, re.IGNORECASE)
if match:
title = match.group(1).strip()
if MIN_TITLE_LENGTH <= len(title) <= MAX_TITLE_LENGTH_MEDIUM:
return clean_title(title)
return None
def score_heading(
heading_text: str,
tag_name: str,
position: int,
classes: List[str]
) -> float:
"""Оценивает заголовок по критериям пригодности для названия главы.
Args:
heading_text: Текст заголовка для оценки.
tag_name: Название тега (h1-h6).
position: Позиция заголовка в документе (0 = первый).
classes: Список классов тега.
Returns:
Оценка от 0 до 100 (чем выше, тем лучше).
"""
score = 0.0
text_len = len(heading_text)
# Базовые проверки - исключаем технические заголовки
if heading_text.isdigit() or heading_text.startswith('Ch') or text_len < MIN_TITLE_LENGTH:
return 0.0
# Длина текста (идеально 20-100 символов)
if IDEAL_TITLE_LENGTH_MIN <= text_len <= IDEAL_TITLE_LENGTH_MAX:
score += SCORE_LENGTH_IDEAL
elif GOOD_TITLE_LENGTH_MIN <= text_len <= GOOD_TITLE_LENGTH_MAX:
score += SCORE_LENGTH_GOOD
elif ACCEPTABLE_TITLE_LENGTH_MIN <= text_len <= ACCEPTABLE_TITLE_LENGTH_MAX:
score += SCORE_LENGTH_ACCEPTABLE
# Уровень заголовка (h1 и h2 предпочтительнее)
if tag_name == 'h1':
score += SCORE_H1
elif tag_name == 'h2':
score += SCORE_H2
elif tag_name == 'h3':
score += SCORE_H3
elif tag_name == 'h4':
score += SCORE_H4
else:
score += SCORE_H5_PLUS
# Позиция (первые заголовки предпочтительнее)
if position == 0:
score += SCORE_POSITION_FIRST
elif position == 1:
score += SCORE_POSITION_SECOND
elif position == 2:
score += SCORE_POSITION_THIRD
elif position < MAX_POSITION_FOR_BONUS:
score += SCORE_POSITION_TOP5
# Информативность (вопросы, длинные заголовки)
if '?' in heading_text:
score += SCORE_QUESTION_MARK
if text_len > INFORMATIVE_TITLE_LENGTH:
score += SCORE_LONG_TITLE
# Классы (chapter_title, title и т.д. дают бонус)
class_str = ' '.join(classes).lower()
if 'chapter' in class_str or 'title' in class_str:
score += SCORE_CHAPTER_CLASS
if 'not_in_toc' in class_str:
score += SCORE_NOT_IN_TOC_PENALTY
# Штраф за слишком короткие или общие слова
if text_len < MIN_TITLE_LENGTH_FOR_PENALTY:
score += SCORE_SHORT_TITLE_PENALTY
if heading_text.lower() in ['введение', 'вступление', 'предисловие', 'заключение', 'обучение']:
score += SCORE_GENERIC_TITLE_PENALTY
return max(0.0, score)
def detect_encoding(content: bytes) -> str:
"""Определяет кодировку содержимого файла.
Args:
content: Байтовое содержимое файла для определения кодировки.
Returns:
Название кодировки (по умолчанию 'utf-8').
"""
try:
result = chardet.detect(content)
if result and result['encoding']:
confidence = result.get('confidence', 0)
if confidence > MIN_ENCODING_CONFIDENCE:
encoding = result['encoding'].lower()
# Нормализуем названия кодировок
if encoding in ['utf-8', 'utf8']:
return 'utf-8'
elif encoding in ['windows-1251', 'cp1251']:
return 'windows-1251'
elif encoding in ['iso-8859-1', 'latin1']:
return 'iso-8859-1'
return encoding
except Exception as e:
logger.warning(f"Ошибка определения кодировки: {e}")
# Пробуем декодировать как UTF-8
try:
content.decode('utf-8')
return 'utf-8'
except UnicodeDecodeError:
pass
# Пробуем Windows-1251 для русских текстов
try:
content.decode('windows-1251')
return 'windows-1251'
except UnicodeDecodeError:
pass
return 'utf-8' # По умолчанию
def extract_footnotes(soup: BeautifulSoup) -> List[Dict[str, str]]:
"""Извлекает сноски и примечания из HTML содержимого главы.
Args:
soup: BeautifulSoup объект с HTML содержимым главы.
Returns:
Список словарей с информацией о сносках. Каждый словарь содержит:
- id: идентификатор сноски
- text: текст сноски
- type: тип сноски (footnote, noteref, aside, class-based)
"""
footnotes: List[Dict[str, str]] = []
# 1. EPUB3 стандартные сноски (epub:type="footnote")
footnote_elements = soup.find_all(attrs={'epub:type': 'footnote'})
for fn in footnote_elements:
fn_id = fn.get('id', '')
fn_text = fn.get_text(separator=' ', strip=True)
if fn_text:
footnotes.append({
'id': fn_id,
'text': clean_text_for_vectorization(fn_text),
'type': 'footnote'
})
# 2. Ссылки на сноски (epub:type="noteref")
noteref_elements = soup.find_all(attrs={'epub:type': 'noteref'})
for ref in noteref_elements:
ref_href = ref.get('href', '')
ref_id = ref.get('id', '')
ref_text = ref.get_text(strip=True)
# Ищем соответствующую сноску по href
if ref_href.startswith('#'):
footnote_id = ref_href[1:]
footnote_elem = soup.find(id=footnote_id)
if footnote_elem:
fn_text = footnote_elem.get_text(separator=' ', strip=True)
if fn_text:
footnotes.append({
'id': footnote_id,
'ref_id': ref_id,
'ref_text': ref_text,
'text': clean_text_for_vectorization(fn_text),
'type': 'noteref'
})
# 3. Теги <aside> (часто используются для сносок)
aside_elements = soup.find_all('aside')
for aside in aside_elements:
aside_id = aside.get('id', '')
aside_text = aside.get_text(separator=' ', strip=True)
# Проверяем, что это не обычный текст, а именно сноска
if aside_text and len(aside_text) < MAX_FOOTNOTE_LENGTH:
# Проверяем наличие ссылок на этот aside
refs_to_aside = soup.find_all(href=f'#{aside_id}')
if refs_to_aside or 'note' in str(aside.get('class', [])).lower():
footnotes.append({
'id': aside_id,
'text': clean_text_for_vectorization(aside_text),
'type': 'aside'
})
# 4. Элементы с классами, содержащими "note", "footnote", "endnote"
note_classes = soup.find_all(class_=lambda x: x and any(
keyword in str(x).lower() for keyword in ['note', 'footnote', 'endnote', 'annotation']
))
for note in note_classes:
note_id = note.get('id', '')
note_text = note.get_text(separator=' ', strip=True)
if note_text and note_text not in [fn['text'] for fn in footnotes]:
footnotes.append({
'id': note_id,
'text': clean_text_for_vectorization(note_text),
'type': 'class-based'
})
# Удаляем дубликаты по тексту
seen_texts: set[str] = set()
unique_footnotes: List[Dict[str, str]] = []
for fn in footnotes:
fn_text_short = fn['text'][:FOOTNOTE_COMPARISON_LENGTH]
if fn_text_short not in seen_texts:
seen_texts.add(fn_text_short)
unique_footnotes.append(fn)
return unique_footnotes
def extract_chapter_title(
soup: BeautifulSoup,
item_name: str,
text_content: Optional[str] = None
) -> str:
"""Извлекает название главы из HTML структуры.
Универсальный подход: собирает все заголовки h1-h6 и выбирает наиболее
подходящий на основе системы оценки.
Args:
soup: BeautifulSoup объект с HTML содержимым главы.
item_name: Имя файла элемента EPUB.
text_content: Опциональный текст содержимого для поиска названия в тексте.
Returns:
Название главы или дефолтное значение 'Глава без названия'.
"""
body = soup.find('body')
if not body:
body = soup
# Собираем все заголовки h1-h6 с их метаданными
candidates: List[Dict[str, Any]] = []
for tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
tags = body.find_all(tag_name)
for position, tag in enumerate(tags):
title_text = tag.get_text().strip()
if not title_text or len(title_text) < MIN_TITLE_LENGTH:
continue
# Базовые фильтры
if title_text.isdigit() or title_text.startswith('Ch'):
continue
classes = tag.get('class', [])
score = score_heading(title_text, tag_name, position, classes)
if score > 0:
candidates.append({
'text': title_text,
'tag': tag_name,
'position': position,
'score': score,
'classes': classes,
'tag_obj': tag
})
# Сортируем кандидатов по оценке (лучшие первыми)
candidates.sort(key=lambda x: x['score'], reverse=True)
# Если есть кандидаты с высокой оценкой, выбираем лучший
if candidates:
best = candidates[0]
# Если лучший кандидат имеет хорошую оценку, используем его
if best['score'] >= MIN_HEADING_SCORE_THRESHOLD:
cleaned = clean_title(best['text'])
if cleaned:
return cleaned
# Если есть h1 с class="chapter_title" и h2 с хорошей оценкой,
# предпочитаем h2 (h1 может быть названием раздела)
h1_chapter_title = None
h2_candidate = None
for cand in candidates:
if cand['tag'] == 'h1' and 'chapter_title' in ' '.join(cand['classes']).lower():
h1_chapter_title = cand
elif cand['tag'] == 'h2' and cand['score'] >= MIN_H2_SCORE_THRESHOLD:
h2_candidate = cand
if h1_chapter_title and h2_candidate:
# Если h2 информативнее (содержит вопрос или длиннее), используем его
h2_text = h2_candidate['text']
if '?' in h2_text or len(h2_text) > INFORMATIVE_TITLE_LENGTH:
cleaned = clean_title(h2_text)
if cleaned:
return cleaned
# Используем лучшего кандидата
cleaned = clean_title(best['text'])
if cleaned:
return cleaned
# Пробуем найти заголовки по классам и ID
for selector in [
'.chapter-title', '.chapter', '.title', '.heading',
'#chapter-title', '#chapter', '#title', '#heading',
'[class*="chapter"]', '[class*="title"]', '[id*="chapter"]', '[id*="title"]'
]:
try:
tags = soup.select(selector)
for tag in tags:
title_text = tag.get_text().strip()
if title_text and MIN_TITLE_LENGTH <= len(title_text) <= MAX_TITLE_LENGTH_LONG:
if not title_text.isdigit() and not title_text.startswith('Ch'):
cleaned = clean_title(title_text)
if cleaned:
return cleaned
except Exception as e:
logger.debug(f"Ошибка при поиске заголовка по селектору {selector}: {e}")
continue
# Пробуем найти заголовок в title теге
title_tag = soup.find('title')
if title_tag:
title_text = title_tag.get_text().strip()
if title_text and MIN_TITLE_LENGTH <= len(title_text) <= MAX_TITLE_LENGTH_LONG:
if not title_text.isdigit() and not title_text.startswith('Ch'):
cleaned = clean_title(title_text)
if cleaned:
return cleaned
# Если ничего не найдено в HTML, пробуем извлечь из текста
if text_content:
title_from_text = extract_title_from_text(text_content)
if title_from_text:
return title_from_text
# Если ничего не найдено, используем имя файла или дефолтное значение
if item_name:
base_name = os.path.basename(item_name)
name_without_ext = os.path.splitext(base_name)[0]
if name_without_ext and name_without_ext not in ['cover', 'title', 'toc']:
return name_without_ext.replace('_', ' ').replace('-', ' ').title()
return 'Глава без названия'
def extract_all_metadata(book: epub.EpubBook) -> Dict[str, Any]:
"""Извлекает все доступные метаданные из EPUB книги.
Args:
book: Объект EpubBook для извлечения метаданных.
Returns:
Словарь со всеми метаданными книги, включая Dublin Core поля
и дополнительные метаданные из других namespace.
"""
metadata = {}
# Стандартные Dublin Core поля
dc_fields = [
'title', 'creator', 'subject', 'description', 'publisher',
'contributor', 'date', 'type', 'format', 'identifier',
'source', 'language', 'relation', 'coverage', 'rights'
]
for field in dc_fields:
values = book.get_metadata('DC', field)
if values:
# Извлекаем все значения для поля
field_values = [val[0] if isinstance(val, tuple) else val for val in values]
if len(field_values) == 1:
metadata[field] = field_values[0]
else:
metadata[field] = field_values
# Дополнительные метаданные из других namespace
for namespace, ns_metadata in book.metadata.items():
if namespace != 'http://purl.org/dc/elements/1.1/': # Уже обработали DC
for key, values in ns_metadata.items():
if values:
ns_key = f"{namespace.split('/')[-1]}_{key}" if '/' in namespace else key
field_values = [val[0] if isinstance(val, tuple) else val for val in values]
if len(field_values) == 1:
metadata[ns_key] = field_values[0]
else:
metadata[ns_key] = field_values
return metadata
def parse_epub_content(book: epub.EpubBook) -> Tuple[str, str, Dict[str, Any], List[Dict[str, Any]]]:
"""Парсит содержимое EPUB книги и извлекает главы.
Args:
book: Объект EpubBook для парсинга.
Returns:
Кортеж из четырех элементов:
- title: название книги
- author: автор книги
- metadata: словарь со всеми метаданными
- chapters: список словарей с информацией о главах
"""
# Извлекаем все метаданные
all_metadata = extract_all_metadata(book)
# Извлекаем основные метаданные с fallback значениями
title = all_metadata.get('title', 'Без названия')
if not title or title == '':
title = 'Без названия'
# Creator может быть списком
creator = all_metadata.get('creator', 'Неизвестный автор')
if isinstance(creator, list):
author = creator[0] if creator else 'Неизвестный автор'
else:
author = creator if creator else 'Неизвестный автор'
if not author or author == '':
author = 'Неизвестный автор'
# Создаем словарь всех элементов по ID для быстрого доступа
items_dict: Dict[str, Any] = {item.get_id(): item for item in book.get_items()}
# Получаем порядок элементов из spine (правильный порядок чтения)
spine_items: List[Any] = []
for item_id, _ in book.spine:
if item_id in items_dict:
item = items_dict[item_id]
if item.get_type() == ebooklib.ITEM_DOCUMENT:
spine_items.append(item)
# Если spine пуст, используем все документы
if not spine_items:
spine_items = list(book.get_items_of_type(ebooklib.ITEM_DOCUMENT))
# Извлекаем главы
chapters: List[Dict[str, Any]] = []
chapter_number: int = 0
parsing_errors: List[Dict[str, str]] = []
for item in spine_items:
# Пропускаем не-текстовые элементы
item_name = item.get_name()
if not (item_name.endswith('.html') or item_name.endswith('.xhtml')):
continue
chapter_title: str = 'Глава без названия'
clean_text: str = ''
footnotes: List[Dict[str, str]] = []
error_info: Optional[Dict[str, str]] = None
try:
# Извлекаем HTML содержимое
content = item.get_content()
# Проверяем кодировку
encoding = detect_encoding(content)
try:
if encoding != 'utf-8':
content = content.decode(encoding).encode('utf-8')
else:
# Пробуем декодировать как UTF-8
content.decode('utf-8')
except (UnicodeDecodeError, UnicodeEncodeError) as e:
logger.warning(f"Проблема с кодировкой в {item_name}: {e}, пробуем другие варианты")
# Пробуем другие кодировки
for enc in ['windows-1251', 'cp1251', 'iso-8859-1', 'latin1']:
try:
content = content.decode(enc).encode('utf-8')
encoding = enc
break
except:
continue
else:
# Если ничего не помогло, пробуем с ошибками
content = content.decode('utf-8', errors='replace').encode('utf-8')
soup = BeautifulSoup(content, 'html.parser')
# Сначала извлекаем текст для поиска названия в тексте
body = soup.find('body')
if body:
temp_text = body.get_text(separator=' ', strip=False)
else:
temp_text = soup.get_text(separator=' ', strip=False)
# Извлекаем название главы (сначала из HTML, потом из текста)
chapter_title = extract_chapter_title(soup, item_name, temp_text)
# Удаляем заголовки и другие служебные элементы перед извлечением текста
# Это поможет получить чистый текст без заголовков
for tag in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'title']):
# Сохраняем заголовок, если он еще не найден
if not chapter_title or chapter_title == 'Глава без названия':
title_text = tag.get_text().strip()
if title_text and MIN_TITLE_LENGTH_STRICT <= len(title_text) <= MAX_TITLE_LENGTH_LONG:
if not title_text.isdigit() and not title_text.startswith('Ch'):
cleaned = clean_title(title_text)
if cleaned:
chapter_title = cleaned
# Удаляем заголовок из текста, чтобы он не дублировался
tag.decompose()
# Извлекаем текст из body, если он есть, иначе из всего документа
body = soup.find('body')
if body:
text_content = body.get_text(separator=' ', strip=False)
else:
text_content = soup.get_text(separator=' ', strip=False)
# Очищаем текст: убираем лишние пробелы, но сохраняем структуру предложений
# Разбиваем на строки, очищаем каждую, затем объединяем
lines = []
for line in text_content.split('\n'):
cleaned_line = ' '.join(line.split())
if cleaned_line:
lines.append(cleaned_line)
clean_text = ' '.join(lines)
# Дополнительная очистка: убираем множественные пробелы
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
# Очищаем текст от символов, мешающих векторизации
clean_text = clean_text_for_vectorization(clean_text)
# Извлекаем сноски и примечания ДО удаления заголовков
footnotes = extract_footnotes(soup)
# Удаляем название главы из начала текста, если оно там есть
if chapter_title and chapter_title != 'Глава без названия':
# Пробуем удалить название в разных вариантах
title_variants = [
chapter_title,
re.sub(r'^\d{1,3}[.\s]+', '', chapter_title), # С номером в начале
f'\\d{{1,3}}\\s*{re.escape(chapter_title)}', # С номером перед названием
]
for variant in title_variants:
# Удаляем название, если оно в самом начале текста
pattern = f'^{re.escape(variant)}\\s+'
clean_text = re.sub(pattern, '', clean_text, flags=re.IGNORECASE)
# Также пробуем без экранирования для варианта с номером
if '\\d' in variant:
pattern = variant
clean_text = re.sub(pattern, '', clean_text, flags=re.IGNORECASE)
clean_text = clean_text.strip()
# Пропускаем только файлы, которые явно не являются главами
# УБРАЛИ проверку на длину текста - берем все главы
skip_files = ['cover', 'title', 'toc', 'copyright', 'colophon']
should_skip = any(skip in item_name.lower() for skip in skip_files)
if not should_skip:
chapter_number += 1
chapter_data = {
'chapterNumber': chapter_number,
'chapterTitle': chapter_title,
'chapterId': item.get_id(),
'text': clean_text,
'filePath': item_name,
'textLength': len(clean_text),
'footnotes': footnotes,
'footnotesCount': len(footnotes)
}
# Подсчитываем токены для главы
chapter_data['tokenCount'] = calculate_chapter_tokens(chapter_data)
chapters.append(chapter_data)
except Exception as e:
# НЕ игнорируем ошибки - сохраняем информацию о них
error_info = {
'file': item_name,
'error': str(e),
'errorType': type(e).__name__
}
parsing_errors.append(error_info)
logger.error(f"Ошибка при парсинге главы {item_name}: {e}", exc_info=True)
# Все равно добавляем главу с минимальной информацией
chapter_number += 1
chapter_data = {
'chapterNumber': chapter_number,
'chapterTitle': 'Глава без названия',
'chapterId': item.get_id(),
'text': '',
'filePath': item_name,
'textLength': 0,
'footnotes': [],
'footnotesCount': 0,
'error': str(e)
}
# Подсчитываем токены для главы (даже если есть ошибка)
chapter_data['tokenCount'] = calculate_chapter_tokens(chapter_data)
chapters.append(chapter_data)
# Логируем ошибки парсинга
if parsing_errors:
logger.warning(f"Обнаружено {len(parsing_errors)} ошибок при парсинге глав")
for err in parsing_errors:
logger.warning(f" - {err['file']}: {err['error']}")
return title, author, all_metadata, chapters
def _get_postgres_connection() -> Any:
"""Создаёт подключение к Postgres из переменных окружения."""
host = os.environ.get("POSTGRES_HOST", "localhost").strip()
port = os.environ.get("POSTGRES_PORT", "5432").strip()
user = os.environ.get("POSTGRES_USER", "n8n").strip()
password = os.environ.get("POSTGRES_PASSWORD", "n8n_password").strip()
dbname = os.environ.get("POSTGRES_DB", "n8n").strip()
return psycopg2.connect(
host=host,
port=port,
user=user,
password=password,
dbname=dbname,
)
def save_parse_result_to_postgres(
title: str,
author: str,
metadata: Dict[str, Any],
chapters: List[Dict[str, Any]],
) -> Tuple[str, List[str]]:
"""Сохраняет результат парсинга в Postgres (books + chapters).
Схема соответствует 8_сохранение_postgres/schema.sql.
Генерирует UUID для книги и для каждой главы.
Args:
title: Название книги.
author: Автор книги.
metadata: Метаданные EPUB (books.metadata).
chapters: Список глав с chapterNumber, chapterTitle, text.
Returns:
Кортеж (book_id_uuid, [chapter_id_uuid, ...]).
Raises:
RuntimeError: Если psycopg2 недоступен или ошибка записи.
"""
book_id_uuid = str(uuid_module.uuid4())
chapter_ids: List[str] = []
conn = _get_postgres_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO books (id, title, author, metadata, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (id) DO UPDATE SET
title = COALESCE(EXCLUDED.title, books.title),
author = COALESCE(EXCLUDED.author, books.author),
metadata = COALESCE(EXCLUDED.metadata, books.metadata),
updated_at = NOW()
""",
(
book_id_uuid,
title or "",
author or "",
json_module.dumps(metadata or {}, ensure_ascii=False),
),
)
for ch in chapters:
chapter_id_uuid = str(uuid_module.uuid4())
chapter_ids.append(chapter_id_uuid)
cur.execute(
"""
INSERT INTO chapters (id, book_id, chapter_number, chapter_title, content)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
book_id = EXCLUDED.book_id,
chapter_number = EXCLUDED.chapter_number,
chapter_title = COALESCE(EXCLUDED.chapter_title, chapters.chapter_title),
content = COALESCE(EXCLUDED.content, chapters.content)
""",
(
chapter_id_uuid,
book_id_uuid,
int(ch.get("chapterNumber", 0)),
(ch.get("chapterTitle") or ""),
(ch.get("text") or ""),
),
)
conn.commit()
except Exception as e:
conn.rollback()
logger.exception("Ошибка сохранения в Postgres")
raise RuntimeError(f"Ошибка сохранения в Postgres: {e}") from e
finally:
conn.close()
return book_id_uuid, chapter_ids
class FileDataRequest(BaseModel):
file_data: str # base64 encoded
@app.post('/parse', response_model=None)
async def parse_epub(
request: Request,
data: Optional[UploadFile] = File(None, alias='data')
):
"""Парсит EPUB файл и возвращает главы в JSON формате.
Поддерживает три способа передачи файла:
1. multipart/form-data с полем 'data'
2. JSON с base64 в поле 'file_data'
3. raw binary (application/octet-stream)
Args:
request: FastAPI Request объект с данными запроса.
data: Опциональный загруженный файл через multipart/form-data.
Returns:
JSON словарь с полями:
- title: название книги (всегда присутствует)
- author: автор книги (всегда присутствует)
- bookId: уникальный идентификатор книги
- totalChapters: общее количество глав
- totalTokens: общее количество токенов для всей книги
- metadata: словарь со всеми метаданными EPUB
- chapters: массив глав с текстом, сносками, метаданными и tokenCount
- error: опциональное поле с информацией об ошибке
Raises:
HTTPException: Если файл не найден или не может быть обработан.
"""
epub_data: Optional[bytes] = None
error_info: Optional[str] = None
try:
content_type = request.headers.get('content-type', '')
# 1. Проверяем multipart/form-data
if data and data.filename:
epub_data = await data.read()
# 2. Проверяем JSON с base64
if epub_data is None and 'application/json' in content_type:
try:
body = await request.json()
if 'file_data' in body:
epub_data = base64.b64decode(body['file_data'])
except Exception as e:
logger.debug(f"Ошибка при парсинге JSON: {e}")
# 3. Проверяем raw binary (application/octet-stream или другой binary)
if epub_data is None:
try:
body_bytes = await request.body()
if body_bytes and len(body_bytes) > MIN_BINARY_FILE_SIZE:
# Проверяем, что это похоже на EPUB (начинается с PK - это ZIP архив)
if body_bytes[:2] == EPUB_SIGNATURE:
epub_data = body_bytes
except Exception as e:
logger.debug(f"Ошибка при чтении raw binary: {e}")
if not epub_data:
raise HTTPException(
status_code=400,
detail='Файл не найден. Используйте multipart/form-data, JSON base64 или raw binary'
)
# Сохраняем во временный файл для epub.read_epub
with tempfile.NamedTemporaryFile(delete=False, suffix='.epub') as tmp_file:
tmp_file.write(epub_data)
tmp_path = tmp_file.name
try:
book = epub.read_epub(tmp_path)
title, author, metadata, chapters = parse_epub_content(book)
except Exception as e:
logger.error(f"Ошибка при парсинге EPUB: {e}", exc_info=True)
title: str = 'Без названия'
author: str = 'Неизвестный автор'
metadata: Dict[str, Any] = {}
chapters: List[Dict[str, Any]] = []
error_info = str(e)
finally:
os.unlink(tmp_path)
# Формируем результат - title и author ВСЕГДА присутствуют
book_id: str = f"{title}_{int(datetime.now().timestamp() * 1000)}"
result_chapters: List[Dict[str, Any]] = list(chapters)
# По умолчанию сохраняем в Postgres (books + chapters)
saved_to_postgres = False
if not error_info and chapters:
try:
pg_book_id, pg_chapter_ids = save_parse_result_to_postgres(
title, author, metadata, chapters
)
book_id = pg_book_id
result_chapters = [
{**ch, "chapter_id": pg_chapter_ids[i] if i < len(pg_chapter_ids) else ""}
for i, ch in enumerate(chapters)
]
saved_to_postgres = True
except Exception as e:
logger.warning("Сохранение в Postgres не удалось: %s", e)
total_tokens = sum(c.get("tokenCount", 0) for c in result_chapters)
result: Dict[str, Any] = {
"title": title,
"author": author,
"bookId": book_id,
"totalChapters": len(result_chapters),
"totalTokens": total_tokens,
"metadata": metadata,
"chapters": result_chapters,
}
if saved_to_postgres:
result["book_id"] = book_id
if error_info:
result["error"] = error_info
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Критическая ошибка при обработке запроса: {e}", exc_info=True)
return JSONResponse(
status_code=500,
content={
'error': str(e),
'title': 'Без названия',
'author': 'Неизвестный автор',
'bookId': f"error_{int(datetime.now().timestamp() * 1000)}",
'totalChapters': 0,
'totalTokens': 0,
'metadata': {},
'chapters': []
}
)
@app.get('/health')
async def health() -> Dict[str, str]:
"""Проверка работоспособности сервиса.
Returns:
Словарь со статусом 'ok' если сервис работает.
"""
return {'status': 'ok'}
if __name__ == '__main__':
import uvicorn
import os
port = int(os.environ.get('PORT', 5000))
# Запуск через uvicorn (ASGI сервер)
uvicorn.run(app, host='0.0.0.0', port=port)