import fs from 'fs'
import path from 'path'
import axios from 'axios'
import { Throttle } from 'stream-throttle'
function getFileNameFromUrl(url) {
const nameForpathParse = path.parse(url)
const base = nameForpathParse.base
return base
// const pathname = new URL(url).pathname // /dir/file.zip
// return path.basename(pathname) // file.zip 这种方式中文文件名显示乱码异常
}
const rateBytesPerSecond: number = 1024 * 1024 * 3 // 3 MB/s
const controllerMap = new Map()
/**
* 限速下载
* 1:让其他电脑可以下载;不会互相抢爆
* 2:防止下载过快导致服务器压力过大
* 3:保护低速磁盘 / U 盘
* 4:云厂商按「出流量」计费;突发 1 GB 立刻计费
* @param {string} url 文件下载地址
* @param {string} [destDir='.'] 保存目录
* @param {number} [rateBps=1024*1024] 限速,单位 byte/s,默认 3 MB/s
* @use 必须在外层 try catch 块中调用或者catch 错误捕获
*/
export async function downloadThrottled(parmas, onProgress = '' as any) {
const { url, destDir = '', rateBps = rateBytesPerSecond, id } = parmas
try {
const fileName = parmas.fileName || getFileNameFromUrl(url)
const outPath = path.join(destDir, fileName)
/* if (fs.existsSync(outPath)) {
return {
code: 1,
msg: '文件已存在!',
data: outPath
}
} */
const controller = new AbortController()
const { signal } = controller
if (controllerMap.has(id)) {
controller.abort()
return {
code: 0,
msg: '下载进度已存在!',
data: null
}
}
if (!fs.existsSync(destDir)) {
fs.mkdirSync(destDir, { recursive: true })
}
controllerMap.set(id, controller)
const response = await axios({
method: 'get',
timeout: 1000 * 60 * 60, // 60分钟超时
url,
responseType: 'stream',
signal, // 将signal传入请求配置中,用于取消请求
onDownloadProgress: (progressEvent) => {
if (progressEvent.total) {
// 计算进度百分比(已下载 / 总大小)
const percent = Math.round((progressEvent.loaded / progressEvent.total) * 100)
// console.log(`下载进度内部:${percent}%`)
onProgress && onProgress?.(percent, progressEvent)
}
}
})
const writeStream = fs.createWriteStream(outPath)
const throttle = new Throttle({ rate: rateBps })
response.data.pipe(throttle).pipe(writeStream)
console.time(`下载时间`)
return new Promise((resolve, reject) => {
writeStream.on('finish', () => {
console.log(`下载完成:${outPath}`)
console.timeEnd(`下载时间`)
controllerMap.delete(id)
resolve({
code: 1,
msg: '下载成功!',
data: outPath
})
})
writeStream.on('error', (error) => {
logger.error(`Stream下载失败!${JSON.stringify(error)}`)
controllerMap.delete(id)
reject({
code: 0,
msg: error.message,
data: null
})
})
})
} catch (error) {
logger.error(`下载失败!${JSON.stringify(error)}`)
controllerMap.delete(id)
return {
code: 0,
msg: '下载失败!',
data: null
}
}
}
downloadThrottled({ url, destDir, fileName }, (percent, progressEvent) => {
updateDownloadPercentage({ percent })
})
.then(async response => {
console.log('下载完成,开始处理流:', response)
if (response && response.code === 1) {
}
})
.catch(err => {
console.error('处理失败:', err)
})
import fs from 'fs'
import path from 'path'
import { Throttle } from 'stream-throttle'
const LIMIT = 3 * 1024 * 1024 // 3 MB/s
class StreamServer {
async download(req, res, next) {
try {
const { clientVersion, fileName = 'client.zip', type = 1 } = req.query
console.log(` 类型: ${type}`)
const destDir = '被下载的数据目录'
const FILE = path.join(destDir, fileName)
// 1. 检查文件是否存在
if (!fs.existsSync(FILE)) {
res.statusCode = 404
return res.end('File Not Found')
}
// 2. 获取文件信息(大小、类型)
const stats = fs.statSync(FILE)
const fileSize = stats.size
const mimeType = 'application/zip' // 针对zip文件的MIME类型
// 3. 设置完整的响应头(关键!)
res.writeHead(200, {
'Content-Type': mimeType,
'Content-Length': fileSize, // 告诉浏览器文件总大小
'Content-Disposition': `attachment; filename="${fileName}"`, // 指定下载文件名
Connection: 'close' // 明确下载完成后关闭连接
})
const readStream = fs.createReadStream(FILE)
const throttle = new Throttle({ rate: LIMIT })
readStream
.on('error', () => {
res.statusCode = 404
res.end('Not Found')
})
.pipe(throttle)
.pipe(res)
.on('finish', () => {
// 流传输完成(所有数据已发送)
console.log('Download completed successfully')
res.end()
})
.on('close', () => {
console.log(`当前并发下载数`)
})
} catch (error) {
next(error)
}
}
}
export const streamServer = new StreamServer()
/* const http = require('http')
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Throttle } = require('stream-throttle')
const url = require('url')
let active = 0 // 当前并发下载数
const server = http.createServer((req, res) => {
const WORKERS = os.cpus().length
const filePath = path.join(__dirname, 'files', 'app.asar')
const readStream = fs.createReadStream(filePath)
const file = url.parse(req.url, true).pathname.replace('/download/', '')
console.log(`正在下载 ${file}`)
// res.setHeader('Content-Disposition', `attachment; filename="${file}"`)
active++
console.log(`当前并发下载数: ${active}`)
// 设置限速(100KB/s)
const throttle = new Throttle({ rate: 1024 * 1024 * 2 })
// 管道流:文件流 → 限速流 → 响应流
readStream
.on('error', () => (res.statusCode = 404 && res.end('Not Found')))
.pipe(throttle)
.pipe(res)
.on('close', () => {
// 连接关闭/下载完成
active--
console.log(`当前并发下载数: ${active}`)
})
})
server.listen(7003)
console.log('Server running at http://localhost:7003') */
// cluster-server.js
const cluster = require('cluster')
const os = require('os')
const http = require('http')
const fs = require('fs')
const path = require('path')
const { Throttle } = require('stream-throttle')
const WORKERS = os.cpus().length // 想跑几个进程就改几
const FILE = path.join(__dirname, 'files', 'app.asar')
const LIMIT = 2 * 1024 * 1024 // 2 MB/s
// 多进程模式
if (cluster.isMaster) {
console.log(`Master ${process.pid} 启动,准备 fork ${WORKERS} 个工作进程`)
for (let i = 0; i < WORKERS; i++) cluster.fork()
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} 退出,重启中…`)
cluster.fork()
})
} else {
// 子进程代码
http
.createServer((req, res) => {
res.setHeader('Content-Disposition', 'attachment; filename="app.asar"')
const readStream = fs.createReadStream(FILE)
const throttle = new Throttle({ rate: LIMIT })
readStream
.on('error', () => (res.statusCode = 404 && res.end('Not Found')))
.pipe(throttle)
.pipe(res)
.on('close', () => {
// 连接关闭/下载完成
active--
console.log(`当前并发下载数: ${active}`)
})
})
.listen(7003, () => {
console.log(`Worker ${process.pid} 已监听 7003`)
})
}
/**
* main.js
*/
const { Worker } = require('worker_threads')
const http = require('http')
const server = http.createServer()
// 简单轮询把请求分发给 Worker
let idx = 0
server.on('request', (req, res) => {
new Worker('./downloadWorker.js', { workerData: { req, res } }) // 每次一个短生命周期 Worker
})
server.listen(7003, () => console.log('主线程监听 7003'))
/**
* downloadWorker.js
*/
const { parentPort, workerData } = require('worker_threads')
const fs = require('fs')
const path = require('path')
const { Throttle } = require('stream-throttle')
const FILE = path.join(__dirname, 'files', 'app.asar')
const LIMIT = 2 * 1024 * 1024
// 把主线程传进来的 req/res 转成真实对象
const { req, res } = workerData
res.setHeader('Content-Disposition', 'attachment; filename="app.asar"')
fs.createReadStream(FILE)
.on('error', () => {
res.statusCode = 404
res.end('Not Found')
})
.pipe(new Throttle({ rate: LIMIT }))
.pipe(res)
.on('close', () => parentPort.close()) // Worker 结束
const http = require('http')
const fs = require('fs')
const path = require('path')
// 限速配置(单位:字节/秒,例如 1024 * 100 = 100KB/s)
const SPEED_LIMIT = 1024 * 100
// 每次发送的数据块大小(建议与限速匹配,如 1KB),(建议 1KB~8KB,过小会增加开销,过大则限速精度降低)。
const CHUNK_SIZE = 1024
// 计算每块数据的发送间隔(毫秒),计算公式为 (块大小 / 限速) * 1000,确保单位时间内发送的数据量不超过限制。
const INTERVAL = (CHUNK_SIZE / SPEED_LIMIT) * 1000
// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
// 要下载的文件路径
const filePath = path.join(__dirname, 'files', 'app.asar')
// 检查文件是否存在
fs.access(filePath, fs.constants.F_OK, err => {
if (err) {
res.writeHead(404, { 'Content-Type': 'text/plain' })
return res.end('文件不存在')
}
// 获取文件信息(用于设置 Content-Length)
fs.stat(filePath, (err, stats) => {
if (err) {
res.writeHead(500, { 'Content-Type': 'text/plain' })
return res.end('获取文件信息失败')
}
// 设置响应头(告诉浏览器这是附件,触发下载)
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename="${path.basename(
filePath
)}"`,
'Content-Length': stats.size
})
// 创建文件可读流
const readStream = fs.createReadStream(filePath, {
highWaterMark: CHUNK_SIZE
})
let isPaused = false
// 流数据事件:每次读取到一块数据
readStream.on('data', chunk => {
// 如果当前处于暂停状态,不发送数据
if (isPaused) return
// 发送数据块
const canContinue = res.write(chunk)
if (!canContinue) {
// 如果缓冲区满了,暂停读取,等待 drain 事件
readStream.pause()
isPaused = true
res.once('drain', () => {
isPaused = false
readStream.resume()
})
}
// 限速核心:每发送一块数据后延迟 INTERVAL 毫秒
readStream.pause()
setTimeout(() => {
readStream.resume()
}, INTERVAL)
})
// 流结束事件
readStream.on('end', () => {
res.end() // 完成响应
})
// 流错误事件
readStream.on('error', err => {
res.writeHead(500, { 'Content-Type': 'text/plain' })
res.end('文件读取错误')
})
})
})
})
// 启动服务器
const PORT = 7003
server.listen(PORT, () => {
console.log(
`服务器运行在 http://localhost:${PORT},下载限速为 ${
SPEED_LIMIT / 1024
}KB/s`
)
})