Integrating AI into Existing Software: The Practical Guide
Forget the Rewrite — Build a Bridge
Every week we hear the same thing: "We want AI, so we should probably rewrite the entire system." That's the most expensive mistake you can make. Your existing system has real users, real data, and real contracts. A big-bang rewrite risks all of that — and takes three times longer than planned.
What actually works: place AI as a standalone service layer next to your system. Your existing system calls a new API. Done. No migration, no rewrite, minimal risk.
We've done this at SecretStack in over a dozen projects — from B2B SaaS to e-commerce to internal enterprise tools. This guide shows you exactly what it looks like technically.
Architecture: AI as a Service Layer
The central pattern is simple. Your existing system doesn't even know about the AI — it only knows an API:
┌──────────────────────────────────────────────────────────────────┐
│ AI AS A SERVICE LAYER │
│ │
│ ┌──────────────┐ ┌───────────────────┐ ┌────────────┐ │
│ │ Existing │────▶│ AI Service Layer │────▶│ LLM API │ │
│ │ System │ │ (FastAPI/Next) │ │ (Claude, │ │
│ │ │◀────│ │◀────│ GPT-4) │ │
│ └──────────────┘ └─────────┬─────────┘ └────────────┘ │
│ │ │ │
│ │ ┌──────┴──────┐ │
│ │ │ │ │
│ │ ┌──────▼──┐ ┌──────▼──────┐ │
│ │ │ Cache │ │ Vector DB │ │
│ │ │ (Redis) │ │ (pgvector) │ │
│ │ └─────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ PostgreSQL │ ← Existing DB stays untouched │
│ │ (Existing) │ │
│ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘
The AI Service Layer is its own microservice — for us typically FastAPI (Python) for ML-heavy tasks or Next.js API Routes (TypeScript) for lighter integrations. It has three jobs:
- Accept requests — from the existing system via REST or webhooks
- Orchestrate AI — build prompts, load context, call LLM, validate results
- Graceful degradation — when the AI can't deliver, the system falls back to the manual workflow
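Graceful degradation is worth pinning down in code. A minimal sketch of the pattern we mean (the `withFallback` helper and its 5-second default are illustrative, not from any framework):

```typescript
// Run an AI call with a timeout; fall back to a manual-workflow value
// if the call fails or takes too long.
async function withFallback<T>(
  aiCall: () => Promise<T>,
  fallback: T,
  timeoutMs = 5000
): Promise<{ value: T; degraded: boolean }> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("AI timeout")), timeoutMs);
  });
  try {
    const value = await Promise.race([aiCall(), timeout]);
    return { value, degraded: false };
  } catch {
    // AI unavailable or too slow: the caller proceeds manually
    return { value: fallback, degraded: true };
  } finally {
    clearTimeout(timer);
  }
}
```

The existing system calls its AI features through a wrapper like this, so an LLM outage degrades to the manual workflow instead of an error page.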
RAG Pipeline: Making Your Own Knowledge Usable
The most common use case: you want the AI to know about your data. Product catalog, support documentation, internal knowledge base. For that you need RAG — Retrieval-Augmented Generation.
Here's the data flow:
┌─────────────────────────────────────────────────────────────────┐
│ RAG PIPELINE │
│ │
│ INGESTION (one-time / periodic): │
│ ┌──────────┐ ┌──────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Documents │──▶│ Chunking │──▶│ Embedding │──▶│ pgvector │ │
│ │ PDF, MD, │ │ 512 Token │ │ text- │ │ HNSW │ │
│ │ HTML, DB │ │ Overlap │ │ embedding- │ │ Index │ │
│ └──────────┘ │ 50 Token │ │ 3-large │ └───────────┘ │
│ └──────────┘ └───────────┘ │
│ │
│ QUERY (per request): │
│ ┌──────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ User │──▶│ Query │──▶│ Semantic │──▶│ LLM + │ │
│ │ Question │ │ Embedding │ │ Search │ │ Context │ │
│ │ │ │ │ │ Top-5 │ │ = Answer │ │
│ └──────────┘ └───────────┘ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────────┘
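The chunking box in the ingestion row can be sketched in a few lines. This approximates one token per whitespace-separated word for brevity; a real pipeline would count subword tokens with a tokenizer such as tiktoken:

```typescript
// Fixed-size chunks with overlap (512 tokens, 50-token overlap by default).
// "Tokens" are approximated as whitespace-separated words here.
function chunkText(text: string, chunkSize = 512, overlap = 50): string[] {
  const words = text.split(/\s+/).filter(Boolean);
  const chunks: string[] = [];
  const step = chunkSize - overlap;
  for (let start = 0; start < words.length; start += step) {
    chunks.push(words.slice(start, start + chunkSize).join(" "));
    // Stop once a chunk reaches the end of the document
    if (start + chunkSize >= words.length) break;
  }
  return chunks;
}
```

The overlap matters: a sentence split across a chunk boundary still appears whole in at least one chunk, so retrieval doesn't lose it.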
Step 1: Create the Vector Table
First you need the infrastructure. pgvector is a PostgreSQL extension — you don't need a separate vector database. It runs in your existing Postgres instance:
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Table for document chunks with embeddings
CREATE TABLE document_embeddings (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  content TEXT NOT NULL,
  metadata JSONB DEFAULT '{}',
  source_id TEXT NOT NULL,
  source_type TEXT NOT NULL, -- 'support_doc', 'product', 'faq'
  embedding vector(3072), -- OpenAI text-embedding-3-large
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);

-- HNSW index for fast nearest-neighbor search.
-- Note: pgvector caps HNSW indexes on the vector type at 2,000 dimensions,
-- so a 3,072-dim column needs a halfvec expression index (pgvector >= 0.7)
-- or a reduced embedding dimension.
-- m=16 and ef_construction=128 are good defaults up to ~1M vectors.
CREATE INDEX idx_embeddings_hnsw
  ON document_embeddings
  USING hnsw ((embedding::halfvec(3072)) halfvec_cosine_ops)
  WITH (m = 16, ef_construction = 128);

-- B-tree index for source type filtering
CREATE INDEX idx_embeddings_source_type
  ON document_embeddings (source_type);

-- Useful for bulk updates: quickly find all chunks from a source
CREATE INDEX idx_embeddings_source_id
  ON document_embeddings (source_id);
Step 2: Embedding Creation and Search (TypeScript)
The complete RAG pipeline in TypeScript — from document ingestion to semantic search:
// lib/rag-pipeline.ts
import OpenAI from "openai";
import { Pool } from "pg";

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// --- Embedding creation ---
export interface DocumentChunk {
  content: string;
  sourceId: string;
  // "llm_cache" entries are written by the semantic LLM cache
  sourceType: "support_doc" | "product" | "faq" | "llm_cache";
  metadata?: Record<string, unknown>;
}

// Exported so other modules (e.g. the LLM cache) can reuse it
export async function createEmbedding(text: string): Promise<number[]> {
  const response = await openai.embeddings.create({
    model: "text-embedding-3-large",
    input: text,
    dimensions: 3072,
  });
  return response.data[0].embedding;
}
export async function ingestDocuments(chunks: DocumentChunk[]): Promise<void> {
  // Batch embedding: max 2048 inputs per API call
  const batchSize = 100;
  for (let i = 0; i < chunks.length; i += batchSize) {
    const batch = chunks.slice(i, i + batchSize);
    const response = await openai.embeddings.create({
      model: "text-embedding-3-large",
      input: batch.map((c) => c.content),
      dimensions: 3072,
    });
    // id is auto-generated, so an ON CONFLICT clause on it would never
    // fire; for re-ingestion, delete old rows by source_id first.
    const query = `
      INSERT INTO document_embeddings (content, source_id, source_type, metadata, embedding)
      VALUES ($1, $2, $3, $4, $5::vector)
    `;
    for (let j = 0; j < batch.length; j++) {
      await pool.query(query, [
        batch[j].content,
        batch[j].sourceId,
        batch[j].sourceType,
        JSON.stringify(batch[j].metadata ?? {}),
        `[${response.data[j].embedding.join(",")}]`,
      ]);
    }
  }
}
// --- Semantic search ---
interface SearchResult {
  content: string;
  sourceId: string;
  sourceType: string;
  similarity: number;
  metadata: Record<string, unknown>;
}

export async function semanticSearch(
  query: string,
  options: {
    topK?: number;
    sourceType?: string;
    similarityThreshold?: number;
  } = {}
): Promise<SearchResult[]> {
  const { topK = 5, sourceType, similarityThreshold = 0.7 } = options;
  const queryEmbedding = await createEmbedding(query);
  // Cast to halfvec so the HNSW expression index is used
  // (plain vector HNSW indexes are capped at 2,000 dimensions)
  const sql = `
    SELECT
      content,
      source_id,
      source_type,
      metadata,
      1 - (embedding::halfvec(3072) <=> $1::halfvec(3072)) AS similarity
    FROM document_embeddings
    WHERE 1 - (embedding::halfvec(3072) <=> $1::halfvec(3072)) > $2
    ${sourceType ? "AND source_type = $4" : ""}
    ORDER BY embedding::halfvec(3072) <=> $1::halfvec(3072)
    LIMIT $3
  `;
  const params: (string | number)[] = [
    `[${queryEmbedding.join(",")}]`,
    similarityThreshold,
    topK,
  ];
  if (sourceType) params.push(sourceType);
  const { rows } = await pool.query(sql, params);
  return rows.map((row) => ({
    content: row.content,
    sourceId: row.source_id,
    sourceType: row.source_type,
    similarity: parseFloat(row.similarity),
    metadata: row.metadata,
  }));
}
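What happens after retrieval: the top chunks get packed into the LLM prompt. A hedged sketch (the prompt wording and the local `RetrievedChunk` shape are illustrative; the fields mirror the `SearchResult` interface above):

```typescript
// Assemble retrieved chunks into the context block of the final LLM prompt.
interface RetrievedChunk {
  content: string;
  sourceId: string;
  similarity: number;
}

function buildRagPrompt(question: string, results: RetrievedChunk[]): string {
  // Number the chunks so the model can cite its sources
  const context = results
    .map((r, i) => `[${i + 1}] (source: ${r.sourceId}, score: ${r.similarity.toFixed(2)})\n${r.content}`)
    .join("\n\n");
  return [
    "Answer the question using ONLY the context below.",
    'If the context is insufficient, say "I don\'t know".',
    "",
    "Context:",
    context,
    "",
    `Question: ${question}`,
  ].join("\n");
}
```

Keeping the instruction to refuse when context is missing is what stops the model from hallucinating answers your documentation doesn't contain.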
AI Service Endpoint with Streaming (Python/FastAPI)
For ML-heavy tasks we use FastAPI. Here's a production-ready endpoint with streaming, structured output parsing, and fallback:
# ai_service/main.py
import json
import hashlib

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from anthropic import AsyncAnthropic, APIError
import redis.asyncio as redis

app = FastAPI()
# Async client: the sync Anthropic client would block the event loop
client = AsyncAnthropic()
cache = redis.from_url("redis://localhost:6379/0")

TICKET_CLASSIFICATION_PROMPT = """You are a support ticket classifier.
Analyze the following support ticket and respond EXCLUSIVELY
with valid JSON in this format:

{{
  "category": "billing" | "technical" | "feature_request" | "complaint" | "other",
  "priority": "low" | "medium" | "high" | "critical",
  "summary": "Max 2 sentences",
  "suggested_reply": "Suggested response to the customer",
  "confidence": 0.0 - 1.0
}}

Ticket:
{ticket_text}

Customer history (last 5 tickets):
{customer_history}
"""


class TicketRequest(BaseModel):
    ticket_text: str
    customer_id: str
    customer_history: list[str] = []


class TicketClassification(BaseModel):
    category: str
    priority: str
    summary: str
    suggested_reply: str
    confidence: float


@app.post("/api/classify-ticket")
async def classify_ticket(req: TicketRequest) -> TicketClassification:
    # Generate cache key from ticket text
    cache_key = f"ticket:{hashlib.sha256(req.ticket_text.encode()).hexdigest()[:16]}"
    cached = await cache.get(cache_key)
    if cached:
        return TicketClassification(**json.loads(cached))

    prompt = TICKET_CLASSIFICATION_PROMPT.format(
        ticket_text=req.ticket_text,
        customer_history="\n".join(req.customer_history[-5:]) or "No history",
    )
    try:
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            temperature=0,
            messages=[{"role": "user", "content": prompt}],
        )
        raw = response.content[0].text
        parsed = json.loads(raw)
        result = TicketClassification(**parsed)
        # Cache for 1 hour
        await cache.set(cache_key, json.dumps(parsed), ex=3600)
        return result
    except ValueError:
        # Invalid JSON or schema mismatch (both json.JSONDecodeError and
        # pydantic's ValidationError subclass ValueError) -> fallback
        return TicketClassification(
            category="other",
            priority="medium",
            summary="Automatic classification failed",
            suggested_reply="",
            confidence=0.0,
        )
    except APIError as e:
        raise HTTPException(status_code=502, detail=f"LLM API error: {e.message}")


@app.post("/api/generate-stream")
async def generate_stream(req: TicketRequest):
    """Streaming endpoint for long responses — e.g. detailed customer replies."""
    prompt = TICKET_CLASSIFICATION_PROMPT.format(
        ticket_text=req.ticket_text,
        customer_history="\n".join(req.customer_history[-5:]) or "No history",
    )

    async def event_stream():
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=2048,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    yield f"data: {json.dumps({'text': text})}\n\n"
            yield "data: [DONE]\n\n"
        except APIError:
            yield f"data: {json.dumps({'error': 'LLM not reachable'})}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
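On the consuming side, the existing system has to parse the `data: …` frames the streaming endpoint emits. A minimal parser for exactly that frame format (the helper name is ours):

```typescript
// Parse Server-Sent Events frames of the shape the endpoint above emits:
// "data: {\"text\":\"...\"}\n\n" frames, terminated by "data: [DONE]".
function parseSseChunk(chunk: string): { texts: string[]; done: boolean } {
  const texts: string[] = [];
  let done = false;
  for (const line of chunk.split("\n")) {
    if (!line.startsWith("data: ")) continue;
    const payload = line.slice("data: ".length);
    if (payload === "[DONE]") {
      done = true;
      continue;
    }
    const parsed = JSON.parse(payload);
    if (typeof parsed.text === "string") texts.push(parsed.text);
  }
  return { texts, done };
}
```

In a browser you would feed this from `response.body`'s reader chunk by chunk and append `texts` to the UI as they arrive; the `[DONE]` sentinel tells you when to stop.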
Webhook-Based Async Processing
Not every AI task needs to run synchronously. For anything that takes longer than about 2 seconds, asynchronous processing is better. The user doesn't wait — they get the result as soon as it's ready:
// api/webhooks/ticket-created.ts (Next.js API Route)
import { NextRequest, NextResponse } from "next/server";
import { Queue } from "bullmq";
import { createHmac, timingSafeEqual } from "crypto";

const aiQueue = new Queue("ai-processing", {
  connection: { host: "localhost", port: 6379 },
});

// Webhook endpoint: called by the existing system
export async function POST(req: NextRequest) {
  // Verify webhook signature (constant-time comparison, so the check
  // doesn't leak information through timing differences)
  const signature = req.headers.get("x-webhook-signature") ?? "";
  const body = await req.text();
  const expected = createHmac("sha256", process.env.WEBHOOK_SECRET!)
    .update(body)
    .digest("hex");
  const valid =
    signature.length === expected.length &&
    timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
  if (!valid) {
    return NextResponse.json({ error: "Invalid signature" }, { status: 401 });
  }

  const payload = JSON.parse(body);

  // Add job to queue — processed asynchronously by worker
  await aiQueue.add(
    "classify-ticket",
    {
      ticketId: payload.ticket_id,
      ticketText: payload.text,
      customerId: payload.customer_id,
      callbackUrl: payload.callback_url,
    },
    {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
      removeOnComplete: 1000,
      removeOnFail: 5000,
    }
  );

  return NextResponse.json({ status: "queued" }, { status: 202 });
}
// --- Worker (separate process) ---
// workers/ai-worker.ts
import { Worker } from "bullmq";

const worker = new Worker(
  "ai-processing",
  async (job) => {
    const { ticketId, ticketText, customerId, callbackUrl } = job.data;

    // Call AI service
    const aiResponse = await fetch(
      `${process.env.AI_SERVICE_URL}/api/classify-ticket`,
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          ticket_text: ticketText,
          customer_id: customerId,
        }),
      }
    );
    // Throw on non-2xx so BullMQ retries with exponential backoff
    if (!aiResponse.ok) {
      throw new Error(`AI service returned ${aiResponse.status}`);
    }
    const result = await aiResponse.json();

    // Send result back to existing system
    await fetch(callbackUrl, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${process.env.SYSTEM_API_KEY}`,
      },
      body: JSON.stringify({
        ticket_id: ticketId,
        classification: result,
        processed_at: new Date().toISOString(),
      }),
    });
  },
  { connection: { host: "localhost", port: 6379 }, concurrency: 10 }
);

worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, err);
});
Caching Layer for LLM Responses
LLM calls are expensive and slow. Intelligent caching saves on both. We use a two-tier cache — exact matches and semantic matches:
// lib/llm-cache.ts
import Redis from "ioredis";
import { createHash } from "crypto";
import { ingestDocuments, semanticSearch } from "./rag-pipeline";

const redis = new Redis(process.env.REDIS_URL!);

interface CacheOptions {
  ttl?: number; // TTL in seconds (default: 1 hour)
  semanticMatch?: boolean; // Also match similar queries?
  similarityThreshold?: number;
}

export async function cachedLlmCall<T>(
  prompt: string,
  llmCall: () => Promise<T>,
  options: CacheOptions = {}
): Promise<T & { _cache: "hit" | "semantic_hit" | "miss" }> {
  const { ttl = 3600, semanticMatch = true, similarityThreshold = 0.95 } = options;
  const promptHash = createHash("sha256").update(prompt).digest("hex").slice(0, 24);

  // 1. Exact cache hit
  const exactKey = `llm:exact:${promptHash}`;
  const exactHit = await redis.get(exactKey);
  if (exactHit) {
    return { ...JSON.parse(exactHit), _cache: "hit" as const };
  }

  // 2. Semantic cache hit (optional)
  if (semanticMatch) {
    const similar = await semanticSearch(prompt, {
      topK: 1,
      sourceType: "llm_cache",
      similarityThreshold,
    });
    if (similar.length > 0) {
      const semanticHit = await redis.get(`llm:result:${similar[0].sourceId}`);
      if (semanticHit) {
        return { ...JSON.parse(semanticHit), _cache: "semantic_hit" as const };
      }
    }
  }

  // 3. Cache miss — call LLM
  const result = await llmCall();

  // Cache under the exact key...
  await redis.set(exactKey, JSON.stringify(result), "EX", ttl);
  if (semanticMatch) {
    // ...and register the prompt for future semantic matches
    // (requires "llm_cache" in DocumentChunk's sourceType union)
    await redis.set(`llm:result:${promptHash}`, JSON.stringify(result), "EX", ttl);
    await ingestDocuments([
      { content: prompt, sourceId: promptHash, sourceType: "llm_cache" },
    ]);
  }
  return { ...result, _cache: "miss" as const };
}

// --- Usage ---
const classification = await cachedLlmCall(
  `Classify: ${ticketText}`,
  () => classifyTicket(ticketText),
  { ttl: 7200, semanticMatch: true, similarityThreshold: 0.93 }
);

if (classification._cache !== "miss") {
  console.log(`Cache ${classification._cache} — no LLM call needed`);
}
Performance Numbers from Production
Here are real measurements from one of our projects — a B2B SaaS with roughly 50,000 support tickets per month:
Latency Comparison
| Method | p50 Latency | p99 Latency | Cost per Call |
|---|---|---|---|
| Direct (Claude API, synchronous) | 1,800ms | 4,200ms | ~$0.008 |
| With Redis Cache (hit) | 3ms | 12ms | $0.000 |
| With Streaming (time to first token) | 280ms | 650ms | ~$0.008 |
| Async + Queue (from user perspective) | 0ms* | 0ms* | ~$0.008 |
*User doesn't wait — result arrives asynchronously.
Token Optimization
Through targeted prompt engineering we were able to drastically reduce token usage:
| Metric | Before Optimization | After Optimization | Savings |
|---|---|---|---|
| Input Tokens / Ticket | ~2,400 | ~850 | -65% |
| Output Tokens / Ticket | ~600 | ~280 | -53% |
| Monthly API Costs (50k tickets) | ~$4,800 | ~$1,650 | -66% |
| Cache Hit Rate | 0% | 34% | - |
| Effective Cost After Cache | ~$4,800 | ~$1,090 | -77% |
The biggest levers: replacing free-text prompts with structured ones, dropping few-shot examples in favor of clear instructions, and limiting customer history to the last 5 tickets instead of sending everything.
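The history lever can be made concrete. A sketch of the trimming logic, assuming a rough 4-characters-per-token heuristic rather than a real tokenizer (the function name and budget default are ours):

```typescript
// Cap customer history at the last N tickets, then enforce a rough
// input-token budget, favoring the most recent tickets.
function trimHistory(history: string[], maxTickets = 5, tokenBudget = 600): string[] {
  // Crude heuristic: ~4 characters per token. Good enough for budgeting.
  const approxTokens = (s: string) => Math.ceil(s.length / 4);
  const recent = history.slice(-maxTickets);
  const kept: string[] = [];
  let used = 0;
  // Walk newest-first so the budget is spent on recent tickets
  for (const ticket of [...recent].reverse()) {
    const cost = approxTokens(ticket);
    if (used + cost > tokenBudget) break;
    kept.unshift(ticket); // restore chronological order
    used += cost;
  }
  return kept;
}
```

The same idea generalizes: every piece of context that goes into the prompt should pass through a budget check before it costs you input tokens.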
Practical Example: Transforming the Support Ticket Workflow
Before: Manual Workflow
- Ticket comes in (email, form, chat)
- Support agent reads the ticket
- Agent manually decides: category, priority, assignment
- Agent writes a response from scratch
- Average handling time: 12 minutes per ticket
After: AI-Assisted Workflow
- Ticket comes in
- Webhook triggers automatic AI classification
- Agent sees: category, priority, summary, suggested response
- Agent reviews, adjusts if needed, sends
- Average handling time: 3 minutes per ticket
The integration into the existing system looks like this — a single API call is enough:
// In the existing ticket system: extend the new ticket handler
import type { Ticket, TicketClassification } from "@/types";

async function onTicketCreated(ticket: Ticket): Promise<void> {
  // Existing logic stays completely intact
  await saveToDatabase(ticket);
  await notifyTeam(ticket);

  // NEW: Trigger AI classification (async, doesn't block anything)
  try {
    const response = await fetch(`${process.env.AI_SERVICE_URL}/api/classify-ticket`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        ticket_text: ticket.subject + "\n\n" + ticket.body,
        customer_id: ticket.customerId,
        customer_history: ticket.customerHistory ?? [],
      }),
      signal: AbortSignal.timeout(10_000), // 10s timeout
    });

    if (response.ok) {
      const classification: TicketClassification = await response.json();
      // Only enrich, don't overwrite
      await updateTicket(ticket.id, {
        ai_category: classification.category,
        ai_priority: classification.priority,
        ai_summary: classification.summary,
        ai_suggested_reply: classification.suggested_reply,
        ai_confidence: classification.confidence,
        ai_classified_at: new Date(),
      });
    }
  } catch (error) {
    // AI not reachable? No problem — ticket works as before
    console.warn("AI classification failed, continuing without:", error);
  }
}
The key point: the try/catch around the AI call. If the AI is unreachable or returns nonsense, the ticket system works exactly as before. AI is an enhancement, not a single point of failure.
Checklist: Is Your System Ready for AI?
Before you start, check these 8 points. A "no" on any point isn't a dealbreaker — but you should know what you're getting into:
1. Does your system have an API? Your existing system needs to be able to get data in and out. REST, GraphQL, webhooks — doesn't matter what, but there must be an interface. Without an API, you need to build one first.
2. How is the data quality? AI is only as good as the data you feed it. If your support tickets are one-liners or your product descriptions are empty, the AI won't deliver much. Check: are the relevant data fields populated, consistent, and up to date?
3. What's your latency budget? An LLM API call takes 1-4 seconds. If your user flow needs to stay under 200ms, you need async processing or caching. Where in the flow can the user wait, where not?
4. How many requests do you expect? 100 requests per day is trivial. 100,000 per day requires caching, queuing, and rate limit management. Calculate the API costs at full load.
5. GDPR and data privacy sorted? Sending personal data to external APIs is sensitive. Check: what data goes out? Can it be anonymized/pseudonymized? Do you need a data processing agreement (DPA) with the LLM provider? Is EU hosting sufficient?
6. Are there clear success criteria? "The AI should be better than before" isn't a criterion. Define measurable goals: handling time per ticket drops from 12 to 5 minutes. Classification accuracy is at least 85%. Cost per processed document under 0.02 EUR.
7. Who maintains the prompts? Prompts are code — they need versioning, testing, and iteration. Clarify upfront: who's responsible? How are prompt changes tested before going live? Do you have an evaluation framework?
8. Is there a manual fallback? What happens when the AI goes down? When the LLM provider has an outage? When the result is obviously wrong? Your system must work without AI exactly as before — AI features are an improvement, not a dependency.
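For point 4, a back-of-the-envelope cost model helps. The per-million-token prices below are placeholders, not current list prices; plug in your provider's actual numbers:

```typescript
// Rough monthly API cost model. All prices are assumptions for
// illustration: check your provider's current pricing page.
interface CostInputs {
  requestsPerDay: number;
  inputTokensPerRequest: number;
  outputTokensPerRequest: number;
  usdPerMInputTokens: number;  // e.g. 3.0 (placeholder)
  usdPerMOutputTokens: number; // e.g. 15.0 (placeholder)
  cacheHitRate?: number;       // fraction of requests served from cache
}

function monthlyApiCostUsd(c: CostInputs): number {
  // Cached requests cost nothing; only misses hit the API
  const paidRequests = c.requestsPerDay * 30 * (1 - (c.cacheHitRate ?? 0));
  const inputCost = (paidRequests * c.inputTokensPerRequest / 1e6) * c.usdPerMInputTokens;
  const outputCost = (paidRequests * c.outputTokensPerRequest / 1e6) * c.usdPerMOutputTokens;
  return inputCost + outputCost;
}
```

Run it once at expected load and once at peak load; if the two numbers differ by an order of magnitude, you need caching and queuing before launch, not after.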
The Key Lessons
After over a dozen AI integrations, three principles have crystallized:
Start with a single use case. Not three at once. Pick the use case with the clearest ROI — usually where the most manual, repetitive work happens today.
Build the fallback first. Before you build the happy path, implement the error case. What happens when the AI doesn't respond? When it returns nonsense? When it's too slow? Only when the fallback works, build the AI path.
Measure everything from day 1. Latency, token usage, cache hit rate, cost per request, user satisfaction. Without data you can't optimize and can't prove that the integration delivers value.
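A starting point for those metrics, kept deliberately simple (in production you would export these to your monitoring stack instead of holding them in memory; the class name and percentile method are our own sketch):

```typescript
// Minimal in-process metrics: latency percentiles, cache hit rate,
// and average cost per request.
class AiMetrics {
  private latencies: number[] = [];
  private hits = 0;
  private total = 0;
  private costUsd = 0;

  record(latencyMs: number, cacheHit: boolean, costUsd: number): void {
    this.latencies.push(latencyMs);
    this.total += 1;
    if (cacheHit) this.hits += 1;
    this.costUsd += costUsd;
  }

  // Nearest-rank percentile over recorded latencies
  percentile(p: number): number {
    const sorted = [...this.latencies].sort((a, b) => a - b);
    if (sorted.length === 0) return 0;
    const idx = Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1);
    return sorted[Math.max(0, idx)];
  }

  summary() {
    return {
      p50: this.percentile(50),
      p99: this.percentile(99),
      cacheHitRate: this.total ? this.hits / this.total : 0,
      avgCostUsd: this.total ? this.costUsd / this.total : 0,
    };
  }
}
```

One `record()` call per AI request is enough to answer the questions that matter in week one: is the cache earning its keep, and what does a request actually cost?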
Next Step
If you have an existing application and want to add AI features — without a rewrite, without risk — let's talk. In a free discovery call we analyze your existing system, identify the best entry point, and show you what the integration looks like concretely.
No slide decks, no buzzword bingo. Just a technical conversation between engineers.