Skip to main content

Architecture

System Overview

AgentCore is a multi-channel AI assistant platform that connects messaging channels (WhatsApp Cloud API, Telegram) to a department-scoped RAG knowledge base with human-in-the-loop approval workflows.

┌─────────────┐   ┌─────────────┐
│ WhatsApp │ │ Telegram │
│ Cloud API │ │ (Grammy) │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌──────────────────────────────────┐
│ Fastify Server │
│ routes, auth, RBAC, dept scope │
└──────────────┬───────────────────┘

┌──────────┼──────────┬──────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌───────┐ ┌────────────┐ ┌────────────┐
│ Agent │ │ HITL │ │ Memory │ │ WebSocket │
│ Runner │ │Approve│ │ Extraction │ │ Events │
└───┬────┘ └───┬───┘ └─────┬──────┘ └─────┬──────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ PostgreSQL 16 + pgvector │
│ Redis 7 (BullMQ queues + rate-limit store) │
└──────────────────────────────────────────────────┘

Message Pipeline

ADR-001 makes agent-tasks the primary execution boundary for inbound messages. Channel workers are transport adapters — they normalize inbound messages, persist user/conversation/message records, create an AgentTask, and enqueue it. They no longer own the RAG/HITL decision pipeline.

  1. Channel receives message through a WhatsApp Cloud API webhook or Telegram polling/webhook.
  2. Channel inbound queue (wa-inbound or tg-inbound) normalizes the transport payload.
  3. Inbound worker finds or creates the user and conversation, stores the user message, creates an AgentTask, and enqueues agent-tasks.
  4. Agent runner worker processes the task:
    • inject_profile: load namespace system prompt and employee profile context.
    • rag_search: run OpenAiRagPipeline when wired, or the namespace-selected adapter fallback in test/adapter mode.
    • generate: format the channel reply.
    • confidence_check: handle prompt-injection routing, persona escalation triggers, intent classification, confidence fallback, and trust-matrix bypass.
  5. Routing result is persisted as AgentToolCall rows and one of:
    • auto-send through wa-outbound or tg-outbound;
    • create a pending Approval and move the conversation to awaiting_approval;
    • send persona escalation text and move the conversation to escalated.
  6. On approval the approved or edited answer is enqueued to the outbound channel queue.
  7. Memory extraction runs after the configured message interval.
  8. WebSocket events broadcast task lifecycle in real time (agent-task.created, .started, .tool-call, .completed, .failed) with department filtering on every delivery.

Component Architecture

Fastify Application (src/app.ts)

The app factory registers components in this order:

  1. OpenAPI: Swagger and Swagger UI (/docs) with /api/v1 as the API server prefix.
  2. Security plugins: Helmet, exact-origin CORS, global rate limit, JWT auth, and departmentScope.
  3. Middleware: audit logger, idempotency, and structured error handler.
  4. Background workers: knowledge ingestion, memory extraction, and agent runner queues.
  5. Routes: health, auth, knowledge, approvals, traces, namespaces, me, conversations, departments, users, roles, audit, RAG draft, employee profiles, intents, agent tasks, plugins, document templates, and notifications under /api/v1.
  6. Channels: WhatsApp Cloud API plugin and Telegram plugin under /api/v1.
  7. Monitoring: Bull-board admin UI (/admin/queues) and WebSocket event bridges (/ws/agent-tasks, /ws/notifications).

Plugin System

Fastify plugins provide:

  • authenticate decorator — JWT verification hook (Bearer header only; WebSocket uses first-message auth handshake)
  • departmentScope decorator — request-local DepartmentScope from forDepartment(request.user)
  • knowledgeIngestionQueue — BullMQ queue for document processing
  • memoryExtractionQueue — BullMQ queue for profile extraction
  • agentTasksQueue — BullMQ queue for agent task processing
  • channelService — channel configuration abstraction, currently backed by environment variables
  • broadcastAgentTaskEvent — WebSocket broadcast helper that revalidates namespace department access before each send
  • pluginRegistry — namespace integration plugin registry with built-in OpenDataBot and webhook plugins
  • notification helpers — persisted user notifications plus WebSocket delivery

