Observability
Langfuse
Langfuse handles LLM observability — tracing, cost tracking, and quality monitoring.
Implementation
src/observability/langfuse.ts
Configuration
| Variable | Required | Description |
|---|---|---|
| LANGFUSE_PUBLIC_KEY | no | Public key (pk-lf-...) |
| LANGFUSE_SECRET_KEY | no | Secret key (sk-lf-...) |
| LANGFUSE_HOST | no | Server URL (default: cloud.langfuse.com) |
Langfuse is optional. Without keys, tracing is silently disabled.
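The optional behavior described above could be sketched as a small config loader. This is an illustration only, not the contents of src/observability/langfuse.ts: the `LangfuseConfig` type, the `loadLangfuseConfig` function, and the `https://` scheme on the default host are assumptions.

```typescript
// Hypothetical shape; only the three environment variable names come from the table.
interface LangfuseConfig {
  publicKey: string;
  secretKey: string;
  host: string;
}

// Assumed default: the table lists cloud.langfuse.com, the scheme is added here.
const DEFAULT_HOST = "https://cloud.langfuse.com";

// Returns null when either key is missing so callers can skip tracing without throwing.
function loadLangfuseConfig(
  env: Record<string, string | undefined>,
): LangfuseConfig | null {
  const publicKey = env.LANGFUSE_PUBLIC_KEY;
  const secretKey = env.LANGFUSE_SECRET_KEY;
  if (!publicKey || !secretKey) return null;
  return { publicKey, secretKey, host: env.LANGFUSE_HOST || DEFAULT_HOST };
}
```

In a Node process this would typically be called as `loadLangfuseConfig(process.env)`, with a `null` result meaning that tracing stays off.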
Traced Operations
| Span Name | Component | What's Traced |
|---|---|---|
| intent.classify | Intent classifier | Input text, matched intent, confidence |
| ingestion.synthetic-qa | Ingestion pipeline | Content length, generated questions |
| rag.query | RAG pipeline | Query, retrieved chunks, scores, generation |
| generation | LLM calls | Model, input/output tokens, latency, cost |
Data Model
Traces are stored locally in the AgentRun model:
- traceId: Langfuse trace ID
- provider, model: LLM provider info
- inputTokens, outputTokens, totalTokens: token usage
- costUsd: estimated cost
- latencyMs: response time
Related RetrievalHit records track which chunks were retrieved and their scores.
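As a rough sketch, the fields above might map to TypeScript types like the following. The field names on `AgentRun` come from the list; the field types, the `RetrievalHit` fields, the `retrievalHits` relation name, and the per-million-token pricing input are assumptions made for illustration.

```typescript
// Hypothetical fields: the source only says hits record chunks and scores.
interface RetrievalHit {
  chunkId: string;
  score: number;
}

interface AgentRun {
  traceId: string;
  provider: string;
  model: string;
  inputTokens: number;
  outputTokens: number;
  totalTokens: number;
  costUsd: number;
  latencyMs: number;
  retrievalHits: RetrievalHit[]; // hypothetical relation name
}

// Hypothetical pricing input, in USD per one million tokens.
interface ModelPrice {
  inputPerMTok: number;
  outputPerMTok: number;
}

// Derives totalTokens and an estimated costUsd from raw token counts.
function summarizeUsage(
  inputTokens: number,
  outputTokens: number,
  price: ModelPrice,
): Pick<AgentRun, "totalTokens" | "costUsd"> {
  const costUsd =
    (inputTokens * price.inputPerMTok + outputTokens * price.outputPerMTok) / 1_000_000;
  return { totalTokens: inputTokens + outputTokens, costUsd };
}
```

The cost is an estimate by construction: it depends entirely on the price table supplied by the caller.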
Viewing Traces
Via API
GET /api/v1/traces/:trace_id
Returns the AgentRun with all retrieval hits.
Via Langfuse Dashboard
The Langfuse dashboard at your LANGFUSE_HOST shows:
- Trace timelines
- Token usage over time
- Cost breakdowns by model
- Quality metrics
Agent Task Monitoring
The agent runner writes every task and its tool-call steps to the database and broadcasts live events over WebSocket.
Data Model
Each inbound message creates an AgentTask with:
- adapterType: which adapter processed it (api, claude_local, codex_local, ollama)
- status: lifecycle state (queued → running → done/failed/timeout)
- Token usage (inputTokens, outputTokens, totalTokens) and costUsd
- durationMs: end-to-end processing time
- Nested AgentToolCall records for each pipeline step
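The status lifecycle above can be read as a small state machine. The sketch below encodes it under the assumption that done, failed, and timeout are terminal and that no other transitions exist; the actual runner may allow more (for example, re-queuing on retry). The names `TaskStatus`, `canTransition`, and `isTerminal` are illustrative.

```typescript
type TaskStatus = "queued" | "running" | "done" | "failed" | "timeout";

// Allowed next states, following queued → running → done/failed/timeout.
const NEXT_STATES: Record<TaskStatus, readonly TaskStatus[]> = {
  queued: ["running"],
  running: ["done", "failed", "timeout"],
  done: [],
  failed: [],
  timeout: [],
};

// True when moving from `from` to `to` follows the lifecycle.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return NEXT_STATES[from].indexOf(to) !== -1;
}

// True when no further transition is possible from `status`.
function isTerminal(status: TaskStatus): boolean {
  return NEXT_STATES[status].length === 0;
}
```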
REST API
| Endpoint | Purpose |
|---|---|
| GET /api/v1/agent-tasks | List tasks with filters (namespace, department, status, adapter, date range) |
| GET /api/v1/agent-tasks/:id | Task detail with tool call trace |
| GET /api/v1/agent-tasks/stats | Aggregated stats: success rate, avg duration, avg cost per adapter/namespace |
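To make the stats endpoint's figures concrete, here is one way the per-adapter aggregates could be computed from finished tasks. It is a sketch of the arithmetic only: the `TaskSample` and `AdapterStats` shapes are hypothetical and do not describe the endpoint's actual response format.

```typescript
type FinishedStatus = "done" | "failed" | "timeout";

interface TaskSample {
  adapterType: string;
  status: FinishedStatus;
  durationMs: number;
  costUsd: number;
}

interface AdapterStats {
  count: number;
  successRate: number; // share of tasks with status "done"
  avgDurationMs: number;
  avgCostUsd: number;
}

function aggregateByAdapter(tasks: TaskSample[]): Record<string, AdapterStats> {
  // First pass: running totals per adapter.
  const sums: Record<string, { count: number; done: number; durationMs: number; costUsd: number }> = {};
  for (const t of tasks) {
    const s = sums[t.adapterType] ?? { count: 0, done: 0, durationMs: 0, costUsd: 0 };
    s.count += 1;
    if (t.status === "done") s.done += 1;
    s.durationMs += t.durationMs;
    s.costUsd += t.costUsd;
    sums[t.adapterType] = s;
  }
  // Second pass: turn totals into rates and averages.
  const stats: Record<string, AdapterStats> = {};
  for (const adapter of Object.keys(sums)) {
    const s = sums[adapter];
    if (!s) continue;
    stats[adapter] = {
      count: s.count,
      successRate: s.done / s.count,
      avgDurationMs: s.durationMs / s.count,
      avgCostUsd: s.costUsd / s.count,
    };
  }
  return stats;
}
```

Grouping by namespace instead of adapter would follow the same pattern with a different key.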
Real-Time Events (WebSocket)
Connect to ws://<host>/ws/agent-tasks, send the first-message auth handshake ({ "action": "auth", "token": "<jwt>" }), then subscribe by namespace to receive live events:
- agent-task.created: task queued
- agent-task.started: worker picked up
- agent-task.tool-call: individual step completed (inject_profile, rag_search, generate, confidence_check, adapter calls)
- agent-task.completed: task done with output
- agent-task.failed: task failed or timed out
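A client for this stream needs to build the handshake messages and recognize the event names listed above. The sketch below covers only that message handling. The auth message follows the format given above; the subscribe message format and the `{ type, payload }` event envelope are assumptions, since the source does not specify them.

```typescript
// Event names from the list above.
const EVENT_TYPES = [
  "agent-task.created",
  "agent-task.started",
  "agent-task.tool-call",
  "agent-task.completed",
  "agent-task.failed",
] as const;

type AgentTaskEventType = (typeof EVENT_TYPES)[number];

// Hypothetical envelope: the real wire format may differ.
interface AgentTaskEvent {
  type: AgentTaskEventType;
  payload: unknown;
}

// First message after connecting, in the documented handshake format.
function authMessage(token: string): string {
  return JSON.stringify({ action: "auth", token });
}

// Hypothetical subscribe format: the source only says "subscribe by namespace".
function subscribeMessage(namespace: string): string {
  return JSON.stringify({ action: "subscribe", namespace });
}

// Parses a raw frame; returns null for invalid JSON or unknown event types.
function parseEvent(raw: string): AgentTaskEvent | null {
  let msg: unknown;
  try {
    msg = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof msg !== "object" || msg === null) return null;
  const { type, payload } = msg as { type?: unknown; payload?: unknown };
  if (typeof type !== "string") return null;
  if ((EVENT_TYPES as readonly string[]).indexOf(type) === -1) return null;
  return { type: type as AgentTaskEventType, payload };
}
```

With a standard `WebSocket` object, `authMessage` would be sent from the `open` handler, followed by `subscribeMessage`, and each incoming `message` frame would be passed through `parseEvent`. Whether the server expects an acknowledgement between the two sends is not stated in the source.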
Bull-board
Queue dashboard at /admin/queues (requires canEditSettings). Shows all BullMQ queues — including agent-tasks — with job status, retry counts, and failed-job inspection.
Audit Log
Every mutation is logged automatically for compliance and debugging.
Implementation
src/middleware/audit.ts
Automatic Logging
The audit middleware intercepts every POST, PATCH, and DELETE request and creates an AuditLog entry:
| Field | Source |
|---|---|
| userId | JWT payload |
| action | Inferred from method + URL (create/update/delete/approve/reject/escalate/login) |
| entityType | Inferred from URL path |
| entityId | Extracted from URL params |
| changes | Request body (JSON) |
| ipAddress | Client IP |
| userAgent | Client user agent header |
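How entityType and entityId are derived from the URL is not spelled out here, so the following is only one plausible sketch. It assumes routes live under an `/api/v<N>` prefix, that the first remaining path segment names the resource, that the second segment is the ID, and that a trailing "s" is stripped to get a singular entity type. The `inferEntity` name and `EntityRef` type are hypothetical.

```typescript
interface EntityRef {
  entityType: string | null;
  entityId: string | null;
}

function inferEntity(path: string): EntityRef {
  // Drop the query string and empty segments.
  const pathname = path.split("?")[0] ?? "";
  const segments = pathname.split("/").filter((s) => s.length > 0);
  // Skip an assumed "/api/v1"-style prefix when present.
  const hasPrefix = segments[0] === "api" && /^v\d+$/.test(segments[1] ?? "");
  const rest = hasPrefix ? segments.slice(2) : segments;
  const resource = rest[0];
  if (!resource) return { entityType: null, entityId: null };
  // Naive singularization: "approvals" becomes "approval".
  const entityType = resource.replace(/s$/, "");
  return { entityType, entityId: rest[1] ?? null };
}
```

Routes whose second segment is not an ID, such as /auth/login, would need special handling that this sketch does not attempt.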
Action Inference
| URL Pattern | Method | Action |
|---|---|---|
| /auth/login | POST | login |
| /approvals/:id/approve | POST | approve |
| /approvals/:id/reject | POST | reject |
| /approvals/:id/escalate | POST | escalate |
| * | POST | create |
| * | PATCH | update |
| * | DELETE | delete |
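The table above reads as an ordered rule list: specific URL patterns are checked first, then the per-method wildcards. A minimal sketch of that matching follows. The `inferAction` function and its regular expressions are illustrative and may not match how src/middleware/audit.ts implements it.

```typescript
type AuditAction =
  | "create"
  | "update"
  | "delete"
  | "approve"
  | "reject"
  | "escalate"
  | "login";

// Returns the inferred action, or null for methods the middleware does not intercept.
function inferAction(method: string, path: string): AuditAction | null {
  const m = method.toUpperCase();
  const p = path.split("?")[0] ?? path; // ignore any query string
  if (m === "POST") {
    // Specific patterns take priority over the POST wildcard.
    if (/\/auth\/login\/?$/.test(p)) return "login";
    const approval = /\/approvals\/[^/]+\/(approve|reject|escalate)\/?$/.exec(p);
    if (approval) return approval[1] as AuditAction;
    return "create";
  }
  if (m === "PATCH") return "update";
  if (m === "DELETE") return "delete";
  return null;
}
```

Checking the specific patterns before the wildcard matters: otherwise every POST, including approvals and logins, would be recorded as create.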
Explicit Logging
Business logic can also log events directly:
import { logAuditEvent } from '../middleware/audit';

await logAuditEvent({
  userId: user.id,
  action: 'approve',
  entityType: 'approval',
  entityId: approval.id,
  changes: { status: 'approved' },
  ipAddress: request.ip,
  userAgent: request.headers['user-agent'],
});
Querying
List Audit Logs
GET /api/v1/audit?action=approve&entityType=approval&userId=<id>&limit=50&offset=0
Auth: permission canViewAudit.
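A client might assemble the query string for this endpoint as follows. The parameter names come from the example URL above; the `AuditQuery` type, the `auditLogsUrl` helper, and the treatment of every parameter as optional are assumptions.

```typescript
interface AuditQuery {
  action?: string;
  entityType?: string;
  userId?: string;
  limit?: number;
  offset?: number;
}

// Builds the list URL, including only the filters that were provided.
function auditLogsUrl(baseUrl: string, query: AuditQuery = {}): string {
  const pairs: string[] = [];
  const add = (key: string, value: string | number | undefined): void => {
    if (value !== undefined) pairs.push(`${key}=${encodeURIComponent(String(value))}`);
  };
  add("action", query.action);
  add("entityType", query.entityType);
  add("userId", query.userId);
  add("limit", query.limit);
  add("offset", query.offset);
  const suffix = pairs.length > 0 ? `?${pairs.join("&")}` : "";
  return `${baseUrl}/api/v1/audit${suffix}`;
}
```

Paging would then consist of repeating the request with `offset` increased by `limit` each time.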
Indexes
The AuditLog table is indexed on:
- userId: filter by actor
- (entityType, entityId): filter by target
- createdAt: time-range queries