node

文件流

# 1. 限速下载

import fs from 'fs'
import path from 'path'
import axios from 'axios'
import { Throttle } from 'stream-throttle'

function getFileNameFromUrl(url) {
  const nameForpathParse = path.parse(url)
  const base = nameForpathParse.base
  return base
  // const pathname = new URL(url).pathname //  /dir/file.zip
  // return path.basename(pathname) //  file.zip 这种方式中文文件名显示乱码异常
}

const rateBytesPerSecond: number = 1024 * 1024 * 3 // 3 MB/s
const controllerMap = new Map()
/**
 * 限速下载
 * 1:让其他电脑可以下载;不会互相抢爆
 * 2:防止下载过快导致服务器压力过大
 * 3:保护低速磁盘 / U 盘
 * 4:云厂商按「出流量」计费;突发 1 GB 立刻计费
 * @param {string} url           文件下载地址
 * @param {string} [destDir='.'] 保存目录
 * @param {number} [rateBps=1024*1024] 限速,单位 byte/s,默认 3 MB/s
 * @use 必须在外层 try catch 块中调用或者catch 错误捕获
 */
export async function downloadThrottled(parmas, onProgress = '' as any) {
  const { url, destDir = '', rateBps = rateBytesPerSecond, id } = parmas
  try {
    const fileName = parmas.fileName || getFileNameFromUrl(url)
    const outPath = path.join(destDir, fileName)

    /* if (fs.existsSync(outPath)) {
      return {
        code: 1,
        msg: '文件已存在!',
        data: outPath
      }
    } */

    const controller = new AbortController()
    const { signal } = controller
    if (controllerMap.has(id)) {
      controller.abort()
      return {
        code: 0,
        msg: '下载进度已存在!',
        data: null
      }
    }

    if (!fs.existsSync(destDir)) {
      fs.mkdirSync(destDir, { recursive: true })
    }
    controllerMap.set(id, controller)

    const response = await axios({
      method: 'get',
      timeout: 1000 * 60 * 60, // 60分钟超时
      url,
      responseType: 'stream',
      signal, // 将signal传入请求配置中,用于取消请求

      onDownloadProgress: (progressEvent) => {
        if (progressEvent.total) {
          // 计算进度百分比(已下载 / 总大小)
          const percent = Math.round((progressEvent.loaded / progressEvent.total) * 100)
          // console.log(`下载进度内部:${percent}%`)
          onProgress && onProgress?.(percent, progressEvent)
        }
      }
    })

    const writeStream = fs.createWriteStream(outPath)
    const throttle = new Throttle({ rate: rateBps })

    response.data.pipe(throttle).pipe(writeStream)

    console.time(`下载时间`)
    return new Promise((resolve, reject) => {
      writeStream.on('finish', () => {
        console.log(`下载完成:${outPath}`)
        console.timeEnd(`下载时间`)
        controllerMap.delete(id)
        resolve({
          code: 1,
          msg: '下载成功!',
          data: outPath
        })
      })
      writeStream.on('error', (error) => {
        logger.error(`Stream下载失败!${JSON.stringify(error)}`)
        controllerMap.delete(id)
        reject({
          code: 0,
          msg: error.message,
          data: null
        })
      })
    })
  } catch (error) {
    logger.error(`下载失败!${JSON.stringify(error)}`)
    controllerMap.delete(id)
    return {
      code: 0,
      msg: '下载失败!',
      data: null
    }
  }
}

# 1.2 使用

downloadThrottled({ url, destDir, fileName }, (percent, progressEvent) => {
  updateDownloadPercentage({ percent })
})
  .then(async response => {
    console.log('下载完成,开始处理流:', response)
    if (response && response.code === 1) {
    }
  })
  .catch(err => {
    console.error('处理失败:', err)
  })

# 2. 流文件服务器

import fs from 'fs'
import path from 'path'
import { Throttle } from 'stream-throttle'

const LIMIT = 3 * 1024 * 1024 // 3 MB/s