Queue Architecture (BullMQ + Redis)

QueuePurposeConcurrency
agent-tasksAgent task processing via adaptersconfigurable
knowledge-ingestDocument parsing, chunking, embedding1
memory-extractionEmployee profile extraction from chats1
wa-inboundWhatsApp message processingconfigurable
wa-outboundWhatsApp message sendingconfigurable
tg-inboundTelegram message processingconfigurable
tg-outboundTelegram message sendingconfigurable

The agent-tasks queue uses 3 retries with exponential backoff (2s base delay). Per-adapter timeouts: api 30s, claude_local/codex_local 300s, ollama 120s. Stalled-job detection auto-retries when a worker dies.

All queues retry with exponential backoff. Workers shut down gracefully on SIGTERM/SIGINT.

Agent Runner & Adapter Layer

The agent runner is the canonical message-processing pipeline. It persists each task in AgentTask, records step-level progress in AgentToolCall, and hides generation behind a pluggable AgentAdapter interface. Each namespace picks its adapter via config.agentRunner.activeAdapter.

Available adapters:

AdapterBackendTimeout
apiOpenAI SDK (chat.completions.create)30s
claude_localClaude CLI (claude --print)300s
codex_localCodex CLI (codex exec --json)300s
ollamaOllama HTTP API (OpenAI-compatible)120s

Data model: each task writes an AgentTask row with nested AgentToolCall entries for inject_profile, rag_search, generate, confidence_check, and any adapter-level calls from fallback mode. Token usage, cost, and duration are tracked per task.

See Configuration for namespace adapter setup.

Department Isolation

ADR-002 centralizes department access through src/lib/department-scope.ts.

  • forDepartment(user) returns a DepartmentScope.
  • scope.directWhere() scopes Prisma models with a direct departmentId.
  • scope.nestedWhere('namespace') scopes models such as AgentTask through related namespaces.
  • scope.departmentId is used in raw SQL RAG filters.
  • Admin users get an all-department scope; all other roles are restricted to their JWT departmentId.

The Fastify plugin in src/plugins/department-scope.ts decorates authenticated requests with request.departmentScope. REST routes, RAG retrieval, the agent runner, and WebSocket broadcasts use the same scope primitive. The regression harness in tests/department-isolation.test.ts verifies list, detail, mutation, RAG, analytics, and WebSocket isolation.

Data Flow: Knowledge Ingestion

Upload → Parse (PDF/DOCX/TXT/Image) → PII Scrub → Chunk → Embed → Synthetic Q&A → Store

See Knowledge & RAG for details.

Data Flow: RAG Query

User Query → Injection Guard → Intent Classify → Embed Query
→ Vector Search (chunks + questions) + Keyword Search
→ Hybrid Score + Rank → Top-K Assembly
→ LLM Generation (with system prompt + history + profile)
→ PII Restore → Confidence Check → Bypass or HITL

See Knowledge & RAG for details.

Key Design Decisions

Namespace Isolation

Each department owns namespaces with custom system prompts, persona config, escalation rules, and trust matrices. Non-global users see only namespace-backed data inside their effective department scope.

Two-Tier PII Protection

  • Ingestion time — one-way scrubbing before chunks and embeddings are stored.
  • Conversation time — reversible AES-256-GCM encryption with PII_ENCRYPTION_KEY. The LLM sees placeholders; user-facing responses restore the originals.

Trust Matrix

Intent-level autonomy tracking. Once an intent collects enough approvals, the system starts auto-sending — with configurable sampling for continuous validation.

Hybrid Retrieval

Combines vector and keyword search for recall. Defaults: 65% vector, 35% keyword. The vector budget is split between chunk embeddings and synthetic-question embeddings.