Security hardening guide for Chainlit applications.
Never hardcode API keys in your code:
# ✅ Correct
import os
from openai import OpenAI
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ❌ Wrong - Never do this!
client = OpenAI(api_key="sk-1234567890") # Never hardcode!
Create .env file (add to .gitignore):
OPENAI_API_KEY=sk-your-key
ANTHROPIC_API_KEY=sk-ant-key
CHAINLIT_AUTH_SECRET=your-secret-key
Load in Python:
from dotenv import load_dotenv
load_dotenv()
Generate a secure auth secret:
# Generate secure random key
python -c "import secrets; print(secrets.token_hex(32))"
# Set in environment
export CHAINLIT_AUTH_SECRET="generated-secret-key"
import chainlit as cl
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Store hashed passwords
hashed_password = pwd_context.hash("secure-password")
@cl.password_auth_callback
async def password_auth(username: str, password: str):
"""Authenticate user with password"""
# Fetch user from database
user = get_user_from_db(username)
if user and pwd_context.verify(password, user.hashed_password):
return cl.User(
identifier=username,
metadata={"role": user.role}
)
return None
import chainlit as cl
@cl.oauth_callback
async def oauth_callback(provider_id: str, token: str, raw_user_data: dict):
"""Handle OAuth callback"""
if provider_id == "google":
# Process Google OAuth data
email = raw_user_data.get("email")
if email:
return cl.User(
identifier=email,
metadata={"provider": "google"}
)
return None
@cl.on_chat_start
async def check_access():
user = cl.user_session.get("user")
if not user:
await cl.Message(
content="Please log in to access this feature"
).send()
raise cl.StopException()
# Check role
if user.metadata.get("role") not in ["admin", "user"]:
await cl.Message(
content="Access denied"
).send()
raise cl.StopException()
import re
def sanitize_input(user_input: str) -> str:
"""Remove potentially dangerous patterns"""
# Remove script tags
sanitized = re.sub(r'<script.*?>.*?</script>', '', user_input, flags=re.IGNORECASE | re.DOTALL)
# Remove SQL injection attempts
sanitized = re.sub(r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP)\b)', '', sanitized, flags=re.IGNORECASE)
return sanitized.strip()
@cl.on_message
async def on_message(message: cl.Message):
safe_input = sanitize_input(message.content)
# Process safe_input
MAX_MESSAGE_LENGTH = 4000
@cl.on_message
async def on_message(message: cl.Message):
if len(message.content) > MAX_MESSAGE_LENGTH:
await cl.Message(
content=f"Message too long. Maximum length is {MAX_MESSAGE_LENGTH} characters."
).send()
return
# Process message
def detect_prompt_injection(text: str) -> bool:
"""Detect potential prompt injection attempts"""
dangerous_patterns = [
r'ignore.*instructions',
r'system:.*',
r'you are now.*',
r'forget.*previous'
]
for pattern in dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
@cl.on_message
async def on_message(message: cl.Message):
if detect_prompt_injection(message.content):
await cl.Message(
content="I cannot comply with that request."
).send()
return
# Process message
import chainlit as cl
from pathlib import Path
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.md', '.docx'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
@cl.on_message
async def on_message(message: cl.Message):
for file in message.elements:
if isinstance(file, cl.File):
# Check file extension
ext = Path(file.name).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
await cl.Message(
content=f"File type {ext} not allowed"
).send()
continue
# Check file size
if len(file.content) > MAX_FILE_SIZE:
await cl.Message(
content="File too large"
).send()
continue
# Process file
import re
from pathlib import Path
def sanitize_filename(filename: str) -> str:
"""Sanitize filename to prevent path traversal"""
# Remove path components
filename = Path(filename).name
# Remove dangerous characters
filename = re.sub(r'[^\w.\-]', '_', filename)
# Limit length
return filename[:255]
# chainlit_config.toml
[features]
# Enable persistent sessions
persistent_sessions = true
# Session timeout (in seconds)
session_timeout = 3600
# Require authentication
require_auth = true
@cl.on_chat_end
async def on_chat_end():
"""Clean up session data"""
# Clear sensitive data
cl.user_session.set("llm", None)
cl.user_session.set("history", None)
# Log session end
print(f"Session ended: {cl.user_session.get('id')}")
from collections import defaultdict
from datetime import datetime, timedelta
user_requests = defaultdict(list)
RATE_LIMIT = 10 # requests per minute
TIME_WINDOW = 60 # seconds
def check_rate_limit(user_id: str) -> bool:
"""Check if user has exceeded rate limit"""
now = datetime.now()
window_start = now - timedelta(seconds=TIME_WINDOW)
# Clean old requests
user_requests[user_id] = [
t for t in user_requests[user_id] if t > window_start
]
# Check limit
if len(user_requests[user_id]) >= RATE_LIMIT:
return False
user_requests[user_id].append(now)
return True
@cl.on_message
async def on_message(message: cl.Message):
user = cl.user_session.get("user")
user_id = user.identifier if user else "anonymous"
if not check_rate_limit(user_id):
await cl.Message(
content="Rate limit exceeded. Please try again later."
).send()
return
# Process message
import logging
import re
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SecurityFilter(logging.Filter):
"""Filter sensitive data from logs"""
def filter(self, record):
# Redact API keys
record.msg = re.sub(r'sk-[a-zA-Z0-9]+', '[REDACTED]', str(record.msg))
# Redact passwords
record.msg = re.sub(r'password[=:]\s*\S+', 'password=[REDACTED]', record.msg)
return True
logger.addFilter(SecurityFilter())
@cl.on_message
async def audit_message(message: cl.Message):
"""Log message for audit"""
user = cl.user_session.get("user")
user_id = user.identifier if user else "anonymous"
logger.info(f"User {user_id} sent message: {message.content[:100]}...")
@cl.on_chat_end
async def audit_session_end():
"""Log session end for audit"""
user = cl.user_session.get("user")
user_id = user.identifier if user else "anonymous"
session_id = cl.user_session.get("id")
logger.info(f"Session {session_id} ended for user {user_id}")
server {
listen 443 ssl;
server_name your-domain.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
location / {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
server {
listen 80;
server_name your-domain.com;
return 301 https://$server_name$request_uri;
}