class StreamServer {
  async download(req, res, next) {
    try {
      const { clientVersion, fileName = 'client.zip', type = 1 } = req.query
      console.log(` 类型: ${type}`)

      const destDir = '被下载的数据目录'
      const FILE = path.join(destDir, fileName)

      // 1. 检查文件是否存在
      if (!fs.existsSync(FILE)) {
        res.statusCode = 404
        return res.end('File Not Found')
      }

      // 2. 获取文件信息(大小、类型)
      const stats = fs.statSync(FILE)
      const fileSize = stats.size
      const mimeType = 'application/zip' // 针对zip文件的MIME类型

      // 3. 设置完整的响应头(关键!)
      res.writeHead(200, {
        'Content-Type': mimeType,
        'Content-Length': fileSize, // 告诉浏览器文件总大小
        'Content-Disposition': `attachment; filename="${fileName}"`, // 指定下载文件名
        Connection: 'close' // 明确下载完成后关闭连接
      })

      const readStream = fs.createReadStream(FILE)
      const throttle = new Throttle({ rate: LIMIT })

      readStream
        .on('error', () => {
          res.statusCode = 404
          res.end('Not Found')
        })
        .pipe(throttle)
        .pipe(res)
        .on('finish', () => {
          // 流传输完成(所有数据已发送)
          console.log('Download completed successfully')
          res.end()
        })
        .on('close', () => {
          console.log(`当前并发下载数`)
        })
    } catch (error) {
      next(error)
    }
  }
}

export const streamServer = new StreamServer()

/* const http = require('http')
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Throttle } = require('stream-throttle')

const url = require('url')

let active = 0 // 当前并发下载数

const server = http.createServer((req, res) => {
  const WORKERS = os.cpus().length
  const filePath = path.join(__dirname, 'files', 'app.asar')
  const readStream = fs.createReadStream(filePath)

  const file = url.parse(req.url, true).pathname.replace('/download/', '')
  console.log(`正在下载 ${file}`)
  // res.setHeader('Content-Disposition', `attachment; filename="${file}"`)

  active++
  console.log(`当前并发下载数: ${active}`)
  // 设置限速(100KB/s)
  const throttle = new Throttle({ rate: 1024 * 1024 * 2 })

  // 管道流:文件流 → 限速流 → 响应流
  readStream
    .on('error', () => (res.statusCode = 404 && res.end('Not Found')))
    .pipe(throttle)
    .pipe(res)
    .on('close', () => {
      // 连接关闭/下载完成
      active--
      console.log(`当前并发下载数: ${active}`)
    })
})

server.listen(7003)
console.log('Server running at http://localhost:7003') */

# 3. 多进程模式流文件服务器

// cluster-server.js
const cluster = require('cluster')
const os = require('os')
const http = require('http')
const fs = require('fs')
const path = require('path')
const { Throttle } = require('stream-throttle')

const WORKERS = os.cpus().length // 想跑几个进程就改几
const FILE = path.join(__dirname, 'files', 'app.asar')
const LIMIT = 2 * 1024 * 1024 // 2 MB/s

