205 lines
7.1 KiB
Python
205 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Шаг 7: сохранение вектора эмбеддинга и payload в коллекцию Qdrant chapter_analyses.
|
||
|
||
Вход:
|
||
- merged JSON (шаг 5): теги, анализ и метаданные главы/книги (book_id, chapter_id,
|
||
chapter_number, chapter_title, book_title, author — из входного JSON этапа 1,
|
||
подмешанные при мерже через --input-chapter).
|
||
- вектор (файл или stdin): результат шага 6 (embed_cli.py).
|
||
|
||
Выход: одна точка в коллекции chapter_analyses (upsert по chapter_id).
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
import urllib.error
|
||
import urllib.request
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
def env(name: str, default: str) -> str:
|
||
"""Читает переменную окружения или default."""
|
||
return os.environ.get(name, default).strip()
|
||
|
||
|
||
def tags_from_merged(merged: dict[str, Any]) -> list[dict[str, Any]]:
|
||
"""Преобразует теги из merged JSON в плоский список для payload (tag, category, confidence)."""
|
||
result: list[dict[str, Any]] = []
|
||
tags_block = merged.get("tags") or {}
|
||
for category, items in tags_block.items():
|
||
for item in items if isinstance(items, list) else []:
|
||
if isinstance(item, dict) and "tag" in item:
|
||
result.append({
|
||
"tag": item["tag"],
|
||
"category": category,
|
||
"confidence": item.get("confidence"),
|
||
})
|
||
return result
|
||
|
||
|
||
def payload_from_merged(merged: dict[str, Any], validation_score: float | None = None) -> dict[str, Any]:
|
||
"""
|
||
Собирает payload для Qdrant из merged JSON (шаг 5).
|
||
|
||
Ожидает в merged поля из входного JSON этапа 1: book_id, chapter_id,
|
||
chapter_number, chapter_title, book_title; опционально author.
|
||
"""
|
||
required = ("book_id", "chapter_id", "chapter_number", "chapter_title")
|
||
missing = [k for k in required if not merged.get(k)]
|
||
if missing:
|
||
raise ValueError(
|
||
f"В merged JSON отсутствуют поля (должны быть из входного JSON этапа 1, мерж с --input-chapter): {missing}"
|
||
)
|
||
payload: dict[str, Any] = {
|
||
"bookId": merged["book_id"],
|
||
"chapterId": merged["chapter_id"],
|
||
"chapterNumber": int(merged["chapter_number"]),
|
||
"chapterTitle": merged["chapter_title"],
|
||
"tags": tags_from_merged(merged),
|
||
}
|
||
if validation_score is not None:
|
||
payload["validationScore"] = validation_score
|
||
if merged.get("book_title") is not None:
|
||
payload["title"] = merged["book_title"]
|
||
if merged.get("author") is not None:
|
||
payload["author"] = merged["author"]
|
||
return payload
|
||
|
||
|
||
def load_vector(path: Path | None) -> list[float]:
|
||
"""Загружает вектор из файла или stdin (JSON-массив)."""
|
||
if path is None or str(path) == "-":
|
||
data = json.load(sys.stdin)
|
||
else:
|
||
with open(path, encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
if not isinstance(data, list):
|
||
raise ValueError("Вектор должен быть JSON-массивом чисел")
|
||
return [float(x) for x in data]
|
||
|
||
|
||
def upsert_point(
|
||
base_url: str,
|
||
collection: str,
|
||
point_id: str,
|
||
vector: list[float],
|
||
payload: dict[str, Any],
|
||
) -> dict[str, Any]:
|
||
"""Добавляет или обновляет одну точку в коллекции Qdrant."""
|
||
url = f"{base_url.rstrip('/')}/collections/{collection}/points?wait=true"
|
||
body = {
|
||
"points": [
|
||
{
|
||
"id": point_id,
|
||
"vector": vector,
|
||
"payload": payload,
|
||
}
|
||
]
|
||
}
|
||
req_body = json.dumps(body).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=req_body,
|
||
headers={"Content-Type": "application/json"},
|
||
method="PUT",
|
||
)
|
||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||
return json.loads(resp.read().decode("utf-8"))
|
||
|
||
|
||
def main() -> int:
|
||
"""Точка входа CLI."""
|
||
parser = argparse.ArgumentParser(
|
||
description="Шаг 7: сохранить вектор и payload в Qdrant (коллекция chapter_analyses).",
|
||
)
|
||
parser.add_argument(
|
||
"merged_json",
|
||
type=Path,
|
||
help="Путь к merged JSON (шаг 5) — для тегов и др.",
|
||
)
|
||
parser.add_argument(
|
||
"vector",
|
||
nargs="?",
|
||
default=None,
|
||
help="Путь к файлу с вектором (JSON-массив) или '-' для stdin.",
|
||
)
|
||
parser.add_argument(
|
||
"--validation-score",
|
||
type=float,
|
||
default=None,
|
||
help="Оценка валидации (опционально).",
|
||
)
|
||
parser.add_argument(
|
||
"--qdrant-url",
|
||
default=env("QDRANT_URL", "http://localhost:6333"),
|
||
help="URL Qdrant (по умолчанию из QDRANT_URL или localhost:6333).",
|
||
)
|
||
parser.add_argument(
|
||
"--collection",
|
||
default=env("QDRANT_COLLECTION_CHAPTER_ANALYSES", "chapter_analyses"),
|
||
help="Имя коллекции.",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
if not args.merged_json.exists():
|
||
print(f"Ошибка: файл не найден: {args.merged_json}", file=sys.stderr)
|
||
return 1
|
||
|
||
try:
|
||
with open(args.merged_json, encoding="utf-8") as f:
|
||
merged = json.load(f)
|
||
except json.JSONDecodeError as e:
|
||
print(f"Ошибка разбора merged JSON: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
try:
|
||
vector = load_vector(args.vector)
|
||
except (ValueError, FileNotFoundError) as e:
|
||
print(f"Ошибка загрузки вектора: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
try:
|
||
payload = payload_from_merged(merged, validation_score=args.validation_score)
|
||
except ValueError as e:
|
||
print(f"Ошибка: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
chapter_id = merged["chapter_id"]
|
||
|
||
try:
|
||
result = upsert_point(
|
||
args.qdrant_url,
|
||
args.collection,
|
||
point_id=chapter_id,
|
||
vector=vector,
|
||
payload=payload,
|
||
)
|
||
except urllib.error.HTTPError as e:
|
||
print(f"Ошибка HTTP {e.code}: {e.reason}", file=sys.stderr)
|
||
if e.fp:
|
||
try:
|
||
print(e.fp.read().decode("utf-8")[:500], file=sys.stderr)
|
||
except Exception:
|
||
pass
|
||
return 1
|
||
except urllib.error.URLError as e:
|
||
print(f"Ошибка запроса к Qdrant: {e.reason}", file=sys.stderr)
|
||
return 1
|
||
except Exception as e:
|
||
print(f"Ошибка: {e}", file=sys.stderr)
|
||
return 1
|
||
|
||
if result.get("status") == "ok":
|
||
print(f"Точка {chapter_id} сохранена в коллекцию {args.collection}.", flush=True)
|
||
return 0
|
||
print(f"Неожиданный ответ Qdrant: {result}", file=sys.stderr)
|
||
return 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|