Files
tech/8_сохранение_postgres/save_to_postgres.py
2026-02-01 17:01:21 +03:00

253 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Шаг 8: сохранение анализа главы и тегов в Postgres.
Вход: merged JSON (шаг 5) с полями book_id, chapter_id, chapter_number, chapter_title,
book_title, author, framework, insights, application, limitations, tags.
Действия: upsert книги и главы, запись в chapter_analyses, get-or-create тегов в tags,
связи в chapter_tags. Ожидается, что схема уже применена (schema.sql).
"""
import argparse
import json
import os
import sys
import uuid
from pathlib import Path
from typing import Any
try:
import psycopg2
except ImportError:
print("Ошибка: установите psycopg2-binary (pip install psycopg2-binary).", file=sys.stderr)
sys.exit(1)
def env(name: str, default: str) -> str:
"""Читает переменную окружения или default."""
return os.environ.get(name, default).strip()
def get_connection(database_url: str | None = None):
"""Создаёт подключение к Postgres."""
if database_url:
return psycopg2.connect(database_url)
host = env("POSTGRES_HOST", "localhost")
port = env("POSTGRES_PORT", "5432")
user = env("POSTGRES_USER", "n8n")
password = env("POSTGRES_PASSWORD", "n8n_password")
dbname = env("POSTGRES_DB", "n8n")
return psycopg2.connect(
host=host,
port=port,
user=user,
password=password,
dbname=dbname,
)
def upsert_book(conn, book_id: str, title: str | None, author: str | None) -> None:
"""Вставка или обновление книги."""
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO books (id, title, author, updated_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (id) DO UPDATE SET
title = COALESCE(EXCLUDED.title, books.title),
author = COALESCE(EXCLUDED.author, books.author),
updated_at = NOW()
""",
(book_id, title or "", author or ""),
)
def upsert_chapter(
conn,
chapter_id: str,
book_id: str,
chapter_number: int,
chapter_title: str | None,
content: str | None = None,
) -> None:
"""Вставка или обновление главы."""
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO chapters (id, book_id, chapter_number, chapter_title, content)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
book_id = EXCLUDED.book_id,
chapter_number = EXCLUDED.chapter_number,
chapter_title = COALESCE(EXCLUDED.chapter_title, chapters.chapter_title),
content = COALESCE(EXCLUDED.content, chapters.content)
""",
(chapter_id, book_id, chapter_number, chapter_title or "", content),
)
def upsert_chapter_analysis(
conn,
chapter_id: str,
analysis_result: dict[str, Any],
validation_score: float | None = None,
) -> None:
"""Вставка или обновление анализа главы."""
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO chapter_analyses (chapter_id, analysis_result, validation_score, validated_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (chapter_id) DO UPDATE SET
analysis_result = EXCLUDED.analysis_result,
validation_score = COALESCE(EXCLUDED.validation_score, chapter_analyses.validation_score),
validated_at = NOW()
""",
(chapter_id, json.dumps(analysis_result, ensure_ascii=False), validation_score),
)
def get_or_create_tag(conn, name: str, category: str) -> str:
"""Возвращает id тега по имени; создаёт тег при отсутствии."""
with conn.cursor() as cur:
cur.execute("SELECT id FROM tags WHERE name = %s", (name,))
row = cur.fetchone()
if row:
return str(row[0])
tag_id = str(uuid.uuid4())
cur.execute(
"INSERT INTO tags (id, name, category) VALUES (%s, %s, %s)",
(tag_id, name, category),
)
return tag_id
def upsert_chapter_tags(
conn,
chapter_id: str,
tags_flat: list[tuple[str, str, float | None]],
) -> None:
"""Связывает главу с тегами (get-or-create тегов, вставка в chapter_tags)."""
with conn.cursor() as cur:
for name, category, confidence in tags_flat:
tag_id = get_or_create_tag(conn, name, category)
cur.execute(
"""
INSERT INTO chapter_tags (chapter_id, tag_id, confidence, validated, source)
VALUES (%s, %s, %s, true, 'ai_validation')
ON CONFLICT (chapter_id, tag_id) DO UPDATE SET
confidence = EXCLUDED.confidence,
validated = true
""",
(chapter_id, tag_id, confidence),
)
def tags_from_merged(merged: dict[str, Any]) -> list[tuple[str, str, float | None]]:
"""(name, category, confidence) из merged JSON."""
result: list[tuple[str, str, float | None]] = []
for category, items in (merged.get("tags") or {}).items():
for item in items if isinstance(items, list) else []:
if isinstance(item, dict) and "tag" in item:
result.append((
item["tag"],
category,
item.get("confidence"),
))
return result
def analysis_result_from_merged(merged: dict[str, Any]) -> dict[str, Any]:
"""Блок для chapter_analyses.analysis_result (framework, insights, application, limitations)."""
return {
"framework": merged.get("framework"),
"insights": merged.get("insights"),
"application": merged.get("application"),
"limitations": merged.get("limitations"),
}
def main() -> int:
parser = argparse.ArgumentParser(
description="Шаг 8: сохранить анализ и теги в Postgres (merged JSON → books, chapters, chapter_analyses, tags, chapter_tags).",
)
parser.add_argument(
"merged_json",
type=Path,
help="Путь к merged JSON (шаг 5).",
)
parser.add_argument(
"--validation-score",
type=float,
default=None,
help="Оценка валидации (опционально).",
)
parser.add_argument(
"--database-url",
default=env("DATABASE_URL", ""),
help="URL подключения к Postgres (или POSTGRES_HOST/USER/PASSWORD/DB).",
)
args = parser.parse_args()
if not args.merged_json.exists():
print(f"Ошибка: файл не найден: {args.merged_json}", file=sys.stderr)
return 1
try:
with open(args.merged_json, encoding="utf-8") as f:
merged = json.load(f)
except json.JSONDecodeError as e:
print(f"Ошибка разбора JSON: {e}", file=sys.stderr)
return 1
required = ("book_id", "chapter_id", "chapter_number", "chapter_title")
missing = [k for k in required if not merged.get(k)]
if missing:
print(f"Ошибка: в merged нет полей (нужен мерж с --input-chapter): {missing}", file=sys.stderr)
return 1
database_url = args.database_url or None
try:
conn = get_connection(database_url)
except Exception as e:
print(f"Ошибка подключения к Postgres: {e}", file=sys.stderr)
return 1
try:
upsert_book(
conn,
merged["book_id"],
merged.get("book_title"),
merged.get("author"),
)
upsert_chapter(
conn,
merged["chapter_id"],
merged["book_id"],
int(merged["chapter_number"]),
merged.get("chapter_title"),
merged.get("chapter_text"), # если подмешали в мерж
)
upsert_chapter_analysis(
conn,
merged["chapter_id"],
analysis_result_from_merged(merged),
args.validation_score,
)
upsert_chapter_tags(conn, merged["chapter_id"], tags_from_merged(merged))
conn.commit()
except Exception as e:
conn.rollback()
print(f"Ошибка: {e}", file=sys.stderr)
return 1
finally:
conn.close()
print(f"Глава {merged['chapter_id']} сохранена в Postgres (books, chapters, chapter_analyses, tags, chapter_tags).", flush=True)
return 0
if __name__ == "__main__":
sys.exit(main())