System Architecture Overview
This document provides a comprehensive technical overview of the Cimigo Collect Platform architecture, design decisions, and implementation patterns for developers working on the platform.
🏗️ High-Level Architecture
The Cimigo Collect Platform follows a modular, domain-driven architecture built for scalability, maintainability, and developer experience.
🎯 Design Principles
1. Domain-Driven Design (DDD)
The backend is organized around business domains rather than technical layers:
backend/
├── api/ # HTTP layer (FastAPI routes)
│ ├── v1/ # API version 1 endpoints
│ ├── public/ # Public endpoints (no auth)
│ └── internal/ # Internal service endpoints
├── core/ # Cross-cutting concerns
│ ├── config.py # Configuration management
│ ├── security.py # Authentication & authorization
│ └── exceptions.py # Custom exception classes
├── models/ # Database entities (SQLAlchemy)
│ ├── tenant.py # Multi-tenancy models
│ ├── survey.py # Survey-related models
│ └── response.py # Response data models
├── repositories/ # Data access layer
│ ├── base.py # Base repository patterns
│ ├── survey.py # Survey data operations
│ └── response.py # Response data operations
├── services/ # Business logic layer
│ ├── survey.py # Survey management logic
│ ├── response.py # Response processing logic
│ └── webhook.py # Webhook delivery logic
└── schemas/ # API contracts (Pydantic)
├── survey.py # Survey request/response schemas
└── response.py # Response data schemas
2. Separation of Concerns
Clear boundaries are maintained between the layers (a minimal sketch follows the list below):
- API Layer: HTTP handling, validation, serialization
- Service Layer: Business logic, domain rules, orchestration
- Repository Layer: Data persistence abstraction
- Model Layer: Database entities and relationships
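The sketch below illustrates how these boundaries look in code. It is an illustration only; the class and method names are assumptions, not the platform's actual modules (those live under services/ and repositories/ as shown above).

# Illustrative sketch of the layer boundaries; names are hypothetical.
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession


class SurveyCreate(BaseModel):               # API contract (schemas/)
    title: str


class SurveyRepository:                      # Data access (repositories/)
    def __init__(self, db: AsyncSession):
        self.db = db

    async def insert(self, title: str) -> dict:
        # SQLAlchemy insert + refresh would live here
        return {"title": title}


class SurveyService:                         # Business logic (services/)
    def __init__(self, repo: SurveyRepository):
        self.repo = repo

    async def create_survey(self, payload: SurveyCreate) -> dict:
        # Domain rules and orchestration, then delegate persistence
        return await self.repo.insert(payload.title)

The API layer (api/) would only parse and validate the HTTP request into SurveyCreate, call the service, and serialize the result.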
3. CQRS (Command Query Responsibility Segregation)
Read and write operations are separated so each path can be optimized independently:
# Commands (Write Operations)
class SurveyService:
    async def create_survey(self, survey_data: SurveyCreate) -> Survey:
        # Business logic for creating surveys
        pass

    async def update_survey(self, survey_id: int, updates: SurveyUpdate) -> Survey:
        # Business logic for updating surveys
        pass

# Queries (Read Operations)
class SurveyQueryService:
    async def get_survey(self, survey_id: int) -> Survey:
        # Optimized read operations
        pass

    async def list_surveys(self, filters: SurveyFilters) -> List[Survey]:
        # Optimized listing with caching
        pass
🔧 Technology Stack
Backend Stack (FastAPI + Python)
# Core Technologies
FastAPI >= 0.104.0 # High-performance async web framework
Pydantic >= 2.0.0 # Data validation and settings management
SQLAlchemy >= 2.0.0 # ORM and database toolkit
Alembic >= 1.12.0 # Database migration management
asyncpg >= 0.28.0 # Async PostgreSQL driver
redis >= 5.0.0 # Redis client for caching/queues
celery >= 5.3.0 # Distributed task queue
Why FastAPI?
- ⚡ Performance: One of the fastest Python frameworks
- 🔒 Type Safety: Built-in Pydantic integration for validation
- 📚 Documentation: Automatic OpenAPI/Swagger generation
- 🔄 Async Support: Native async/await for high concurrency
- 📏 Standards: OpenAPI, JSON Schema, OAuth2 compliant
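As a concrete illustration of the Pydantic integration and automatic documentation, a minimal endpoint might look like the sketch below. The route path and model are hypothetical; the point is that validation and the OpenAPI schema come for free.

from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI(title="Cimigo Collect API")   # Swagger UI served automatically at /docs

class SurveyCreate(BaseModel):
    title: str = Field(min_length=1, max_length=200)
    schema_version: str = "1.0"

@app.post("/v1/surveys", status_code=201)
async def create_survey(payload: SurveyCreate) -> dict:
    # The request body is validated against SurveyCreate before this code runs
    return {"title": payload.title, "schema_version": payload.schema_version}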
Frontend Stack (Next.js + React)
// Core Technologies
Next.js 14+ // React framework with App Router
React 18+ // UI library with concurrent features
TypeScript 5+ // Type safety and developer experience
Tailwind CSS 3+ // Utility-first CSS framework
React Hook Form 7+ // Performant form handling
Framer Motion 10+ // Animation and micro-interactions
React Query 4+ // Server state management
Why Next.js?
- 🔥 Performance: Server-side rendering and static generation
- 🎯 Developer Experience: Hot reload, error overlay, built-in optimization
- 🌐 SEO Friendly: Server-side rendering for search engines
- 📱 Mobile Optimized: Responsive design patterns
- 🏗️ Full-Stack: API routes for server-side logic
Database Stack (PostgreSQL)
-- Core Capabilities
PostgreSQL 15+ -- Primary database with advanced features
Row Level Security -- Multi-tenant data isolation
JSONB Support -- Flexible survey definition storage
Full-text Search -- Response content search
UUID Primary Keys -- Distributed-friendly identifiers
Partitioning -- Large table performance optimization
Why PostgreSQL?
- 🔒 ACID Compliance: Reliable transactions and data consistency
- 📊 JSONB: Flexible survey schema storage with indexing
- 🚀 Performance: Excellent query optimization and indexing
- 🔧 Extensions: Rich ecosystem (pgcrypto, pg_stat_statements)
- 🏢 Multi-tenancy: Built-in Row Level Security (RLS)
Cache & Queue Stack (Redis/KeyDB)
Use Cases:
├── Session Storage # JWT refresh tokens and user sessions
├── Response Caching # Frequently accessed survey data
├── Rate Limiting # API request throttling per tenant
├── Queue Management # Webhook delivery and email sending
├── Real-time Data # Survey response streaming
└── Distributed Locks # Preventing duplicate operations
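As an example of the rate-limiting use case above, per-tenant throttling can be built on a counter with a TTL. The sketch below is an assumption about how this could be wired, not the platform's actual limiter; the key naming and limits are illustrative.

import redis.asyncio as redis

r = redis.Redis(host="localhost", port=6379, db=0)

async def check_rate_limit(tenant_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    """Return True if the tenant is under its request quota for the current window."""
    key = f"ratelimit:{tenant_id}"              # illustrative key naming
    current = await r.incr(key)                 # atomic increment
    if current == 1:
        await r.expire(key, window_seconds)     # start the window on the first hit
    return current <= limit

A limiter like this would typically sit in a middleware or dependency, keyed by the tenant resolved from the JWT.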
📊 Database Design
Multi-Tenant Architecture
The platform uses a shared-database, shared-schema model with PostgreSQL Row Level Security (RLS) for tenant data isolation:
-- Enable RLS on all tenant-scoped tables
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE surveys ENABLE ROW LEVEL SECURITY;
ALTER TABLE responses ENABLE ROW LEVEL SECURITY;
ALTER TABLE links ENABLE ROW LEVEL SECURITY;
-- Create tenant isolation policies
CREATE POLICY tenant_isolation ON surveys
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

CREATE POLICY tenant_isolation ON responses
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
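These policies only filter rows once app.current_tenant_id is populated on the connection. A hedged sketch of how the application layer could set it per request with SQLAlchemy is shown below; the security section later in this document shows the dependency that does this during authentication.

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def scope_session_to_tenant(db: AsyncSession, tenant_id: str) -> None:
    # set_config with is_local=true limits the setting to the current transaction,
    # so pooled connections never leak another tenant's context.
    await db.execute(
        text("SELECT set_config('app.current_tenant_id', :tenant_id, true)"),
        {"tenant_id": tenant_id},
    )
    # Subsequent queries on this session are filtered by the tenant_isolation policies above.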
Core Entity Relationships
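In outline, the entities referenced throughout this document relate as follows (summarized from the tables and models mentioned on this page; the exact foreign keys live in backend/models/):

tenants
├── projects        # tenant-scoped (RLS)
├── surveys         # tenant-scoped (RLS); definition stored as JSONB
│   ├── links       # distribution links for a survey (tenant-scoped, RLS)
│   └── responses   # submitted response data (tenant-scoped, RLS)
├── events          # append-only audit/event log per aggregate
└── users           # referenced by audit fields (created_by / updated_by)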
Schema Design Patterns
1. Audit Trail Pattern
-- Every table includes audit fields
CREATE TABLE surveys (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,

    -- Audit fields
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    created_by UUID REFERENCES users(id),
    updated_by UUID REFERENCES users(id),
    version INTEGER DEFAULT 1,

    -- Soft delete
    deleted_at TIMESTAMP,
    deleted_by UUID REFERENCES users(id)
);

-- Automatic update trigger
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_surveys_updated_at
    BEFORE UPDATE ON surveys
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
2. Flexible Schema Storage
-- JSONB for flexible survey definitions
CREATE TABLE surveys (
    id UUID PRIMARY KEY,
    definition JSONB NOT NULL,
    schema_version TEXT NOT NULL DEFAULT '1.0',

    -- JSON Schema validation
    CONSTRAINT valid_definition
        CHECK (jsonb_typeof(definition) = 'object'),

    -- Schema version validation
    CONSTRAINT valid_schema_version
        CHECK (schema_version ~ '^[0-9]+\.[0-9]+$')
);

-- Optimized indexes for JSONB queries
CREATE INDEX idx_surveys_definition_gin ON surveys USING GIN (definition);
CREATE INDEX idx_surveys_schema_version ON surveys (schema_version);
CREATE INDEX idx_surveys_title ON surveys ((definition->>'title'));
3. Event Sourcing Pattern
-- Event log for audit and debugging
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    aggregate_type TEXT NOT NULL,   -- 'survey', 'response', etc.
    aggregate_id UUID NOT NULL,
    event_type TEXT NOT NULL,       -- 'created', 'updated', 'deleted'
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW(),
    created_by UUID REFERENCES users(id)
);

-- Indexes for event querying
CREATE INDEX idx_events_tenant_aggregate ON events (tenant_id, aggregate_type, aggregate_id);
CREATE INDEX idx_events_created_at ON events (created_at DESC);
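To make the pattern concrete, a hedged sketch of how a service could append to this log when a survey changes is shown below; the helper name is an assumption, not actual platform code.

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
import json

async def record_event(db: AsyncSession, tenant_id: str, aggregate_type: str,
                       aggregate_id: str, event_type: str, event_data: dict) -> None:
    # Append-only insert into the events table defined above.
    await db.execute(
        text("""
            INSERT INTO events (tenant_id, aggregate_type, aggregate_id, event_type, event_data)
            VALUES (:tenant_id, :aggregate_type, :aggregate_id, :event_type, CAST(:event_data AS jsonb))
        """),
        {
            "tenant_id": tenant_id,
            "aggregate_type": aggregate_type,   # e.g. 'survey'
            "aggregate_id": aggregate_id,
            "event_type": event_type,           # e.g. 'updated'
            "event_data": json.dumps(event_data),
        },
    )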
🔄 Request Lifecycle
1. HTTP Request Flow
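Roughly, and as detailed in the sections below: a request passes through the metrics middleware, the OAuth2/JWT dependency resolves the user and tenant and sets the RLS tenant context on the database session, the route handler validates the payload against its Pydantic schema, the service layer applies business rules, and the repository layer executes the tenant-scoped queries.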
2. Survey Response Processing
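Roughly: a submitted response is validated, sensitive response data is encrypted before persistence, the write is recorded in the event log, and registered webhooks are queued for asynchronous delivery through Celery (see Event Architecture below).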
🚀 Performance Architecture
Caching Strategy
Cache Implementation
from functools import wraps
from typing import Optional, Any
import json

import redis.asyncio as redis  # async client (redis-py >= 4.2), so the awaits below work


class CacheService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.local_cache = {}  # L1 in-process cache

    async def get(self, key: str) -> Optional[Any]:
        # L1 cache check
        if key in self.local_cache:
            return self.local_cache[key]

        # L2 cache check (Redis)
        data = await self.redis.get(key)
        if data:
            decoded = json.loads(data)
            self.local_cache[key] = decoded  # Store in L1
            return decoded

        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        # Store in both caches
        self.local_cache[key] = value
        await self.redis.setex(key, ttl, json.dumps(value))


# Module-level instance, assumed to be created at application startup
cache_service = CacheService(redis.Redis())


# Cache decorator
def cached(ttl: int = 300, key_prefix: str = ""):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            cache_key = f"{key_prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"

            # Try cache first
            cached_result = await cache_service.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await cache_service.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator


# Usage example (survey_repository is an application repository instance)
@cached(ttl=600, key_prefix="survey")
async def get_survey_definition(survey_id: str) -> dict:
    return await survey_repository.get_by_id(survey_id)
Database Optimization
Query Performance
-- Composite indexes for multi-tenant queries
CREATE INDEX CONCURRENTLY idx_responses_tenant_survey_created
    ON responses (tenant_id, survey_id, created_at DESC);

-- Partial indexes for common filters
CREATE INDEX CONCURRENTLY idx_responses_completed
    ON responses (tenant_id, survey_id, completed_at)
    WHERE status = 'completed';

-- Covering indexes for read-heavy queries
CREATE INDEX CONCURRENTLY idx_surveys_list_covering
    ON surveys (tenant_id, created_at DESC)
    INCLUDE (title, status, version);

-- Full-text search indexes (gin_trgm_ops requires the pg_trgm extension)
CREATE INDEX CONCURRENTLY idx_responses_data_gin
    ON responses USING GIN ((data::text) gin_trgm_ops);
Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Production-optimized engine configuration
engine = create_async_engine(
    database_url,

    # Connection pool settings
    pool_size=20,          # Base connections
    max_overflow=30,       # Additional connections under load
    pool_pre_ping=True,    # Validate connections before use
    pool_recycle=3600,     # Recycle connections every hour

    # Query optimization
    echo=False,            # Disable SQL logging in production
    future=True,           # Use SQLAlchemy 2.0 API

    # Connection string parameters
    connect_args={
        "server_settings": {
            "application_name": "cimigo_collect_api",
            "jit": "off",  # Disable JIT for consistent performance
        }
    }
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)
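The get_db dependency used by the route examples in this document is not shown elsewhere; one plausible shape, given the session factory above, is:

from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # Yield one session per request and guarantee it is closed afterwards.
    async with AsyncSessionLocal() as session:
        yield session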
🔐 Security Architecture
Authentication & Authorization Flow
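The API expects a bearer token that carries both the user and tenant identity, since get_current_tenant_context below reads the sub and tenant_id claims. A hedged sketch of issuing such a token follows; the helper itself is an assumption, and only the claim names are taken from the decoding code below.

from datetime import datetime, timedelta, timezone
import jwt  # PyJWT; the platform may use a different JWT library

def create_access_token(user_id: str, tenant_id: str, expires_minutes: int = 30) -> str:
    # SECRET_KEY and ALGORITHM are assumed to come from core configuration (core/config.py)
    claims = {
        "sub": user_id,             # read back as the user id
        "tenant_id": tenant_id,     # drives RLS tenant scoping
        "exp": datetime.now(timezone.utc) + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)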
Multi-Tenant Security
from functools import wraps
from typing import List

import jwt  # PyJWT for token validation
from fastapi import Depends, HTTPException, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# SECRET_KEY, ALGORITHM, oauth2_scheme, get_db, get_user_permissions, survey_service
# and router are assumed to be provided by core/ and the application's modules.


class TenantContext:
    def __init__(self, tenant_id: str, user_id: str, permissions: List[str]):
        self.tenant_id = tenant_id
        self.user_id = user_id
        self.permissions = permissions


async def get_current_tenant_context(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> TenantContext:
    # Validate JWT token
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

    # Extract tenant and user information
    tenant_id = payload.get("tenant_id")
    user_id = payload.get("sub")

    if not tenant_id or not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )

    # Get user permissions for this tenant
    permissions = await get_user_permissions(db, user_id, tenant_id)

    # Set tenant context for RLS (set_config with a bound parameter avoids string interpolation)
    await db.execute(
        text("SELECT set_config('app.current_tenant_id', :tenant_id, false)"),
        {"tenant_id": tenant_id},
    )

    return TenantContext(tenant_id, user_id, permissions)


def require_permission(permission: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(
            context: TenantContext = Depends(get_current_tenant_context),
            *args, **kwargs
        ):
            if permission not in context.permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission '{permission}' required"
                )
            return await func(*args, context=context, **kwargs)
        return wrapper
    return decorator


# Usage in routes
@router.get("/surveys")
@require_permission("surveys:read")
async def list_surveys(
    context: TenantContext = Depends(get_current_tenant_context),
    db: AsyncSession = Depends(get_db)
):
    # RLS automatically filters by tenant_id
    surveys = await survey_service.list_surveys(db, context.tenant_id)
    return surveys
Data Encryption
from cryptography.fernet import Fernet
import json
import os

from sqlalchemy import Column, Text
from sqlalchemy.dialects.postgresql import UUID


class EncryptionService:
    def __init__(self):
        # Use different keys for different data types
        self.pii_key = Fernet(os.environ["PII_ENCRYPTION_KEY"])
        self.response_key = Fernet(os.environ["RESPONSE_ENCRYPTION_KEY"])

    def encrypt_pii(self, data: str) -> str:
        """Encrypt personally identifiable information"""
        return self.pii_key.encrypt(data.encode()).decode()

    def decrypt_pii(self, encrypted_data: str) -> str:
        """Decrypt personally identifiable information"""
        return self.pii_key.decrypt(encrypted_data.encode()).decode()

    def encrypt_response_data(self, data: dict) -> str:
        """Encrypt sensitive response data"""
        json_data = json.dumps(data)
        return self.response_key.encrypt(json_data.encode()).decode()

    def decrypt_response_data(self, encrypted_data: str) -> dict:
        """Decrypt sensitive response data"""
        decrypted = self.response_key.decrypt(encrypted_data.encode()).decode()
        return json.loads(decrypted)


# Usage in models (encryption_service is a module-level EncryptionService instance)
class Response(Base):
    __tablename__ = "responses"

    id = Column(UUID, primary_key=True)
    tenant_id = Column(UUID, nullable=False)
    survey_id = Column(UUID, nullable=False)

    # Encrypted response data
    _encrypted_data = Column("data", Text)

    @property
    def data(self) -> dict:
        if self._encrypted_data:
            return encryption_service.decrypt_response_data(self._encrypted_data)
        return {}

    @data.setter
    def data(self, value: dict):
        self._encrypted_data = encryption_service.encrypt_response_data(value)
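The Fernet keys above are read from the environment; Fernet.generate_key() is the standard helper for producing them. The snippet below only shows key generation; the variable names simply mirror the code above.

from cryptography.fernet import Fernet

# Generate once per key and store securely (secrets manager / environment), e.g.
#   PII_ENCRYPTION_KEY=...   RESPONSE_ENCRYPTION_KEY=...
print(Fernet.generate_key().decode())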
📡 Event Architecture
Webhook Delivery System
Webhook Implementation
from celery import Celery
import httpx
import json
import structlog
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession

# get_webhook_delivery, update_delivery_status and WebhookDelivery are application
# helpers/models defined elsewhere in the codebase.
logger = structlog.get_logger()

# Celery configuration
celery_app = Celery(
    "webhook_delivery",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)


@celery_app.task(
    bind=True,
    autoretry_for=(httpx.RequestError, httpx.HTTPStatusError),
    retry_backoff=True,
    retry_backoff_max=600,  # 10 minutes max delay
    max_retries=5
)
async def deliver_webhook(self, webhook_delivery_id: str):
    """Deliver webhook with exponential backoff retry.

    Note: Celery runs tasks synchronously by default, so an async def task like
    this assumes an asyncio-aware worker or an asyncio.run() wrapper.
    """
    # Get webhook delivery details
    delivery = await get_webhook_delivery(webhook_delivery_id)
    if not delivery:
        logger.error(f"Webhook delivery {webhook_delivery_id} not found")
        return

    try:
        # Prepare webhook payload
        payload = {
            "event": delivery.event_type,
            "timestamp": delivery.created_at.isoformat(),
            "api_version": "v1",
            "data": delivery.payload,
            "delivery_id": delivery.id
        }

        # Sign the payload
        signature = create_webhook_signature(
            payload,
            delivery.webhook.secret
        )

        # Deliver webhook
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                delivery.webhook.url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "X-Cimigo-Signature": signature,
                    "X-Cimigo-Delivery": delivery.id,
                    "User-Agent": "Cimigo-Collect-Webhook/1.0"
                }
            )
            response.raise_for_status()

        # Log successful delivery
        await update_delivery_status(
            webhook_delivery_id,
            "delivered",
            response.status_code,
            response.headers.get("content-type", ""),
            response.text[:1000]  # First 1000 chars of response
        )

        logger.info(f"Webhook {webhook_delivery_id} delivered successfully")

    except Exception as exc:
        logger.error(f"Webhook delivery failed: {exc}")

        # Update delivery status
        await update_delivery_status(
            webhook_delivery_id,
            "failed",
            getattr(exc.response, 'status_code', None) if hasattr(exc, 'response') else None,
            str(exc)
        )

        # Re-raise for Celery retry logic
        raise


def create_webhook_signature(payload: dict, secret: str) -> str:
    """Create HMAC signature for webhook payload"""
    import hmac
    import hashlib

    payload_bytes = json.dumps(payload, sort_keys=True).encode()
    signature = hmac.new(
        secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()

    return f"sha256={signature}"


# Event dispatcher
class EventDispatcher:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def dispatch_event(
        self,
        tenant_id: str,
        event_type: str,
        payload: dict
    ):
        """Dispatch event to all registered webhooks"""
        # Get active webhooks for this tenant and event type
        webhooks = await self.get_active_webhooks(tenant_id, event_type)

        for webhook in webhooks:
            # Create webhook delivery record
            delivery = WebhookDelivery(
                webhook_id=webhook.id,
                event_type=event_type,
                payload=payload,
                status="pending"
            )
            self.db.add(delivery)
            await self.db.commit()

            # Queue for asynchronous delivery
            deliver_webhook.delay(str(delivery.id))
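On the receiving end, consumers can verify the X-Cimigo-Signature header before trusting a delivery. A minimal verification sketch that mirrors create_webhook_signature above (framework-independent; uses a constant-time comparison):

import hmac
import hashlib
import json

def verify_webhook_signature(payload: dict, received_signature: str, secret: str) -> bool:
    """Recompute the signature exactly as the sender does and compare in constant time."""
    payload_bytes = json.dumps(payload, sort_keys=True).encode()
    expected = "sha256=" + hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, received_signature)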
🔍 Monitoring & Observability
Application Metrics
import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest

# Custom metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code', 'tenant_id']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint', 'tenant_id']
)

SURVEY_RESPONSES = Counter(
    'survey_responses_total',
    'Total survey responses submitted',
    ['tenant_id', 'survey_id', 'status']
)

ACTIVE_SURVEYS = Gauge(
    'active_surveys_count',
    'Number of active surveys',
    ['tenant_id']
)

DATABASE_POOL = Gauge(
    'database_connection_pool_size',
    'Database connection pool metrics',
    ['pool_type']
)


# Middleware for automatic metrics collection
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    # Extract tenant from JWT token (extract_tenant_from_request is an application helper)
    tenant_id = extract_tenant_from_request(request)

    response = await call_next(request)

    # Record metrics
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status_code=response.status_code,
        tenant_id=tenant_id or "unknown"
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path,
        tenant_id=tenant_id or "unknown"
    ).observe(time.time() - start_time)

    return response


# Metrics endpoint
@app.get("/metrics")
async def get_metrics():
    return Response(
        generate_latest(),
        media_type="text/plain"
    )
Structured Logging
import structlog
import logging
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()


# Usage in services (stdlib-bound structlog loggers are synchronous, so no await is needed)
class SurveyService:
    async def create_survey(self, survey_data: SurveyCreate, context: TenantContext):
        logger.info(
            "Survey creation started",
            tenant_id=context.tenant_id,
            user_id=context.user_id,
            survey_title=survey_data.survey.title,
            schema_version=survey_data.schema_version
        )

        try:
            survey = await self._create_survey_internal(survey_data, context)

            logger.info(
                "Survey created successfully",
                tenant_id=context.tenant_id,
                survey_id=str(survey.id),
                survey_title=survey.title,
                question_count=len(survey.definition["survey"]["pages"][0]["questions"])
            )

            return survey

        except Exception as exc:
            logger.error(
                "Survey creation failed",
                tenant_id=context.tenant_id,
                user_id=context.user_id,
                error=str(exc),
                error_type=type(exc).__name__
            )
            raise
🚀 Scalability Considerations
Horizontal Scaling
# Docker Compose scaling example
version: '3.8'

services:
  api:
    image: cimigo/collect-api:latest
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/db
      - REDIS_URL=redis://redis:6379/0

  renderer:
    image: cimigo/collect-renderer:latest
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '0.3'
          memory: 256M

  postgres:
    image: postgres:15
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '1.0'
          memory: 2G

  redis:
    image: redis:7-alpine
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '0.2'
          memory: 256M

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    depends_on:
      - api
      - renderer
Performance Optimization
# Database query optimization
from datetime import datetime

from sqlalchemy import select, and_, text
from sqlalchemy.orm import selectinload


class OptimizedSurveyRepository:
    async def get_survey_with_responses(
        self,
        survey_id: str,
        tenant_id: str,
        limit: int = 100,
        offset: int = 0
    ):
        """Optimized query with proper indexing and pagination"""
        query = select(Survey).options(
            # Eager load related data to avoid N+1 queries
            selectinload(Survey.responses.and_(
                Response.status == 'completed'
            )).options(
                selectinload(Response.response_data)
            ),
            selectinload(Survey.links)
        ).where(
            and_(
                Survey.id == survey_id,
                Survey.tenant_id == tenant_id,
                Survey.deleted_at.is_(None)
            )
        )

        # Add pagination
        query = query.offset(offset).limit(limit)

        result = await self.db.execute(query)
        return result.unique().scalar_one_or_none()


# Response caching for expensive operations
@cached(ttl=3600, key_prefix="survey_analytics")
async def get_survey_analytics(survey_id: str, tenant_id: str) -> dict:
    """Cached survey analytics calculation"""
    # This is an expensive operation that doesn't need real-time data
    query = text("""
        SELECT
            COUNT(*) AS total_responses,
            COUNT(*) FILTER (WHERE status = 'completed') AS completed_responses,
            AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_completion_time,
            DATE_TRUNC('day', completed_at) AS completion_date,
            COUNT(*) AS daily_responses
        FROM responses
        WHERE survey_id = :survey_id AND tenant_id = :tenant_id
        GROUP BY DATE_TRUNC('day', completed_at)
        ORDER BY completion_date DESC
        LIMIT 30
    """)

    # db: an AsyncSession obtained from the request scope
    result = await db.execute(query, {
        "survey_id": survey_id,
        "tenant_id": tenant_id
    })

    return {
        "analytics": result.fetchall(),
        "generated_at": datetime.utcnow().isoformat()
    }
📚 Next Steps
This architecture overview provides the foundation for understanding the Cimigo Collect Platform. For deeper dives into specific areas:
- Database Design - Detailed schema and optimization patterns (coming soon)
- Multi-tenancy - Complete tenant isolation strategies (coming soon)
- Security Architecture - Comprehensive security implementation (coming soon)
- Performance Optimization - Advanced caching and scaling patterns (coming soon)
- API Development - Building new endpoints and services (coming soon)
The architecture is designed to be scalable, secure, and maintainable while providing excellent developer experience and platform reliability.