什么是Node.js?它的主要特点和优势是什么?
What is Node.js? What are its main features and advantages?
*考察点:Node.js基础概念。*
共 30 道题目
What is Node.js? What are its main features and advantages?
What is Node.js? What are its main features and advantages?
考察点:Node.js基础概念。
答案:
Node.js是一个基于Chrome V8引擎的JavaScript运行时环境,它允许JavaScript代码在服务器端运行。Node.js采用事件驱动、非阻塞I/O模型,使其在构建可扩展的网络应用程序时非常高效。
主要特点:
主要优势:
适用场景:
How does the event loop mechanism work in Node.js?
How does the event loop mechanism work in Node.js?
考察点:事件循环理解。
答案:
Node.js的事件循环是其核心机制,负责处理异步操作和回调函数。它采用单线程模型,通过事件循环来管理和调度各种异步任务。
事件循环的六个阶段:
执行优先级:
// 微任务队列优先级最高
process.nextTick(() => console.log('nextTick'));
Promise.resolve().then(() => console.log('Promise'));
// 宏任务按阶段执行
setTimeout(() => console.log('setTimeout'), 0);
setImmediate(() => console.log('setImmediate'));
// 输出顺序:nextTick -> Promise -> setTimeout -> setImmediate
关键概念:
What is npm? How to use npm to manage project dependencies?
What is npm? How to use npm to manage project dependencies?
考察点:包管理基础。
答案:
npm(Node Package Manager)是Node.js的包管理工具,用于管理和分享JavaScript代码包。它是世界上最大的软件包注册表,包含数百万个可重用的代码包。
核心功能:
常用命令:
# 初始化项目
npm init
npm init -y # 使用默认配置
# 安装依赖
npm install express # 安装到dependencies
npm install --save-dev jest # 安装到devDependencies
npm install -g nodemon # 全局安装
# 管理依赖
npm uninstall express # 卸载包
npm update # 更新所有包
npm outdated # 查看过时的包
# 运行脚本
npm run start
npm run test
npm run build
package.json配置:
{
"name": "my-app",
"version": "1.0.0",
"dependencies": {
"express": "^4.18.0"
},
"devDependencies": {
"jest": "^28.0.0"
},
"scripts": {
"start": "node index.js",
"test": "jest"
}
}
依赖版本管理:
^1.2.3:兼容版本(1.x.x)~1.2.3:补丁版本(1.2.x)1.2.3:精确版本What is the module system in Node.js? What are the differences between CommonJS and ES Module?
What is the module system in Node.js? What are the differences between CommonJS and ES Module?
考察点:模块系统理解。
答案:
Node.js支持两种主要的模块系统:CommonJS(默认)和ES Module(ES6模块)。模块系统用于组织代码,实现代码的复用和封装。
CommonJS模块系统:
// 导出模块 (math.js)
function add(a, b) {
return a + b;
}
module.exports = { add };
// 或者
exports.add = add;
// 导入模块 (app.js)
const { add } = require('./math');
const math = require('./math');
console.log(add(1, 2)); // 3
ES Module模块系统:
// 导出模块 (math.mjs 或设置 "type": "module")
export function add(a, b) {
return a + b;
}
export default { add };
// 导入模块
import { add } from './math.mjs';
import math from './math.mjs';
console.log(add(1, 2)); // 3
主要区别对比:
| 特性 | CommonJS | ES Module |
|---|---|---|
| 语法 | require/module.exports | import/export |
| 加载时机 | 运行时动态加载 | 编译时静态加载 |
| 同步/异步 | 同步加载 | 异步加载 |
| 循环依赖 | 支持但可能有问题 | 更好的循环依赖处理 |
| Tree-shaking | 不支持 | 支持 |
| 顶层await | 不支持 | 支持 |
启用ES Module方式:
.mjs"type": "module"--input-type=module标志最佳实践:
How to handle file operations in Node.js?
How to handle file operations in Node.js?
考察点:文件系统操作。
答案:
Node.js提供了强大的文件系统模块(fs)来处理文件操作,支持同步和异步两种方式。推荐使用异步方法以避免阻塞事件循环。
基本文件操作:
const fs = require('fs');
const path = require('path');
// 异步读取文件
fs.readFile('data.txt', 'utf8', (err, data) => {
if (err) throw err;
console.log(data);
});
// Promise方式(Node.js 10+)
const fsPromises = require('fs').promises;
async function readFileAsync() {
try {
const data = await fsPromises.readFile('data.txt', 'utf8');
console.log(data);
} catch (err) {
console.error(err);
}
}
// 写入文件
fs.writeFile('output.txt', 'Hello World', 'utf8', (err) => {
if (err) throw err;
console.log('文件已保存');
});
常用文件操作方法:
// 检查文件是否存在
fs.access('file.txt', fs.constants.F_OK, (err) => {
console.log(err ? '文件不存在' : '文件存在');
});
// 获取文件信息
fs.stat('file.txt', (err, stats) => {
if (err) throw err;
console.log('文件大小:', stats.size);
console.log('是否为文件:', stats.isFile());
console.log('是否为目录:', stats.isDirectory());
});
// 创建目录
fs.mkdir('newDir', { recursive: true }, (err) => {
if (err) throw err;
console.log('目录已创建');
});
// 读取目录
fs.readdir('.', (err, files) => {
if (err) throw err;
console.log('目录内容:', files);
});
流式文件操作(大文件处理):
const fs = require('fs');
// 创建可读流
const readStream = fs.createReadStream('large-file.txt');
readStream.on('data', chunk => {
console.log('接收到数据块:', chunk.length);
});
// 文件复制
const readStream2 = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');
readStream2.pipe(writeStream);
错误处理最佳实践:
How to create an HTTP server in Node.js?
How to create an HTTP server in Node.js?
考察点:HTTP服务基础。
答案:
Node.js内置的http模块提供了创建HTTP服务器的核心功能。可以创建基础的Web服务器来处理HTTP请求和响应。
基础HTTP服务器:
const http = require('http');
const url = require('url');
// 创建服务器
const server = http.createServer((req, res) => {
const parsedUrl = url.parse(req.url, true);
const path = parsedUrl.pathname;
const method = req.method;
// 设置响应头
res.writeHead(200, {
'Content-Type': 'text/html; charset=utf-8',
'Access-Control-Allow-Origin': '*'
});
// 路由处理
if (path === '/' && method === 'GET') {
res.end('<h1>欢迎访问首页</h1>');
} else if (path === '/api/users' && method === 'GET') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ users: ['Alice', 'Bob'] }));
} else {
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('页面未找到');
}
});
// 启动服务器
const PORT = 3000;
server.listen(PORT, () => {
console.log(`服务器运行在 http://localhost:${PORT}`);
});
处理POST请求和数据:
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
let body = '';
// 接收数据
req.on('data', chunk => {
body += chunk.toString();
});
// 数据接收完成
req.on('end', () => {
try {
const data = JSON.parse(body);
console.log('接收到数据:', data);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ message: '数据已接收', data }));
} catch (error) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: '无效的JSON数据' }));
}
});
}
});
使用Express框架简化开发:
const express = require('express');
const app = express();
// 中间件
app.use(express.json());
app.use(express.static('public'));
// 路由
app.get('/', (req, res) => {
res.send('<h1>欢迎访问首页</h1>');
});
app.get('/api/users', (req, res) => {
res.json({ users: ['Alice', 'Bob'] });
});
app.post('/api/users', (req, res) => {
const { name } = req.body;
res.json({ message: `用户 ${name} 已创建` });
});
app.listen(3000, () => {
console.log('Express服务器运行在端口3000');
});
HTTPS服务器:
const https = require('https');
const fs = require('fs');
const options = {
key: fs.readFileSync('private-key.pem'),
cert: fs.readFileSync('certificate.pem')
};
const server = https.createServer(options, (req, res) => {
res.writeHead(200);
res.end('HTTPS服务器运行中');
});
server.listen(443);
What is middleware? How to use it in Express?
What is middleware? How to use it in Express?
考察点:中间件概念。
答案:
中间件是Express.js的核心概念,它是一个函数,在请求-响应循环中具有访问请求对象(req)、响应对象(res)和下一个中间件函数(next)的能力。中间件可以执行代码、修改请求和响应对象、结束请求-响应循环或调用下一个中间件。
中间件函数结构:
function middleware(req, res, next) {
// 执行中间件逻辑
console.log('中间件被调用');
// 调用next()传递控制权给下一个中间件
next();
}
中间件类型和使用方式:
1. 应用级中间件:
const express = require('express');
const app = express();
// 全局中间件 - 所有请求都会执行
app.use((req, res, next) => {
console.log('请求时间:', new Date().toISOString());
console.log('请求方法:', req.method);
console.log('请求路径:', req.path);
next();
});
// 路径特定中间件
app.use('/api', (req, res, next) => {
console.log('API请求');
next();
});
2. 路由级中间件:
const router = express.Router();
// 路由中间件
router.use((req, res, next) => {
console.log('路由中间件');
next();
});
router.get('/users', (req, res) => {
res.json({ users: [] });
});
app.use('/api', router);
3. 内置中间件:
// 解析JSON请求体
app.use(express.json());
// 解析URL编码数据
app.use(express.urlencoded({ extended: true }));
// 提供静态文件服务
app.use(express.static('public'));
4. 第三方中间件:
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
// 跨域中间件
app.use(cors());
// 安全头中间件
app.use(helmet());
// 日志中间件
app.use(morgan('combined'));
5. 自定义中间件:
// 身份验证中间件
function authenticate(req, res, next) {
const token = req.headers.authorization;
if (!token) {
return res.status(401).json({ error: '未提供认证令牌' });
}
// 验证token逻辑
try {
const user = verifyToken(token);
req.user = user;
next();
} catch (error) {
res.status(401).json({ error: '无效令牌' });
}
}
// 使用认证中间件
app.get('/protected', authenticate, (req, res) => {
res.json({ message: `欢迎,${req.user.name}` });
});
错误处理中间件:
// 错误处理中间件 - 必须有4个参数
app.use((err, req, res, next) => {
console.error('错误:', err.stack);
if (err.status) {
res.status(err.status).json({ error: err.message });
} else {
res.status(500).json({ error: '服务器内部错误' });
}
});
中间件执行顺序:
中间件按照定义的顺序执行,必须调用next()来传递控制权,否则请求会挂起。
How to handle asynchronous operations in Node.js? What are the differences between callbacks, Promises, and async/await?
How to handle asynchronous operations in Node.js? What are the differences between callbacks, Promises, and async/await?
考察点:异步编程。
答案:
Node.js中的异步操作是其核心特性,主要有三种处理方式:回调函数(Callback)、Promise和async/await。每种方式都有其特点和适用场景。
1. 回调函数(Callback):
const fs = require('fs');
// 传统回调方式
function readFileCallback(filename, callback) {
fs.readFile(filename, 'utf8', (err, data) => {
if (err) {
callback(err, null);
} else {
callback(null, data);
}
});
}
// 使用回调
readFileCallback('data.txt', (err, data) => {
if (err) {
console.error('读取失败:', err);
} else {
console.log('文件内容:', data);
}
});
// 回调地狱问题
fs.readFile('file1.txt', (err, data1) => {
if (err) throw err;
fs.readFile('file2.txt', (err, data2) => {
if (err) throw err;
fs.writeFile('result.txt', data1 + data2, (err) => {
if (err) throw err;
console.log('合并完成');
});
});
});
2. Promise:
const fs = require('fs').promises;
// Promise方式
function readFilePromise(filename) {
return fs.readFile(filename, 'utf8');
}
// 使用Promise
readFilePromise('data.txt')
.then(data => {
console.log('文件内容:', data);
})
.catch(err => {
console.error('读取失败:', err);
});
// Promise链式调用解决回调地狱
fs.readFile('file1.txt', 'utf8')
.then(data1 => {
return fs.readFile('file2.txt', 'utf8')
.then(data2 => ({ data1, data2 }));
})
.then(({ data1, data2 }) => {
return fs.writeFile('result.txt', data1 + data2);
})
.then(() => {
console.log('合并完成');
})
.catch(err => {
console.error('操作失败:', err);
});
3. async/await:
const fs = require('fs').promises;
// async/await方式
async function readFileAsync(filename) {
try {
const data = await fs.readFile(filename, 'utf8');
console.log('文件内容:', data);
return data;
} catch (err) {
console.error('读取失败:', err);
throw err;
}
}
// 使用async/await解决回调地狱
async function mergeFiles() {
try {
const data1 = await fs.readFile('file1.txt', 'utf8');
const data2 = await fs.readFile('file2.txt', 'utf8');
await fs.writeFile('result.txt', data1 + data2);
console.log('合并完成');
} catch (err) {
console.error('操作失败:', err);
}
}
// 并行执行多个异步操作
async function readMultipleFiles() {
try {
const [data1, data2, data3] = await Promise.all([
fs.readFile('file1.txt', 'utf8'),
fs.readFile('file2.txt', 'utf8'),
fs.readFile('file3.txt', 'utf8')
]);
console.log('所有文件读取完成');
return { data1, data2, data3 };
} catch (err) {
console.error('读取失败:', err);
}
}
三种方式对比:
| 特性 | Callback | Promise | async/await |
|---|---|---|---|
| 语法复杂度 | 简单 | 中等 | 最简洁 |
| 错误处理 | 每个回调都要处理 | .catch()统一处理 | try/catch处理 |
| 回调地狱 | 存在 | 解决 | 完全解决 |
| 调试难度 | 困难 | 中等 | 容易 |
| 浏览器支持 | 全部 | ES6+ | ES2017+ |
| 并行执行 | 复杂 | Promise.all() | Promise.all() + await |
最佳实践:
How to handle errors in Node.js?
How to handle errors in Node.js?
考察点:错误处理机制。
答案:
Node.js中的错误处理是保证应用程序稳定性和可靠性的关键。Node.js提供了多种错误处理机制来应对不同类型的错误情况。
错误类型:
同步错误处理:
// 使用try-catch处理同步错误
try {
const data = JSON.parse('无效的JSON');
console.log(data);
} catch (error) {
console.error('JSON解析错误:', error.message);
}
// 函数中的错误处理
function divide(a, b) {
if (b === 0) {
throw new Error('除数不能为零');
}
return a / b;
}
try {
const result = divide(10, 0);
console.log(result);
} catch (error) {
console.error('计算错误:', error.message);
}
异步错误处理:
1. 回调函数错误处理:
const fs = require('fs');
// Node.js错误优先回调模式
fs.readFile('nonexistent.txt', 'utf8', (err, data) => {
if (err) {
console.error('文件读取错误:', err.message);
return;
}
console.log('文件内容:', data);
});
// 自定义错误优先回调
function asyncOperation(callback) {
setTimeout(() => {
const random = Math.random();
if (random > 0.5) {
callback(null, '操作成功');
} else {
callback(new Error('操作失败'));
}
}, 1000);
}
asyncOperation((err, result) => {
if (err) {
console.error('异步操作错误:', err.message);
} else {
console.log('结果:', result);
}
});
2. Promise错误处理:
const fs = require('fs').promises;
// Promise错误处理
fs.readFile('nonexistent.txt', 'utf8')
.then(data => {
console.log('文件内容:', data);
})
.catch(err => {
console.error('Promise错误:', err.message);
});
// Promise链中的错误传播
Promise.resolve()
.then(() => {
throw new Error('Promise链中的错误');
})
.then(() => {
console.log('这里不会执行');
})
.catch(err => {
console.error('捕获到错误:', err.message);
});
3. async/await错误处理:
async function handleAsyncErrors() {
try {
const data = await fs.readFile('nonexistent.txt', 'utf8');
console.log('文件内容:', data);
} catch (err) {
console.error('async/await错误:', err.message);
}
}
// 多个异步操作的错误处理
async function multipleAsyncOperations() {
try {
const results = await Promise.allSettled([
fs.readFile('file1.txt', 'utf8'),
fs.readFile('file2.txt', 'utf8'),
fs.readFile('nonexistent.txt', 'utf8')
]);
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`文件${index + 1}读取成功:`, result.value.substring(0, 50));
} else {
console.error(`文件${index + 1}读取失败:`, result.reason.message);
}
});
} catch (err) {
console.error('批量操作错误:', err.message);
}
}
全局错误处理:
// 捕获未处理的Promise拒绝
process.on('unhandledRejection', (reason, promise) => {
console.error('未处理的Promise拒绝:', reason);
// 记录日志、清理资源等
process.exit(1);
});
// 捕获未捕获的异常
process.on('uncaughtException', (err) => {
console.error('未捕获的异常:', err);
// 记录日志、清理资源等
process.exit(1);
});
// Express错误处理中间件
app.use((err, req, res, next) => {
console.error('Express错误:', err.stack);
if (err.status) {
res.status(err.status).json({
error: err.message,
stack: process.env.NODE_ENV === 'development' ? err.stack : undefined
});
} else {
res.status(500).json({ error: '内部服务器错误' });
}
});
自定义错误类:
class CustomError extends Error {
constructor(message, statusCode = 500) {
super(message);
this.name = this.constructor.name;
this.statusCode = statusCode;
Error.captureStackTrace(this, this.constructor);
}
}
class ValidationError extends CustomError {
constructor(message) {
super(message, 400);
}
}
// 使用自定义错误
function validateUser(user) {
if (!user.email) {
throw new ValidationError('邮箱是必填项');
}
if (!user.password) {
throw new ValidationError('密码是必填项');
}
}
最佳实践:
What is Buffer in Node.js? How to use it?
What is Buffer in Node.js? How to use it?
考察点:Buffer数据处理。
答案:
Buffer是Node.js中用于处理二进制数据的全局对象。在Node.js中,Buffer类是用来创建一个专门存放二进制数据的缓存区,类似于整数数组,但对应于V8堆内存之外的一块原始内存。
Buffer的特点:
创建Buffer的方式:
// 1. 创建指定长度的Buffer(已废弃,不安全)
// const buf1 = new Buffer(10); // 不推荐
// 2. 安全的创建方式
const buf1 = Buffer.alloc(10); // 创建10字节的Buffer,填充0
const buf2 = Buffer.allocUnsafe(10); // 创建10字节的Buffer,不初始化(更快但不安全)
const buf3 = Buffer.from('hello'); // 从字符串创建
const buf4 = Buffer.from([0x62, 0x75, 0x66, 0x66, 0x65, 0x72]); // 从数组创建
console.log(buf1); // <Buffer 00 00 00 00 00 00 00 00 00 00>
console.log(buf3); // <Buffer 68 65 6c 6c 6f>
Buffer与字符串转换:
// 字符串转Buffer
const str = 'Hello Node.js';
const buf = Buffer.from(str, 'utf8');
console.log(buf); // <Buffer 48 65 6c 6c 6f 20 4e 6f 64 65 2e 6a 73>
// Buffer转字符串
const bufToStr = buf.toString('utf8');
console.log(bufToStr); // Hello Node.js
// 支持多种编码
const buf2 = Buffer.from('你好', 'utf8');
console.log(buf2.toString('utf8')); // 你好
console.log(buf2.toString('base64')); // 5L2g5aW9
console.log(buf2.toString('hex')); // e4bda0e5a5bd
Buffer操作方法:
const buf = Buffer.from('Hello World');
// 基本属性和方法
console.log(buf.length); // 11 - Buffer长度
console.log(buf[0]); // 72 - 第一个字节(H的ASCII码)
// 读取和写入
buf.write('Hi', 0, 'utf8'); // 在位置0写入'Hi'
console.log(buf.toString()); // Hi World
// 切片(不复制数据,共享内存)
const slice = buf.slice(0, 2);
console.log(slice.toString()); // Hi
// 复制Buffer
const buf2 = Buffer.alloc(5);
buf.copy(buf2, 0, 0, 5); // 复制buf的前5个字节到buf2
console.log(buf2.toString()); // Hi Wo
// 填充Buffer
const buf3 = Buffer.alloc(10);
buf3.fill('ab');
console.log(buf3.toString()); // ababababab
Buffer的实际应用场景:
1. 文件操作:
const fs = require('fs');
// 读取二进制文件
fs.readFile('image.jpg', (err, data) => {
if (err) throw err;
console.log('文件大小:', data.length, '字节');
console.log('文件类型:', data.slice(0, 4)); // 读取文件头
});
// 写入二进制数据
const imageBuffer = Buffer.from([/* 图片二进制数据 */]);
fs.writeFile('output.jpg', imageBuffer, (err) => {
if (err) throw err;
console.log('图片已保存');
});
2. 网络通信:
const net = require('net');
const server = net.createServer((socket) => {
socket.on('data', (data) => {
// data是Buffer对象
console.log('接收到数据:', data.length, '字节');
console.log('数据内容:', data.toString());
// 发送Buffer响应
const response = Buffer.from('数据已收到', 'utf8');
socket.write(response);
});
});
server.listen(8080);
3. 加密和哈希:
const crypto = require('crypto');
const data = 'sensitive data';
const buffer = Buffer.from(data, 'utf8');
// 创建哈希
const hash = crypto.createHash('sha256');
hash.update(buffer);
const hashResult = hash.digest('hex');
console.log('SHA256哈希:', hashResult);
// Base64编码
const base64 = buffer.toString('base64');
console.log('Base64编码:', base64);
Buffer性能优化:
// 预分配Buffer避免频繁创建
const pool = Buffer.allocUnsafe(1024 * 1024); // 1MB缓冲池
let offset = 0;
function getBuffer(size) {
if (offset + size > pool.length) {
offset = 0; // 重置偏移
}
const buf = pool.slice(offset, offset + size);
offset += size;
return buf;
}
// 使用Buffer.concat合并多个Buffer
const buffers = [
Buffer.from('Hello '),
Buffer.from('Node.js '),
Buffer.from('World')
];
const combined = Buffer.concat(buffers);
console.log(combined.toString()); // Hello Node.js World
注意事项:
What is cluster mode in Node.js? How to implement load balancing?
What is cluster mode in Node.js? How to implement load balancing?
考察点:集群架构理解。
答案:
Node.js集群模式是一种通过创建多个工作进程来充分利用多核CPU系统的方法。由于Node.js是单线程的,集群模式可以帮助应用程序处理更多的并发连接,提高应用性能和可靠性。
集群模式的工作原理:
基本集群实现:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);
// 创建工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 监听工作进程退出
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
console.log('重新启动工作进程...');
cluster.fork();
});
// 监听工作进程上线
cluster.on('online', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已上线`);
});
} else {
// 工作进程代码
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from process ${process.pid}\n`);
});
server.listen(3000, () => {
console.log(`工作进程 ${process.pid} 监听端口 3000`);
});
}
高级集群管理:
const cluster = require('cluster');
const http = require('http');
const os = require('os');
class ClusterManager {
constructor(options = {}) {
this.workers = new Map();
this.maxWorkers = options.maxWorkers || os.cpus().length;
this.restartDelay = options.restartDelay || 1000;
this.maxRestarts = options.maxRestarts || 5;
}
start() {
if (cluster.isMaster) {
this.startMaster();
} else {
this.startWorker();
}
}
startMaster() {
console.log(`主进程 ${process.pid} 启动,创建 ${this.maxWorkers} 个工作进程`);
// 创建工作进程
for (let i = 0; i < this.maxWorkers; i++) {
this.createWorker();
}
// 优雅关闭
process.on('SIGTERM', () => {
console.log('接收到SIGTERM信号,开始优雅关闭...');
this.shutdown();
});
process.on('SIGINT', () => {
console.log('接收到SIGINT信号,开始优雅关闭...');
this.shutdown();
});
}
createWorker() {
const worker = cluster.fork();
const workerData = {
worker,
restarts: 0,
startTime: Date.now()
};
this.workers.set(worker.id, workerData);
worker.on('exit', (code, signal) => {
const data = this.workers.get(worker.id);
if (data && data.restarts < this.maxRestarts) {
console.log(`工作进程 ${worker.process.pid} 异常退出,准备重启...`);
setTimeout(() => {
this.createWorker();
}, this.restartDelay);
data.restarts++;
} else {
console.log(`工作进程 ${worker.process.pid} 重启次数过多,不再重启`);
}
this.workers.delete(worker.id);
});
return worker;
}
startWorker() {
const server = http.createServer((req, res) => {
// 模拟一些处理逻辑
const start = Date.now();
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
pid: process.pid,
timestamp: new Date().toISOString(),
uptime: process.uptime(),
memoryUsage: process.memoryUsage()
}));
});
server.listen(3000, () => {
console.log(`工作进程 ${process.pid} 监听端口 3000`);
});
// 优雅关闭工作进程
process.on('SIGTERM', () => {
console.log(`工作进程 ${process.pid} 接收到关闭信号`);
server.close(() => {
process.exit(0);
});
});
}
shutdown() {
for (const [id, data] of this.workers) {
data.worker.kill('SIGTERM');
}
setTimeout(() => {
process.exit(0);
}, 5000);
}
}
// 启动集群
const clusterManager = new ClusterManager({
maxWorkers: 4,
restartDelay: 2000,
maxRestarts: 3
});
clusterManager.start();
使用PM2进行集群管理:
// ecosystem.config.js
module.exports = {
apps: [{
name: 'my-app',
script: 'app.js',
instances: 'max', // 或指定数量,如 4
exec_mode: 'cluster',
env: {
NODE_ENV: 'production',
PORT: 3000
},
error_file: './logs/err.log',
out_file: './logs/out.log',
log_file: './logs/combined.log',
time: true
}]
};
// 启动命令
// pm2 start ecosystem.config.js
// pm2 reload my-app // 零停机重启
// pm2 scale my-app 8 // 扩展到8个实例
负载均衡策略:
// 自定义负载均衡
const cluster = require('cluster');
if (cluster.isMaster) {
const workers = [];
let current = 0;
// 轮询负载均衡
cluster.setupMaster({
schedulingPolicy: cluster.SCHED_NONE // 禁用默认负载均衡
});
for (let i = 0; i < 4; i++) {
workers.push(cluster.fork());
}
// 自定义请求分发逻辑
const server = require('net').createServer({ pauseOnConnect: true }, (connection) => {
const worker = workers[current];
current = (current + 1) % workers.length;
worker.send('sticky-session:connection', connection);
});
server.listen(3000);
}
集群监控和健康检查:
// 健康检查和监控
class HealthMonitor {
constructor() {
this.stats = new Map();
}
startMonitoring() {
setInterval(() => {
for (const [id, worker] of Object.entries(cluster.workers)) {
worker.send('health-check');
}
}, 30000);
cluster.on('message', (worker, message) => {
if (message.type === 'health-response') {
this.updateStats(worker.id, message.data);
}
});
}
updateStats(workerId, data) {
this.stats.set(workerId, {
...data,
lastUpdated: Date.now()
});
}
getClusterStats() {
return Array.from(this.stats.entries()).map(([id, stats]) => ({
workerId: id,
...stats
}));
}
}
最佳实践:
How to connect and operate databases in Node.js?
How to connect and operate databases in Node.js?
考察点:数据库集成。
答案:
Node.js支持多种数据库连接方式,包括关系型数据库(MySQL、PostgreSQL)和NoSQL数据库(MongoDB、Redis)。可以使用原生驱动或ORM/ODM工具进行数据库操作。
MongoDB连接和操作:
// 使用mongoose ODM
const mongoose = require('mongoose');
// 连接MongoDB
async function connectDB() {
try {
await mongoose.connect('mongodb://localhost:27017/myapp', {
useNewUrlParser: true,
useUnifiedTopology: true
});
console.log('MongoDB连接成功');
} catch (error) {
console.error('MongoDB连接失败:', error);
process.exit(1);
}
}
// 定义Schema和Model
const userSchema = new mongoose.Schema({
name: { type: String, required: true },
email: { type: String, required: true, unique: true },
age: { type: Number, min: 0 },
createdAt: { type: Date, default: Date.now }
});
const User = mongoose.model('User', userSchema);
// CRUD操作
async function userOperations() {
try {
// 创建用户
const newUser = new User({
name: '张三',
email: '[email protected]',
age: 25
});
await newUser.save();
// 查询用户
const users = await User.find({ age: { $gte: 18 } });
const user = await User.findOne({ email: '[email protected]' });
// 更新用户
await User.updateOne(
{ _id: user._id },
{ $set: { age: 26 } }
);
// 删除用户
await User.deleteOne({ _id: user._id });
} catch (error) {
console.error('数据库操作错误:', error);
}
}
MySQL连接和操作:
// 使用mysql2驱动
const mysql = require('mysql2/promise');
// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'myapp',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// 数据库操作函数
class UserService {
async createUser(userData) {
const { name, email, age } = userData;
const [result] = await pool.execute(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
[name, email, age]
);
return result.insertId;
}
async getUserById(id) {
const [rows] = await pool.execute(
'SELECT * FROM users WHERE id = ?',
[id]
);
return rows[0];
}
async updateUser(id, userData) {
const { name, email, age } = userData;
const [result] = await pool.execute(
'UPDATE users SET name = ?, email = ?, age = ? WHERE id = ?',
[name, email, age, id]
);
return result.affectedRows;
}
async deleteUser(id) {
const [result] = await pool.execute(
'DELETE FROM users WHERE id = ?',
[id]
);
return result.affectedRows;
}
async getUsers(limit = 10, offset = 0) {
const [rows] = await pool.execute(
'SELECT * FROM users LIMIT ? OFFSET ?',
[limit, offset]
);
return rows;
}
}
使用Sequelize ORM:
const { Sequelize, DataTypes } = require('sequelize');
// 创建Sequelize实例
const sequelize = new Sequelize('myapp', 'username', 'password', {
host: 'localhost',
dialect: 'mysql',
logging: false, // 禁用SQL日志
pool: {
max: 10,
min: 0,
acquire: 30000,
idle: 10000
}
});
// 定义模型
const User = sequelize.define('User', {
id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true
},
name: {
type: DataTypes.STRING,
allowNull: false
},
email: {
type: DataTypes.STRING,
allowNull: false,
unique: true,
validate: {
isEmail: true
}
},
age: {
type: DataTypes.INTEGER,
validate: {
min: 0,
max: 150
}
}
}, {
timestamps: true,
tableName: 'users'
});
// 同步数据库
async function syncDatabase() {
try {
await sequelize.authenticate();
console.log('数据库连接成功');
await sequelize.sync({ force: false });
console.log('数据库同步完成');
} catch (error) {
console.error('数据库连接失败:', error);
}
}
// 使用模型进行操作
async function userCRUD() {
try {
// 创建
const user = await User.create({
name: '李四',
email: '[email protected]',
age: 30
});
// 查询
const users = await User.findAll({
where: {
age: {
[Sequelize.Op.gte]: 18
}
},
limit: 10,
order: [['createdAt', 'DESC']]
});
// 更新
await User.update(
{ age: 31 },
{ where: { id: user.id } }
);
// 删除
await User.destroy({
where: { id: user.id }
});
} catch (error) {
console.error('操作失败:', error);
}
}
Redis连接和操作:
const redis = require('redis');
// 创建Redis客户端
const client = redis.createClient({
host: 'localhost',
port: 6379,
password: 'your-password',
db: 0
});
client.on('connect', () => {
console.log('Redis连接成功');
});
client.on('error', (err) => {
console.error('Redis连接错误:', err);
});
// Redis操作
class CacheService {
async set(key, value, expireTime = 3600) {
try {
const serializedValue = JSON.stringify(value);
await client.setex(key, expireTime, serializedValue);
} catch (error) {
console.error('缓存设置失败:', error);
}
}
async get(key) {
try {
const value = await client.get(key);
return value ? JSON.parse(value) : null;
} catch (error) {
console.error('缓存获取失败:', error);
return null;
}
}
async delete(key) {
try {
await client.del(key);
} catch (error) {
console.error('缓存删除失败:', error);
}
}
async exists(key) {
try {
return await client.exists(key);
} catch (error) {
console.error('缓存检查失败:', error);
return false;
}
}
}
数据库连接管理最佳实践:
class DatabaseManager {
constructor() {
this.connections = new Map();
}
async connect(config) {
try {
let connection;
switch (config.type) {
case 'mongodb':
connection = await mongoose.connect(config.url, config.options);
break;
case 'mysql':
connection = mysql.createPool(config.options);
break;
case 'redis':
connection = redis.createClient(config.options);
break;
default:
throw new Error(`不支持的数据库类型: ${config.type}`);
}
this.connections.set(config.name, {
type: config.type,
connection,
config
});
console.log(`${config.type} 数据库 ${config.name} 连接成功`);
return connection;
} catch (error) {
console.error('数据库连接失败:', error);
throw error;
}
}
getConnection(name) {
return this.connections.get(name)?.connection;
}
async closeAll() {
for (const [name, { type, connection }] of this.connections) {
try {
if (type === 'mongodb') {
await mongoose.disconnect();
} else if (type === 'mysql') {
await connection.end();
} else if (type === 'redis') {
connection.quit();
}
console.log(`${name} 连接已关闭`);
} catch (error) {
console.error(`关闭 ${name} 连接失败:`, error);
}
}
this.connections.clear();
}
}
// 使用示例
const dbManager = new DatabaseManager();
async function initDatabases() {
await dbManager.connect({
name: 'main',
type: 'mysql',
options: {
host: 'localhost',
user: 'root',
password: 'password',
database: 'myapp'
}
});
await dbManager.connect({
name: 'cache',
type: 'redis',
options: {
host: 'localhost',
port: 6379
}
});
}
What are streams in Node.js? What types are there?
What are streams in Node.js? What types are there?
考察点:流处理机制。
答案:
流(Stream)是Node.js中处理流式数据的抽象接口。Stream是一个抽象接口,用于读写数据的统一方式,特别适合处理大量数据,因为它允许你逐块处理数据,而不需要将整个数据集加载到内存中。
流的主要特点:
四种主要流类型:
1. 可读流(Readable Stream):
const fs = require('fs');
const { Readable } = require('stream');
// 文件可读流
const readStream = fs.createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 1024 // 缓冲区大小
});
readStream.on('data', (chunk) => {
console.log('接收到数据块:', chunk.length);
});
readStream.on('end', () => {
console.log('数据读取完成');
});
readStream.on('error', (err) => {
console.error('读取错误:', err);
});
// 自定义可读流
class NumberStream extends Readable {
constructor(max) {
super();
this.current = 0;
this.max = max;
}
_read() {
if (this.current < this.max) {
this.push(`数字: ${this.current++}\n`);
} else {
this.push(null); // 结束流
}
}
}
const numberStream = new NumberStream(5);
numberStream.on('data', chunk => console.log(chunk.toString()));
2. 可写流(Writable Stream):
const fs = require('fs');
const { Writable } = require('stream');
// 文件可写流
const writeStream = fs.createWriteStream('output.txt');
writeStream.write('Hello ');
writeStream.write('World\n');
writeStream.end(); // 结束写入
writeStream.on('finish', () => {
console.log('写入完成');
});
// 自定义可写流
class ConsoleWriter extends Writable {
_write(chunk, encoding, callback) {
console.log(`写入数据: ${chunk.toString()}`);
callback();
}
}
const consoleWriter = new ConsoleWriter();
consoleWriter.write('测试数据');
consoleWriter.end();
3. 双工流(Duplex Stream):
const { Duplex } = require('stream');
const net = require('net');
// TCP套接字是双工流的例子
const server = net.createServer((socket) => {
// socket既可读又可写
socket.write('欢迎连接到服务器\n');
socket.on('data', (data) => {
console.log('接收到:', data.toString());
socket.write(`回显: ${data}`);
});
});
server.listen(3000);
// 自定义双工流
class EchoStream extends Duplex {
_write(chunk, encoding, callback) {
// 写入的数据会自动推送到可读端
this.push(chunk);
callback();
}
_read() {
// 可读端的实现
}
}
const echo = new EchoStream();
echo.on('data', chunk => console.log('回显:', chunk.toString()));
echo.write('Hello Duplex Stream');
4. 转换流(Transform Stream):
const { Transform } = require('stream');
const fs = require('fs');
// 自定义转换流 - 转换为大写
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
const upperChunk = chunk.toString().toUpperCase();
this.push(upperChunk);
callback();
}
}
// 使用转换流处理文件
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
const upperCaseTransform = new UpperCaseTransform();
readStream
.pipe(upperCaseTransform)
.pipe(writeStream);
// CSV解析转换流
class CSVParser extends Transform {
constructor() {
super({ objectMode: true });
this.headers = null;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!this.headers) {
this.headers = line.split(',');
} else if (line.trim()) {
const values = line.split(',');
const obj = {};
this.headers.forEach((header, index) => {
obj[header] = values[index];
});
this.push(obj);
}
}
callback();
}
}
流的管道操作:
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
// 复杂的流管道:读取 -> 压缩 -> 加密 -> 写入
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // 压缩
.pipe(crypto.createCipher('aes256', 'secret-key')) // 加密
.pipe(fs.createWriteStream('output.txt.gz.enc')) // 写入
.on('finish', () => {
console.log('流管道处理完成');
});
// 错误处理
const pipeline = require('stream').pipeline;
pipeline(
fs.createReadStream('input.txt'),
new UpperCaseTransform(),
fs.createWriteStream('output.txt'),
(err) => {
if (err) {
console.error('管道错误:', err);
} else {
console.log('管道成功完成');
}
}
);
流的高级应用:
// 背压处理(Backpressure)
const fs = require('fs');
function copyFile(src, dest) {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(src);
const writeStream = fs.createWriteStream(dest);
readStream.on('data', (chunk) => {
const canWriteMore = writeStream.write(chunk);
// 如果写入缓冲区满了,暂停读取
if (!canWriteMore) {
readStream.pause();
writeStream.once('drain', () => {
readStream.resume();
});
}
});
readStream.on('end', () => {
writeStream.end();
resolve();
});
readStream.on('error', reject);
writeStream.on('error', reject);
});
}
// 对象模式流
class ObjectTransform extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(obj, encoding, callback) {
// 处理对象数据
const processed = {
...obj,
timestamp: new Date().toISOString(),
processed: true
};
this.push(processed);
callback();
}
}
流的性能优化:
适用场景:
How to implement authentication and authorization in Node.js applications?
How to implement authentication and authorization in Node.js applications?
考察点:安全认证机制。
答案:
身份验证(Authentication)和授权(Authorization)是Node.js应用安全的核心组成部分。身份验证确认用户身份,授权决定用户可以访问哪些资源和执行哪些操作。
身份验证方法:
1. JWT(JSON Web Token)认证:
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const express = require('express');
const app = express();
app.use(express.json());
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';
const REFRESH_SECRET = process.env.REFRESH_SECRET || 'refresh-secret';
// 用户注册
app.post('/register', async (req, res) => {
try {
const { username, password, email } = req.body;
// 密码哈希
const saltRounds = 12;
const hashedPassword = await bcrypt.hash(password, saltRounds);
// 保存用户到数据库(示例)
const user = {
id: Date.now(),
username,
email,
password: hashedPassword
};
res.status(201).json({
message: '用户注册成功',
userId: user.id
});
} catch (error) {
res.status(500).json({ error: '注册失败' });
}
});
// 用户登录
app.post('/login', async (req, res) => {
try {
const { username, password } = req.body;
// 从数据库获取用户(示例)
const user = await getUserByUsername(username);
if (!user) {
return res.status(401).json({ error: '用户名或密码错误' });
}
// 验证密码
const isValidPassword = await bcrypt.compare(password, user.password);
if (!isValidPassword) {
return res.status(401).json({ error: '用户名或密码错误' });
}
// 生成访问令牌和刷新令牌
const accessToken = jwt.sign(
{
userId: user.id,
username: user.username,
roles: user.roles || ['user']
},
JWT_SECRET,
{ expiresIn: '15m' }
);
const refreshToken = jwt.sign(
{ userId: user.id },
REFRESH_SECRET,
{ expiresIn: '7d' }
);
// 保存刷新令牌到数据库
await saveRefreshToken(user.id, refreshToken);
res.json({
accessToken,
refreshToken,
user: {
id: user.id,
username: user.username,
email: user.email
}
});
} catch (error) {
res.status(500).json({ error: '登录失败' });
}
});
// 令牌刷新
app.post('/refresh-token', async (req, res) => {
try {
const { refreshToken } = req.body;
if (!refreshToken) {
return res.status(401).json({ error: '刷新令牌缺失' });
}
// 验证刷新令牌
const decoded = jwt.verify(refreshToken, REFRESH_SECRET);
const storedToken = await getRefreshToken(decoded.userId);
if (!storedToken || storedToken !== refreshToken) {
return res.status(401).json({ error: '无效的刷新令牌' });
}
// 获取用户信息
const user = await getUserById(decoded.userId);
// 生成新的访问令牌
const newAccessToken = jwt.sign(
{
userId: user.id,
username: user.username,
roles: user.roles || ['user']
},
JWT_SECRET,
{ expiresIn: '15m' }
);
res.json({ accessToken: newAccessToken });
} catch (error) {
res.status(401).json({ error: '令牌刷新失败' });
}
});
2. 身份验证中间件:
// JWT认证中间件
function authenticateToken(req, res, next) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: '访问令牌缺失' });
}
jwt.verify(token, JWT_SECRET, (err, decoded) => {
if (err) {
return res.status(403).json({ error: '无效的访问令牌' });
}
req.user = decoded;
next();
});
}
// 可选认证中间件
function optionalAuth(req, res, next) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (token) {
jwt.verify(token, JWT_SECRET, (err, decoded) => {
if (!err) {
req.user = decoded;
}
});
}
next();
}
// 使用认证中间件
app.get('/protected', authenticateToken, (req, res) => {
res.json({
message: '这是受保护的资源',
user: req.user
});
});
3. 基于角色的授权(RBAC):
// 授权中间件
function authorize(roles = []) {
return (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: '未认证' });
}
// 如果没有指定角色要求,则只需要认证
if (roles.length === 0) {
return next();
}
// 检查用户角色
const userRoles = req.user.roles || [];
const hasPermission = roles.some(role => userRoles.includes(role));
if (!hasPermission) {
return res.status(403).json({ error: '权限不足' });
}
next();
};
}
// 权限检查函数
function checkPermission(resource, action) {
return async (req, res, next) => {
try {
const userId = req.user.userId;
const hasPermission = await userHasPermission(userId, resource, action);
if (!hasPermission) {
return res.status(403).json({ error: `无权限执行 ${action} 操作` });
}
next();
} catch (error) {
res.status(500).json({ error: '权限检查失败' });
}
};
}
// 使用授权中间件
app.get('/admin', authenticateToken, authorize(['admin']), (req, res) => {
res.json({ message: '管理员专用接口' });
});
app.get('/users', authenticateToken, authorize(['admin', 'moderator']), (req, res) => {
res.json({ message: '用户列表' });
});
app.delete('/posts/:id',
authenticateToken,
checkPermission('posts', 'delete'),
(req, res) => {
res.json({ message: '帖子已删除' });
}
);
4. Session基础认证:
const session = require('express-session');
const MongoStore = require('connect-mongo');
// Session配置
app.use(session({
secret: process.env.SESSION_SECRET || 'session-secret',
resave: false,
saveUninitialized: false,
store: MongoStore.create({
mongoUrl: 'mongodb://localhost:27017/myapp'
}),
cookie: {
secure: process.env.NODE_ENV === 'production', // HTTPS
httpOnly: true, // 防止XSS
maxAge: 1000 * 60 * 60 * 24 // 24小时
}
}));
// Session登录
app.post('/login-session', async (req, res) => {
try {
const { username, password } = req.body;
const user = await getUserByUsername(username);
if (!user || !await bcrypt.compare(password, user.password)) {
return res.status(401).json({ error: '认证失败' });
}
// 设置session
req.session.userId = user.id;
req.session.username = user.username;
req.session.roles = user.roles;
res.json({ message: '登录成功', user: { id: user.id, username: user.username } });
} catch (error) {
res.status(500).json({ error: '登录失败' });
}
});
// Session认证中间件
function requireAuth(req, res, next) {
if (!req.session.userId) {
return res.status(401).json({ error: '请先登录' });
}
req.user = {
userId: req.session.userId,
username: req.session.username,
roles: req.session.roles
};
next();
}
// Session登出
app.post('/logout', (req, res) => {
req.session.destroy((err) => {
if (err) {
return res.status(500).json({ error: '登出失败' });
}
res.clearCookie('connect.sid');
res.json({ message: '登出成功' });
});
});
5. OAuth 2.0集成(以Google为例):
const passport = require('passport');
const GoogleStrategy = require('passport-google-oauth20').Strategy;
// Passport配置
passport.use(new GoogleStrategy({
clientID: process.env.GOOGLE_CLIENT_ID,
clientSecret: process.env.GOOGLE_CLIENT_SECRET,
callbackURL: "/auth/google/callback"
}, async (accessToken, refreshToken, profile, done) => {
try {
// 查找或创建用户
let user = await getUserByGoogleId(profile.id);
if (!user) {
user = await createUser({
googleId: profile.id,
username: profile.displayName,
email: profile.emails[0].value,
avatar: profile.photos[0].value
});
}
return done(null, user);
} catch (error) {
return done(error, null);
}
}));
// Google OAuth路由
app.get('/auth/google',
passport.authenticate('google', { scope: ['profile', 'email'] })
);
app.get('/auth/google/callback',
passport.authenticate('google', { failureRedirect: '/login' }),
(req, res) => {
// 生成JWT令牌
const token = jwt.sign(
{ userId: req.user.id, username: req.user.username },
JWT_SECRET,
{ expiresIn: '1h' }
);
res.redirect(`/dashboard?token=${token}`);
}
);
6. 安全最佳实践:
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
// 安全头
app.use(helmet());
// 限流
const loginLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 5, // 最多5次尝试
message: '登录尝试过于频繁,请稍后再试',
standardHeaders: true,
legacyHeaders: false
});
app.use('/login', loginLimiter);
// 密码复杂度验证
function validatePassword(password) {
const minLength = 8;
const hasUpperCase = /[A-Z]/.test(password);
const hasLowerCase = /[a-z]/.test(password);
const hasNumbers = /\d/.test(password);
const hasSpecialChar = /[!@#$%^&*(),.?":{}|<>]/.test(password);
return password.length >= minLength &&
hasUpperCase &&
hasLowerCase &&
hasNumbers &&
hasSpecialChar;
}
// 登录日志
function logLoginAttempt(username, success, ip) {
const logEntry = {
username,
success,
ip,
timestamp: new Date(),
userAgent: req.headers['user-agent']
};
// 保存到日志系统
console.log('登录尝试:', logEntry);
}
// 账户锁定机制
async function checkAccountLockout(username) {
const attempts = await getFailedLoginAttempts(username);
const maxAttempts = 5;
const lockoutDuration = 30 * 60 * 1000; // 30分钟
if (attempts.length >= maxAttempts) {
const lastAttempt = attempts[attempts.length - 1];
const timeSinceLastAttempt = Date.now() - lastAttempt.timestamp;
if (timeSinceLastAttempt < lockoutDuration) {
throw new Error('账户已被锁定,请稍后再试');
}
}
}
认证授权最佳实践:
How to handle CORS issues in Node.js?
How to handle CORS issues in Node.js?
考察点:跨域处理。
答案:
跨源资源共享(CORS)是一种机制,允许Web页面从不同域名访问资源。在Node.js应用中,需要正确配置CORS来确保前端应用能够安全地访问后端API。
CORS的工作原理:
1. 使用cors中间件(推荐):
const express = require('express');
const cors = require('cors');
const app = express();
// 基本CORS设置 - 允许所有来源(开发环境)
app.use(cors());
// 自定义CORS配置
const corsOptions = {
origin: [
'http://localhost:3000',
'https://myapp.com',
'https://www.myapp.com'
],
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowedHeaders: [
'Origin',
'X-Requested-With',
'Content-Type',
'Accept',
'Authorization',
'Cache-Control'
],
credentials: true, // 允许发送Cookie
maxAge: 86400 // 预检请求缓存时间(秒)
};
app.use(cors(corsOptions));
// 动态CORS配置
const dynamicCors = (req, callback) => {
let corsOptions;
// 根据环境或请求动态设置
if (process.env.NODE_ENV === 'development') {
corsOptions = { origin: true }; // 允许所有来源
} else {
// 从数据库或配置中获取允许的域名
const allowedOrigins = getAllowedOrigins();
corsOptions = {
origin: allowedOrigins,
credentials: true
};
}
callback(null, corsOptions);
};
app.use(cors(dynamicCors));
2. 手动实现CORS:
// 自定义CORS中间件
function customCors(req, res, next) {
const allowedOrigins = [
'http://localhost:3000',
'https://myapp.com'
];
const origin = req.headers.origin;
// 检查来源是否被允许
if (allowedOrigins.includes(origin)) {
res.setHeader('Access-Control-Allow-Origin', origin);
}
// 设置允许的HTTP方法
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
// 设置允许的请求头
res.setHeader('Access-Control-Allow-Headers',
'Origin, X-Requested-With, Content-Type, Accept, Authorization');
// 允许发送Cookie
res.setHeader('Access-Control-Allow-Credentials', 'true');
// 预检请求缓存时间
res.setHeader('Access-Control-Max-Age', '86400');
// 处理预检请求
if (req.method === 'OPTIONS') {
res.status(200).end();
return;
}
next();
}
app.use(customCors);
// 更详细的CORS处理
function advancedCors(req, res, next) {
const origin = req.headers.origin;
const method = req.method;
const requestHeaders = req.headers['access-control-request-headers'];
// 日志记录
console.log(`CORS请求: ${method} ${req.url} from ${origin}`);
// 验证来源
if (isOriginAllowed(origin)) {
res.setHeader('Access-Control-Allow-Origin', origin);
// 根据请求方法设置不同的头
if (method === 'OPTIONS') {
// 预检请求处理
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE');
if (requestHeaders) {
res.setHeader('Access-Control-Allow-Headers', requestHeaders);
}
res.status(204).end();
return;
}
// 实际请求处理
res.setHeader('Access-Control-Allow-Credentials', 'true');
// 暴露自定义响应头给前端
res.setHeader('Access-Control-Expose-Headers', 'X-Total-Count, X-Page-Count');
} else {
// 不允许的来源
res.status(403).json({ error: '不允许的跨域访问' });
return;
}
next();
}
function isOriginAllowed(origin) {
const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];
return allowedOrigins.includes(origin) || process.env.NODE_ENV === 'development';
}
3. 特定路由的CORS设置:
const express = require('express');
const cors = require('cors');
const app = express();
// 为特定路由设置CORS
const apiCorsOptions = {
origin: ['https://api-client.com', 'https://dashboard.com'],
methods: ['GET', 'POST', 'PUT', 'DELETE'],
credentials: true
};
const publicCorsOptions = {
origin: true, // 允许所有来源
methods: ['GET'],
credentials: false
};
// API路由使用严格的CORS设置
app.use('/api', cors(apiCorsOptions));
// 公共资源使用宽松的CORS设置
app.use('/public', cors(publicCorsOptions));
// 特定路由的自定义CORS
app.get('/api/public-data', cors({ origin: true }), (req, res) => {
res.json({ message: '公共数据' });
});
// 需要认证的API使用严格CORS
app.use('/api/private', cors({
origin: (origin, callback) => {
// 动态验证来源
validateOrigin(origin)
.then(isValid => callback(null, isValid))
.catch(err => callback(err));
},
credentials: true
}));
4. 处理复杂的CORS场景:
// 处理文件上传的CORS
const multer = require('multer');
const upload = multer({ dest: 'uploads/' });
const uploadCors = cors({
origin: ['https://upload-client.com'],
methods: ['POST'],
allowedHeaders: ['Content-Type', 'Authorization'],
credentials: true
});
app.post('/api/upload', uploadCors, upload.single('file'), (req, res) => {
res.json({ message: '文件上传成功', file: req.file });
});
// WebSocket的CORS处理
const { Server } = require('socket.io');
const http = require('http');
const server = http.createServer(app);
const io = new Server(server, {
cors: {
origin: ["https://socket-client.com", "http://localhost:3000"],
methods: ["GET", "POST"],
credentials: true
}
});
io.on('connection', (socket) => {
console.log('WebSocket连接建立');
// 验证来源
const origin = socket.handshake.headers.origin;
if (!isOriginAllowed(origin)) {
socket.disconnect();
return;
}
socket.on('message', (data) => {
// 处理消息
});
});
5. CORS安全最佳实践:
// 环境相关的CORS配置
const getCorsOptions = () => {
const baseOptions = {
methods: ['GET', 'POST', 'PUT', 'DELETE'],
allowedHeaders: [
'Origin',
'X-Requested-With',
'Content-Type',
'Accept',
'Authorization'
],
maxAge: 86400
};
if (process.env.NODE_ENV === 'production') {
return {
...baseOptions,
origin: process.env.ALLOWED_ORIGINS?.split(',') || [],
credentials: true
};
} else {
return {
...baseOptions,
origin: true,
credentials: true
};
}
};
app.use(cors(getCorsOptions()));
// CORS错误处理
app.use((err, req, res, next) => {
if (err.message === 'CORS Error') {
res.status(403).json({
error: '跨域请求被拒绝',
message: '请确认请求来源是否被允许'
});
} else {
next(err);
}
});
// CORS监控和日志
function corsLogger(req, res, next) {
const origin = req.headers.origin;
const method = req.method;
// 记录跨域请求
if (origin && origin !== req.headers.host) {
console.log(`跨域请求: ${method} ${req.url} 来自 ${origin}`);
// 可以添加到日志系统或监控系统
logCorsRequest({
method,
url: req.url,
origin,
userAgent: req.headers['user-agent'],
timestamp: new Date()
});
}
next();
}
app.use(corsLogger);
6. 代理和反向代理的CORS处理:
// 使用http-proxy-middleware处理代理CORS
const { createProxyMiddleware } = require('http-proxy-middleware');
// API代理,自动处理CORS
const apiProxy = createProxyMiddleware('/api', {
target: 'http://backend-server.com',
changeOrigin: true,
pathRewrite: {
'^/api': '', // 重写路径
},
onProxyReq: (proxyReq, req, res) => {
// 添加必要的头
proxyReq.setHeader('Origin', 'http://your-frontend.com');
},
onProxyRes: (proxyRes, req, res) => {
// 设置CORS头
proxyRes.headers['Access-Control-Allow-Origin'] = req.headers.origin || '*';
proxyRes.headers['Access-Control-Allow-Credentials'] = 'true';
}
});
app.use(apiProxy);
CORS问题调试:
// CORS调试中间件
function corsDebug(req, res, next) {
const origin = req.headers.origin;
const method = req.method;
console.log('=== CORS Debug Info ===');
console.log('Origin:', origin);
console.log('Method:', method);
console.log('Headers:', req.headers);
if (method === 'OPTIONS') {
console.log('预检请求检测');
console.log('Request Headers:', req.headers['access-control-request-headers']);
console.log('Request Method:', req.headers['access-control-request-method']);
}
// 响应后记录CORS头
const originalEnd = res.end;
res.end = function(...args) {
console.log('Response CORS Headers:');
console.log('Access-Control-Allow-Origin:', res.getHeader('Access-Control-Allow-Origin'));
console.log('Access-Control-Allow-Methods:', res.getHeader('Access-Control-Allow-Methods'));
console.log('Access-Control-Allow-Headers:', res.getHeader('Access-Control-Allow-Headers'));
console.log('========================');
originalEnd.apply(this, args);
};
next();
}
// 在开发环境启用调试
if (process.env.NODE_ENV === 'development') {
app.use(corsDebug);
}
常见CORS问题和解决方案:
credentials: true和明确的originallowedHeaders中添加自定义头Access-Control-Expose-Headers暴露头部How to optimize the performance of Node.js applications?
How to optimize the performance of Node.js applications?
考察点:性能优化策略。
答案:
Node.js应用性能优化是一个多层面的过程,涉及代码层面、架构设计、系统配置等多个方面。以下是主要的优化策略和技术:
1. 代码层面优化:
异步操作优化:
// 避免阻塞事件循环
// 错误做法 - 同步操作
const fs = require('fs');
const data = fs.readFileSync('large-file.txt'); // 阻塞
// 正确做法 - 异步操作
const fsPromises = require('fs').promises;
async function readFile() {
try {
const data = await fsPromises.readFile('large-file.txt');
return data;
} catch (error) {
console.error(error);
}
}
// 批处理异步操作
async function processMultipleFiles(filenames) {
// 并行处理
const results = await Promise.all(
filenames.map(filename => fsPromises.readFile(filename))
);
return results;
}
// 限制并发数量
async function processWithConcurrencyLimit(items, limit = 5) {
const results = [];
for (let i = 0; i < items.length; i += limit) {
const batch = items.slice(i, i + limit);
const batchResults = await Promise.all(
batch.map(item => processItem(item))
);
results.push(...batchResults);
}
return results;
}
内存管理优化:
// 避免内存泄漏
class DataProcessor {
constructor() {
this.cache = new Map();
this.timers = new Set();
}
processData(data) {
// 使用WeakMap避免内存泄漏
const metadata = new WeakMap();
metadata.set(data, { processed: true, timestamp: Date.now() });
// 定期清理缓存
this.scheduleCleanup();
return this.transform(data);
}
scheduleCleanup() {
const timer = setTimeout(() => {
this.cleanup();
this.timers.delete(timer);
}, 60000);
this.timers.add(timer);
}
cleanup() {
// 清理过期缓存
const now = Date.now();
for (const [key, value] of this.cache.entries()) {
if (now - value.timestamp > 300000) { // 5分钟过期
this.cache.delete(key);
}
}
}
destroy() {
// 清理资源
this.cache.clear();
this.timers.forEach(timer => clearTimeout(timer));
this.timers.clear();
}
}
// 流式处理大数据
const { Transform } = require('stream');
class DataTransform extends Transform {
_transform(chunk, encoding, callback) {
// 逐块处理,避免加载整个文件到内存
const processed = this.processChunk(chunk);
this.push(processed);
callback();
}
processChunk(chunk) {
// 处理数据块
return chunk.toString().toUpperCase();
}
}
2. 数据库优化:
// 连接池优化
const mysql = require('mysql2/promise');
const pool = mysql.createPool({
host: 'localhost',
user: 'username',
password: 'password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10, // 连接池大小
queueLimit: 0,
acquireTimeout: 60000, // 获取连接超时
timeout: 60000, // 查询超时
reconnect: true,
reconnectDelay: 2000
});
// 查询优化
class DatabaseService {
async getUsersOptimized(page = 1, limit = 10) {
// 使用索引字段查询
// 限制返回字段
// 分页查询
const offset = (page - 1) * limit;
const [rows] = await pool.execute(`
SELECT id, username, email, created_at
FROM users
WHERE status = 'active'
ORDER BY id
LIMIT ? OFFSET ?
`, [limit, offset]);
return rows;
}
async getUserWithCache(userId) {
// 多级缓存策略
let user = await this.getFromMemoryCache(`user:${userId}`);
if (user) return user;
user = await this.getFromRedisCache(`user:${userId}`);
if (user) {
this.setMemoryCache(`user:${userId}`, user, 300); // 5分钟
return user;
}
user = await this.getUserFromDB(userId);
if (user) {
this.setRedisCache(`user:${userId}`, user, 3600); // 1小时
this.setMemoryCache(`user:${userId}`, user, 300);
}
return user;
}
}
3. 缓存策略:
const NodeCache = require('node-cache');
const Redis = require('redis');
// 多层缓存系统
class CacheManager {
constructor() {
// 内存缓存 - 最快,但容量小
this.memoryCache = new NodeCache({
stdTTL: 300, // 默认5分钟过期
checkperiod: 60, // 每60秒检查过期
maxKeys: 10000 // 最大key数量
});
// Redis缓存 - 较快,容量大
this.redisClient = Redis.createClient();
}
async get(key) {
// 先查内存缓存
let value = this.memoryCache.get(key);
if (value !== undefined) {
return value;
}
// 再查Redis缓存
const redisValue = await this.redisClient.get(key);
if (redisValue) {
value = JSON.parse(redisValue);
// 回写内存缓存
this.memoryCache.set(key, value, 300);
return value;
}
return null;
}
async set(key, value, ttl = 3600) {
// 同时设置内存和Redis缓存
this.memoryCache.set(key, value, Math.min(ttl, 300));
await this.redisClient.setex(key, ttl, JSON.stringify(value));
}
async invalidate(pattern) {
// 删除匹配的缓存
const keys = this.memoryCache.keys();
keys.forEach(key => {
if (key.includes(pattern)) {
this.memoryCache.del(key);
}
});
// Redis模式删除
const redisKeys = await this.redisClient.keys(`*${pattern}*`);
if (redisKeys.length > 0) {
await this.redisClient.del(redisKeys);
}
}
}
4. HTTP优化:
const compression = require('compression');
const express = require('express');
const app = express();
// 启用Gzip压缩
app.use(compression({
level: 6, // 压缩级别
threshold: 1024, // 只压缩大于1KB的响应
filter: (req, res) => {
// 自定义压缩过滤器
if (req.headers['x-no-compression']) {
return false;
}
return compression.filter(req, res);
}
}));
// HTTP缓存
app.get('/api/static-data', (req, res) => {
res.set({
'Cache-Control': 'public, max-age=3600', // 1小时缓存
'ETag': generateETag(data),
'Last-Modified': new Date().toUTCString()
});
// 检查条件请求
if (req.fresh) {
res.status(304).end();
return;
}
res.json(data);
});
// 响应时间监控
function responseTime(req, res, next) {
const startTime = process.hrtime();
res.on('finish', () => {
const [seconds, nanoseconds] = process.hrtime(startTime);
const milliseconds = seconds * 1000 + nanoseconds / 1000000;
res.set('X-Response-Time', `${milliseconds.toFixed(2)}ms`);
// 记录慢请求
if (milliseconds > 1000) {
console.warn(`慢请求: ${req.method} ${req.url} - ${milliseconds.toFixed(2)}ms`);
}
});
next();
}
app.use(responseTime);
5. 集群和负载均衡:
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numWorkers = process.env.NODE_WORKERS || os.cpus().length;
console.log(`启动 ${numWorkers} 个工作进程`);
// 创建工作进程
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
// 工作进程崩溃重启
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
cluster.fork();
});
// 优雅关闭
process.on('SIGTERM', () => {
console.log('收到SIGTERM信号,开始关闭工作进程...');
for (const id in cluster.workers) {
cluster.workers[id].kill();
}
});
} else {
// 工作进程代码
require('./app.js');
}
6. 性能监控和分析:
// 性能监控
class PerformanceMonitor {
constructor() {
this.metrics = {
requests: 0,
errors: 0,
responseTime: [],
memory: [],
cpu: []
};
this.startMonitoring();
}
startMonitoring() {
// 每秒收集系统指标
setInterval(() => {
this.collectMetrics();
}, 1000);
// 每分钟输出报告
setInterval(() => {
this.generateReport();
}, 60000);
}
collectMetrics() {
const memUsage = process.memoryUsage();
const cpuUsage = process.cpuUsage();
this.metrics.memory.push({
rss: memUsage.rss,
heapUsed: memUsage.heapUsed,
heapTotal: memUsage.heapTotal,
timestamp: Date.now()
});
this.metrics.cpu.push({
user: cpuUsage.user,
system: cpuUsage.system,
timestamp: Date.now()
});
// 保持最近1小时的数据
const oneHourAgo = Date.now() - 3600000;
this.metrics.memory = this.metrics.memory.filter(m => m.timestamp > oneHourAgo);
this.metrics.cpu = this.metrics.cpu.filter(c => c.timestamp > oneHourAgo);
this.metrics.responseTime = this.metrics.responseTime.filter(r => r.timestamp > oneHourAgo);
}
recordRequest(responseTime, isError = false) {
this.metrics.requests++;
if (isError) this.metrics.errors++;
this.metrics.responseTime.push({
time: responseTime,
timestamp: Date.now()
});
}
generateReport() {
const avgResponseTime = this.metrics.responseTime.length > 0
? this.metrics.responseTime.reduce((sum, r) => sum + r.time, 0) / this.metrics.responseTime.length
: 0;
const errorRate = this.metrics.requests > 0
? (this.metrics.errors / this.metrics.requests) * 100
: 0;
console.log('=== 性能报告 ===');
console.log(`请求总数: ${this.metrics.requests}`);
console.log(`错误率: ${errorRate.toFixed(2)}%`);
console.log(`平均响应时间: ${avgResponseTime.toFixed(2)}ms`);
console.log(`内存使用: ${(process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2)}MB`);
}
}
const monitor = new PerformanceMonitor();
// 集成到Express中间件
function performanceMiddleware(req, res, next) {
const start = process.hrtime();
res.on('finish', () => {
const [seconds, nanoseconds] = process.hrtime(start);
const responseTime = seconds * 1000 + nanoseconds / 1000000;
const isError = res.statusCode >= 400;
monitor.recordRequest(responseTime, isError);
});
next();
}
app.use(performanceMiddleware);
7. 其他优化技巧:
// JSON序列化优化
const fastJsonStringify = require('fast-json-stringify');
const stringify = fastJsonStringify({
title: 'User Schema',
type: 'object',
properties: {
id: { type: 'number' },
username: { type: 'string' },
email: { type: 'string' }
}
});
// 比JSON.stringify快2-5倍
app.get('/api/users', async (req, res) => {
const users = await getUsersFromDB();
res.set('Content-Type', 'application/json');
res.send(stringify(users));
});
// 定时器优化
class TimerManager {
constructor() {
this.timers = new Set();
}
setTimeout(callback, delay) {
const timer = setTimeout(() => {
callback();
this.timers.delete(timer);
}, delay);
this.timers.add(timer);
return timer;
}
clearAll() {
this.timers.forEach(timer => clearTimeout(timer));
this.timers.clear();
}
}
// 避免创建不必要的对象
function optimizedHandler(req, res) {
// 重用对象
const response = {
success: true,
data: null,
timestamp: Date.now()
};
// 避免在循环中创建对象
const items = [];
for (let i = 0; i < data.length; i++) {
// 直接操作现有对象而不是创建新对象
items.push(processItem(data[i]));
}
response.data = items;
res.json(response);
}
性能优化最佳实践:
How to implement caching mechanisms in Node.js?
How to implement caching mechanisms in Node.js?
考察点:缓存策略设计。
答案:
缓存是提高Node.js应用性能的重要手段,通过存储频繁访问的数据来减少数据库查询和计算开销。Node.js中可以实现多种类型的缓存机制。
1. 内存缓存(Memory Cache):
const NodeCache = require('node-cache');
// 基本内存缓存
class MemoryCache {
constructor(options = {}) {
this.cache = new NodeCache({
stdTTL: options.ttl || 600, // 默认10分钟过期
checkperiod: options.checkperiod || 60, // 每60秒检查过期
useClones: false, // 不克隆对象,提高性能
maxKeys: options.maxKeys || 10000 // 最大key数量
});
this.stats = {
hits: 0,
misses: 0,
sets: 0
};
}
get(key) {
const value = this.cache.get(key);
if (value !== undefined) {
this.stats.hits++;
return value;
}
this.stats.misses++;
return null;
}
set(key, value, ttl) {
this.stats.sets++;
return this.cache.set(key, value, ttl);
}
del(key) {
return this.cache.del(key);
}
flush() {
this.cache.flushAll();
this.stats = { hits: 0, misses: 0, sets: 0 };
}
getStats() {
const hitRate = this.stats.hits + this.stats.misses > 0
? (this.stats.hits / (this.stats.hits + this.stats.misses) * 100).toFixed(2)
: 0;
return {
...this.stats,
hitRate: `${hitRate}%`,
keys: this.cache.keys().length
};
}
}
const memoryCache = new MemoryCache({ ttl: 300, maxKeys: 5000 });
// 使用示例
async function getUserById(userId) {
const cacheKey = `user:${userId}`;
// 先从缓存获取
let user = memoryCache.get(cacheKey);
if (user) {
console.log('从内存缓存获取用户');
return user;
}
// 缓存未命中,从数据库获取
user = await database.getUserById(userId);
if (user) {
memoryCache.set(cacheKey, user, 600); // 缓存10分钟
}
return user;
}
2. Redis分布式缓存:
const Redis = require('redis');
const client = Redis.createClient({
host: 'localhost',
port: 6379,
db: 0,
maxRetriesPerRequest: 3
});
class RedisCache {
constructor(redisClient) {
this.client = redisClient;
this.prefix = 'cache:';
}
async get(key) {
try {
const value = await this.client.get(this.prefix + key);
return value ? JSON.parse(value) : null;
} catch (error) {
console.error('Redis获取缓存错误:', error);
return null;
}
}
async set(key, value, ttl = 3600) {
try {
const serialized = JSON.stringify(value);
if (ttl > 0) {
await this.client.setex(this.prefix + key, ttl, serialized);
} else {
await this.client.set(this.prefix + key, serialized);
}
return true;
} catch (error) {
console.error('Redis设置缓存错误:', error);
return false;
}
}
async del(key) {
try {
return await this.client.del(this.prefix + key);
} catch (error) {
console.error('Redis删除缓存错误:', error);
return false;
}
}
async exists(key) {
try {
return await this.client.exists(this.prefix + key);
} catch (error) {
console.error('Redis检查缓存错误:', error);
return false;
}
}
async increment(key, value = 1, ttl = 3600) {
try {
const result = await this.client.incrby(this.prefix + key, value);
if (ttl > 0) {
await this.client.expire(this.prefix + key, ttl);
}
return result;
} catch (error) {
console.error('Redis递增缓存错误:', error);
return null;
}
}
async mget(keys) {
try {
const prefixedKeys = keys.map(key => this.prefix + key);
const values = await this.client.mget(prefixedKeys);
return values.map(value => value ? JSON.parse(value) : null);
} catch (error) {
console.error('Redis批量获取缓存错误:', error);
return keys.map(() => null);
}
}
async mset(pairs, ttl = 3600) {
try {
const pipeline = this.client.pipeline();
for (const [key, value] of pairs) {
const serialized = JSON.stringify(value);
if (ttl > 0) {
pipeline.setex(this.prefix + key, ttl, serialized);
} else {
pipeline.set(this.prefix + key, serialized);
}
}
await pipeline.exec();
return true;
} catch (error) {
console.error('Redis批量设置缓存错误:', error);
return false;
}
}
}
const redisCache = new RedisCache(client);
3. 多层缓存策略:
class MultiLayerCache {
constructor(options = {}) {
// L1缓存:内存缓存(最快,容量小)
this.l1Cache = new NodeCache({
stdTTL: options.l1Ttl || 300, // 5分钟
maxKeys: options.l1MaxKeys || 1000
});
// L2缓存:Redis缓存(较快,容量大)
this.l2Cache = new RedisCache(client);
this.stats = {
l1Hits: 0,
l2Hits: 0,
misses: 0
};
}
async get(key) {
// 先查L1缓存
let value = this.l1Cache.get(key);
if (value !== undefined) {
this.stats.l1Hits++;
return value;
}
// 再查L2缓存
value = await this.l2Cache.get(key);
if (value !== null) {
this.stats.l2Hits++;
// 回写L1缓存
this.l1Cache.set(key, value, 300);
return value;
}
this.stats.misses++;
return null;
}
async set(key, value, ttl = { l1: 300, l2: 3600 }) {
// 同时设置两层缓存
this.l1Cache.set(key, value, ttl.l1);
await this.l2Cache.set(key, value, ttl.l2);
}
async invalidate(key) {
this.l1Cache.del(key);
await this.l2Cache.del(key);
}
async invalidatePattern(pattern) {
// L1缓存模式删除
const l1Keys = this.l1Cache.keys();
l1Keys.forEach(key => {
if (key.includes(pattern)) {
this.l1Cache.del(key);
}
});
// L2缓存模式删除
const l2Keys = await client.keys(`cache:*${pattern}*`);
if (l2Keys.length > 0) {
const keysToDelete = l2Keys.map(key => key.replace('cache:', ''));
await Promise.all(keysToDelete.map(key => this.l2Cache.del(key)));
}
}
getStats() {
const totalRequests = this.stats.l1Hits + this.stats.l2Hits + this.stats.misses;
return {
...this.stats,
totalRequests,
l1HitRate: totalRequests > 0 ? ((this.stats.l1Hits / totalRequests) * 100).toFixed(2) + '%' : '0%',
l2HitRate: totalRequests > 0 ? ((this.stats.l2Hits / totalRequests) * 100).toFixed(2) + '%' : '0%',
overallHitRate: totalRequests > 0 ? (((this.stats.l1Hits + this.stats.l2Hits) / totalRequests) * 100).toFixed(2) + '%' : '0%'
};
}
}
const multiCache = new MultiLayerCache();
4. 缓存装饰器和中间件:
// 缓存装饰器
function cached(ttl = 3600, keyGenerator) {
return function(target, propertyName, descriptor) {
const originalMethod = descriptor.value;
descriptor.value = async function(...args) {
const key = keyGenerator ? keyGenerator(...args) : `${propertyName}:${JSON.stringify(args)}`;
// 尝试从缓存获取
let result = await multiCache.get(key);
if (result !== null) {
return result;
}
// 执行原方法
result = await originalMethod.apply(this, args);
// 缓存结果
if (result !== null && result !== undefined) {
await multiCache.set(key, result, { l1: ttl / 10, l2: ttl });
}
return result;
};
};
}
// 使用缓存装饰器
class UserService {
@cached(600, (userId) => `user:${userId}`)
async getUserById(userId) {
return await database.getUserById(userId);
}
@cached(300, (status, page) => `users:${status}:page:${page}`)
async getUsersByStatus(status, page = 1) {
return await database.getUsersByStatus(status, page);
}
}
// Express缓存中间件
function cacheMiddleware(ttl = 300) {
return async (req, res, next) => {
if (req.method !== 'GET') {
return next();
}
const key = `http:${req.originalUrl || req.url}`;
try {
const cached = await multiCache.get(key);
if (cached) {
res.set('X-Cache', 'HIT');
return res.json(cached);
}
// 重写res.json来缓存响应
const originalJson = res.json;
res.json = function(data) {
multiCache.set(key, data, { l1: ttl / 10, l2: ttl });
res.set('X-Cache', 'MISS');
originalJson.call(this, data);
};
next();
} catch (error) {
console.error('缓存中间件错误:', error);
next();
}
};
}
// 使用缓存中间件
app.get('/api/users', cacheMiddleware(600), async (req, res) => {
const users = await userService.getAllUsers();
res.json(users);
});
5. 缓存失效策略:
class CacheInvalidationService {
constructor(cache) {
this.cache = cache;
this.dependencies = new Map(); // key -> Set of dependent keys
}
// 设置缓存依赖关系
addDependency(key, dependsOn) {
if (!this.dependencies.has(dependsOn)) {
this.dependencies.set(dependsOn, new Set());
}
this.dependencies.get(dependsOn).add(key);
}
// 级联删除依赖的缓存
async invalidateWithDependencies(key) {
await this.cache.invalidate(key);
const dependents = this.dependencies.get(key);
if (dependents) {
for (const dependent of dependents) {
await this.invalidateWithDependencies(dependent);
}
this.dependencies.delete(key);
}
}
// 基于标签的缓存失效
async invalidateByTag(tag) {
await this.cache.invalidatePattern(tag);
}
// 时间窗口缓存失效
scheduleInvalidation(key, delay) {
setTimeout(async () => {
await this.cache.invalidate(key);
}, delay);
}
}
// 使用示例
const cacheInvalidator = new CacheInvalidationService(multiCache);
async function updateUser(userId, userData) {
// 更新数据库
const updatedUser = await database.updateUser(userId, userData);
// 失效相关缓存
await cacheInvalidator.invalidateWithDependencies(`user:${userId}`);
await cacheInvalidator.invalidateByTag(`user_list`);
return updatedUser;
}
6. 缓存预热和刷新:
class CacheWarmer {
constructor(cache) {
this.cache = cache;
this.warmupTasks = new Map();
}
// 注册预热任务
registerWarmupTask(key, loader, interval = 3600000) { // 默认1小时
this.warmupTasks.set(key, { loader, interval });
}
// 执行预热
async warmup(keys = []) {
const tasksToWarm = keys.length > 0
? keys.filter(key => this.warmupTasks.has(key))
: Array.from(this.warmupTasks.keys());
console.log(`开始预热缓存,共${tasksToWarm.length}个任务`);
const results = await Promise.allSettled(
tasksToWarm.map(async (key) => {
const { loader } = this.warmupTasks.get(key);
try {
const data = await loader();
await this.cache.set(key, data);
console.log(`缓存预热完成: ${key}`);
} catch (error) {
console.error(`缓存预热失败: ${key}`, error);
}
})
);
const successful = results.filter(r => r.status === 'fulfilled').length;
console.log(`缓存预热完成: ${successful}/${tasksToWarm.length}`);
}
// 启动定期刷新
startPeriodicRefresh() {
for (const [key, { loader, interval }] of this.warmupTasks) {
setInterval(async () => {
try {
const data = await loader();
await this.cache.set(key, data);
console.log(`定期刷新缓存: ${key}`);
} catch (error) {
console.error(`定期刷新失败: ${key}`, error);
}
}, interval);
}
}
}
const cacheWarmer = new CacheWarmer(multiCache);
// 注册预热任务
cacheWarmer.registerWarmupTask('popular_products', async () => {
return await database.getPopularProducts();
}, 1800000); // 30分钟刷新
cacheWarmer.registerWarmupTask('site_config', async () => {
return await database.getSiteConfig();
}, 3600000); // 1小时刷新
// 应用启动时预热
cacheWarmer.warmup().then(() => {
console.log('初始缓存预热完成');
cacheWarmer.startPeriodicRefresh();
});
缓存最佳实践:
How to implement WebSocket communication in Node.js?
How to implement WebSocket communication in Node.js?
考察点:实时通信实现。
答案:
WebSocket是一种在单个TCP连接上进行全双工通信的协议,非常适合实时应用如聊天室、在线游戏、实时数据推送等。Node.js提供了多种实现WebSocket通信的方式。
1. 使用原生ws库:
const WebSocket = require('ws');
const http = require('http');
// 创建HTTP服务器
const server = http.createServer();
// 创建WebSocket服务器
const wss = new WebSocket.Server({ server });
// 连接处理
wss.on('connection', (ws, req) => {
console.log('新的WebSocket连接');
console.log('客户端IP:', req.connection.remoteAddress);
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: '连接成功',
timestamp: Date.now()
}));
// 消息处理
ws.on('message', (message) => {
try {
const data = JSON.parse(message);
console.log('接收到消息:', data);
// 处理不同类型的消息
switch (data.type) {
case 'chat':
handleChatMessage(ws, data);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
break;
default:
ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' }));
}
} catch (error) {
console.error('消息解析错误:', error);
ws.send(JSON.stringify({ type: 'error', message: '消息格式错误' }));
}
});
// 连接关闭
ws.on('close', (code, reason) => {
console.log('WebSocket连接关闭:', code, reason);
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket错误:', error);
});
});
function handleChatMessage(ws, data) {
const message = {
type: 'chat',
user: data.user,
message: data.message,
timestamp: Date.now()
};
// 广播消息给所有连接的客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(message));
}
});
}
server.listen(3000, () => {
console.log('WebSocket服务器运行在端口3000');
});
2. 使用Socket.io(推荐):
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: {
origin: "http://localhost:3000",
methods: ["GET", "POST"]
}
});
// 用户管理
const users = new Map();
const rooms = new Map();
// 中间件 - 身份验证
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (token) {
// 验证JWT token
try {
const user = jwt.verify(token, JWT_SECRET);
socket.userId = user.id;
socket.username = user.username;
next();
} catch (error) {
next(new Error('Authentication error'));
}
} else {
next(new Error('Authentication error'));
}
});
// 连接处理
io.on('connection', (socket) => {
console.log(`用户 ${socket.username} 连接成功`);
// 添加用户到在线列表
users.set(socket.id, {
id: socket.userId,
username: socket.username,
socketId: socket.id,
joinedAt: Date.now()
});
// 广播在线用户列表
io.emit('users:online', Array.from(users.values()));
// 加入房间
socket.on('room:join', (roomId) => {
socket.join(roomId);
// 更新房间信息
if (!rooms.has(roomId)) {
rooms.set(roomId, { users: new Set(), messages: [] });
}
rooms.get(roomId).users.add(socket.id);
// 通知房间其他用户
socket.to(roomId).emit('user:joined', {
username: socket.username,
message: `${socket.username} 加入了房间`
});
// 发送房间历史消息
const roomData = rooms.get(roomId);
socket.emit('room:messages', roomData.messages.slice(-50)); // 最近50条消息
});
// 离开房间
socket.on('room:leave', (roomId) => {
socket.leave(roomId);
if (rooms.has(roomId)) {
rooms.get(roomId).users.delete(socket.id);
// 通知房间其他用户
socket.to(roomId).emit('user:left', {
username: socket.username,
message: `${socket.username} 离开了房间`
});
}
});
// 处理聊天消息
socket.on('chat:message', (data) => {
const message = {
id: generateMessageId(),
user: socket.username,
userId: socket.userId,
content: data.message,
roomId: data.roomId,
timestamp: Date.now()
};
// 保存消息到房间
if (rooms.has(data.roomId)) {
rooms.get(data.roomId).messages.push(message);
// 限制消息数量
const roomMessages = rooms.get(data.roomId).messages;
if (roomMessages.length > 1000) {
roomMessages.splice(0, roomMessages.length - 1000);
}
}
// 广播消息到房间
io.to(data.roomId).emit('chat:message', message);
// 保存到数据库(可选)
saveChatMessage(message).catch(console.error);
});
// 处理私聊
socket.on('private:message', (data) => {
const targetUser = Array.from(users.values())
.find(user => user.username === data.to);
if (targetUser) {
const message = {
from: socket.username,
to: data.to,
content: data.message,
timestamp: Date.now()
};
// 发送给目标用户
io.to(targetUser.socketId).emit('private:message', message);
// 确认发送给发送者
socket.emit('private:sent', message);
} else {
socket.emit('error', { message: '用户不在线' });
}
});
// 输入状态
socket.on('typing:start', (roomId) => {
socket.to(roomId).emit('user:typing', {
username: socket.username,
typing: true
});
});
socket.on('typing:stop', (roomId) => {
socket.to(roomId).emit('user:typing', {
username: socket.username,
typing: false
});
});
// 断线处理
socket.on('disconnect', (reason) => {
console.log(`用户 ${socket.username} 断开连接:`, reason);
// 从在线用户列表移除
users.delete(socket.id);
// 从所有房间移除
rooms.forEach((room, roomId) => {
if (room.users.has(socket.id)) {
room.users.delete(socket.id);
socket.to(roomId).emit('user:left', {
username: socket.username,
message: `${socket.username} 离开了房间`
});
}
});
// 更新在线用户列表
io.emit('users:online', Array.from(users.values()));
});
});
3. WebSocket集群和扩展性:
const cluster = require('cluster');
const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');
if (cluster.isMaster) {
// 主进程创建工作进程
const numWorkers = require('os').cpus().length;
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
} else {
// 工作进程
const app = express();
const server = http.createServer(app);
const io = socketIo(server);
// Redis适配器用于多进程通信
const pubClient = redis.createClient({ host: 'localhost', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));
// Socket.io逻辑
io.on('connection', (socket) => {
// ... 连接处理逻辑
});
server.listen(3000 + cluster.worker.id);
}
4. 高级WebSocket功能:
// 心跳检测
class WebSocketManager {
constructor() {
this.connections = new Map();
this.heartbeatInterval = 30000; // 30秒心跳
this.startHeartbeat();
}
addConnection(socket) {
const connectionInfo = {
socket,
lastPing: Date.now(),
isAlive: true
};
this.connections.set(socket.id, connectionInfo);
// 设置pong处理
socket.on('pong', () => {
connectionInfo.lastPing = Date.now();
connectionInfo.isAlive = true;
});
}
removeConnection(socketId) {
this.connections.delete(socketId);
}
startHeartbeat() {
setInterval(() => {
this.connections.forEach((conn, socketId) => {
if (!conn.isAlive) {
// 连接已死,终止连接
conn.socket.terminate();
this.connections.delete(socketId);
return;
}
// 检查是否超时
const timeSinceLastPing = Date.now() - conn.lastPing;
if (timeSinceLastPing > this.heartbeatInterval * 2) {
conn.isAlive = false;
conn.socket.ping();
}
});
}, this.heartbeatInterval);
}
broadcast(message) {
this.connections.forEach((conn) => {
if (conn.socket.readyState === WebSocket.OPEN) {
conn.socket.send(JSON.stringify(message));
}
});
}
sendToUser(userId, message) {
const connection = Array.from(this.connections.values())
.find(conn => conn.socket.userId === userId);
if (connection && connection.socket.readyState === WebSocket.OPEN) {
connection.socket.send(JSON.stringify(message));
return true;
}
return false;
}
}
const wsManager = new WebSocketManager();
5. WebSocket安全性:
const rateLimit = require('express-rate-limit');
// 连接速率限制
const connectionLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 10, // 每个IP最多10个连接
message: '连接过于频繁,请稍后再试'
});
app.use('/socket.io/', connectionLimiter);
// WebSocket身份验证和授权
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;
if (!token) {
throw new Error('No token provided');
}
// 验证JWT
const decoded = jwt.verify(token, JWT_SECRET);
// 检查用户权限
const user = await getUserById(decoded.userId);
if (!user || user.status !== 'active') {
throw new Error('User not active');
}
socket.user = user;
next();
} catch (error) {
next(new Error('Authentication failed'));
}
});
// 消息验证和清理
function validateMessage(message) {
if (!message || typeof message !== 'string') {
throw new Error('Invalid message format');
}
if (message.length > 1000) {
throw new Error('Message too long');
}
// 过滤敏感内容
return sanitizeHtml(message);
}
// WebSocket消息加密
const crypto = require('crypto');
function encryptMessage(message, key) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipher('aes-256-cbc', key, iv);
let encrypted = cipher.update(message, 'utf8', 'hex');
encrypted += cipher.final('hex');
return iv.toString('hex') + ':' + encrypted;
}
function decryptMessage(encryptedMessage, key) {
const parts = encryptedMessage.split(':');
const iv = Buffer.from(parts[0], 'hex');
const encrypted = parts[1];
const decipher = crypto.createDecipher('aes-256-cbc', key, iv);
let decrypted = decipher.update(encrypted, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
6. 客户端示例:
// 浏览器客户端
const socket = io('http://localhost:3000', {
auth: {
token: localStorage.getItem('jwt_token')
}
});
socket.on('connect', () => {
console.log('连接成功');
});
socket.on('chat:message', (message) => {
displayMessage(message);
});
socket.on('disconnect', () => {
console.log('连接断开');
});
// 发送消息
function sendMessage(roomId, message) {
socket.emit('chat:message', {
roomId,
message
});
}
// 加入房间
function joinRoom(roomId) {
socket.emit('room:join', roomId);
}
WebSocket最佳实践:
How does memory management work in Node.js? How to avoid memory leaks?
How does memory management work in Node.js? How to avoid memory leaks?
考察点:内存管理。
答案:
Node.js的内存管理基于V8引擎的垃圾回收机制。了解内存管理原理和常见的内存泄漏场景对于开发高性能的Node.js应用至关重要。
1. V8内存结构:
// 查看内存使用情况
function getMemoryUsage() {
const usage = process.memoryUsage();
return {
// 常驻内存集,包含堆、代码段、栈
rss: `${Math.round(usage.rss / 1024 / 1024 * 100) / 100} MB`,
// V8堆的总分配内存
heapTotal: `${Math.round(usage.heapTotal / 1024 / 1024 * 100) / 100} MB`,
// V8堆的已使用内存
heapUsed: `${Math.round(usage.heapUsed / 1024 / 1024 * 100) / 100} MB`,
// 外部C++对象绑定到JavaScript对象的内存
external: `${Math.round(usage.external / 1024 / 1024 * 100) / 100} MB`,
// ArrayBuffer分配的内存
arrayBuffers: `${Math.round(usage.arrayBuffers / 1024 / 1024 * 100) / 100} MB`
};
}
// 定期监控内存使用
setInterval(() => {
console.log('内存使用情况:', getMemoryUsage());
}, 10000);
// V8堆统计信息
if (global.gc) {
global.gc(); // 手动触发垃圾回收
const heapStats = v8.getHeapStatistics();
console.log('堆统计:', heapStats);
}
2. 垃圾回收机制:
const v8 = require('v8');
// 分代垃圾回收
class MemoryManager {
constructor() {
this.youngGeneration = new Map(); // 新生代 - Scavenge算法
this.oldGeneration = new Map(); // 老生代 - Mark-Sweep算法
}
// 监控垃圾回收
startGCMonitoring() {
// 在Node.js启动时添加 --expose-gc 标志
if (global.gc) {
const PerformanceObserver = require('perf_hooks').PerformanceObserver;
const obs = new PerformanceObserver((list) => {
const entries = list.getEntries();
entries.forEach((entry) => {
console.log(`GC ${entry.name}: ${entry.duration.toFixed(2)}ms`);
});
});
obs.observe({ entryTypes: ['gc'] });
}
}
// 获取堆快照
async takeHeapSnapshot() {
const heapSnapshot = v8.getHeapSnapshot();
const chunks = [];
for await (const chunk of heapSnapshot) {
chunks.push(chunk);
}
return Buffer.concat(chunks);
}
// 内存压力检测
checkMemoryPressure() {
const usage = process.memoryUsage();
const heapUsedMB = usage.heapUsed / 1024 / 1024;
const heapTotalMB = usage.heapTotal / 1024 / 1024;
const heapUsagePercent = (heapUsedMB / heapTotalMB) * 100;
if (heapUsagePercent > 80) {
console.warn('内存使用率过高:', heapUsagePercent.toFixed(2) + '%');
// 触发垃圾回收
if (global.gc) {
global.gc();
}
return true;
}
return false;
}
}
const memoryManager = new MemoryManager();
memoryManager.startGCMonitoring();
3. 常见内存泄漏场景及解决方案:
场景1:事件监听器泄漏
// 错误示例 - 事件监听器累积
class BadEventHandler {
constructor() {
setInterval(() => {
process.on('data', this.handleData.bind(this)); // 每次都添加新监听器
}, 1000);
}
handleData(data) {
console.log('处理数据:', data);
}
}
// 正确示例 - 正确管理事件监听器
class GoodEventHandler {
constructor() {
this.handleData = this.handleData.bind(this);
this.isListening = false;
this.setupListener();
}
setupListener() {
if (!this.isListening) {
process.on('data', this.handleData);
this.isListening = true;
}
}
handleData(data) {
console.log('处理数据:', data);
}
cleanup() {
if (this.isListening) {
process.removeListener('data', this.handleData);
this.isListening = false;
}
}
}
// EventEmitter最大监听器数量检查
const EventEmitter = require('events');
class SafeEmitter extends EventEmitter {
constructor() {
super();
this.setMaxListeners(10); // 设置最大监听器数量
}
addSafeListener(event, listener) {
const currentCount = this.listenerCount(event);
if (currentCount >= this.getMaxListeners()) {
console.warn(`事件 ${event} 监听器数量过多: ${currentCount}`);
return false;
}
this.on(event, listener);
return true;
}
}
场景2:闭包和变量引用泄漏
// 错误示例 - 闭包导致的内存泄漏
class BadClosureHandler {
constructor() {
this.cache = new Map();
this.largeData = new Array(1000000).fill('data');
}
createHandler(id) {
// 闭包引用了整个this,包括largeData
return () => {
console.log('处理ID:', id);
return this.largeData.length; // 引用大对象
};
}
}
// 正确示例 - 避免不必要的闭包引用
class GoodClosureHandler {
constructor() {
this.cache = new Map();
this.largeData = new Array(1000000).fill('data');
}
createHandler(id) {
const dataLength = this.largeData.length; // 只保存需要的值
// 闭包只引用必要的变量
return () => {
console.log('处理ID:', id);
return dataLength;
};
}
// 使用WeakMap避免循环引用
createWeakHandler(obj) {
const handlers = new WeakMap();
return (key) => {
if (!handlers.has(obj)) {
handlers.set(obj, new Map());
}
const objHandlers = handlers.get(obj);
if (!objHandlers.has(key)) {
objHandlers.set(key, () => {
console.log('处理对象:', obj.id, '键:', key);
});
}
return objHandlers.get(key);
};
}
}
场景3:定时器泄漏
// 错误示例 - 定时器没有清理
class BadTimerHandler {
constructor() {
this.data = new Array(100000).fill('data');
setInterval(() => {
this.processData();
}, 1000); // 定时器持有对this的引用
}
processData() {
console.log('处理数据:', this.data.length);
}
}
// 正确示例 - 正确管理定时器
class GoodTimerHandler {
constructor() {
this.data = new Array(100000).fill('data');
this.timers = new Set();
this.isActive = true;
}
startProcessing() {
const timer = setInterval(() => {
if (this.isActive) {
this.processData();
}
}, 1000);
this.timers.add(timer);
return timer;
}
processData() {
console.log('处理数据:', this.data.length);
}
cleanup() {
this.isActive = false;
this.timers.forEach(timer => clearInterval(timer));
this.timers.clear();
this.data = null;
}
}
// 定时器管理器
class TimerManager {
constructor() {
this.timers = new Map();
}
setTimeout(callback, delay, id = null) {
const timerId = id || Math.random().toString(36);
const timer = setTimeout(() => {
callback();
this.timers.delete(timerId);
}, delay);
this.timers.set(timerId, timer);
return timerId;
}
setInterval(callback, interval, id = null) {
const timerId = id || Math.random().toString(36);
const timer = setInterval(callback, interval);
this.timers.set(timerId, timer);
return timerId;
}
clear(id) {
const timer = this.timers.get(id);
if (timer) {
clearTimeout(timer);
clearInterval(timer);
this.timers.delete(id);
}
}
clearAll() {
this.timers.forEach((timer) => {
clearTimeout(timer);
clearInterval(timer);
});
this.timers.clear();
}
}
4. 内存监控和诊断:
const fs = require('fs');
const path = require('path');
class MemoryProfiler {
constructor() {
this.profiles = [];
this.isMonitoring = false;
}
startMonitoring(interval = 5000) {
if (this.isMonitoring) return;
this.isMonitoring = true;
this.monitorTimer = setInterval(() => {
this.collectMemoryProfile();
}, interval);
}
stopMonitoring() {
if (this.monitorTimer) {
clearInterval(this.monitorTimer);
this.isMonitoring = false;
}
}
collectMemoryProfile() {
const usage = process.memoryUsage();
const profile = {
timestamp: Date.now(),
rss: usage.rss,
heapUsed: usage.heapUsed,
heapTotal: usage.heapTotal,
external: usage.external
};
this.profiles.push(profile);
// 保留最近1000个采样点
if (this.profiles.length > 1000) {
this.profiles.shift();
}
// 检测内存增长趋势
this.detectMemoryLeaks();
}
detectMemoryLeaks() {
if (this.profiles.length < 10) return;
const recent = this.profiles.slice(-10);
const growth = recent[recent.length - 1].heapUsed - recent[0].heapUsed;
const timeSpan = recent[recent.length - 1].timestamp - recent[0].timestamp;
// 如果10个采样点内堆内存增长超过10MB
if (growth > 10 * 1024 * 1024) {
console.warn('检测到可能的内存泄漏:');
console.warn(`时间跨度: ${timeSpan}ms`);
console.warn(`内存增长: ${Math.round(growth / 1024 / 1024)} MB`);
this.generateMemoryReport();
}
}
generateMemoryReport() {
const report = {
timestamp: new Date().toISOString(),
profiles: this.profiles.slice(-50), // 最近50个采样点
gcStats: v8.getHeapStatistics(),
processInfo: {
pid: process.pid,
uptime: process.uptime(),
version: process.version
}
};
const reportPath = path.join(__dirname, 'memory-reports', `memory-${Date.now()}.json`);
fs.writeFileSync(reportPath, JSON.stringify(report, null, 2));
console.log('内存报告已生成:', reportPath);
}
// 生成堆快照
async takeHeapSnapshot(filename) {
if (!global.gc) {
console.warn('需要 --expose-gc 标志来生成堆快照');
return;
}
// 先进行垃圾回收
global.gc();
const heapSnapshot = v8.getHeapSnapshot();
const snapshotPath = path.join(__dirname, 'heap-snapshots', filename || `heap-${Date.now()}.heapsnapshot`);
const writeStream = fs.createWriteStream(snapshotPath);
heapSnapshot.pipe(writeStream);
return new Promise((resolve, reject) => {
writeStream.on('finish', () => {
console.log('堆快照已保存:', snapshotPath);
resolve(snapshotPath);
});
writeStream.on('error', reject);
});
}
}
const profiler = new MemoryProfiler();
profiler.startMonitoring();
5. 内存优化策略:
// 对象池模式
class ObjectPool {
constructor(createFn, resetFn, maxSize = 100) {
this.createFn = createFn;
this.resetFn = resetFn;
this.maxSize = maxSize;
this.pool = [];
}
acquire() {
return this.pool.length > 0 ? this.pool.pop() : this.createFn();
}
release(obj) {
if (this.pool.length < this.maxSize) {
this.resetFn(obj);
this.pool.push(obj);
}
}
clear() {
this.pool.length = 0;
}
}
// 使用对象池
const bufferPool = new ObjectPool(
() => Buffer.allocUnsafe(1024),
(buffer) => buffer.fill(0),
50
);
function processData(data) {
const buffer = bufferPool.acquire();
try {
// 使用buffer处理数据
buffer.write(data);
return buffer.toString();
} finally {
bufferPool.release(buffer);
}
}
// 流式处理大数据
const { Transform } = require('stream');
class MemoryEfficientTransform extends Transform {
constructor(options) {
super(options);
this.chunkSize = options.chunkSize || 1024;
}
_transform(chunk, encoding, callback) {
// 分块处理,避免加载大对象到内存
for (let i = 0; i < chunk.length; i += this.chunkSize) {
const slice = chunk.slice(i, i + this.chunkSize);
this.push(this.processChunk(slice));
}
callback();
}
processChunk(chunk) {
// 处理数据块
return chunk.toString().toUpperCase();
}
}
内存管理最佳实践:
How to implement scheduled tasks and job scheduling in Node.js?
How to implement scheduled tasks and job scheduling in Node.js?
考察点:任务调度机制。
答案:
Node.js中实现定时任务和调度有多种方式,从简单的setTimeout/setInterval到复杂的任务队列系统。选择合适的方案取决于任务的复杂性、可靠性要求和系统规模。
1. 基础定时器实现:
// 简单定时任务管理器
class TaskScheduler {
constructor() {
this.tasks = new Map();
this.intervals = new Map();
this.timeouts = new Map();
}
// 一次性任务
scheduleOnce(taskId, callback, delay) {
if (this.timeouts.has(taskId)) {
clearTimeout(this.timeouts.get(taskId));
}
const timer = setTimeout(() => {
try {
callback();
console.log(`任务 ${taskId} 执行完成`);
} catch (error) {
console.error(`任务 ${taskId} 执行失败:`, error);
} finally {
this.timeouts.delete(taskId);
}
}, delay);
this.timeouts.set(taskId, timer);
console.log(`任务 ${taskId} 已安排在 ${delay}ms 后执行`);
}
// 重复执行任务
scheduleRepeating(taskId, callback, interval) {
if (this.intervals.has(taskId)) {
clearInterval(this.intervals.get(taskId));
}
const timer = setInterval(async () => {
try {
await callback();
console.log(`重复任务 ${taskId} 执行完成`);
} catch (error) {
console.error(`重复任务 ${taskId} 执行失败:`, error);
}
}, interval);
this.intervals.set(taskId, timer);
console.log(`重复任务 ${taskId} 已安排,间隔 ${interval}ms`);
}
// 取消任务
cancel(taskId) {
if (this.timeouts.has(taskId)) {
clearTimeout(this.timeouts.get(taskId));
this.timeouts.delete(taskId);
}
if (this.intervals.has(taskId)) {
clearInterval(this.intervals.get(taskId));
this.intervals.delete(taskId);
}
console.log(`任务 ${taskId} 已取消`);
}
// 清理所有任务
cleanup() {
this.timeouts.forEach(timer => clearTimeout(timer));
this.intervals.forEach(timer => clearInterval(timer));
this.timeouts.clear();
this.intervals.clear();
}
// 获取任务状态
getStatus() {
return {
activeTimeouts: this.timeouts.size,
activeIntervals: this.intervals.size,
tasks: Array.from(this.timeouts.keys()).concat(Array.from(this.intervals.keys()))
};
}
}
const scheduler = new TaskScheduler();
// 使用示例
scheduler.scheduleOnce('backup', () => {
console.log('执行数据备份');
}, 60000); // 1分钟后执行
scheduler.scheduleRepeating('cleanup', () => {
console.log('清理临时文件');
}, 300000); // 每5分钟执行
2. Cron表达式调度(使用node-cron):
const cron = require('node-cron');
const fs = require('fs').promises;
class CronScheduler {
constructor() {
this.tasks = new Map();
}
// 添加cron任务
addTask(taskId, cronExpression, callback, options = {}) {
if (this.tasks.has(taskId)) {
this.tasks.get(taskId).stop();
}
const task = cron.schedule(cronExpression, async () => {
const startTime = Date.now();
try {
console.log(`开始执行任务 ${taskId} - ${new Date().toISOString()}`);
await callback();
const duration = Date.now() - startTime;
console.log(`任务 ${taskId} 执行完成,耗时: ${duration}ms`);
// 记录任务执行日志
await this.logTaskExecution(taskId, true, duration);
} catch (error) {
const duration = Date.now() - startTime;
console.error(`任务 ${taskId} 执行失败:`, error);
await this.logTaskExecution(taskId, false, duration, error.message);
// 错误处理策略
if (options.onError) {
options.onError(error);
}
}
}, {
scheduled: false,
timezone: options.timezone || 'Asia/Shanghai'
});
this.tasks.set(taskId, {
task,
cronExpression,
options,
createdAt: new Date()
});
if (options.autoStart !== false) {
task.start();
}
console.log(`Cron任务 ${taskId} 已添加: ${cronExpression}`);
}
// 启动任务
startTask(taskId) {
const taskInfo = this.tasks.get(taskId);
if (taskInfo) {
taskInfo.task.start();
console.log(`任务 ${taskId} 已启动`);
}
}
// 停止任务
stopTask(taskId) {
const taskInfo = this.tasks.get(taskId);
if (taskInfo) {
taskInfo.task.stop();
console.log(`任务 ${taskId} 已停止`);
}
}
// 删除任务
removeTask(taskId) {
const taskInfo = this.tasks.get(taskId);
if (taskInfo) {
taskInfo.task.destroy();
this.tasks.delete(taskId);
console.log(`任务 ${taskId} 已删除`);
}
}
// 记录任务执行日志
async logTaskExecution(taskId, success, duration, error = null) {
const logEntry = {
taskId,
success,
duration,
error,
timestamp: new Date().toISOString()
};
try {
await fs.appendFile('task-logs.json', JSON.stringify(logEntry) + '\n');
} catch (err) {
console.error('写入任务日志失败:', err);
}
}
// 获取任务列表
getTasks() {
const tasks = [];
for (const [taskId, info] of this.tasks.entries()) {
tasks.push({
id: taskId,
cronExpression: info.cronExpression,
isRunning: info.task.running,
createdAt: info.createdAt,
options: info.options
});
}
return tasks;
}
}
const cronScheduler = new CronScheduler();
// 使用示例
cronScheduler.addTask('daily-report', '0 9 * * *', async () => {
// 每天上午9点生成报告
await generateDailyReport();
});
cronScheduler.addTask('weekly-cleanup', '0 2 * * 0', async () => {
// 每周日凌晨2点清理数据
await weeklyCleanup();
}, {
timezone: 'Asia/Shanghai',
onError: (error) => {
// 发送错误通知
sendErrorNotification('weekly-cleanup', error);
}
});
cronScheduler.addTask('health-check', '*/5 * * * *', async () => {
// 每5分钟执行健康检查
await performHealthCheck();
});
3. 任务队列系统(使用Bull):
const Queue = require('bull');
const Redis = require('redis');
// 创建Redis连接
const redisClient = Redis.createClient({
host: 'localhost',
port: 6379
});
class JobQueue {
constructor() {
// 创建不同类型的队列
this.emailQueue = new Queue('email processing', {
redis: { host: 'localhost', port: 6379 }
});
this.reportQueue = new Queue('report generation', {
redis: { host: 'localhost', port: 6379 }
});
this.setupProcessors();
this.setupEventHandlers();
}
// 设置任务处理器
setupProcessors() {
// 邮件队列处理器
this.emailQueue.process('send-email', 5, async (job) => {
const { to, subject, content } = job.data;
console.log(`处理邮件任务: ${job.id}`);
// 模拟发送邮件
await this.sendEmail(to, subject, content);
// 更新进度
job.progress(100);
return { success: true, sentAt: new Date() };
});
// 报告队列处理器
this.reportQueue.process('generate-report', 2, async (job) => {
const { reportType, params } = job.data;
console.log(`生成报告: ${reportType}`);
// 更新进度
job.progress(10);
// 生成报告
const report = await this.generateReport(reportType, params);
job.progress(100);
return { reportId: report.id, generatedAt: new Date() };
});
}
// 设置事件处理器
setupEventHandlers() {
this.emailQueue.on('completed', (job, result) => {
console.log(`邮件任务 ${job.id} 完成:`, result);
});
this.emailQueue.on('failed', (job, err) => {
console.error(`邮件任务 ${job.id} 失败:`, err);
});
this.reportQueue.on('progress', (job, progress) => {
console.log(`报告任务 ${job.id} 进度: ${progress}%`);
});
}
// 添加邮件任务
async addEmailJob(emailData, options = {}) {
const job = await this.emailQueue.add('send-email', emailData, {
delay: options.delay || 0,
attempts: options.attempts || 3,
backoff: options.backoff || 'exponential',
removeOnComplete: 10,
removeOnFail: 5
});
console.log(`邮件任务已添加: ${job.id}`);
return job;
}
// 添加报告任务
async addReportJob(reportData, options = {}) {
const job = await this.reportQueue.add('generate-report', reportData, {
priority: options.priority || 0,
attempts: 2,
backoff: 'fixed',
removeOnComplete: 5
});
console.log(`报告任务已添加: ${job.id}`);
return job;
}
// 添加定时任务
async addScheduledJob(queueName, jobType, data, cronExpression) {
let queue;
switch (queueName) {
case 'email':
queue = this.emailQueue;
break;
case 'report':
queue = this.reportQueue;
break;
default:
throw new Error(`未知队列: ${queueName}`);
}
const job = await queue.add(jobType, data, {
repeat: { cron: cronExpression },
removeOnComplete: 10,
removeOnFail: 5
});
console.log(`定时任务已添加: ${job.id}, 表达式: ${cronExpression}`);
return job;
}
// 获取队列状态
async getQueueStatus(queueName) {
let queue;
switch (queueName) {
case 'email':
queue = this.emailQueue;
break;
case 'report':
queue = this.reportQueue;
break;
default:
throw new Error(`未知队列: ${queueName}`);
}
const waiting = await queue.getWaiting();
const active = await queue.getActive();
const completed = await queue.getCompleted();
const failed = await queue.getFailed();
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length
};
}
// 业务逻辑方法
async sendEmail(to, subject, content) {
// 模拟邮件发送延迟
await new Promise(resolve => setTimeout(resolve, 1000));
console.log(`邮件已发送给 ${to}: ${subject}`);
}
async generateReport(reportType, params) {
// 模拟报告生成
await new Promise(resolve => setTimeout(resolve, 5000));
return {
id: `report_${Date.now()}`,
type: reportType,
params,
generatedAt: new Date()
};
}
}
const jobQueue = new JobQueue();
// 使用示例
// 立即发送邮件
jobQueue.addEmailJob({
to: '[email protected]',
subject: '欢迎注册',
content: '感谢您的注册'
});
// 延迟发送邮件
jobQueue.addEmailJob({
to: '[email protected]',
subject: '提醒',
content: '这是一个延迟消息'
}, { delay: 60000 }); // 1分钟后发送
// 生成报告
jobQueue.addReportJob({
reportType: 'sales',
params: { startDate: '2024-01-01', endDate: '2024-01-31' }
}, { priority: 1 });
// 每天定时发送邮件
jobQueue.addScheduledJob('email', 'send-email', {
to: '[email protected]',
subject: '每日报告',
content: '这是每日自动报告'
}, '0 9 * * *');
4. 高级任务调度系统:
class AdvancedTaskScheduler {
constructor() {
this.tasks = new Map();
this.dependencies = new Map();
this.executionHistory = [];
this.retryPolicies = new Map();
}
// 添加带依赖的任务
addTask(taskId, taskFn, options = {}) {
const task = {
id: taskId,
fn: taskFn,
dependencies: options.dependencies || [],
cronExpression: options.cron,
retryPolicy: options.retryPolicy || { maxRetries: 3, backoff: 'exponential' },
timeout: options.timeout || 30000,
status: 'pending',
createdAt: Date.now()
};
this.tasks.set(taskId, task);
// 设置依赖关系
if (task.dependencies.length > 0) {
this.dependencies.set(taskId, new Set(task.dependencies));
}
console.log(`任务 ${taskId} 已添加`);
}
// 执行任务
async executeTask(taskId) {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`任务 ${taskId} 不存在`);
}
// 检查依赖是否满足
if (!this.checkDependencies(taskId)) {
throw new Error(`任务 ${taskId} 的依赖条件未满足`);
}
task.status = 'running';
task.startedAt = Date.now();
let result;
let error;
try {
// 设置超时
result = await Promise.race([
task.fn(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('任务超时')), task.timeout)
)
]);
task.status = 'completed';
task.completedAt = Date.now();
console.log(`任务 ${taskId} 执行成功`);
} catch (err) {
error = err;
task.status = 'failed';
task.failedAt = Date.now();
console.error(`任务 ${taskId} 执行失败:`, err.message);
// 重试机制
if (this.shouldRetry(task)) {
await this.retryTask(taskId);
}
}
// 记录执行历史
this.recordExecution(taskId, result, error);
// 触发依赖任务
if (task.status === 'completed') {
await this.triggerDependentTasks(taskId);
}
return result;
}
// 检查依赖
checkDependencies(taskId) {
const dependencies = this.dependencies.get(taskId);
if (!dependencies) return true;
for (const depId of dependencies) {
const depTask = this.tasks.get(depId);
if (!depTask || depTask.status !== 'completed') {
return false;
}
}
return true;
}
// 触发依赖任务
async triggerDependentTasks(completedTaskId) {
for (const [taskId, deps] of this.dependencies.entries()) {
if (deps.has(completedTaskId)) {
deps.delete(completedTaskId);
if (deps.size === 0) {
console.log(`任务 ${taskId} 的依赖已满足,开始执行`);
setImmediate(() => this.executeTask(taskId));
}
}
}
}
// 重试逻辑
shouldRetry(task) {
const retryCount = task.retryCount || 0;
return retryCount < task.retryPolicy.maxRetries;
}
async retryTask(taskId) {
const task = this.tasks.get(taskId);
task.retryCount = (task.retryCount || 0) + 1;
const delay = this.calculateRetryDelay(task);
console.log(`任务 ${taskId} 将在 ${delay}ms 后重试 (第${task.retryCount}次)`);
setTimeout(() => {
this.executeTask(taskId);
}, delay);
}
calculateRetryDelay(task) {
const { backoff } = task.retryPolicy;
const retryCount = task.retryCount;
switch (backoff) {
case 'exponential':
return Math.min(1000 * Math.pow(2, retryCount - 1), 30000);
case 'linear':
return 1000 * retryCount;
case 'fixed':
default:
return 1000;
}
}
// 记录执行历史
recordExecution(taskId, result, error) {
const execution = {
taskId,
timestamp: Date.now(),
success: !error,
result,
error: error ? error.message : null,
duration: Date.now() - this.tasks.get(taskId).startedAt
};
this.executionHistory.push(execution);
// 保留最近1000条记录
if (this.executionHistory.length > 1000) {
this.executionHistory.shift();
}
}
// 获取执行统计
getExecutionStats(taskId = null) {
const executions = taskId
? this.executionHistory.filter(e => e.taskId === taskId)
: this.executionHistory;
const total = executions.length;
const successful = executions.filter(e => e.success).length;
const failed = total - successful;
const avgDuration = total > 0
? executions.reduce((sum, e) => sum + e.duration, 0) / total
: 0;
return {
total,
successful,
failed,
successRate: total > 0 ? (successful / total * 100).toFixed(2) + '%' : '0%',
avgDuration: Math.round(avgDuration) + 'ms'
};
}
}
// 使用示例
const advancedScheduler = new AdvancedTaskScheduler();
// 添加任务
advancedScheduler.addTask('fetch-data', async () => {
console.log('获取数据...');
await new Promise(resolve => setTimeout(resolve, 2000));
return { data: 'fetched data' };
});
advancedScheduler.addTask('process-data', async () => {
console.log('处理数据...');
await new Promise(resolve => setTimeout(resolve, 1000));
return { processed: true };
}, {
dependencies: ['fetch-data'],
timeout: 5000
});
advancedScheduler.addTask('send-notification', async () => {
console.log('发送通知...');
return { sent: true };
}, {
dependencies: ['process-data']
});
// 开始执行
advancedScheduler.executeTask('fetch-data');
定时任务最佳实践:
How to design a highly available Node.js microservices architecture?
How to design a highly available Node.js microservices architecture?
考察点:微服务架构设计。
答案:
高可用的Node.js微服务架构需要从服务拆分、通信机制、数据一致性、监控告警、容错处理等多个维度进行设计,确保系统在面对故障时能够持续提供服务。
1. 微服务架构设计原则:
// 服务注册与发现
const consul = require('consul');
const express = require('express');
class ServiceRegistry {
constructor(serviceName, servicePort) {
this.serviceName = serviceName;
this.servicePort = servicePort;
this.serviceId = `${serviceName}-${process.pid}`;
this.consul = consul();
}
async register() {
const serviceConfig = {
id: this.serviceId,
name: this.serviceName,
port: this.servicePort,
address: process.env.SERVICE_HOST || 'localhost',
check: {
http: `http://localhost:${this.servicePort}/health`,
interval: '10s',
timeout: '5s',
deregistercriticalserviceafter: '30s'
},
tags: [
`version:${process.env.SERVICE_VERSION || '1.0.0'}`,
`environment:${process.env.NODE_ENV || 'development'}`
]
};
try {
await this.consul.agent.service.register(serviceConfig);
console.log(`服务 ${this.serviceName} 注册成功`);
// 优雅退出时注销服务
process.on('SIGTERM', () => this.deregister());
process.on('SIGINT', () => this.deregister());
} catch (error) {
console.error('服务注册失败:', error);
throw error;
}
}
async deregister() {
try {
await this.consul.agent.service.deregister(this.serviceId);
console.log(`服务 ${this.serviceName} 注销成功`);
} catch (error) {
console.error('服务注销失败:', error);
}
}
async discover(serviceName) {
try {
const services = await this.consul.health.service({
service: serviceName,
passing: true
});
return services[0].map(service => ({
id: service.Service.ID,
address: service.Service.Address,
port: service.Service.Port,
tags: service.Service.Tags
}));
} catch (error) {
console.error('服务发现失败:', error);
return [];
}
}
}
// 基础微服务框架
class MicroService {
constructor(name, port) {
this.name = name;
this.port = port;
this.app = express();
this.registry = new ServiceRegistry(name, port);
this.setupMiddleware();
this.setupRoutes();
}
setupMiddleware() {
// 请求ID跟踪
this.app.use((req, res, next) => {
req.requestId = req.headers['x-request-id'] ||
`${this.name}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
res.setHeader('x-request-id', req.requestId);
next();
});
// 请求日志
this.app.use((req, res, next) => {
const startTime = Date.now();
console.log(`[${req.requestId}] ${req.method} ${req.url} 开始处理`);
res.on('finish', () => {
const duration = Date.now() - startTime;
console.log(`[${req.requestId}] ${req.method} ${req.url} ${res.statusCode} ${duration}ms`);
});
next();
});
// 健康检查
this.app.get('/health', (req, res) => {
res.json({
status: 'healthy',
service: this.name,
version: process.env.SERVICE_VERSION || '1.0.0',
timestamp: new Date().toISOString(),
uptime: process.uptime()
});
});
// 就绪检查
this.app.get('/ready', async (req, res) => {
try {
await this.checkDependencies();
res.json({ status: 'ready' });
} catch (error) {
res.status(503).json({ status: 'not ready', error: error.message });
}
});
}
async checkDependencies() {
// 检查数据库连接、外部服务等
// 子类实现具体检查逻辑
return true;
}
setupRoutes() {
// 子类实现具体路由
}
async start() {
await this.registry.register();
this.server = this.app.listen(this.port, () => {
console.log(`微服务 ${this.name} 启动在端口 ${this.port}`);
});
return this.server;
}
async stop() {
await this.registry.deregister();
if (this.server) {
return new Promise((resolve) => {
this.server.close(resolve);
});
}
}
}
2. 服务通信与负载均衡:
// HTTP客户端与负载均衡
class ServiceClient {
constructor(serviceName, registry) {
this.serviceName = serviceName;
this.registry = registry;
this.instances = [];
this.currentIndex = 0;
this.refreshInstances();
// 定期刷新服务实例
setInterval(() => {
this.refreshInstances();
}, 30000);
}
async refreshInstances() {
this.instances = await this.registry.discover(this.serviceName);
}
// 轮询负载均衡
getNextInstance() {
if (this.instances.length === 0) {
throw new Error(`服务 ${this.serviceName} 无可用实例`);
}
const instance = this.instances[this.currentIndex];
this.currentIndex = (this.currentIndex + 1) % this.instances.length;
return instance;
}
// 带重试的HTTP请求
async request(method, path, data = null, options = {}) {
const maxRetries = options.maxRetries || 3;
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const instance = this.getNextInstance();
const url = `http://${instance.address}:${instance.port}${path}`;
const response = await axios({
method,
url,
data,
timeout: options.timeout || 5000,
headers: {
'x-request-id': options.requestId || `client-${Date.now()}`,
...options.headers
}
});
return response.data;
} catch (error) {
lastError = error;
console.error(`请求 ${this.serviceName} 失败 (尝试 ${attempt}/${maxRetries}):`, error.message);
if (attempt < maxRetries) {
// 指数退避
await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
}
}
}
throw lastError;
}
async get(path, options = {}) {
return this.request('GET', path, null, options);
}
async post(path, data, options = {}) {
return this.request('POST', path, data, options);
}
}
// 断路器模式
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.lastFailureTime = null;
this.successCount = 0;
}
async execute(operation) {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.resetTimeout) {
this.state = 'HALF_OPEN';
this.successCount = 0;
} else {
throw new Error('断路器开启状态');
}
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= 3) {
this.state = 'CLOSED';
this.failureCount = 0;
}
} else {
this.failureCount = Math.max(0, this.failureCount - 1);
}
}
onFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
}
}
}
3. 分布式事务处理:
// Saga模式实现分布式事务
class SagaOrchestrator {
constructor() {
this.steps = [];
this.compensations = [];
this.currentStep = 0;
}
addStep(action, compensation) {
this.steps.push(action);
this.compensations.unshift(compensation); // 补偿操作需要反向执行
}
async execute() {
const sagaId = `saga-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
try {
console.log(`Saga事务 ${sagaId} 开始执行`);
for (let i = 0; i < this.steps.length; i++) {
this.currentStep = i;
console.log(`执行步骤 ${i + 1}/${this.steps.length}`);
await this.steps[i](sagaId);
}
console.log(`Saga事务 ${sagaId} 执行成功`);
return { success: true, sagaId };
} catch (error) {
console.error(`Saga事务 ${sagaId} 执行失败:`, error);
await this.rollback(sagaId);
throw error;
}
}
async rollback(sagaId) {
console.log(`开始回滚Saga事务 ${sagaId}`);
// 从当前步骤开始向前回滚
for (let i = this.currentStep; i >= 0; i--) {
try {
await this.compensations[i](sagaId);
console.log(`回滚步骤 ${i + 1} 成功`);
} catch (error) {
console.error(`回滚步骤 ${i + 1} 失败:`, error);
// 继续尝试回滚其他步骤
}
}
console.log(`Saga事务 ${sagaId} 回滚完成`);
}
}
// 使用示例:订单处理Saga
async function processOrderSaga(orderData) {
const saga = new SagaOrchestrator();
// 步骤1:创建订单
saga.addStep(
async (sagaId) => {
const order = await orderService.createOrder(orderData, sagaId);
return order;
},
async (sagaId) => {
await orderService.cancelOrder(sagaId);
}
);
// 步骤2:扣减库存
saga.addStep(
async (sagaId) => {
await inventoryService.reserveItems(orderData.items, sagaId);
},
async (sagaId) => {
await inventoryService.releaseItems(orderData.items, sagaId);
}
);
// 步骤3:处理支付
saga.addStep(
async (sagaId) => {
await paymentService.processPayment(orderData.payment, sagaId);
},
async (sagaId) => {
await paymentService.refundPayment(sagaId);
}
);
return await saga.execute();
}
4. 配置管理和服务网格:
// 配置中心客户端
class ConfigCenter {
constructor(consulClient) {
this.consul = consulClient;
this.cache = new Map();
this.watchers = new Map();
}
async getConfig(key, defaultValue = null) {
try {
// 先从缓存获取
if (this.cache.has(key)) {
return this.cache.get(key);
}
// 从Consul获取
const result = await this.consul.kv.get(key);
const value = result ? JSON.parse(result.Value) : defaultValue;
// 缓存配置
this.cache.set(key, value);
return value;
} catch (error) {
console.error(`获取配置 ${key} 失败:`, error);
return defaultValue;
}
}
async setConfig(key, value) {
try {
await this.consul.kv.set(key, JSON.stringify(value));
this.cache.set(key, value);
// 通知监听器
const watchers = this.watchers.get(key) || [];
watchers.forEach(callback => {
try {
callback(value, key);
} catch (error) {
console.error('配置变更回调失败:', error);
}
});
} catch (error) {
console.error(`设置配置 ${key} 失败:`, error);
throw error;
}
}
watchConfig(key, callback) {
if (!this.watchers.has(key)) {
this.watchers.set(key, []);
this.startWatching(key);
}
this.watchers.get(key).push(callback);
}
startWatching(key) {
const watch = this.consul.watch({
method: this.consul.kv.get,
options: { key }
});
watch.on('change', (data) => {
const value = data ? JSON.parse(data.Value) : null;
this.cache.set(key, value);
const watchers = this.watchers.get(key) || [];
watchers.forEach(callback => {
try {
callback(value, key);
} catch (error) {
console.error('配置变更回调失败:', error);
}
});
});
watch.on('error', (error) => {
console.error(`监听配置 ${key} 失败:`, error);
});
}
}
5. 容器化和编排:
# Dockerfile
FROM node:18-alpine
WORKDIR /app
# 复制package文件
COPY package*.json ./
# 安装依赖
RUN npm ci --only=production
# 复制源代码
COPY . .
# 创建非root用户
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nextjs -u 1001
# 更改文件所有者
RUN chown -R nextjs:nodejs /app
USER nextjs
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
EXPOSE 3000
CMD ["node", "index.js"]
# docker-compose.yml
version: '3.8'
services:
# 服务注册中心
consul:
image: consul:1.15
ports:
- "8500:8500"
command: >
consul agent -server -bootstrap -ui -client=0.0.0.0
-data-dir=/consul/data
volumes:
- consul_data:/consul/data
networks:
- microservices
# API网关
gateway:
build: ./gateway
ports:
- "80:3000"
environment:
- CONSUL_HOST=consul
- CONSUL_PORT=8500
depends_on:
- consul
networks:
- microservices
deploy:
replicas: 2
# 用户服务
user-service:
build: ./user-service
environment:
- CONSUL_HOST=consul
- CONSUL_PORT=8500
- DB_HOST=user-db
depends_on:
- consul
- user-db
networks:
- microservices
deploy:
replicas: 3
# 用户数据库
user-db:
image: postgres:15
environment:
- POSTGRES_DB=users
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
volumes:
- user_db_data:/var/lib/postgresql/data
networks:
- microservices
# 订单服务
order-service:
build: ./order-service
environment:
- CONSUL_HOST=consul
- CONSUL_PORT=8500
- DB_HOST=order-db
depends_on:
- consul
- order-db
networks:
- microservices
deploy:
replicas: 3
volumes:
consul_data:
user_db_data:
order_db_data:
networks:
microservices:
driver: bridge
6. 监控和可观测性:
// 分布式链路追踪
const opentracing = require('opentracing');
const { initTracer } = require('jaeger-client');
class TracingService {
constructor(serviceName) {
const config = {
serviceName,
sampler: {
type: 'const',
param: 1
},
reporter: {
logSpans: true,
agentHost: process.env.JAEGER_HOST || 'localhost',
agentPort: process.env.JAEGER_PORT || 6832
}
};
this.tracer = initTracer(config);
opentracing.initGlobalTracer(this.tracer);
}
createSpan(operationName, parentSpan = null) {
const span = this.tracer.startSpan(operationName, {
childOf: parentSpan
});
return span;
}
injectHeaders(span, headers = {}) {
this.tracer.inject(span, opentracing.FORMAT_HTTP_HEADERS, headers);
return headers;
}
extractSpan(headers) {
return this.tracer.extract(opentracing.FORMAT_HTTP_HEADERS, headers);
}
}
// 指标收集
const prometheus = require('prom-client');
class MetricsCollector {
constructor() {
// 创建默认指标
prometheus.collectDefaultMetrics();
// HTTP请求计数器
this.httpRequestsTotal = new prometheus.Counter({
name: 'http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['method', 'route', 'status_code']
});
// HTTP请求持续时间
this.httpRequestDuration = new prometheus.Histogram({
name: 'http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['method', 'route', 'status_code'],
buckets: [0.001, 0.01, 0.1, 0.5, 1, 5, 10]
});
// 业务指标
this.businessMetrics = new prometheus.Gauge({
name: 'business_metric',
help: 'Business specific metrics',
labelNames: ['metric_name', 'service_name']
});
}
recordHttpRequest(method, route, statusCode, duration) {
this.httpRequestsTotal.inc({
method,
route,
status_code: statusCode
});
this.httpRequestDuration.observe({
method,
route,
status_code: statusCode
}, duration / 1000);
}
setBusinessMetric(name, value, serviceName) {
this.businessMetrics.set({
metric_name: name,
service_name: serviceName
}, value);
}
getMetrics() {
return prometheus.register.metrics();
}
}
微服务架构最佳实践:
How to implement monitoring and logging systems for Node.js applications?
How to implement monitoring and logging systems for Node.js applications?
考察点:监控体系设计。
答案:
完善的监控和日志系统是保障Node.js应用稳定运行的关键基础设施。需要从应用性能监控(APM)、日志收集分析、指标监控、告警通知等维度构建全面的可观测性体系。
1. 结构化日志系统:
// 高性能日志库 winston 配置
const winston = require('winston');
const { combine, timestamp, errors, json, colorize, printf } = winston.format;
class Logger {
constructor(serviceName) {
this.serviceName = serviceName;
// 自定义格式
const customFormat = printf(({ timestamp, level, message, service, requestId, userId, ...meta }) => {
return JSON.stringify({
timestamp,
level,
service: service || serviceName,
requestId,
userId,
message,
...meta
});
});
const transports = [
// 控制台输出
new winston.transports.Console({
level: process.env.LOG_LEVEL || 'info',
format: process.env.NODE_ENV === 'production'
? combine(timestamp(), errors({ stack: true }), json())
: combine(timestamp(), colorize(), customFormat)
}),
// 应用日志文件
new winston.transports.File({
filename: 'logs/app.log',
level: 'info',
maxsize: 50 * 1024 * 1024, // 50MB
maxFiles: 10,
format: combine(timestamp(), errors({ stack: true }), json())
}),
// 错误日志文件
new winston.transports.File({
filename: 'logs/error.log',
level: 'error',
maxsize: 50 * 1024 * 1024,
maxFiles: 5,
format: combine(timestamp(), errors({ stack: true }), json())
})
];
this.logger = winston.createLogger({
defaultMeta: { service: serviceName },
transports,
exitOnError: false
});
}
info(message, meta = {}) {
this.logger.info(message, meta);
}
error(message, error = null, meta = {}) {
this.logger.error(message, {
error: error ? {
message: error.message,
stack: error.stack,
name: error.name
} : null,
...meta
});
}
// 性能日志
performance(operation, duration, meta = {}) {
this.logger.info(`Performance: ${operation}`, {
operation,
duration,
type: 'performance',
...meta
});
}
// 业务日志
business(event, data = {}) {
this.logger.info(`Business: ${event}`, {
event,
type: 'business',
...data
});
}
}
// 请求日志中间件
function requestLogger(logger) {
return (req, res, next) => {
const startTime = Date.now();
const requestId = req.headers['x-request-id'] ||
`req-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
req.requestId = requestId;
req.logger = {
info: (message, meta = {}) => logger.info(message, { requestId, ...meta }),
error: (message, error, meta = {}) => logger.error(message, error, { requestId, ...meta })
};
// 记录请求开始
logger.info('Request started', {
requestId,
method: req.method,
url: req.url,
userAgent: req.get('User-Agent'),
ip: req.ip
});
res.on('finish', () => {
const duration = Date.now() - startTime;
logger.info('Request completed', {
requestId,
method: req.method,
url: req.url,
statusCode: res.statusCode,
duration
});
});
next();
};
}
2. 性能监控系统:
const prometheus = require('prom-client');
class PerformanceMonitor {
constructor(serviceName) {
this.serviceName = serviceName;
this.register = new prometheus.Register();
// 收集默认指标
prometheus.collectDefaultMetrics({
register: this.register,
prefix: `${serviceName}_`
});
// HTTP请求指标
this.httpRequestsTotal = new prometheus.Counter({
name: `${serviceName}_http_requests_total`,
help: 'Total HTTP requests',
labelNames: ['method', 'route', 'status_code'],
registers: [this.register]
});
this.httpRequestDuration = new prometheus.Histogram({
name: `${serviceName}_http_request_duration_seconds`,
help: 'HTTP request duration',
labelNames: ['method', 'route'],
buckets: [0.001, 0.01, 0.1, 0.5, 1, 2, 5],
registers: [this.register]
});
// 内存监控
this.memoryUsage = new prometheus.Gauge({
name: `${serviceName}_memory_usage_bytes`,
help: 'Memory usage',
labelNames: ['type'],
registers: [this.register]
});
this.startMemoryMonitoring();
}
startMemoryMonitoring() {
setInterval(() => {
const memUsage = process.memoryUsage();
this.memoryUsage.set({ type: 'heap_used' }, memUsage.heapUsed);
this.memoryUsage.set({ type: 'heap_total' }, memUsage.heapTotal);
this.memoryUsage.set({ type: 'rss' }, memUsage.rss);
}, 10000);
}
recordHttpRequest(method, route, statusCode, duration) {
this.httpRequestsTotal.inc({
method: method.toUpperCase(),
route,
status_code: statusCode.toString()
});
this.httpRequestDuration.observe({
method: method.toUpperCase(),
route
}, duration / 1000);
}
getMetrics() {
return this.register.metrics();
}
}
// 健康检查
function createHealthCheck(dependencies = []) {
return async (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
dependencies: {}
};
for (const dep of dependencies) {
try {
await dep.check();
health.dependencies[dep.name] = { status: 'healthy' };
} catch (error) {
health.dependencies[dep.name] = {
status: 'unhealthy',
error: error.message
};
health.status = 'degraded';
}
}
const statusCode = health.status === 'healthy' ? 200 : 503;
res.status(statusCode).json(health);
};
}
3. 告警系统:
class AlertManager {
constructor(logger) {
this.logger = logger;
this.rules = [];
this.webhooks = [];
}
addRule(name, condition, severity = 'warning') {
this.rules.push({
name,
condition,
severity,
isActive: false
});
}
addWebhook(url, headers = {}) {
this.webhooks.push({ url, headers });
}
checkAlerts(metrics) {
for (const rule of this.rules) {
const shouldTrigger = rule.condition(metrics);
if (shouldTrigger && !rule.isActive) {
this.triggerAlert(rule, metrics);
} else if (!shouldTrigger && rule.isActive) {
this.resolveAlert(rule);
}
}
}
async triggerAlert(rule, metrics) {
const alert = {
name: rule.name,
severity: rule.severity,
triggeredAt: new Date().toISOString(),
metrics
};
rule.isActive = true;
this.logger.error('Alert triggered', null, { alert: rule.name });
// 发送Webhook通知
for (const webhook of this.webhooks) {
try {
await fetch(webhook.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...webhook.headers },
body: JSON.stringify(alert)
});
} catch (error) {
this.logger.error('Webhook failed', error);
}
}
}
resolveAlert(rule) {
rule.isActive = false;
this.logger.info('Alert resolved', { alert: rule.name });
}
}
// 使用示例
const logger = new Logger('user-service');
const monitor = new PerformanceMonitor('user-service');
const alertManager = new AlertManager(logger);
// 添加告警规则
alertManager.addRule(
'high-memory-usage',
(metrics) => metrics.memory.heapUsed / metrics.memory.heapTotal > 0.9,
'critical'
);
alertManager.addRule(
'high-response-time',
(metrics) => metrics.avgResponseTime > 2000,
'warning'
);
const app = express();
app.use(requestLogger(logger));
app.get('/health', createHealthCheck([
{
name: 'database',
check: async () => {
// 检查数据库连接
await db.query('SELECT 1');
}
}
]));
app.get('/metrics', async (req, res) => {
res.set('Content-Type', prometheus.register.contentType);
res.end(await monitor.getMetrics());
});
监控系统最佳实践:
How to implement disaster recovery and fault tolerance for Node.js applications?
How to implement disaster recovery and fault tolerance for Node.js applications?
考察点:容灾机制设计。
答案:
容灾和故障恢复是确保Node.js应用高可用性的核心要素。需要从数据备份、服务冗余、故障检测、自动恢复等多个层面建立完整的容灾体系。
1. 数据备份策略:
// 数据库备份管理器
const fs = require('fs').promises;
const path = require('path');
const { spawn } = require('child_process');
const cron = require('node-cron');
class BackupManager {
constructor(config) {
this.config = {
databases: config.databases || [],
backupPath: config.backupPath || './backups',
retention: config.retention || 30, // 保留30天
schedule: config.schedule || '0 2 * * *', // 每天凌晨2点
remoteStorage: config.remoteStorage || null
};
this.logger = config.logger;
this.setupScheduledBackups();
}
setupScheduledBackups() {
cron.schedule(this.config.schedule, async () => {
try {
await this.performFullBackup();
} catch (error) {
this.logger.error('Scheduled backup failed', error);
}
});
}
async performFullBackup() {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const backupDir = path.join(this.config.backupPath, timestamp);
await fs.mkdir(backupDir, { recursive: true });
const backupTasks = [];
for (const db of this.config.databases) {
switch (db.type) {
case 'mysql':
backupTasks.push(this.backupMySQL(db, backupDir));
break;
case 'mongodb':
backupTasks.push(this.backupMongoDB(db, backupDir));
break;
case 'redis':
backupTasks.push(this.backupRedis(db, backupDir));
break;
}
}
// 并行执行备份
const results = await Promise.allSettled(backupTasks);
// 检查备份结果
const failures = results.filter(result => result.status === 'rejected');
if (failures.length > 0) {
throw new Error(`${failures.length} backup(s) failed`);
}
// 压缩备份文件
await this.compressBackup(backupDir);
// 上传到远程存储
if (this.config.remoteStorage) {
await this.uploadToRemote(backupDir);
}
// 清理旧备份
await this.cleanupOldBackups();
this.logger.info('Full backup completed', { backupDir, timestamp });
return { backupDir, timestamp };
}
async backupMySQL(db, backupDir) {
const filename = `${db.database}_${Date.now()}.sql`;
const filepath = path.join(backupDir, filename);
return new Promise((resolve, reject) => {
const mysqldump = spawn('mysqldump', [
`-h${db.host}`,
`-P${db.port || 3306}`,
`-u${db.username}`,
`-p${db.password}`,
'--single-transaction',
'--routines',
'--triggers',
db.database
]);
const writeStream = require('fs').createWriteStream(filepath);
mysqldump.stdout.pipe(writeStream);
mysqldump.on('error', reject);
writeStream.on('error', reject);
writeStream.on('close', () => resolve(filepath));
});
}
async backupMongoDB(db, backupDir) {
const filename = `${db.database}_${Date.now()}`;
const filepath = path.join(backupDir, filename);
return new Promise((resolve, reject) => {
const mongodump = spawn('mongodump', [
'--host', `${db.host}:${db.port || 27017}`,
'--db', db.database,
'--out', filepath
]);
if (db.username) {
mongodump.args.push('--username', db.username, '--password', db.password);
}
mongodump.on('error', reject);
mongodump.on('close', (code) => {
if (code === 0) {
resolve(filepath);
} else {
reject(new Error(`mongodump exited with code ${code}`));
}
});
});
}
async backupRedis(db, backupDir) {
const Redis = require('ioredis');
const redis = new Redis(db);
try {
// 触发RDB快照
await redis.bgsave();
// 等待备份完成
let saving = true;
while (saving) {
const info = await redis.info('persistence');
saving = info.includes('rdb_bgsave_in_progress:1');
if (saving) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
// 复制RDB文件
const rdbPath = await redis.config('get', 'dir');
const rdbFile = await redis.config('get', 'dbfilename');
const sourcePath = path.join(rdbPath[1], rdbFile[1]);
const targetPath = path.join(backupDir, `redis_${Date.now()}.rdb`);
await fs.copyFile(sourcePath, targetPath);
await redis.disconnect();
return targetPath;
} catch (error) {
await redis.disconnect();
throw error;
}
}
async compressBackup(backupDir) {
const tar = require('tar');
const archivePath = `${backupDir}.tar.gz`;
await tar.create(
{
gzip: true,
file: archivePath,
cwd: path.dirname(backupDir)
},
[path.basename(backupDir)]
);
// 删除原始目录
await fs.rmdir(backupDir, { recursive: true });
return archivePath;
}
async uploadToRemote(backupPath) {
// 示例:上传到S3
if (this.config.remoteStorage.type === 's3') {
const AWS = require('aws-sdk');
const s3 = new AWS.S3(this.config.remoteStorage.config);
const fileStream = require('fs').createReadStream(`${backupPath}.tar.gz`);
const uploadParams = {
Bucket: this.config.remoteStorage.bucket,
Key: `backups/${path.basename(backupPath)}.tar.gz`,
Body: fileStream
};
await s3.upload(uploadParams).promise();
}
}
async cleanupOldBackups() {
const files = await fs.readdir(this.config.backupPath);
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - this.config.retention);
for (const file of files) {
const filePath = path.join(this.config.backupPath, file);
const stat = await fs.stat(filePath);
if (stat.mtime < cutoffDate) {
await fs.unlink(filePath);
this.logger.info('Old backup deleted', { file });
}
}
}
// 恢复备份
async restoreBackup(backupFile, targetDatabase) {
const tempDir = path.join(this.config.backupPath, 'temp_restore');
try {
// 解压备份文件
await tar.extract({
file: backupFile,
cwd: tempDir
});
// 根据数据库类型执行恢复
const dbConfig = this.config.databases.find(db => db.database === targetDatabase);
if (!dbConfig) {
throw new Error(`Database config not found: ${targetDatabase}`);
}
switch (dbConfig.type) {
case 'mysql':
await this.restoreMySQL(dbConfig, tempDir);
break;
case 'mongodb':
await this.restoreMongoDB(dbConfig, tempDir);
break;
case 'redis':
await this.restoreRedis(dbConfig, tempDir);
break;
}
this.logger.info('Backup restored successfully', {
backupFile,
targetDatabase
});
} finally {
// 清理临时文件
await fs.rmdir(tempDir, { recursive: true });
}
}
}
2. 故障检测和自动恢复:
// 健康检查和故障转移
class HealthChecker {
constructor(services, config = {}) {
this.services = services;
this.config = {
checkInterval: config.checkInterval || 30000, // 30秒
failureThreshold: config.failureThreshold || 3,
recoveryThreshold: config.recoveryThreshold || 2,
timeout: config.timeout || 5000
};
this.serviceStates = new Map();
this.initialize();
}
initialize() {
for (const service of this.services) {
this.serviceStates.set(service.name, {
isHealthy: true,
consecutiveFailures: 0,
consecutiveSuccesses: 0,
lastCheck: null,
lastError: null
});
}
this.startHealthChecks();
}
startHealthChecks() {
setInterval(async () => {
await this.checkAllServices();
}, this.config.checkInterval);
}
async checkAllServices() {
const checks = this.services.map(service => this.checkService(service));
await Promise.allSettled(checks);
}
async checkService(service) {
const state = this.serviceStates.get(service.name);
const startTime = Date.now();
try {
await Promise.race([
service.healthCheck(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Health check timeout')), this.config.timeout)
)
]);
// 健康检查成功
state.consecutiveFailures = 0;
state.consecutiveSuccesses++;
state.lastCheck = new Date();
// 如果之前不健康,现在连续成功达到阈值,标记为健康
if (!state.isHealthy && state.consecutiveSuccesses >= this.config.recoveryThreshold) {
state.isHealthy = true;
await this.onServiceRecovered(service, state);
}
} catch (error) {
// 健康检查失败
state.consecutiveSuccesses = 0;
state.consecutiveFailures++;
state.lastError = error;
state.lastCheck = new Date();
// 如果之前健康,现在连续失败达到阈值,标记为不健康
if (state.isHealthy && state.consecutiveFailures >= this.config.failureThreshold) {
state.isHealthy = false;
await this.onServiceFailed(service, state);
}
}
}
async onServiceFailed(service, state) {
console.error(`Service ${service.name} is now unhealthy`, {
consecutiveFailures: state.consecutiveFailures,
lastError: state.lastError.message
});
// 触发故障转移
if (service.failover) {
try {
await service.failover();
console.info(`Failover initiated for service ${service.name}`);
} catch (error) {
console.error(`Failover failed for service ${service.name}`, error);
}
}
// 发送告警
if (service.onFailure) {
await service.onFailure(state);
}
}
async onServiceRecovered(service, state) {
console.info(`Service ${service.name} has recovered`, {
consecutiveSuccesses: state.consecutiveSuccesses
});
// 执行恢复操作
if (service.onRecovery) {
await service.onRecovery(state);
}
}
getServiceStatus() {
const status = {};
for (const [name, state] of this.serviceStates) {
status[name] = {
isHealthy: state.isHealthy,
lastCheck: state.lastCheck,
consecutiveFailures: state.consecutiveFailures,
lastError: state.lastError ? state.lastError.message : null
};
}
return status;
}
}
// 断路器模式
class CircuitBreaker {
constructor(operation, options = {}) {
this.operation = operation;
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.lastFailureTime = null;
this.successCount = 0;
this.nextAttempt = Date.now();
}
async execute(...args) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
} else {
this.state = 'HALF_OPEN';
this.successCount = 0;
}
}
try {
const result = await this.operation(...args);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= 3) {
this.reset();
}
} else {
this.failureCount = 0;
}
}
onFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.trip();
}
}
reset() {
this.state = 'CLOSED';
this.failureCount = 0;
this.successCount = 0;
}
trip() {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.resetTimeout;
}
getState() {
return {
state: this.state,
failureCount: this.failureCount,
nextAttempt: this.nextAttempt
};
}
}
3. 集群管理和负载均衡:
// PM2集群管理
const pm2 = require('pm2');
class ClusterManager {
constructor(appConfig) {
this.appConfig = {
name: appConfig.name || 'app',
script: appConfig.script || 'app.js',
instances: appConfig.instances || 'max',
maxMemoryRestart: appConfig.maxMemoryRestart || '1G',
...appConfig
};
}
async start() {
return new Promise((resolve, reject) => {
pm2.connect((err) => {
if (err) return reject(err);
pm2.start(this.appConfig, (err, apps) => {
if (err) return reject(err);
resolve(apps);
});
});
});
}
async restart() {
return new Promise((resolve, reject) => {
pm2.restart(this.appConfig.name, (err) => {
if (err) return reject(err);
resolve();
});
});
}
async gracefulReload() {
return new Promise((resolve, reject) => {
pm2.reload(this.appConfig.name, (err) => {
if (err) return reject(err);
resolve();
});
});
}
async scale(instances) {
return new Promise((resolve, reject) => {
pm2.scale(this.appConfig.name, instances, (err) => {
if (err) return reject(err);
resolve();
});
});
}
async getProcesses() {
return new Promise((resolve, reject) => {
pm2.describe(this.appConfig.name, (err, processDescription) => {
if (err) return reject(err);
resolve(processDescription);
});
});
}
// 自动扩缩容
async autoScale() {
const processes = await this.getProcesses();
const cpuUsage = await this.getCPUUsage();
const memoryUsage = await this.getMemoryUsage();
if (cpuUsage > 80 || memoryUsage > 80) {
// 扩容
const currentInstances = processes.length;
const maxInstances = require('os').cpus().length * 2;
if (currentInstances < maxInstances) {
await this.scale(currentInstances + 1);
console.log(`Scaled up to ${currentInstances + 1} instances`);
}
} else if (cpuUsage < 20 && memoryUsage < 20) {
// 缩容
const currentInstances = processes.length;
const minInstances = 2;
if (currentInstances > minInstances) {
await this.scale(currentInstances - 1);
console.log(`Scaled down to ${currentInstances - 1} instances`);
}
}
}
}
// 使用示例
const services = [
{
name: 'database',
healthCheck: async () => {
// 数据库连接检查
await db.query('SELECT 1');
},
failover: async () => {
// 切换到备库
await switchToBackupDatabase();
},
onFailure: async (state) => {
// 发送告警
await sendAlert('Database failure detected');
}
},
{
name: 'redis',
healthCheck: async () => {
await redis.ping();
},
failover: async () => {
// 切换Redis实例
await switchToBackupRedis();
}
}
];
const healthChecker = new HealthChecker(services);
const clusterManager = new ClusterManager({
name: 'my-app',
script: 'app.js',
instances: 4
});
// 启动容灾系统
async function startDisasterRecovery() {
await clusterManager.start();
// 定期自动扩缩容
setInterval(async () => {
await clusterManager.autoScale();
}, 60000); // 每分钟检查一次
}
容灾系统最佳实践:
How to implement distributed locks and distributed transactions in Node.js?
How to implement distributed locks and distributed transactions in Node.js?
考察点:分布式系统设计。
答案:
分布式锁和分布式事务是构建高并发分布式系统的核心技术。在Node.js中可以通过Redis、数据库、ZooKeeper等方式实现分布式协调。
1. 基于Redis的分布式锁:
const Redis = require('ioredis');
class DistributedLock {
constructor(redis, options = {}) {
this.redis = redis;
this.options = {
keyPrefix: options.keyPrefix || 'lock:',
defaultTTL: options.defaultTTL || 30000, // 30秒
retryInterval: options.retryInterval || 100,
maxRetries: options.maxRetries || 100
};
// Lua脚本确保原子性
this.unlockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
}
async acquire(key, ttl = this.options.defaultTTL, retries = this.options.maxRetries) {
const lockKey = this.options.keyPrefix + key;
const lockValue = this.generateLockValue();
for (let i = 0; i <= retries; i++) {
try {
// 尝试获取锁
const result = await this.redis.set(
lockKey,
lockValue,
'PX', // 毫秒
ttl,
'NX' // 只在键不存在时设置
);
if (result === 'OK') {
return new Lock(this, lockKey, lockValue, ttl);
}
// 如果是最后一次重试,直接失败
if (i === retries) {
throw new Error(`Failed to acquire lock: ${key}`);
}
// 等待一段时间后重试
await this.sleep(this.options.retryInterval);
} catch (error) {
if (i === retries) {
throw error;
}
await this.sleep(this.options.retryInterval);
}
}
}
async release(lockKey, lockValue) {
try {
const result = await this.redis.eval(
this.unlockScript,
1,
lockKey,
lockValue
);
return result === 1;
} catch (error) {
console.error('Failed to release lock:', error);
return false;
}
}
generateLockValue() {
return `${process.pid}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
class Lock {
constructor(lockManager, key, value, ttl) {
this.lockManager = lockManager;
this.key = key;
this.value = value;
this.ttl = ttl;
this.isReleased = false;
// 自动续期
this.renewalTimer = setInterval(async () => {
await this.renew();
}, Math.floor(ttl * 0.3)); // 在30%的时间后续期
}
async renew() {
if (this.isReleased) return;
try {
const renewScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`;
const result = await this.lockManager.redis.eval(
renewScript,
1,
this.key,
this.value,
this.ttl
);
if (result === 0) {
console.warn('Lock renewal failed - lock may have been released');
this.isReleased = true;
clearInterval(this.renewalTimer);
}
} catch (error) {
console.error('Lock renewal error:', error);
}
}
async release() {
if (this.isReleased) return false;
clearInterval(this.renewalTimer);
this.isReleased = true;
return await this.lockManager.release(this.key, this.value);
}
}
// 使用示例
const redis = new Redis();
const lockManager = new DistributedLock(redis);
async function criticalSection(userId) {
const lock = await lockManager.acquire(`user:${userId}`, 10000); // 10秒锁
try {
// 执行需要同步的操作
console.log(`Processing user ${userId}`);
// 模拟业务操作
await processUserData(userId);
} finally {
await lock.release();
}
}
2. 分布式事务 - Saga模式:
// Saga事务管理器
class SagaTransaction {
constructor(logger) {
this.steps = [];
this.compensations = [];
this.currentStep = 0;
this.logger = logger;
this.status = 'PENDING';
this.sagaId = `saga_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
addStep(name, action, compensation) {
this.steps.push({
name,
action,
executed: false,
result: null
});
this.compensations.unshift({
name: `compensate_${name}`,
action: compensation,
executed: false
});
}
async execute() {
this.status = 'EXECUTING';
this.logger.info('Saga transaction started', { sagaId: this.sagaId });
try {
// 依次执行所有步骤
for (let i = 0; i < this.steps.length; i++) {
this.currentStep = i;
const step = this.steps[i];
this.logger.info(`Executing step: ${step.name}`, {
sagaId: this.sagaId,
step: i + 1,
total: this.steps.length
});
try {
step.result = await step.action(this.sagaId);
step.executed = true;
this.logger.info(`Step completed: ${step.name}`, {
sagaId: this.sagaId,
result: step.result
});
} catch (error) {
this.logger.error(`Step failed: ${step.name}`, error, {
sagaId: this.sagaId
});
// 执行补偿操作
await this.compensate();
throw error;
}
}
this.status = 'COMPLETED';
this.logger.info('Saga transaction completed', { sagaId: this.sagaId });
return {
success: true,
sagaId: this.sagaId,
results: this.steps.map(step => step.result)
};
} catch (error) {
this.status = 'FAILED';
this.logger.error('Saga transaction failed', error, { sagaId: this.sagaId });
throw {
success: false,
sagaId: this.sagaId,
error: error.message,
failedStep: this.currentStep
};
}
}
async compensate() {
this.status = 'COMPENSATING';
this.logger.info('Starting saga compensation', { sagaId: this.sagaId });
// 从当前执行的步骤开始,逆序补偿
const compensationsToRun = this.compensations.slice(
this.compensations.length - this.currentStep - 1
);
for (let i = 0; i < compensationsToRun.length; i++) {
const compensation = compensationsToRun[i];
try {
this.logger.info(`Executing compensation: ${compensation.name}`, {
sagaId: this.sagaId
});
await compensation.action(this.sagaId);
compensation.executed = true;
this.logger.info(`Compensation completed: ${compensation.name}`, {
sagaId: this.sagaId
});
} catch (error) {
this.logger.error(`Compensation failed: ${compensation.name}`, error, {
sagaId: this.sagaId
});
// 补偿失败需要人工干预
await this.notifyManualIntervention(compensation, error);
}
}
this.status = 'COMPENSATED';
this.logger.info('Saga compensation completed', { sagaId: this.sagaId });
}
async notifyManualIntervention(compensation, error) {
// 发送告警通知运维人员
this.logger.error('Manual intervention required', error, {
sagaId: this.sagaId,
compensation: compensation.name,
requiresManualFix: true
});
}
getStatus() {
return {
sagaId: this.sagaId,
status: this.status,
currentStep: this.currentStep,
totalSteps: this.steps.length,
executedSteps: this.steps.filter(s => s.executed).length,
executedCompensations: this.compensations.filter(c => c.executed).length
};
}
}
// 订单处理Saga示例
async function createOrderSaga(orderData, logger) {
const saga = new SagaTransaction(logger);
// 步骤1:创建订单
saga.addStep(
'create_order',
async (sagaId) => {
const order = await orderService.createOrder({
...orderData,
sagaId,
status: 'PENDING'
});
return { orderId: order.id };
},
async (sagaId) => {
await orderService.cancelOrder(sagaId);
}
);
// 步骤2:扣减库存
saga.addStep(
'reserve_inventory',
async (sagaId) => {
const reservations = [];
for (const item of orderData.items) {
const reservation = await inventoryService.reserveItem(
item.productId,
item.quantity,
sagaId
);
reservations.push(reservation);
}
return { reservations };
},
async (sagaId) => {
await inventoryService.releaseReservation(sagaId);
}
);
// 步骤3:处理支付
saga.addStep(
'process_payment',
async (sagaId) => {
const payment = await paymentService.charge({
amount: orderData.totalAmount,
currency: orderData.currency,
paymentMethod: orderData.paymentMethod,
sagaId
});
return { paymentId: payment.id };
},
async (sagaId) => {
await paymentService.refund(sagaId);
}
);
// 步骤4:发送确认
saga.addStep(
'send_confirmation',
async (sagaId) => {
await notificationService.sendOrderConfirmation({
userId: orderData.userId,
sagaId
});
return { confirmationSent: true };
},
async (sagaId) => {
// 发送取消通知
await notificationService.sendOrderCancellation({
userId: orderData.userId,
sagaId
});
}
);
return await saga.execute();
}
3. 基于数据库的分布式锁:
// MySQL实现的分布式锁
class DatabaseLock {
constructor(mysql, tableName = 'distributed_locks') {
this.mysql = mysql;
this.tableName = tableName;
this.initializeTable();
}
async initializeTable() {
const createTableSQL = `
CREATE TABLE IF NOT EXISTS ${this.tableName} (
lock_key VARCHAR(255) PRIMARY KEY,
lock_value VARCHAR(255) NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_expires_at (expires_at)
) ENGINE=InnoDB
`;
await this.mysql.execute(createTableSQL);
// 定期清理过期锁
setInterval(async () => {
await this.cleanupExpiredLocks();
}, 60000); // 每分钟清理一次
}
async acquire(key, ttl = 30000, maxRetries = 50) {
const lockValue = this.generateLockValue();
const expiresAt = new Date(Date.now() + ttl);
for (let i = 0; i <= maxRetries; i++) {
try {
// 尝试插入锁记录
await this.mysql.execute(
`INSERT INTO ${this.tableName} (lock_key, lock_value, expires_at) VALUES (?, ?, ?)`,
[key, lockValue, expiresAt]
);
return new DatabaseLockHandle(this, key, lockValue, ttl);
} catch (error) {
if (error.code === 'ER_DUP_ENTRY') {
// 键已存在,检查是否过期
await this.cleanupExpiredLock(key);
if (i < maxRetries) {
await this.sleep(100); // 等待100ms后重试
continue;
}
}
throw new Error(`Failed to acquire lock: ${key}, error: ${error.message}`);
}
}
throw new Error(`Failed to acquire lock after ${maxRetries} retries: ${key}`);
}
async release(key, lockValue) {
try {
const result = await this.mysql.execute(
`DELETE FROM ${this.tableName} WHERE lock_key = ? AND lock_value = ?`,
[key, lockValue]
);
return result.affectedRows > 0;
} catch (error) {
console.error('Failed to release lock:', error);
return false;
}
}
async cleanupExpiredLocks() {
try {
const result = await this.mysql.execute(
`DELETE FROM ${this.tableName} WHERE expires_at < NOW()`
);
if (result.affectedRows > 0) {
console.log(`Cleaned up ${result.affectedRows} expired locks`);
}
} catch (error) {
console.error('Failed to cleanup expired locks:', error);
}
}
async cleanupExpiredLock(key) {
try {
await this.mysql.execute(
`DELETE FROM ${this.tableName} WHERE lock_key = ? AND expires_at < NOW()`,
[key]
);
} catch (error) {
// 忽略错误,可能是其他进程已经清理了
}
}
generateLockValue() {
return `${process.pid}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
class DatabaseLockHandle {
constructor(lockManager, key, value, ttl) {
this.lockManager = lockManager;
this.key = key;
this.value = value;
this.ttl = ttl;
this.isReleased = false;
// 启动续期定时器
this.renewTimer = setInterval(async () => {
await this.renew();
}, Math.floor(ttl * 0.3));
}
async renew() {
if (this.isReleased) return;
try {
const newExpiresAt = new Date(Date.now() + this.ttl);
const result = await this.lockManager.mysql.execute(
`UPDATE ${this.lockManager.tableName}
SET expires_at = ?
WHERE lock_key = ? AND lock_value = ?`,
[newExpiresAt, this.key, this.value]
);
if (result.affectedRows === 0) {
console.warn('Lock renewal failed - lock may have been released');
this.isReleased = true;
clearInterval(this.renewTimer);
}
} catch (error) {
console.error('Lock renewal error:', error);
}
}
async release() {
if (this.isReleased) return false;
clearInterval(this.renewTimer);
this.isReleased = true;
return await this.lockManager.release(this.key, this.value);
}
}
分布式锁和事务最佳实践:
How to optimize startup time and memory usage of Node.js applications?
How to optimize startup time and memory usage of Node.js applications?
考察点:深度性能优化。
答案:
Node.js应用的启动时间和内存优化需要从模块加载、代码结构、运行时配置等多个维度进行系统性优化,提升应用性能和用户体验。
1. 启动时间优化:
// 延迟加载模块
class LazyLoader {
constructor() {
this.modules = new Map();
this.loadPromises = new Map();
}
// 延迟加载模块
lazy(modulePath) {
return new Proxy({}, {
get: (target, prop) => {
if (!this.modules.has(modulePath)) {
// 同步加载模块
const module = require(modulePath);
this.modules.set(modulePath, module);
}
const module = this.modules.get(modulePath);
return typeof module[prop] === 'function'
? module[prop].bind(module)
: module[prop];
}
});
}
// 异步预加载
async preload(modulePath) {
if (this.loadPromises.has(modulePath)) {
return this.loadPromises.get(modulePath);
}
const promise = import(modulePath).then(module => {
this.modules.set(modulePath, module);
return module;
});
this.loadPromises.set(modulePath, promise);
return promise;
}
// 批量预加载
async preloadBatch(modulePaths) {
const promises = modulePaths.map(path => this.preload(path));
return Promise.all(promises);
}
}
// 应用启动优化器
class StartupOptimizer {
constructor(app) {
this.app = app;
this.loader = new LazyLoader();
this.startTime = Date.now();
this.phases = [];
}
// 分阶段初始化
async initialize() {
console.log('Application starting...');
// 阶段1:核心模块
await this.phase('Core Modules', async () => {
await this.initializeCore();
});
// 阶段2:数据库连接
await this.phase('Database Connection', async () => {
await this.connectDatabase();
});
// 阶段3:缓存初始化
await this.phase('Cache Initialization', async () => {
await this.initializeCache();
});
// 阶段4:启动HTTP服务器
await this.phase('HTTP Server', async () => {
await this.startServer();
});
// 阶段5:后台任务(异步)
this.initializeBackgroundTasks();
const totalTime = Date.now() - this.startTime;
console.log(`Application started in ${totalTime}ms`);
return this.phases;
}
async phase(name, fn) {
const startTime = Date.now();
try {
await fn();
const duration = Date.now() - startTime;
this.phases.push({
name,
duration,
success: true
});
console.log(`✓ ${name}: ${duration}ms`);
} catch (error) {
const duration = Date.now() - startTime;
this.phases.push({
name,
duration,
success: false,
error: error.message
});
console.error(`✗ ${name}: ${duration}ms - ${error.message}`);
throw error;
}
}
async initializeCore() {
// 预加载核心模块
await this.loader.preloadBatch([
'express',
'cors',
'helmet',
'./middlewares/auth'
]);
// 设置基础中间件
const express = await this.loader.preload('express');
const cors = await this.loader.preload('cors');
const helmet = await this.loader.preload('helmet');
this.app.use(helmet.default());
this.app.use(cors.default());
this.app.use(express.default.json({ limit: '10mb' }));
}
async connectDatabase() {
// 并行连接多个数据源
const connections = await Promise.all([
this.connectMySQL(),
this.connectRedis(),
this.connectMongoDB()
]);
return connections;
}
async connectMySQL() {
const mysql = this.loader.lazy('mysql2/promise');
const connection = await mysql.createConnection({
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
acquireTimeout: 5000,
timeout: 5000
});
return connection;
}
// 后台任务异步初始化
initializeBackgroundTasks() {
// 不阻塞启动的后台任务
setImmediate(async () => {
try {
await this.phase('Background Tasks', async () => {
await this.loader.preloadBatch([
'./services/scheduler',
'./services/cleanup',
'./services/metrics'
]);
// 启动定时任务
const scheduler = this.loader.lazy('./services/scheduler');
await scheduler.start();
// 启动清理任务
const cleanup = this.loader.lazy('./services/cleanup');
await cleanup.start();
});
} catch (error) {
console.error('Background tasks initialization failed:', error);
}
});
}
}
// 代码分割和路由懒加载
class RouteLoader {
constructor(app) {
this.app = app;
this.routes = new Map();
}
// 懒加载路由
lazyRoute(path, routeModulePath) {
this.app.use(path, async (req, res, next) => {
try {
if (!this.routes.has(routeModulePath)) {
const routeModule = await import(routeModulePath);
this.routes.set(routeModulePath, routeModule.default || routeModule);
}
const router = this.routes.get(routeModulePath);
return router(req, res, next);
} catch (error) {
next(error);
}
});
}
// 批量注册懒加载路由
registerLazyRoutes(routeMap) {
for (const [path, modulePath] of Object.entries(routeMap)) {
this.lazyRoute(path, modulePath);
}
}
}
// 使用示例
const express = require('express');
const app = express();
const optimizer = new StartupOptimizer(app);
const routeLoader = new RouteLoader(app);
// 注册懒加载路由
routeLoader.registerLazyRoutes({
'/api/users': './routes/users',
'/api/orders': './routes/orders',
'/api/products': './routes/products'
});
// 启动应用
async function startApp() {
try {
await optimizer.initialize();
const server = app.listen(3000, () => {
console.log('Server listening on port 3000');
});
return server;
} catch (error) {
console.error('Failed to start application:', error);
process.exit(1);
}
}
2. 内存优化:
// 内存监控和优化
class MemoryOptimizer {
constructor(options = {}) {
this.options = {
maxHeapUsage: options.maxHeapUsage || 0.8, // 80%
gcThreshold: options.gcThreshold || 0.7, // 70%
monitorInterval: options.monitorInterval || 30000, // 30秒
...options
};
this.metrics = {
heapUsed: [],
heapTotal: [],
external: [],
rss: []
};
this.startMonitoring();
}
startMonitoring() {
setInterval(() => {
this.collectMetrics();
this.checkMemoryUsage();
}, this.options.monitorInterval);
}
collectMetrics() {
const usage = process.memoryUsage();
const timestamp = Date.now();
// 记录内存指标
this.recordMetric('heapUsed', usage.heapUsed, timestamp);
this.recordMetric('heapTotal', usage.heapTotal, timestamp);
this.recordMetric('external', usage.external, timestamp);
this.recordMetric('rss', usage.rss, timestamp);
// 保留最近1小时的数据
const cutoff = timestamp - (60 * 60 * 1000);
for (const key in this.metrics) {
this.metrics[key] = this.metrics[key].filter(m => m.timestamp > cutoff);
}
}
recordMetric(type, value, timestamp) {
this.metrics[type].push({ value, timestamp });
}
checkMemoryUsage() {
const usage = process.memoryUsage();
const heapUsageRatio = usage.heapUsed / usage.heapTotal;
if (heapUsageRatio > this.options.maxHeapUsage) {
console.warn('High heap usage detected', {
heapUsed: this.formatBytes(usage.heapUsed),
heapTotal: this.formatBytes(usage.heapTotal),
ratio: (heapUsageRatio * 100).toFixed(2) + '%'
});
// 触发内存清理
this.performMemoryCleanup();
}
if (heapUsageRatio > this.options.gcThreshold) {
// 手动触发垃圾回收
if (global.gc) {
global.gc();
console.log('Manual GC triggered');
}
}
}
performMemoryCleanup() {
// 清理应用级缓存
this.clearCaches();
// 清理事件监听器
this.cleanupEventListeners();
// 清理定时器
this.cleanupTimers();
}
clearCaches() {
// 清理require缓存中的非核心模块
const coreModules = new Set([
'express', 'http', 'fs', 'path', 'crypto'
]);
for (const id in require.cache) {
const module = require.cache[id];
if (module.filename && !coreModules.has(path.basename(module.filename, '.js'))) {
// 检查模块是否可以安全删除
if (this.canSafelyDelete(module)) {
delete require.cache[id];
}
}
}
}
canSafelyDelete(module) {
// 检查模块是否被其他模块引用
const children = module.children || [];
return children.length === 0;
}
formatBytes(bytes) {
const sizes = ['B', 'KB', 'MB', 'GB'];
if (bytes === 0) return '0 B';
const i = Math.floor(Math.log(bytes) / Math.log(1024));
return (bytes / Math.pow(1024, i)).toFixed(2) + ' ' + sizes[i];
}
getMemoryReport() {
const usage = process.memoryUsage();
return {
current: {
heapUsed: this.formatBytes(usage.heapUsed),
heapTotal: this.formatBytes(usage.heapTotal),
external: this.formatBytes(usage.external),
rss: this.formatBytes(usage.rss)
},
ratios: {
heapUsage: ((usage.heapUsed / usage.heapTotal) * 100).toFixed(2) + '%',
heapOfTotal: ((usage.heapTotal / usage.rss) * 100).toFixed(2) + '%'
},
trends: this.calculateTrends()
};
}
calculateTrends() {
const trends = {};
for (const [type, metrics] of Object.entries(this.metrics)) {
if (metrics.length < 2) continue;
const latest = metrics[metrics.length - 1].value;
const previous = metrics[metrics.length - 2].value;
const change = latest - previous;
trends[type] = {
change: this.formatBytes(Math.abs(change)),
direction: change > 0 ? 'increase' : 'decrease',
percentage: ((change / previous) * 100).toFixed(2) + '%'
};
}
return trends;
}
}
// 对象池模式减少GC压力
class ObjectPool {
constructor(factory, reset, initialSize = 10) {
this.factory = factory;
this.reset = reset;
this.pool = [];
// 预分配对象
for (let i = 0; i < initialSize; i++) {
this.pool.push(this.factory());
}
}
acquire() {
if (this.pool.length > 0) {
return this.pool.pop();
}
return this.factory();
}
release(obj) {
if (obj) {
this.reset(obj);
this.pool.push(obj);
}
}
}
// 使用示例
const bufferPool = new ObjectPool(
() => Buffer.alloc(1024), // 创建1KB缓冲区
(buf) => buf.fill(0), // 重置缓冲区
50 // 初始化50个缓冲区
);
// 在请求处理中使用对象池
app.use((req, res, next) => {
const buffer = bufferPool.acquire();
// 使用buffer处理数据
res.on('finish', () => {
bufferPool.release(buffer);
});
next();
});
3. V8引擎优化配置:
// 启动脚本优化
// package.json
{
"scripts": {
"start": "node --max-old-space-size=4096 --max-semi-space-size=256 --optimize-for-size app.js",
"start:prod": "node --max-old-space-size=8192 --max-semi-space-size=512 --expose-gc app.js"
}
}
// V8优化配置
class V8Optimizer {
static configureV8() {
// 设置V8标志
const v8Flags = [
'--max-old-space-size=4096', // 设置老生代内存上限
'--max-semi-space-size=256', // 设置新生代内存上限
'--optimize-for-size', // 优化内存占用
'--expose-gc' // 暴露gc函数
];
// 在生产环境中应用标志
if (process.env.NODE_ENV === 'production') {
v8Flags.push(
'--use-idle-notification', // 使用空闲通知
'--max-inlined-source-size=600' // 限制内联源码大小
);
}
return v8Flags;
}
static monitorV8Performance() {
if (typeof global.gc === 'function') {
// 定期触发垃圾回收
setInterval(() => {
const before = process.memoryUsage();
global.gc();
const after = process.memoryUsage();
console.log('GC completed:', {
heapFreed: (before.heapUsed - after.heapUsed) / 1024 / 1024,
heapUsedAfter: after.heapUsed / 1024 / 1024
});
}, 60000); // 每分钟
}
}
}
性能优化最佳实践:
What are the security protection strategies for Node.js applications?
What are the security protection strategies for Node.js applications?
考察点:安全防护体系。
答案:
Node.js应用的安全防护需要从输入验证、身份认证、授权控制、数据加密、攻击防护等多个层面建立全面的安全体系。
1. 输入验证和SQL注入防护:
const joi = require('joi');
const xss = require('xss');
const validator = require('validator');
// 输入验证中间件
class InputValidator {
static createValidator(schema) {
return (req, res, next) => {
const { error, value } = schema.validate(req.body, {
abortEarly: false,
stripUnknown: true
});
if (error) {
return res.status(400).json({
error: 'Validation failed',
details: error.details.map(detail => ({
field: detail.path.join('.'),
message: detail.message
}))
});
}
req.validatedData = value;
next();
};
}
// XSS防护
static sanitizeInput(req, res, next) {
const sanitizeObject = (obj) => {
for (const key in obj) {
if (typeof obj[key] === 'string') {
obj[key] = xss(obj[key]);
} else if (typeof obj[key] === 'object' && obj[key] !== null) {
sanitizeObject(obj[key]);
}
}
};
if (req.body) sanitizeObject(req.body);
if (req.query) sanitizeObject(req.query);
if (req.params) sanitizeObject(req.params);
next();
}
}
// 用户注册验证示例
const userRegistrationSchema = joi.object({
email: joi.string().email().required(),
password: joi.string()
.min(8)
.pattern(new RegExp('^(?=.*[a-z])(?=.*[A-Z])(?=.*[0-9])(?=.*[!@#\$%\^&\*])'))
.required()
.messages({
'string.pattern.base': 'Password must contain at least one lowercase, uppercase, number and special character'
}),
username: joi.string().alphanum().min(3).max(30).required(),
age: joi.number().integer().min(18).max(120).optional()
});
// 参数化查询防止SQL注入
class SecureDatabase {
constructor(connection) {
this.db = connection;
}
async findUser(email) {
// 使用参数化查询
const [rows] = await this.db.execute(
'SELECT id, email, password_hash FROM users WHERE email = ?',
[email]
);
return rows[0];
}
async createUser(userData) {
const { email, passwordHash, username } = userData;
// 参数化插入
const [result] = await this.db.execute(
'INSERT INTO users (email, password_hash, username, created_at) VALUES (?, ?, ?, ?)',
[email, passwordHash, username, new Date()]
);
return result.insertId;
}
// 动态查询构建器(安全版本)
buildWhereClause(conditions) {
const allowedFields = new Set(['id', 'email', 'username', 'status']);
const whereClauses = [];
const values = [];
for (const [field, value] of Object.entries(conditions)) {
if (allowedFields.has(field)) {
whereClauses.push(`${field} = ?`);
values.push(value);
}
}
return {
whereClause: whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : '',
values
};
}
}
2. 身份认证和JWT安全:
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const crypto = require('crypto');
const speakeasy = require('speakeasy');
class AuthenticationService {
constructor(config) {
this.jwtSecret = config.jwtSecret;
this.jwtRefreshSecret = config.jwtRefreshSecret;
this.saltRounds = config.saltRounds || 12;
this.tokenExpiry = config.tokenExpiry || '15m';
this.refreshTokenExpiry = config.refreshTokenExpiry || '7d';
}
// 密码哈希
async hashPassword(password) {
return bcrypt.hash(password, this.saltRounds);
}
async verifyPassword(password, hash) {
return bcrypt.compare(password, hash);
}
// JWT令牌生成和验证
generateTokens(user) {
const payload = {
userId: user.id,
email: user.email,
role: user.role,
permissions: user.permissions
};
const accessToken = jwt.sign(payload, this.jwtSecret, {
expiresIn: this.tokenExpiry,
issuer: 'myapp',
audience: 'myapp-users'
});
const refreshToken = jwt.sign(
{ userId: user.id, tokenId: crypto.randomUUID() },
this.jwtRefreshSecret,
{ expiresIn: this.refreshTokenExpiry }
);
return { accessToken, refreshToken };
}
verifyToken(token, isRefreshToken = false) {
const secret = isRefreshToken ? this.jwtRefreshSecret : this.jwtSecret;
try {
return jwt.verify(token, secret, {
issuer: 'myapp',
audience: 'myapp-users'
});
} catch (error) {
if (error.name === 'TokenExpiredError') {
throw new Error('Token expired');
} else if (error.name === 'JsonWebTokenError') {
throw new Error('Invalid token');
}
throw error;
}
}
// 双因子认证
generateTOTPSecret(userIdentifier) {
return speakeasy.generateSecret({
name: `MyApp (${userIdentifier})`,
issuer: 'MyApp',
length: 32
});
}
verifyTOTP(token, secret) {
return speakeasy.totp.verify({
secret,
token,
window: 2, // 允许前后2个时间窗口
time: Math.floor(Date.now() / 1000)
});
}
// 登录尝试限制
async checkLoginAttempts(email, ip) {
const key = `login_attempts:${email}:${ip}`;
const attempts = await redis.get(key) || 0;
if (attempts >= 5) {
throw new Error('Too many login attempts. Please try again later.');
}
return parseInt(attempts);
}
async recordFailedLogin(email, ip) {
const key = `login_attempts:${email}:${ip}`;
const attempts = await redis.incr(key);
if (attempts === 1) {
await redis.expire(key, 900); // 15分钟后重置
}
return attempts;
}
async clearLoginAttempts(email, ip) {
const key = `login_attempts:${email}:${ip}`;
await redis.del(key);
}
}
// 认证中间件
function authenticateToken(req, res, next) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'Access token required' });
}
try {
const decoded = authService.verifyToken(token);
req.user = decoded;
next();
} catch (error) {
return res.status(403).json({ error: error.message });
}
}
// 权限控制中间件
function requirePermission(permission) {
return (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: 'Authentication required' });
}
if (!req.user.permissions.includes(permission)) {
return res.status(403).json({ error: 'Insufficient permissions' });
}
next();
};
}
3. HTTPS和数据加密:
const https = require('https');
const fs = require('fs');
const crypto = require('crypto');
class EncryptionService {
constructor(config) {
this.algorithm = 'aes-256-gcm';
this.keyLength = 32;
this.ivLength = 16;
this.tagLength = 16;
this.masterKey = Buffer.from(config.masterKey, 'hex');
}
// 对称加密
encrypt(data) {
const iv = crypto.randomBytes(this.ivLength);
const cipher = crypto.createCipher(this.algorithm, this.masterKey);
cipher.setAAD(Buffer.from('authenticated-data'));
let encrypted = cipher.update(data, 'utf8', 'hex');
encrypted += cipher.final('hex');
const tag = cipher.getAuthTag();
return {
encrypted,
iv: iv.toString('hex'),
tag: tag.toString('hex')
};
}
decrypt(encryptedData) {
const { encrypted, iv, tag } = encryptedData;
const decipher = crypto.createDecipher(this.algorithm, this.masterKey);
decipher.setAAD(Buffer.from('authenticated-data'));
decipher.setAuthTag(Buffer.from(tag, 'hex'));
let decrypted = decipher.update(encrypted, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
// 哈希函数
hash(data, salt = null) {
if (!salt) {
salt = crypto.randomBytes(16);
}
const hash = crypto.pbkdf2Sync(data, salt, 100000, 64, 'sha512');
return {
hash: hash.toString('hex'),
salt: salt.toString('hex')
};
}
verifyHash(data, hash, salt) {
const computedHash = crypto.pbkdf2Sync(data, Buffer.from(salt, 'hex'), 100000, 64, 'sha512');
return computedHash.toString('hex') === hash;
}
}
// HTTPS服务器配置
function createSecureServer(app) {
const options = {
key: fs.readFileSync('ssl/private-key.pem'),
cert: fs.readFileSync('ssl/certificate.pem'),
// 安全配置
secureProtocol: 'TLSv1_2_method',
ciphers: [
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384'
].join(':'),
honorCipherOrder: true
};
const server = https.createServer(options, app);
// 强制HTTPS重定向中间件
app.use((req, res, next) => {
if (req.header('x-forwarded-proto') !== 'https') {
return res.redirect(301, `https://${req.header('host')}${req.url}`);
}
next();
});
return server;
}
4. 攻击防护:
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const cors = require('cors');
// 安全中间件配置
function setupSecurityMiddleware(app) {
// Helmet安全头
app.use(helmet({
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
styleSrc: ["'self'", "'unsafe-inline'"],
scriptSrc: ["'self'"],
imgSrc: ["'self'", "data:", "https:"],
fontSrc: ["'self'"],
connectSrc: ["'self'"],
frameSrc: ["'none'"],
objectSrc: ["'none'"]
}
},
hsts: {
maxAge: 31536000,
includeSubDomains: true,
preload: true
}
}));
// CORS配置
app.use(cors({
origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
credentials: true,
methods: ['GET', 'POST', 'PUT', 'DELETE'],
allowedHeaders: ['Content-Type', 'Authorization']
}));
// 请求大小限制
app.use(express.json({ limit: '10mb' }));
app.use(express.urlencoded({ limit: '10mb', extended: true }));
// 速率限制
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 100, // 每个IP最多100个请求
message: {
error: 'Too many requests from this IP'
},
standardHeaders: true,
legacyHeaders: false
});
app.use('/api/', limiter);
// 登录端点的严格限制
const loginLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 5,
skipSuccessfulRequests: true
});
app.use('/api/auth/login', loginLimiter);
}
// CSRF防护
class CSRFProtection {
constructor() {
this.tokens = new Map();
}
generateToken(sessionId) {
const token = crypto.randomBytes(32).toString('hex');
this.tokens.set(sessionId, token);
// 设置过期时间
setTimeout(() => {
this.tokens.delete(sessionId);
}, 3600000); // 1小时
return token;
}
verifyToken(sessionId, token) {
const validToken = this.tokens.get(sessionId);
return validToken && validToken === token;
}
middleware() {
return (req, res, next) => {
if (['POST', 'PUT', 'DELETE'].includes(req.method)) {
const token = req.headers['x-csrf-token'] || req.body._csrf;
const sessionId = req.sessionID;
if (!this.verifyToken(sessionId, token)) {
return res.status(403).json({ error: 'Invalid CSRF token' });
}
}
next();
};
}
}
// SQL注入检测
class SQLInjectionDetector {
constructor() {
this.patterns = [
/(\s|^)(union|select|insert|update|delete|drop|create|alter|exec|execute)(\s|$)/i,
/'[^']*'/,
/--/,
/\/\*.*\*\//,
/;[\s]*$/
];
}
detectSQLInjection(input) {
if (typeof input !== 'string') return false;
return this.patterns.some(pattern => pattern.test(input));
}
middleware() {
return (req, res, next) => {
const checkObject = (obj, path = '') => {
for (const [key, value] of Object.entries(obj)) {
const currentPath = path ? `${path}.${key}` : key;
if (typeof value === 'string' && this.detectSQLInjection(value)) {
console.warn('SQL injection attempt detected', {
path: currentPath,
value,
ip: req.ip,
userAgent: req.get('User-Agent')
});
return res.status(400).json({
error: 'Malicious input detected'
});
}
if (typeof value === 'object' && value !== null) {
checkObject(value, currentPath);
}
}
};
if (req.body) checkObject(req.body);
if (req.query) checkObject(req.query);
if (req.params) checkObject(req.params);
next();
};
}
}
5. 安全审计和日志:
class SecurityAuditLogger {
constructor(logger) {
this.logger = logger;
this.suspiciousActivities = new Map();
}
logSecurityEvent(event, data, req) {
const securityLog = {
event,
timestamp: new Date().toISOString(),
ip: req.ip,
userAgent: req.get('User-Agent'),
userId: req.user ? req.user.userId : null,
requestId: req.requestId,
...data
};
this.logger.security(event, securityLog);
// 检测可疑活动
this.detectSuspiciousActivity(req.ip, event);
}
detectSuspiciousActivity(ip, event) {
const key = `${ip}:${event}`;
const current = this.suspiciousActivities.get(key) || { count: 0, firstSeen: Date.now() };
current.count++;
current.lastSeen = Date.now();
this.suspiciousActivities.set(key, current);
// 如果在短时间内多次触发安全事件,标记为可疑
const timeWindow = 5 * 60 * 1000; // 5分钟
if (current.count >= 5 && (current.lastSeen - current.firstSeen) < timeWindow) {
this.logger.security('suspicious_activity_detected', {
ip,
event,
count: current.count,
timeWindow: timeWindow / 1000
});
// 可以在这里触发自动封IP等措施
this.blockIP(ip);
}
}
blockIP(ip) {
// 实现IP封锁逻辑
console.warn(`Blocking suspicious IP: ${ip}`);
}
// 安全事件记录中间件
createAuditMiddleware() {
return (req, res, next) => {
// 记录所有认证相关的请求
if (req.path.includes('/auth/')) {
this.logSecurityEvent('auth_request', {
path: req.path,
method: req.method
}, req);
}
// 监听响应状态
res.on('finish', () => {
if (res.statusCode === 401) {
this.logSecurityEvent('unauthorized_access', {
path: req.path,
method: req.method
}, req);
} else if (res.statusCode === 403) {
this.logSecurityEvent('forbidden_access', {
path: req.path,
method: req.method
}, req);
}
});
next();
};
}
}
// 使用示例
const app = express();
const encryptionService = new EncryptionService({ masterKey: process.env.MASTER_KEY });
const csrfProtection = new CSRFProtection();
const sqlInjectionDetector = new SQLInjectionDetector();
const auditLogger = new SecurityAuditLogger(logger);
// 设置安全中间件
setupSecurityMiddleware(app);
app.use(InputValidator.sanitizeInput);
app.use(sqlInjectionDetector.middleware());
app.use(csrfProtection.middleware());
app.use(auditLogger.createAuditMiddleware());
安全防护最佳实践:
How to implement hot reloading and zero-downtime deployment for Node.js applications?
How to implement hot reloading and zero-downtime deployment for Node.js applications?
考察点:部署策略设计。
答案:
热更新和零停机部署是保证服务连续性的关键技术,需要通过负载均衡、进程管理、健康检查等机制实现平滑的应用更新。
1. PM2集群热重载:
// ecosystem.config.js - PM2配置文件
module.exports = {
apps: [{
name: 'my-app',
script: './app.js',
instances: 'max', // 使用所有CPU核心
exec_mode: 'cluster',
// 零停机重启配置
wait_ready: true,
listen_timeout: 10000,
kill_timeout: 5000,
// 健康检查
health_check_url: 'http://localhost:3000/health',
health_check_grace_period: 3000,
// 环境变量
env: {
NODE_ENV: 'production',
PORT: 3000
},
// 日志配置
log_file: './logs/combined.log',
out_file: './logs/out.log',
error_file: './logs/error.log',
// 监控配置
monitoring: false,
pmx: true
}]
};
// 应用代码中的优雅关闭处理
class GracefulShutdown {
constructor(server, options = {}) {
this.server = server;
this.options = {
shutdownTimeout: options.shutdownTimeout || 30000,
healthCheckDelay: options.healthCheckDelay || 1000,
...options
};
this.isShuttingDown = false;
this.activeConnections = new Set();
this.setupSignalHandlers();
this.setupConnectionTracking();
}
setupSignalHandlers() {
// 监听PM2的优雅关闭信号
process.on('SIGINT', () => this.gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => this.gracefulShutdown('SIGTERM'));
// PM2特有的reload信号
process.on('SIGUSR2', () => this.gracefulShutdown('SIGUSR2'));
}
setupConnectionTracking() {
this.server.on('connection', (socket) => {
this.activeConnections.add(socket);
socket.on('close', () => {
this.activeConnections.delete(socket);
});
});
}
async gracefulShutdown(signal) {
if (this.isShuttingDown) {
console.log('Already shutting down, force exit');
process.exit(1);
}
console.log(`Received ${signal}, starting graceful shutdown...`);
this.isShuttingDown = true;
// 停止接受新连接
this.server.close(async () => {
console.log('HTTP server closed');
try {
// 等待活跃连接完成
await this.waitForActiveConnections();
// 执行清理操作
await this.cleanup();
console.log('Graceful shutdown completed');
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
});
// 强制退出超时
setTimeout(() => {
console.log('Shutdown timeout, force exit');
process.exit(1);
}, this.options.shutdownTimeout);
}
async waitForActiveConnections() {
return new Promise((resolve) => {
const checkConnections = () => {
if (this.activeConnections.size === 0) {
resolve();
} else {
console.log(`Waiting for ${this.activeConnections.size} active connections...`);
setTimeout(checkConnections, 1000);
}
};
checkConnections();
});
}
async cleanup() {
// 关闭数据库连接
if (global.db) {
await global.db.close();
console.log('Database connections closed');
}
// 关闭Redis连接
if (global.redis) {
await global.redis.quit();
console.log('Redis connections closed');
}
// 其他清理操作
console.log('Cleanup completed');
}
// 健康检查端点
healthCheck(req, res) {
if (this.isShuttingDown) {
return res.status(503).json({
status: 'shutting down',
ready: false
});
}
res.json({
status: 'healthy',
ready: true,
uptime: process.uptime(),
memory: process.memoryUsage()
});
}
}
// 使用示例
const express = require('express');
const app = express();
const server = app.listen(3000, () => {
console.log('Server started on port 3000');
// 通知PM2服务已准备就绪
if (process.send) {
process.send('ready');
}
});
const gracefulShutdown = new GracefulShutdown(server);
app.get('/health', (req, res) => gracefulShutdown.healthCheck(req, res));
2. Docker容器滚动更新:
# 多阶段构建优化
FROM node:18-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
FROM node:18-alpine AS runtime
# 安全用户
RUN addgroup -g 1001 -S nodejs && \
adduser -S nextjs -u 1001
WORKDIR /app
COPY --from=builder /app/node_modules ./node_modules
COPY . .
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
USER nextjs
EXPOSE 3000
# 优雅关闭信号处理
STOPSIGNAL SIGTERM
CMD ["node", "app.js"]
# docker-compose.yml - 蓝绿部署
version: '3.8'
services:
app-blue:
build: .
ports:
- "3001:3000"
environment:
- NODE_ENV=production
- COLOR=blue
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
app-green:
build: .
ports:
- "3002:3000"
environment:
- NODE_ENV=production
- COLOR=green
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- app-blue
- app-green
3. Nginx负载均衡配置:
# nginx.conf
upstream app_servers {
# 权重分配,支持动态调整
server app-blue:3000 weight=100 max_fails=3 fail_timeout=30s;
server app-green:3000 weight=0 max_fails=3 fail_timeout=30s backup;
# 健康检查
keepalive 32;
}
server {
listen 80;
server_name localhost;
# 健康检查端点
location /nginx-health {
access_log off;
return 200 "healthy\n";
}
# 应用代理
location / {
proxy_pass http://app_servers;
# 连接设置
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
# 头部设置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 连接重用
proxy_http_version 1.1;
proxy_set_header Connection "";
# 错误处理
proxy_next_upstream error timeout http_500 http_502 http_503;
proxy_next_upstream_timeout 10s;
proxy_next_upstream_tries 3;
}
# 应用健康检查
location /health {
proxy_pass http://app_servers/health;
access_log off;
}
}
4. 自动化部署脚本:
// deploy.js - 零停机部署脚本
const { exec } = require('child_process');
const util = require('util');
const execAsync = util.promisify(exec);
class ZeroDowntimeDeployment {
constructor(config) {
this.config = {
bluePort: 3001,
greenPort: 3002,
healthCheckUrl: 'http://localhost',
healthCheckTimeout: 30000,
drainTimeout: 30000,
...config
};
this.currentColor = null;
}
async deploy(newVersion) {
console.log(`Starting zero-downtime deployment for version ${newVersion}`);
try {
// 1. 确定当前活跃的服务
const currentColor = await this.getCurrentActiveColor();
const targetColor = currentColor === 'blue' ? 'green' : 'blue';
console.log(`Current active: ${currentColor}, Target: ${targetColor}`);
// 2. 部署到目标环境
await this.deployToTarget(targetColor, newVersion);
// 3. 健康检查
await this.waitForHealthy(targetColor);
// 4. 切换流量
await this.switchTraffic(targetColor);
// 5. 验证新版本
await this.verifyDeployment(targetColor);
// 6. 停止旧版本(可选的清理阶段)
await this.cleanupOldVersion(currentColor);
console.log(`Deployment completed successfully! Active: ${targetColor}`);
return { success: true, activeColor: targetColor, version: newVersion };
} catch (error) {
console.error('Deployment failed:', error);
// 回滚到之前的版本
await this.rollback();
throw error;
}
}
async getCurrentActiveColor() {
try {
const { stdout } = await execAsync('docker-compose ps --services --filter "status=running"');
const runningServices = stdout.trim().split('\n');
if (runningServices.includes('app-blue')) return 'blue';
if (runningServices.includes('app-green')) return 'green';
return 'blue'; // 默认启动蓝色
} catch (error) {
console.warn('Could not determine current color, defaulting to blue');
return 'blue';
}
}
async deployToTarget(color, version) {
console.log(`Deploying version ${version} to ${color} environment...`);
const commands = [
`docker build -t myapp:${version} .`,
`docker tag myapp:${version} myapp:${color}-latest`,
`docker-compose up -d app-${color}`
];
for (const command of commands) {
console.log(`Executing: ${command}`);
const { stdout, stderr } = await execAsync(command);
if (stderr && !stderr.includes('WARNING')) {
throw new Error(`Command failed: ${command}\n${stderr}`);
}
}
}
async waitForHealthy(color) {
console.log(`Waiting for ${color} environment to become healthy...`);
const port = color === 'blue' ? this.config.bluePort : this.config.greenPort;
const healthUrl = `${this.config.healthCheckUrl}:${port}/health`;
const startTime = Date.now();
while (Date.now() - startTime < this.config.healthCheckTimeout) {
try {
const response = await fetch(healthUrl);
const data = await response.json();
if (response.ok && data.ready) {
console.log(`${color} environment is healthy`);
return true;
}
console.log(`Health check failed, retrying in 5s...`);
await this.sleep(5000);
} catch (error) {
console.log(`Health check error: ${error.message}, retrying...`);
await this.sleep(5000);
}
}
throw new Error(`Health check timeout for ${color} environment`);
}
async switchTraffic(targetColor) {
console.log(`Switching traffic to ${targetColor}...`);
// 更新Nginx配置
const nginxConfig = this.generateNginxConfig(targetColor);
await this.updateNginxConfig(nginxConfig);
// 重新加载Nginx配置
await execAsync('docker-compose exec nginx nginx -s reload');
// 等待流量切换完成
await this.sleep(2000);
console.log(`Traffic switched to ${targetColor}`);
}
generateNginxConfig(activeColor) {
const inactiveColor = activeColor === 'blue' ? 'green' : 'blue';
return `
upstream app_servers {
server app-${activeColor}:3000 weight=100 max_fails=3 fail_timeout=30s;
server app-${inactiveColor}:3000 weight=0 max_fails=3 fail_timeout=30s backup;
keepalive 32;
}
`.trim();
}
async updateNginxConfig(config) {
// 写入新的Nginx配置
const fs = require('fs').promises;
await fs.writeFile('./nginx-upstream.conf', config);
// 复制到容器中
await execAsync('docker cp ./nginx-upstream.conf nginx:/etc/nginx/conf.d/upstream.conf');
}
async verifyDeployment(color) {
console.log(`Verifying deployment on ${color}...`);
// 执行一些验证测试
const tests = [
this.testApiEndpoint('/health'),
this.testApiEndpoint('/api/version'),
this.testDatabaseConnection()
];
const results = await Promise.allSettled(tests);
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
throw new Error(`Verification failed: ${failures.length} tests failed`);
}
console.log('Deployment verification passed');
}
async testApiEndpoint(endpoint) {
const response = await fetch(`${this.config.healthCheckUrl}${endpoint}`);
if (!response.ok) {
throw new Error(`API test failed for ${endpoint}: ${response.status}`);
}
return response.json();
}
async rollback() {
console.log('Initiating rollback...');
// 切换回之前的版本
const previousColor = this.currentColor === 'blue' ? 'green' : 'blue';
await this.switchTraffic(previousColor);
console.log('Rollback completed');
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 使用示例
const deployer = new ZeroDowntimeDeployment({
healthCheckUrl: 'http://localhost',
healthCheckTimeout: 60000
});
// 部署新版本
if (require.main === module) {
const version = process.argv[2] || 'latest';
deployer.deploy(version)
.then(result => {
console.log('Deployment successful:', result);
process.exit(0);
})
.catch(error => {
console.error('Deployment failed:', error);
process.exit(1);
});
}
零停机部署最佳实践:
How to handle large-scale data processing and stream computing in Node.js?
How to handle large-scale data processing and stream computing in Node.js?
考察点:大数据处理能力。
答案:
Node.js在大规模数据处理和流式计算方面具有天然优势,通过Stream API、Worker Threads、集群等技术可以高效处理海量数据。
1. 流式数据处理:
const { Transform, Readable, Writable, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
// 高性能数据转换流
class DataTransformStream extends Transform {
constructor(options = {}) {
super({
objectMode: true,
highWaterMark: options.bufferSize || 1000
});
this.batchSize = options.batchSize || 100;
this.batch = [];
this.processedCount = 0;
this.startTime = Date.now();
}
_transform(chunk, encoding, callback) {
try {
// 数据验证和清洗
const cleanedData = this.cleanData(chunk);
if (cleanedData) {
this.batch.push(cleanedData);
// 批量处理
if (this.batch.length >= this.batchSize) {
this.processBatch();
}
}
callback();
} catch (error) {
callback(error);
}
}
_flush(callback) {
// 处理剩余数据
if (this.batch.length > 0) {
this.processBatch();
}
this.logStats();
callback();
}
cleanData(data) {
// 数据清洗逻辑
if (!data || typeof data !== 'object') return null;
return {
id: data.id,
timestamp: new Date(data.timestamp),
value: parseFloat(data.value) || 0,
category: (data.category || '').toLowerCase().trim(),
metadata: data.metadata || {}
};
}
processBatch() {
const processedBatch = this.batch.map(item => {
// 数据转换和聚合
return {
...item,
processedAt: Date.now(),
normalized_value: this.normalizeValue(item.value)
};
});
// 输出批量数据
for (const item of processedBatch) {
this.push(item);
}
this.processedCount += this.batch.length;
this.batch = [];
}
normalizeValue(value) {
// 数值标准化
return Math.log(Math.abs(value) + 1);
}
logStats() {
const duration = Date.now() - this.startTime;
const rate = (this.processedCount / duration * 1000).toFixed(2);
console.log(`Processed ${this.processedCount} records in ${duration}ms (${rate} records/sec)`);
}
}
// 大文件流式读取器
class BigFileReader extends Readable {
constructor(filePath, options = {}) {
super({ objectMode: true });
this.filePath = filePath;
this.chunkSize = options.chunkSize || 64 * 1024; // 64KB chunks
this.currentLine = '';
this.lineNumber = 0;
this.fileStream = require('fs').createReadStream(filePath, {
highWaterMark: this.chunkSize
});
this.setupFileStream();
}
setupFileStream() {
this.fileStream.on('data', (chunk) => {
this.processChunk(chunk.toString());
});
this.fileStream.on('end', () => {
if (this.currentLine) {
this.emitLine(this.currentLine);
}
this.push(null); // 结束流
});
this.fileStream.on('error', (error) => {
this.emit('error', error);
});
}
processChunk(data) {
const lines = (this.currentLine + data).split('\n');
this.currentLine = lines.pop(); // 保留不完整的最后一行
for (const line of lines) {
this.emitLine(line);
}
}
emitLine(line) {
if (line.trim()) {
try {
const data = JSON.parse(line);
this.lineNumber++;
this.push({ ...data, lineNumber: this.lineNumber });
} catch (error) {
console.warn(`Invalid JSON at line ${this.lineNumber + 1}: ${line}`);
}
}
}
_read() {
// Readable接口实现,实际读取由fileStream处理
}
}
// 数据库批量写入器
class BatchDatabaseWriter extends Writable {
constructor(db, tableName, options = {}) {
super({ objectMode: true });
this.db = db;
this.tableName = tableName;
this.batchSize = options.batchSize || 1000;
this.batch = [];
this.totalWritten = 0;
}
_write(chunk, encoding, callback) {
this.batch.push(chunk);
if (this.batch.length >= this.batchSize) {
this.flushBatch()
.then(() => callback())
.catch(callback);
} else {
callback();
}
}
_final(callback) {
if (this.batch.length > 0) {
this.flushBatch()
.then(() => callback())
.catch(callback);
} else {
callback();
}
}
async flushBatch() {
if (this.batch.length === 0) return;
const startTime = Date.now();
try {
// 批量插入
const placeholders = this.batch.map(() => '(?, ?, ?, ?)').join(', ');
const values = this.batch.flatMap(item => [
item.id, item.timestamp, item.value, JSON.stringify(item.metadata)
]);
const query = `
INSERT INTO ${this.tableName} (id, timestamp, value, metadata)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
value = VALUES(value),
metadata = VALUES(metadata)
`;
await this.db.execute(query, values);
this.totalWritten += this.batch.length;
const duration = Date.now() - startTime;
console.log(`Batch written: ${this.batch.length} records in ${duration}ms`);
this.batch = [];
} catch (error) {
console.error('Batch write failed:', error);
throw error;
}
}
}
// 流式处理管道
async function processLargeDataFile(inputFile, outputDb, tableName) {
try {
console.log(`Starting data processing: ${inputFile}`);
const reader = new BigFileReader(inputFile);
const transformer = new DataTransformStream({
batchSize: 500,
bufferSize: 2000
});
const writer = new BatchDatabaseWriter(outputDb, tableName, {
batchSize: 1000
});
// 流式处理管道
await pipelineAsync(reader, transformer, writer);
console.log('Data processing completed successfully');
} catch (error) {
console.error('Data processing failed:', error);
throw error;
}
}
2. 多进程并行处理:
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const cluster = require('cluster');
const os = require('os');
// Worker线程数据处理器
class DataProcessor {
constructor(options = {}) {
this.numWorkers = options.numWorkers || os.cpus().length;
this.workers = [];
this.taskQueue = [];
this.results = new Map();
this.activeJobs = 0;
}
async processDataset(dataset, processFunction) {
return new Promise((resolve, reject) => {
const chunks = this.chunkArray(dataset, Math.ceil(dataset.length / this.numWorkers));
const jobPromises = chunks.map((chunk, index) =>
this.processChunk(chunk, processFunction, index)
);
Promise.all(jobPromises)
.then(results => {
// 合并结果
const mergedResult = results.flat();
resolve(mergedResult);
})
.catch(reject);
});
}
async processChunk(chunk, processFunction, jobId) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: {
chunk,
processFunction: processFunction.toString(),
jobId
}
});
worker.on('message', (result) => {
if (result.error) {
reject(new Error(result.error));
} else {
resolve(result.data);
}
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
this.workers.push(worker);
});
}
chunkArray(array, chunkSize) {
const chunks = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
async cleanup() {
// 终止所有worker
const terminationPromises = this.workers.map(worker =>
worker.terminate()
);
await Promise.all(terminationPromises);
this.workers = [];
}
}
// Worker线程代码
if (!isMainThread) {
const { chunk, processFunction, jobId } = workerData;
try {
// 重建处理函数
const fn = new Function('return ' + processFunction)();
// 处理数据块
const processedData = chunk.map(fn);
// 发送结果
parentPort.postMessage({
jobId,
data: processedData
});
} catch (error) {
parentPort.postMessage({
jobId,
error: error.message
});
}
}
// 集群化的数据处理服务
class ClusterDataProcessor {
constructor() {
this.workers = new Map();
this.pendingJobs = new Map();
}
start(numWorkers = os.cpus().length) {
if (cluster.isMaster) {
console.log(`Master ${process.pid} starting ${numWorkers} workers`);
for (let i = 0; i < numWorkers; i++) {
this.forkWorker();
}
cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} died, restarting...`);
this.workers.delete(worker.id);
this.forkWorker();
});
} else {
this.startWorker();
}
}
forkWorker() {
const worker = cluster.fork();
this.workers.set(worker.id, {
worker,
busy: false,
lastTask: null
});
worker.on('message', (msg) => {
this.handleWorkerMessage(worker.id, msg);
});
}
startWorker() {
console.log(`Worker ${process.pid} started`);
process.on('message', async (msg) => {
if (msg.type === 'PROCESS_DATA') {
try {
const result = await this.processDataChunk(msg.data);
process.send({
type: 'RESULT',
jobId: msg.jobId,
workerId: process.pid,
result
});
} catch (error) {
process.send({
type: 'ERROR',
jobId: msg.jobId,
workerId: process.pid,
error: error.message
});
}
}
});
}
async processDataChunk(data) {
// 模拟复杂的数据处理
return data.map(item => ({
...item,
processed: true,
processedAt: Date.now(),
workerId: process.pid,
// 示例:计算移动平均
movingAverage: this.calculateMovingAverage(item.values || [])
}));
}
calculateMovingAverage(values, window = 5) {
if (values.length < window) return null;
const sum = values.slice(-window).reduce((a, b) => a + b, 0);
return sum / window;
}
handleWorkerMessage(workerId, message) {
const workerInfo = this.workers.get(workerId);
if (!workerInfo) return;
if (message.type === 'RESULT') {
const jobPromise = this.pendingJobs.get(message.jobId);
if (jobPromise) {
jobPromise.resolve(message.result);
this.pendingJobs.delete(message.jobId);
}
workerInfo.busy = false;
} else if (message.type === 'ERROR') {
const jobPromise = this.pendingJobs.get(message.jobId);
if (jobPromise) {
jobPromise.reject(new Error(message.error));
this.pendingJobs.delete(message.jobId);
}
workerInfo.busy = false;
}
}
async distributeWork(dataset) {
const chunks = this.chunkArray(dataset, Math.ceil(dataset.length / this.workers.size));
const promises = chunks.map((chunk, index) => this.assignWork(chunk, index));
const results = await Promise.all(promises);
return results.flat();
}
assignWork(chunk, jobId) {
return new Promise((resolve, reject) => {
// 找到空闲的worker
const availableWorker = Array.from(this.workers.values())
.find(w => !w.busy);
if (!availableWorker) {
reject(new Error('No available workers'));
return;
}
availableWorker.busy = true;
this.pendingJobs.set(jobId, { resolve, reject });
availableWorker.worker.send({
type: 'PROCESS_DATA',
jobId,
data: chunk
});
});
}
}
3. 实时流式计算:
// 实时数据流处理器
class RealTimeStreamProcessor {
constructor(options = {}) {
this.windowSize = options.windowSize || 60000; // 1分钟窗口
this.slideInterval = options.slideInterval || 10000; // 10秒滑动
this.windows = new Map();
this.aggregators = new Map();
this.startWindowCleaner();
}
// 添加聚合器
addAggregator(name, aggregateFunction) {
this.aggregators.set(name, aggregateFunction);
}
// 处理数据点
processDataPoint(dataPoint) {
const timestamp = dataPoint.timestamp || Date.now();
const windowKey = this.getWindowKey(timestamp);
// 获取或创建窗口
if (!this.windows.has(windowKey)) {
this.windows.set(windowKey, {
startTime: windowKey,
endTime: windowKey + this.windowSize,
data: [],
aggregated: false
});
}
const window = this.windows.get(windowKey);
window.data.push(dataPoint);
// 检查是否需要触发计算
this.checkForComputation(windowKey, timestamp);
}
getWindowKey(timestamp) {
return Math.floor(timestamp / this.slideInterval) * this.slideInterval;
}
checkForComputation(windowKey, currentTime) {
const window = this.windows.get(windowKey);
if (!window.aggregated && currentTime >= window.endTime) {
this.computeAggregations(windowKey);
}
}
computeAggregations(windowKey) {
const window = this.windows.get(windowKey);
if (!window || window.aggregated) return;
const results = {};
// 执行所有聚合函数
for (const [name, aggregateFunction] of this.aggregators) {
try {
results[name] = aggregateFunction(window.data);
} catch (error) {
console.error(`Aggregation ${name} failed:`, error);
results[name] = null;
}
}
window.aggregated = true;
window.results = results;
// 发布结果
this.publishResults(windowKey, results);
}
publishResults(windowKey, results) {
const event = {
windowKey,
startTime: windowKey,
endTime: windowKey + this.windowSize,
results,
computedAt: Date.now()
};
console.log('Window results:', event);
// 可以发送到消息队列、WebSocket等
this.emit('windowComplete', event);
}
startWindowCleaner() {
setInterval(() => {
const cutoff = Date.now() - (this.windowSize * 2); // 保留2个窗口的数据
for (const [windowKey, window] of this.windows) {
if (window.endTime < cutoff) {
this.windows.delete(windowKey);
}
}
}, this.windowSize);
}
// 获取实时统计
getRealTimeStats() {
const stats = {
activeWindows: this.windows.size,
totalDataPoints: 0,
aggregatedWindows: 0
};
for (const window of this.windows.values()) {
stats.totalDataPoints += window.data.length;
if (window.aggregated) {
stats.aggregatedWindows++;
}
}
return stats;
}
}
// 使用示例
const processor = new RealTimeStreamProcessor({
windowSize: 60000, // 1分钟窗口
slideInterval: 10000 // 10秒滑动
});
// 添加聚合函数
processor.addAggregator('average', (data) => {
const sum = data.reduce((acc, point) => acc + (point.value || 0), 0);
return sum / data.length;
});
processor.addAggregator('max', (data) => {
return Math.max(...data.map(point => point.value || 0));
});
processor.addAggregator('count', (data) => {
return data.length;
});
processor.addAggregator('distinctUsers', (data) => {
const users = new Set(data.map(point => point.userId).filter(Boolean));
return users.size;
});
// 处理实时数据流
setInterval(() => {
const mockDataPoint = {
timestamp: Date.now(),
value: Math.random() * 100,
userId: `user_${Math.floor(Math.random() * 1000)}`,
category: ['A', 'B', 'C'][Math.floor(Math.random() * 3)]
};
processor.processDataPoint(mockDataPoint);
}, 1000);
// 监听结果
processor.on('windowComplete', (results) => {
console.log('Real-time analytics:', results);
});
大数据处理最佳实践:
How to design API gateway and service governance for Node.js applications?
How to design API gateway and service governance for Node.js applications?
考察点:服务治理设计。
答案:
API网关是微服务架构的核心组件,负责请求路由、负载均衡、认证授权、限流熔断等功能。服务治理则确保微服务系统的稳定性和可维护性。
1. API网关核心功能:
const express = require('express');
const httpProxy = require('http-proxy-middleware');
const rateLimit = require('express-rate-limit');
class APIGateway {
constructor(config) {
this.app = express();
this.config = config;
this.services = new Map();
this.middlewares = [];
this.setupCore();
}
setupCore() {
// 基础中间件
this.app.use(express.json());
this.app.use(this.requestLogging());
this.app.use(this.corsHandler());
this.app.use(this.rateLimiting());
// 服务发现
this.setupServiceDiscovery();
// 路由配置
this.setupRoutes();
// 错误处理
this.app.use(this.errorHandler());
}
// 请求日志中间件
requestLogging() {
return (req, res, next) => {
const startTime = Date.now();
const requestId = `gw-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
req.requestId = requestId;
req.startTime = startTime;
console.log(`[${requestId}] ${req.method} ${req.url} - Start`);
res.on('finish', () => {
const duration = Date.now() - startTime;
console.log(`[${requestId}] ${req.method} ${req.url} - ${res.statusCode} (${duration}ms)`);
});
next();
};
}
// CORS处理
corsHandler() {
return (req, res, next) => {
res.header('Access-Control-Allow-Origin', '*');
res.header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Request-ID');
if (req.method === 'OPTIONS') {
return res.sendStatus(200);
}
next();
};
}
// 全局限流
rateLimiting() {
return rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 1000, // 每个IP最多1000个请求
message: { error: 'Too many requests from this IP' },
standardHeaders: true
});
}
// 服务发现和健康检查
setupServiceDiscovery() {
// 定期检查服务健康状态
setInterval(async () => {
await this.checkServiceHealth();
}, 30000); // 30秒检查一次
}
async checkServiceHealth() {
for (const [serviceName, serviceConfig] of this.services) {
const healthyInstances = [];
for (const instance of serviceConfig.instances) {
try {
const response = await fetch(`${instance.url}/health`, { timeout: 5000 });
if (response.ok) {
instance.healthy = true;
instance.lastHealthCheck = Date.now();
healthyInstances.push(instance);
} else {
instance.healthy = false;
}
} catch (error) {
instance.healthy = false;
console.warn(`Health check failed for ${serviceName} at ${instance.url}`);
}
}
serviceConfig.healthyInstances = healthyInstances;
}
}
// 负载均衡算法
getServiceInstance(serviceName, algorithm = 'round-robin') {
const service = this.services.get(serviceName);
if (!service || service.healthyInstances.length === 0) {
throw new Error(`No healthy instances for service: ${serviceName}`);
}
switch (algorithm) {
case 'round-robin':
return this.roundRobinSelection(service);
case 'least-connections':
return this.leastConnectionsSelection(service);
case 'weighted':
return this.weightedSelection(service);
default:
return this.roundRobinSelection(service);
}
}
roundRobinSelection(service) {
service.currentIndex = (service.currentIndex || 0) % service.healthyInstances.length;
const instance = service.healthyInstances[service.currentIndex];
service.currentIndex++;
return instance;
}
leastConnectionsSelection(service) {
return service.healthyInstances.reduce((min, instance) => {
return (instance.activeConnections || 0) < (min.activeConnections || 0) ? instance : min;
});
}
weightedSelection(service) {
const totalWeight = service.healthyInstances.reduce((sum, instance) => sum + (instance.weight || 1), 0);
let random = Math.random() * totalWeight;
for (const instance of service.healthyInstances) {
random -= (instance.weight || 1);
if (random <= 0) {
return instance;
}
}
return service.healthyInstances[0]; // fallback
}
// 动态路由配置
setupRoutes() {
// API版本路由
this.app.use('/api/v1/users', this.createServiceProxy('user-service'));
this.app.use('/api/v1/orders', this.createServiceProxy('order-service'));
this.app.use('/api/v1/products', this.createServiceProxy('product-service'));
// 管理端点
this.app.get('/gateway/health', this.gatewayHealthCheck());
this.app.get('/gateway/services', this.servicesStatus());
this.app.post('/gateway/services/:name/reload', this.reloadService());
}
createServiceProxy(serviceName) {
return httpProxy.createProxyMiddleware({
target: 'http://placeholder', // 动态设置
changeOrigin: true,
pathRewrite: (path, req) => {
// 移除API版本前缀
return path.replace(/^\/api\/v\d+/, '');
},
router: (req) => {
try {
const instance = this.getServiceInstance(serviceName);
// 增加活跃连接数
instance.activeConnections = (instance.activeConnections || 0) + 1;
// 请求结束时减少连接数
req.on('close', () => {
instance.activeConnections = Math.max(0, (instance.activeConnections || 1) - 1);
});
return instance.url;
} catch (error) {
console.error(`Proxy routing failed for ${serviceName}:`, error);
return null;
}
},
onProxyReq: (proxyReq, req) => {
// 添加网关头部
proxyReq.setHeader('X-Gateway-ID', 'api-gateway');
proxyReq.setHeader('X-Request-ID', req.requestId);
proxyReq.setHeader('X-Forwarded-For', req.ip);
},
onProxyRes: (proxyRes, req, res) => {
// 添加响应头
res.setHeader('X-Gateway', 'api-gateway');
res.setHeader('X-Request-ID', req.requestId);
},
onError: (err, req, res) => {
console.error(`Proxy error for ${serviceName}:`, err);
res.status(503).json({
error: 'Service temporarily unavailable',
requestId: req.requestId
});
}
});
}
// 网关健康检查
gatewayHealthCheck() {
return (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
services: {}
};
for (const [serviceName, service] of this.services) {
health.services[serviceName] = {
totalInstances: service.instances.length,
healthyInstances: service.healthyInstances.length,
status: service.healthyInstances.length > 0 ? 'available' : 'unavailable'
};
}
const overallHealthy = Object.values(health.services)
.every(service => service.status === 'available');
health.status = overallHealthy ? 'healthy' : 'degraded';
res.status(overallHealthy ? 200 : 503).json(health);
};
}
// 服务状态查询
servicesStatus() {
return (req, res) => {
const services = {};
for (const [serviceName, service] of this.services) {
services[serviceName] = {
instances: service.instances.map(instance => ({
url: instance.url,
healthy: instance.healthy,
activeConnections: instance.activeConnections || 0,
lastHealthCheck: instance.lastHealthCheck
})),
loadBalancing: service.loadBalancing || 'round-robin'
};
}
res.json({ services });
};
}
// 服务重载
reloadService() {
return async (req, res) => {
const serviceName = req.params.name;
try {
// 重新发现服务实例
await this.discoverServiceInstances(serviceName);
res.json({
message: `Service ${serviceName} reloaded successfully`
});
} catch (error) {
res.status(500).json({
error: `Failed to reload service ${serviceName}: ${error.message}`
});
}
};
}
// 错误处理
errorHandler() {
return (err, req, res, next) => {
console.error(`Gateway error [${req.requestId}]:`, err);
res.status(500).json({
error: 'Gateway internal error',
requestId: req.requestId,
timestamp: new Date().toISOString()
});
};
}
// 注册服务
registerService(name, instances, options = {}) {
this.services.set(name, {
instances: instances.map(url => ({
url,
healthy: true,
activeConnections: 0,
weight: options.weight || 1
})),
healthyInstances: [],
loadBalancing: options.loadBalancing || 'round-robin',
currentIndex: 0
});
console.log(`Service registered: ${name} with ${instances.length} instances`);
}
start(port = 3000) {
return new Promise((resolve) => {
this.server = this.app.listen(port, () => {
console.log(`API Gateway listening on port ${port}`);
resolve(this.server);
});
});
}
}
2. 服务治理和熔断器:
// 熔断器实现
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.halfOpenMaxCalls = options.halfOpenMaxCalls || 3;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = null;
this.nextAttempt = null;
this.metrics = {
totalRequests: 0,
successCount: 0,
failureCount: 0,
timeoutCount: 0
};
}
async execute(operation, fallback = null) {
if (!this.canExecute()) {
if (fallback) {
return await fallback();
}
throw new Error('Circuit breaker is OPEN');
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
if (fallback) {
return await fallback();
}
throw error;
}
}
canExecute() {
if (this.state === 'CLOSED') {
return true;
}
if (this.state === 'OPEN') {
if (Date.now() > this.nextAttempt) {
this.state = 'HALF_OPEN';
this.successCount = 0;
return true;
}
return false;
}
if (this.state === 'HALF_OPEN') {
return this.successCount < this.halfOpenMaxCalls;
}
return false;
}
onSuccess() {
this.metrics.totalRequests++;
this.metrics.successCount++;
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= this.halfOpenMaxCalls) {
this.reset();
}
} else {
this.failureCount = 0;
}
}
onFailure() {
this.metrics.totalRequests++;
this.metrics.failureCount++;
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.trip();
}
}
reset() {
this.state = 'CLOSED';
this.failureCount = 0;
this.successCount = 0;
this.nextAttempt = null;
}
trip() {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.resetTimeout;
}
getMetrics() {
const successRate = this.metrics.totalRequests > 0
? (this.metrics.successCount / this.metrics.totalRequests * 100).toFixed(2)
: 0;
return {
state: this.state,
successRate: parseFloat(successRate),
...this.metrics,
lastFailureTime: this.lastFailureTime,
nextAttempt: this.nextAttempt
};
}
}
// 服务治理管理器
class ServiceGovernance {
constructor() {
this.circuitBreakers = new Map();
this.serviceMetrics = new Map();
this.rateLimiters = new Map();
}
// 获取或创建熔断器
getCircuitBreaker(serviceName, options = {}) {
if (!this.circuitBreakers.has(serviceName)) {
this.circuitBreakers.set(serviceName, new CircuitBreaker(options));
}
return this.circuitBreakers.get(serviceName);
}
// 服务调用包装器
async callService(serviceName, operation, options = {}) {
const circuitBreaker = this.getCircuitBreaker(serviceName, options.circuitBreaker);
const startTime = Date.now();
try {
const result = await circuitBreaker.execute(
operation,
options.fallback
);
this.recordMetrics(serviceName, 'success', Date.now() - startTime);
return result;
} catch (error) {
this.recordMetrics(serviceName, 'failure', Date.now() - startTime);
throw error;
}
}
// 记录服务指标
recordMetrics(serviceName, status, duration) {
if (!this.serviceMetrics.has(serviceName)) {
this.serviceMetrics.set(serviceName, {
totalCalls: 0,
successCalls: 0,
failureCalls: 0,
totalDuration: 0,
minDuration: Infinity,
maxDuration: 0
});
}
const metrics = this.serviceMetrics.get(serviceName);
metrics.totalCalls++;
metrics.totalDuration += duration;
if (status === 'success') {
metrics.successCalls++;
} else {
metrics.failureCalls++;
}
metrics.minDuration = Math.min(metrics.minDuration, duration);
metrics.maxDuration = Math.max(metrics.maxDuration, duration);
}
// 获取服务治理仪表板数据
getDashboardData() {
const services = {};
for (const [serviceName, circuitBreaker] of this.circuitBreakers) {
const metrics = this.serviceMetrics.get(serviceName);
const cbMetrics = circuitBreaker.getMetrics();
services[serviceName] = {
circuitBreaker: cbMetrics,
performance: metrics ? {
...metrics,
averageDuration: metrics.totalCalls > 0
? (metrics.totalDuration / metrics.totalCalls).toFixed(2)
: 0,
successRate: metrics.totalCalls > 0
? ((metrics.successCalls / metrics.totalCalls) * 100).toFixed(2)
: 0
} : null
};
}
return { services };
}
// 重置服务指标
resetMetrics(serviceName) {
this.serviceMetrics.delete(serviceName);
const circuitBreaker = this.circuitBreakers.get(serviceName);
if (circuitBreaker) {
circuitBreaker.reset();
}
}
}
// 使用示例
const gateway = new APIGateway({
port: 3000,
services: {
'user-service': ['http://user-1:3001', 'http://user-2:3002'],
'order-service': ['http://order-1:3003', 'http://order-2:3004']
}
});
// 注册服务
gateway.registerService('user-service', [
'http://user-1:3001',
'http://user-2:3002'
], {
loadBalancing: 'round-robin'
});
gateway.registerService('order-service', [
'http://order-1:3003',
'http://order-2:3004'
], {
loadBalancing: 'least-connections'
});
// 启动网关
gateway.start(3000);
API网关和服务治理最佳实践:
What are the applications and optimization strategies of Node.js in Serverless architecture?
What are the applications and optimization strategies of Node.js in Serverless architecture?
考察点:Serverless架构设计。
答案:
Node.js在Serverless架构中具有冷启动快、内存占用低的优势。通过合理的函数设计、依赖优化、缓存策略等可以最大化Serverless的效益。
1. Serverless函数优化:
// AWS Lambda优化示例
const AWS = require('aws-sdk');
// 全局变量复用连接
let dbConnection = null;
let s3Client = null;
// 初始化函数
function initializeResources() {
if (!dbConnection) {
dbConnection = new AWS.RDS.DataService({
resourceArn: process.env.DB_CLUSTER_ARN,
secretArn: process.env.DB_SECRET_ARN,
database: process.env.DB_NAME
});
}
if (!s3Client) {
s3Client = new AWS.S3({
region: process.env.AWS_REGION
});
}
}
// 优化的Lambda处理函数
exports.handler = async (event, context) => {
// 设置上下文超时
context.callbackWaitsForEmptyEventLoop = false;
const startTime = Date.now();
try {
// 初始化资源(复用连接)
initializeResources();
// 解析请求
const { httpMethod, path, body, headers } = event;
const requestData = body ? JSON.parse(body) : {};
// 路由处理
const result = await routeHandler(httpMethod, path, requestData, headers);
// 记录执行时间
const executionTime = Date.now() - startTime;
console.log(`Function executed in ${executionTime}ms`);
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
body: JSON.stringify({
success: true,
data: result,
executionTime
})
};
} catch (error) {
console.error('Lambda execution error:', error);
return {
statusCode: 500,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
success: false,
error: error.message,
requestId: context.awsRequestId
})
};
}
};
// 路由处理器
async function routeHandler(method, path, data, headers) {
switch (true) {
case method === 'GET' && path === '/users':
return await getUsers();
case method === 'POST' && path === '/users':
return await createUser(data);
case method === 'GET' && path.startsWith('/users/'):
const userId = path.split('/')[2];
return await getUser(userId);
default:
throw new Error(`Route not found: ${method} ${path}`);
}
}
// 数据库操作优化
async function getUsers() {
const cacheKey = 'users:all';
// 检查缓存
const cached = await getFromCache(cacheKey);
if (cached) {
return cached;
}
// 数据库查询
const params = {
resourceArn: process.env.DB_CLUSTER_ARN,
secretArn: process.env.DB_SECRET_ARN,
database: process.env.DB_NAME,
sql: 'SELECT id, name, email FROM users WHERE active = true'
};
const result = await dbConnection.executeStatement(params).promise();
const users = result.records.map(formatRecord);
// 缓存结果
await setCache(cacheKey, users, 300); // 5分钟缓存
return users;
}
// 记录格式化
function formatRecord(record) {
return {
id: record[0].longValue,
name: record[1].stringValue,
email: record[2].stringValue
};
}
2. 冷启动优化:
// 减少依赖和包大小
const middy = require('@middy/core');
const jsonBodyParser = require('@middy/http-json-body-parser');
const httpErrorHandler = require('@middy/http-error-handler');
const validator = require('@middy/validator');
// 轻量级工具函数
const utils = {
// 避免使用重量级库如moment.js,使用原生Date
formatDate: (date) => {
return new Date(date).toISOString();
},
// 轻量级UUID生成
generateId: () => {
return Date.now().toString(36) + Math.random().toString(36).substr(2);
},
// 简单的验证函数
validateEmail: (email) => {
return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
};
// 预热函数
const warmupHandler = async (event) => {
if (event.source === 'serverless-plugin-warmup') {
console.log('Function warmed up');
return { statusCode: 200, body: 'warmed' };
}
};
// 缓存层实现
class FunctionCache {
constructor() {
this.cache = new Map();
this.ttl = new Map();
}
set(key, value, ttlSeconds = 300) {
this.cache.set(key, value);
this.ttl.set(key, Date.now() + ttlSeconds * 1000);
// 清理过期缓存
setTimeout(() => {
this.delete(key);
}, ttlSeconds * 1000);
}
get(key) {
if (this.ttl.has(key) && Date.now() > this.ttl.get(key)) {
this.delete(key);
return null;
}
return this.cache.get(key) || null;
}
delete(key) {
this.cache.delete(key);
this.ttl.delete(key);
}
clear() {
this.cache.clear();
this.ttl.clear();
}
}
const functionCache = new FunctionCache();
// 优化的处理函数
const optimizedHandler = async (event, context) => {
// 检查预热请求
if (event.source === 'serverless-plugin-warmup') {
return warmupHandler(event);
}
// 提取路径参数
const { id } = event.pathParameters || {};
const { action } = event.queryStringParameters || {};
try {
switch (action) {
case 'get':
return await getCachedData(id);
case 'process':
return await processData(event.body ? JSON.parse(event.body) : {});
default:
return {
statusCode: 400,
body: JSON.stringify({ error: 'Invalid action' })
};
}
} catch (error) {
console.error('Handler error:', error);
throw error;
}
};
async function getCachedData(id) {
const cacheKey = `data:${id}`;
// 检查本地缓存
let data = functionCache.get(cacheKey);
if (!data) {
// 从数据源获取
data = await fetchDataFromSource(id);
functionCache.set(cacheKey, data, 600); // 10分钟缓存
}
return {
statusCode: 200,
body: JSON.stringify({
data,
cached: !!functionCache.get(cacheKey)
})
};
}
// 使用Middy中间件
const handler = middy(optimizedHandler)
.use(jsonBodyParser())
.use(validator({
inputSchema: {
type: 'object',
properties: {
pathParameters: {
type: 'object',
properties: {
id: { type: 'string', pattern: '^[a-zA-Z0-9-_]+$' }
}
}
}
}
}))
.use(httpErrorHandler());
module.exports = { handler };
3. Serverless架构模式:
// 事件驱动架构
class EventProcessor {
constructor() {
this.processors = new Map();
}
// 注册事件处理器
register(eventType, processor) {
if (!this.processors.has(eventType)) {
this.processors.set(eventType, []);
}
this.processors.get(eventType).push(processor);
}
// 处理事件
async process(event) {
const { eventType, data, metadata } = event;
const processors = this.processors.get(eventType) || [];
if (processors.length === 0) {
console.warn(`No processors found for event type: ${eventType}`);
return [];
}
// 并行处理
const results = await Promise.allSettled(
processors.map(processor => processor(data, metadata))
);
return results.map((result, index) => ({
processor: processors[index].name || `processor-${index}`,
success: result.status === 'fulfilled',
result: result.status === 'fulfilled' ? result.value : result.reason
}));
}
}
// SQS消息处理函数
exports.sqsHandler = async (event) => {
const eventProcessor = new EventProcessor();
// 注册处理器
eventProcessor.register('user.created', async (data) => {
await sendWelcomeEmail(data.userId, data.email);
await updateUserStats('new_user');
});
eventProcessor.register('order.completed', async (data) => {
await generateInvoice(data.orderId);
await updateInventory(data.items);
await sendOrderConfirmation(data.userId, data.orderId);
});
const processResults = [];
// 处理批量消息
for (const record of event.Records) {
try {
const message = JSON.parse(record.body);
const results = await eventProcessor.process(message);
processResults.push({
messageId: record.messageId,
success: true,
results
});
} catch (error) {
console.error('Message processing failed:', error);
processResults.push({
messageId: record.messageId,
success: false,
error: error.message
});
}
}
return {
batchItemFailures: processResults
.filter(result => !result.success)
.map(result => ({ itemIdentifier: result.messageId }))
};
};
// S3事件处理
exports.s3Handler = async (event) => {
const results = [];
for (const record of event.Records) {
const bucket = record.s3.bucket.name;
const key = record.s3.object.key;
const eventName = record.eventName;
try {
if (eventName.startsWith('ObjectCreated')) {
await processUploadedFile(bucket, key);
} else if (eventName.startsWith('ObjectRemoved')) {
await cleanupFileReferences(bucket, key);
}
results.push({ bucket, key, success: true });
} catch (error) {
console.error(`Failed to process ${bucket}/${key}:`, error);
results.push({ bucket, key, success: false, error: error.message });
}
}
return { processedFiles: results };
};
async function processUploadedFile(bucket, key) {
// 文件处理逻辑
if (key.endsWith('.jpg') || key.endsWith('.png')) {
await generateThumbnails(bucket, key);
} else if (key.endsWith('.csv')) {
await processDataFile(bucket, key);
}
}
4. 成本优化和监控:
// 成本优化工具
class ServerlessCostOptimizer {
constructor() {
this.metrics = {
invocations: 0,
totalDuration: 0,
totalMemoryUsed: 0,
coldStarts: 0
};
}
// 记录函数调用指标
recordInvocation(duration, memoryUsed, isColdStart = false) {
this.metrics.invocations++;
this.metrics.totalDuration += duration;
this.metrics.totalMemoryUsed += memoryUsed;
if (isColdStart) {
this.metrics.coldStarts++;
}
}
// 计算成本估算
calculateCosts(pricePerGBSecond = 0.0000166667, pricePerInvocation = 0.0000002) {
const gbSeconds = (this.metrics.totalMemoryUsed / 1024) * (this.metrics.totalDuration / 1000);
return {
computeCost: gbSeconds * pricePerGBSecond,
invocationCost: this.metrics.invocations * pricePerInvocation,
totalCost: (gbSeconds * pricePerGBSecond) + (this.metrics.invocations * pricePerInvocation),
avgDuration: this.metrics.invocations > 0 ? this.metrics.totalDuration / this.metrics.invocations : 0,
coldStartRate: this.metrics.invocations > 0 ? (this.metrics.coldStarts / this.metrics.invocations * 100).toFixed(2) : 0
};
}
// 获取优化建议
getOptimizationSuggestions() {
const costs = this.calculateCosts();
const suggestions = [];
if (costs.coldStartRate > 20) {
suggestions.push({
type: 'cold-start',
message: 'High cold start rate detected. Consider using provisioned concurrency or warming strategies.',
priority: 'high'
});
}
if (costs.avgDuration > 5000) {
suggestions.push({
type: 'duration',
message: 'Average execution time is high. Consider optimizing code or increasing memory allocation.',
priority: 'medium'
});
}
if (this.metrics.totalMemoryUsed / this.metrics.invocations > 512) {
suggestions.push({
type: 'memory',
message: 'High memory usage detected. Consider optimizing memory allocation or code efficiency.',
priority: 'medium'
});
}
return suggestions;
}
}
// 监控装饰器
function withMonitoring(handler) {
const optimizer = new ServerlessCostOptimizer();
return async (event, context) => {
const startTime = Date.now();
const memoryLimit = parseInt(context.memoryLimitInMB);
const isColdStart = !global.isWarm;
global.isWarm = true;
try {
const result = await handler(event, context);
const duration = Date.now() - startTime;
optimizer.recordInvocation(duration, memoryLimit, isColdStart);
// 定期输出优化建议
if (optimizer.metrics.invocations % 100 === 0) {
const suggestions = optimizer.getOptimizationSuggestions();
console.log('Optimization suggestions:', suggestions);
}
return result;
} catch (error) {
const duration = Date.now() - startTime;
optimizer.recordInvocation(duration, memoryLimit, isColdStart);
throw error;
}
};
}
// 使用示例
const monitoredHandler = withMonitoring(async (event, context) => {
// 业务逻辑
return { statusCode: 200, body: 'Success' };
});
module.exports = { handler: monitoredHandler };
Serverless最佳实践: