Перейти к основному содержимому

Потоки данных

Общая схема потоков данных

Исходный код диаграммы:

graph LR
browser["Browser"] --> seadragon["Seadragon<br/>Next.js"]
seadragon -->|"/api/chat SSE"| nats["NATS JetStream"]
seadragon -->|"REST /api/*"| sprut["Sprut<br/>API Gateway"]

sprut --> postgres[("PostgreSQL")]
sprut --> milvus[("Milvus")]
sprut --> keycloak["Keycloak"]
sprut --> doubloon["Doubloon"] --> tinkoff["Tinkoff API"]
sprut -->|task publish| nats

nats --> amigofish["Amigofish"] --> llm["LLM APIs"]
nats --> anglerfish["Anglerfish"] --> milvus
nats --> krill["Krill<br/>PDF+OCR"]
nats --> gull["Gull"] --> bitrix["Bitrix 24"]
nats --> descent["Descent"] --> yandex["Yandex"]
nats --> bilge["Bilge"]

amigofish -->|FrontProducer| nats
chronograf["Chronograf<br/>Scheduler"] -->|publish| nats

style sprut fill:#5a9e6f,stroke:#3a7a4f,color:#fff
style nats fill:#5b9dad,stroke:#3a7a8a,color:#fff
style amigofish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style anglerfish fill:#c47c2b,stroke:#9a5f1a,color:#fff

Основные потоки

1. Пользовательский запрос (Chat Flow)

Путь запроса от пользователя до ответа с RAG-обогащением:

Исходный код диаграммы:

graph TD
user["Пользователь"] -->|HTTPS| seadragon["Seadragon<br/><i>Next.js 15, AI SDK 5.0</i>"]
seadragon -->|REST| sprut["Sprut<br/><i>API Gateway</i>"]
sprut -->|NATS publish| nats["NATS JetStream<br/><i>complete_tasks stream</i>"]
nats -->|"consume (orker)"| amigofish["Amigofish<br/><i>LangGraph Orchestrator</i>"]

amigofish -->|RAG Agent| milvus[("Milvus<br/><i>BM25 + dense + RRF</i>")]
amigofish -->|Web Agent| tavily["Tavily API"]
amigofish -->|"gpt-5.2"| llm["OpenAI API"]
amigofish -->|документы| stonefish["Stonefish<br/><i>PDF/DOCX/XLSX/PPTX</i>"]

amigofish -->|"FrontProducer<br/>completion.{chat_id}"| nats
nats -->|SSE стриминг| seadragon

style amigofish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style nats fill:#5b9dad,stroke:#3a7a8a,color:#fff
style milvus fill:#4a7eb5,stroke:#2d5f8a,color:#fff

Жизненный цикл запроса (14 шагов)

Каждый пользовательский запрос проходит через полный жизненный цикл внутри Amigofish worker:

  1. Загрузка данных из PostgreSQL (через psdot): чат + вложения
  2. Декодирование вложений (изображения сжимаются, PDF конвертируется в markdown)
  3. Параллельно: суммаризация истории / генерация заголовка чата (async)
  4. Контекстуализация последнего сообщения пользователя (LLM rewrite)
  5. Очистка устаревших сообщений NATS subject
  6. DoubloonManager().__aenter__ — холдирование кредитов
  7. build_graph() — сборка CompiledStateGraph
  8. graph.astream(initial_state, stream_mode=custom) — запуск графа
  9. Стриминг JSON-событий в NATS completion.{chat_id}
  10. При GraphInterrupt — ожидание ответа на уточнение, Command(resume=answer), цикл
  11. По завершении — отправка {"type":"finish"}
  12. Ожидание фоновых задач summary/title
  13. DoubloonManager.__aexit__ — коммит использования токенов
  14. on_success — сохранение сообщения ассистента + метаданных чата

Каждое событие, отправленное в NATS, также добавляется в whole_response, сжимается (соседние text-delta с одинаковым id объединяются) и сохраняется в строке сообщения при on_success.

LangGraph — граф оркестрации

Amigofish использует LangGraph для мультиагентной оркестрации. Граф состоит из специализированных узлов (агентов) и условных переходов:

Исходный код диаграммы:

graph TD
START(("START")) --> coordinator["coordinator<br/><i>classify_intent + decide</i>"]

coordinator -->|"research"| rag_agent["rag_agent<br/><i>RAG ReAct loop</i>"]
coordinator -->|"web search"| web_agent["web_agent<br/><i>Web ReAct loop</i>"]
coordinator -->|"document"| document_agent["document_agent<br/><i>Stonefish generation</i>"]
coordinator -->|"direct"| direct_answer["direct_answer<br/><i>greeting/math/meta</i>"]
coordinator -->|"unintelligible"| clarify["clarify<br/><i>polite rephrase request</i>"]
coordinator -->|"ambiguous"| generate_questions["generate_questions<br/><i>2-3 disambiguation Q</i>"]
coordinator -->|"ready"| synthesis["synthesis<br/><i>Answer + MindMap streaming</i>"]

rag_agent --> analyst["analyst<br/><i>sufficiency verdict</i>"]
web_agent --> analyst

analyst -->|"loop back"| coordinator

document_agent --> synthesis
direct_answer --> synthesis
clarify --> synthesis

generate_questions --> clarify_interrupt["clarify_interrupt<br/><i>LangGraph interrupt</i>"]
clarify_interrupt --> refine_query["refine_query<br/><i>merge Q&A into query</i>"]
refine_query --> coordinator

synthesis --> END(("END"))

style coordinator fill:#5a9e6f,stroke:#3a7a4f,color:#fff
style analyst fill:#5b9dad,stroke:#3a7a8a,color:#fff
style synthesis fill:#c47c2b,stroke:#9a5f1a,color:#fff
style rag_agent fill:#4a7eb5,stroke:#2d5f8a,color:#fff
style web_agent fill:#4a7eb5,stroke:#2d5f8a,color:#fff
style document_agent fill:#8b6db8,stroke:#6a4f9a,color:#fff

Узлы графа:

УзелНазначение
coordinatorКлассификация intent + принятие решения (подграф classify_intent -> decide)
rag_agentReAct-агент для поиска по базе знаний (tools: RAG, Tree, GetDocumentContent)
web_agentReAct-агент для поиска в интернете (tools: WebSearch, WebExtract)
document_agentГенерация/редактирование файлов через Stonefish (PDF/DOCX/XLSX/PPTX)
analystВердикт о достаточности исследования (AnalystVerdict: sufficient/missing/suggestions/summary)
synthesisФинальный ответ через streaming Answer tool + опциональный MindMap
direct_answerПрямой ответ на приветствия, математику, мета-вопросы
clarifyЗапрос переформулировки при непонятном вводе
generate_questionsГенерация 2-3 уточняющих вопросов при неоднозначности
clarify_interruptLangGraph interrupt() — приостановка выполнения графа
refine_queryОбъединение исходного запроса + ответов пользователя в уточнённый запрос

Логика маршрутизации (_coordinator_route, приоритет жёстко задан):

  1. route_intent == "document" — всегда document_agent (LLM не может генерировать бинарные файлы)
  2. direct_answer == Truedirect_answer
  3. route_intent == "clarify"clarify
  4. route_intent == "ambiguous" и clarification_count < max_clarification_roundsgenerate_questions
  5. ready_for_synthesis или нет агентов — synthesis
  6. Один агент — направление к нему; несколько — LangGraph list-branch (fan-out, параллельный запуск)

Подграф уточнения (Clarification sub-flow)

При неоднозначном запросе активируется цикл уточнения:

Исходный код диаграммы:

graph LR
coordinator -->|"ambiguous"| gq["generate_questions<br/><i>LLM: 2-3 вопроса</i>"]
gq --> ci["clarify_interrupt<br/><i>LangGraph interrupt()</i>"]
ci -->|"NATS: clarification-request<br/>30s heartbeat"| user["Ожидание ответа<br/>пользователя"]
user -->|"Command(resume=answer)"| rq["refine_query<br/><i>merge Q&A</i>"]
rq --> coordinator

Worker ловит GraphInterrupt в complete.py, отправляет событие clarification-request в NATS, подписывается на clarification_answer.{complete_id} с heartbeat 30 сек, и при получении ответа возобновляет граф через Command(resume=answer). Требуется MemorySaver checkpointer (активируется при max_clarification_rounds > 0, по умолчанию отключен).

Кооперативная отмена (Cooperative Cancellation)

Отмена генерации реализована кооперативно: между событиями вызывается check_actual_complete_id, который сравнивает complete_id из Redis (ключ complete_{chat_id}) с id текущей задачи. При несовпадении выбрасывается AlreadyCompleting — генерация прерывается. Это позволяет фронтенду остановить работающую генерацию, отправив новый запрос.

Биллинговый цикл (DoubloonManager)

DoubloonManager — async context manager, управляющий биллингом AI-запроса:

  1. Hold — резервирование ~30 кредитов при входе в контекст
  2. Выполнение — граф потребляет токены разных типов
  3. Commit — списание фактического использования с детализацией:
Тип токеновОписание
openai_input_tokensВходные токены OpenAI
openai_cached_input_tokensКэшированные входные токены
openai_output_tokensВыходные токены
openai_output_tokens_reasoningТокены reasoning
openai_small_embedding_tokensТокены эмбеддингов
tavily_tokensТокены Tavily (20 за search, 4 за extract)
  1. Release — при ошибке холд освобождается

Stonefish — генерация документов

Amigofish интегрирован со Stonefish — FastAPI-сервисом для рендеринга бинарных файлов. Поддерживаемые форматы: PDF, DOCX, XLSX, PPTX.

  • GenerateDocument(query, raw_data) — вызывает Stonefish POST /api/document/generate, сохраняет файл как Artifact
  • EditDocument(artifact_id, instructions) — повторный вызов POST /api/document/edit с сохранённым base_code
  • Таймаут: 900 сек (генерация бинарных файлов медленная)
  • Ограничение: один вызов генерации/редактирования за ход

Stonefish использует Claude API через source/src/services/claude.py для синтеза содержимого документов. Dockerfile устанавливает Claude Code CLI поверх базового образа cr.monsterscorp.ru:5000/python:latest. Сервис деплоится через собственный Helm chart (tools/charts/stonefish/) с отдельными конфигурациями для dev и pro окружений.

Формат событий NATS (AI-стриминг)

Все события — JSON-объекты в completion.{chat_id}:

ТипПоляОписание
reasoning-startidОткрытие блока reasoning
reasoning-deltaid, delta, [tool, meta]Фрагмент «размышления» (на русском)
reasoning-endid, [duration]Закрытие блока reasoning
text-startidНачало текстового блока ответа
text-deltaid, deltaИнкрементальная часть текста ответа
text-endidЗавершение текстового блока
citation-markertextId, markerId, sourceId, index, length, unitИнлайн-маркер цитаты в тексте
citation-sourceid, kind (document/web), documentId/url, ...Источник цитаты
citations-finalizeorder, unusedФинальный порядок цитат (после стрима)
tool-input-start / tool-call / tool-resulttoolCallId, toolName, input, resultMindMap / артефакт файла
clarification-requestchat_id, thread_id, questions, timeout_seconds, round, max_roundsЗапрос уточнения (при interrupt)
finishВсегда последнее событие

Resume stream

При перезагрузке страницы ChatResumeHandler восстанавливает прерванный AI-стрим через DeliverPolicy.StartTime (NATS JetStream) — читает с сохранённого timestamp. Это позволяет пользователю не потерять ответ при случайном обновлении страницы.

