253 lines
8.7 KiB
Python
253 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Шаг 8: сохранение анализа главы и тегов в Postgres.
|
||
|
||
Вход: merged JSON (шаг 5) с полями book_id, chapter_id, chapter_number, chapter_title,
|
||
book_title, author, framework, insights, application, limitations, tags.
|
||
|
||
Действия: upsert книги и главы, запись в chapter_analyses, get-or-create тегов в tags,
|
||
связи в chapter_tags. Ожидается, что схема уже применена (schema.sql).
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
try:
|
||
import psycopg2
|
||
except ImportError:
|
||
print("Ошибка: установите psycopg2-binary (pip install psycopg2-binary).", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
|
||
def env(name: str, default: str) -> str:
|
||
"""Читает переменную окружения или default."""
|
||
return os.environ.get(name, default).strip()
|
||
|
||
|
||
def get_connection(database_url: str | None = None):
|
||
"""Создаёт подключение к Postgres."""
|
||
if database_url:
|
||
return psycopg2.connect(database_url)
|
||
host = env("POSTGRES_HOST", "localhost")
|
||
port = env("POSTGRES_PORT", "5432")
|
||
user = env("POSTGRES_USER", "n8n")
|
||
password = env("POSTGRES_PASSWORD", "n8n_password")
|
||
dbname = env("POSTGRES_DB", "n8n")
|
||
return psycopg2.connect(
|
||
host=host,
|
||
port=port,
|
||
user=user,
|
||
password=password,
|
||
dbname=dbname,
|
||
)
|
||
|
||
|
||
def upsert_book(conn, book_id: str, title: str | None, author: str | None) -> None:
|
||
"""Вставка или обновление книги."""
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO books (id, title, author, updated_at)
|
||
VALUES (%s, %s, %s, NOW())
|
||
ON CONFLICT (id) DO UPDATE SET
|
||
title = COALESCE(EXCLUDED.title, books.title),
|
||
author = COALESCE(EXCLUDED.author, books.author),
|
||
updated_at = NOW()
|
||
""",
|
||
(book_id, title or "", author or ""),
|
||
)
|
||
|
||
|
||
def upsert_chapter(
|
||
conn,
|
||
chapter_id: str,
|
||
book_id: str,
|
||
chapter_number: int,
|
||
chapter_title: str | None,
|
||
content: str | None = None,
|
||
) -> None:
|
||
"""Вставка или обновление главы."""
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO chapters (id, book_id, chapter_number, chapter_title, content)
|
||
VALUES (%s, %s, %s, %s, %s)
|
||
ON CONFLICT (id) DO UPDATE SET
|
||
book_id = EXCLUDED.book_id,
|
||
chapter_number = EXCLUDED.chapter_number,
|
||
chapter_title = COALESCE(EXCLUDED.chapter_title, chapters.chapter_title),
|
||
content = COALESCE(EXCLUDED.content, chapters.content)
|
||
""",
|
||
(chapter_id, book_id, chapter_number, chapter_title or "", content),
|
||
)
|
||
|
||
|
||
def upsert_chapter_analysis(
|
||
conn,
|
||
chapter_id: str,
|
||
analysis_result: dict[str, Any],
|
||
validation_score: float | None = None,
|
||
) -> None:
|
||
"""Вставка или обновление анализа главы."""
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO chapter_analyses (chapter_id, analysis_result, validation_score, validated_at)
|
||
VALUES (%s, %s, %s, NOW())
|
||
ON CONFLICT (chapter_id) DO UPDATE SET
|
||
analysis_result = EXCLUDED.analysis_result,
|
||
validation_score = COALESCE(EXCLUDED.validation_score, chapter_analyses.validation_score),
|
||
validated_at = NOW()
|
||
""",
|
||
(chapter_id, json.dumps(analysis_result, ensure_ascii=False), validation_score),
|
||
)
|
||
|
||
|
||
def get_or_create_tag(conn, name: str, category: str) -> str:
|
||
"""Возвращает id тега по имени; создаёт тег при отсутствии."""
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT id FROM tags WHERE name = %s", (name,))
|
||
row = cur.fetchone()
|
||
if row:
|
||
return str(row[0])
|
||
tag_id = str(uuid.uuid4())
|
||
cur.execute(
|
||
"INSERT INTO tags (id, name, category) VALUES (%s, %s, %s)",
|
||
(tag_id, name, category),
|
||
)
|
||
return tag_id
|
||
|
||
|
||
def upsert_chapter_tags(
|
||
conn,
|
||
chapter_id: str,
|
||
tags_flat: list[tuple[str, str, float | None]],
|
||
) -> None:
|
||
"""Связывает главу с тегами (get-or-create тегов, вставка в chapter_tags)."""
|
||
with conn.cursor() as cur:
|
||
for name, category, confidence in tags_flat:
|
||
tag_id = get_or_create_tag(conn, name, category)
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO chapter_tags (chapter_id, tag_id, confidence, validated, source)
|
||
VALUES (%s, %s, %s, true, 'ai_validation')
|
||
ON CONFLICT (chapter_id, tag_id) DO UPDATE SET
|
||
confidence = EXCLUDED.confidence,
|
||
validated = true
|
||
""",
|
||
(chapter_id, tag_id, confidence),
|
||
)
|
||
|
||
|
||
def tags_from_merged(merged: dict[str, Any]) -> list[tuple[str, str, float | None]]:
|
||
"""(name, category, confidence) из merged JSON."""
|
||
result: list[tuple[str, str, float | None]] = []
|
||
for category, items in (merged.get("tags") or {}).items():
|
||
for item in items if isinstance(items, list) else []:
|
||
if isinstance(item, dict) and "tag" in item:
|
||
result.append((
|
||
item["tag"],
|
||
category,
|
||
item.get("confidence"),
|
||
))
|
||
return result
|
||
|
||
|
||
def analysis_result_from_merged(merged: dict[str, Any]) -> dict[str, Any]:
|
||
"""Блок для chapter_analyses.analysis_result (framework, insights, application, limitations)."""
|
||
return {
|
||
"framework": merged.get("framework"),
|
||
"insights": merged.get("insights"),
|
||
"application": merged.get("application"),
|
||
"limitations": merged.get("limitations"),
|
||
}
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(
|
||
description="Шаг 8: сохранить анализ и теги в Postgres (merged JSON → books, chapters, chapter_analyses, tags, chapter_tags).",
|
||
)
|
||
parser.add_argument(
|
||
"merged_json",
|
||
type=Path,
|
||
help="Путь к merged JSON (шаг 5).",
|
||
)
|
||
parser.add_argument(
|
||
"--validation-score",
|
||
type=float,
|
||
default=None,
|
||
help="Оценка валидации (опционально).",
|
||
)
|
||
parser.add_argument(
|
||
"--database-url",
|
||
default=env("DATABASE_URL", ""),
|
||
help="URL подключения к Postgres (или POSTGRES_HOST/USER/PASSWORD/DB).",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
if not args.merged_json.exists():
|
||
print(f"Ошибка: файл не найден: {args.merged_json}", file=sys.stderr)
|
||
return 1
|
||
|
||
try:
|
||
with open(args.merged_json, encoding="utf-8") as f:
|
||
merged = json.load(f)
|
||
except json.JSONDecodeError as e:
|
||
print(f"Ошибка разбора JSON: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
required = ("book_id", "chapter_id", "chapter_number", "chapter_title")
|
||
missing = [k for k in required if not merged.get(k)]
|
||
if missing:
|
||
print(f"Ошибка: в merged нет полей (нужен мерж с --input-chapter): {missing}", file=sys.stderr)
|
||
return 1
|
||
|
||
database_url = args.database_url or None
|
||
try:
|
||
conn = get_connection(database_url)
|
||
except Exception as e:
|
||
print(f"Ошибка подключения к Postgres: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
try:
|
||
upsert_book(
|
||
conn,
|
||
merged["book_id"],
|
||
merged.get("book_title"),
|
||
merged.get("author"),
|
||
)
|
||
upsert_chapter(
|
||
conn,
|
||
merged["chapter_id"],
|
||
merged["book_id"],
|
||
int(merged["chapter_number"]),
|
||
merged.get("chapter_title"),
|
||
merged.get("chapter_text"), # если подмешали в мерж
|
||
)
|
||
upsert_chapter_analysis(
|
||
conn,
|
||
merged["chapter_id"],
|
||
analysis_result_from_merged(merged),
|
||
args.validation_score,
|
||
)
|
||
upsert_chapter_tags(conn, merged["chapter_id"], tags_from_merged(merged))
|
||
conn.commit()
|
||
except Exception as e:
|
||
conn.rollback()
|
||
print(f"Ошибка: {e}", file=sys.stderr)
|
||
return 1
|
||
finally:
|
||
conn.close()
|
||
|
||
print(f"Глава {merged['chapter_id']} сохранена в Postgres (books, chapters, chapter_analyses, tags, chapter_tags).", flush=True)
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|