Потоки данных
Общая схема потоков данных
Исходный код диаграммы:
graph LR
browser["Browser"] --> seadragon["Seadragon<br/>Next.js"]
seadragon -->|"/api/chat SSE"| nats["NATS JetStream"]
seadragon -->|"REST /api/*"| sprut["Sprut<br/>API Gateway"]
sprut --> postgres[("PostgreSQL")]
sprut --> milvus[("Milvus")]
sprut --> keycloak["Keycloak"]
sprut --> doubloon["Doubloon"] --> tinkoff["Tinkoff API"]
sprut -->|task publish| nats
nats --> amigofish["Amigofish"] --> llm["LLM APIs"]
nats --> anglerfish["Anglerfish"] --> milvus
nats --> krill["Krill<br/>PDF+OCR"]
nats --> gull["Gull"] --> bitrix["Bitrix 24"]
nats --> descent["Descent"] --> yandex["Yandex"]
nats --> bilge["Bilge"]
amigofish -->|FrontProducer| nats
chronograf["Chronograf<br/>Scheduler"] -->|publish| nats
style sprut fill:#5a9e6f,stroke:#3a7a4f,color:#fff
style nats fill:#5b9dad,stroke:#3a7a8a,color:#fff
style amigofish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style anglerfish fill:#c47c2b,stroke:#9a5f1a,color:#fff
Основные потоки
1. Пользовательский запрос (Chat Flow)
Путь запроса от пользователя до ответа с RAG-обогащением:
Исходный код диаграммы:
graph TD
user["Пользователь"] -->|HTTPS| seadragon["Seadragon<br/><i>Next.js 15, AI SDK 5.0</i>"]
seadragon -->|REST| sprut["Sprut<br/><i>API Gateway</i>"]
sprut -->|NATS publish| nats["NATS JetStream<br/><i>complete_tasks stream</i>"]
nats -->|"consume (orker)"| amigofish["Amigofish<br/><i>LangGraph Orchestrator</i>"]
amigofish -->|RAG Agent| milvus[("Milvus<br/><i>BM25 + dense + RRF</i>")]
amigofish -->|Web Agent| tavily["Tavily API"]
amigofish -->|"gpt-5.2"| llm["OpenAI API"]
amigofish -->|документы| stonefish["Stonefish<br/><i>PDF/DOCX/XLSX/PPTX</i>"]
amigofish -->|"FrontProducer<br/>completion.{chat_id}"| nats
nats -->|SSE стриминг| seadragon
style amigofish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style nats fill:#5b9dad,stroke:#3a7a8a,color:#fff
style milvus fill:#4a7eb5,stroke:#2d5f8a,color:#fff
Жизненный цикл запроса (14 шагов)
Каждый пользовательский запрос проходит через полный жизненный цикл внутри Amigofish worker:
- Загрузка данных из PostgreSQL (через
psdot): чат + вложения - Декодирование вложений (изображения сжимаются, PDF конвертируется в markdown)
- Параллельно: суммаризация истории / генерация заголовка чата (async)
- Контекстуализация последнего сообщения пользователя (LLM rewrite)
- Очистка устаревших сообщений NATS subject
DoubloonManager().__aenter__— холдирование кредитовbuild_graph()— сборкаCompiledStateGraphgraph.astream(initial_state, stream_mode=custom)— запуск графа- Стриминг JSON-событий в NATS
completion.{chat_id} - При
GraphInterrupt— ожидание ответа на уточнение,Command(resume=answer), цикл - По завершении — отправка
{"type":"finish"} - Ожидание фоновых задач summary/title
DoubloonManager.__aexit__— коммит использования токеновon_success— сохранение сообщения ассистента + метаданных чата
Каждое событие, отправленное в NATS, также добавляется в whole_response, сжимается (соседние text-delta с одинаковым id объединяются) и сохраняется в строке сообщения при on_success.
LangGraph — граф оркестрации
Amigofish использует LangGraph для мультиагентной оркестрации. Граф состоит из специализированных узлов (агентов) и условных переходов:
Исходный код диаграммы:
graph TD
START(("START")) --> coordinator["coordinator<br/><i>classify_intent + decide</i>"]
coordinator -->|"research"| rag_agent["rag_agent<br/><i>RAG ReAct loop</i>"]
coordinator -->|"web search"| web_agent["web_agent<br/><i>Web ReAct loop</i>"]
coordinator -->|"document"| document_agent["document_agent<br/><i>Stonefish generation</i>"]
coordinator -->|"direct"| direct_answer["direct_answer<br/><i>greeting/math/meta</i>"]
coordinator -->|"unintelligible"| clarify["clarify<br/><i>polite rephrase request</i>"]
coordinator -->|"ambiguous"| generate_questions["generate_questions<br/><i>2-3 disambiguation Q</i>"]
coordinator -->|"ready"| synthesis["synthesis<br/><i>Answer + MindMap streaming</i>"]
rag_agent --> analyst["analyst<br/><i>sufficiency verdict</i>"]
web_agent --> analyst
analyst -->|"loop back"| coordinator
document_agent --> synthesis
direct_answer --> synthesis
clarify --> synthesis
generate_questions --> clarify_interrupt["clarify_interrupt<br/><i>LangGraph interrupt</i>"]
clarify_interrupt --> refine_query["refine_query<br/><i>merge Q&A into query</i>"]
refine_query --> coordinator
synthesis --> END(("END"))
style coordinator fill:#5a9e6f,stroke:#3a7a4f,color:#fff
style analyst fill:#5b9dad,stroke:#3a7a8a,color:#fff
style synthesis fill:#c47c2b,stroke:#9a5f1a,color:#fff
style rag_agent fill:#4a7eb5,stroke:#2d5f8a,color:#fff
style web_agent fill:#4a7eb5,stroke:#2d5f8a,color:#fff
style document_agent fill:#8b6db8,stroke:#6a4f9a,color:#fff
Узлы графа:
| Узел | Назначение |
|---|---|
coordinator | Классификация intent + принятие решения (подграф classify_intent -> decide) |
rag_agent | ReAct-агент для поиска по базе знаний (tools: RAG, Tree, GetDocumentContent) |
web_agent | ReAct-агент для поиска в интернете (tools: WebSearch, WebExtract) |
document_agent | Генерация/редактирование файлов через Stonefish (PDF/DOCX/XLSX/PPTX) |
analyst | Вердикт о достаточности исследования (AnalystVerdict: sufficient/missing/suggestions/summary) |
synthesis | Финальный ответ через streaming Answer tool + опциональный MindMap |
direct_answer | Прямой ответ на приветствия, математику, мета-вопросы |
clarify | Запрос переформулировки при непонятном вводе |
generate_questions | Генерация 2-3 уточняющих вопросов при неоднозначности |
clarify_interrupt | LangGraph interrupt() — приостановка выполнения графа |
refine_query | Объединение исходного запроса + ответов пользователя в уточнённый запрос |
Логика маршрутизации (_coordinator_route, приоритет жёстко задан):
route_intent == "document"— всегдаdocument_agent(LLM не может генерировать бинарные файлы)direct_answer == True—direct_answerroute_intent == "clarify"—clarifyroute_intent == "ambiguous"иclarification_count < max_clarification_rounds—generate_questionsready_for_synthesisили нет агентов —synthesis- Один агент — направление к нему; несколько — LangGraph list-branch (fan-out, параллельный запуск)
Подграф уточнения (Clarification sub-flow)
При неоднозначном запросе активируется цикл уточнения:
Исходный код диаграммы:
graph LR
coordinator -->|"ambiguous"| gq["generate_questions<br/><i>LLM: 2-3 вопроса</i>"]
gq --> ci["clarify_interrupt<br/><i>LangGraph interrupt()</i>"]
ci -->|"NATS: clarification-request<br/>30s heartbeat"| user["Ожидание ответа<br/>пользователя"]
user -->|"Command(resume=answer)"| rq["refine_query<br/><i>merge Q&A</i>"]
rq --> coordinator
Worker ловит GraphInterrupt в complete.py, отправляет событие clarification-request в NATS, подписывается на clarification_answer.{complete_id} с heartbeat 30 сек, и при получении ответа возобновляет граф через Command(resume=answer). Требуется MemorySaver checkpointer (активируется при max_clarification_rounds > 0, по умолчанию отключен).
Кооперативная отмена (Cooperative Cancellation)
Отмена генерации реализована кооперативно: между событиями вызывается check_actual_complete_id, который сравнивает complete_id из Redis (ключ complete_{chat_id}) с id текущей задачи. При несовпадении выбрасывается AlreadyCompleting — генерация прерывается. Это позволяет фронтенду остановить работающую генерацию, отправив новый запрос.
Биллинговый цикл (DoubloonManager)
DoubloonManager — async context manager, управляющий биллингом AI-запроса:
- Hold — резервирование ~30 кредитов при входе в контекст
- Выполнение — граф потребляет токены разных типов
- Commit — списание фактического использования с детализацией:
| Тип токенов | Описание |
|---|---|
openai_input_tokens | Входные токены OpenAI |
openai_cached_input_tokens | Кэшированные входные токены |
openai_output_tokens | Выходные токены |
openai_output_tokens_reasoning | Токены reasoning |
openai_small_embedding_tokens | Токены эмбеддингов |
tavily_tokens | Токены Tavily (20 за search, 4 за extract) |
- Release — при ошибке холд освобождается
Stonefish — генерация документов
Amigofish интегрирован со Stonefish — FastAPI-сервисом для рендеринга бинарных файлов. Поддерживаемые форматы: PDF, DOCX, XLSX, PPTX.
GenerateDocument(query, raw_data)— вызывает StonefishPOST /api/document/generate, сохраняет файл какArtifactEditDocument(artifact_id, instructions)— повторный вызовPOST /api/document/editс сохранённымbase_code- Таймаут: 900 сек (генерация бинарных файлов медленная)
- Ограничение: один вызов генерации/редактирования за ход
Stonefish использует Claude API через source/src/services/claude.py для синтеза содержимого документов. Dockerfile устанавливает Claude Code CLI поверх базового образа cr.monsterscorp.ru:5000/python:latest. Сервис деплоится через собственный Helm chart (tools/charts/stonefish/) с отдельными конфигурациями для dev и pro окружений.
Формат событий NATS (AI-стриминг)
Все события — JSON-объекты в completion.{chat_id}:
| Тип | Поля | Описание |
|---|---|---|
reasoning-start | id | Открытие блока reasoning |
reasoning-delta | id, delta, [tool, meta] | Фрагмент «размышления» (на русском) |
reasoning-end | id, [duration] | Закрытие блока reasoning |
text-start | id | Начало текстового блока ответа |
text-delta | id, delta | Инкрементальная часть текста ответа |
text-end | id | Завершение текстового блока |
citation-marker | textId, markerId, sourceId, index, length, unit | Инлайн-маркер цитаты в тексте |
citation-source | id, kind (document/web), documentId/url, ... | Источник цитаты |
citations-finalize | order, unused | Финальный порядок цитат (после стрима) |
tool-input-start / tool-call / tool-result | toolCallId, toolName, input, result | MindMap / артефакт файла |
clarification-request | chat_id, thread_id, questions, timeout_seconds, round, max_rounds | Запрос уточнения (при interrupt) |
finish | — | Всегда последнее событие |
Resume stream
При перезагрузке страницы ChatResumeHandler восстанавливает прерванный AI-стрим через DeliverPolicy.StartTime (NATS JetStream) — читает с сохранённого timestamp. Это позволяет пользователю не потерять ответ при случайном обновлении страницы.
2. Индексирование документов (Indexing Flow)
Путь документа от загрузки до готовности к RAG-поиску:
Исходный код диаграммы:
graph TD
user["👤 Пользователь"] -->|"upload: PDF, DOCX,<br/>Excel, PPTX, аудио, сканы"| sprut["Sprut<br/><i>Валидация, метаданные</i>"]
sprut -->|NATS event| anglerfish["Anglerfish<br/><i>LlamaIndex</i>"]
anglerfish --> krill["Krill<br/><i>OCR + PDF parsing</i>"]
anglerfish --> marker["Marker API<br/><i>PDF → текст</i>"]
anglerfish --> vision["OpenAI Vision<br/><i>Изображения → описание</i>"]
anglerfish -->|"Chunking + Embedding"| openai["OpenAI API<br/><i>text-embedding-3-small</i>"]
openai -->|vectors| milvus[("Milvus<br/><i>BM25 + dense + RRF</i>")]
style anglerfish fill:#c47c2b,stroke:#9a5f1a,color:#fff
style milvus fill:#4a7eb5,stroke:#2d5f8a,color:#fff
Три типа операций Anglerfish:
index— индексирование нового документаattachment_index— индексирование вложенийunindex— удаление из индекса
3. Биллинг и платежи (Payment Flow)
Исходный код диаграммы:
graph TD
user["👤 Пользователь"] -->|"Выбор тарифа / оплата"| sprut["Sprut<br/><i>/billing endpoint</i>"]
sprut --> doubloon["Doubloon<br/><i>Billing Service</i>"]
doubloon <-->|"карты, SBP"| tinkoff["Tinkoff API"]
doubloon --> postgres[("PostgreSQL<br/><i>транзакции, баланс</i>")]
doubloon -->|NATS event| chronograf["Chronograf<br/><i>Автоплатежи по расписанию</i>"]
doubloon -->|NATS event| anchor["Anchor<br/><i>Payment status</i>"]
style doubloon fill:#8b6db8,stroke:#6a4f9a,color:#fff
Биллинговый цикл AI-запроса (DoubloonManager):
hold— резервирование кредитов перед генерацией- Выполнение AI-запроса (токены: input, cached_input, output, reasoning, embedding, tavily)
commit— списание фактически потреблённых кредитов- При ошибке —
release(освобождение резерва)
4. Email-уведомления (Notification Flow)
Исходный код диаграммы:
graph LR
event["Событие<br/><i>регистрация, оплата, подписка</i>"] -->|NATS event| gull["Gull<br/><i>Orker Worker</i>"]
gull --> smtp["SMTP<br/><i>HTML-шаблоны</i>"]
gull --> bitrix["Bitrix CRM<br/><i>лиды, контакты</i>"]
gull --> unisender["UniSender<br/><i>маркетинговые рассылки</i>"]
style gull fill:#8b6db8,stroke:#6a4f9a,color:#fff
Протоколы взаимодействия
Полная таблица протоколов с описанием каждого канала связи — в разделе Обзор API → Протоколы коммуникации.
Кратко: внешние клиенты взаимодействуют с платформой через HTTPS; синхронное межсервисное взаимодействие — через REST/HTTP; асинхронная обработка задач и AI-стриминг — через NATS JetStream.
Хранилища данных
| Хранилище | Данные | Запись | Чтение |
|---|---|---|---|
| PostgreSQL | Пользователи, чаты, документы, транзакции, подписки | Все сервисы (через psdot models) | Все сервисы |
| Milvus | Векторные эмбеддинги документов (hybrid BM25+dense) | Anglerfish | Amigofish, Sprut |
| Redis | Application cache (aiocache), complete_id для кооперативной отмены | Sprut, Workers | Sprut, Workers |
| MinIO | Файлы документов | Sprut, Anglerfish | Amigofish |
| Yandex Object Storage | Пользовательские файлы | Descent | Sprut |
| NATS JetStream | События и задачи | Все продюсеры | Orker workers |