System Architecture Overview

This document provides a comprehensive technical overview of the Cimigo Collect Platform architecture, design decisions, and implementation patterns for developers working on the platform.

🏗️ High-Level Architecture

The Cimigo Collect Platform follows a modular, domain-driven architecture built for scalability, maintainability, and developer experience.

🎯 Design Principles

1. Domain-Driven Design (DDD)

The backend is organized around business domains rather than technical layers:

backend/
├── api/                 # HTTP layer (FastAPI routes)
│   ├── v1/              # API version 1 endpoints
│   ├── public/          # Public endpoints (no auth)
│   └── internal/        # Internal service endpoints
├── core/                # Cross-cutting concerns
│   ├── config.py        # Configuration management
│   ├── security.py      # Authentication & authorization
│   └── exceptions.py    # Custom exception classes
├── models/              # Database entities (SQLAlchemy)
│   ├── tenant.py        # Multi-tenancy models
│   ├── survey.py        # Survey-related models
│   └── response.py      # Response data models
├── repositories/        # Data access layer
│   ├── base.py          # Base repository patterns
│   ├── survey.py        # Survey data operations
│   └── response.py      # Response data operations
├── services/            # Business logic layer
│   ├── survey.py        # Survey management logic
│   ├── response.py      # Response processing logic
│   └── webhook.py       # Webhook delivery logic
└── schemas/             # API contracts (Pydantic)
    ├── survey.py        # Survey request/response schemas
    └── response.py      # Response data schemas

2. Separation of Concerns

Clear boundaries between the layers, illustrated by the sketch after this list:

  • API Layer: HTTP handling, validation, serialization
  • Service Layer: Business logic, domain rules, orchestration
  • Repository Layer: Data persistence abstraction
  • Model Layer: Database entities and relationships
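
To make the boundaries concrete, here is a minimal, illustrative sketch of how a request travels through the layers. The class and function names follow the directory layout above but are simplified, not copied from the actual modules.

# Illustrative only: simplified stand-ins for repositories/survey.py,
# services/survey.py and the api/v1 route module.

class SurveyRepository:                       # Repository layer: persistence only
    def __init__(self, db):
        self.db = db

    async def add(self, survey) -> None:
        self.db.add(survey)
        await self.db.commit()

class SurveyService:                          # Service layer: business rules, orchestration
    def __init__(self, repository: SurveyRepository):
        self.repository = repository

    async def create_survey(self, survey_data):
        survey = Survey(**survey_data.model_dump())   # map Pydantic schema -> SQLAlchemy model
        await self.repository.add(survey)
        return survey

@router.post("/surveys")                      # API layer: HTTP, validation, serialization
async def create_survey(payload: SurveyCreate, db=Depends(get_db)):
    service = SurveyService(SurveyRepository(db))
    return await service.create_survey(payload)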

3. CQRS (Command Query Responsibility Segregation)

Separate read and write operations for optimal performance:

# Commands (Write Operations)
class SurveyService:
    async def create_survey(self, survey_data: SurveyCreate) -> Survey:
        # Business logic for creating surveys
        pass

    async def update_survey(self, survey_id: int, updates: SurveyUpdate) -> Survey:
        # Business logic for updating surveys
        pass

# Queries (Read Operations)
class SurveyQueryService:
    async def get_survey(self, survey_id: int) -> Survey:
        # Optimized read operations
        pass

    async def list_surveys(self, filters: SurveyFilters) -> List[Survey]:
        # Optimized listing with caching
        pass

🔧 Technology Stack

Backend Stack (FastAPI + Python)

# Core Technologies
FastAPI >= 0.104.0 # High-performance async web framework
Pydantic >= 2.0.0 # Data validation and settings management
SQLAlchemy >= 2.0.0 # ORM and database toolkit
Alembic >= 1.12.0 # Database migration management
asyncpg >= 0.28.0 # Async PostgreSQL driver
redis >= 5.0.0 # Redis client for caching/queues
celery >= 5.3.0 # Distributed task queue

Why FastAPI?

  • ⚡ Performance: One of the fastest Python frameworks
  • 🔒 Type Safety: Built-in Pydantic integration for validation
  • 📚 Documentation: Automatic OpenAPI/Swagger generation
  • 🔄 Async Support: Native async/await for high concurrency
  • 📏 Standards: OpenAPI, JSON Schema, OAuth2 compliant

Frontend Stack (Next.js + React)

// Core Technologies
Next.js 14+ // React framework with App Router
React 18+ // UI library with concurrent features
TypeScript 5+ // Type safety and developer experience
Tailwind CSS 3+ // Utility-first CSS framework
React Hook Form 7+ // Performant form handling
Framer Motion 10+ // Animation and micro-interactions
React Query 4+ // Server state management

Why Next.js?

  • 🔥 Performance: Server-side rendering and static generation
  • 🎯 Developer Experience: Hot reload, error overlay, built-in optimization
  • 🌐 SEO Friendly: Server-side rendering for search engines
  • 📱 Mobile Optimized: Responsive design patterns
  • 🏗️ Full-Stack: API routes for server-side logic

Database Stack (PostgreSQL)

-- Core Capabilities
PostgreSQL 15+ -- Primary database with advanced features
Row Level Security -- Multi-tenant data isolation
JSONB Support -- Flexible survey definition storage
Full-text Search -- Response content search
UUID Primary Keys -- Distributed-friendly identifiers
Partitioning -- Large table performance optimization

Why PostgreSQL?

  • 🔒 ACID Compliance: Reliable transactions and data consistency
  • 📊 JSONB: Flexible survey schema storage with indexing
  • 🚀 Performance: Excellent query optimization and indexing
  • 🔧 Extensions: Rich ecosystem (pgcrypto, pg_stat_statements)
  • 🏢 Multi-tenancy: Built-in Row Level Security (RLS)

Cache & Queue Stack (Redis/KeyDB)

Use Cases (a rate-limiting sketch follows the list):
├── Session Storage # JWT refresh tokens and user sessions
├── Response Caching # Frequently accessed survey data
├── Rate Limiting # API request throttling per tenant
├── Queue Management # Webhook delivery and email sending
├── Real-time Data # Survey response streaming
└── Distributed Locks # Preventing duplicate operations
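
As one example of how these use cases map onto Redis primitives, a per-tenant rate limiter can be built from INCR and EXPIRE. The sketch below is illustrative; the key layout and limits are assumptions, not the platform's actual implementation.

import redis.asyncio as redis

redis_client = redis.Redis.from_url("redis://localhost:6379/0")

async def allow_request(tenant_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    """Fixed-window rate limit: at most `limit` requests per tenant per window."""
    key = f"ratelimit:{tenant_id}"
    count = await redis_client.incr(key)                 # atomically count this request
    if count == 1:
        await redis_client.expire(key, window_seconds)   # start the window on the first hit
    return count <= limit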

📊 Database Design

Multi-Tenant Architecture

The platform uses a shared-database, shared-schema model with Row Level Security (RLS) for tenant data isolation:

-- Enable RLS on all tenant-scoped tables
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE surveys ENABLE ROW LEVEL SECURITY;
ALTER TABLE responses ENABLE ROW LEVEL SECURITY;
ALTER TABLE links ENABLE ROW LEVEL SECURITY;

-- Create tenant isolation policies
CREATE POLICY tenant_isolation ON surveys
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

CREATE POLICY tenant_isolation ON responses
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

Core Entity Relationships

Schema Design Patterns

1. Audit Trail Pattern

-- Every table includes audit fields
CREATE TABLE surveys (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,

    -- Audit fields
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    created_by UUID REFERENCES users(id),
    updated_by UUID REFERENCES users(id),
    version INTEGER DEFAULT 1,

    -- Soft delete
    deleted_at TIMESTAMP,
    deleted_by UUID REFERENCES users(id)
);

-- Automatic update trigger
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_surveys_updated_at
    BEFORE UPDATE ON surveys
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

2. Flexible Schema Storage

-- JSONB for flexible survey definitions
CREATE TABLE surveys (
    id UUID PRIMARY KEY,
    definition JSONB NOT NULL,
    schema_version TEXT NOT NULL DEFAULT '1.0',

    -- JSON Schema validation
    CONSTRAINT valid_definition
        CHECK (jsonb_typeof(definition) = 'object'),

    -- Schema version validation
    CONSTRAINT valid_schema_version
        CHECK (schema_version ~ '^[0-9]+\.[0-9]+$')
);

-- Optimized indexes for JSONB queries
CREATE INDEX idx_surveys_definition_gin ON surveys USING GIN (definition);
CREATE INDEX idx_surveys_schema_version ON surveys (schema_version);
CREATE INDEX idx_surveys_title ON surveys ((definition->>'title'));
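
As a usage sketch (assumed, not taken from the actual repository code), the expression index on definition->>'title' and the GIN index can be hit from SQLAlchemy like this; the example values are hypothetical.

from sqlalchemy import select

# Title lookup can use the expression index idx_surveys_title
by_title = select(Survey).where(Survey.definition["title"].astext == "Customer Satisfaction")

# Containment queries (@>) can use the GIN index idx_surveys_definition_gin
contains_page = select(Survey).where(
    Survey.definition.contains({"survey": {"pages": [{"id": "page_1"}]}})
)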

3. Event Sourcing Pattern

-- Event log for audit and debugging
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    aggregate_type TEXT NOT NULL,    -- 'survey', 'response', etc.
    aggregate_id UUID NOT NULL,
    event_type TEXT NOT NULL,        -- 'created', 'updated', 'deleted'
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW(),
    created_by UUID REFERENCES users(id)
);

-- Indexes for event querying
CREATE INDEX idx_events_tenant_aggregate ON events (tenant_id, aggregate_type, aggregate_id);
CREATE INDEX idx_events_created_at ON events (created_at DESC);
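
A minimal sketch of appending to this log when an aggregate changes; the field names mirror the table above, but the helper itself and the Event model name are illustrative, not the platform's actual code.

async def record_event(db, tenant_id, aggregate_type, aggregate_id, event_type, event_data, user_id=None):
    """Append an immutable event row; events are only ever inserted, never updated or deleted."""
    event = Event(
        tenant_id=tenant_id,
        aggregate_type=aggregate_type,   # 'survey', 'response', etc.
        aggregate_id=aggregate_id,
        event_type=event_type,           # 'created', 'updated', 'deleted'
        event_data=event_data,
        created_by=user_id,
    )
    db.add(event)
    await db.commit()

# e.g. after a survey update:
# await record_event(db, ctx.tenant_id, "survey", survey.id, "updated", {"title": survey.title}, ctx.user_id)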

🔄 Request Lifecycle

1. HTTP Request Flow
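
The original flow diagram is not reproduced here. As an approximation assembled from components described elsewhere in this document: nginx terminates TLS and proxies to an API replica, the metrics middleware times the request, and the dependency chain on the route handler performs authentication, tenant scoping, and authorization before the service and repository layers run. A condensed route sketch (SurveyRead is a hypothetical response schema):

@router.get("/surveys/{survey_id}")
@require_permission("surveys:read")                                  # authorization check
async def get_survey(
    survey_id: str,
    context: TenantContext = Depends(get_current_tenant_context),    # JWT validation + SET app.current_tenant_id
    db: AsyncSession = Depends(get_db),                              # pooled async session
):
    # Service -> repository; RLS filters every query to context.tenant_id
    survey = await survey_service.get_survey(db, survey_id)
    return SurveyRead.model_validate(survey)                         # Pydantic serialization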

2. Survey Response Processing
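
Similarly, as an approximation based on pieces documented below (encryption-at-rest setter, event dispatch, webhook delivery, metrics), submitting a survey response passes through roughly these steps. The service method, link_repository helper, validate_answers function, and the "response.completed" event name are all assumptions for illustration.

class ResponseService:
    async def submit_response(self, db, link_token: str, answers: dict):
        # 1. Resolve the public link and its survey (public endpoint, no auth)
        link = await link_repository.get_by_token(db, link_token)

        # 2. Validate answers against the survey's JSONB definition
        validate_answers(link.survey.definition, answers)

        # 3. Persist; the data setter encrypts the payload at rest
        response = Response(tenant_id=link.tenant_id, survey_id=link.survey_id)
        response.data = answers
        db.add(response)
        await db.commit()

        # 4. Record the event and fan out to webhooks via Celery
        await EventDispatcher(db).dispatch_event(
            str(link.tenant_id), "response.completed", {"response_id": str(response.id)}
        )

        # 5. Update metrics
        SURVEY_RESPONSES.labels(
            tenant_id=str(link.tenant_id), survey_id=str(link.survey_id), status="completed"
        ).inc()
        return response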

🚀 Performance Architecture

Caching Strategy

Cache Implementation

from functools import wraps
import json
from typing import Optional, Any

import redis.asyncio as redis  # async client so get/set can be awaited

class CacheService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.local_cache = {}  # L1 cache

    async def get(self, key: str) -> Optional[Any]:
        # L1 cache check
        if key in self.local_cache:
            return self.local_cache[key]

        # L2 cache check (Redis)
        data = await self.redis.get(key)
        if data:
            decoded = json.loads(data)
            self.local_cache[key] = decoded  # Store in L1
            return decoded

        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        # Store in both caches
        self.local_cache[key] = value
        await self.redis.setex(key, ttl, json.dumps(value))

# Shared instance used by the decorator below
cache_service = CacheService(redis.Redis.from_url("redis://localhost:6379/0"))

# Cache decorator
def cached(ttl: int = 300, key_prefix: str = ""):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            cache_key = f"{key_prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"

            # Try cache first
            cached_result = await cache_service.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await cache_service.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator

# Usage example
@cached(ttl=600, key_prefix="survey")
async def get_survey_definition(survey_id: str) -> dict:
    return await survey_repository.get_by_id(survey_id)
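
One thing the decorator does not cover is invalidation. Given the key scheme above, a simple approach is to drop every key under a prefix when the underlying data changes; the helper below is an illustrative assumption, not part of the platform code.

async def invalidate_prefix(prefix: str):
    """Delete every cached entry whose key starts with the given prefix."""
    async for key in cache_service.redis.scan_iter(match=f"{prefix}:*"):
        await cache_service.redis.delete(key)
    # Drop matching L1 entries as well
    cache_service.local_cache = {
        k: v for k, v in cache_service.local_cache.items() if not k.startswith(f"{prefix}:")
    }

# e.g. after updating a survey definition:
# await invalidate_prefix("survey")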

Database Optimization

Query Performance

-- Composite indexes for multi-tenant queries
CREATE INDEX CONCURRENTLY idx_responses_tenant_survey_created
ON responses (tenant_id, survey_id, created_at DESC);

-- Partial indexes for common filters
CREATE INDEX CONCURRENTLY idx_responses_completed
ON responses (tenant_id, survey_id, completed_at)
WHERE status = 'completed';

-- Covering indexes for read-heavy queries
CREATE INDEX CONCURRENTLY idx_surveys_list_covering
ON surveys (tenant_id, created_at DESC)
INCLUDE (title, status, version);

-- Full-text search indexes
CREATE INDEX CONCURRENTLY idx_responses_data_gin
ON responses USING GIN ((data::text) gin_trgm_ops);

Connection Pooling

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Production-optimized engine configuration
engine = create_async_engine(
    database_url,
    # Connection pool settings
    pool_size=20,          # Base connections
    max_overflow=30,       # Additional connections under load
    pool_pre_ping=True,    # Validate connections before use
    pool_recycle=3600,     # Recycle connections every hour

    # Query optimization
    echo=False,            # Disable SQL logging in production
    future=True,           # Use SQLAlchemy 2.0 API

    # Connection string parameters
    connect_args={
        "server_settings": {
            "application_name": "cimigo_collect_api",
            "jit": "off",  # Disable JIT for consistent performance
        }
    }
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)
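
The get_db dependency referenced throughout the API layer is not shown in this document; a minimal sketch built on AsyncSessionLocal would look like this.

from typing import AsyncGenerator

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency: one session per request, rolled back on error."""
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise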

🔐 Security Architecture

Authentication & Authorization Flow
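
The original flow diagram is not included here. For orientation, token issuance is the mirror image of the validation code below: the access token carries the user id (sub) and tenant_id claims, and refresh tokens live in Redis (see the cache use cases above). The sketch assumes a PyJWT-style jwt.encode and is illustrative only.

from datetime import datetime, timedelta, timezone
import jwt  # PyJWT-style API assumed

def create_access_token(user_id: str, tenant_id: str, expires_minutes: int = 15) -> str:
    """Issue a short-lived access token carrying the tenant and user claims."""
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,            # matched by payload.get("sub") in the validator below
        "tenant_id": tenant_id,    # matched by payload.get("tenant_id") below
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)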

Multi-Tenant Security

from functools import wraps
from typing import List

from fastapi import Depends, HTTPException, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# jwt, oauth2_scheme, get_db, SECRET_KEY and ALGORITHM are provided by the
# auth and config modules (see core/security.py and core/config.py).

class TenantContext:
    def __init__(self, tenant_id: str, user_id: str, permissions: List[str]):
        self.tenant_id = tenant_id
        self.user_id = user_id
        self.permissions = permissions

async def get_current_tenant_context(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> TenantContext:
    # Validate JWT token
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

    # Extract tenant and user information
    tenant_id = payload.get("tenant_id")
    user_id = payload.get("sub")

    if not tenant_id or not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )

    # Get user permissions for this tenant
    permissions = await get_user_permissions(db, user_id, tenant_id)

    # Set tenant context for RLS
    await db.execute(text(f"SET app.current_tenant_id = '{tenant_id}'"))

    return TenantContext(tenant_id, user_id, permissions)

def require_permission(permission: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(
            context: TenantContext = Depends(get_current_tenant_context),
            *args, **kwargs
        ):
            if permission not in context.permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission '{permission}' required"
                )
            return await func(context=context, *args, **kwargs)
        return wrapper
    return decorator

# Usage in routes
@router.get("/surveys")
@require_permission("surveys:read")
async def list_surveys(
    context: TenantContext = Depends(get_current_tenant_context),
    db: AsyncSession = Depends(get_db)
):
    # RLS automatically filters by tenant_id
    surveys = await survey_service.list_surveys(db, context.tenant_id)
    return surveys

Data Encryption

import json
import os

from cryptography.fernet import Fernet

class EncryptionService:
    def __init__(self):
        # Use different keys for different data types
        self.pii_key = Fernet(os.environ["PII_ENCRYPTION_KEY"])
        self.response_key = Fernet(os.environ["RESPONSE_ENCRYPTION_KEY"])

    def encrypt_pii(self, data: str) -> str:
        """Encrypt personally identifiable information"""
        return self.pii_key.encrypt(data.encode()).decode()

    def decrypt_pii(self, encrypted_data: str) -> str:
        """Decrypt personally identifiable information"""
        return self.pii_key.decrypt(encrypted_data.encode()).decode()

    def encrypt_response_data(self, data: dict) -> str:
        """Encrypt sensitive response data"""
        json_data = json.dumps(data)
        return self.response_key.encrypt(json_data.encode()).decode()

    def decrypt_response_data(self, encrypted_data: str) -> dict:
        """Decrypt sensitive response data"""
        decrypted = self.response_key.decrypt(encrypted_data.encode()).decode()
        return json.loads(decrypted)

# Usage in models
class Response(Base):
    __tablename__ = "responses"

    id = Column(UUID, primary_key=True)
    tenant_id = Column(UUID, nullable=False)
    survey_id = Column(UUID, nullable=False)

    # Encrypted response data
    _encrypted_data = Column("data", Text)

    @property
    def data(self) -> dict:
        if self._encrypted_data:
            return encryption_service.decrypt_response_data(self._encrypted_data)
        return {}

    @data.setter
    def data(self, value: dict):
        self._encrypted_data = encryption_service.encrypt_response_data(value)

📡 Event Architecture

Webhook Delivery System

Webhook Implementation

from celery import Celery
import httpx
import json
from datetime import datetime, timedelta

# Celery configuration
celery_app = Celery(
    "webhook_delivery",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(
    bind=True,
    autoretry_for=(httpx.RequestError, httpx.HTTPStatusError),
    retry_backoff=True,
    retry_backoff_max=600,  # 10 minutes max delay
    max_retries=5
)
async def deliver_webhook(self, webhook_delivery_id: str):
    """Deliver webhook with exponential backoff retry"""

    # Get webhook delivery details
    delivery = await get_webhook_delivery(webhook_delivery_id)

    if not delivery:
        logger.error(f"Webhook delivery {webhook_delivery_id} not found")
        return

    try:
        # Prepare webhook payload
        payload = {
            "event": delivery.event_type,
            "timestamp": delivery.created_at.isoformat(),
            "api_version": "v1",
            "data": delivery.payload,
            "delivery_id": delivery.id
        }

        # Sign the payload
        signature = create_webhook_signature(
            payload,
            delivery.webhook.secret
        )

        # Deliver webhook
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                delivery.webhook.url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "X-Cimigo-Signature": signature,
                    "X-Cimigo-Delivery": delivery.id,
                    "User-Agent": "Cimigo-Collect-Webhook/1.0"
                }
            )

            response.raise_for_status()

        # Log successful delivery
        await update_delivery_status(
            webhook_delivery_id,
            "delivered",
            response.status_code,
            response.headers.get("content-type", ""),
            response.text[:1000]  # First 1000 chars of response
        )

        logger.info(f"Webhook {webhook_delivery_id} delivered successfully")

    except Exception as exc:
        logger.error(f"Webhook delivery failed: {exc}")

        # Update delivery status
        await update_delivery_status(
            webhook_delivery_id,
            "failed",
            getattr(exc.response, 'status_code', None) if hasattr(exc, 'response') else None,
            str(exc)
        )

        # Re-raise for Celery retry logic
        raise

def create_webhook_signature(payload: dict, secret: str) -> str:
    """Create HMAC signature for webhook payload"""
    import hmac
    import hashlib

    payload_bytes = json.dumps(payload, sort_keys=True).encode()
    signature = hmac.new(
        secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()

    return f"sha256={signature}"

# Event dispatcher
class EventDispatcher:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def dispatch_event(
        self,
        tenant_id: str,
        event_type: str,
        payload: dict
    ):
        """Dispatch event to all registered webhooks"""

        # Get active webhooks for this tenant and event type
        webhooks = await self.get_active_webhooks(tenant_id, event_type)

        for webhook in webhooks:
            # Create webhook delivery record
            delivery = WebhookDelivery(
                webhook_id=webhook.id,
                event_type=event_type,
                payload=payload,
                status="pending"
            )

            self.db.add(delivery)
            await self.db.commit()

            # Queue for asynchronous delivery
            deliver_webhook.delay(str(delivery.id))
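
On the receiving side, consumers can verify deliveries by recomputing the HMAC over the JSON body and comparing it in constant time. The sketch below mirrors create_webhook_signature above; the function name is illustrative and not part of the platform SDK.

import hashlib
import hmac
import json

def verify_webhook_signature(raw_body: bytes, signature_header: str, secret: str) -> bool:
    """Return True if the X-Cimigo-Signature header matches the request body."""
    payload = json.loads(raw_body)
    expected = "sha256=" + hmac.new(
        secret.encode(),
        json.dumps(payload, sort_keys=True).encode(),  # same canonicalization as the sender
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header)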

🔍 Monitoring & Observability

Application Metrics

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request, Response
import time

# Custom metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code', 'tenant_id']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint', 'tenant_id']
)

SURVEY_RESPONSES = Counter(
    'survey_responses_total',
    'Total survey responses submitted',
    ['tenant_id', 'survey_id', 'status']
)

ACTIVE_SURVEYS = Gauge(
    'active_surveys_count',
    'Number of active surveys',
    ['tenant_id']
)

DATABASE_POOL = Gauge(
    'database_connection_pool_size',
    'Database connection pool metrics',
    ['pool_type']
)

# Middleware for automatic metrics collection
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    # Extract tenant from JWT token
    tenant_id = extract_tenant_from_request(request)

    response = await call_next(request)

    # Record metrics
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status_code=response.status_code,
        tenant_id=tenant_id or "unknown"
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path,
        tenant_id=tenant_id or "unknown"
    ).observe(time.time() - start_time)

    return response

# Metrics endpoint
@app.get("/metrics")
async def get_metrics():
    return Response(
        generate_latest(),
        media_type="text/plain"
    )
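
The request metrics are recorded automatically by the middleware; the domain gauges are set explicitly from the service layer or a periodic task. An illustrative sketch (count_active is a hypothetical repository helper):

async def refresh_gauges(tenant_id: str):
    """Illustrative: keep the domain gauges up to date from a periodic task."""
    active_count = await survey_repository.count_active(tenant_id)   # hypothetical helper
    ACTIVE_SURVEYS.labels(tenant_id=tenant_id).set(active_count)

    # Pool utilisation comes from the sync pool underlying the async engine
    pool = engine.sync_engine.pool
    DATABASE_POOL.labels(pool_type="checked_out").set(pool.checkedout())
    DATABASE_POOL.labels(pool_type="size").set(pool.size())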

Structured Logging

import structlog
import logging
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage in services (the stdlib-bound logger is synchronous, so calls are not awaited)
class SurveyService:
    async def create_survey(self, survey_data: SurveyCreate, context: TenantContext):
        logger.info(
            "Survey creation started",
            tenant_id=context.tenant_id,
            user_id=context.user_id,
            survey_title=survey_data.survey.title,
            schema_version=survey_data.schema_version
        )

        try:
            survey = await self._create_survey_internal(survey_data, context)

            logger.info(
                "Survey created successfully",
                tenant_id=context.tenant_id,
                survey_id=str(survey.id),
                survey_title=survey.title,
                question_count=len(survey.definition["survey"]["pages"][0]["questions"])
            )

            return survey

        except Exception as exc:
            logger.error(
                "Survey creation failed",
                tenant_id=context.tenant_id,
                user_id=context.user_id,
                error=str(exc),
                error_type=type(exc).__name__
            )
            raise

🚀 Scalability Considerations

Horizontal Scaling

# Docker Compose scaling example
version: '3.8'

services:
  api:
    image: cimigo/collect-api:latest
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/db
      - REDIS_URL=redis://redis:6379/0

  renderer:
    image: cimigo/collect-renderer:latest
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '0.3'
          memory: 256M

  postgres:
    image: postgres:15
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '1.0'
          memory: 2G

  redis:
    image: redis:7-alpine
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '0.2'
          memory: 256M

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    depends_on:
      - api
      - renderer

Performance Optimization

from sqlalchemy import select, text, and_
from sqlalchemy.orm import selectinload

# Database query optimization
class OptimizedSurveyRepository:
    async def get_survey_with_responses(
        self,
        survey_id: str,
        tenant_id: str,
        limit: int = 100,
        offset: int = 0
    ):
        """Optimized query with proper indexing and pagination"""

        query = select(Survey).options(
            # Eager load related data to avoid N+1 queries
            selectinload(Survey.responses.and_(
                Response.status == 'completed'
            )).options(
                selectinload(Response.response_data)
            ),
            selectinload(Survey.links)
        ).where(
            and_(
                Survey.id == survey_id,
                Survey.tenant_id == tenant_id,
                Survey.deleted_at.is_(None)
            )
        )

        # Add pagination
        query = query.offset(offset).limit(limit)

        result = await self.db.execute(query)
        return result.unique().scalar_one_or_none()

# Response caching for expensive operations
@cached(ttl=3600, key_prefix="survey_analytics")
async def get_survey_analytics(survey_id: str, tenant_id: str) -> dict:
    """Cached survey analytics calculation"""

    # This is an expensive operation that doesn't need real-time data
    query = text("""
        SELECT
            COUNT(*) as total_responses,
            COUNT(*) FILTER (WHERE status = 'completed') as completed_responses,
            AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_completion_time,
            DATE_TRUNC('day', completed_at) as completion_date,
            COUNT(*) as daily_responses
        FROM responses
        WHERE survey_id = :survey_id AND tenant_id = :tenant_id
        GROUP BY DATE_TRUNC('day', completed_at)
        ORDER BY completion_date DESC
        LIMIT 30
    """)

    result = await db.execute(query, {
        "survey_id": survey_id,
        "tenant_id": tenant_id
    })

    return {
        "analytics": result.fetchall(),
        "generated_at": datetime.utcnow().isoformat()
    }

📚 Next Steps

This architecture overview provides the foundation for understanding the Cimigo Collect Platform. For deeper dives into specific areas:

  • Database Design - Detailed schema and optimization patterns (coming soon)
  • Multi-tenancy - Complete tenant isolation strategies (coming soon)
  • Security Architecture - Comprehensive security implementation (coming soon)
  • Performance Optimization - Advanced caching and scaling patterns (coming soon)
  • API Development - Building new endpoints and services (coming soon)

The architecture is designed to be scalable, secure, and maintainable while providing excellent developer experience and platform reliability.