优化下载

This commit is contained in:
lushunming 2025-11-03 16:07:31 +08:00
parent d6ee012dff
commit 85bff735dd
1 changed files with 61 additions and 55 deletions

View File

@ -68,92 +68,98 @@ object ProxyServer {
response: AdvancedHttpServer.Response
) {
runBlocking {
// 创建固定大小的通道队列保持16个并发下载
val downloadChannel = Channel<Pair<Long, Long>>(16)
val dataChannel = Channel<Pair<Long, ByteArray>>(16)
val channels = List(THREAD_NUM) { Channel<ByteArray>() }
SpiderDebug.log("--proxyAsync url: $url")
SpiderDebug.log("--proxyAsync headers: ${Json.toJson(headers)}")
try {
// 解析范围请求
SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM")
var rangeHeader = request.headers["Range"]
//没有range头
if (rangeHeader.isNullOrEmpty()) {
// 处理初始请求
rangeHeader = "bytes=0-"
}
headers.toMutableMap().apply {
put("Range", rangeHeader)
}
val (startPoint, endPoint) = parseRangePoint(rangeHeader)
// 解析范围请求
val (startPoint, endPoint) = parseRangePoint(
rangeHeader
)
// 获取内容信息
//缓存response header
var info = infos[url]
if (info == null) {
info = getInfo(url, headers)
infos[url] = info
}
SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint")
val contentLength = getContentLength(info)
val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint
SpiderDebug.log("startPoint: $startPoint; endPoint: $finalEndPoint")
SpiderDebug.log("contentLength: $contentLength")
// 设置响应头
val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint
response.setContentType("text/html")
response.setHeader("Connection", "keep-alive")
response.setHeader("Content-Length", (finalEndPoint - startPoint + 1).toString())
response.setHeader("Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength")
response.setHeader(
"Content-Length", (finalEndPoint - startPoint + 1).toString()
)
response.setHeader(
"Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength"
)
info["Content-Type"]?.get(0)?.let { response.setHeader("Content-Type", it) }
response.setStatusCode(206)
response.start()
// 使用流式响应
// 启动16个固定的下载协程
val downloadJobs = List(16) {
CoroutineScope(Dispatchers.IO).launch {
for (range in downloadChannel) {
val data = getVideoStream(range.first, range.second, url, headers)
dataChannel.send(Pair(range.first, data))
}
}
}
// 启动数据发送协程
val sendJob = CoroutineScope(Dispatchers.IO).launch {
val receivedData = mutableMapOf<Long, ByteArray>()
var nextExpectedStart = startPoint
for (dataPair in dataChannel) {
receivedData[dataPair.first] = dataPair.second
// 按顺序发送数据
while (receivedData.containsKey(nextExpectedStart)) {
val data = receivedData.remove(nextExpectedStart)!!
SpiderDebug.log("Sending chunk: ${data.size} bytes at $nextExpectedStart")
response.write(data)
nextExpectedStart += data.size
}
}
}
// 分发下载任务
var currentStart = startPoint
// 启动生产者协程下载数据
val producerJob = mutableListOf<Job>()
while (currentStart <= finalEndPoint) {
producerJob.clear()
// 创建通道用于接收数据块
for (i in 0 until THREAD_NUM) {
if (currentStart > finalEndPoint) break
val chunkStart = currentStart
val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint)
downloadChannel.send(Pair(currentStart, chunkEnd))
producerJob += CoroutineScope(Dispatchers.IO).launch {
// 异步下载数据块
val data = getVideoStream(chunkStart, chunkEnd, url, headers)
channels[i].send(data)
}
currentStart = chunkEnd + 1
}
for ((index, _) in producerJob.withIndex()) {
// 关闭下载通道并等待所有下载完成
downloadChannel.close()
downloadJobs.forEach { it.join() }
// 关闭数据通道
dataChannel.close()
sendJob.join()
val data = channels[index].receive()
SpiderDebug.log("Received chunk: ${data.size} bytes")
response.write(data)
}
}
channels.forEach { it.close() }
} catch (e: Exception) {
SpiderDebug.log("proxyAsync error: ${e.message}")
e.printStackTrace()
response.write("proxyAsync error: ${e.message}")
} finally {
// channels.forEach { it.close() }
}
}
}