Backend Architecture
Overview
The VerticalFarm OS backend is a sophisticated FastAPI application built with Python 3.13, designed for high-performance asynchronous operations, comprehensive business logic processing, and seamless integration with IoT devices and external services.
Technology Stack
Core Technologies
- FastAPI - Modern, fast web framework for building APIs
- Python 3.13 - Latest Python with performance improvements
- Pydantic - Data validation using Python type annotations
- SQLAlchemy - SQL toolkit and ORM (when needed)
- Supabase - Backend database and authentication
- JWT - JSON Web Tokens for authentication
Supporting Technologies
- Uvicorn - ASGI server
- Docker - Containerization
- Datadog - Application performance monitoring
- pytest - Testing framework
- Alembic - Database migration tool
Application Structure
Directory Organization
backend/
├── app/
│ ├── api/
│ │ └── v1/
│ │ ├── endpoints/ # API endpoint modules
│ │ │ ├── farms.py # Farm management endpoints
│ │ │ ├── devices.py # Device control endpoints
│ │ │ ├── sensors.py # Sensor data endpoints
│ │ │ ├── automation.py # Automation rule endpoints
│ │ │ ├── users.py # User management endpoints
│ │ │ └── auth.py # Authentication endpoints
│ │ └── api.py # API router configuration
│ ├── core/ # Core functionality
│ │ ├── config.py # Configuration management
│ │ ├── security.py # Security utilities
│ │ ├── dependencies.py # Dependency injection
│ │ └── exceptions.py # Custom exceptions
│ ├── crud/ # Database operations
│ │ ├── base.py # Base CRUD operations
│ │ ├── farm.py # Farm-specific CRUD
│ │ ├── device.py # Device-specific CRUD
│ │ └── user.py # User-specific CRUD
│ ├── models/ # Database models
│ │ ├── __init__.py
│ │ ├── farm.py # Farm models
│ │ ├── device.py # Device models
│ │ ├── sensor.py # Sensor models
│ │ └── user.py # User models
│ ├── schemas/ # Pydantic schemas
│ │ ├── farm.py # Farm schemas
│ │ ├── device.py # Device schemas
│ │ ├── sensor.py # Sensor schemas
│ │ └── user.py # User schemas
│ ├── services/ # Business logic layer
│ │ ├── farm_service.py # Farm business logic
│ │ ├── device_service.py # Device integration
│ │ ├── automation_service.py # Automation engine
│ │ ├── notification_service.py # Notifications
│ │ └── analytics_service.py # Analytics processing
│ ├── integrations/ # External integrations
│ │ ├── home_assistant.py # Home Assistant integration
│ │ ├── weather_api.py # Weather service
│ │ └── payment.py # Payment processing
│ ├── background/ # Background tasks
│ │ ├── tasks.py # Task definitions
│ │ ├── scheduler.py # Task scheduling
│ │ └── workers.py # Worker processes
│ ├── middleware/ # Custom middleware
│ │ ├── auth.py # Authentication middleware
│ │ ├── logging.py # Request logging
│ │ ├── cors.py # CORS configuration
│ │ └── rate_limit.py # Rate limiting
│ ├── utils/ # Utility functions
│ │ ├── validators.py # Custom validators
│ │ ├── formatters.py # Data formatters
│ │ └── helpers.py # Helper functions
│ └── main.py # Application entry point
├── tests/ # Test suite
│ ├── unit/ # Unit tests
│ ├── integration/ # Integration tests
│ ├── api/ # API tests
│ └── conftest.py # Test configuration
├── migrations/ # Database migrations
├── scripts/ # Utility scripts
├── requirements.txt # Python dependencies
├── Dockerfile # Container configuration
└── .env.example # Environment variables example
Core Architecture Patterns
Layered Architecture
# Request Flow
API Endpoint → Validation → Service Layer → CRUD/Database → Response
# Layer Responsibilities
- API Layer: HTTP handling, request/response formatting
- Service Layer: Business logic, orchestration
- CRUD Layer: Database operations
- Model Layer: Data representation
Dependency Injection
# core/dependencies.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.core.security import verify_jwt_token
from app.services.farm_service import FarmService
security = HTTPBearer()
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
) -> dict:
"""Dependency to get current authenticated user."""
try:
payload = verify_jwt_token(credentials.credentials)
return payload
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def get_farm_service() -> FarmService:
"""Dependency to get farm service instance."""
return FarmService()
# Usage in endpoint
@router.get("/farms")
async def get_user_farms(
current_user: Annotated[dict, Depends(get_current_user)],
farm_service: Annotated[FarmService, Depends(get_farm_service)]
):
return await farm_service.get_user_farms(current_user["sub"])
Service Layer Pattern
# services/farm_service.py
from typing import List, Optional
from app.crud.farm import farm_crud
from app.schemas.farm import FarmCreate, FarmUpdate, Farm
from app.core.exceptions import BusinessRuleViolation
from app.integrations.home_assistant import HomeAssistantClient
class FarmService:
"""Farm business logic service."""
def __init__(self):
self.farm_crud = farm_crud
self.ha_client = HomeAssistantClient()
async def create_farm(
self,
farm_data: FarmCreate,
user_id: str
) -> Farm:
"""Create a new farm with validation and setup."""
# Business rule validation
existing_farms = await self.farm_crud.get_user_farms(user_id)
if len(existing_farms) >= 5:
raise BusinessRuleViolation("User farm limit exceeded")
# Create farm
farm = await self.farm_crud.create(
obj_in=farm_data,
user_id=user_id
)
# Initialize farm infrastructure
await self._initialize_farm_infrastructure(farm)
# Set up default automation rules
await self._create_default_automation_rules(farm.id)
# Register with Home Assistant
await self.ha_client.register_farm(farm)
return farm
async def update_farm(
self,
farm_id: str,
farm_update: FarmUpdate,
user_id: str
) -> Farm:
"""Update farm with authorization check."""
# Verify ownership
farm = await self.farm_crud.get(farm_id)
if not farm or farm.user_id != user_id:
raise PermissionError("Unauthorized farm access")
# Update farm
updated_farm = await self.farm_crud.update(
db_obj=farm,
obj_in=farm_update
)
# Sync with external systems
await self.ha_client.update_farm(updated_farm)
return updated_farm
async def _initialize_farm_infrastructure(self, farm: Farm):
"""Set up initial farm infrastructure."""
# Create default rows, racks, shelves
# Set up monitoring
# Initialize sensor baselines
pass
async def _create_default_automation_rules(self, farm_id: str):
"""Create default automation rules for new farm."""
# Temperature control rules
# Humidity management
# Light scheduling
pass
API Design
RESTful Endpoints
# api/v1/endpoints/farms.py
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.schemas.farm import Farm, FarmCreate, FarmUpdate, FarmList
from app.core.dependencies import get_current_user, get_farm_service
router = APIRouter(prefix="/farms", tags=["farms"])
@router.get("/", response_model=FarmList)
async def list_farms(
current_user: dict = Depends(get_current_user),
farm_service: FarmService = Depends(get_farm_service),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
search: Optional[str] = None
):
"""List user's farms with pagination and search."""
farms = await farm_service.get_user_farms(
user_id=current_user["sub"],
skip=skip,
limit=limit,
search=search
)
total = await farm_service.count_user_farms(current_user["sub"])
return FarmList(
items=farms,
total=total,
skip=skip,
limit=limit
)
@router.post("/", response_model=Farm, status_code=status.HTTP_201_CREATED)
async def create_farm(
farm_data: FarmCreate,
current_user: dict = Depends(get_current_user),
farm_service: FarmService = Depends(get_farm_service)
):
"""Create a new farm."""
return await farm_service.create_farm(
farm_data=farm_data,
user_id=current_user["sub"]
)
@router.get("/{farm_id}", response_model=Farm)
async def get_farm(
farm_id: str,
current_user: dict = Depends(get_current_user),
farm_service: FarmService = Depends(get_farm_service)
):
"""Get farm details."""
farm = await farm_service.get_farm(
farm_id=farm_id,
user_id=current_user["sub"]
)
if not farm:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Farm not found"
)
return farm
@router.put("/{farm_id}", response_model=Farm)
async def update_farm(
farm_id: str,
farm_update: FarmUpdate,
current_user: dict = Depends(get_current_user),
farm_service: FarmService = Depends(get_farm_service)
):
"""Update farm details."""
return await farm_service.update_farm(
farm_id=farm_id,
farm_update=farm_update,
user_id=current_user["sub"]
)
@router.delete("/{farm_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_farm(
farm_id: str,
current_user: dict = Depends(get_current_user),
farm_service: FarmService = Depends(get_farm_service)
):
"""Delete a farm."""
await farm_service.delete_farm(
farm_id=farm_id,
user_id=current_user["sub"]
)
Request/Response Schemas
# schemas/farm.py
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, Field, validator
class FarmBase(BaseModel):
"""Base farm schema."""
name: str = Field(..., min_length=1, max_length=100)
location: Optional[str] = Field(None, max_length=200)
width: Optional[float] = Field(None, gt=0)
depth: Optional[float] = Field(None, gt=0)
@validator('name')
def validate_name(cls, v):
if not v.strip():
raise ValueError('Farm name cannot be empty')
return v.strip()
class FarmCreate(FarmBase):
"""Schema for creating a farm."""
pass
class FarmUpdate(BaseModel):
"""Schema for updating a farm."""
name: Optional[str] = Field(None, min_length=1, max_length=100)
location: Optional[str] = Field(None, max_length=200)
width: Optional[float] = Field(None, gt=0)
depth: Optional[float] = Field(None, gt=0)
class Farm(FarmBase):
"""Complete farm schema."""
id: str
user_id: str
created_at: datetime
updated_at: datetime
device_count: Optional[int] = 0
active_schedules: Optional[int] = 0
health_score: Optional[float] = None
class Config:
from_attributes = True
class FarmList(BaseModel):
"""Paginated farm list response."""
items: List[Farm]
total: int
skip: int
limit: int
Authentication & Security
JWT Authentication
# core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: str,
expires_delta: Optional[timedelta] = None
) -> str:
"""Create JWT access token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {
"exp": expire,
"sub": str(subject),
"type": "access"
}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_jwt_token(token: str) -> dict:
"""Verify and decode JWT token."""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except JWTError as e:
raise ValueError(f"Invalid token: {e}")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
Authorization Middleware
# middleware/auth.py
from fastapi import Request, HTTPException
from fastapi.security.utils import get_authorization_scheme_param
from app.core.security import verify_jwt_token
async def auth_middleware(request: Request, call_next):
"""Authentication middleware for protected routes."""
# Skip auth for public endpoints
public_paths = ["/docs", "/openapi.json", "/health", "/api/v1/auth/login"]
if request.url.path in public_paths:
return await call_next(request)
# Extract token
authorization = request.headers.get("Authorization")
scheme, token = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
raise HTTPException(
status_code=401,
detail="Not authenticated"
)
try:
# Verify token
payload = verify_jwt_token(token)
request.state.user = payload
except Exception:
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials"
)
response = await call_next(request)
return response
Database Integration
CRUD Operations
# crud/base.py
from typing import Generic, TypeVar, Type, Optional, List, Any
from pydantic import BaseModel
from supabase import Client
ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""Base CRUD operations."""
def __init__(self, table: str, client: Client):
self.table = table
self.client = client
async def get(self, id: str) -> Optional[ModelType]:
"""Get single record by ID."""
response = self.client.table(self.table).select("*").eq("id", id).single().execute()
return response.data if response.data else None
async def get_multi(
self,
skip: int = 0,
limit: int = 100,
**filters
) -> List[ModelType]:
"""Get multiple records with pagination."""
query = self.client.table(self.table).select("*")
for key, value in filters.items():
query = query.eq(key, value)
response = query.range(skip, skip + limit - 1).execute()
return response.data
async def create(self, obj_in: CreateSchemaType) -> ModelType:
"""Create new record."""
obj_in_data = obj_in.dict()
response = self.client.table(self.table).insert(obj_in_data).execute()
return response.data[0]
async def update(
self,
id: str,
obj_in: UpdateSchemaType
) -> ModelType:
"""Update existing record."""
obj_data = obj_in.dict(exclude_unset=True)
response = self.client.table(self.table).update(obj_data).eq("id", id).execute()
return response.data[0]
async def delete(self, id: str) -> bool:
"""Delete record."""
response = self.client.table(self.table).delete().eq("id", id).execute()
return True
Database Connection Management
# core/database.py
from supabase import create_client, Client
from app.core.config import settings
class DatabaseManager:
"""Database connection manager."""
_client: Optional[Client] = None
@classmethod
def get_client(cls) -> Client:
"""Get or create Supabase client."""
if cls._client is None:
cls._client = create_client(
supabase_url=settings.SUPABASE_URL,
supabase_key=settings.SUPABASE_SERVICE_KEY
)
return cls._client
@classmethod
def close(cls):
"""Close database connection."""
if cls._client:
# Supabase client doesn't need explicit closing
cls._client = None
# Dependency for database session
def get_db() -> Client:
"""Get database client dependency."""
return DatabaseManager.get_client()
Background Tasks
Task Queue Implementation
# background/tasks.py
from typing import Any
from celery import Celery
from app.core.config import settings
celery_app = Celery(
"vertical_farm",
broker=settings.REDIS_URL,
backend=settings.REDIS_URL
)
@celery_app.task(name="process_sensor_data")
async def process_sensor_data(sensor_data: dict) -> dict:
"""Process incoming sensor data."""
# Validate data
# Store in database
# Check thresholds
# Trigger alerts if needed
return {"status": "processed", "data": sensor_data}
@celery_app.task(name="execute_automation_rule")
async def execute_automation_rule(rule_id: str) -> dict:
"""Execute an automation rule."""
# Fetch rule details
# Check conditions
# Execute actions
# Log execution
return {"status": "executed", "rule_id": rule_id}
@celery_app.task(name="generate_report")
async def generate_report(farm_id: str, report_type: str) -> str:
"""Generate farm report."""
# Fetch data
# Process analytics
# Generate report
# Send notification
return f"Report generated: {report_type}"
Scheduled Tasks
# background/scheduler.py
from celery.schedules import crontab
from app.background.tasks import celery_app
celery_app.conf.beat_schedule = {
'check-sensor-health': {
'task': 'check_sensor_health',
'schedule': crontab(minute='*/5'), # Every 5 minutes
},
'process-automation-rules': {
'task': 'process_automation_rules',
'schedule': crontab(minute='*/1'), # Every minute
},
'generate-daily-reports': {
'task': 'generate_daily_reports',
'schedule': crontab(hour=0, minute=0), # Daily at midnight
},
'cleanup-old-data': {
'task': 'cleanup_old_data',
'schedule': crontab(hour=2, minute=0), # Daily at 2 AM
},
}
Error Handling
Custom Exceptions
# core/exceptions.py
from typing import Optional
class AppException(Exception):
"""Base application exception."""
def __init__(
self,
message: str,
code: Optional[str] = None,
status_code: int = 500
):
self.message = message
self.code = code
self.status_code = status_code
super().__init__(message)
class ValidationError(AppException):
"""Validation error."""
def __init__(self, message: str):
super().__init__(message, "VALIDATION_ERROR", 400)
class NotFoundError(AppException):
"""Resource not found error."""
def __init__(self, message: str):
super().__init__(message, "NOT_FOUND", 404)
class AuthenticationError(AppException):
"""Authentication error."""
def __init__(self, message: str):
super().__init__(message, "AUTHENTICATION_ERROR", 401)
class AuthorizationError(AppException):
"""Authorization error."""
def __init__(self, message: str):
super().__init__(message, "AUTHORIZATION_ERROR", 403)
class BusinessRuleViolation(AppException):
"""Business rule violation."""
def __init__(self, message: str):
super().__init__(message, "BUSINESS_RULE_VIOLATION", 422)
Global Exception Handler
# main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.core.exceptions import AppException
app = FastAPI()
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
"""Handle application exceptions."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"message": exc.message,
"code": exc.code,
"status": exc.status_code
}
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle unexpected exceptions."""
# Log the error
logger.error(f"Unexpected error: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": {
"message": "An unexpected error occurred",
"code": "INTERNAL_ERROR",
"status": 500
}
}
)
Monitoring & Logging
Structured Logging
# core/logging.py
import logging
import json
from datetime import datetime
from typing import Any, Dict
class StructuredLogger:
"""Structured JSON logger."""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
self.logger.addHandler(handler)
def _log(self, level: str, message: str, **kwargs):
"""Log with structured data."""
extra = {
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"message": message,
**kwargs
}
getattr(self.logger, level.lower())(message, extra={"structured": extra})
def info(self, message: str, **kwargs):
self._log("INFO", message, **kwargs)
def error(self, message: str, **kwargs):
self._log("ERROR", message, **kwargs)
def warning(self, message: str, **kwargs):
self._log("WARNING", message, **kwargs)
class JSONFormatter(logging.Formatter):
"""JSON log formatter."""
def format(self, record):
if hasattr(record, 'structured'):
return json.dumps(record.structured)
return json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
})
Datadog Integration
# core/monitoring.py
from datadog import initialize, statsd
from app.core.config import settings
# Initialize Datadog
initialize(
api_key=settings.DATADOG_API_KEY,
app_key=settings.DATADOG_APP_KEY
)
class Metrics:
"""Application metrics collector."""
@staticmethod
def increment(metric: str, value: int = 1, tags: list = None):
"""Increment a counter metric."""
statsd.increment(f"verticalfarm.{metric}", value, tags=tags)
@staticmethod
def gauge(metric: str, value: float, tags: list = None):
"""Set a gauge metric."""
statsd.gauge(f"verticalfarm.{metric}", value, tags=tags)
@staticmethod
def timing(metric: str, value: float, tags: list = None):
"""Record a timing metric."""
statsd.timing(f"verticalfarm.{metric}", value, tags=tags)
@staticmethod
def histogram(metric: str, value: float, tags: list = None):
"""Record a histogram metric."""
statsd.histogram(f"verticalfarm.{metric}", value, tags=tags)
# Usage in endpoints
from app.core.monitoring import Metrics
@router.get("/farms")
async def get_farms():
with statsd.timed('api.farms.get.duration'):
farms = await farm_service.get_farms()
Metrics.increment('api.farms.get.count')
Metrics.gauge('api.farms.count', len(farms))
return farms
Testing
Unit Testing
# tests/unit/test_farm_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from app.services.farm_service import FarmService
from app.schemas.farm import FarmCreate
@pytest.fixture
def farm_service():
service = FarmService()
service.farm_crud = Mock()
service.ha_client = Mock()
return service
@pytest.mark.asyncio
async def test_create_farm_success(farm_service):
# Arrange
farm_data = FarmCreate(name="Test Farm", location="Test Location")
user_id = "test-user-id"
farm_service.farm_crud.get_user_farms = AsyncMock(return_value=[])
farm_service.farm_crud.create = AsyncMock(return_value={"id": "farm-id"})
farm_service.ha_client.register_farm = AsyncMock()
# Act
result = await farm_service.create_farm(farm_data, user_id)
# Assert
assert result["id"] == "farm-id"
farm_service.farm_crud.create.assert_called_once()
farm_service.ha_client.register_farm.assert_called_once()
@pytest.mark.asyncio
async def test_create_farm_limit_exceeded(farm_service):
# Arrange
farm_data = FarmCreate(name="Test Farm", location="Test Location")
user_id = "test-user-id"
# User already has 5 farms
existing_farms = [{"id": f"farm-{i}"} for i in range(5)]
farm_service.farm_crud.get_user_farms = AsyncMock(return_value=existing_farms)
# Act & Assert
with pytest.raises(BusinessRuleViolation) as exc:
await farm_service.create_farm(farm_data, user_id)
assert "farm limit exceeded" in str(exc.value).lower()
Integration Testing
# tests/integration/test_api_farms.py
import pytest
from httpx import AsyncClient
from app.main import app
@pytest.mark.asyncio
async def test_farm_crud_flow():
async with AsyncClient(app=app, base_url="http://test") as client:
# Login
login_response = await client.post(
"/api/v1/auth/login",
json={"email": "test@example.com", "password": "testpass"}
)
token = login_response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Create farm
create_response = await client.post(
"/api/v1/farms",
json={"name": "Test Farm", "location": "Test Location"},
headers=headers
)
assert create_response.status_code == 201
farm = create_response.json()
# Get farm
get_response = await client.get(
f"/api/v1/farms/{farm['id']}",
headers=headers
)
assert get_response.status_code == 200
assert get_response.json()["name"] == "Test Farm"
# Update farm
update_response = await client.put(
f"/api/v1/farms/{farm['id']}",
json={"name": "Updated Farm"},
headers=headers
)
assert update_response.status_code == 200
assert update_response.json()["name"] == "Updated Farm"
# Delete farm
delete_response = await client.delete(
f"/api/v1/farms/{farm['id']}",
headers=headers
)
assert delete_response.status_code == 204
Performance Optimization
Async Operations
# Parallel data fetching
async def get_farm_dashboard(farm_id: str):
"""Get comprehensive farm dashboard data."""
# Fetch all data in parallel
(
farm,
devices,
sensor_data,
active_schedules,
recent_alerts
) = await asyncio.gather(
farm_service.get_farm(farm_id),
device_service.get_farm_devices(farm_id),
sensor_service.get_latest_readings(farm_id),
schedule_service.get_active_schedules(farm_id),
alert_service.get_recent_alerts(farm_id)
)
return {
"farm": farm,
"devices": devices,
"sensor_data": sensor_data,
"schedules": active_schedules,
"alerts": recent_alerts
}
Caching Strategy
# core/cache.py
import json
from typing import Optional, Any
from redis import Redis
from app.core.config import settings
class CacheManager:
"""Redis cache manager."""
def __init__(self):
self.redis = Redis.from_url(settings.REDIS_URL)
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache."""
value = self.redis.get(key)
if value:
return json.loads(value)
return None
async def set(
self,
key: str,
value: Any,
expire: int = 300
):
"""Set value in cache with expiration."""
self.redis.set(
key,
json.dumps(value),
ex=expire
)
async def delete(self, key: str):
"""Delete value from cache."""
self.redis.delete(key)
def cache_key(self, prefix: str, *args) -> str:
"""Generate cache key."""
return f"{prefix}:" + ":".join(str(arg) for arg in args)
# Usage in service
cache = CacheManager()
async def get_farm_with_cache(farm_id: str):
# Try cache first
cache_key = cache.cache_key("farm", farm_id)
cached = await cache.get(cache_key)
if cached:
return cached
# Fetch from database
farm = await farm_crud.get(farm_id)
# Cache for 5 minutes
await cache.set(cache_key, farm, expire=300)
return farm
Deployment
Docker Configuration
# Dockerfile
FROM python:3.13-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY ./app ./app
# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Environment Configuration
# core/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Application
APP_NAME: str = "VerticalFarm API"
VERSION: str = "1.0.0"
DEBUG: bool = False
# Database
SUPABASE_URL: str
SUPABASE_SERVICE_KEY: str
# Security
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Redis
REDIS_URL: str = "redis://localhost:6379"
# Monitoring
DATADOG_API_KEY: str
DATADOG_APP_KEY: str
# External Services
HOME_ASSISTANT_URL: str
HOME_ASSISTANT_TOKEN: str
WEATHER_API_KEY: str
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
Summary
The backend architecture provides: - Scalability through async operations and microservice-ready design - Maintainability with clear separation of concerns - Security with comprehensive authentication and authorization - Performance through caching and optimized queries - Reliability with error handling and monitoring - Testability with dependency injection and clear interfaces
Follow these patterns to maintain consistency and quality in the backend codebase.
For backend-specific questions or architectural decisions, consult the backend team lead or submit proposals for review.