2. Индексирование документов (Indexing Flow)

Путь документа от загрузки до готовности к RAG-поиску:

Исходный код диаграммы:

graph TD
user["👤 Пользователь"] -->|"upload: PDF, DOCX,<br/>Excel, PPTX, аудио, сканы"| sprut["Sprut<br/><i>Валидация, метаданные</i>"]
sprut -->|NATS event| anglerfish["Anglerfish<br/><i>LlamaIndex</i>"]
anglerfish --> krill["Krill<br/><i>OCR + PDF parsing</i>"]
anglerfish --> marker["Marker API<br/><i>PDF → текст</i>"]
anglerfish --> vision["OpenAI Vision<br/><i>Изображения → описание</i>"]

anglerfish -->|"Chunking + Embedding"| openai["OpenAI API<br/><i>text-embedding-3-small</i>"]
openai -->|vectors| milvus[("Milvus<br/><i>BM25 + dense + RRF</i>")]

style anglerfish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style milvus fill:#4a7eb5,stroke:#2d5f8a,color:#fff

Три типа операций Anglerfish:

  1. index — индексирование нового документа
  2. attachment_index — индексирование вложений
  3. unindex — удаление из индекса

3. Биллинг и платежи (Payment Flow)

Исходный код диаграммы:

graph TD
user["👤 Пользователь"] -->|"Выбор тарифа / оплата"| sprut["Sprut<br/><i>/billing endpoint</i>"]
sprut --> doubloon["Doubloon<br/><i>Billing Service</i>"]
doubloon <-->|"карты, SBP"| tinkoff["Tinkoff API"]
doubloon --> postgres[("PostgreSQL<br/><i>транзакции, баланс</i>")]
doubloon -->|NATS event| chronograf["Chronograf<br/><i>Автоплатежи по расписанию</i>"]
doubloon -->|NATS event| anchor["Anchor<br/><i>Payment status</i>"]

style doubloon fill:#8b6db8,stroke:#6a4f9a,color:#fff

Биллинговый цикл AI-запроса (DoubloonManager):

  1. hold — резервирование кредитов перед генерацией
  2. Выполнение AI-запроса (токены: input, cached_input, output, reasoning, embedding, tavily)
  3. commit — списание фактически потреблённых кредитов
  4. При ошибке — release (освобождение резерва)

4. Email-уведомления (Notification Flow)

Исходный код диаграммы:

graph LR
event["Событие<br/><i>регистрация, оплата, подписка</i>"] -->|NATS event| gull["Gull<br/><i>Orker Worker</i>"]
gull --> smtp["SMTP<br/><i>HTML-шаблоны</i>"]
gull --> bitrix["Bitrix CRM<br/><i>лиды, контакты</i>"]
gull --> unisender["UniSender<br/><i>маркетинговые рассылки</i>"]

style gull fill:#8b6db8,stroke:#6a4f9a,color:#fff

Протоколы взаимодействия

Полная таблица протоколов с описанием каждого канала связи — в разделе Обзор API → Протоколы коммуникации.

Кратко: внешние клиенты взаимодействуют с платформой через HTTPS; синхронное межсервисное взаимодействие — через REST/HTTP; асинхронная обработка задач и AI-стриминг — через NATS JetStream.

Хранилища данных

ХранилищеДанныеЗаписьЧтение
PostgreSQLПользователи, чаты, документы, транзакции, подпискиВсе сервисы (через psdot models)Все сервисы
MilvusВекторные эмбеддинги документов (hybrid BM25+dense)AnglerfishAmigofish, Sprut
RedisApplication cache (aiocache), complete_id для кооперативной отменыSprut, WorkersSprut, Workers
MinIOФайлы документовSprut, AnglerfishAmigofish
Yandex Object StorageПользовательские файлыDescentSprut
NATS JetStreamСобытия и задачиВсе продюсерыOrker workers

Связанные страницы