// 多进程模式
if (cluster.isMaster) {
  console.log(`Master ${process.pid} 启动,准备 fork ${WORKERS} 个工作进程`)
  for (let i = 0; i < WORKERS; i++) cluster.fork()

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} 退出,重启中…`)
    cluster.fork()
  })
} else {
  // 子进程代码
  http
    .createServer((req, res) => {
      res.setHeader('Content-Disposition', 'attachment; filename="app.asar"')

      const readStream = fs.createReadStream(FILE)
      const throttle = new Throttle({ rate: LIMIT })

      readStream
        .on('error', () => (res.statusCode = 404 && res.end('Not Found')))
        .pipe(throttle)
        .pipe(res)
        .on('close', () => {
          // 连接关闭/下载完成
          active--
          console.log(`当前并发下载数: ${active}`)
        })
    })
    .listen(7003, () => {
      console.log(`Worker ${process.pid} 已监听 7003`)
    })
}

# 4. 多线程模式流文件服务器

/**
 * main.js
 */
const { Worker } = require('worker_threads')
const http = require('http')

const server = http.createServer()

// 简单轮询把请求分发给 Worker
let idx = 0
server.on('request', (req, res) => {
  new Worker('./downloadWorker.js', { workerData: { req, res } }) // 每次一个短生命周期 Worker
})

server.listen(7003, () => console.log('主线程监听 7003'))
/**
 * downloadWorker.js
 */
const { parentPort, workerData } = require('worker_threads')
const fs = require('fs')
const path = require('path')
const { Throttle } = require('stream-throttle')

const FILE = path.join(__dirname, 'files', 'app.asar')
const LIMIT = 2 * 1024 * 1024

// 把主线程传进来的 req/res 转成真实对象
const { req, res } = workerData
res.setHeader('Content-Disposition', 'attachment; filename="app.asar"')

fs.createReadStream(FILE)
  .on('error', () => {
    res.statusCode = 404
    res.end('Not Found')
  })
  .pipe(new Throttle({ rate: LIMIT }))
  .pipe(res)
  .on('close', () => parentPort.close()) // Worker 结束

# 5. 不使用限流插件的流文件服务器

const http = require('http')
const fs = require('fs')
const path = require('path')

// 限速配置(单位:字节/秒,例如 1024 * 100 = 100KB/s)
const SPEED_LIMIT = 1024 * 100
// 每次发送的数据块大小(建议与限速匹配,如 1KB),(建议 1KB~8KB,过小会增加开销,过大则限速精度降低)。
const CHUNK_SIZE = 1024
// 计算每块数据的发送间隔(毫秒),计算公式为 (块大小 / 限速) * 1000,确保单位时间内发送的数据量不超过限制。
const INTERVAL = (CHUNK_SIZE / SPEED_LIMIT) * 1000

// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
  // 要下载的文件路径
  const filePath = path.join(__dirname, 'files', 'app.asar')

  // 检查文件是否存在
  fs.access(filePath, fs.constants.F_OK, err => {
    if (err) {
      res.writeHead(404, { 'Content-Type': 'text/plain' })
      return res.end('文件不存在')
    }

    // 获取文件信息(用于设置 Content-Length)
    fs.stat(filePath, (err, stats) => {
      if (err) {
        res.writeHead(500, { 'Content-Type': 'text/plain' })
        return res.end('获取文件信息失败')
      }

      // 设置响应头(告诉浏览器这是附件,触发下载)
      res.writeHead(200, {
        'Content-Type': 'application/octet-stream',
        'Content-Disposition': `attachment; filename="${path.basename(
          filePath
        )}"`,
        'Content-Length': stats.size
      })

      // 创建文件可读流
      const readStream = fs.createReadStream(filePath, {
        highWaterMark: CHUNK_SIZE
      })
      let isPaused = false

      // 流数据事件:每次读取到一块数据
      readStream.on('data', chunk => {
        // 如果当前处于暂停状态,不发送数据
        if (isPaused) return

        // 发送数据块
        const canContinue = res.write(chunk)
        if (!canContinue) {
          // 如果缓冲区满了,暂停读取,等待 drain 事件
          readStream.pause()
          isPaused = true
          res.once('drain', () => {
            isPaused = false
            readStream.resume()
          })
        }

        // 限速核心:每发送一块数据后延迟 INTERVAL 毫秒
        readStream.pause()
        setTimeout(() => {
          readStream.resume()
        }, INTERVAL)
      })

      // 流结束事件
      readStream.on('end', () => {
        res.end() // 完成响应
      })

      // 流错误事件
      readStream.on('error', err => {
        res.writeHead(500, { 'Content-Type': 'text/plain' })
        res.end('文件读取错误')
      })
    })
  })
})

// 启动服务器
const PORT = 7003
server.listen(PORT, () => {
  console.log(
    `服务器运行在 http://localhost:${PORT},下载限速为 ${
      SPEED_LIMIT / 1024
    }KB/s`
  )
})
上次更新: