共 45 道题目
Why is it not recommended to use mutable types as default arguments in Python?
考察点:可变类型默认参数陷阱。
答案:
Python函数的默认参数在函数定义时只会被创建一次,而不是每次调用时都创建。如果默认参数是可变类型(如列表、字典),所有函数调用将共享同一个对象,导致意外的状态累积。
核心问题:
def add_item(item, items=[]):
    items.append(item)
    return items

print(add_item(1))      # [1]
print(add_item(2))      # [1, 2] ← 不是期望的 [2]
print(add_item(3, []))  # [3]
print(add_item(4))      # [1, 2, 4] ← 继续累积
正确做法:
def add_item(item, items=None):
    if items is None:
        items = []
    items.append(item)
    return items
实际应用:
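一个常见的实际踩坑场景是"带缓存参数的函数"。下面用纯标准库示意(fetch_user 等函数名为举例假设,`f"user-{user_id}"` 仅模拟一次昂贵查询):

```python
def fetch_user_bad(user_id, cache={}):
    # 默认字典在函数定义时只创建一次,所有调用共享同一个对象
    if user_id not in cache:
        cache[user_id] = f"user-{user_id}"  # 模拟昂贵查询
    return cache[user_id]

def fetch_user(user_id, cache=None):
    # None 哨兵:调用方未传 cache 时,每次新建字典
    if cache is None:
        cache = {}
    if user_id not in cache:
        cache[user_id] = f"user-{user_id}"
    return cache[user_id]

fetch_user_bad(1)
fetch_user_bad(2)
# 共享的默认字典已经累积了历史调用的数据
assert fetch_user_bad.__defaults__[0] == {1: "user-1", 2: "user-2"}

fetch_user(1)
fetch_user(2)
# None 哨兵版本不会在调用之间累积状态
assert fetch_user.__defaults__ == (None,)
```

通过 `函数.__defaults__` 可以直接观察到默认参数对象被复用这一事实。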
What's the difference between shallow copy and deep copy?
考察点:引用传递、浅拷贝、深拷贝。
答案:
浅拷贝创建一个新对象,但其内部的元素仍然是原对象元素的引用。深拷贝则递归地复制所有层级的对象,创建完全独立的副本。
三种赋值方式对比:
import copy
a = [[1, 2], [3, 4]]
# 引用赋值(不是拷贝)
b = a
# 浅拷贝
c = a.copy()
# 或 c = copy.copy(a)
# 深拷贝
d = copy.deepcopy(a)
a[0][0] = 999
结果分析:
b = [[999, 2], [3, 4]]  - 引用同一对象
c = [[999, 2], [3, 4]]  - 浅拷贝,内层列表仍是引用
d = [[1, 2], [3, 4]]    - 深拷贝,完全独立
适用场景:
浅拷贝适合只含不可变元素的容器;嵌套的可变结构需要完全独立副本时用深拷贝,但开销更大。
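上面的结果可以直接用断言验证(变量与前文示例一致,可独立运行):

```python
import copy

a = [[1, 2], [3, 4]]
b = a                  # 引用赋值
c = a.copy()           # 浅拷贝
d = copy.deepcopy(a)   # 深拷贝

a[0][0] = 999

assert b == [[999, 2], [3, 4]]   # b 与 a 是同一对象
assert c == [[999, 2], [3, 4]]   # 浅拷贝共享内层列表,跟着变
assert d == [[1, 2], [3, 4]]     # 深拷贝完全独立

# 用 is 验证引用关系
assert b is a and c is not a
assert c[0] is a[0] and d[0] is not a[0]
```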
What's the difference between list comprehension and generator expression?
考察点:内存效率、惰性求值。
答案:
列表推导式立即创建并返回完整列表,占用内存。生成器表达式返回迭代器,按需生成元素,内存效率高。
语法对比:
# 列表推导式 - 使用方括号
list_comp = [x * 2 for x in range(1000000)]
# 生成器表达式 - 使用圆括号
gen_exp = (x * 2 for x in range(1000000))
内存差异:
import sys
list_comp = [x for x in range(10000)]
gen_exp = (x for x in range(10000))
print(sys.getsizeof(list_comp)) # ~87KB
print(sys.getsizeof(gen_exp)) # ~128字节
适用场景:
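承接"适用场景":生成器适合只遍历一次的大数据流,列表适合需要随机访问、反复消费的场合。下面的小例子(纯示意)对比两者行为:

```python
import sys

# 生成器对象大小与元素个数无关,按需逐个求值
gen = (x * 2 for x in range(1_000_000))
assert sys.getsizeof(gen) < 1000
assert next(gen) == 0 and next(gen) == 2  # 惰性求值

# 列表一次性物化,支持随机访问与多次遍历
lst = [x * 2 for x in range(10)]
assert lst[5] == 10
assert sum(lst) == 90 and sum(lst) == 90  # 可重复消费

# 生成器是一次性的:耗尽后再迭代得到空序列
small = (x for x in range(3))
assert list(small) == [0, 1, 2]
assert list(small) == []
```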
What is the late binding issue in closures?
考察点:闭包、作用域、late binding。
答案:
Late binding(延迟绑定)是指闭包中的变量在函数调用时才会查找值,而不是定义时。这导致所有闭包共享同一个变量的最终值。
问题示例:
def create_multipliers():
    return [lambda x: x * i for i in range(5)]
funcs = create_multipliers()
print(funcs[0](2)) # 期望 0,实际 8
print(funcs[3](2)) # 期望 6,实际 8
# 所有函数都使用 i 的最终值 4
解决方案:
# 方案1:使用默认参数立即绑定
def create_multipliers():
    return [lambda x, i=i: x * i for i in range(5)]

# 方案2:使用 functools.partial
from functools import partial

def create_multipliers():
    def multiplier(i, x):
        return x * i
    return [partial(multiplier, i) for i in range(5)]
实际应用:
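实际应用中最常见的是在循环里注册回调/事件处理器。下面用纯 Python 示意这一陷阱及修复(handlers、事件名均为举例假设):

```python
# 循环中注册回调时的延迟绑定陷阱
handlers = {}
for event in ("start", "stop", "pause"):
    # 错误:三个 lambda 共享同一个 event 变量
    handlers[event] = lambda: f"handling {event}"

# 调用时 event 已是循环结束后的最终值 "pause"
assert handlers["start"]() == "handling pause"

# 修复:默认参数在 lambda 定义时立即求值
fixed = {}
for event in ("start", "stop", "pause"):
    fixed[event] = lambda event=event: f"handling {event}"

assert fixed["start"]() == "handling start"
assert fixed["stop"]() == "handling stop"
```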
How to implement a decorator with arguments that records function execution time?
考察点:装饰器原理、高阶函数。
答案:
带参数的装饰器需要三层函数嵌套:最外层接收装饰器参数,中间层接收被装饰函数,最内层是实际执行的包装函数。
实现代码:
import time
from functools import wraps
def timer(unit='s'):
    """装饰器工厂函数,接收参数"""
    def decorator(func):
        """实际的装饰器函数"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            """包装函数"""
            start = time.time()
            result = func(*args, **kwargs)
            duration = time.time() - start
            # 根据参数转换时间单位
            if unit == 'ms':
                duration *= 1000
            elif unit == 'μs':
                duration *= 1000000
            print(f"{func.__name__} took {duration:.2f}{unit}")
            return result
        return wrapper
    return decorator
使用示例:
@timer(unit='ms')
def slow_function():
    time.sleep(1)
    return "Done"

result = slow_function()
# 输出类似: slow_function took 1000.00ms(具体数值略有浮动)
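关于 @wraps 保留元数据这一点,可以用一个独立的小例子直接验证(与上面的 timer 装饰器思路相同,此处为示意):

```python
import functools

def plain_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def wraps_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain_decorator
def f():
    """doc of f"""

@wraps_decorator
def g():
    """doc of g"""

assert f.__name__ == "wrapper"  # 不用 @wraps:元数据被包装函数覆盖
assert g.__name__ == "g"        # 用 @wraps:原函数名得以保留
assert g.__doc__ == "doc of g"
```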
关键要点:
@wraps(func) 保留原函数元数据
*args, **kwargs 处理任意参数
What is Django's MTV architecture? How is it different from traditional MVC?
考察点:Django架构理解、框架设计模式。
答案:
Django采用MTV(Model-Template-View)架构模式,是对传统MVC模式的变体。Model负责数据层,Template负责展示层,View负责业务逻辑层。
MTV vs MVC 对应关系:
| Django MTV | 传统 MVC | 职责 |
|---|---|---|
| Model | Model | 数据模型和业务逻辑 |
| Template | View | 展示层(HTML模板) |
| View | Controller | 业务逻辑控制 |
| URL Dispatcher | - | 路由分发(Django特有) |
工作流程:
请求 → URL配置 → View函数 → Model数据 → Template渲染 → 响应
核心组件:
# Model - 数据层
class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()

# View - 逻辑层
def article_list(request):
    articles = Article.objects.all()
    return render(request, 'list.html', {'articles': articles})
# Template - 展示层
# templates/list.html
# {% for article in articles %}
# <h2>{{ article.title }}</h2>
# {% endfor %}
# URL配置 - 路由
# path('articles/', article_list, name='article-list')
设计优势:
各层关注点分离,模型、模板、视图可以独立开发与测试;URL分发集中配置,路由清晰。
How does Django match URL patterns?
考察点:URL路由匹配规则。
答案:
Django按照 urlpatterns 列表中定义的顺序从上到下依次匹配URL,找到第一个匹配的模式就停止搜索。因此,更具体的模式应该放在更通用的模式之前。
匹配规则:
urlpatterns = [
    path('articles/special/', views.special),                  # 具体路径,优先级高
    path('articles/<int:year>/', views.year_archive),
    path('articles/<str:category>/', views.category_archive),  # 通用路径
]

# /articles/special/ → 匹配第一条
# /articles/2024/    → 匹配第二条(int类型)
# /articles/tech/    → 匹配第三条(str类型)
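这种"从上到下、命中即停"的语义可以用一个极简的纯 Python 路由表示意(与 Django 的实现无关,正则模式与处理器名均为举例假设):

```python
import re

# 按定义顺序匹配,返回第一个命中的处理器名
routes = [
    (re.compile(r"^articles/special/$"), "special"),
    (re.compile(r"^articles/(?P<year>\d+)/$"), "year_archive"),
    (re.compile(r"^articles/(?P<category>[^/]+)/$"), "category_archive"),
]

def resolve(path):
    for pattern, handler in routes:
        if pattern.match(path):
            return handler  # 命中即停,不再尝试后面的模式
    return "404"

assert resolve("articles/special/") == "special"
assert resolve("articles/2024/") == "year_archive"
assert resolve("articles/tech/") == "category_archive"
assert resolve("unknown/") == "404"
```

若把最宽泛的 category 模式放到最前面,special 与 year_archive 就永远匹配不到,这正是下文"常见陷阱"的成因。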
常见陷阱:
# ❌ 错误顺序
urlpatterns = [
    path('articles/<str:category>/', views.category),  # 会拦截所有
    path('articles/special/', views.special),          # 永远不会被匹配
]

# ✅ 正确顺序
urlpatterns = [
    path('articles/special/', views.special),          # 先匹配特殊路径
    path('articles/<str:category>/', views.category),
]
最佳实践:
使用类型化的路径转换器(如 <int:id>)避免冲突
优先使用 path() 而非 re_path() 提高可读性
What is the N+1 query problem in Django ORM?
考察点:N+1查询问题。
答案:
N+1查询问题是指在查询主对象列表时执行1次查询,然后为每个对象的关联数据又执行N次查询,导致数据库查询次数激增。
问题示例:
# 模型定义
class Author(models.Model):
    name = models.CharField(max_length=100)

class Book(models.Model):
    title = models.CharField(max_length=100)
    author = models.ForeignKey(Author, on_delete=models.CASCADE)

# 问题代码 - 产生 N+1 查询
books = Book.objects.all()       # 1次查询
for book in books:
    print(book.author.name)      # 每本书1次查询,共N次
# 总计:1 + N 次查询
解决方案:
# 使用 select_related() - 一次JOIN查询
books = Book.objects.select_related('author').all()
for book in books:
    print(book.author.name)
# 总计:1次查询(使用JOIN)
# SQL 类似于:
# SELECT * FROM book INNER JOIN author ON book.author_id = author.id
优化工具:
select_related() - 用于ForeignKey和OneToOne
prefetch_related() - 用于ManyToMany和反向ForeignKey
性能影响:
查询次数从 1+N 次降到常数次,列表页等批量场景下差异尤为明显。
How does Django prevent XSS attacks in templates?
考察点:XSS防护、模板自动转义。
答案:
Django模板系统默认对所有变量进行HTML转义,将 <、>、&、"、' 等特殊字符转换为HTML实体,防止恶意脚本执行。
自动转义机制:
# View
def user_profile(request):
    user_input = "<script>alert('XSS')</script>"
    return render(request, 'profile.html', {'bio': user_input})
# Template
{{ bio }}
# 输出: &lt;script&gt;alert(&#x27;XSS&#x27;)&lt;/script&gt;
# 浏览器显示为文本,不执行脚本
转义控制:
<!-- 自动转义(默认) -->
{{ user_input }}
<!-- 标记为安全(慎用) -->
{{ user_input|safe }}
<!-- 关闭自动转义 -->
{% autoescape off %}
{{ user_input }}
{% endautoescape %}
安全最佳实践:
# ✅ 安全 - 使用模板过滤器
{{ user_bio|linebreaks }}
# ❌ 危险 - 直接标记为安全
{{ user_bio|safe }}
# ✅ 安全 - 使用专门的标签
{% csrf_token %}
# ✅ 安全 - JavaScript中的变量
<script>
const userName = "{{ user_name|escapejs }}";
</script>
特殊过滤器:
escape - 强制转义
safe - 标记为安全(不转义)
escapejs - JavaScript字符串转义
linebreaks - 安全地转换换行符
What's the difference between static files and media files in Django?
考察点:静态资源管理、部署配置。
答案:
静态文件是开发者创建的固定资源(CSS、JS、图片),媒体文件是用户上传的动态内容。两者有不同的存储路径、URL配置和处理方式。
配置对比:
# settings.py
# 静态文件配置
STATIC_URL = '/static/' # URL前缀
STATIC_ROOT = BASE_DIR / 'staticfiles' # 收集后的存储路径
STATICFILES_DIRS = [
    BASE_DIR / 'static',  # 开发时的查找路径
]
# 媒体文件配置
MEDIA_URL = '/media/' # URL前缀
MEDIA_ROOT = BASE_DIR / 'media' # 用户上传的存储路径
开发环境使用:
# urls.py - 开发环境配置
from django.conf import settings
from django.conf.urls.static import static
urlpatterns = [
    # ... 你的URL模式
]

# 开发环境提供媒体文件服务
if settings.DEBUG:
    urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
生产环境部署:
# 收集所有静态文件到STATIC_ROOT
python manage.py collectstatic
# Nginx配置
# location /static/ {
# alias /path/to/staticfiles/;
# }
# location /media/ {
# alias /path/to/media/;
# }
主要区别:
| 特性 | 静态文件 | 媒体文件 |
|---|---|---|
| 来源 | 开发者创建 | 用户上传 |
| 内容 | CSS/JS/图片 | 头像/文档/图片 |
| 变化 | 部署时改变 | 运行时改变 |
| 收集 | collectstatic | 直接存储 |
| 权限 | 公开访问 | 可能需要权限控制 |
How does Django's CSRF protection work?
考察点:CSRF攻击防护。
答案:
Django的CSRF保护通过在表单中嵌入一个随机令牌,服务器验证提交的令牌是否与会话中存储的令牌匹配,防止跨站请求伪造攻击。
工作原理:
1. 用户访问页面 → Django生成CSRF token存入cookie
2. 渲染表单 → {% csrf_token %}生成隐藏字段
3. 用户提交 → Django验证token是否匹配
4. 验证通过 → 处理请求
5. 验证失败 → 返回403错误
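第3步的"验证token是否匹配",本质上是对两个随机串做常量时间比较。下面用纯 Python 示意这一思路(与 Django 内部实现无关,函数名为举例假设):

```python
import secrets

# 服务端:生成随机CSRF token(示意)
def issue_token():
    return secrets.token_urlsafe(32)

# 服务端:校验提交的token与会话中存储的是否一致
# 用 compare_digest 做常量时间比较,避免时序侧信道泄露
def verify_token(session_token, submitted_token):
    return secrets.compare_digest(session_token, submitted_token)

token = issue_token()
assert verify_token(token, token)         # 匹配 → 处理请求
assert not verify_token(token, "forged")  # 不匹配 → 403
```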
表单使用:
<form method="post">
{% csrf_token %}
<input type="text" name="username">
<button type="submit">提交</button>
</form>
<!-- 渲染后 -->
<form method="post">
<input type="hidden" name="csrfmiddlewaretoken" value="随机token值">
<input type="text" name="username">
<button type="submit">提交</button>
</form>
AJAX请求处理:
// 方式1:从cookie获取token
function getCookie(name) {
    const value = `; ${document.cookie}`;
    const parts = value.split(`; ${name}=`);
    if (parts.length === 2) return parts.pop().split(';').shift();
}

fetch('/api/endpoint/', {
    method: 'POST',
    headers: {
        'X-CSRFToken': getCookie('csrftoken'),
        'Content-Type': 'application/json',
    },
    body: JSON.stringify(data)
});
// 方式2:从DOM获取
const csrftoken = document.querySelector('[name=csrfmiddlewaretoken]').value;
豁免CSRF(谨慎使用):
from django.views.decorators.csrf import csrf_exempt
@csrf_exempt
def api_endpoint(request):
    # API端点可能使用其他认证方式
    pass
安全要点:
为会话等敏感Cookie设置 HttpOnly 和 Secure 标志
What's the difference between Form and ModelForm?
考察点:表单系统、代码复用。
答案:
Form是通用表单类,需要手动定义所有字段。ModelForm自动从Model生成字段,减少重复代码,并提供自动保存到数据库的功能。
Form - 手动定义:
from django import forms
class ContactForm(forms.Form):
    name = forms.CharField(max_length=100)
    email = forms.EmailField()
    message = forms.CharField(widget=forms.Textarea)

    def clean_email(self):
        email = self.cleaned_data['email']
        if not email.endswith('@company.com'):
            raise forms.ValidationError('必须使用公司邮箱')
        return email

    def send_email(self):
        # 手动处理数据
        pass
ModelForm - 自动生成:
from django import forms
from .models import User
class UserForm(forms.ModelForm):
    class Meta:
        model = User
        fields = ['username', 'email', 'bio']
        # 或排除某些字段
        # exclude = ['password']
        widgets = {
            'bio': forms.Textarea(attrs={'rows': 4}),
        }
        labels = {
            'username': '用户名',
        }

    def save(self, commit=True):
        user = super().save(commit=False)
        # 可以添加额外处理
        if commit:
            user.save()
        return user
使用对比:
# Form使用
form = ContactForm(request.POST)
if form.is_valid():
    name = form.cleaned_data['name']
    email = form.cleaned_data['email']
    # 手动处理数据

# ModelForm使用
form = UserForm(request.POST, instance=user)
if form.is_valid():
    user = form.save()  # 自动保存到数据库
选择建议:
与单个模型的增删改直接对应时用ModelForm;搜索、联系表单等不落库或跨模型的数据收集用Form。
How to customize Django Admin list display?
考察点:Admin自定义。
答案:
通过 ModelAdmin 类可以自定义Admin列表显示、过滤、搜索、排序等功能,提供强大的后台管理界面。
基础自定义:
from django.contrib import admin
from .models import Book
@admin.register(Book)
class BookAdmin(admin.ModelAdmin):
    # 列表显示字段
    list_display = ['title', 'author', 'published_date', 'is_available']
    # 列表可点击字段
    list_display_links = ['title']
    # 列表可编辑字段
    list_editable = ['is_available']
    # 右侧过滤器
    list_filter = ['published_date', 'is_available']
    # 搜索字段
    search_fields = ['title', 'author__name']
    # 默认排序
    ordering = ['-published_date']
    # 每页显示数量
    list_per_page = 20
自定义方法列:
class BookAdmin(admin.ModelAdmin):
    list_display = ['title', 'author', 'get_status', 'colored_price']

    @admin.display(description='状态', ordering='is_available')
    def get_status(self, obj):
        return '可借' if obj.is_available else '已借出'

    @admin.display(description='价格')
    def colored_price(self, obj):
        from django.utils.html import format_html
        color = 'red' if obj.price > 100 else 'green'
        return format_html(
            '<span style="color: {};">${}</span>',
            color, obj.price
        )
批量操作:
class BookAdmin(admin.ModelAdmin):
    actions = ['make_available', 'export_as_csv']

    @admin.action(description='标记为可借')
    def make_available(self, request, queryset):
        updated = queryset.update(is_available=True)
        self.message_user(request, f'{updated}本书已标记为可借')

    @admin.action(description='导出为CSV')
    def export_as_csv(self, request, queryset):
        import csv
        from django.http import HttpResponse
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="books.csv"'
        writer = csv.writer(response)
        writer.writerow(['标题', '作者', '价格'])
        for book in queryset:
            writer.writerow([book.title, book.author.name, book.price])
        return response
内联编辑:
class ChapterInline(admin.TabularInline):
    model = Chapter
    extra = 1

class BookAdmin(admin.ModelAdmin):
    inlines = [ChapterInline]
What's the difference between makemigrations and migrate?
考察点:数据库迁移管理。
答案:
makemigrations 根据模型变更创建迁移文件,migrate 将迁移文件应用到数据库。前者是规划阶段,后者是执行阶段。
工作流程:
# 1. 修改models.py
class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    published_date = models.DateTimeField(auto_now_add=True)  # 新增字段
# 2. 创建迁移文件
python manage.py makemigrations
# 输出: Migrations for 'blog':
# blog/migrations/0002_article_published_date.py
# - Add field published_date to article
# 3. 应用迁移到数据库
python manage.py migrate
# 输出: Running migrations:
# Applying blog.0002_article_published_date... OK
命令详解:
# makemigrations - 创建迁移文件
python manage.py makemigrations # 所有app
python manage.py makemigrations blog # 指定app
python manage.py makemigrations --name custom_name # 自定义名称
python manage.py makemigrations --empty blog # 空迁移文件
# migrate - 应用迁移
python manage.py migrate # 所有迁移
python manage.py migrate blog # 指定app
python manage.py migrate blog 0001 # 迁移到特定版本
python manage.py migrate blog zero # 撤销所有迁移
# 查看迁移状态
python manage.py showmigrations
python manage.py showmigrations blog
# 查看SQL
python manage.py sqlmigrate blog 0001
迁移文件示例:
# blog/migrations/0002_article_published_date.py
from django.db import migrations, models
class Migration(migrations.Migration):
    dependencies = [
        ('blog', '0001_initial'),
    ]
    operations = [
        migrations.AddField(
            model_name='article',
            name='published_date',
            field=models.DateTimeField(auto_now_add=True),
        ),
    ]
最佳实践:
可用 makemigrations --check 检查是否有未创建的迁移
What problem does Python virtual environment solve?
考察点:依赖管理、环境隔离。
答案:
虚拟环境为每个Python项目创建独立的依赖空间,避免不同项目间的包版本冲突,确保项目的可移植性和可重现性。
解决的问题:
问题场景:
项目A需要 Django 3.2
项目B需要 Django 4.2
没有虚拟环境:
- 全局只能安装一个Django版本
- 升级影响所有项目
- 团队协作时环境不一致
使用虚拟环境:
- 每个项目独立的Django版本
- 互不干扰
- 环境可复现
创建和使用:
# venv(Python 3.3+自带)
python -m venv myenv
source myenv/bin/activate # Linux/Mac
myenv\Scripts\activate # Windows
# virtualenv(功能更强大)
pip install virtualenv
virtualenv myenv
source myenv/bin/activate
# pipenv(现代化工具)
pip install pipenv
pipenv install django
pipenv shell
依赖管理:
# 导出依赖
pip freeze > requirements.txt
# requirements.txt内容示例:
# Django==4.2.0
# djangorestframework==3.14.0
# psycopg2-binary==2.9.5
# 安装依赖
pip install -r requirements.txt
# 退出虚拟环境
deactivate
项目结构示例:
myproject/
├── venv/ # 虚拟环境(不提交到git)
├── myapp/
├── manage.py
├── requirements.txt # 依赖列表(提交到git)
└── .gitignore # 包含venv/
最佳实践:
每个项目使用独立虚拟环境;requirements.txt 随代码提交;虚拟环境目录(如 venv/)加入 .gitignore,不提交。
What are generators? How are they different from regular functions?
考察点:生成器原理、惰性求值。
答案:
生成器是使用 yield 关键字的特殊函数,它返回一个迭代器对象,按需生成值而不是一次性返回所有结果。与普通函数相比,生成器具有惰性求值和内存效率高的特点。
普通函数 vs 生成器:
# 普通函数 - 立即计算所有值
def fib_list(n):
    result = []
    a, b = 0, 1
    for _ in range(n):
        result.append(a)
        a, b = b, a + b
    return result

# 调用立即生成全部结果
numbers = fib_list(1000000)  # 占用大量内存

# 生成器 - 按需生成值
def fib_generator(n):
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

# 返回生成器对象,不立即计算
numbers = fib_generator(1000000)  # 几乎不占内存

# 使用时才计算
for num in numbers:
    print(num)
核心区别:
| 特性 | 普通函数 | 生成器 |
|---|---|---|
| 关键字 | return | yield |
| 返回 | 具体值 | 迭代器对象 |
| 内存 | 存储所有结果 | 只存储当前值 |
| 执行 | 一次性执行完 | 暂停和恢复 |
| 状态 | 不保存状态 | 保存执行状态 |
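表中"保存执行状态"还有另一面:生成器是一次性的,遍历完即耗尽。下面的小例子可以直接运行验证:

```python
def countdown(n):
    # 每次 yield 处暂停,局部变量 n 的状态被保留
    while n > 0:
        yield n
        n -= 1

gen = countdown(3)
assert next(gen) == 3        # 从暂停处恢复执行
assert list(gen) == [2, 1]   # 继续消费剩余元素
assert list(gen) == []       # 已耗尽,再迭代得到空序列
```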
生成器表达式:
# 列表推导式
squares_list = [x**2 for x in range(1000000)]
# 生成器表达式(使用圆括号)
squares_gen = (x**2 for x in range(1000000))
import sys
print(sys.getsizeof(squares_list)) # ~8MB
print(sys.getsizeof(squares_gen)) # 128字节
高级用法 - send()方法:
def echo_generator():
    while True:
        received = yield
        print(f"收到: {received}")
gen = echo_generator()
next(gen) # 启动生成器
gen.send("Hello") # 发送值到生成器
gen.send("World")
适用场景:
大文件逐行处理、无限序列、数据管道等只遍历一次的场景用生成器;需要反复访问或随机索引时用列表。
How to implement a context manager?
考察点:资源管理、异常处理。
答案:
上下文管理器通过实现 __enter__ 和 __exit__ 方法,或使用 contextlib.contextmanager 装饰器,提供资源的自动获取和释放机制。
类实现方式:
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connection = None

    def __enter__(self):
        """进入with块时调用"""
        print(f"连接到 {self.host}:{self.port}")
        self.connection = self._connect()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """离开with块时调用"""
        print("关闭连接")
        if self.connection:
            self.connection.close()
        # 异常处理
        if exc_type is not None:
            print(f"发生异常: {exc_type.__name__}: {exc_val}")
        # 返回True表示异常已处理,不再向外传播
        # 返回False或None表示异常继续传播
        return False

    def _connect(self):
        # 实际连接逻辑(此处用一个带close()方法的对象模拟)
        class _FakeConn:
            def close(self):
                pass
        return _FakeConn()

# 使用
with DatabaseConnection('localhost', 5432) as conn:
    print(f"使用连接: {conn}")
    # 操作数据库
装饰器实现方式:
from contextlib import contextmanager

@contextmanager
def file_operation(filename, mode):
    """文件操作上下文管理器"""
    print(f"打开文件: {filename}")
    f = open(filename, mode)
    try:
        yield f  # yield前是__enter__,yield后是__exit__
    finally:
        print(f"关闭文件: {filename}")
        f.close()

# 使用
with file_operation('test.txt', 'w') as f:
    f.write('Hello, World!')
实际应用示例:
# 1. 数据库事务管理
@contextmanager
def transaction(connection):
    try:
        yield connection
        connection.commit()
        print("事务提交")
    except Exception:
        connection.rollback()
        print("事务回滚")
        raise

with transaction(db_conn) as conn:
    conn.execute("INSERT ...")
    conn.execute("UPDATE ...")

# 2. 临时修改配置
@contextmanager
def temporary_setting(key, value):
    old_value = settings.get(key)
    settings[key] = value
    try:
        yield
    finally:
        settings[key] = old_value

with temporary_setting('DEBUG', True):
    # DEBUG模式下的操作
    pass
# 自动恢复原值

# 3. 性能计时
@contextmanager
def timer(name):
    import time
    start = time.time()
    yield
    duration = time.time() - start
    print(f"{name} 耗时: {duration:.2f}秒")

with timer("数据处理"):
    # 耗时操作
    process_data()
异常处理详解:
def __exit__(self, exc_type, exc_val, exc_tb):
    # exc_type: 异常类型
    # exc_val: 异常实例
    # exc_tb: 异常追踪信息
    if exc_type is ValueError:
        print(f"捕获到ValueError: {exc_val}")
        return True  # 异常已处理,不再传播
    # 其他异常继续传播
    return False
常用内置上下文管理器:
open() - 文件操作
threading.Lock() - 线程锁
decimal.localcontext() - 精度控制
unittest.mock.patch() - 测试模拟
What are the unpacking rules for *args and **kwargs in Python?
考察点:参数传递、参数解包。
答案:
*args 收集位置参数为元组,**kwargs 收集关键字参数为字典。Python 3中引入了keyword-only参数,改变了参数传递规则。
参数定义规则:
def func(a, b, *args, c=10, **kwargs):
    print(f"a={a}, b={b}, c={c}")
    print(f"args={args}")
    print(f"kwargs={kwargs}")
# 参数顺序:
# 1. 位置参数 (a, b)
# 2. *args (可变位置参数)
# 3. keyword-only参数 (c)
# 4. **kwargs (可变关键字参数)
调用示例:
# 示例1
func(1, 2, 3, 4, c=5, d=6)
# 输出:
# a=1, b=2, c=5
# args=(3, 4)
# kwargs={'d': 6}
# 示例2
func(1, 2, 3, d=6)
# 输出:
# a=1, b=2, c=10 # c使用默认值
# args=(3,)
# kwargs={'d': 6}
# 示例3
func(a=1, b=2, c=3)
# 输出:
# a=1, b=2, c=3
# args=()
# kwargs={}
# 示例4
func(*[1, 2, 3], **{'c': 4, 'd': 5})
# 输出:
# a=1, b=2, c=4
# args=(3,)
# kwargs={'d': 5}
Keyword-Only参数:
def greet(name, *, greeting="Hello", punctuation="!"):
    """* 后面的参数必须通过关键字传递"""
    return f"{greeting} {name}{punctuation}"
greet("Alice") # ✅ Hello Alice!
greet("Bob", greeting="Hi") # ✅ Hi Bob!
greet("Charlie", "Hey") # ❌ TypeError
解包规则:
# 列表/元组解包
args = [1, 2, 3]
func(*args) # 等价于 func(1, 2, 3)
# 字典解包
kwargs = {'a': 1, 'b': 2, 'c': 3}
func(**kwargs) # 等价于 func(a=1, b=2, c=3)
# 混合使用
func(*[1, 2], *[3, 4], **{'c': 5})
# a=1, b=2, c=5
# args=(3, 4)
实际应用:
# 1. 装饰器传递参数
def decorator(func):
    def wrapper(*args, **kwargs):
        print("Before")
        result = func(*args, **kwargs)
        print("After")
        return result
    return wrapper

# 2. 合并字典
def merge_dicts(**dicts):
    result = {}
    for d in dicts.values():
        result.update(d)
    return result

config = merge_dicts(
    defaults={'timeout': 30},
    user={'timeout': 60, 'debug': True}
)

# 3. 函数组合
def compose(*functions):
    def inner(arg):
        for func in reversed(functions):
            arg = func(arg)
        return arg
    return inner

pipeline = compose(str.upper, str.strip, lambda x: x + "!")
result = pipeline(" hello ")  # "HELLO !"(先加!,再strip,最后upper)
Python 3.8+ 仅位置参数:
def func(a, b, /, c, d, *, e, f):
    """
    a, b: 仅位置参数
    c, d: 位置或关键字参数
    e, f: 仅关键字参数
    """
    pass

func(1, 2, 3, 4, e=5, f=6)          # ✅
func(1, 2, c=3, d=4, e=5, f=6)      # ✅
func(a=1, b=2, c=3, d=4, e=5, f=6)  # ❌
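上述几类参数的填充与解包规则可以用断言直接验证(纯标准库,可独立运行;func 签名为举例假设):

```python
def func(a, b, /, c, d=4, *args, e, **kwargs):
    # a, b 仅位置;c, d 位置或关键字;e 仅关键字
    return (a, b, c, d, args, e, kwargs)

# 位置参数依次填充,多余的进入 *args,未匹配的关键字进入 **kwargs
assert func(1, 2, 3, 4, 5, e=6, x=7) == (1, 2, 3, 4, (5,), 6, {'x': 7})

# 解包:*序列 → 位置参数,**字典 → 关键字参数
assert func(*[1, 2, 3], **{'e': 9}) == (1, 2, 3, 4, (), 9, {})

# 违反规则会抛 TypeError:仅位置参数不能用关键字传
try:
    func(a=1, b=2, c=3, e=4)
except TypeError:
    pass
else:
    raise AssertionError("expected TypeError")
```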
What is lazy evaluation in Django QuerySet?
考察点:QuerySet惰性求值。
答案:
QuerySet的惰性求值指查询不会立即执行数据库操作,只有在实际需要数据时(迭代、切片、len()等)才会触发SQL查询,提高性能并支持查询链式组合。
惰性求值示例:
# 创建QuerySet - 不执行SQL
users = User.objects.filter(is_active=True)
print("创建QuerySet")
# 继续链式过滤 - 仍不执行SQL
users = users.filter(age__gte=18)
print("添加过滤条件")
# 添加排序 - 仍不执行SQL
users = users.order_by('name')
print("添加排序")
# 触发求值 - 执行1次SQL
for user in users:
    print(user.name)
触发求值的操作:
users = User.objects.filter(is_active=True)
# 场景1: 迭代
for user in users:  # 执行SQL
    pass
# 场景2: 切片
first_ten = users[:10] # 执行SQL: LIMIT 10
specific_user = users[5] # 执行SQL: LIMIT 1 OFFSET 5
# 场景3: len() 或 count()
total = len(users) # 执行SQL: SELECT *
count = users.count() # 执行SQL: SELECT COUNT(*)
# 场景4: list()、bool()
user_list = list(users) # 执行SQL
exists = bool(users) # 执行SQL
# 场景5: 索引访问
first = users[0] # 执行SQL
查询缓存:
# 场景1: QuerySet被求值后会缓存结果
users = User.objects.all()
# 第一次迭代 - 执行SQL并缓存
for user in users:
    print(user.name)

# 第二次迭代 - 使用缓存,不执行SQL
for user in users:
    print(user.email)

# 场景2: 每次求值都执行SQL
for user in User.objects.all():  # 执行SQL
    pass
for user in User.objects.all():  # 再次执行SQL
    pass
多次查询示例:
# 每次新构造QuerySet,缓存互不共享,各自触发SQL
# 查询1: len()
print(len(User.objects.filter(is_active=True)))     # SELECT * FROM user WHERE is_active=true
# 查询2: list()
print(list(User.objects.filter(is_active=True)))    # SELECT * FROM user WHERE is_active=true
# 查询3: count()
print(User.objects.filter(is_active=True).count())  # SELECT COUNT(*) FROM user WHERE is_active=true

# 优化: 求值一次并复用结果
users = User.objects.filter(is_active=True)
user_list = list(users)  # 执行1次SQL
print(len(user_list))    # 使用缓存
print(user_list)         # 使用缓存
实际应用优化:
# ❌ 不佳 - len() 为了判断是否为空,会取回并缓存全部行
users = User.objects.filter(is_active=True)
if len(users) > 0:        # 执行SQL并缓存全部结果
    for user in users:    # 复用缓存
        print(user)
    print(users.count())  # 已求值的QuerySet直接用缓存计数

# ✅ 高效 - 显式物化为列表,只执行1次查询
users = list(User.objects.filter(is_active=True))
if users:
    for user in users:
        print(user)
print(len(users))
# ✅ 或使用exists()检查
if User.objects.filter(is_active=True).exists():  # 高效的EXISTS查询
    users = User.objects.filter(is_active=True)
    for user in users:
        print(user)
特殊情况:
# 不触发求值的操作
users = User.objects.all()
filtered = users.filter(age__gte=18) # 不执行SQL
ordered = users.order_by('name') # 不执行SQL
query_str = str(users.query) # 获取SQL但不执行
# 强制求值
users = User.objects.all()
users._fetch_all() # 内部方法,强制求值
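惰性求值的思路可以脱离 Django,用几十行纯 Python 示意:查询对象只记录过滤条件,真正迭代时才"执行"并缓存结果(LazyQuery 类及其方法名均为举例假设,与 QuerySet 的真实实现无关):

```python
class LazyQuery:
    """极简的惰性查询示意:filter 只记录条件,迭代时才执行"""

    def __init__(self, data, filters=None):
        self._data = data
        self._filters = filters or []
        self._cache = None   # 求值后缓存结果
        self.executions = 0  # 统计"执行SQL"的次数

    def filter(self, pred):
        # 返回新对象,不触发执行(对应QuerySet的链式调用)
        return LazyQuery(self._data, self._filters + [pred])

    def _fetch_all(self):
        if self._cache is None:
            self.executions += 1  # 模拟一次数据库查询
            rows = self._data
            for pred in self._filters:
                rows = [r for r in rows if pred(r)]
            self._cache = rows
        return self._cache

    def __iter__(self):
        return iter(self._fetch_all())

    def __len__(self):
        return len(self._fetch_all())

q = LazyQuery(range(10)).filter(lambda x: x % 2 == 0).filter(lambda x: x > 2)
assert q.executions == 0     # 链式过滤不执行
assert list(q) == [4, 6, 8]  # 迭代触发一次"查询"
assert len(q) == 3           # 命中缓存,不再执行
assert q.executions == 1
```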
What's the difference between select_related and prefetch_related?
考察点:JOIN查询优化。
答案:
select_related 通过SQL JOIN在一次查询中获取关联对象,适用于ForeignKey和OneToOne关系。prefetch_related 通过额外的查询预取关联对象,适用于ManyToMany和反向ForeignKey关系。
模型定义:
class Author(models.Model):
    name = models.CharField(max_length=100)
    country = models.ForeignKey('Country', on_delete=models.CASCADE)

class Book(models.Model):
    title = models.CharField(max_length=100)
    author = models.ForeignKey(Author, on_delete=models.CASCADE)

class Tag(models.Model):
    name = models.CharField(max_length=50)
    books = models.ManyToManyField(Book)
select_related - 单次JOIN查询:
# ❌ N+1查询
books = Book.objects.all()
for book in books:
    print(book.author.name)  # 每本书1次查询

# ✅ 使用select_related - 1次查询
books = Book.objects.select_related('author').all()
for book in books:
    print(book.author.name)  # 不再查询
# 生成的SQL(简化)
# SELECT book.*, author.*
# FROM book
# INNER JOIN author ON book.author_id = author.id
# 多层关联
books = Book.objects.select_related('author__country').all()
# SELECT book.*, author.*, country.*
# FROM book
# INNER JOIN author ON book.author_id = author.id
# INNER JOIN country ON author.country_id = country.id
prefetch_related - 额外查询:
# ❌ N+1查询
books = Book.objects.all()
for book in books:
    print(book.tag_set.all())  # 每本书1次查询

# ✅ 使用prefetch_related - 2次查询
books = Book.objects.prefetch_related('tag_set').all()
for book in books:
    print(book.tag_set.all())  # 不再查询
# 执行的SQL(2次查询)
# 查询1: SELECT * FROM book
# 查询2: SELECT tag.*, book_tag.book_id
# FROM tag
# INNER JOIN book_tag ON tag.id = book_tag.tag_id
# WHERE book_tag.book_id IN (1, 2, 3, ...)
# Python在内存中组装关联
性能对比:
# 场景:100本书,每本书有1个作者,5个标签
# 方案1: 不优化 - 201次查询
books = Book.objects.all()       # 1次
for book in books:
    print(book.author.name)      # 100次
    print(book.tag_set.all())    # 100次

# 方案2: 使用select_related - 101次查询
books = Book.objects.select_related('author').all()  # 1次
for book in books:
    print(book.author.name)      # 0次
    print(book.tag_set.all())    # 100次

# 方案3: 组合使用 - 2次查询
books = Book.objects.select_related('author') \
                    .prefetch_related('tag_set') \
                    .all()
for book in books:
    print(book.author.name)      # 0次
    print(book.tag_set.all())    # 0次
高级用法 - Prefetch对象:
from django.db.models import Prefetch
# 自定义预取查询
books = Book.objects.prefetch_related(
    Prefetch(
        'tag_set',
        queryset=Tag.objects.filter(name__startswith='Python')
                            .order_by('name')
    )
).all()

# 预取并赋值到自定义属性
books = Book.objects.prefetch_related(
    Prefetch(
        'tag_set',
        queryset=Tag.objects.filter(active=True),
        to_attr='active_tags'
    )
).all()

for book in books:
    print(book.active_tags)  # 使用预取的活跃标签
选择指南:
| 关系类型 | 推荐方法 | 查询次数 | SQL类型 |
|---|---|---|---|
| ForeignKey | select_related | 1次 | JOIN |
| OneToOneField | select_related | 1次 | JOIN |
| ManyToManyField | prefetch_related | 2次 | IN查询 |
| 反向ForeignKey | prefetch_related | 2次 | IN查询 |
最佳实践:
# ✅ 组合使用
books = Book.objects.select_related('author', 'publisher') \
                    .prefetch_related('tags', 'reviews')

# ✅ 只预取需要的字段
books = Book.objects.select_related('author') \
                    .only('title', 'author__name')

# ❌ 避免过度预取
books = Book.objects.prefetch_related('tags__creator__profile__settings')
What is the purpose of related_name in Django models?
考察点:反向关系、related_name。
答案:
related_name 定义从关联模型反向访问当前模型的属性名。当一个模型有多个外键指向同一个模型时,必须使用 related_name 避免冲突。
基本用法:
class Author(models.Model):
    name = models.CharField(max_length=100)

class Article(models.Model):
    title = models.CharField(max_length=200)
    author = models.ForeignKey(
        Author,
        on_delete=models.CASCADE,
        related_name='articles'  # 自定义反向访问名
    )

# 使用
author = Author.objects.get(id=1)
articles = author.articles.all()  # 使用 related_name
# 默认是 author.article_set.all()
多个外键冲突问题:
# ❌ 错误 - 会产生冲突
class Article(models.Model):
    title = models.CharField(max_length=200)
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    reviewer = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)

# 错误信息:
# fields.E304: Reverse accessor for 'Article.author' clashes with
# reverse accessor for 'Article.reviewer'.

# ✅ 正确 - 使用 related_name
class Article(models.Model):
    title = models.CharField(max_length=200)
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='authored_articles'
    )
    reviewer = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='reviewed_articles'
    )

# 使用
user = User.objects.get(id=1)
authored = user.authored_articles.all()  # 作为作者的文章
reviewed = user.reviewed_articles.all()  # 作为审核者的文章
禁用反向关系:
class Article(models.Model):
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='+'  # 禁用反向访问
    )

# user.article_set  # ❌ AttributeError
related_query_name:
class Article(models.Model):
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='articles',
        related_query_name='article'  # 用于查询过滤
    )

# 使用 related_query_name 进行过滤
users = User.objects.filter(article__published=True)
# 而不是 User.objects.filter(articles__published=True)
实际应用场景:
# 电商订单系统
class Order(models.Model):
    customer = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='orders_as_customer'
    )
    seller = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='orders_as_seller'
    )

# 查询
user = User.objects.get(id=1)
my_purchases = user.orders_as_customer.all()
my_sales = user.orders_as_seller.all()
How to optimize Django queries using only() and defer()?
考察点:字段级查询优化。
答案:
only() 指定只查询特定字段,defer() 指定延迟加载某些字段。两者都通过减少查询的列数来优化性能,特别是当表有大字段(如TextField)时。
only() - 只查询指定字段:
class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()  # 大字段
    summary = models.CharField(max_length=500)
    author = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)

# ❌ 查询所有字段
articles = Article.objects.all()
# SQL: SELECT id, title, content, summary, author, created_at FROM article

# ✅ 只查询需要的字段
articles = Article.objects.only('title', 'author')
# SQL: SELECT id, title, author FROM article

for article in articles:
    print(article.title)    # ✅ 不触发额外查询
    print(article.author)   # ✅ 不触发额外查询
    print(article.content)  # ❌ 触发额外查询获取该字段
defer() - 延迟加载指定字段:
# 延迟加载大字段
articles = Article.objects.defer('content')
# SQL: SELECT id, title, summary, author, created_at FROM article
# 不包括 content
for article in articles:
    print(article.title)    # ✅ 不触发额外查询
    print(article.content)  # ❌ 触发额外查询
    # SQL: SELECT content FROM article WHERE id = ?
性能对比:
# 场景:列表页只需要标题和摘要
# 方案1:查询所有字段(假设content平均100KB)
articles = Article.objects.all()[:100]
# 传输数据量:100条 × 100KB = ~10MB
# 方案2:只查询需要的字段
articles = Article.objects.only('title', 'summary')[:100]
# 传输数据量:100条 × 1KB = ~100KB
# 数据传输量约降为原来的 1/100
组合使用:
# 与select_related组合
articles = Article.objects.select_related('author') \
                          .only('title', 'author__name')
# SQL: SELECT article.id, article.title, author.name
#      FROM article
#      INNER JOIN author ON article.author_id = author.id

# 与prefetch_related组合
articles = Article.objects.prefetch_related('tags') \
                          .defer('content')
实际应用示例:
# 1. 列表页 - 只需要摘要信息
def article_list(request):
articles = Article.objects.only(
'id', 'title', 'summary', 'created_at'
).order_by('-created_at')[:20]
return render(request, 'list.html', {'articles': articles})
# 2. 详情页 - 需要完整信息
def article_detail(request, pk):
article = Article.objects.get(pk=pk) # 查询所有字段
return render(request, 'detail.html', {'article': article})
# 3. API序列化 - 按需加载
class ArticleListSerializer(serializers.ModelSerializer):
class Meta:
model = Article
fields = ['id', 'title', 'summary']
@classmethod
def get_queryset(cls):
return Article.objects.only('id', 'title', 'summary')
注意事项:
# ❌ 避免:访问延迟字段导致N+1查询
articles = Article.objects.defer('content')
for article in articles:
    print(article.content)  # 每篇文章1次查询

# ✅ 正确:如果需要该字段,就不要defer
articles = Article.objects.all()
for article in articles:
    print(article.content)  # 一次性获取

# ❌ 避免:过度优化
articles = Article.objects.only('title')
# 如果后续需要多个字段,会产生多次查询

# ✅ 正确:根据实际需求优化
articles = Article.objects.only('title', 'summary', 'author')
最佳实践:
列表页等批量场景用 only() 或 defer() 排除大字段,必要时与 select_related 组合使用
What does as_view() do in Django CBV?
考察点:CBV原理、方法解析顺序。
答案:
as_view() 是类视图的入口点,它返回一个可调用的视图函数,将URL请求路由到类的相应方法。它负责实例化类、调度请求到正确的HTTP方法处理器。
工作流程:
# urls.py
from django.urls import path
from .views import BookListView

urlpatterns = [
    path('books/', BookListView.as_view(), name='book-list'),
]
# as_view() 做了什么:
# 1. 返回一个 view 函数
# 2. view 函数被调用时:
# - 创建类的实例
# - 调用 setup() 初始化
# - 调用 dispatch() 分发请求
# - dispatch() 根据 request.method 调用对应方法
源码简化版本:
class View:
    @classmethod
    def as_view(cls, **initkwargs):
        """返回一个视图函数"""
        def view(request, *args, **kwargs):
            # 创建类实例
            self = cls(**initkwargs)
            # 设置请求属性
            self.setup(request, *args, **kwargs)
            # 分发到对应的HTTP方法
            return self.dispatch(request, *args, **kwargs)
        # 保留类的属性
        view.view_class = cls
        view.view_initkwargs = initkwargs
        return view

    def dispatch(self, request, *args, **kwargs):
        """根据HTTP方法分发请求"""
        # 获取小写的HTTP方法名
        method = request.method.lower()
        # 检查是否允许该方法
        if method in self.http_method_names:
            handler = getattr(self, method, self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed
        return handler(request, *args, **kwargs)
完整示例:
from django.views.generic import ListView
from .models import Book
class BookListView(ListView):
    model = Book
    template_name = 'books/list.html'
    context_object_name = 'books'
    paginate_by = 10

    def get_queryset(self):
        """自定义查询集"""
        queryset = super().get_queryset()
        # 添加过滤逻辑
        category = self.request.GET.get('category')
        if category:
            queryset = queryset.filter(category=category)
        return queryset

    def get_context_data(self, **kwargs):
        """添加额外的上下文"""
        context = super().get_context_data(**kwargs)
        context['categories'] = Category.objects.all()
        context['total_books'] = Book.objects.count()
        return context
# 请求处理流程:
# 1. URL匹配到 BookListView.as_view()
# 2. as_view() 返回的 view 函数被调用
# 3. view 函数创建 BookListView 实例
# 4. 调用 dispatch(request)
# 5. dispatch 发现是 GET 请求,调用 get() 方法
# 6. get() 调用 get_queryset() 获取数据
# 7. get() 调用 get_context_data() 准备上下文
# 8. 渲染模板并返回响应
方法解析顺序(MRO):
class BookListView(ListView):
def get(self, request, *args, **kwargs):
"""GET请求处理流程"""
# 1. 获取查询集
self.object_list = self.get_queryset()
# 2. 允许空查询集
allow_empty = self.get_allow_empty()
# 3. 获取上下文数据
context = self.get_context_data()
# 4. 渲染响应
return self.render_to_response(context)
# MRO链:
# BookListView → ListView → MultipleObjectMixin →
# ContextMixin → View → object
自定义dispatch:
class ProtectedView(ListView):
def dispatch(self, request, *args, **kwargs):
"""在分发前添加逻辑"""
# 权限检查
if not request.user.is_authenticated:
return redirect('login')
# 记录访问
log_access(request.user, request.path)
# 调用父类dispatch
return super().dispatch(request, *args, **kwargs)
传递初始化参数:
# urls.py
urlpatterns = [
path('books/', BookListView.as_view(
template_name='custom_list.html',
paginate_by=20
)),
]
# 等价于
class CustomBookListView(BookListView):
template_name = 'custom_list.html'
paginate_by = 20
urlpatterns = [
path('books/', CustomBookListView.as_view()),
]
装饰器使用:
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
# 方式1:在as_view()上装饰
urlpatterns = [
path('books/', login_required(BookListView.as_view())),
]
# 方式2:使用method_decorator
@method_decorator(login_required, name='dispatch')
class BookListView(ListView):
model = Book
# 方式3:装饰特定方法
class BookListView(ListView):
@method_decorator(login_required)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
What's the difference between JsonResponse and HttpResponse?
考察点:HTTP响应类型、Content-Type。
答案:
JsonResponse 是 HttpResponse 的子类,专门用于返回JSON数据。它自动序列化Python对象为JSON,并设置正确的 Content-Type 头为 application/json。
基本对比:
from django.http import JsonResponse, HttpResponse
import json
# HttpResponse - 需要手动处理
def api_view_http(request):
data = {
'status': 'success',
'data': {'id': 1, 'name': 'Alice'},
'count': 100
}
# 手动序列化
json_string = json.dumps(data)
# 手动设置Content-Type
return HttpResponse(
json_string,
content_type='application/json'
)
# JsonResponse - 自动处理
def api_view_json(request):
data = {
'status': 'success',
'data': {'id': 1, 'name': 'Alice'},
'count': 100
}
# 自动序列化和设置Content-Type
return JsonResponse(data)
JsonResponse特性:
from django.http import JsonResponse
# 1. 基本用法
def user_api(request):
return JsonResponse({
'username': 'alice',
'email': '[email protected]'
})
# 响应头:Content-Type: application/json
# 响应体:{"username": "alice", "email": "[email protected]"}
# 2. 返回列表(需要设置safe=False)
def users_list(request):
users = [
{'id': 1, 'name': 'Alice'},
{'id': 2, 'name': 'Bob'}
]
return JsonResponse(users, safe=False)
# 默认safe=True只允许dict类型
# 3. 自定义状态码
def not_found(request):
return JsonResponse(
{'error': 'Not found'},
status=404
)
# 4. 自定义JSON编码器
from django.core.serializers.json import DjangoJSONEncoder
from decimal import Decimal
from datetime import datetime
def custom_encoder(request):
data = {
'price': Decimal('19.99'),
'created': datetime.now(),
}
return JsonResponse(
data,
encoder=DjangoJSONEncoder # 处理Decimal、datetime等类型
)
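DjangoJSONEncoder 对 Decimal、datetime 的处理思路,可以用标准库 json 的自定义 encoder 模拟(下面的 SimpleJSONEncoder 是为说明原理写的近似实现,不是 Django 的源码):

```python
import json
from decimal import Decimal
from datetime import datetime, date

class SimpleJSONEncoder(json.JSONEncoder):
    """模拟 DjangoJSONEncoder 的核心行为:把 JSON 原生不支持的类型转成可表示的形式"""
    def default(self, o):
        if isinstance(o, Decimal):
            return str(o)            # Decimal 序列化为字符串,避免浮点精度损失
        if isinstance(o, (datetime, date)):
            return o.isoformat()     # 日期时间用 ISO 8601 格式
        return super().default(o)

data = {'price': Decimal('19.99'), 'created': date(2024, 1, 1)}
print(json.dumps(data, cls=SimpleJSONEncoder))
# {"price": "19.99", "created": "2024-01-01"}
```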
HttpResponse用于其他内容类型:
from django.http import HttpResponse, FileResponse
# 1. HTML响应
def html_view(request):
html = '<h1>Hello World</h1>'
return HttpResponse(html, content_type='text/html')
# 2. 纯文本
def text_view(request):
return HttpResponse('Plain text', content_type='text/plain')
# 3. CSV文件下载
def export_csv(request):
import csv
from io import StringIO
output = StringIO()
writer = csv.writer(output)
writer.writerow(['Name', 'Age'])
writer.writerow(['Alice', 25])
response = HttpResponse(
output.getvalue(),
content_type='text/csv'
)
response['Content-Disposition'] = 'attachment; filename="data.csv"'
return response
# 4. 文件下载(使用FileResponse更高效)
def download_file(request):
file_path = '/path/to/file.pdf'
return FileResponse(
open(file_path, 'rb'),
content_type='application/pdf',
as_attachment=True,
filename='document.pdf'
)
实际API开发示例:
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
import json
@require_http_methods(["GET", "POST"])
def user_api(request):
if request.method == 'GET':
# 获取用户列表
users = User.objects.values('id', 'username', 'email')
return JsonResponse({
'status': 'success',
'data': list(users),
'count': users.count()
})
elif request.method == 'POST':
# 创建用户
try:
data = json.loads(request.body)
user = User.objects.create(
username=data['username'],
email=data['email']
)
return JsonResponse({
'status': 'success',
'data': {
'id': user.id,
'username': user.username
}
}, status=201)
except Exception as e:
return JsonResponse({
'status': 'error',
'message': str(e)
}, status=400)
错误处理:
from django.http import JsonResponse
from django.core.exceptions import ObjectDoesNotExist
def get_user(request, user_id):
try:
user = User.objects.get(id=user_id)
return JsonResponse({
'id': user.id,
'username': user.username
})
except ObjectDoesNotExist:
return JsonResponse({
'error': 'User not found'
}, status=404)
except Exception as e:
return JsonResponse({
'error': 'Internal server error'
}, status=500)
响应头对比:
# JsonResponse
response = JsonResponse({'key': 'value'})
# Content-Type: application/json
# HttpResponse
response = HttpResponse('text')
# Content-Type: text/html; charset=utf-8 (默认)
# 自定义响应头
response = JsonResponse({'key': 'value'})
response['X-Custom-Header'] = 'custom value'
response['Access-Control-Allow-Origin'] = '*'
How does Django's signal mechanism work?
考察点:信号机制、事务处理。
答案:
Django信号允许解耦的应用程序在框架的特定操作发生时得到通知。发送者广播信号,接收者注册回调函数处理信号,实现松耦合的事件驱动架构。
基本使用:
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from django.contrib.auth.models import User
# 方式1:使用装饰器注册
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""用户创建后自动创建Profile"""
if created:
Profile.objects.create(user=instance)
print(f"为用户 {instance.username} 创建了Profile")
# 方式2:手动连接
def user_saved_handler(sender, instance, **kwargs):
print(f"用户 {instance.username} 已保存")
post_save.connect(user_saved_handler, sender=User)
常用内置信号:
# 模型信号
from django.db.models.signals import (
pre_init, # 模型__init__()前
post_init, # 模型__init__()后
pre_save, # 模型save()前
post_save, # 模型save()后
pre_delete, # 模型delete()前
post_delete, # 模型delete()后
m2m_changed, # ManyToMany字段改变时
)
# 请求/响应信号
from django.core.signals import (
request_started,
request_finished,
)
# 数据库信号
from django.db.backends.signals import connection_created
信号处理中的常见问题:
from django.db import transaction
@receiver(post_save, sender=User)
def problematic_handler(sender, instance, created, **kwargs):
if created:
# 问题1:在事务中触发
Profile.objects.create(user=instance)
# 问题2:发送邮件失败会导致用户创建失败吗?
send_welcome_email(instance.email) # 如果失败会回滚吗?
# 问题3:递归触发
instance.last_login = timezone.now()
instance.save() # 又触发post_save信号!
# 事务问题示例
@transaction.atomic
def create_user(username, email):
user = User.objects.create(username=username, email=email)
# post_save信号在这里触发,还在事务中
# 如果后续代码出错,信号中的操作会回滚吗?
raise Exception("Something went wrong")
正确处理方式:
# 1. 使用 transaction.on_commit 回调(Django 1.9+),等事务提交后再执行副作用
from django.db.transaction import on_commit
@receiver(post_save, sender=User)
def send_welcome_email_safe(sender, instance, created, **kwargs):
if created:
# 等待事务提交后再发送邮件
on_commit(lambda: send_welcome_email(instance.email))
# 2. 避免递归触发
@receiver(post_save, sender=User)
def avoid_recursion(sender, instance, created, **kwargs):
if created:
# 使用update避免触发信号
User.objects.filter(pk=instance.pk).update(
last_login=timezone.now()
)
# 或使用update_fields
# instance.last_login = timezone.now()
# instance.save(update_fields=['last_login'])
# 3. 条件性连接信号
@receiver(post_save, sender=User)
def conditional_handler(sender, instance, created, **kwargs):
# 使用标志避免重复处理
if hasattr(instance, '_signal_processed'):
return
instance._signal_processed = True
# 处理逻辑
ManyToMany信号:
from django.db.models.signals import m2m_changed
@receiver(m2m_changed, sender=Article.tags.through)
def tags_changed(sender, instance, action, **kwargs):
"""
action参数可能的值:
- pre_add: 添加前
- post_add: 添加后
- pre_remove: 删除前
- post_remove: 删除后
- pre_clear: 清空前
- post_clear: 清空后
"""
if action == 'post_add':
print(f"文章 {instance.title} 添加了标签")
elif action == 'post_remove':
print(f"文章 {instance.title} 移除了标签")
自定义信号:
from django.dispatch import Signal
# 定义自定义信号
order_placed = Signal() # Django 4.0 起已移除 providing_args 参数,send() 时直接传关键字参数即可
# 发送信号
def place_order(order_data):
order = Order.objects.create(**order_data)
# 发送信号
order_placed.send(
sender=Order,
order=order,
user=order_data['user']
)
return order
# 接收信号
@receiver(order_placed)
def update_inventory(sender, order, user, **kwargs):
"""订单创建后更新库存"""
for item in order.items.all():
item.product.stock -= item.quantity
item.product.save()
@receiver(order_placed)
def send_confirmation(sender, order, user, **kwargs):
"""发送订单确认邮件"""
send_email(user.email, f"订单 {order.id} 已确认")
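信号分发的核心其实就是发布-订阅模式,用几十行纯 Python 就能模拟出 connect/send 的机制(以下是原理示意,不是 Django 的实际实现):

```python
# 最小化的信号分发器:注册接收者,按 sender 过滤,依次调用

class Signal:
    def __init__(self):
        self._receivers = []

    def connect(self, receiver, sender=None):
        """注册接收者;sender=None 表示接收所有发送者的信号"""
        self._receivers.append((receiver, sender))

    def send(self, sender, **kwargs):
        """依次调用匹配的接收者,收集 (receiver, 返回值) 列表"""
        responses = []
        for receiver, wanted_sender in self._receivers:
            if wanted_sender is None or wanted_sender is sender:
                responses.append((receiver, receiver(sender=sender, **kwargs)))
        return responses

order_placed = Signal()
log = []

def update_inventory(sender, order, **kwargs):
    log.append(f'inventory:{order}')

def send_confirmation(sender, order, **kwargs):
    log.append(f'email:{order}')

order_placed.connect(update_inventory)
order_placed.connect(send_confirmation)
order_placed.send(sender='Order', order=42)
print(log)  # ['inventory:42', 'email:42']
```

注意接收者按注册顺序同步执行——这也解释了为什么信号处理函数中的耗时操作会直接拖慢 save() 本身。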
性能考虑:
# ❌ 信号过度使用
@receiver(post_save, sender=Article)
def update_related_data(sender, instance, **kwargs):
# 每次保存都执行复杂操作
update_statistics()
rebuild_search_index()
clear_all_caches()
# 严重影响性能
# ✅ 考虑异步处理
from celery import shared_task
@shared_task
def async_update_statistics():
update_statistics()
@receiver(post_save, sender=Article)
def schedule_update(sender, instance, **kwargs):
# 将耗时操作放入队列
async_update_statistics.delay()
最佳实践:
- 信号处理中有副作用的操作(如发邮件)用 transaction.on_commit 延迟到事务提交后执行
- 耗时操作交给 Celery 等异步任务队列,避免阻塞保存流程
How does Django middleware execution flow work?
考察点:中间件生命周期、洋葱模型。
答案:
Django中间件采用"洋葱模型",请求从外层中间件进入,响应从内层中间件返回。每个中间件可以在请求处理前后执行代码,形成请求-响应的处理链。
中间件执行顺序:
请求流程:
浏览器 → 中间件1 → 中间件2 → 中间件3 → 视图
↓ ↓ ↓
__init__ __init__ __init__
__call__ __call__ __call__
响应流程:
浏览器 ← 中间件1 ← 中间件2 ← 中间件3 ← 视图
↑ ↑ ↑
__call__ __call__ __call__
完整的中间件实现:
import time
class TimingMiddleware:
"""记录请求处理时间的中间件"""
def __init__(self, get_response):
"""中间件初始化,只执行一次"""
self.get_response = get_response
print("TimingMiddleware initialized")
def __call__(self, request):
"""每个请求都会调用"""
# ===== 请求进入阶段 =====
print(f"[进入] 处理请求: {request.path}")
start_time = time.time()
# 调用下一个中间件或视图
response = self.get_response(request)
# ===== 响应返回阶段 =====
duration = time.time() - start_time
print(f"[离开] 处理完成: {request.path}, 耗时: {duration:.2f}s")
# 添加自定义响应头
response['X-Request-Duration'] = f"{duration:.2f}"
return response
def process_exception(self, request, exception):
"""视图抛出异常时调用"""
print(f"[异常] {request.path}: {exception}")
# 返回None继续传播异常
# 返回HttpResponse终止异常传播
return None
中间件配置顺序:
# settings.py
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware', # 1. 安全
'django.contrib.sessions.middleware.SessionMiddleware', # 2. Session
'django.middleware.common.CommonMiddleware', # 3. 通用
'django.middleware.csrf.CsrfViewMiddleware', # 4. CSRF
'django.contrib.auth.middleware.AuthenticationMiddleware', # 5. 认证
'django.contrib.messages.middleware.MessageMiddleware', # 6. 消息
'django.middleware.clickjacking.XFrameOptionsMiddleware', # 7. 点击劫持
'myapp.middleware.TimingMiddleware', # 8. 自定义
]
# 执行顺序:
# 请求: 1→2→3→4→5→6→7→8→视图
# 响应: 视图→8→7→6→5→4→3→2→1
完整执行流程示例:
# 中间件1
class Middleware1:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
print("1: 请求进入")
response = self.get_response(request)
print("1: 响应离开")
return response
# 中间件2
class Middleware2:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
print("2: 请求进入")
response = self.get_response(request)
print("2: 响应离开")
return response
# 视图
def my_view(request):
print("视图: 处理请求")
return HttpResponse("OK")
# 输出顺序:
# 1: 请求进入
# 2: 请求进入
# 视图: 处理请求
# 2: 响应离开
# 1: 响应离开
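这个洋葱模型不依赖 Django 也能验证:中间件本质上是"用 get_response 层层包裹视图"的高阶函数。下面用 functools.reduce 手工搭出与 settings.MIDDLEWARE 相同的包裹顺序(纯 Python 示意):

```python
from functools import reduce

trace = []

def make_middleware(name):
    """生成一个记录进入/离开顺序的中间件类"""
    class Middleware:
        def __init__(self, get_response):
            self.get_response = get_response
        def __call__(self, request):
            trace.append(f'{name}: 请求进入')
            response = self.get_response(request)
            trace.append(f'{name}: 响应离开')
            return response
    return Middleware

def my_view(request):
    trace.append('视图: 处理请求')
    return 'OK'

# 与 settings.MIDDLEWARE 一致:列表靠前的中间件在最外层,
# 所以从里向外(reversed)依次包裹视图
middleware_classes = [make_middleware('1'), make_middleware('2')]
handler = reduce(lambda inner, mw: mw(inner), reversed(middleware_classes), my_view)

handler('fake request')
print(trace)
# ['1: 请求进入', '2: 请求进入', '视图: 处理请求', '2: 响应离开', '1: 响应离开']
```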
process_view和process_exception:
class AdvancedMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 请求阶段
response = self.get_response(request)
# 响应阶段
return response
def process_view(self, request, view_func, view_args, view_kwargs):
"""在视图函数调用之前"""
print(f"即将调用视图: {view_func.__name__}")
# 返回None继续
# 返回HttpResponse跳过视图
return None
def process_exception(self, request, exception):
"""视图抛出异常时"""
if isinstance(exception, ValueError):
return JsonResponse({
'error': 'Invalid value'
}, status=400)
return None
def process_template_response(self, request, response):
"""处理TemplateResponse"""
if hasattr(response, 'context_data'):
response.context_data['extra'] = 'Added by middleware'
return response
实际应用示例:
# 1. 认证中间件
class JWTAuthMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 从请求头获取token
token = request.META.get('HTTP_AUTHORIZATION', '').replace('Bearer ', '')
if token:
try:
# 验证token(PyJWT 2.x 要求显式传入 algorithms)
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
request.user_id = payload['user_id']
except jwt.InvalidTokenError:
return JsonResponse({'error': 'Invalid token'}, status=401)
response = self.get_response(request)
return response
# 2. 请求日志中间件
class RequestLoggingMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
import logging
logger = logging.getLogger('django.request')
# 记录请求信息
logger.info(f"{request.method} {request.path} "
f"from {request.META.get('REMOTE_ADDR')}")
response = self.get_response(request)
# 记录响应状态
logger.info(f"Response: {response.status_code}")
return response
# 3. CORS中间件
class CORSMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 添加CORS头
response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
response['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
中间件顺序的重要性:
# ❌ 错误顺序
MIDDLEWARE = [
'AuthenticationMiddleware', # 依赖Session
'SessionMiddleware', # Session必须在前
]
# ✅ 正确顺序
MIDDLEWARE = [
'SessionMiddleware', # 先处理Session
'AuthenticationMiddleware', # 再处理认证
]
性能考虑:
# ❌ 低效 - 每个请求都执行耗时操作
class SlowMiddleware:
def __call__(self, request):
slow_operation() # 影响所有请求
return self.get_response(request)
# ✅ 高效 - 条件性执行
class ConditionalMiddleware:
def __call__(self, request):
if request.path.startswith('/api/'):
check_api_auth(request)
return self.get_response(request)
How to implement a JWT authentication middleware?
考察点:认证机制、JWT原理。
答案:
JWT(JSON Web Token)认证中间件通过验证请求头中的token来识别用户身份,实现无状态的认证机制。与Session相比,JWT不需要服务器端存储,适合分布式系统和微服务架构。
JWT基本结构:
JWT = Header.Payload.Signature
Header: {"alg": "HS256", "typ": "JWT"}
Payload: {"user_id": 123, "exp": 1234567890}
Signature: HMACSHA256(base64(header) + "." + base64(payload), secret)
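上面的三段式结构可以只用标准库演示出来——签名就是对 "header.payload" 做 HMAC-SHA256。以下是原理示意(生产环境请用 PyJWT,这里的实现省略了 exp 校验等细节):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    """JWT 使用无填充的 base64url 编码"""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode()

def sign_jwt(payload: dict, secret: str) -> str:
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = f"{b64url(json.dumps(header).encode())}.{b64url(json.dumps(payload).encode())}"
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"

def verify_jwt(token: str, secret: str) -> dict:
    signing_input, _, sig = token.rpartition('.')
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # 常量时间比较,防止时序攻击
    if not hmac.compare_digest(b64url(expected), sig):
        raise ValueError('invalid signature')
    payload_b64 = signing_input.split('.')[1]
    payload_b64 += '=' * (-len(payload_b64) % 4)  # 补齐 base64 padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))

token = sign_jwt({'user_id': 123}, 'secret-key')
print(verify_jwt(token, 'secret-key'))  # {'user_id': 123}
```

注意 payload 只是 base64 编码、并未加密,任何人都能解码查看,所以不能放敏感信息——签名只保证"没被篡改",不保证"看不见"。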
完整的JWT中间件实现:
import jwt
from django.conf import settings
from django.http import JsonResponse
from django.contrib.auth.models import User
class JWTAuthenticationMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.exempt_paths = ['/api/login/', '/api/register/']
def __call__(self, request):
# 跳过豁免路径
if request.path in self.exempt_paths:
return self.get_response(request)
# 获取Authorization头
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({
'error': 'Missing or invalid authorization header'
}, status=401)
# 提取token
token = auth_header.replace('Bearer ', '')
try:
# 验证并解码token
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=['HS256']
)
# 获取用户
user_id = payload.get('user_id')
user = User.objects.get(id=user_id)
# 将用户附加到request
request.user = user
request.user_id = user_id
except jwt.ExpiredSignatureError:
return JsonResponse({
'error': 'Token has expired'
}, status=401)
except jwt.InvalidTokenError:
return JsonResponse({
'error': 'Invalid token'
}, status=401)
except User.DoesNotExist:
return JsonResponse({
'error': 'User not found'
}, status=401)
response = self.get_response(request)
return response
生成JWT Token:
import jwt
from datetime import datetime, timedelta
from django.conf import settings
def generate_jwt_token(user):
"""为用户生成JWT token"""
payload = {
'user_id': user.id,
'username': user.username,
'exp': datetime.utcnow() + timedelta(days=7), # 7天过期
'iat': datetime.utcnow(), # 签发时间
}
token = jwt.encode(
payload,
settings.SECRET_KEY,
algorithm='HS256'
)
return token
# 登录视图
from django.contrib.auth import authenticate
def login_view(request):
if request.method == 'POST':
username = request.POST.get('username')
password = request.POST.get('password')
user = authenticate(username=username, password=password)
if user:
token = generate_jwt_token(user)
return JsonResponse({
'token': token,
'user': {
'id': user.id,
'username': user.username
}
})
else:
return JsonResponse({
'error': 'Invalid credentials'
}, status=401)
JWT vs Session对比:
# Session认证流程
"""
1. 用户登录 → 服务器创建session → 返回session_id cookie
2. 后续请求带cookie → 服务器查询session → 验证身份
3. 需要服务器端存储session数据
4. 单点登录需要共享session存储
"""
# JWT认证流程
"""
1. 用户登录 → 服务器生成JWT → 返回token
2. 后续请求带token → 服务器验证签名 → 验证身份
3. 无需服务器端存储
4. 天然支持分布式系统
"""
# 优缺点对比
"""
Session优点:
- 服务器可控,可随时撤销
- 数据存储更安全
- 支持复杂的权限逻辑
Session缺点:
- 需要服务器端存储
- 分布式系统复杂
- CSRF风险
JWT优点:
- 无状态,易扩展
- 跨域友好
- 移动端友好
- 减少数据库查询
JWT缺点:
- 无法主动撤销
- Payload不能存敏感信息
- Token较大
"""
刷新Token机制:
def generate_tokens(user):
"""生成访问token和刷新token"""
# 访问token(短期)
access_token = jwt.encode({
'user_id': user.id,
'type': 'access',
'exp': datetime.utcnow() + timedelta(minutes=15)
}, settings.SECRET_KEY, algorithm='HS256')
# 刷新token(长期)
refresh_token = jwt.encode({
'user_id': user.id,
'type': 'refresh',
'exp': datetime.utcnow() + timedelta(days=30)
}, settings.SECRET_KEY, algorithm='HS256')
return access_token, refresh_token
def refresh_token_view(request):
"""使用刷新token获取新的访问token"""
refresh_token = request.POST.get('refresh_token')
try:
payload = jwt.decode(
refresh_token,
settings.SECRET_KEY,
algorithms=['HS256']
)
if payload.get('type') != 'refresh':
return JsonResponse({'error': 'Invalid token type'}, status=400)
user = User.objects.get(id=payload['user_id'])
new_access_token = generate_jwt_token(user)
return JsonResponse({'access_token': new_access_token})
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return JsonResponse({'error': 'Invalid refresh token'}, status=401)
使用Django REST Framework的JWT:
# 使用djangorestframework-simplejwt
from rest_framework_simplejwt.views import (
TokenObtainPairView,
TokenRefreshView,
)
# urls.py
urlpatterns = [
path('api/token/', TokenObtainPairView.as_view()),
path('api/token/refresh/', TokenRefreshView.as_view()),
]
# settings.py
from datetime import timedelta
SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(minutes=15),
'REFRESH_TOKEN_LIFETIME': timedelta(days=30),
'ROTATE_REFRESH_TOKENS': True,
'BLACKLIST_AFTER_ROTATION': True,
}
How to implement object-level permissions?
考察点:权限系统、对象级权限。
答案:
Django默认只提供模型级权限(add, change, delete, view),对象级权限需要自己实现,确保用户只能操作自己有权限的特定对象。
Django内置权限系统:
from django.contrib.auth.decorators import permission_required
from django.contrib.auth.models import Permission
# 模型级权限
@permission_required('blog.add_article')
def create_article(request):
# 用户需要有"添加文章"权限
pass
@permission_required('blog.change_article')
def edit_article(request, pk):
# 用户需要有"修改文章"权限
pass
# 检查权限
if request.user.has_perm('blog.change_article'):
# 有权限
pass
# 授予权限
from django.contrib.auth.models import User, Permission
user = User.objects.get(username='alice')
permission = Permission.objects.get(codename='change_article')
user.user_permissions.add(permission)
实现对象级权限:
# 方案1:手动检查
from django.shortcuts import get_object_or_404
from django.http import HttpResponseForbidden
class Article(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(User, on_delete=models.CASCADE)
def edit_article(request, pk):
article = get_object_or_404(Article, pk=pk)
# 只有作者可以编辑
if request.user != article.author:
return HttpResponseForbidden("You can only edit your own articles")
# 编辑逻辑
if request.method == 'POST':
article.title = request.POST.get('title')
article.save()
return redirect('article_detail', pk=pk)
return render(request, 'edit.html', {'article': article})
# 方案2:装饰器
from functools import wraps
def author_required(model_class, pk_param='pk'):
"""只允许作者访问"""
def decorator(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
pk = kwargs.get(pk_param)
obj = get_object_or_404(model_class, pk=pk)
if obj.author != request.user:
return HttpResponseForbidden()
return view_func(request, *args, **kwargs)
return wrapper
return decorator
@author_required(Article)
def edit_article(request, pk):
article = Article.objects.get(pk=pk)
# 已经验证过权限
pass
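方案2的装饰器思路本身与 Django 无关,可以抽象为通用的"属主校验"模式并直接运行验证。下面是一个纯 Python 草图(Article、DB、字段名均为演示假设):

```python
from dataclasses import dataclass
from functools import wraps

@dataclass
class Article:
    pk: int
    author: str

# 用字典模拟数据库
DB = {1: Article(pk=1, author='alice')}

def owner_required(view_func):
    """只允许对象的属主调用被装饰的视图"""
    @wraps(view_func)
    def wrapper(user, pk):
        obj = DB[pk]
        if obj.author != user:
            return '403 Forbidden'
        return view_func(user, pk)
    return wrapper

@owner_required
def edit_article(user, pk):
    return f'editing {pk}'

print(edit_article('alice', 1))  # editing 1
print(edit_article('bob', 1))    # 403 Forbidden
```

实际项目里 wrapper 接收的是 request,对象来自 get_object_or_404,但权限判断的骨架是一样的。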
使用django-guardian实现对象权限:
# 安装: pip install django-guardian
# settings.py
AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend',
'guardian.backends.ObjectPermissionBackend',
)
# models.py
from guardian.shortcuts import assign_perm, remove_perm, get_objects_for_user
class Article(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(User, on_delete=models.CASCADE)
class Meta:
permissions = [
# 注意:Django 2.1+ 会自动创建默认的 view_article 权限,这里不要重复声明
('edit_article', 'Can edit article'),
]
# 授予对象级权限
article = Article.objects.create(title="My Article", author=user)
assign_perm('view_article', user, article)
assign_perm('edit_article', user, article)
# 检查对象级权限
from guardian.shortcuts import get_perms
user_perms = get_perms(user, article)
# ['view_article', 'edit_article']
if user.has_perm('view_article', article):
print("User can view this article")
# 移除权限
remove_perm('edit_article', user, article)
# 获取用户有权限的所有对象
articles = get_objects_for_user(
user,
'blog.view_article',
klass=Article
)
DRF中的对象级权限:
from rest_framework import permissions
class IsAuthorOrReadOnly(permissions.BasePermission):
"""
对象级权限:
- 读取权限:所有人
- 写入权限:仅作者
"""
def has_object_permission(self, request, view, obj):
# 读取权限允许所有人
if request.method in permissions.SAFE_METHODS:
return True
# 写入权限仅作者
return obj.author == request.user
# 使用
from rest_framework import viewsets
class ArticleViewSet(viewsets.ModelViewSet):
queryset = Article.objects.all()
serializer_class = ArticleSerializer
permission_classes = [IsAuthorOrReadOnly]
def perform_create(self, serializer):
# 创建时自动设置作者
serializer.save(author=self.request.user)
复杂权限场景:
class Document(models.Model):
title = models.CharField(max_length=200)
owner = models.ForeignKey(User, related_name='owned_documents', on_delete=models.CASCADE)
shared_with = models.ManyToManyField(User, through='DocumentPermission')
class DocumentPermission(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
document = models.ForeignKey(Document, on_delete=models.CASCADE)
can_view = models.BooleanField(default=False)
can_edit = models.BooleanField(default=False)
can_delete = models.BooleanField(default=False)
class Meta:
unique_together = ['user', 'document']
# 检查权限
def can_edit_document(user, document):
# 所有者可以编辑
if document.owner == user:
return True
# 检查共享权限
try:
perm = DocumentPermission.objects.get(
user=user,
document=document
)
return perm.can_edit
except DocumentPermission.DoesNotExist:
return False
# 视图中使用
def edit_document(request, pk):
document = get_object_or_404(Document, pk=pk)
if not can_edit_document(request.user, document):
return HttpResponseForbidden()
# 编辑逻辑
pass
最佳实践:
# 1. 在QuerySet级别过滤
class ArticleListView(ListView):
def get_queryset(self):
# 只返回用户有权限的文章
return Article.objects.filter(
models.Q(author=self.request.user) |
models.Q(shared_with=self.request.user)
)
# 2. 使用Mixin
class OwnerRequiredMixin:
def dispatch(self, request, *args, **kwargs):
obj = self.get_object()
if obj.author != request.user:
return HttpResponseForbidden()
return super().dispatch(request, *args, **kwargs)
class ArticleUpdateView(OwnerRequiredMixin, UpdateView):
model = Article
# 3. 结合模型级和对象级权限
@permission_required('blog.change_article') # 模型级
def edit_article(request, pk):
article = get_object_or_404(Article, pk=pk)
# 对象级
if article.author != request.user:
return HttpResponseForbidden()
pass
What are the Django session storage backends? What are their characteristics?
考察点:Session存储策略。
答案:
Django提供多种Session存储后端,包括数据库、缓存、文件和Cookie。不同后端适用于不同场景,需要根据性能、持久化、安全性需求选择。
所有Session后端配置:
# settings.py
# 1. 数据库存储(默认)
SESSION_ENGINE = 'django.contrib.sessions.backends.db'
# 2. 缓存存储
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
# 3. 缓存+数据库(推荐)
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
# 4. 文件存储
SESSION_ENGINE = 'django.contrib.sessions.backends.file'
SESSION_FILE_PATH = '/var/django/session'
# 5. Cookie存储
SESSION_ENGINE = 'django.contrib.sessions.backends.signed_cookies'
后端对比分析:
"""
1. 数据库存储 (db)
优点:
- 持久化存储
- 数据可靠
- 支持QuerySet查询
- 默认选项,零配置
缺点:
- 每次请求都查询数据库
- 高并发性能差
- 需要定期清理过期session
适用场景:
- 小型项目
- Session数据需要持久化
- 不需要极高性能
配置:
SESSION_ENGINE = 'django.contrib.sessions.backends.db'
"""
"""
2. 缓存存储 (cache)
优点:
- 性能极高
- 支持分布式
- 自动过期清理
缺点:
- 缓存清空则session丢失
- 不持久化
- 可能被LRU淘汰
适用场景:
- 高并发系统
- Session数据不重要
- 有Redis/Memcached
配置:
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
}
}
"""
"""
3. 缓存+数据库 (cached_db) ⭐ 推荐
优点:
- 读取快速(缓存)
- 数据可靠(数据库备份)
- 两全其美
缺点:
- 写入需要同步两个存储
- 配置稍复杂
适用场景:
- 生产环境首选
- 需要性能和可靠性
- 有缓存系统
工作原理:
- 读:先查缓存,缓存miss查数据库
- 写:同时写缓存和数据库
- 缓存清空不影响session
配置:
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
"""
"""
4. 文件存储 (file)
优点:
- 持久化
- 不需要数据库
- 部署简单
缺点:
- 并发性能差
- 文件锁问题
- 难以分布式
适用场景:
- 开发测试
- 单服务器
- 临时方案
配置:
SESSION_ENGINE = 'django.contrib.sessions.backends.file'
SESSION_FILE_PATH = '/tmp/django_sessions'
"""
"""
5. Cookie存储 (signed_cookies)
优点:
- 无服务器端存储
- 易扩展
- 无状态
缺点:
- Cookie大小限制(4KB)
- 每次请求传输
- 客户端可见(虽然签名)
适用场景:
- Session数据很少
- 完全无状态系统
- 不存储敏感信息
配置:
SESSION_ENGINE = 'django.contrib.sessions.backends.signed_cookies'
"""
性能对比测试:
# 100,000次Session读写测试
# 数据库
# 读:50-100ms/请求
# 写:100-200ms/请求
# 并发:~500 req/s
# 缓存(Redis)
# 读:1-5ms/请求
# 写:1-5ms/请求
# 并发:~10,000 req/s
# 缓存+数据库
# 读:1-5ms/请求(缓存命中)
# 写:10-20ms/请求(双写)
# 并发:~5,000 req/s
# Cookie
# 读:<1ms
# 写:<1ms
# 并发:~20,000 req/s
实现自定义Session后端(Redis):
# myapp/session_backends.py
from django.contrib.sessions.backends.base import SessionBase
import redis
class SessionStore(SessionBase): # 注意:Django 要求后端模块中的类名必须是 SessionStore
"""自定义Redis Session后端"""
def __init__(self, session_key=None):
super().__init__(session_key)
self.redis = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
def load(self):
"""加载session数据"""
try:
data = self.redis.get(self.session_key)
return self.decode(data) if data else {}
except redis.RedisError:
return {}
def create(self):
"""创建新session"""
while True:
self._session_key = self._get_new_session_key()
if not self.exists(self._session_key):
break
self.modified = True
def save(self, must_create=False):
"""保存session"""
session_data = self.encode(self._get_session(no_load=must_create))
self.redis.setex(
self.session_key,
self.get_expiry_age(),
session_data
)
def exists(self, session_key):
"""检查session是否存在"""
return self.redis.exists(session_key)
def delete(self, session_key=None):
"""删除session"""
if session_key is None:
session_key = self.session_key
self.redis.delete(session_key)
# settings.py
SESSION_ENGINE = 'myapp.session_backends' # 指向模块;Django 会加载其中名为 SessionStore 的类
Session使用示例:
# 设置session
def login_view(request):
request.session['user_id'] = user.id
request.session['username'] = user.username
request.session.set_expiry(3600) # 1小时过期
# 读取session
def profile_view(request):
user_id = request.session.get('user_id')
username = request.session.get('username', 'Guest')
# 删除session
def logout_view(request):
request.session.flush() # 清空并删除session
# 或
del request.session['user_id'] # 删除特定key
request.session.modified = True
# 设置过期时间
request.session.set_expiry(0) # 浏览器关闭时过期
request.session.set_expiry(300) # 5分钟后过期
request.session.set_expiry(None) # 使用全局设置
高并发Session优化:
# settings.py
# 1. 使用Redis缓存后端
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
}
}
}
}
# 2. 使用cached_db引擎
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
# 3. 延长session过期时间,减少写入
SESSION_COOKIE_AGE = 1209600 # 2周
# 4. 只在修改时保存
SESSION_SAVE_EVERY_REQUEST = False
# 5. HTTP-only cookie
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SECURE = True # HTTPS
SESSION_COOKIE_SAMESITE = 'Lax'
What is Python's GIL? How does it affect concurrency?
考察点:GIL理解、并发vs并行。
答案:
GIL(Global Interpreter Lock)是CPython解释器的全局锁,同一时刻只允许一个线程执行Python字节码。它简化了CPython的实现,但限制了多线程的并行能力,特别是在CPU密集型任务中。
GIL的工作原理:
"""
GIL机制:
1. 线程获取GIL锁
2. 执行Python字节码
3. 达到时间片或遇到IO操作
4. 释放GIL锁
5. 其他线程竞争GIL
6. 重复循环
结果:
- 同一时刻只有一个线程在执行
- 多核CPU也无法并行执行Python代码
- IO密集型任务影响较小
- CPU密集型任务无法利用多核
"""
不同场景的并发方案:
# 场景1:CPU密集型任务(计算、图像处理)
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
# ❌ 多线程 - GIL限制,无法并行
import threading
def cpu_intensive_task(n):
return sum(i * i for i in range(n))
threads = []
for i in range(4):
t = threading.Thread(target=cpu_intensive_task, args=(10000000,))
t.start()
threads.append(t)
for t in threads:
t.join()
# 执行时间:约等于单线程的4倍(因为GIL)
# ✅ 多进程 - 绕过GIL,真正并行
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(cpu_intensive_task, [10000000] * 4)
# 执行时间:约等于单线程(充分利用多核)
# 场景2:IO密集型任务(网络请求、文件读写)
import requests
from concurrent.futures import ThreadPoolExecutor
import asyncio
import aiohttp
urls = ['https://api.example.com/data' for _ in range(100)]
# ✅ 多线程 - IO期间释放GIL
def fetch_url(url):
return requests.get(url)
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_url, urls)
# 性能好:IO等待时其他线程可执行
# ✅ 异步IO - 最佳方案
async def fetch_async(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
asyncio.run(main())
# 性能最好:单线程异步
# 场景3:数据库查询(IO密集)
# ✅ 使用连接池 + 多线程
from django.db import connection
def query_database(user_id):
with connection.cursor() as cursor:
# 使用参数化查询,不要用字符串拼接SQL
cursor.execute("SELECT * FROM table WHERE id = %s", [user_id])
return cursor.fetchall()
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(query_database, range(100))
并发方案选择指南:
"""
CPU密集型(计算、数据处理)
├── multiprocessing - 绕过GIL
│ ├── 优点:真正并行,充分利用多核
│ ├── 缺点:进程开销大,内存占用高
│ └── 适用:计算任务、图像处理、数据分析
│
└── 不要用threading - GIL限制
IO密集型(网络、文件、数据库)
├── asyncio - 推荐
│ ├── 优点:性能最好,资源占用少
│ ├── 缺点:需要异步库支持
│ └── 适用:Web爬虫、API调用、WebSocket
│
├── threading - 可用
│ ├── 优点:简单,库支持广泛
│ ├── 缺点:有线程开销
│ └── 适用:文件IO、数据库查询
│
└── multiprocessing - 过度
└── IO任务用多进程浪费资源
混合型
└── 进程池 + 线程池
└── 每个进程内用线程处理IO
"""
实际示例对比:
import time
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# CPU密集型任务
def cpu_task():
return sum(i * i for i in range(10000000))
# IO密集型任务
def io_task():
time.sleep(1)
return "Done"
# 测试1:CPU密集任务
print("=== CPU密集型任务 ===")
# 单线程
start = time.time()
for _ in range(4):
cpu_task()
print(f"单线程: {time.time() - start:.2f}秒") # ~8秒
# 多线程(GIL影响)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
list(executor.map(lambda _: cpu_task(), range(4)))
print(f"多线程: {time.time() - start:.2f}秒") # ~8秒(无提升)
# 多进程(绕过GIL)—— 注意:交给进程池的函数必须可pickle,不能用lambda
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_task) for _ in range(4)]
for f in futures:
f.result()
print(f"多进程: {time.time() - start:.2f}秒") # ~2秒(4倍提升)
# 测试2:IO密集任务
print("\n=== IO密集型任务 ===")
# 单线程
start = time.time()
for _ in range(10):
io_task()
print(f"单线程: {time.time() - start:.2f}秒") # ~10秒
# 多线程(IO期间释放GIL)
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
list(executor.map(lambda _: io_task(), range(10)))
print(f"多线程: {time.time() - start:.2f}秒") # ~1秒(10倍提升)
Django中的并发处理:
# 1. 异步视图(Django 3.1+)
from django.http import JsonResponse
import asyncio
import aiohttp
async def fetch_data(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def async_view(request):
    """异步视图处理并发请求"""
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_data(session, f"https://api.example.com/{i}")
            for i in range(10)
        ]
        results = await asyncio.gather(*tasks)
    return JsonResponse({'data': results})
# 2. Celery异步任务(CPU密集)
from celery import shared_task
@shared_task
def process_heavy_computation(data):
"""Celery worker进程处理"""
result = heavy_computation(data)
return result
def api_view(request):
# 将任务放入队列
task = process_heavy_computation.delay(request.data)
return JsonResponse({'task_id': task.id})
# 3. 数据库查询优化
from django.db import connection
from concurrent.futures import ThreadPoolExecutor
def bulk_fetch_users(user_ids):
    """并发查询多个用户(每个线程使用独立的数据库连接)
    注:单纯按ID取数时,一次 User.objects.filter(id__in=user_ids) 通常更高效,
    线程池适合每次查询本身较慢的场景
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        users = list(executor.map(
            lambda uid: User.objects.get(id=uid),
            user_ids
        ))
    return users
绕过GIL的方法:
# 1. 使用多进程
from multiprocessing import Process, Queue
# 2. 使用C扩展(释放GIL)
# NumPy, Pandas等科学计算库内部已释放GIL
# 3. 使用其他Python实现
# - Jython(Java实现,无GIL)
# - IronPython(.NET实现,无GIL)
# - PyPy(JIT编译,有GIL但更快)
# 4. 使用Cython(手动释放GIL)
# cython代码
"""
from cython.parallel import prange
def parallel_computation(int n):
cdef int i
cdef double result = 0
with nogil: # 释放GIL
for i in prange(n, num_threads=4):
result += i * i
return result
"""
最佳实践:
What are the performance issues with nested serializers in DRF?
考察点:DRF性能优化、嵌套序列化。
答案:
DRF嵌套序列化器会为每个对象单独查询关联数据,导致N+1查询问题。解决方法是在ViewSet中优化QuerySet,使用select_related或prefetch_related预加载关联数据。
问题示例:
from rest_framework import serializers
class AuthorSerializer(serializers.ModelSerializer):
class Meta:
model = Author
fields = ['id', 'name', 'email']
class BookSerializer(serializers.ModelSerializer):
author = AuthorSerializer(read_only=True) # 嵌套序列化
class Meta:
model = Book
fields = ['id', 'title', 'author', 'published_date']
# ViewSet
class BookListView(ListAPIView):
queryset = Book.objects.all() # ❌ 会产生N+1查询
serializer_class = BookSerializer
# 执行的SQL:
# 1. SELECT * FROM book (1次)
# 2. SELECT * FROM author WHERE id = 1 (N次,每本书一次)
# 总计:1 + N 次查询
优化方案:
# 方案1:在ViewSet中优化QuerySet
class BookListView(ListAPIView):
serializer_class = BookSerializer
def get_queryset(self):
return Book.objects.select_related('author').all() # ✅ 1次JOIN查询
# 方案2:自定义序列化器的to_representation
class OptimizedBookSerializer(serializers.ModelSerializer):
author = AuthorSerializer(read_only=True)
class Meta:
model = Book
fields = ['id', 'title', 'author']
@classmethod
def get_optimized_queryset(cls, queryset):
"""提供优化的查询集"""
return queryset.select_related('author')
# 方案3:使用SerializerMethodField(注意性能)
class BookSerializer(serializers.ModelSerializer):
author_name = serializers.SerializerMethodField()
class Meta:
model = Book
fields = ['id', 'title', 'author_name']
def get_author_name(self, obj):
# ⚠️ 如果没有select_related,每个对象都会查询
return obj.author.name
# ✅ 必须在ViewSet中优化
class BookListView(ListAPIView):
queryset = Book.objects.select_related('author').all()
serializer_class = BookSerializer
多层嵌套优化:
# 模型关系
class Country(models.Model):
name = models.CharField(max_length=100)
class Author(models.Model):
name = models.CharField(max_length=100)
country = models.ForeignKey(Country, on_delete=models.CASCADE)
class Book(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(Author, on_delete=models.CASCADE)
tags = models.ManyToManyField('Tag')
# 序列化器
class CountrySerializer(serializers.ModelSerializer):
class Meta:
model = Country
fields = ['id', 'name']
class AuthorSerializer(serializers.ModelSerializer):
country = CountrySerializer(read_only=True)
class Meta:
model = Author
fields = ['id', 'name', 'country']
class TagSerializer(serializers.ModelSerializer):
class Meta:
model = Tag
fields = ['id', 'name']
class BookSerializer(serializers.ModelSerializer):
author = AuthorSerializer(read_only=True)
tags = TagSerializer(many=True, read_only=True)
class Meta:
model = Book
fields = ['id', 'title', 'author', 'tags']
# ✅ ViewSet优化
class BookViewSet(viewsets.ModelViewSet):
serializer_class = BookSerializer
def get_queryset(self):
return Book.objects \
.select_related('author', 'author__country') \
.prefetch_related('tags') \
.all()
使用drf-spectacular自动文档的优化:
from drf_spectacular.utils import extend_schema
class BookViewSet(viewsets.ModelViewSet):
serializer_class = BookSerializer
def get_queryset(self):
queryset = Book.objects.all()
# 根据action优化查询
if self.action == 'list':
queryset = queryset.select_related('author') \
.prefetch_related('tags') \
.only('id', 'title', 'author__name')
elif self.action == 'retrieve':
queryset = queryset.select_related('author__country') \
.prefetch_related('tags', 'reviews')
return queryset
性能监控:
# 使用django-silk监控
# pip install django-silk
# settings.py
MIDDLEWARE = [
...
'silk.middleware.SilkyMiddleware',
]
INSTALLED_APPS = [
...
'silk',
]
# urls.py
urlpatterns = [
path('silk/', include('silk.urls', namespace='silk'))
]
# 访问 /silk/ 查看SQL查询统计
最佳实践:
- 在get_queryset中优化查询
- 用select_related处理ForeignKey
- 用prefetch_related处理ManyToMany
How to implement writable nested serializers in DRF?
考察点:嵌套写入、事务管理。
答案:
DRF默认的嵌套序列化器是只读的,实现可写的嵌套序列化器需要重写create()和update()方法,并使用事务确保数据一致性。
只读嵌套(默认):
class OrderItemSerializer(serializers.ModelSerializer):
class Meta:
model = OrderItem
fields = ['product', 'quantity', 'price']
class OrderSerializer(serializers.ModelSerializer):
items = OrderItemSerializer(many=True, read_only=True) # 只读
class Meta:
model = Order
fields = ['id', 'user', 'items', 'total', 'created_at']
# ❌ 创建时无法传入items
data = {
'user': 1,
'items': [ # 这部分会被忽略
{'product': 1, 'quantity': 2, 'price': 10.00}
],
'total': 20.00
}
serializer = OrderSerializer(data=data)
serializer.save() # items不会被创建
实现可写嵌套序列化器:
from django.db import transaction

class OrderItemSerializer(serializers.ModelSerializer):
    class Meta:
        model = OrderItem
        fields = ['product', 'quantity', 'price']

class OrderSerializer(serializers.ModelSerializer):
    items = OrderItemSerializer(many=True)  # 移除read_only

    class Meta:
        model = Order
        fields = ['id', 'user', 'items', 'total', 'created_at']
        read_only_fields = ['created_at']

    @transaction.atomic
    def create(self, validated_data):
        """创建订单和订单项"""
        # 1. 提取嵌套数据
        items_data = validated_data.pop('items')
        # 2. 创建主对象
        order = Order.objects.create(**validated_data)
        # 3. 创建嵌套对象
        for item_data in items_data:
            OrderItem.objects.create(order=order, **item_data)
        return order

    @transaction.atomic
    def update(self, instance, validated_data):
        """更新订单和订单项"""
        # 1. 提取嵌套数据
        items_data = validated_data.pop('items', None)
        # 2. 更新主对象
        instance.user = validated_data.get('user', instance.user)
        instance.total = validated_data.get('total', instance.total)
        instance.save()
        # 3. 更新嵌套对象:先删除旧订单项,再创建新订单项
        if items_data is not None:
            instance.items.all().delete()
            for item_data in items_data:
                OrderItem.objects.create(order=instance, **item_data)
        return instance
使用示例:
# POST创建订单
data = {
'user': 1,
'items': [
{'product': 1, 'quantity': 2, 'price': 10.00},
{'product': 2, 'quantity': 1, 'price': 15.00}
],
'total': 35.00
}
serializer = OrderSerializer(data=data)
if serializer.is_valid():
order = serializer.save()
# 订单和订单项都已创建
else:
print(serializer.errors)
# PUT更新订单
data = {
'user': 1,
'items': [
{'product': 3, 'quantity': 5, 'price': 8.00}
],
'total': 40.00
}
order = Order.objects.get(pk=1)
serializer = OrderSerializer(order, data=data)
if serializer.is_valid():
order = serializer.save()
# 旧订单项被删除,新订单项被创建
更智能的update实现:
# 注意:ModelSerializer默认生成的id字段是read_only,不会进入validated_data,
# 想按id区分"更新/新建",嵌套序列化器必须显式声明可写的id
class OrderItemSerializer(serializers.ModelSerializer):
    id = serializers.IntegerField(required=False)

    class Meta:
        model = OrderItem
        fields = ['id', 'product', 'quantity', 'price']

class OrderSerializer(serializers.ModelSerializer):
    items = OrderItemSerializer(many=True)

    class Meta:
        model = Order
        fields = ['id', 'user', 'items', 'total']

    @transaction.atomic
    def update(self, instance, validated_data):
        """智能更新:带id且存在则更新,无id则新建,未出现的删除"""
        items_data = validated_data.pop('items', None)
        # 更新订单主信息
        for attr, value in validated_data.items():
            setattr(instance, attr, value)
        instance.save()
        if items_data is not None:
            # 获取现有订单项
            existing_items = {item.id: item for item in instance.items.all()}
            updated_ids = []
            for item_data in items_data:
                item_id = item_data.pop('id', None)
                if item_id and item_id in existing_items:
                    # 更新现有项
                    item = existing_items[item_id]
                    for attr, value in item_data.items():
                        setattr(item, attr, value)
                    item.save()
                    updated_ids.append(item_id)
                else:
                    # 创建新项
                    new_item = OrderItem.objects.create(
                        order=instance,
                        **item_data
                    )
                    updated_ids.append(new_item.id)
            # 删除未出现在本次提交中的项
            instance.items.exclude(id__in=updated_ids).delete()
        return instance
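上面update里的合并规则(带id且存在则更新、无id则新建、缺失的删除)可以脱离ORM用纯Python验证。merge_items是为演示而写的示意函数,不属于DRF:

```python
def merge_items(existing, incoming):
    """existing: {id: data};incoming: 带或不带id的dict列表
    返回 (合并后的字典, 新建项列表, 被删除的id集合)"""
    result = dict(existing)
    created = []
    kept = set()
    next_id = max(existing, default=0) + 1  # 模拟数据库自增主键
    for item in incoming:
        item_id = item.get('id')
        if item_id in existing:
            # 带id且存在:更新
            result[item_id] = {**existing[item_id], **item}
            kept.add(item_id)
        else:
            # 无id:新建
            item = {**item, 'id': next_id}
            result[next_id] = item
            created.append(item)
            kept.add(next_id)
            next_id += 1
    deleted = set(existing) - kept  # 未出现的:删除
    for d in deleted:
        del result[d]
    return result, created, deleted

existing = {1: {'id': 1, 'qty': 2}, 2: {'id': 2, 'qty': 1}}
incoming = [{'id': 1, 'qty': 5}, {'qty': 3}]
result, created, deleted = merge_items(existing, incoming)
print(result, created, deleted)
```

三种情况一目了然:id=1被更新为qty=5,无id的新项被创建,id=2因未出现而删除。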
验证嵌套数据:
class OrderItemSerializer(serializers.ModelSerializer):
class Meta:
model = OrderItem
fields = ['id', 'product', 'quantity', 'price']
def validate_quantity(self, value):
"""验证数量"""
if value <= 0:
raise serializers.ValidationError("数量必须大于0")
if value > 100:
raise serializers.ValidationError("数量不能超过100")
return value
def validate(self, attrs):
"""验证product库存"""
product = attrs['product']
quantity = attrs['quantity']
if product.stock < quantity:
raise serializers.ValidationError({
'quantity': f"库存不足,仅剩{product.stock}件"
})
return attrs
class OrderSerializer(serializers.ModelSerializer):
items = OrderItemSerializer(many=True)
class Meta:
model = Order
fields = ['id', 'user', 'items', 'total']
def validate_items(self, value):
"""验证订单项列表"""
if not value:
raise serializers.ValidationError("订单至少需要一个商品")
if len(value) > 50:
raise serializers.ValidationError("单个订单最多50件商品")
return value
def validate(self, attrs):
"""验证总价"""
items = attrs.get('items', [])
total = attrs.get('total', 0)
calculated_total = sum(
item['quantity'] * item['price']
for item in items
)
if abs(calculated_total - total) > 0.01:
raise serializers.ValidationError({
'total': f"总价计算错误,应为{calculated_total}"
})
return attrs
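上面用 abs(calculated_total - total) > 0.01 来容忍浮点误差;金额更稳妥的做法是用Decimal精确比较。validate_total是示意的辅助函数,并非DRF接口:

```python
from decimal import Decimal

def validate_total(items, total):
    """总价校验:返回None表示通过,否则返回错误信息"""
    calculated = sum(
        Decimal(item['quantity']) * Decimal(str(item['price']))
        for item in items
    )
    if calculated != Decimal(str(total)):
        return f"总价计算错误,应为{calculated}"
    return None

items = [
    {'quantity': 2, 'price': '10.00'},
    {'quantity': 1, 'price': '15.00'},
]
ok = validate_total(items, '35.00')    # None,校验通过
bad = validate_total(items, '30.00')   # 返回错误信息
```

模型侧配合 DecimalField 存储金额,序列化器的validate里就可以做等值比较而不必容忍误差。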
事务和异常处理:
from django.db import transaction
from rest_framework import status
from rest_framework.response import Response
class OrderViewSet(viewsets.ModelViewSet):
queryset = Order.objects.all()
serializer_class = OrderSerializer
@transaction.atomic
def create(self, request, *args, **kwargs):
"""创建订单(带事务)"""
serializer = self.get_serializer(data=request.data)
try:
serializer.is_valid(raise_exception=True)
# 保存订单(在事务中)
order = serializer.save()
# 减少库存
for item in order.items.all():
product = item.product
product.stock -= item.quantity
product.save()
# 如果一切正常,事务提交
headers = self.get_success_headers(serializer.data)
return Response(
serializer.data,
status=status.HTTP_201_CREATED,
headers=headers
)
except Exception as e:
# 任何异常都会导致事务回滚
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
多对多关系的嵌套写入:
class TagSerializer(serializers.ModelSerializer):
class Meta:
model = Tag
fields = ['id', 'name']
class ArticleSerializer(serializers.ModelSerializer):
tags = TagSerializer(many=True)
class Meta:
model = Article
fields = ['id', 'title', 'content', 'tags']
@transaction.atomic
def create(self, validated_data):
tags_data = validated_data.pop('tags')
article = Article.objects.create(**validated_data)
for tag_data in tags_data:
# 获取或创建tag
tag, created = Tag.objects.get_or_create(
name=tag_data['name']
)
article.tags.add(tag)
return article
@transaction.atomic
def update(self, instance, validated_data):
tags_data = validated_data.pop('tags', None)
# 更新文章主信息
instance.title = validated_data.get('title', instance.title)
instance.content = validated_data.get('content', instance.content)
instance.save()
# 更新tags
if tags_data is not None:
instance.tags.clear()
for tag_data in tags_data:
tag, created = Tag.objects.get_or_create(
name=tag_data['name']
)
instance.tags.add(tag)
return instance
性能优化技巧:
# 1. 使用bulk_create批量创建
@transaction.atomic
def create(self, validated_data):
items_data = validated_data.pop('items')
order = Order.objects.create(**validated_data)
# ✅ 批量创建(1次SQL)
order_items = [
OrderItem(order=order, **item_data)
for item_data in items_data
]
OrderItem.objects.bulk_create(order_items)
return order
# 2. 使用PrimaryKeyRelatedField简化
class OrderSerializer(serializers.ModelSerializer):
# 只传递ID,不嵌套完整对象
items = serializers.PrimaryKeyRelatedField(
many=True,
queryset=OrderItem.objects.all()
)
class Meta:
model = Order
fields = ['id', 'user', 'items', 'total']
# 3. 使用不同的序列化器
class OrderListSerializer(serializers.ModelSerializer):
# 列表视图:简化序列化
item_count = serializers.IntegerField(source='items.count')
class Meta:
model = Order
fields = ['id', 'user', 'total', 'item_count']
class OrderDetailSerializer(serializers.ModelSerializer):
# 详情视图:完整序列化
items = OrderItemSerializer(many=True)
class Meta:
model = Order
fields = ['id', 'user', 'items', 'total', 'created_at']
class OrderViewSet(viewsets.ModelViewSet):
queryset = Order.objects.all()
def get_serializer_class(self):
if self.action == 'list':
return OrderListSerializer
return OrderDetailSerializer
How to implement multi-tenant permissions in DRF?
考察点:多租户架构、数据隔离。
答案:
多租户系统中每个租户的数据必须严格隔离,通过在模型中添加租户字段,在QuerySet级别自动过滤,在权限类中验证,确保用户只能访问自己租户的数据。
模型设计:
from django.db import models
from django.contrib.auth.models import User
class Tenant(models.Model):
"""租户模型"""
name = models.CharField(max_length=100)
subdomain = models.CharField(max_length=50, unique=True)
created_at = models.DateTimeField(auto_now_add=True)
is_active = models.BooleanField(default=True)
class TenantUser(models.Model):
"""用户与租户的关联"""
user = models.ForeignKey(User, on_delete=models.CASCADE)
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE)
role = models.CharField(max_length=20, choices=[
('admin', 'Administrator'),
('member', 'Member'),
('viewer', 'Viewer'),
])
class Meta:
unique_together = ['user', 'tenant']
class TenantAwareModel(models.Model):
"""所有租户数据的基类"""
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE)
class Meta:
abstract = True
class Project(TenantAwareModel):
name = models.CharField(max_length=200)
description = models.TextField()
created_by = models.ForeignKey(User, on_delete=models.CASCADE)
class Task(TenantAwareModel):
project = models.ForeignKey(Project, on_delete=models.CASCADE)
title = models.CharField(max_length=200)
assignee = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
中间件识别当前租户:
class TenantMiddleware:
"""从子域名识别当前租户"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 从子域名提取租户
host = request.get_host().split(':')[0]
subdomain = host.split('.')[0]
try:
tenant = Tenant.objects.get(
subdomain=subdomain,
is_active=True
)
request.tenant = tenant
except Tenant.DoesNotExist:
request.tenant = None
response = self.get_response(request)
return response
# 或从请求头识别
class TenantHeaderMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        request.tenant = None  # 无论有无请求头,都保证属性存在
        tenant_id = request.META.get('HTTP_X_TENANT_ID')
        if tenant_id:
            try:
                request.tenant = Tenant.objects.get(id=tenant_id)
            except Tenant.DoesNotExist:
                pass
        return self.get_response(request)
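中间件里从Host头提取子域名的逻辑很容易出错(端口号、多级域名),抽成纯函数便于单测(示意):

```python
def extract_subdomain(host):
    """'acme.example.com:8000' -> 'acme':先去端口,再取第一段"""
    return host.split(':')[0].split('.')[0]

sub = extract_subdomain('acme.example.com:8000')
print(sub)  # acme
```

注意裸域名(如 'localhost')会原样返回第一段,生产环境还应校验该子域名确实存在于Tenant表中。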
QuerySet级别过滤:
# 方案1:Manager(配合模块级thread-local,由中间件在每个请求开始时写入)
from threading import local

_thread_locals = local()

class TenantManager(models.Manager):
    """自动过滤当前租户的数据"""
    def get_queryset(self):
        queryset = super().get_queryset()
        # 注意:thread-local必须是模块级变量;
        # 若在get_queryset内新建local(),每次都是空对象,永远拿不到租户
        tenant = getattr(_thread_locals, 'tenant', None)
        if tenant is not None:
            queryset = queryset.filter(tenant=tenant)
        return queryset
class Project(TenantAwareModel):
name = models.CharField(max_length=200)
objects = TenantManager() # 使用自定义Manager
all_objects = models.Manager() # 保留不过滤的Manager
# 使用
projects = Project.objects.all() # 自动过滤当前租户
all_projects = Project.all_objects.all() # 所有租户的数据(管理员用)
# 方案2:在ViewSet中过滤
class ProjectViewSet(viewsets.ModelViewSet):
serializer_class = ProjectSerializer
def get_queryset(self):
"""只返回当前租户的项目"""
return Project.objects.filter(tenant=self.request.tenant)
def perform_create(self, serializer):
"""创建时自动设置租户"""
serializer.save(
tenant=self.request.tenant,
created_by=self.request.user
)
权限类实现:
from rest_framework import permissions
class TenantPermission(permissions.BasePermission):
"""确保用户只能访问自己租户的数据"""
def has_permission(self, request, view):
"""视图级权限"""
# 1. 检查用户是否已认证
if not request.user.is_authenticated:
return False
# 2. 检查用户是否属于当前租户
if not hasattr(request, 'tenant'):
return False
return TenantUser.objects.filter(
user=request.user,
tenant=request.tenant
).exists()
def has_object_permission(self, request, view, obj):
"""对象级权限"""
# 确保对象属于当前租户
if hasattr(obj, 'tenant'):
return obj.tenant == request.tenant
return True
class TenantAdminPermission(permissions.BasePermission):
"""租户管理员权限"""
def has_permission(self, request, view):
if not request.user.is_authenticated:
return False
try:
tenant_user = TenantUser.objects.get(
user=request.user,
tenant=request.tenant
)
return tenant_user.role == 'admin'
except TenantUser.DoesNotExist:
return False
ViewSet完整实现:
class ProjectViewSet(viewsets.ModelViewSet):
serializer_class = ProjectSerializer
permission_classes = [TenantPermission]
def get_queryset(self):
"""自动过滤当前租户的项目"""
return Project.objects.filter(
tenant=self.request.tenant
).select_related('created_by')
def perform_create(self, serializer):
"""创建时自动设置租户"""
serializer.save(
tenant=self.request.tenant,
created_by=self.request.user
)
def perform_update(self, serializer):
"""更新时验证租户"""
if serializer.instance.tenant != self.request.tenant:
raise PermissionDenied("无权修改其他租户的数据")
serializer.save()
def perform_destroy(self, instance):
"""删除时验证租户"""
if instance.tenant != self.request.tenant:
raise PermissionDenied("无权删除其他租户的数据")
instance.delete()
防止跨租户数据泄露:
# 1. 数据库级约束
class Project(TenantAwareModel):
name = models.CharField(max_length=200)
class Meta:
indexes = [
models.Index(fields=['tenant', 'id']),
]
constraints = [
models.UniqueConstraint(
fields=['tenant', 'name'],
name='unique_project_per_tenant'
)
]
# 2. QuerySet检查Mixin
class TenantQuerySetMixin:
def get_queryset(self):
queryset = super().get_queryset()
# 强制过滤租户
if hasattr(self.request, 'tenant') and self.request.tenant:
if hasattr(queryset.model, 'tenant'):
queryset = queryset.filter(tenant=self.request.tenant)
return queryset
class ProjectViewSet(TenantQuerySetMixin, viewsets.ModelViewSet):
queryset = Project.objects.all()
serializer_class = ProjectSerializer
# 3. 单元测试
from rest_framework.test import APITestCase
class TenantIsolationTest(APITestCase):
def setUp(self):
# subdomain有unique约束,不能都留空
self.tenant1 = Tenant.objects.create(name="Tenant 1", subdomain="t1")
self.tenant2 = Tenant.objects.create(name="Tenant 2", subdomain="t2")
self.user1 = User.objects.create_user('user1')
self.user2 = User.objects.create_user('user2')
TenantUser.objects.create(user=self.user1, tenant=self.tenant1)
TenantUser.objects.create(user=self.user2, tenant=self.tenant2)
self.project1 = Project.objects.create(
name="Project 1",
tenant=self.tenant1
)
self.project2 = Project.objects.create(
name="Project 2",
tenant=self.tenant2
)
def test_user_can_only_see_own_tenant_projects(self):
"""用户只能看到自己租户的项目"""
self.client.force_authenticate(user=self.user1)
# 通过请求头让TenantHeaderMiddleware识别当前租户
response = self.client.get(
    '/api/projects/',
    HTTP_X_TENANT_ID=str(self.tenant1.id)
)
# 断言只返回tenant1的项目
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]['id'], self.project1.id)
def test_user_cannot_access_other_tenant_project(self):
"""用户无法访问其他租户的项目"""
self.client.force_authenticate(user=self.user1)
# 尝试访问tenant2的项目
response = self.client.get(f'/api/projects/{self.project2.id}/')
# 应该返回404或403
self.assertIn(response.status_code, [403, 404])
最佳实践:
How to implement API throttling in DRF?
考察点:限流算法、API保护。
答案:
API限流通过控制用户在一定时间内的请求次数,防止滥用和保护服务器资源。DRF提供多种限流策略,也可以自定义限流算法和存储后端。
DRF内置限流类:
from rest_framework.throttling import (
AnonRateThrottle, # 匿名用户限流
UserRateThrottle, # 认证用户限流
ScopedRateThrottle, # 基于视图范围限流
)
# settings.py
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'anon': '100/day', # 匿名用户每天100次
'user': '1000/day', # 认证用户每天1000次
}
}
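'100/day' 这类速率字符串会被解析成(次数, 周期秒数)。DRF的SimpleRateThrottle.parse_rate只取周期单位的首字母,行为大致如下(示意复现,非DRF源码):

```python
def parse_rate(rate):
    """'100/day' -> (100, 86400):次数/周期,周期只看单位首字母"""
    num, period = rate.split('/')
    duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
    return int(num), duration

daily = parse_rate('100/day')
burst = parse_rate('60/min')
print(daily, burst)  # (100, 86400) (60, 60)
```

也就是说 '100/day'、'100/d'、'100/dollar' 在DRF里是等价的——配置速率时只有首字母有效。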
自定义限流类:
from rest_framework.throttling import UserRateThrottle
class BurstRateThrottle(UserRateThrottle):
"""突发流量限流"""
scope = 'burst'
class SustainedRateThrottle(UserRateThrottle):
"""持续流量限流"""
scope = 'sustained'
# settings.py
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'api.throttles.BurstRateThrottle',
'api.throttles.SustainedRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'burst': '60/min', # 每分钟60次
'sustained': '1000/day' # 每天1000次
}
}
# 视图中使用
from rest_framework.views import APIView
from rest_framework.response import Response
class MyAPIView(APIView):
throttle_classes = [BurstRateThrottle, SustainedRateThrottle]
def get(self, request):
return Response({'message': 'OK'})
基于用户等级的差异化限流:
class PremiumUserThrottle(UserRateThrottle):
    """高级用户限流:根据用户级别使用不同速率"""
    scope = 'premium'

    def allow_request(self, request, view):
        # 注意:get_rate()在__init__阶段被调用,此时拿不到request,
        # 因此要在allow_request中按当前用户动态设置速率
        if not request.user.is_authenticated:
            self.rate = '100/day'
        elif getattr(request.user, 'is_premium', False):
            self.rate = '10000/day'  # 高级用户
        elif getattr(request.user, 'is_vip', False):
            self.rate = '5000/day'   # VIP用户
        else:
            self.rate = '1000/day'   # 普通用户
        self.num_requests, self.duration = self.parse_rate(self.rate)
        return super().allow_request(request, view)

    def get_cache_key(self, request, view):
        """自定义缓存key"""
        if request.user.is_authenticated:
            ident = request.user.pk
        else:
            ident = self.get_ident(request)
        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

class PremiumAPIView(APIView):
    throttle_classes = [PremiumUserThrottle]
自定义限流算法:
from rest_framework.throttling import SimpleRateThrottle
import redis
import time
class RedisRateThrottle(SimpleRateThrottle):
"""使用Redis实现的限流"""
scope = 'redis_throttle'
def __init__(self):
super().__init__()
self.redis_client = redis.Redis(
host='localhost',
port=6379,
db=0
)
def allow_request(self, request, view):
"""
使用令牌桶算法:
- 令牌以固定速率生成
- 请求消耗令牌
- 令牌不足则拒绝请求
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
# 解析速率(如'60/min')
self.num_requests, self.duration = self.parse_rate(self.rate)
# 令牌桶算法
now = time.time()
key_tokens = f"{self.key}:tokens"
key_timestamp = f"{self.key}:timestamp"
# 获取当前令牌数和上次更新时间
tokens = float(self.redis_client.get(key_tokens) or self.num_requests)
last_update = float(self.redis_client.get(key_timestamp) or now)
# 计算新增令牌
time_passed = now - last_update
new_tokens = time_passed * (self.num_requests / self.duration)
tokens = min(tokens + new_tokens, self.num_requests)
# 检查是否有足够的令牌
if tokens >= 1:
tokens -= 1
# 更新Redis
self.redis_client.setex(key_tokens, int(self.duration), tokens)
self.redis_client.setex(key_timestamp, int(self.duration), now)
return True
else:
return False
def wait(self):
    """令牌不足时,建议客户端等待生成1个令牌所需的时间
    (令牌桶没有history记录,不能沿用SimpleRateThrottle的实现)"""
    return self.duration / self.num_requests
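Redis在这里只是共享存储,令牌桶算法本身用内存版更容易看清(示意实现,非DRF的一部分):

```python
import time

class TokenBucket:
    """内存版令牌桶:容量capacity,每秒匀速补充rate个令牌"""
    def __init__(self, capacity, rate):
        self.capacity = capacity
        self.rate = rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # 按流逝时间补充令牌,但不超过桶容量
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(capacity=3, rate=1)
results = [bucket.allow() for _ in range(5)]
print(results)  # [True, True, True, False, False]:允许3次突发,之后被限流
```

令牌桶的特点正在于此:桶的容量决定允许多大的突发,rate决定长期平均速率。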
# 漏桶算法实现
class LeakyBucketThrottle(SimpleRateThrottle):
    """漏桶算法限流"""
    def __init__(self):
        super().__init__()
        # allow_request中用到redis_client,必须在这里初始化
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def allow_request(self, request, view):
"""
漏桶算法:
- 请求以任意速率进入桶
- 请求以固定速率流出
- 桶满则拒绝请求
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
self.num_requests, self.duration = self.parse_rate(self.rate)
now = time.time()
key = f"{self.key}:leaky"
# 获取桶中的水量
water_level = float(self.redis_client.get(key) or 0)
# 计算漏出的水量
leak_rate = self.num_requests / self.duration
time_since_last = now - (float(self.redis_client.get(f"{key}:time") or now))
leaked = time_since_last * leak_rate
water_level = max(0, water_level - leaked)
# 检查是否溢出
if water_level < self.num_requests:
water_level += 1
# 更新Redis
self.redis_client.setex(key, int(self.duration), water_level)
self.redis_client.setex(f"{key}:time", int(self.duration), now)
return True
else:
return False
分布式限流(Redis):
from django.core.cache import cache
from rest_framework.throttling import SimpleRateThrottle
class DistributedThrottle(SimpleRateThrottle):
"""分布式环境下的限流"""
scope = 'distributed'
def allow_request(self, request, view):
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
self.num_requests, self.duration = self.parse_rate(self.rate)
self.now = time.time()
# 用缓存中的时间戳列表实现滑动窗口
# (生产环境可换成Redis sorted set:score存时间戳,
#  用ZREMRANGEBYSCORE原子地清理过期记录)
# 移除过期的请求记录
min_time = self.now - self.duration
# 获取当前窗口内的请求数
self.history = cache.get(self.key, [])
self.history = [t for t in self.history if t > min_time]
if len(self.history) >= self.num_requests:
return self.throttle_failure()
# 记录当前请求
self.history.append(self.now)
cache.set(self.key, self.history, self.duration)
return self.throttle_success()
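DistributedThrottle的核心就是滑动窗口计数:只统计最近duration秒内的请求。内存版示意如下:

```python
import time
from collections import deque

class SlidingWindow:
    """滑动窗口限流:duration秒内最多num_requests次请求"""
    def __init__(self, num_requests, duration):
        self.num_requests = num_requests
        self.duration = duration
        self.history = deque()  # 窗口内各请求的时间戳

    def allow(self):
        now = time.monotonic()
        # 淘汰窗口外的旧记录
        while self.history and self.history[0] <= now - self.duration:
            self.history.popleft()
        if len(self.history) >= self.num_requests:
            return False
        self.history.append(now)
        return True

w = SlidingWindow(num_requests=3, duration=1)
hits = [w.allow() for _ in range(5)]
print(hits)  # [True, True, True, False, False]
```

与固定窗口(每分钟清零)相比,滑动窗口不会在窗口边界出现两倍突发流量。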
视图级别的限流:
from rest_framework.decorators import api_view, throttle_classes
# ViewSet中使用
class ArticleViewSet(viewsets.ModelViewSet):
queryset = Article.objects.all()
serializer_class = ArticleSerializer
def get_throttles(self):
"""不同action使用不同限流策略"""
if self.action == 'create':
throttle_classes = [SustainedRateThrottle]
elif self.action == 'list':
throttle_classes = [BurstRateThrottle]
else:
throttle_classes = []
return [throttle() for throttle in throttle_classes]
# 函数视图中使用
@api_view(['POST'])
@throttle_classes([BurstRateThrottle])
def create_post(request):
return Response({'status': 'created'})
# 动态限流
class DynamicRateThrottle(UserRateThrottle):
    def allow_request(self, request, view):
        """从数据库获取用户的限流配置
        (get_rate()在__init__阶段拿不到request,故在此处设置速率)"""
        try:
            self.rate = UserThrottleConfig.objects.get(
                user=request.user
            ).rate
        except UserThrottleConfig.DoesNotExist:
            self.rate = '100/hour'  # 默认值
        self.num_requests, self.duration = self.parse_rate(self.rate)
        return super().allow_request(request, view)
自定义限流响应:
from rest_framework.views import exception_handler
from rest_framework.exceptions import Throttled
def custom_exception_handler(exc, context):
"""自定义限流错误响应"""
response = exception_handler(exc, context)
if isinstance(exc, Throttled):
custom_response_data = {
'error': 'Request limit exceeded',
'message': f'请在 {exc.wait} 秒后重试',
'retry_after': exc.wait
}
response.data = custom_response_data
response['Retry-After'] = str(exc.wait)
return response
# settings.py
REST_FRAMEWORK = {
'EXCEPTION_HANDLER': 'myapp.utils.custom_exception_handler'
}
IP级别限流:
class IPRateThrottle(SimpleRateThrottle):
"""基于IP的限流"""
scope = 'ip'
def get_cache_key(self, request, view):
"""使用IP地址作为限流key"""
ident = self.get_ident(request) # 获取IP
return self.cache_format % {
'scope': self.scope,
'ident': ident
}
# 获取真实IP(考虑代理)
from rest_framework.throttling import AnonRateThrottle
class RealIPThrottle(AnonRateThrottle):
def get_ident(self, request):
"""获取真实IP地址"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
if xff:
return xff.split(',')[0].strip()
return request.META.get('REMOTE_ADDR')
监控和分析:
from django.core.cache import cache
import logging
logger = logging.getLogger(__name__)
class MonitoringThrottle(UserRateThrottle):
"""带监控的限流"""
def allow_request(self, request, view):
allowed = super().allow_request(request, view)
# 记录限流统计
if not allowed:
logger.warning(
f"用户 {request.user.id} 被限流",
extra={
'user_id': request.user.id,
'path': request.path,
'rate': self.rate
}
)
# 增加限流计数器(key不存在时incr会抛异常,先用add初始化)
counter_key = f"throttle_hits:{request.user.id}:{self.scope}"
cache.add(counter_key, 0)
cache.incr(counter_key)
return allowed
最佳实践:
How to optimize complex Django queries?
考察点:复杂查询优化、聚合查询。
答案:
复杂查询优化需要综合使用select_related、prefetch_related、annotate、only/defer等技术,将多次查询合并为少量高效的SQL语句。
问题场景:
# 需求:获取所有文章及其作者、分类、标签、评论数
class Article(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(User, on_delete=models.CASCADE)
category = models.ForeignKey(Category, on_delete=models.CASCADE)
tags = models.ManyToManyField(Tag)
class Comment(models.Model):
article = models.ForeignKey(Article, related_name='comments', on_delete=models.CASCADE)
content = models.TextField()
# ❌ 未优化代码 - 产生大量查询
articles = Article.objects.all() # 1次查询
for article in articles:
print(f"{article.title}") # 不触发查询
print(f"作者: {article.author.name}") # N次查询(author)
print(f"分类: {article.category.name}") # N次查询(category)
print(f"标签: {[tag.name for tag in article.tags.all()]}") # N次查询(tags)
print(f"评论数: {article.comments.count()}") # N次查询(count)
# 总计:1 + 4N 次查询
# 100篇文章 = 401次查询!
完整优化方案:
from django.db.models import Count, Prefetch
# ✅ 优化后 - 3次查询
articles = Article.objects \
.select_related('author', 'category') \
.prefetch_related('tags') \
.annotate(comment_count=Count('comments')) \
.only('id', 'title', 'author__name', 'category__name')
for article in articles:
print(f"{article.title}")
print(f"作者: {article.author.name}") # 不查询
print(f"分类: {article.category.name}") # 不查询
print(f"标签: {[tag.name for tag in article.tags.all()]}") # 不查询
print(f"评论数: {article.comment_count}") # 不查询
# 执行的SQL:
# 1. SELECT article.id, article.title, author.name, category.name, COUNT(comments.id)
# FROM article
# LEFT JOIN author ON article.author_id = author.id
# LEFT JOIN category ON article.category_id = category.id
# LEFT JOIN comment ON comment.article_id = article.id
# GROUP BY article.id
# 2. SELECT tag.* FROM tag
# INNER JOIN article_tags ON tag.id = article_tags.tag_id
# WHERE article_tags.article_id IN (1, 2, 3, ...)
# 3. 可能的额外查询(prefetch_related)
# 总计:2-3次查询
使用annotate聚合查询:
from django.db.models import Count, Avg, Sum, Max, Min, Q
# 基本聚合
articles = Article.objects.annotate(
comment_count=Count('comments'),
avg_rating=Avg('comments__rating'),
total_views=Sum('views'),
latest_comment=Max('comments__created_at')
)
for article in articles:
print(f"{article.title}: {article.comment_count}条评论")
print(f"平均评分: {article.avg_rating}")
print(f"总浏览量: {article.total_views}")
# 条件聚合
articles = Article.objects.annotate(
approved_comments=Count('comments', filter=Q(comments__approved=True)),
pending_comments=Count('comments', filter=Q(comments__approved=False))
)
# 复杂聚合
from django.db.models import Case, When, IntegerField
articles = Article.objects.annotate(
status=Case(
When(published=True, then=1),
When(draft=True, then=2),
default=0,
output_field=IntegerField()
),
priority_score=Count('comments') + Count('likes') * 2
).order_by('-priority_score')
使用Prefetch自定义预取:
from django.db.models import Prefetch
# 预取并过滤
articles = Article.objects.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.filter(approved=True).select_related('author'),
to_attr='approved_comments'
)
).all()
for article in articles:
for comment in article.approved_comments:
print(f"{comment.author.name}: {comment.content}")
# 多个自定义预取
articles = Article.objects.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.filter(approved=True),
to_attr='approved_comments'
),
Prefetch(
'comments',
queryset=Comment.objects.filter(approved=False),
to_attr='pending_comments'
),
Prefetch(
'tags',
queryset=Tag.objects.filter(active=True).order_by('name'),
to_attr='active_tags'
)
)
子查询优化:
from django.db.models import OuterRef, Subquery, Exists
# 使用Subquery
latest_comment = Comment.objects.filter(
article=OuterRef('pk')
).order_by('-created_at')
articles = Article.objects.annotate(
latest_comment_content=Subquery(
latest_comment.values('content')[:1]
),
latest_comment_date=Subquery(
latest_comment.values('created_at')[:1]
)
)
# 使用Exists
articles = Article.objects.annotate(
has_comments=Exists(
Comment.objects.filter(article=OuterRef('pk'))
)
).filter(has_comments=True)
# 相当于
# SELECT *, EXISTS(
# SELECT 1 FROM comment WHERE comment.article_id = article.id
# ) AS has_comments
# FROM article
# WHERE has_comments = TRUE
原生SQL的使用场景:
from django.db import connection
# 场景:极其复杂的查询,ORM难以表达
def complex_report():
with connection.cursor() as cursor:
cursor.execute("""
SELECT
a.id,
a.title,
COUNT(DISTINCT c.id) as comment_count,
COUNT(DISTINCT l.id) as like_count,
AVG(r.rating) as avg_rating
FROM article a
LEFT JOIN comment c ON c.article_id = a.id
LEFT JOIN "like" l ON l.article_id = a.id  -- like是SQL保留字,表名需加引号
LEFT JOIN rating r ON r.article_id = a.id
WHERE a.published = TRUE
GROUP BY a.id
HAVING comment_count > 10
ORDER BY avg_rating DESC
LIMIT 100
""")
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
# 使用raw()
articles = Article.objects.raw("""
SELECT a.*, COUNT(c.id) as comment_count
FROM article a
LEFT JOIN comment c ON c.article_id = a.id
GROUP BY a.id
""")
性能对比:
# 测试数据:1000篇文章,每篇10条评论、5个标签
# 方案1:无优化
# 查询次数:1 + 1000 + 1000 + 1000 + 1000 = 4001次
# 响应时间:~10秒
# 方案2:使用select_related
# 查询次数:1 + 1000 + 1000 = 2001次
# 响应时间:~5秒
# 方案3:select_related + prefetch_related
# 查询次数:3-4次
# 响应时间:~200ms
# 方案4:完整优化(+annotate +only)
# 查询次数:2-3次
# 响应时间:~100ms
最佳实践:
# 1. 列表视图优化
class ArticleListView(ListAPIView):
def get_queryset(self):
return Article.objects \
.select_related('author', 'category') \
.prefetch_related('tags') \
.annotate(comment_count=Count('comments')) \
.only('id', 'title', 'summary', 'author__name', 'category__name') \
.order_by('-created_at')[:20]
# 2. 详情视图优化
class ArticleDetailView(RetrieveAPIView):
def get_queryset(self):
return Article.objects \
.select_related('author__profile', 'category') \
.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.select_related('author').order_by('-created_at')[:50]
),
'tags'
)
# 3. 使用Django Debug Toolbar
# pip install django-debug-toolbar
# 监控每个请求的SQL查询
# 4. 索引优化
class Article(models.Model):
title = models.CharField(max_length=200, db_index=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
indexes = [
models.Index(fields=['-created_at']), # 降序索引
models.Index(fields=['author', 'category']), # 组合索引
]
How to prevent cache penetration, cache avalanche, and cache breakdown?
考察点:缓存架构、高可用设计。
答案:
缓存穿透、雪崩、击穿是高并发系统中的三大缓存问题。穿透是查询不存在的数据,雪崩是大量缓存同时失效,击穿是热点数据失效导致大量请求打到数据库。
1. 缓存穿透(Cache Penetration):
from django.core.cache import cache
# 问题:查询不存在的数据
def get_user(user_id):
# ❌ 问题代码
user = cache.get(f'user:{user_id}')
if user is None:
user = User.objects.filter(id=user_id).first()
if user: # 只缓存存在的数据
cache.set(f'user:{user_id}', user, 300)
return user
# 攻击者查询user_id=-1, -2, -3...
# 每次都穿透缓存,直接查询数据库
# 解决方案1:缓存空值
def get_user_v1(user_id):
cache_key = f'user:{user_id}'
user = cache.get(cache_key)
# 使用特殊值表示"不存在"
if user == 'NULL':
return None
if user is None:
user = User.objects.filter(id=user_id).first()
if user:
cache.set(cache_key, user, 300)
else:
# ✅ 缓存空值,短期过期
cache.set(cache_key, 'NULL', 60)
return user if user != 'NULL' else None
# 解决方案2:布隆过滤器
import hashlib
class BloomFilter:
"""简单的布隆过滤器"""
def __init__(self, size=10000):
self.size = size
self.bit_array = [False] * size
def _hash(self, value, seed):
"""哈希函数"""
h = hashlib.md5(f"{value}{seed}".encode())
return int(h.hexdigest(), 16) % self.size
def add(self, value):
"""添加元素"""
for seed in range(3): # 使用3个哈希函数
index = self._hash(value, seed)
self.bit_array[index] = True
def contains(self, value):
"""检查元素是否可能存在"""
for seed in range(3):
index = self._hash(value, seed)
if not self.bit_array[index]:
return False # 肯定不存在
return True # 可能存在
# 初始化时将所有user_id添加到布隆过滤器
bloom = BloomFilter(size=100000)
for user in User.objects.values_list('id', flat=True):
bloom.add(user)
def get_user_v2(user_id):
# 先检查布隆过滤器
if not bloom.contains(user_id):
return None # 肯定不存在,不查询
# 可能存在,正常查询
cache_key = f'user:{user_id}'
user = cache.get(cache_key)
if user is None:
user = User.objects.filter(id=user_id).first()
if user:
cache.set(cache_key, user, 300)
return user
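上面的 BloomFilter 有两个可直接验证的性质:已加入的元素一定返回 True(无假阴性);空过滤器所有位为 False,对任何元素一定返回 False。下面用同样的实现独立验证(示意):

```python
import hashlib

class BloomFilter:
    def __init__(self, size=10000):
        self.size = size
        self.bit_array = [False] * size

    def _hash(self, value, seed):
        h = hashlib.md5(f"{value}{seed}".encode())
        return int(h.hexdigest(), 16) % self.size

    def add(self, value):
        for seed in range(3):
            self.bit_array[self._hash(value, seed)] = True

    def contains(self, value):
        return all(self.bit_array[self._hash(value, seed)]
                   for seed in range(3))

bloom = BloomFilter(size=10000)
empty = BloomFilter(size=10000)

for uid in range(1, 101):
    bloom.add(uid)

# 无假阴性:加入过的元素一定命中
print(all(bloom.contains(uid) for uid in range(1, 101)))  # True
# 空过滤器:所有位为 False,任何查询必返回 False
print(empty.contains(42))  # False
```

注意反方向不成立:未加入的元素也可能返回 True(假阳性),所以布隆过滤器只能用来"确定不存在",不能用来"确定存在"。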
2. 缓存雪崩(Cache Avalanche):
from random import randint
# 问题:大量缓存同时过期
def cache_user_batch():
users = User.objects.all()
for user in users:
cache.set(f'user:{user.id}', user, 300) # ❌ 同时过期
# 某个时刻所有缓存同时失效 → 大量请求打到数据库 → 数据库崩溃
# 解决方案1:过期时间加随机值
def set_cache_with_random_ttl(key, value, base_ttl=300):
"""添加随机过期时间"""
random_ttl = base_ttl + randint(0, 60) # ✅ 加0-60秒随机值
cache.set(key, value, random_ttl)
def cache_user_batch_v1():
users = User.objects.all()
for user in users:
set_cache_with_random_ttl(f'user:{user.id}', user, 300)
# 解决方案2:永不过期 + 异步更新
import threading
def get_user_never_expire(user_id):
cache_key = f'user:{user_id}'
cache_meta_key = f'user:{user_id}:meta'
user = cache.get(cache_key)
meta = cache.get(cache_meta_key) or {}
# 检查是否需要更新
if meta.get('updating'):
# 正在更新,返回旧数据
return user
if user is None or meta.get('version', 0) < get_current_version():
# 标记为更新中
cache.set(cache_meta_key, {'updating': True}, 10)
# 异步更新缓存
def async_update():
fresh_user = User.objects.get(id=user_id)
cache.set(cache_key, fresh_user, None) # 永不过期
cache.set(cache_meta_key, {'version': get_current_version()}, None)
threading.Thread(target=async_update).start()
return user
# 解决方案3:多级缓存
from cachetools import TTLCache
# 本地缓存须定义在模块级;若在函数内创建,每次调用都会得到新的空缓存
local_cache = TTLCache(maxsize=1000, ttl=60)
def get_user_multi_level(user_id):
# L1: 本地内存缓存(极快,但不跨进程共享)
user = local_cache.get(user_id)
if user:
return user
# L2: Redis缓存(快,共享)
cache_key = f'user:{user_id}'
user = cache.get(cache_key)
if user:
local_cache[user_id] = user
return user
# L3: 数据库
user = User.objects.get(id=user_id)
cache.set(cache_key, user, 300 + randint(0, 60))
local_cache[user_id] = user
return user
3. 缓存击穿(Cache Breakdown):
import threading
# 问题:热点数据过期,瞬间大量请求
def get_hot_article(article_id):
# ❌ 热点文章缓存过期瞬间,1000个请求同时查数据库
cache_key = f'article:{article_id}'
article = cache.get(cache_key)
if article is None:
article = Article.objects.get(id=article_id)
cache.set(cache_key, article, 300)
return article
# 解决方案1:互斥锁
import threading
_locks = {}
_lock = threading.Lock()
def get_hot_article_with_lock(article_id):
cache_key = f'article:{article_id}'
article = cache.get(cache_key)
if article is None:
# 获取或创建该key的锁
with _lock:
if article_id not in _locks:
_locks[article_id] = threading.Lock()
item_lock = _locks[article_id]
# 使用该key的锁
with item_lock:
# Double-check
article = cache.get(cache_key)
if article is None:
article = Article.objects.get(id=article_id)
cache.set(cache_key, article, 300)
return article
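互斥锁 + double-check 的效果可以用纯 Python 模拟:并发 N 个线程同时遇到缓存失效,最终只应有一次数据库回源(示意,用字典模拟缓存):

```python
import threading

cache_store = {}
db_calls = 0
db_lock = threading.Lock()      # 保护 db_calls 计数
item_lock = threading.Lock()    # 模拟按 key 的互斥锁

def load_from_db(key):
    global db_calls
    with db_lock:
        db_calls += 1
    return f"value-of-{key}"

def get_with_lock(key):
    value = cache_store.get(key)
    if value is None:
        with item_lock:
            # Double-check:拿到锁后再查一次缓存
            value = cache_store.get(key)
            if value is None:
                value = load_from_db(key)
                cache_store[key] = value
    return value

threads = [threading.Thread(target=get_with_lock, args=('hot',))
           for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(db_calls)  # 1:50 个并发请求只回源一次
```

没有 double-check 时,排队等锁的线程在拿到锁后会各自再查一次数据库,回源次数会远大于 1。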
# 解决方案2:使用Redis的SETNX实现分布式锁
import redis
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_hot_article_distributed_lock(article_id):
cache_key = f'article:{article_id}'
lock_key = f'lock:{cache_key}'
article = cache.get(cache_key)
if article is None:
# 尝试获取锁(10秒超时)
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)
if lock_acquired:
try:
# 成功获取锁,查询数据库
article = Article.objects.get(id=article_id)
cache.set(cache_key, article, 300)
finally:
# 释放锁
redis_client.delete(lock_key)
else:
# 未获取到锁,等待后重试
time.sleep(0.1)
return get_hot_article_distributed_lock(article_id)
return article
# 解决方案3:热点数据永不过期
def get_hot_article_never_expire(article_id):
cache_key = f'article:{article_id}'
cache_time_key = f'article:{article_id}:time'
article = cache.get(cache_key)
last_update = cache.get(cache_time_key)
# 数据存在但可能过期
if article and last_update:
age = time.time() - last_update
# 数据接近过期(如超过4分钟),异步更新
if age > 240:
def async_refresh():
fresh_article = Article.objects.get(id=article_id)
cache.set(cache_key, fresh_article, None) # 永不过期
cache.set(cache_time_key, time.time(), None)
threading.Thread(target=async_refresh).start()
if article is None:
article = Article.objects.get(id=article_id)
cache.set(cache_key, article, None)
cache.set(cache_time_key, time.time(), None)
return article
缓存更新策略:
# Cache Aside(旁路缓存)- 最常用
def get_user_cache_aside(user_id):
"""
读:先查缓存,miss则查DB并更新缓存
写:先更新DB,然后删除缓存
"""
# 读
user = cache.get(f'user:{user_id}')
if user is None:
user = User.objects.get(id=user_id)
cache.set(f'user:{user_id}', user, 300)
return user
def update_user_cache_aside(user_id, data):
# 写
user = User.objects.get(id=user_id)
user.name = data['name']
user.save()
# 删除缓存(下次读取时更新)
cache.delete(f'user:{user_id}')
# Read Through / Write Through
# 由缓存层负责与数据库交互,应用只与缓存交互
# Write Behind(异步写回)
def update_user_write_behind(user_id, data):
# 立即更新缓存(未命中时先从数据库加载,避免对 None 取属性)
user = cache.get(f'user:{user_id}')
if user is None:
user = User.objects.get(id=user_id)
user.name = data['name']
cache.set(f'user:{user_id}', user, 300)
# 异步写入数据库
from celery import shared_task
@shared_task
def async_save_to_db(user_id, data):
user = User.objects.get(id=user_id)
user.name = data['name']
user.save()
async_save_to_db.delay(user_id, data)
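Read Through / Write Through 的核心是由缓存层统一封装与数据库的交互,调用方只和缓存层打交道。下面是一个纯 Python 的读穿透缓存封装示意(`load_func` 代表数据库查询,为假设性示例):

```python
class ReadThroughCache:
    """读穿透缓存:miss 时由缓存层自己回源,调用方无感知"""
    def __init__(self, load_func):
        self.store = {}
        self.load_func = load_func  # 回源函数,如数据库查询

    def get(self, key):
        if key not in self.store:
            # 缓存层负责回源并写入
            self.store[key] = self.load_func(key)
        return self.store[key]

db = {'user:1': 'alice'}
loads = []  # 记录回源次数

def load_from_db(key):
    loads.append(key)
    return db.get(key)

cache_layer = ReadThroughCache(load_from_db)
print(cache_layer.get('user:1'))  # alice(回源)
print(cache_layer.get('user:1'))  # alice(命中缓存,不再回源)
print(len(loads))  # 1
```

与 Cache Aside 的区别在于:Cache Aside 里"miss 后查库、写缓存"的逻辑散落在应用代码中,而 Read Through 把它收敛到缓存层。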
Django中的缓存配置:
# settings.py
# Redis缓存
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True
},
# 序列化器
'SERIALIZER': 'django_redis.serializers.json.JSONSerializer',
},
'KEY_PREFIX': 'myapp',
'TIMEOUT': 300,
}
}
# 使用django-redis的高级功能
from django_redis import get_redis_connection
# 获取原生Redis客户端
redis_conn = get_redis_connection('default')
# 使用Redis pipeline(批量操作)
pipe = redis_conn.pipeline()
for user_id in range(1, 101):
pipe.get(f'user:{user_id}')
results = pipe.execute()
# 使用Redis的原子操作
redis_conn.incr('page_views')
redis_conn.expire('page_views', 3600)
综合防护方案:
import redis
from django.core.cache import cache
from functools import wraps
from random import randint
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_with_protection(
key_prefix,
ttl=300,
null_ttl=60,
use_lock=True
):
"""综合防护装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
cache_key = f"{key_prefix}:{':'.join(map(str, args))}"
lock_key = f"lock:{cache_key}"
# 1. 尝试从缓存获取
result = cache.get(cache_key)
# 缓存命中
if result is not None:
if result == '__NULL__':
return None
return result
# 2. 缓存miss,使用锁防止击穿
if use_lock:
# 尝试获取分布式锁
lock_acquired = redis_client.set(
lock_key,
'1',
nx=True,
ex=10
)
if lock_acquired:
try:
# 获取锁成功,查询数据
result = func(*args, **kwargs)
if result is None:
# 防止穿透:缓存空值
cache.set(cache_key, '__NULL__', null_ttl)
else:
# 防止雪崩:添加随机TTL
random_ttl = ttl + randint(0, 60)
cache.set(cache_key, result, random_ttl)
return result
finally:
# 释放锁
redis_client.delete(lock_key)
else:
# 未获取到锁,等待后重试
time.sleep(0.05)
return wrapper(*args, **kwargs)
else:
# 不使用锁
result = func(*args, **kwargs)
if result:
cache.set(cache_key, result, ttl + randint(0, 60))
return result
return wrapper
return decorator
# 使用
@cache_with_protection(key_prefix='user', ttl=300, use_lock=True)
def get_user_protected(user_id):
return User.objects.filter(id=user_id).first()
缓存预热:
from celery import shared_task
@shared_task
def warmup_cache():
"""缓存预热任务"""
# 预热热门文章
hot_articles = Article.objects.filter(
views__gt=1000
).select_related('author', 'category')
for article in hot_articles:
cache.set(f'article:{article.id}', article, 3600)
# 预热热门用户
hot_users = User.objects.filter(followers__gt=100)
for user in hot_users:
cache.set(f'user:{user.id}', user, 3600)
# 在系统启动或定时任务中执行
from celery.schedules import crontab
app.conf.beat_schedule = {
'warmup-cache-every-hour': {
'task': 'myapp.tasks.warmup_cache',
'schedule': crontab(minute=0), # 每小时执行
},
}
监控缓存性能:
from django.core.cache import cache
import logging
import time
logger = logging.getLogger(__name__)
class CacheMonitor:
"""缓存监控"""
@staticmethod
def get_with_metrics(key):
"""带监控的缓存获取"""
start = time.time()
value = cache.get(key)
duration = time.time() - start
# 记录缓存命中率(incr 的 key 不存在时会抛 ValueError,先用 add 初始化)
cache.add('cache:hits', 0, None)
cache.add('cache:misses', 0, None)
if value is not None:
cache.incr('cache:hits')
logger.info(f"缓存命中: {key}, 耗时: {duration*1000:.2f}ms")
else:
cache.incr('cache:misses')
logger.info(f"缓存未命中: {key}")
return value
@staticmethod
def get_hit_rate():
"""获取缓存命中率"""
hits = cache.get('cache:hits') or 0
misses = cache.get('cache:misses') or 0
total = hits + misses
if total == 0:
return 0
return hits / total * 100
# 使用Prometheus监控
from prometheus_client import Counter, Histogram
cache_hits = Counter('cache_hits_total', 'Total cache hits')
cache_misses = Counter('cache_misses_total', 'Total cache misses')
cache_duration = Histogram('cache_duration_seconds', 'Cache operation duration')
def monitored_cache_get(key):
with cache_duration.time():
value = cache.get(key)
if value is not None:
cache_hits.inc()
else:
cache_misses.inc()
return value
最佳实践总结:
"""
缓存穿透防护:
✅ 缓存空值(短TTL)
✅ 布隆过滤器预判
✅ 参数校验
缓存雪崩防护:
✅ 过期时间加随机值
✅ 永不过期+异步更新
✅ 多级缓存
✅ 熔断降级
缓存击穿防护:
✅ 分布式锁
✅ 热点数据永不过期
✅ 缓存预热
通用建议:
✅ 监控缓存命中率
✅ 设置合理的TTL
✅ 使用缓存集群
✅ 定期清理过期key
✅ 业务降级预案
"""
How does Celery work?
考察点:异步任务架构、消息队列。
答案:
Celery是分布式任务队列系统,由Worker(执行任务)、Broker(消息队列)、Backend(结果存储)、Beat(定时调度器)四个组件组成,通过消息队列实现任务的异步执行。
Celery架构:
客户端应用
│
├─ 发送任务 ─→ Broker (Redis/RabbitMQ)
│ │
│ ├─→ Worker 1 ─→ Backend (结果存储)
│ ├─→ Worker 2 ─→ Backend
│ └─→ Worker 3 ─→ Backend
│
└─ Celery Beat (定时任务调度器)
│
└─→ Broker
基本配置:
# celery.py
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# settings.py
# Broker配置(消息队列)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Backend配置(结果存储)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 其他配置
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30分钟超时
基本任务定义:
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(to_email, subject, message):
"""发送邮件任务"""
send_mail(
subject,
message,
'[email protected]',
[to_email],
fail_silently=False,
)
return f"邮件已发送到 {to_email}"
# 调用任务
# 异步执行
result = send_email_task.delay('[email protected]', '欢迎', '欢迎注册')
# 同步执行(测试用)
result = send_email_task('[email protected]', '欢迎', '欢迎注册')
# 延迟执行
from datetime import timedelta
send_email_task.apply_async(
args=['[email protected]', '提醒', '您有新消息'],
countdown=300 # 5分钟后执行
)
# 指定时间执行
from datetime import datetime
send_email_task.apply_async(
args=['[email protected]', '定时', '定时消息'],
eta=datetime(2024, 12, 31, 23, 59)
)
任务重试机制:
from celery import shared_task
from celery.exceptions import Retry
import requests
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_api_data(self, url):
"""带重试的API请求任务"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as exc:
# 指数退避重试
retry_countdown = 2 ** self.request.retries * 60
raise self.retry(
exc=exc,
countdown=retry_countdown, # 延迟时间:60s, 120s, 240s
max_retries=3
)
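上面 `2 ** self.request.retries * 60` 的指数退避序列可以抽成纯函数直接验证(示意):

```python
def retry_delay(retries, base=60):
    """指数退避:第 retries 次重试前的延迟秒数"""
    return 2 ** retries * base

# 三次重试的延迟依次为 60s、120s、240s
delays = [retry_delay(r) for r in range(3)]
print(delays)  # [60, 120, 240]
```

指数退避能在下游持续故障时迅速拉开重试间隔,避免重试风暴进一步压垮下游服务。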
# 自定义重试条件
@shared_task(bind=True, autoretry_for=(requests.RequestException,),
retry_kwargs={'max_retries': 5}, retry_backoff=True)
def robust_fetch(self, url):
response = requests.get(url)
return response.json()
任务链和组合:
from celery import chain, group, chord
# 任务链(顺序执行)
@shared_task
def task1(x):
return x * 2
@shared_task
def task2(x):
return x + 10
@shared_task
def task3(x):
return x ** 2
# 链式执行:task1 → task2 → task3
result = chain(
task1.s(5), # 5 * 2 = 10
task2.s(), # 10 + 10 = 20
task3.s() # 20 ** 2 = 400
)()
print(result.get()) # 400
# 任务组(并行执行)
job = group(
task1.s(1),
task1.s(2),
task1.s(3),
)
result = job.apply_async()
print(result.get()) # [2, 4, 6]
# Chord(并行执行后汇总)
@shared_task
def summarize(results):
return sum(results)
job = chord([
task1.s(1),
task1.s(2),
task1.s(3),
])(summarize.s())
print(job.get()) # 12
# 复杂组合
workflow = chain(
group(task1.s(i) for i in range(10)), # 并行
summarize.s(), # 汇总
task3.s() # 最终处理
)
result = workflow()
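chain 的语义是把上一个任务的返回值作为下一个任务的第一个参数,group 相当于并行 map,chord 相当于 map 后汇总。可以用与 Celery 无关的纯 Python 简化模型来示意这组语义:

```python
from functools import reduce

def task1(x):
    return x * 2

def task2(x):
    return x + 10

def task3(x):
    return x ** 2

def run_chain(initial, *tasks):
    """模拟 chain:依次把前一步结果传给下一步"""
    return reduce(lambda acc, task: task(acc), tasks, initial)

result = run_chain(5, task1, task2, task3)
print(result)  # ((5*2)+10)**2 = 400

# group 相当于并行 map,chord 相当于 map 后汇总
group_result = [task1(x) for x in (1, 2, 3)]
print(group_result)       # [2, 4, 6]
print(sum(group_result))  # chord 汇总:12
```

真实的 Celery 会把每一步分发到 Worker 异步执行,但数据流动的方式与这个同步模型一致。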
定时任务(Celery Beat):
from celery.schedules import crontab
# settings.py
CELERY_BEAT_SCHEDULE = {
# 每天午夜执行
'cleanup-every-midnight': {
'task': 'myapp.tasks.cleanup_old_data',
'schedule': crontab(hour=0, minute=0),
},
# 每小时执行
'update-stats-hourly': {
'task': 'myapp.tasks.update_statistics',
'schedule': crontab(minute=0),
},
# 每30秒执行
'check-health': {
'task': 'myapp.tasks.health_check',
'schedule': 30.0, # 秒数
},
# 工作日早上9点
'send-daily-report': {
'task': 'myapp.tasks.daily_report',
'schedule': crontab(hour=9, minute=0, day_of_week='1-5'),
},
# 每月1号
'monthly-billing': {
'task': 'myapp.tasks.billing',
'schedule': crontab(0, 0, day_of_month='1'),
},
}
# tasks.py
@shared_task
def cleanup_old_data():
"""清理30天前的数据"""
from datetime import timedelta
from django.utils import timezone
threshold = timezone.now() - timedelta(days=30)
deleted = OldData.objects.filter(created_at__lt=threshold).delete()
return f"清理了 {deleted[0]} 条数据"
# 启动Beat调度器
# celery -A myproject beat -l info
任务监控和管理:
from celery import current_app
# 获取任务状态
result = send_email_task.delay('[email protected]', '测试', '内容')
task_id = result.task_id
# 检查任务状态
from celery.result import AsyncResult
task = AsyncResult(task_id)
print(task.state) # PENDING, STARTED, SUCCESS, FAILURE, RETRY
if task.successful():
print(task.result)
elif task.failed():
print(task.traceback)
# 撤销任务
task.revoke(terminate=True)
# 查看活跃任务
from celery import current_app
inspect = current_app.control.inspect()
active_tasks = inspect.active() # 正在执行的任务
scheduled_tasks = inspect.scheduled() # 计划中的任务
reserved_tasks = inspect.reserved() # 保留的任务
# 查看worker状态
stats = inspect.stats()
print(stats)
错误处理和回调:
from celery import shared_task
from celery.signals import task_failure, task_success
@shared_task(bind=True)
def risky_task(self):
try:
# 执行可能失败的操作
result = perform_operation()
return result
except Exception as exc:
# 记录错误
logger.error(f"任务失败: {exc}")
# 可以发送通知
send_alert(f"任务 {self.request.id} 失败")
# 重新抛出异常
raise
# 使用信号
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
"""任务失败时的处理"""
print(f"任务 {task_id} 失败: {exception}")
# 发送告警、记录日志等
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
"""任务成功时的处理"""
print(f"任务成功,结果: {result}")
# 任务回调
@shared_task
def on_success_callback(result):
print(f"任务完成: {result}")
@shared_task
def on_failure_callback(request, exc, traceback):
print(f"任务失败: {exc}")
task.apply_async(
args=[1, 2],
link=on_success_callback.s(),
link_error=on_failure_callback.s()
)
长时间运行的任务:
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True)
def process_large_dataset(self, dataset_id):
"""处理大数据集"""
dataset = Dataset.objects.get(id=dataset_id)
total = dataset.items.count()
for index, item in enumerate(dataset.items.all(), 1):
# 处理每个项
process_item(item)
# 更新进度
self.update_state(
state='PROGRESS',
meta={
'current': index,
'total': total,
'percent': int(index / total * 100)
}
)
logger.info(f"处理进度: {index}/{total}")
return {'status': 'completed', 'processed': total}
# 查询进度
from celery.result import AsyncResult
task = process_large_dataset.delay(dataset_id=1)
# 轮询进度
while not task.ready():
if task.state == 'PROGRESS':
meta = task.info
print(f"进度: {meta['percent']}%")
time.sleep(1)
result = task.get()
消息队列选型:
"""
Redis:
优点:
- 配置简单
- 性能高
- 支持持久化
- 可作为结果后端
缺点:
- 内存存储为主
- 可靠性不如RabbitMQ
- 不支持复杂路由
适用:
- 中小型项目
- 任务丢失可接受
- 简单的任务分发
RabbitMQ:
优点:
- 功能强大
- 高可靠性
- 复杂路由支持
- 消息确认机制
缺点:
- 配置复杂
- 性能略低于Redis
适用:
- 大型项目
- 任务不能丢失
- 复杂的消息路由
Kafka:
优点:
- 高吞吐量
- 持久化
- 支持消息重放
缺点:
- 延迟较高
- 配置最复杂
适用:
- 超大规模
- 数据管道
- 事件溯源
"""
# 配置示例
# Redis
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# RabbitMQ
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
# Kafka(注意:Celery 对 Kafka broker 的支持有限且偏实验性,生产环境慎用)
CELERY_BROKER_URL = 'kafka://localhost:9092'
最佳实践:
# 1. 任务应该是幂等的
@shared_task
def idempotent_task(order_id):
"""幂等任务:多次执行结果相同"""
order = Order.objects.get(id=order_id)
# ✅ 使用状态检查避免重复处理
if order.status == 'processed':
return "已处理"
process_order(order)
order.status = 'processed'
order.save()
return "处理完成"
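幂等性的关键是用状态检查保证重复执行不产生重复副作用,这一点可以脱离 Celery 用纯 Python 模型验证(示意):

```python
orders = {1: {'status': 'pending'}}
side_effects = []  # 记录真实发生的处理动作

def process_order_idempotent(order_id):
    order = orders[order_id]
    if order['status'] == 'processed':
        return '已处理'              # 重复调用直接返回
    side_effects.append(order_id)    # 真正的处理只发生一次
    order['status'] = 'processed'
    return '处理完成'

print(process_order_idempotent(1))  # 处理完成
print(process_order_idempotent(1))  # 已处理
print(len(side_effects))  # 1
```

消息队列通常只保证"至少一次"投递,同一任务可能被执行多次,因此幂等设计是任务可靠性的前提。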
# 2. 设置合理的超时
@shared_task(time_limit=300, soft_time_limit=240)
def time_limited_task():
"""带超时的任务"""
# soft_time_limit: 到时在任务内抛出 SoftTimeLimitExceeded,可捕获后做清理
# time_limit: 到时强制杀死工作进程(SIGKILL)
pass
# 3. 任务优先级
high_priority_task.apply_async(priority=10) # 高优先级
low_priority_task.apply_async(priority=0) # 低优先级
# 4. 任务路由
CELERY_TASK_ROUTES = {
'myapp.tasks.cpu_intensive': {'queue': 'cpu'},
'myapp.tasks.io_intensive': {'queue': 'io'},
}
# 启动不同的worker处理不同队列
# celery -A myproject worker -Q cpu -c 4
# celery -A myproject worker -Q io -c 20
How to design database read/write splitting in Django?
考察点:数据库架构、主从复制。
答案:
数据库读写分离通过配置主库处理写操作,从库处理读操作,提高数据库并发能力。Django通过数据库路由器实现读写分离和负载均衡。
配置多数据库:
# settings.py
DATABASES = {
'default': { # 主库(写)
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'USER': 'root',
'PASSWORD': 'password',
'HOST': 'master.db.example.com',
'PORT': '3306',
'OPTIONS': {
'charset': 'utf8mb4',
},
},
'replica1': { # 从库1(读)
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'USER': 'readonly',
'PASSWORD': 'password',
'HOST': 'replica1.db.example.com',
'PORT': '3306',
},
'replica2': { # 从库2(读)
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'USER': 'readonly',
'PASSWORD': 'password',
'HOST': 'replica2.db.example.com',
'PORT': '3306',
},
}
实现数据库路由器:
import random
class PrimaryReplicaRouter:
"""读写分离路由器"""
def db_for_read(self, model, **hints):
"""读操作路由到从库"""
# 随机选择一个从库实现负载均衡
return random.choice(['replica1', 'replica2'])
def db_for_write(self, model, **hints):
"""写操作路由到主库"""
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"""允许任何数据库间的关系"""
db_set = {'default', 'replica1', 'replica2'}
if obj1._state.db in db_set and obj2._state.db in db_set:
return True
return None
def allow_migrate(self, db, app_label, model_name=None, **hints):
"""只在主库执行迁移"""
return db == 'default'
# settings.py
DATABASE_ROUTERS = ['myapp.routers.PrimaryReplicaRouter']
高级路由器实现:
class SmartRouter:
"""智能路由器"""
# 读从库,写主库的应用
read_replica_apps = {'blog', 'news'}
# 始终用主库的应用(实时性要求高)
master_only_apps = {'payment', 'order'}
def db_for_read(self, model, **hints):
"""智能读路由"""
app_label = model._meta.app_label
# 某些应用必须读主库
if app_label in self.master_only_apps:
return 'default'
# 事务中的读操作用主库(避免主从延迟)
from django.db import transaction
if transaction.get_connection().in_atomic_block:
return 'default'
# 其他读操作用从库
if app_label in self.read_replica_apps:
# 权重负载均衡
replicas = [
('replica1', 60), # 60%流量
('replica2', 40), # 40%流量
]
rand = random.randint(1, 100)
cumulative = 0
for db, weight in replicas:
cumulative += weight
if rand <= cumulative:
return db
return 'default'
def db_for_write(self, model, **hints):
"""写操作始终用主库"""
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"""允许同一组数据库的关系"""
db_set = {'default', 'replica1', 'replica2'}
if obj1._state.db in db_set and obj2._state.db in db_set:
return True
return None
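SmartRouter 里的权重负载均衡可以抽成一个纯函数,把随机数作为参数注入以便单测(示意):

```python
import random

def pick_replica(replicas, rand=None):
    """按权重选择从库;rand 参数可注入,便于确定性测试"""
    total = sum(weight for _, weight in replicas)
    if rand is None:
        rand = random.randint(1, total)
    cumulative = 0
    for db, weight in replicas:
        cumulative += weight
        if rand <= cumulative:
            return db
    return 'default'

replicas = [('replica1', 60), ('replica2', 40)]
print(pick_replica(replicas, rand=1))    # replica1
print(pick_replica(replicas, rand=60))   # replica1(权重边界)
print(pick_replica(replicas, rand=61))   # replica2
print(pick_replica(replicas, rand=100))  # replica2
```

把路由决策写成无副作用的纯函数后,权重边界等容易出错的逻辑就可以在不连数据库的情况下覆盖测试。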
手动指定数据库:
# 强制使用主库
users = User.objects.using('default').all()
# 强制使用从库
users = User.objects.using('replica1').all()
# 保存到指定数据库
user = User(username='alice')
user.save(using='default')
# 删除指定数据库的对象
user.delete(using='default')
# 事务中指定数据库
from django.db import transaction
with transaction.atomic(using='default'):
user = User.objects.create(username='bob')
profile = Profile.objects.create(user=user)
处理主从延迟:
from django.db import transaction
def create_user_and_send_email(username, email):
"""创建用户后发送邮件"""
# 方案1:使用事务后的回调
user = User.objects.create(username=username, email=email)
def send_email_after_commit():
# 强制从主库读取,避免主从延迟导致从库查不到新数据
# 注意:内层变量不能复用外层的 user 名字,否则会触发 UnboundLocalError
fresh_user = User.objects.using('default').get(id=user.id)
send_welcome_email(fresh_user.email)
transaction.on_commit(send_email_after_commit)
return user
# 方案2:延迟读取
from celery import shared_task
@shared_task
def delayed_process(user_id):
"""延迟5秒后处理,等待主从同步"""
import time
time.sleep(5) # 等待主从同步
user = User.objects.get(id=user_id)
process_user(user)
def create_user_delayed(username, email):
user = User.objects.create(username=username, email=email)
delayed_process.delay(user.id)
return user
读写分离配置示例:
# 生产环境配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DB_NAME'),
'USER': os.environ.get('DB_USER'),
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': os.environ.get('DB_MASTER_HOST'),
'PORT': '3306',
'CONN_MAX_AGE': 60, # 连接池
'OPTIONS': {
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
'charset': 'utf8mb4',
},
},
'replica': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DB_NAME'),
'USER': os.environ.get('DB_READONLY_USER'),
'PASSWORD': os.environ.get('DB_READONLY_PASSWORD'),
'HOST': os.environ.get('DB_REPLICA_HOST'),
'PORT': '3306',
'CONN_MAX_AGE': 60,
},
}
DATABASE_ROUTERS = ['myapp.routers.PrimaryReplicaRouter']
# 监控主从延迟
from django.core.management.base import BaseCommand
import time
class Command(BaseCommand):
def handle(self, *args, **options):
"""监控主从延迟"""
from django.db import connections
while True:
# 在主库写入时间戳
with connections['default'].cursor() as cursor:
cursor.execute(
"INSERT INTO heartbeat (updated_at) VALUES (NOW())"
)
# 从从库读取时间戳
time.sleep(0.1)
with connections['replica'].cursor() as cursor:
cursor.execute(
"SELECT TIMESTAMPDIFF(MICROSECOND, updated_at, NOW()) "
"FROM heartbeat ORDER BY id DESC LIMIT 1"
)
delay = cursor.fetchone()[0]
print(f"主从延迟: {delay/1000:.2f}ms")
time.sleep(60)
最佳实践:
✅ 写后立即读的场景强制走主库,避免主从延迟
✅ 事务内的读操作统一路由到主库
✅ 从库使用只读账号,防止误写
✅ 持续监控主从延迟,超过阈值时降级为读主库
✅ 配置连接池(CONN_MAX_AGE)并对从库做健康检查与故障摘除
How to design RESTful API versioning?
考察点:API设计、版本管理。
答案:
API版本管理有URL路径、请求头、查询参数等多种方式。Django REST Framework内置了版本控制支持,可以根据业务需求选择合适的版本控制策略。
版本控制方式对比:
# 方式1:URL路径版本(推荐)
# 优点:清晰明了、易于缓存、支持浏览器访问
# 缺点:URL冗余
"""
/api/v1/users/
/api/v2/users/
/api/v3/users/
"""
# 方式2:请求头版本
# 优点:URL简洁、符合REST原则
# 缺点:浏览器不友好、缓存复杂
"""
GET /api/users/
Accept: application/vnd.myapp.v1+json
GET /api/users/
Accept: application/vnd.myapp.v2+json
"""
# 方式3:查询参数版本
# 优点:实现简单
# 缺点:不符合REST原则、容易被忽略
"""
/api/users/?version=1
/api/users/?version=2
"""
# 方式4:自定义请求头
# 优点:灵活
# 缺点:非标准
"""
GET /api/users/
API-Version: 1
"""
DRF版本控制实现:
# settings.py
REST_FRAMEWORK = {
'DEFAULT_VERSIONING_CLASS': 'rest_framework.versioning.URLPathVersioning',
'DEFAULT_VERSION': 'v1',
'ALLOWED_VERSIONS': ['v1', 'v2', 'v3'],
'VERSION_PARAM': 'version',
}
# urls.py
from django.urls import path, include
from rest_framework import routers
router_v1 = routers.DefaultRouter()
router_v1.register(r'users', UserViewSetV1)
router_v1.register(r'articles', ArticleViewSetV1)
router_v2 = routers.DefaultRouter()
router_v2.register(r'users', UserViewSetV2)
router_v2.register(r'articles', ArticleViewSetV2)
urlpatterns = [
path('api/v1/', include(router_v1.urls)),
path('api/v2/', include(router_v2.urls)),
]
# 或使用单一路由
from rest_framework.versioning import URLPathVersioning
urlpatterns = [
path('api/<version>/users/', UserView.as_view()),
]
ViewSet中处理不同版本:
from rest_framework import viewsets
from rest_framework.versioning import URLPathVersioning
class UserViewSet(viewsets.ModelViewSet):
versioning_class = URLPathVersioning
def get_serializer_class(self):
"""根据API版本返回不同的序列化器"""
if self.request.version == 'v1':
return UserSerializerV1
elif self.request.version == 'v2':
return UserSerializerV2
elif self.request.version == 'v3':
return UserSerializerV3
return UserSerializerV1 # 默认版本
def get_queryset(self):
"""根据版本优化查询"""
queryset = User.objects.all()
if self.request.version == 'v2':
# v2版本需要额外的关联数据
queryset = queryset.select_related('profile')
return queryset
def list(self, request, *args, **kwargs):
"""根据版本返回不同的响应格式"""
queryset = self.filter_queryset(self.get_queryset())
serializer = self.get_serializer(queryset, many=True)
if request.version == 'v1':
# v1: 简单列表
return Response(serializer.data)
else:
# v2+: 包含元数据
return Response({
'version': request.version,
'count': queryset.count(),
'results': serializer.data
})
序列化器版本管理:
# serializers.py
# V1版本
class UserSerializerV1(serializers.ModelSerializer):
class Meta:
model = User
fields = ['id', 'username', 'email']
# V2版本 - 添加新字段
class UserSerializerV2(serializers.ModelSerializer):
profile_picture = serializers.URLField(source='profile.picture')
class Meta:
model = User
fields = ['id', 'username', 'email', 'profile_picture', 'created_at']
# V3版本 - 修改字段名称
class UserSerializerV3(serializers.ModelSerializer):
# 重命名字段
user_name = serializers.CharField(source='username')
user_email = serializers.EmailField(source='email')
class Meta:
model = User
fields = ['id', 'user_name', 'user_email', 'profile', 'metadata']
处理API废弃(Deprecation):
from rest_framework.response import Response
from rest_framework import status
import warnings
class DeprecationMiddleware:
"""API废弃警告中间件"""
DEPRECATED_VERSIONS = {'v1': '2024-12-31'}
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 检查是否使用废弃版本
if hasattr(request, 'version'):
if request.version in self.DEPRECATED_VERSIONS:
sunset_date = self.DEPRECATED_VERSIONS[request.version]
# 添加废弃警告头
response['Warning'] = (
f'299 - "API version {request.version} is deprecated. '
f'Will be removed on {sunset_date}"'
)
response['Sunset'] = sunset_date
response['Link'] = '</api/v2/>; rel="successor-version"'
return response
# ViewSet中处理废弃
class UserViewSet(viewsets.ModelViewSet):
def list(self, request, *args, **kwargs):
if request.version == 'v1':
warnings.warn(
"API v1 已废弃,请迁移到 v2",
DeprecationWarning
)
return super().list(request, *args, **kwargs)
向后兼容策略:
# 1. 字段别名保持兼容
class UserSerializer(serializers.ModelSerializer):
# 新字段名
user_name = serializers.CharField(source='username')
# 保留旧字段名(只读)
username = serializers.CharField(read_only=True)
class Meta:
model = User
fields = ['id', 'username', 'user_name', 'email']
# 2. 兼容性装饰器
from functools import wraps
def compatible_response(old_version):
"""将新版本响应转换为旧版本格式"""
def decorator(func):
@wraps(func)
def wrapper(self, request, *args, **kwargs):
response = func(self, request, *args, **kwargs)
if request.version == old_version:
# 转换响应格式
data = response.data
if isinstance(data, dict) and 'results' in data:
# v2格式 → v1格式
response.data = data['results']
return response
return wrapper
return decorator
class UserViewSet(viewsets.ModelViewSet):
@compatible_response('v1')
def list(self, request, *args, **kwargs):
return super().list(request, *args, **kwargs)
# 3. 版本转换中间件
class APIVersionTransformer:
"""自动转换API版本格式"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 转换响应格式以兼容旧版本
if hasattr(request, 'version') and request.version == 'v1':
if hasattr(response, 'data'):
response.data = self.transform_v2_to_v1(response.data)
return response
def transform_v2_to_v1(self, data):
"""将v2格式转换为v1格式"""
if isinstance(data, dict):
# 移除v2新增的字段
data.pop('metadata', None)
# 重命名字段
if 'user_name' in data:
data['username'] = data.pop('user_name')
return data
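transform_v2_to_v1 这类纯数据转换很适合脱离中间件单独验证,下面用示例数据演示(纯 Python 示意):

```python
def transform_v2_to_v1(data):
    """将 v2 响应格式转换为 v1:移除新增字段、改回旧字段名"""
    if isinstance(data, dict):
        data.pop('metadata', None)          # 移除 v2 新增的字段
        if 'user_name' in data:
            data['username'] = data.pop('user_name')  # 字段名改回 v1
    return data

v2_data = {'id': 1, 'user_name': 'alice', 'metadata': {'ver': 'v2'}}
v1_data = transform_v2_to_v1(v2_data)
print(v1_data)  # {'id': 1, 'username': 'alice'}
```

把格式转换收敛成这样的纯函数,既方便写单元测试,也能同时被中间件和视图层复用。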
版本迁移文档:
"""
API版本迁移指南
V1 → V2 迁移:
Breaking Changes:
1. 响应格式变更
- V1: 直接返回列表 []
- V2: 包含元数据 {count, results}
2. 字段重命名
- username → user_name
- email → user_email
3. 新增字段
- profile_picture
- created_at
4. 移除字段
- last_login(移至单独端点)
迁移步骤:
1. 更新请求URL: /api/v1/users/ → /api/v2/users/
2. 处理新的响应格式: data → data.results
3. 更新字段名称
4. 测试验证
Sunset Date: 2024-12-31
Support: [email protected]
"""
# 在API响应中包含版本信息
class VersionedAPIView(APIView):
def get(self, request):
return Response({
'version': request.version,
'deprecated': request.version in ['v1'],
'sunset_date': '2024-12-31' if request.version == 'v1' else None,
'data': {...}
})
测试不同版本:
from rest_framework.test import APITestCase
class APIVersionTest(APITestCase):
def test_v1_response_format(self):
"""测试v1返回格式"""
response = self.client.get('/api/v1/users/')
self.assertIsInstance(response.data, list)
def test_v2_response_format(self):
"""测试v2返回格式"""
response = self.client.get('/api/v2/users/')
self.assertIn('count', response.data)
self.assertIn('results', response.data)
def test_v1_deprecation_warning(self):
"""测试v1废弃警告"""
response = self.client.get('/api/v1/users/')
self.assertIn('Warning', response)
最佳实践:
✅ 优先使用 URL 路径版本,直观且便于缓存
✅ 只在出现不兼容变更(breaking change)时升主版本
✅ 废弃版本提前通知:返回 Warning/Sunset 响应头并提供迁移文档
✅ 同时维护的版本数尽量少(通常不超过 2-3 个)
✅ 为每个版本编写独立的测试用例
How to implement distributed transactions?
考察点:分布式系统、事务一致性。
答案:
分布式事务需要在多个服务或数据库间保持数据一致性。常用方案包括本地消息表(最终一致性)、Saga模式(补偿事务)、TCC(Try-Confirm-Cancel)等。
问题场景:
# 场景:电商下单
# 涉及:订单服务、库存服务、支付服务
# ❌ 单体事务(分布式环境不可用)
@transaction.atomic
def create_order(order_data):
# 创建订单
order = Order.objects.create(**order_data)
# 调用库存服务(不同数据库/服务)
inventory_service.reduce_stock(order.items) # 可能失败
# 调用支付服务
payment_service.process(order.total) # 可能失败
# 问题:跨服务的操作无法在同一事务中
方案1:本地消息表 + 最终一致性:
from django.db import transaction, models
class OutboxMessage(models.Model):
"""本地消息表"""
event_type = models.CharField(max_length=50)
payload = models.JSONField()
status = models.CharField(max_length=20, default='pending')
created_at = models.DateTimeField(auto_now_add=True)
processed_at = models.DateTimeField(null=True)
retry_count = models.IntegerField(default=0)
@transaction.atomic
def create_order(order_data):
"""创建订单并发送消息"""
# 1. 在本地事务中创建订单
order = Order.objects.create(**order_data)
# 2. 在同一事务中创建消息记录
OutboxMessage.objects.create(
event_type='ORDER_CREATED',
payload={
'order_id': order.id,
'items': [
{'product_id': item.product_id, 'quantity': item.quantity}
for item in order.items.all()
],
'total': float(order.total)
}
)
# 3. 事务提交,订单和消息都持久化
return order
# 后台任务扫描并发送消息
from celery import shared_task
from django.utils import timezone
import requests
@shared_task
def process_outbox_messages():
"""处理待发送的消息"""
messages = OutboxMessage.objects.filter(
status='pending',
retry_count__lt=3
)[:100]
for message in messages:
try:
if message.event_type == 'ORDER_CREATED':
payload = message.payload
# 调用库存服务
inventory_response = requests.post(
'http://inventory-service/reduce',
json={'items': payload['items']}
)
inventory_response.raise_for_status()
# 调用支付服务
payment_response = requests.post(
'http://payment-service/charge',
json={'order_id': payload['order_id'], 'amount': payload['total']}
)
payment_response.raise_for_status()
# 标记为已处理
message.status = 'processed'
message.processed_at = timezone.now()
message.save()
except Exception as e:
# 增加重试计数
message.retry_count += 1
if message.retry_count >= 3:
message.status = 'failed'
message.save()
logger.error(f"消息处理失败: {e}")
# 定时任务
from celery.schedules import crontab
app.conf.beat_schedule = {
'process-outbox-every-minute': {
'task': 'myapp.tasks.process_outbox_messages',
'schedule': 60.0, # 每分钟
},
}
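补充:本地消息表配合定时扫描是"至少一次"投递语义,同一条消息可能被消费两次,因此消费端需要基于消息ID去重。下面是一个纯Python的最小sketch(`DedupConsumer`等命名为本文假设;生产环境中已处理集合应落在Redis或数据库中,而不是进程内存):

```python
# 假设性示例:消费端按消息ID去重,保证重复投递时处理仍然幂等
class DedupConsumer:
    def __init__(self):
        self.processed = set()   # 已处理的消息ID(生产中应持久化)
        self.handled = []        # 实际生效的业务操作

    def handle(self, message_id, payload):
        if message_id in self.processed:
            return False         # 重复消息,直接跳过
        self.handled.append(payload)          # 执行业务逻辑
        self.processed.add(message_id)        # 标记为已处理
        return True

consumer = DedupConsumer()
consumer.handle("msg-1", {"order_id": 1})
consumer.handle("msg-1", {"order_id": 1})    # 重复投递被忽略
consumer.handle("msg-2", {"order_id": 2})
print(len(consumer.handled))                 # 2
```

注意:真实系统中"执行业务"与"标记已处理"还需放在同一个本地事务里,否则两步之间崩溃仍会重复处理。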
方案2:Saga模式(编排式):
import logging
import requests
from enum import Enum
logger = logging.getLogger(__name__)
class SagaStatus(Enum):
PENDING = 'pending'
COMPLETED = 'completed'
FAILED = 'failed'
COMPENSATING = 'compensating'
COMPENSATED = 'compensated'
class OrderSaga:
"""订单Saga编排器"""
def __init__(self, order_data):
self.order_data = order_data
self.order = None
self.inventory_reserved = False
self.payment_processed = False
def execute(self):
"""执行Saga"""
try:
# 步骤1:创建订单
self.order = self.create_order()
# 步骤2:预留库存
self.reserve_inventory()
self.inventory_reserved = True
# 步骤3:处理支付
self.process_payment()
self.payment_processed = True
# 步骤4:确认订单
self.confirm_order()
return self.order
except Exception as e:
# 执行补偿事务
self.compensate()
raise
def create_order(self):
"""创建订单"""
order = Order.objects.create(
**self.order_data,
status='pending'
)
return order
def reserve_inventory(self):
"""预留库存"""
response = requests.post(
'http://inventory-service/reserve',
json={
'order_id': self.order.id,
'items': self.order_data['items']
}
)
response.raise_for_status()
def process_payment(self):
"""处理支付"""
response = requests.post(
'http://payment-service/charge',
json={
'order_id': self.order.id,
'amount': self.order.total
}
)
response.raise_for_status()
def confirm_order(self):
"""确认订单"""
self.order.status = 'confirmed'
self.order.save()
def compensate(self):
"""补偿事务(回滚)"""
logger.error(f"订单 {self.order.id} 创建失败,执行补偿")
# 按相反顺序回滚
if self.payment_processed:
self.cancel_payment()
if self.inventory_reserved:
self.release_inventory()
if self.order:
self.cancel_order()
def cancel_payment(self):
"""取消支付"""
requests.post(
'http://payment-service/refund',
json={'order_id': self.order.id}
)
def release_inventory(self):
"""释放库存"""
requests.post(
'http://inventory-service/release',
json={'order_id': self.order.id}
)
def cancel_order(self):
"""取消订单"""
self.order.status = 'cancelled'
self.order.save()
# 使用
def place_order(request):
order_data = request.data
saga = OrderSaga(order_data)
try:
order = saga.execute()
return Response({
'order_id': order.id,
'status': 'success'
})
except Exception as e:
return Response({
'error': str(e),
'status': 'failed'
}, status=400)
方案3:TCC模式:
import requests
class TCCTransaction:
"""TCC事务协调器"""
def __init__(self):
self.participants = []
def try_phase(self, order_data):
"""Try阶段:预留资源"""
try:
# Try 1: 预留库存
inventory_result = self.try_reserve_inventory(order_data)
self.participants.append(('inventory', inventory_result))
# Try 2: 预授权支付
payment_result = self.try_payment(order_data)
self.participants.append(('payment', payment_result))
# Try 3: 预创建订单
order_result = self.try_create_order(order_data)
self.participants.append(('order', order_result))
return True
except Exception as e:
# Try失败,执行Cancel
self.cancel_phase()
raise
def confirm_phase(self):
"""Confirm阶段:确认提交"""
for service, result in self.participants:
if service == 'inventory':
self.confirm_inventory(result)
elif service == 'payment':
self.confirm_payment(result)
elif service == 'order':
self.confirm_order(result)
def cancel_phase(self):
"""Cancel阶段:取消预留"""
# 按相反顺序取消
for service, result in reversed(self.participants):
if service == 'inventory':
self.cancel_inventory(result)
elif service == 'payment':
self.cancel_payment(result)
elif service == 'order':
self.cancel_order(result)
def try_reserve_inventory(self, order_data):
"""预留库存"""
response = requests.post(
'http://inventory-service/try-reserve',
json=order_data
)
return response.json()['reservation_id']
def confirm_inventory(self, reservation_id):
"""确认库存预留"""
requests.post(
'http://inventory-service/confirm',
json={'reservation_id': reservation_id}
)
def cancel_inventory(self, reservation_id):
"""取消库存预留"""
requests.post(
'http://inventory-service/cancel',
json={'reservation_id': reservation_id}
)
# 使用
def create_order_tcc(order_data):
tcc = TCCTransaction()
try:
# Try阶段
tcc.try_phase(order_data)
# Confirm阶段
tcc.confirm_phase()
return {'status': 'success'}
except Exception as e:
return {'status': 'failed', 'error': str(e)}
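Cancel阶段必须按与Try相反的顺序释放资源。下面用一个纯Python的最小sketch验证这一控制流(`MiniTCC`及各参与者名称均为本文假设,仅演示调用顺序,不涉及真实服务):

```python
# 假设性sketch:模拟TCC,Try中途失败时按相反顺序Cancel已预留的参与者
class MiniTCC:
    def __init__(self):
        self.log = []          # 记录各阶段调用顺序
        self.reserved = []     # 已成功Try的参与者

    def try_step(self, name, should_fail=False):
        if should_fail:
            raise RuntimeError(f"{name} try failed")
        self.log.append(f"try:{name}")
        self.reserved.append(name)

    def execute(self, steps):
        try:
            for name, should_fail in steps:
                self.try_step(name, should_fail)
            for name in self.reserved:            # 全部Try成功才Confirm
                self.log.append(f"confirm:{name}")
        except RuntimeError:
            for name in reversed(self.reserved):  # 按相反顺序Cancel
                self.log.append(f"cancel:{name}")

tcc = MiniTCC()
tcc.execute([("inventory", False), ("payment", False), ("order", True)])
print(tcc.log)
# ['try:inventory', 'try:payment', 'cancel:payment', 'cancel:inventory']
```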
幂等性保证:
class IdempotentOrder:
"""幂等的订单创建"""
@transaction.atomic
def create(self, idempotency_key, order_data):
"""使用幂等键创建订单"""
# 检查是否已存在
existing = Order.objects.filter(
idempotency_key=idempotency_key
).first()
if existing:
return existing # 返回已存在的订单
# 创建新订单
order = Order.objects.create(
idempotency_key=idempotency_key,
**order_data
)
return order
# API中使用
from rest_framework.views import APIView
from rest_framework.response import Response
class CreateOrderView(APIView):
def post(self, request):
# 从请求头获取幂等键
idempotency_key = request.META.get('HTTP_IDEMPOTENCY_KEY')
if not idempotency_key:
return Response(
{'error': 'Missing Idempotency-Key header'},
status=400
)
order = IdempotentOrder().create(
idempotency_key,
request.data
)
return Response({
'order_id': order.id,
'status': order.status
})
CAP定理考虑:
"""
CAP定理:
- Consistency(一致性)
- Availability(可用性)
- Partition Tolerance(分区容错性)
只能同时满足两个
分布式事务策略选择:
强一致性(CP):
└── 2PC/3PC
├── 优点:强一致性
├── 缺点:性能差、阻塞
└── 适用:金融交易
最终一致性(AP):
├── 本地消息表
│ ├── 优点:实现简单、可靠
│ └── 适用:大多数场景
│
├── Saga模式
│ ├── 优点:灵活、无长事务
│ └── 适用:复杂业务流程
│
└── TCC
├── 优点:性能好、一致性强
└── 适用:高并发场景
"""
# Django中使用最终一致性
def create_order_eventually_consistent(order_data):
"""最终一致性的订单创建"""
# 1. 本地事务
with transaction.atomic():
order = Order.objects.create(**order_data)
OutboxMessage.objects.create(
event_type='ORDER_CREATED',
payload={'order_id': order.id}
)
# 2. 异步处理消息(最终一致)
# Celery worker会处理OutboxMessage
# 即使暂时失败,也会重试直到成功
return order
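补充:上面的重试是固定间隔扫描,下游服务长时间故障时容易形成重试风暴。实际中常配合指数退避拉开重试间隔。下面是一个纯Python的最小sketch(`base`、`factor`等参数取值为常见约定,非固定标准):

```python
# 假设性sketch:计算每次重试前应等待的秒数(指数退避,带上限)
def backoff_delays(base=1.0, factor=2.0, max_delay=60.0, retries=5):
    """返回长度为retries的等待时间序列:base, base*factor, ... 封顶max_delay"""
    delay = base
    delays = []
    for _ in range(retries):
        delays.append(min(delay, max_delay))
        delay *= factor
    return delays

print(backoff_delays(retries=5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

在`process_outbox_messages`中可据此把`retry_count`映射为下次允许重试的时间点,再配合随机抖动(jitter)避免大量消息同时重试。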
What are common security vulnerabilities in Django applications?
考察点:Web安全、漏洞防护。
答案:
Django应用常见的安全隐患包括SQL注入、XSS、CSRF、敏感信息泄露、不安全的配置等。Django内置了多种安全机制,但需要正确使用和配置。
1. SQL注入漏洞:
from django.http import HttpResponse
from .models import User
# ❌ 危险 - SQL注入风险
def search_users_unsafe(request):
keyword = request.GET.get('q', '')
# 直接拼接SQL
users = User.objects.raw(
f"SELECT * FROM users WHERE name LIKE '%{keyword}%'"
)
# 攻击:?q='; DROP TABLE users; --
return HttpResponse(list(users))
# ✅ 安全 - 使用参数化查询
def search_users_safe(request):
keyword = request.GET.get('q', '')
# ORM自动转义
users = User.objects.filter(name__icontains=keyword)
return HttpResponse(list(users))
# ✅ 安全 - raw()使用参数
def search_users_raw_safe(request):
keyword = request.GET.get('q', '')
users = User.objects.raw(
"SELECT * FROM users WHERE name LIKE %s",
[f'%{keyword}%'] # 参数化
)
return HttpResponse(list(users))
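可以用标准库sqlite3直观验证参数化查询的效果:占位符传入的恶意字符串被当作普通数据比较,而不是SQL语句的一部分(内存库、表结构均为演示用的假设):

```python
# 演示:参数化查询使注入payload失效
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.execute("INSERT INTO users VALUES ('alice')")

malicious = "'; DROP TABLE users; --"
# ? 占位符由驱动处理,整个字符串只是一个比较值
rows = conn.execute(
    "SELECT * FROM users WHERE name = ?", (malicious,)
).fetchall()
print(rows)  # [] —— 没有匹配的行

# 表仍然完好
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 1
```

Django ORM的filter()以及raw()的参数列表在底层走的就是这种参数化机制。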
2. XSS(跨站脚本)漏洞:
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render
from django.utils.html import escape
# ❌ 危险 - XSS风险
def user_profile_unsafe(request):
bio = request.user.bio # 可能包含 <script>alert('XSS')</script>
html = f"<div class='bio'>{bio}</div>"
return HttpResponse(html)
# ✅ 安全 - 使用模板自动转义
def user_profile_safe1(request):
return render(request, 'profile.html', {
'bio': request.user.bio # 模板会自动转义
})
# ✅ 安全 - 手动转义
def user_profile_safe2(request):
bio = escape(request.user.bio)
html = f"<div class='bio'>{bio}</div>"
return HttpResponse(html)
# ✅ 安全 - JsonResponse
def user_api_safe(request):
return JsonResponse({
'bio': request.user.bio # JSON自动转义
})
# 模板中的XSS防护
"""
<!-- ✅ 自动转义 -->
<div>{{ user.bio }}</div>
<!-- ❌ 危险 - 标记为安全 -->
<div>{{ user.bio|safe }}</div>
<!-- ✅ 在JavaScript中使用 -->
<script>
const bio = "{{ user.bio|escapejs }}";
</script>
"""
3. CSRF绕过:
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
# ❌ 危险 - 绕过CSRF保护
@csrf_exempt
def dangerous_api(request):
if request.method == 'POST':
# 处理敏感操作(如转账)
amount = request.POST.get('amount')
transfer_money(amount)
return HttpResponse('OK')
# ✅ 安全 - 保持CSRF保护
@require_http_methods(["POST"])
def safe_api(request):
# Django会验证CSRF token
amount = request.POST.get('amount')
transfer_money(amount)
return HttpResponse('OK')
# ✅ 安全 - API使用其他认证方式
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def api_endpoint(request):
# DRF的token/JWT认证不需要CSRF
pass
4. 敏感信息泄露:
# ❌ 危险配置
# settings.py
DEBUG = True # 生产环境泄露错误详情
SECRET_KEY = 'hardcoded-secret-key-123' # 硬编码密钥
DATABASES = {
'default': {
'PASSWORD': 'password123' # 密码硬编码
}
}
# ✅ 安全配置
import os
from decouple import config
DEBUG = config('DEBUG', default=False, cast=bool)
SECRET_KEY = config('SECRET_KEY')
DATABASES = {
'default': {
'PASSWORD': config('DB_PASSWORD')
}
}
# 错误处理
ADMINS = [('Admin', '[email protected]')]
LOGGING = {
'version': 1,
'handlers': {
'file': {
'class': 'logging.FileHandler',
'filename': '/var/log/django/error.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'ERROR',
},
},
}
# 生产环境隐藏敏感信息
if not DEBUG:
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
5. 不安全的文件上传:
from django.core.files.storage import FileSystemStorage
import os
# ❌ 危险 - 文件上传漏洞
def upload_file_unsafe(request):
uploaded_file = request.FILES['file']
filename = uploaded_file.name # 用户可控
# 危险:可以上传 ../../etc/passwd
fs = FileSystemStorage(location='/var/www/uploads')
fs.save(filename, uploaded_file)
# ✅ 安全 - 验证和清理文件名
import uuid
from django.core.exceptions import ValidationError
def upload_file_safe(request):
uploaded_file = request.FILES['file']
# 验证文件类型
allowed_extensions = ['.jpg', '.png', '.pdf']
ext = os.path.splitext(uploaded_file.name)[1].lower()
if ext not in allowed_extensions:
raise ValidationError('不允许的文件类型')
# 验证文件大小
max_size = 5 * 1024 * 1024 # 5MB
if uploaded_file.size > max_size:
raise ValidationError('文件过大')
# 使用UUID生成安全的文件名
filename = f"{uuid.uuid4()}{ext}"
fs = FileSystemStorage(location='/var/www/uploads')
fs.save(filename, uploaded_file)
return HttpResponse(f'文件已上传: {filename}')
# 使用Django的FileField验证
from django.db import models
from django.core.validators import FileExtensionValidator
class Document(models.Model):
file = models.FileField(
upload_to='documents/%Y/%m/%d/',
max_length=200,
validators=[
FileExtensionValidator(allowed_extensions=['pdf', 'doc', 'docx'])
]
)
安全配置清单:
# settings.py - 生产环境安全配置
# 1. HTTPS强制
SECURE_SSL_REDIRECT = True # HTTP重定向到HTTPS
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# 2. Cookie安全
SESSION_COOKIE_SECURE = True # 只通过HTTPS传输
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True # 防止JavaScript访问
CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax' # 防止CSRF
CSRF_COOKIE_SAMESITE = 'Lax'
# 3. 安全响应头
SECURE_BROWSER_XSS_FILTER = True # 注:较新版本Django已废弃该设置,现代浏览器忽略X-XSS-Protection头
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY' # 防止点击劫持
# 4. HSTS(HTTP Strict Transport Security)
SECURE_HSTS_SECONDS = 31536000 # 1年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 5. 允许的主机
ALLOWED_HOSTS = ['example.com', 'www.example.com']
# 6. 密码验证
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
'OPTIONS': {'min_length': 12}
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# 7. 内容安全策略(CSP)
CSP_DEFAULT_SRC = ("'self'",)
CSP_SCRIPT_SRC = ("'self'", "https://cdn.example.com")
CSP_STYLE_SRC = ("'self'", "'unsafe-inline'")
# 8. 数据库连接安全
DATABASES = {
'default': {
'OPTIONS': {
'ssl': {'ca': '/path/to/ca-cert.pem'}
}
}
}
安全中间件示例:
import logging
logger = logging.getLogger('security')
class SecurityMiddleware:
"""安全检查中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 检查SQL注入特征
dangerous_patterns = ["'", '"', '--', ';', 'DROP', 'DELETE']
for key, value in request.GET.items():
for pattern in dangerous_patterns:
if pattern.lower() in str(value).lower():
logger.warning(
f"可疑请求: {request.path}?{key}={value}",
extra={'ip': request.META.get('REMOTE_ADDR')}
)
# 检查异常User-Agent
user_agent = request.META.get('HTTP_USER_AGENT', '')
if 'sqlmap' in user_agent.lower() or 'nmap' in user_agent.lower():
logger.error(f"检测到扫描工具: {user_agent}")
return HttpResponse('Forbidden', status=403)
response = self.get_response(request)
# 添加安全响应头
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
return response
密码加密和验证:
from django.contrib.auth.hashers import make_password, check_password
# ✅ 使用Django的密码哈希
user = User.objects.create(
username='alice',
password=make_password('plain_password') # PBKDF2加密
)
# 验证密码
is_valid = check_password('plain_password', user.password)
# ❌ 永远不要
user.password = 'plain_password' # 明文存储
user.save()
# 密码哈希算法配置
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.Argon2PasswordHasher', # 最安全
'django.contrib.auth.hashers.PBKDF2PasswordHasher',
'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',
'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',
]
API安全最佳实践:
from rest_framework.decorators import api_view, permission_classes, throttle_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.throttling import UserRateThrottle
@api_view(['POST'])
@permission_classes([IsAuthenticated])
@throttle_classes([UserRateThrottle])
def secure_api(request):
# 1. 认证检查
if not request.user.is_authenticated:
return Response({'error': 'Unauthorized'}, status=401)
# 2. 权限检查
if not request.user.has_perm('app.can_access_api'):
return Response({'error': 'Forbidden'}, status=403)
# 3. 输入验证
from rest_framework import serializers
class InputSerializer(serializers.Serializer):
amount = serializers.DecimalField(max_digits=10, decimal_places=2)
def validate_amount(self, value):
if value <= 0:
raise serializers.ValidationError("金额必须大于0")
if value > 10000:
raise serializers.ValidationError("金额不能超过10000")
return value
serializer = InputSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# 4. 业务逻辑
amount = serializer.validated_data['amount']
process_payment(amount)
# 5. 安全响应(不泄露敏感信息)
return Response({
'status': 'success',
'transaction_id': 'xxx' # 不返回敏感数据
})
安全检查清单:
"""
✅ 认证和授权
- 使用Django内置认证系统
- 实现细粒度权限控制
- API使用Token/JWT认证
- 定期轮换密钥
✅ 数据验证
- 使用Form/Serializer验证
- 验证文件上传
- 检查数据类型和范围
- 防止批量赋值漏洞
✅ 防护措施
- 开启CSRF保护
- 配置CSP头
- 使用HTTPS
- 设置安全的Cookie属性
✅ 敏感信息
- 使用环境变量
- 不在代码中硬编码密钥
- 日志脱敏
- 错误信息不泄露详情
✅ 依赖安全
- 定期更新依赖
- 使用pip-audit检查漏洞
- 锁定依赖版本
✅ 监控和审计
- 记录敏感操作
- 异常登录告警
- 定期安全审计
"""
使用django-security工具:
# 安全检查工具
pip install django-security
pip install bandit # Python安全扫描
pip install safety # 依赖漏洞检查
# 运行安全检查
python manage.py check --deploy
# 检查依赖漏洞
safety check
# 代码安全扫描
bandit -r myapp/
How to manage Django configuration following 12-Factor App principles?
考察点:配置管理、环境隔离。
答案:
按照12-Factor App原则,配置应该存储在环境变量中,而不是代码中。Django通过环境变量、配置文件分离、密钥管理服务实现不同环境的配置管理。
12-Factor App配置原则:
"""
核心原则:
1. 配置与代码严格分离
2. 使用环境变量存储配置
3. 不同环境使用相同代码
4. 配置不提交到版本控制
5. 支持多环境部署
"""
配置管理方案:
# ❌ 反模式:硬编码配置
# settings.py
SECRET_KEY = 'my-secret-key-12345'
DATABASE_PASSWORD = 'password123'
DEBUG = True
ALLOWED_HOSTS = ['example.com']
# ✅ 最佳实践:环境变量
# settings.py
import os
from decouple import config
SECRET_KEY = config('SECRET_KEY')
DATABASE_PASSWORD = config('DB_PASSWORD')
DEBUG = config('DEBUG', default=False, cast=bool)
ALLOWED_HOSTS = config('ALLOWED_HOSTS', cast=lambda v: [s.strip() for s in v.split(',')])
# .env 文件(不提交到git)
"""
SECRET_KEY=your-secret-key-here
DB_PASSWORD=your-db-password
DEBUG=False
ALLOWED_HOSTS=example.com,www.example.com
"""
# .env.example(提交到git)
"""
SECRET_KEY=
DB_PASSWORD=
DEBUG=False
ALLOWED_HOSTS=localhost
"""
多环境配置结构:
# 项目结构
"""
myproject/
├── settings/
│ ├── __init__.py
│ ├── base.py # 基础配置(所有环境共享)
│ ├── development.py # 开发环境
│ ├── staging.py # 预发布环境
│ ├── production.py # 生产环境
│ └── testing.py # 测试环境
├── manage.py
└── .env
"""
# settings/base.py - 通用配置
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent.parent
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
# ...
]
MIDDLEWARE = [
# ...
]
# settings/development.py
from .base import *
DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# 开发环境特定配置
INSTALLED_APPS += ['debug_toolbar', 'django_extensions']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
INTERNAL_IPS = ['127.0.0.1']
# settings/production.py
from .base import *
from decouple import config
DEBUG = False
ALLOWED_HOSTS = config('ALLOWED_HOSTS', cast=lambda v: v.split(','))
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_USER'),
'PASSWORD': config('DB_PASSWORD'),
'HOST': config('DB_HOST'),
'PORT': config('DB_PORT', default='5432'),
'CONN_MAX_AGE': 600,
}
}
# 生产环境安全配置
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'ERROR',
'class': 'logging.FileHandler',
'filename': '/var/log/django/error.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'ERROR',
'propagate': True,
},
},
}
# settings/__init__.py - 根据环境加载配置
import os
env = os.environ.get('DJANGO_ENV', 'development')
if env == 'production':
from .production import *
elif env == 'staging':
from .staging import *
elif env == 'testing':
from .testing import *
else:
from .development import *
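除了在`__init__.py`中动态导入,另一种常见做法是通过`DJANGO_SETTINGS_MODULE`环境变量直接指定配置模块,避免`import *`的隐式行为(以下命令假设项目名为myproject):

```shell
# 直接指定要加载的settings模块
export DJANGO_SETTINGS_MODULE=myproject.settings.production
gunicorn myproject.wsgi:application

# 或在单条命令中指定
DJANGO_SETTINGS_MODULE=myproject.settings.development python manage.py runserver
```

manage.py和wsgi.py中`os.environ.setdefault('DJANGO_SETTINGS_MODULE', ...)`只是默认值,显式设置的环境变量会覆盖它。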
使用环境变量:
# .env 文件
DJANGO_ENV=production
SECRET_KEY=your-secret-key
DEBUG=False
DB_NAME=mydb
DB_USER=dbuser
DB_PASSWORD=dbpass
DB_HOST=localhost
DB_PORT=5432
REDIS_URL=redis://localhost:6379/0
ALLOWED_HOSTS=example.com,www.example.com
# 加载环境变量
export $(cat .env | xargs)
# 或使用python-decouple自动加载
pip install python-decouple
密钥管理服务:
# 方案1:AWS Secrets Manager
import boto3
import json
def get_secret(secret_name):
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name='us-east-1'
)
response = client.get_secret_value(SecretId=secret_name)
secret = json.loads(response['SecretString'])
return secret
# settings.py
if not DEBUG:
secrets = get_secret('myapp/production')
SECRET_KEY = secrets['SECRET_KEY']
DATABASE_PASSWORD = secrets['DB_PASSWORD']
# 方案2:HashiCorp Vault
import os
import hvac
client = hvac.Client(url='http://vault:8200')
client.token = os.environ.get('VAULT_TOKEN')
secrets = client.secrets.kv.v2.read_secret_version(
path='myapp/config'
)['data']['data']
SECRET_KEY = secrets['SECRET_KEY']
# 方案3:Kubernetes Secrets
# 通过环境变量或文件挂载
SECRET_KEY = os.environ.get('SECRET_KEY')
# 或从文件读取
with open('/run/secrets/secret_key') as f:
SECRET_KEY = f.read().strip()
配置验证:
# settings/base.py
from django.core.exceptions import ImproperlyConfigured
def validate_config():
"""验证必需的配置项"""
required_settings = [
'SECRET_KEY',
'DATABASES',
'ALLOWED_HOSTS',
]
for setting in required_settings:
if not globals().get(setting):
raise ImproperlyConfigured(
f"Missing required setting: {setting}"
)
# 验证SECRET_KEY安全性
if len(SECRET_KEY) < 50:
raise ImproperlyConfigured("SECRET_KEY太短")
# 验证DEBUG状态
if DEBUG and 'production' in os.environ.get('DJANGO_ENV', ''):
raise ImproperlyConfigured("生产环境不能开启DEBUG")
# 在启动时验证
validate_config()
动态配置(数据库配置):
from django.db import models
class SiteConfig(models.Model):
"""站点动态配置"""
key = models.CharField(max_length=100, unique=True)
value = models.TextField()
description = models.TextField(blank=True)
updated_at = models.DateTimeField(auto_now=True)
@classmethod
def get_value(cls, key, default=None):
"""获取配置值"""
try:
config = cls.objects.get(key=key)
return config.value
except cls.DoesNotExist:
return default
@classmethod
def set_value(cls, key, value):
"""设置配置值"""
config, created = cls.objects.update_or_create(
key=key,
defaults={'value': value}
)
return config
# 使用
max_upload_size = SiteConfig.get_value('MAX_UPLOAD_SIZE', '5242880')
maintenance_mode = SiteConfig.get_value('MAINTENANCE_MODE', 'False') == 'True'
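`SiteConfig.get_value`每次都会查库,高频读取时可以加一层带过期时间的进程内缓存。下面是纯Python的最小sketch(`TTLCache`为本文假设的命名;生产中也可以直接用Django的cache框架实现同样效果):

```python
# 假设性sketch:带TTL的进程内配置缓存,过期后才回源查库
import time

class TTLCache:
    def __init__(self, ttl=30.0):
        self.ttl = ttl
        self._store = {}   # key -> (value, 过期时间戳)

    def get_or_load(self, key, loader, now=None):
        now = time.monotonic() if now is None else now
        hit = self._store.get(key)
        if hit and hit[1] > now:
            return hit[0]                      # 未过期,直接返回缓存
        value = loader(key)                    # 过期或未命中:回源(如查SiteConfig表)
        self._store[key] = (value, now + self.ttl)
        return value

calls = []
def load_from_db(key):                          # 模拟数据库查询
    calls.append(key)
    return "5242880"

cache = TTLCache(ttl=30)
cache.get_or_load("MAX_UPLOAD_SIZE", load_from_db, now=0)
cache.get_or_load("MAX_UPLOAD_SIZE", load_from_db, now=10)  # 命中缓存
cache.get_or_load("MAX_UPLOAD_SIZE", load_from_db, now=40)  # 已过期,重新加载
print(len(calls))  # 2
```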
How to containerize Django applications?
考察点:容器化、Docker最佳实践。
答案:
Docker容器化通过多阶段构建优化镜像大小,使用非root用户提升安全性,配合docker-compose编排多个服务,实现应用的标准化部署。
Dockerfile - 多阶段构建:
# ============ 构建阶段 ============
FROM python:3.11-slim as builder
WORKDIR /app
# 安装构建依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 构建wheel包
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt
# ============ 运行阶段 ============
FROM python:3.11-slim
# 创建应用目录
WORKDIR /app
# 安装运行时依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 从构建阶段复制wheel包
COPY --from=builder /app/wheels /wheels
COPY --from=builder /app/requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir /wheels/*
# 复制项目文件
COPY . .
# 收集静态文件
RUN python manage.py collectstatic --no-input
# 创建非root用户
RUN adduser --disabled-password --gecos '' appuser && \
chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 8000
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health/')"
# 启动命令
CMD ["gunicorn", "myproject.wsgi:application", \
"--bind", "0.0.0.0:8000", \
"--workers", "4", \
"--timeout", "120"]
docker-compose.yml - 服务编排:
version: '3.8'
services:
# Web应用
web:
build: .
command: gunicorn myproject.wsgi:application --bind 0.0.0.0:8000 --workers 4
volumes:
- static_volume:/app/staticfiles
- media_volume:/app/media
env_file:
- .env.production
depends_on:
- db
- redis
networks:
- app-network
restart: unless-stopped
# 数据库
db:
image: postgres:15-alpine
volumes:
- postgres_data:/var/lib/postgresql/data
environment:
- POSTGRES_DB=${DB_NAME}
- POSTGRES_USER=${DB_USER}
- POSTGRES_PASSWORD=${DB_PASSWORD}
networks:
- app-network
restart: unless-stopped
# Redis缓存
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
networks:
- app-network
restart: unless-stopped
# Celery Worker
celery:
build: .
command: celery -A myproject worker -l info
volumes:
- ./:/app
env_file:
- .env.production
depends_on:
- db
- redis
networks:
- app-network
restart: unless-stopped
# Celery Beat
celery-beat:
build: .
command: celery -A myproject beat -l info
volumes:
- ./:/app
env_file:
- .env.production
depends_on:
- db
- redis
networks:
- app-network
restart: unless-stopped
# Nginx反向代理
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- static_volume:/app/staticfiles:ro
- media_volume:/app/media:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- web
networks:
- app-network
restart: unless-stopped
volumes:
postgres_data:
redis_data:
static_volume:
media_volume:
networks:
app-network:
driver: bridge
Nginx配置:
# nginx.conf
upstream django {
server web:8000;
}
server {
listen 80;
server_name example.com;
# 重定向到HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name example.com;
# SSL配置
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
# 客户端最大上传大小
client_max_body_size 20M;
# 静态文件
location /static/ {
alias /app/staticfiles/;
expires 30d;
add_header Cache-Control "public, immutable";
}
# 媒体文件
location /media/ {
alias /app/media/;
expires 7d;
}
# Django应用
location / {
proxy_pass http://django;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
数据库迁移处理:
# entrypoint.sh
#!/bin/sh
# 等待数据库就绪
echo "等待数据库..."
while ! nc -z db 5432; do
sleep 0.1
done
echo "数据库已就绪"
# 执行迁移
echo "执行数据库迁移..."
python manage.py migrate --noinput
# 收集静态文件
echo "收集静态文件..."
python manage.py collectstatic --noinput
# 创建超级用户(仅首次)
python manage.py shell -c "
from django.contrib.auth import get_user_model;
User = get_user_model();
User.objects.filter(username='admin').exists() or \
User.objects.create_superuser('admin', '[email protected]', 'admin123')
"
# 启动应用
echo "启动Gunicorn..."
exec "$@"
# Dockerfile中使用
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
CMD ["gunicorn", "myproject.wsgi:application", "--bind", "0.0.0.0:8000"]
零宕机部署:
# docker-compose.yml
version: '3.8'
services:
web:
build: .
deploy:
replicas: 3 # 3个实例
update_config:
parallelism: 1 # 每次更新1个
delay: 10s # 间隔10秒
order: start-first # 先启动新容器
rollback_config:
parallelism: 1
delay: 5s
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/"]
interval: 10s
timeout: 5s
retries: 3
start_period: 40s
健康检查端点:
# views.py
from django.http import JsonResponse
from django.db import connection
def health_check(request):
"""健康检查端点"""
checks = {}
# 检查数据库
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
checks['database'] = 'ok'
except Exception as e:
checks['database'] = f'error: {e}'
# 检查Redis
try:
from django.core.cache import cache
cache.set('health_check', 'ok', 10)
checks['redis'] = 'ok' if cache.get('health_check') == 'ok' else 'error'
except Exception as e:
checks['redis'] = f'error: {e}'
# 检查磁盘空间
import shutil
disk = shutil.disk_usage('/')
checks['disk'] = {
'free_gb': disk.free // (1024**3),
'percent': (disk.used / disk.total) * 100
}
# 判断整体状态
all_ok = all(
v == 'ok' if isinstance(v, str) else True
for v in checks.values()
)
status = 200 if all_ok else 503
return JsonResponse({'status': 'healthy' if all_ok else 'unhealthy', 'checks': checks}, status=status)
# urls.py
urlpatterns = [
path('health/', health_check, name='health_check'),
]
日志收集:
# docker-compose.yml
version: '3.8'
services:
web:
build: .
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# 或使用集中式日志
logging:
driver: "syslog"
options:
syslog-address: "tcp://logstash:5000"
How to implement observability for Django applications?
考察点:可观测性、日志系统、监控告警。
答案:
可观测性包括日志(Logging)、指标(Metrics)、追踪(Tracing)三大支柱。通过结构化日志、Prometheus指标、OpenTelemetry追踪,实现系统的全面监控。
1. 结构化日志(Logging):
# settings.py - 配置结构化日志
import structlog
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'()': structlog.stdlib.ProcessorFormatter,
'processor': structlog.processors.JSONRenderer(),
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'json',
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/app.log',
'maxBytes': 10485760, # 10MB
'backupCount': 5,
'formatter': 'json',
},
},
'loggers': {
'django': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'myapp': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
},
},
}
# 配置structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# 使用结构化日志
import structlog
logger = structlog.get_logger()
def process_order(order_id):
logger.info("processing_order", order_id=order_id, user_id=request.user.id)
try:
order = Order.objects.get(id=order_id)
order.process()
logger.info(
"order_processed",
order_id=order_id,
status="success",
amount=float(order.total)
)
except Exception as e:
logger.error(
"order_failed",
order_id=order_id,
error=str(e),
exc_info=True
)
raise
# 输出JSON格式:
# {"event": "processing_order", "order_id": 123, "user_id": 456, "timestamp": "2024-01-01T12:00:00"}
2. Prometheus指标(Metrics):
# 安装:pip install django-prometheus
# settings.py
INSTALLED_APPS = [
'django_prometheus',
...
]
MIDDLEWARE = [
'django_prometheus.middleware.PrometheusBeforeMiddleware',
...
'django_prometheus.middleware.PrometheusAfterMiddleware',
]
# 使用Prometheus数据库后端(可选)
DATABASES = {
'default': {
'ENGINE': 'django_prometheus.db.backends.postgresql',
...
}
}
# urls.py
urlpatterns = [
path('metrics/', include('django_prometheus.urls')),
]
# 自定义指标
from prometheus_client import Counter, Histogram, Gauge
# 计数器
request_count = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
# 直方图
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration'
)
# 仪表
active_users = Gauge(
'active_users_total',
'Number of active users'
)
# 在视图中使用
class MetricsMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 记录请求
request_count.labels(
method=request.method,
endpoint=request.path,
status='started'
).inc()
# 记录耗时
with request_duration.time():
response = self.get_response(request)
# 记录响应
request_count.labels(
method=request.method,
endpoint=request.path,
status=response.status_code
).inc()
return response
# 业务指标
from prometheus_client import Counter
order_created = Counter('orders_created_total', 'Total orders created')
order_failed = Counter('orders_failed_total', 'Total orders failed', ['reason'])
def create_order(order_data):
try:
order = Order.objects.create(**order_data)
order_created.inc()
return order
except ValidationError as e:
order_failed.labels(reason='validation').inc()
raise
except Exception as e:
order_failed.labels(reason='unknown').inc()
raise
3. 分布式追踪(Tracing):
# 安装:pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-django
# settings.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# 配置追踪
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# 配置Jaeger导出器
jaeger_exporter = JaegerExporter(
agent_host_name='jaeger',
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# 使用追踪
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
def process_order(order_id):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
# 获取订单
with tracer.start_as_current_span("fetch_order"):
order = Order.objects.get(id=order_id)
# 处理支付
with tracer.start_as_current_span("process_payment") as pay_span:
pay_span.set_attribute("amount", float(order.total))
result = process_payment(order)
# 更新库存
with tracer.start_as_current_span("update_inventory"):
update_inventory(order)
span.set_attribute("status", "success")
return order
APM集成(Application Performance Monitoring):
# Sentry - 错误追踪
# pip install sentry-sdk
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
sentry_sdk.init(
dsn="https://[email protected]/123",
integrations=[DjangoIntegration()],
traces_sample_rate=0.1, # 10%的请求追踪
send_default_pii=False, # 不发送个人信息
environment="production",
)
# 手动捕获异常
def risky_operation():
try:
dangerous_code()
except Exception as e:
sentry_sdk.capture_exception(e)
# 添加上下文
sentry_sdk.set_context("order", {
"id": order_id,
"amount": amount
})
# New Relic集成
# pip install newrelic
# 启动时初始化
# newrelic-admin run-program gunicorn myproject.wsgi:application
日志聚合(ELK Stack):
# docker-compose.yml
version: '3.8'
services:
elasticsearch:
image: elasticsearch:8.5.0
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
networks:
- elk
logstash:
image: logstash:8.5.0
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
depends_on:
- elasticsearch
networks:
- elk
kibana:
image: kibana:8.5.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- elk
web:
build: .
logging:
driver: "syslog"
options:
syslog-address: "tcp://logstash:5000"
tag: "django"
networks:
- elk
volumes:
elasticsearch_data:
networks:
elk:
告警配置:
# 使用Prometheus Alertmanager
# alert_rules.yml(在prometheus.yml的rule_files中引用)
groups:
- name: django_alerts
rules:
# 错误率告警
- alert: HighErrorRate
expr: rate(http_requests_total{status="500"}[5m]) > 0.05
for: 5m
annotations:
summary: "高错误率"
description: "5xx错误率超过5%"
# 响应时间告警
- alert: SlowRequests
expr: histogram_quantile(0.95, http_request_duration_seconds) > 2
for: 10m
annotations:
summary: "响应缓慢"
description: "95%请求超过2秒"
# 数据库连接告警
- alert: DatabaseConnectionHigh
expr: django_db_connections > 80
for: 5m
annotations:
summary: "数据库连接数过高"
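上面的 `histogram_quantile(0.95, ...)` 在Prometheus中是通过直方图桶做线性插值来估算分位数的。下面用纯Python写一个示意实现(桶边界和计数均为假设数据),帮助理解这条告警表达式到底在算什么:

```python
def histogram_quantile(q, buckets):
    """根据Prometheus风格的累积直方图桶估算分位数。
    buckets: [(上界le, 累积计数)], 按上界升序排列。
    桶内采用线性插值, 与Prometheus的做法一致。
    """
    total = buckets[-1][1]
    if total == 0:
        return float('nan')
    rank = q * total  # 目标样本在累积计数中的位置
    prev_le, prev_count = 0.0, 0
    for le, count in buckets:
        if count >= rank:
            bucket_count = count - prev_count
            if bucket_count == 0:
                return le
            # 在 (prev_le, le] 桶内线性插值
            return prev_le + (le - prev_le) * (rank - prev_count) / bucket_count
        prev_le, prev_count = le, count
    return buckets[-1][0]

# 假设的请求耗时分布: 900个请求<0.5s, 累计980个<1s, 995个<2s, 1000个<4s
buckets = [(0.5, 900), (1.0, 980), (2.0, 995), (4.0, 1000)]
p95 = histogram_quantile(0.95, buckets)  # ≈0.81s, 未超过2s阈值, 不会触发告警
```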
自定义监控中间件:
import time
import uuid
import logging
from prometheus_client import Histogram, Counter
logger = logging.getLogger(__name__)
request_duration = Histogram(
'django_request_duration_seconds',
'Request duration',
['method', 'endpoint', 'status']
)
request_total = Counter(
'django_requests_total',
'Total requests',
['method', 'endpoint', 'status']
)
class ObservabilityMiddleware:
"""可观测性中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
# 生成请求ID
request_id = str(uuid.uuid4())
request.id = request_id
# 记录请求开始
logger.info(
"request_started",
extra={
'request_id': request_id,
'method': request.method,
'path': request.path,
'user_id': getattr(request.user, 'id', None),
'ip': request.META.get('REMOTE_ADDR'),
}
)
# 处理请求
response = self.get_response(request)
# 计算耗时
duration = time.time() - start_time
# 记录指标
request_duration.labels(
method=request.method,
endpoint=request.path,
status=response.status_code
).observe(duration)
request_total.labels(
method=request.method,
endpoint=request.path,
status=response.status_code
).inc()
# 记录请求完成
logger.info(
"request_completed",
extra={
'request_id': request_id,
'status': response.status_code,
'duration': duration,
}
)
# 添加请求ID到响应头
response['X-Request-ID'] = request_id
return response
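上面的中间件依赖Django运行时,但其 `__init__(get_response)` / `__call__(request)` 的洋葱模型本身可以脱离框架验证。以下是一个纯Python的最小示意(FakeRequest/FakeResponse均为演示用的假设类):

```python
import time
import uuid

class FakeRequest:
    def __init__(self, method, path):
        self.method, self.path = method, path

class FakeResponse(dict):  # 用dict模拟可设置响应头的response对象
    status_code = 200

class TimingMiddleware:
    """与Django中间件相同的调用协议: 构造时接收get_response, 调用时包裹请求处理"""
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        start = time.time()
        request.id = str(uuid.uuid4())
        response = self.get_response(request)     # 进入内层处理
        response['X-Request-ID'] = request.id     # 返回途中补充响应头
        response['X-Duration'] = time.time() - start
        return response

def view(request):  # 模拟最内层的视图
    return FakeResponse()

handler = TimingMiddleware(view)
resp = handler(FakeRequest('GET', '/orders/'))
```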
最佳实践:
How to design high availability architecture for Django applications?
考察点:高可用架构、故障恢复、弹性设计。
答案:
高可用架构通过消除单点故障、实现自动故障转移、横向扩展、降级熔断等策略,确保系统在部分组件失败时仍能继续服务。
架构设计:
Internet
    │
Cloud CDN
    │
WAF/DDoS防护
    │
Load Balancer (HA)
Nginx/HAProxy(主备)
    │
┌──────────┬──────────┬──────────┬──────────┐
│ Django + │ Django + │ Django + │ Django + │
│ Gunicorn │ Gunicorn │ Gunicorn │ Gunicorn │
└──────────┴──────────┴──────────┴──────────┘
    │
┌───────────────┬───────────────┬───────────────┐
│ DB主从集群     │ Redis集群      │ 消息队列       │
│ Master+Replica│ (Sentinel)    │ RabbitMQ(集群) │
└───────────────┴───────────────┴───────────────┘
1. 数据库高可用:
# settings.py - 数据库主从配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_USER'),
'PASSWORD': config('DB_PASSWORD'),
'HOST': config('DB_MASTER_HOST'),
'PORT': '5432',
'CONN_MAX_AGE': 60,
'OPTIONS': {
'connect_timeout': 10,
},
},
'replica1': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_READONLY_USER'),
'PASSWORD': config('DB_READONLY_PASSWORD'),
'HOST': config('DB_REPLICA1_HOST'),
'PORT': '5432',
'CONN_MAX_AGE': 60,
},
'replica2': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_READONLY_USER'),
'PASSWORD': config('DB_READONLY_PASSWORD'),
'HOST': config('DB_REPLICA2_HOST'),
'PORT': '5432',
'CONN_MAX_AGE': 60,
},
}
# 故障转移路由器
import random
import logging
from django.db import connections
from django.db.utils import OperationalError
logger = logging.getLogger(__name__)
class FailoverRouter:
"""支持故障转移的路由器"""
replica_dbs = ['replica1', 'replica2']
failed_dbs = set() # 记录失败的数据库
def db_for_read(self, model, **hints):
"""带故障转移的读操作"""
# 过滤掉失败的数据库
available_replicas = [
db for db in self.replica_dbs
if db not in self.failed_dbs
]
if not available_replicas:
# 所有从库都故障,使用主库
logger.warning("所有从库故障,使用主库")
return 'default'
# 随机选择可用的从库
return random.choice(available_replicas)
def db_for_write(self, model, **hints):
return 'default'
def health_check(self):
"""健康检查"""
for db_name in self.replica_dbs:
try:
conn = connections[db_name]
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
# 数据库恢复,从失败列表移除
self.failed_dbs.discard(db_name)
except OperationalError:
# 数据库故障,添加到失败列表
self.failed_dbs.add(db_name)
logger.error(f"数据库 {db_name} 故障")
# 定期健康检查
from celery import shared_task
@shared_task
def database_health_check():
router = FailoverRouter()
router.health_check()
# 每30秒检查一次(app为Celery应用实例)
app.conf.beat_schedule = {
'db-health-check': {
'task': 'myapp.tasks.database_health_check',
'schedule': 30.0,
},
}
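FailoverRouter的核心选择逻辑可以抽成一个纯函数单独验证:从副本列表中剔除故障库,全部故障时回退主库。以下为与上文路由器一致的独立示意(不依赖Django):

```python
import random

def pick_read_db(replicas, failed, default='default'):
    """从可用副本中随机选一个; 副本全部故障时回退主库"""
    available = [db for db in replicas if db not in failed]
    if not available:
        return default  # 所有从库故障, 使用主库
    return random.choice(available)

# replica1故障时, 读请求只会落到replica2
db = pick_read_db(['replica1', 'replica2'], failed={'replica1'})
```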
2. Redis高可用(Sentinel):
# settings.py
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
# 使用Redis Sentinel
'LOCATION': [
'redis://sentinel1:26379/0',
'redis://sentinel2:26379/0',
'redis://sentinel3:26379/0',
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.SentinelClient',
'SENTINELS': [
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379),
],
'SENTINEL_KWARGS': {
'password': config('REDIS_PASSWORD'),
},
'PASSWORD': config('REDIS_PASSWORD'),
'MASTER_NAME': 'mymaster',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'socket_timeout': 5,
'socket_connect_timeout': 5,
'retry_on_timeout': True,
},
},
}
}
3. 应用层高可用:
# Gunicorn配置 - gunicorn.conf.py
import multiprocessing
# Worker数量
workers = multiprocessing.cpu_count() * 2 + 1
# Worker类型
worker_class = 'gevent' # 或 'gthread'
# 超时设置
timeout = 120
graceful_timeout = 30
keepalive = 5
# Worker重启(防止内存泄漏)
max_requests = 1000
max_requests_jitter = 100
# 优雅重启
preload_app = True
# 健康检查
def when_ready(server):
server.log.info("服务器就绪")
def worker_int(worker):
worker.log.info("Worker中断,优雅关闭")
def worker_abort(worker):
worker.log.info("Worker异常退出")
4. 负载均衡配置:
# Nginx负载均衡
"""
upstream django_cluster {
least_conn; # 最少连接算法
server web1:8000 weight=3 max_fails=3 fail_timeout=30s;
server web2:8000 weight=3 max_fails=3 fail_timeout=30s;
server web3:8000 weight=2 max_fails=3 fail_timeout=30s backup;
}
server {
listen 80;
location / {
proxy_pass http://django_cluster;
# 健康检查
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
proxy_connect_timeout 5s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
"""
# HAProxy配置
"""
frontend http-in
bind *:80
default_backend django_servers
backend django_servers
balance roundrobin
option httpchk GET /health/
server web1 web1:8000 check inter 2000 rise 2 fall 3
server web2 web2:8000 check inter 2000 rise 2 fall 3
server web3 web3:8000 check inter 2000 rise 2 fall 3 backup
"""
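Nginx的 `least_conn` 结合 `weight` 的选择思想,可以用几行Python直观表达:在健康后端中选"活动连接数/权重"最小者。以下仅为算法示意,并非Nginx的实际实现:

```python
def least_conn(servers):
    """least_conn思想的示意: 在健康后端中选 活动连接数/权重 最小者
    servers: {名称: {'conns': 活动连接数, 'weight': 权重, 'up': 健康检查结果}}
    """
    healthy = {name: s for name, s in servers.items() if s['up']}
    if not healthy:
        raise RuntimeError("无可用后端")
    return min(healthy, key=lambda name: healthy[name]['conns'] / healthy[name]['weight'])

servers = {
    'web1': {'conns': 9, 'weight': 3, 'up': True},   # 负载比 3.0
    'web2': {'conns': 4, 'weight': 3, 'up': True},   # 负载比 ≈1.33 → 被选中
    'web3': {'conns': 1, 'weight': 2, 'up': False},  # 健康检查失败, 被剔除
}
```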
5. 熔断和降级:
from functools import wraps
import time
import logging
logger = logging.getLogger(__name__)
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
# 检查是否可以尝试恢复
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
logger.info("熔断器进入半开状态")
else:
raise Exception("熔断器开启,服务暂时不可用")
try:
result = func(*args, **kwargs)
# 成功,重置计数
if self.state == 'HALF_OPEN':
self.state = 'CLOSED'
self.failure_count = 0
logger.info("熔断器恢复")
return result
except self.expected_exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
logger.error(f"熔断器开启,失败次数: {self.failure_count}")
raise
def circuit_breaker(failure_threshold=5, recovery_timeout=60):
"""熔断器装饰器"""
breaker = CircuitBreaker(failure_threshold, recovery_timeout)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
return breaker.call(func, *args, **kwargs)
return wrapper
return decorator
# 使用熔断器
import requests
@circuit_breaker(failure_threshold=5, recovery_timeout=60)
def call_external_service(data):
"""调用外部服务"""
response = requests.post(
'http://external-service/api',
json=data,
timeout=5
)
response.raise_for_status()
return response.json()
# 降级处理
def get_recommendations_with_fallback(user_id):
"""带降级的推荐服务"""
try:
# 尝试调用推荐服务
recommendations = call_recommendation_service(user_id)
except Exception as e:
logger.warning(f"推荐服务故障,使用降级方案: {e}")
# 降级:返回热门商品
recommendations = Product.objects.filter(
is_hot=True
).order_by('-sales')[:10]
return recommendations
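上面CircuitBreaker的状态机(CLOSED→OPEN→HALF_OPEN→CLOSED)可以用一个精简版独立演示。以下是上文类的压缩示意(省略了日志,参数缩小便于观察):

```python
import time

class Breaker:
    """CircuitBreaker的精简示意: CLOSED→OPEN→HALF_OPEN→CLOSED状态机"""
    def __init__(self, threshold=3, recovery=0.05):
        self.threshold, self.recovery = threshold, recovery
        self.failures, self.opened_at = 0, None
        self.state = 'CLOSED'

    def call(self, func):
        if self.state == 'OPEN':
            if time.time() - self.opened_at > self.recovery:
                self.state = 'HALF_OPEN'  # 超过恢复时间, 允许试探一次
            else:
                raise RuntimeError("熔断器开启, 快速失败")
        try:
            result = func()
        except Exception:
            self.failures += 1
            self.opened_at = time.time()
            if self.failures >= self.threshold:
                self.state = 'OPEN'  # 连续失败达到阈值, 熔断
            raise
        if self.state == 'HALF_OPEN':  # 试探成功, 恢复
            self.state, self.failures = 'CLOSED', 0
        return result

b = Breaker(threshold=3, recovery=0.05)
def failing_backend():
    raise ValueError("后端故障")

for _ in range(3):  # 连续3次失败触发熔断
    try:
        b.call(failing_backend)
    except ValueError:
        pass
# 此时 b.state == 'OPEN', 后续调用直接快速失败, 不再打到故障后端
```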
6. Kubernetes自动扩缩容:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: django-app
spec:
replicas: 3
selector:
matchLabels:
app: django
template:
metadata:
labels:
app: django
spec:
containers:
- name: django
image: myapp:latest
ports:
- containerPort: 8000
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
livenessProbe:
httpGet:
path: /health/
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready/
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
# hpa.yaml - 水平自动扩缩容
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: django-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: django-app
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Pods
value: 1
periodSeconds: 60
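HPA扩缩容的核心公式是 desiredReplicas = ceil(currentReplicas × 当前指标值 / 目标指标值)(见Kubernetes官方文档),结果再被minReplicas/maxReplicas与behavior策略约束。可以用几行Python直观感受(behavior限速未包含,仅为示意):

```python
import math

def hpa_desired(current_replicas, current_metric, target_metric,
                min_replicas=3, max_replicas=20):
    """Kubernetes HPA的期望副本数计算(不含behavior限速, 仅为示意)"""
    desired = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, desired))

# CPU目标70%: 3个Pod平均负载95% → 扩到 ceil(3×95/70) = 5 个
replicas = hpa_desired(3, 95, 70)
```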
故障场景处理:
"""
场景1:数据库主节点宕机
- 检测:健康检查失败
- 响应:自动提升从库为主库
- 工具:Patroni、PgPool-II
- 恢复时间:<30秒
场景2:应用服务器宕机
- 检测:健康检查失败
- 响应:负载均衡器移除故障节点
- 工具:Nginx/HAProxy健康检查
- 恢复时间:<10秒
场景3:Redis主节点宕机
- 检测:Sentinel监控
- 响应:自动提升从节点
- 工具:Redis Sentinel
- 恢复时间:<60秒
场景4:缓存完全失效
- 检测:缓存命中率骤降
- 响应:降级直接查数据库
- 保护:限流、熔断
- 恢复:缓存预热
场景5:流量突增
- 检测:CPU/内存使用率上升
- 响应:HPA自动扩容
- 工具:Kubernetes HPA
- 扩容时间:1-2分钟
场景6:跨地域故障
- 检测:区域健康检查
- 响应:DNS切换到备用区域
- 工具:Route53、CloudFlare
- 切换时间:TTL时间(通常5分钟)
"""
灾难恢复计划:
# 1. 数据备份
from django.core.management.base import BaseCommand
import subprocess
from datetime import datetime
class Command(BaseCommand):
def handle(self, *args, **options):
"""数据库备份"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f"/backups/db_backup_{timestamp}.sql"
# PostgreSQL备份
subprocess.run([
'pg_dump',
'-h', config('DB_HOST'),
'-U', config('DB_USER'),
'-d', config('DB_NAME'),
'-f', backup_file
])
# 上传到S3
subprocess.run([
'aws', 's3', 'cp',
backup_file,
f"s3://myapp-backups/database/{timestamp}.sql"
])
self.stdout.write(f"备份完成: {backup_file}")
# 2. 定时备份
CELERY_BEAT_SCHEDULE = {
'backup-database-daily': {
'task': 'myapp.tasks.backup_database',
'schedule': crontab(hour=2, minute=0), # 每天凌晨2点
},
}
# 3. 恢复流程
def restore_database(backup_file):
"""恢复数据库"""
subprocess.run([
'psql',
'-h', config('DB_HOST'),
'-U', config('DB_USER'),
'-d', config('DB_NAME'),
'-f', backup_file
])
监控和告警:
# Prometheus告警规则
"""
groups:
- name: high_availability
rules:
# 服务可用性
- alert: ServiceDown
expr: up{job="django"} == 0
for: 1m
annotations:
summary: "Django服务宕机"
# 数据库连接
- alert: DatabaseDown
expr: rate(django_db_execute_total[2m]) == 0
for: 2m
annotations:
summary: "数据库连接失败"
# 缓存故障
- alert: CacheDown
expr: redis_up == 0
for: 1m
annotations:
summary: "Redis缓存故障"
# 错误率
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.05
for: 5m
annotations:
summary: "错误率过高"
"""
# Django中的告警
import requests
def send_alert(title, message, level='critical'):
"""发送告警"""
# 发送到Slack
requests.post(
'https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
json={
'text': f"[{level.upper()}] {title}",
'attachments': [{
'text': message,
'color': 'danger' if level == 'critical' else 'warning'
}]
}
)
# 发送到PagerDuty
requests.post(
'https://events.pagerduty.com/v2/enqueue',
json={
'routing_key': config('PAGERDUTY_KEY'),
'event_action': 'trigger',
'payload': {
'summary': title,
'severity': level,
'source': 'django-app',
}
}
)
# 在关键错误时发送告警
def critical_operation():
try:
perform_operation()
except CriticalError as e:
send_alert(
"关键操作失败",
f"错误详情: {e}",
level='critical'
)
raise
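send_alert通常不应由单个请求直接触发,而是基于一段时间窗口内的错误率判断(对应上文Prometheus的 `rate(...) > 0.05`)。以下是一个纯Python的滑动窗口示意,阈值与窗口大小均为假设值:

```python
import time
from collections import deque

class ErrorRateMonitor:
    """统计最近window秒内的错误率, 超过阈值时提示需要告警"""
    def __init__(self, window=300, threshold=0.05):
        self.window, self.threshold = window, threshold
        self.events = deque()  # (时间戳, 是否错误)

    def record(self, is_error, now=None):
        now = time.time() if now is None else now
        self.events.append((now, is_error))
        # 淘汰滑出窗口的旧事件
        while self.events and self.events[0][0] < now - self.window:
            self.events.popleft()

    def should_alert(self):
        if not self.events:
            return False
        errors = sum(1 for _, is_error in self.events if is_error)
        return errors / len(self.events) > self.threshold

m = ErrorRateMonitor(window=300, threshold=0.05)
for i in range(100):
    m.record(is_error=(i % 10 == 0), now=1000 + i)  # 10%错误率, 超过5%阈值
```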
最佳实践总结:
"""
架构层面:
✅ 消除所有单点故障
✅ 多活部署(多个数据中心)
✅ 自动故障转移
✅ 定期故障演练
数据层面:
✅ 数据库主从复制
✅ 定期备份(本地+异地)
✅ 备份定期测试恢复
✅ 读写分离
应用层面:
✅ 无状态应用设计
✅ 会话外部存储(Redis)
✅ 横向扩展能力
✅ 优雅启停
监控层面:
✅ 全链路监控
✅ 实时告警
✅ 性能基线
✅ 容量规划
容灾层面:
✅ 降级预案
✅ 熔断机制
✅ 限流保护
✅ 灾难恢复计划(RTO/RPO)
目标SLA:
- 可用性:99.99%(年停机52分钟)
- RTO (恢复时间):<5分钟
- RPO (数据丢失):<5分钟
"""
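可用性百分比与允许停机时间的换算可以直接算出来(99.99%对应约52.6分钟/年,与上文SLA目标一致):

```python
def downtime_minutes_per_year(availability):
    """给定可用性(如0.9999), 返回每年允许的停机分钟数"""
    return (1 - availability) * 365 * 24 * 60

# 99.9%  → ~525.6分钟/年(约8.8小时)
# 99.99% → ~52.6分钟/年
# 99.999%→ ~5.3分钟/年
budget = downtime_minutes_per_year(0.9999)
```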