Optimize download

lushunming 2025-11-03 16:01:40 +08:00
parent bc388773d0
commit 1e7e846c5b
1 changed file with 56 additions and 61 deletions
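The change replaces the per-round THREAD_NUM channel array with a fixed pool of 16 download coroutines fed from a bounded range channel, plus a single writer coroutine that reassembles chunks in offset order before writing them to the response. Below is a minimal, self-contained sketch of that pattern, assuming kotlinx-coroutines; downloadInOrder, fetchChunk and emit are illustrative names, not identifiers from this repository (they stand in for the proxy handler, getVideoStream and response.write).

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel

// Sketch only: fetchChunk and emit are hypothetical placeholders for the real
// getVideoStream(...) call and response.write(...) in ProxyServer.
suspend fun downloadInOrder(
    start: Long,
    end: Long,
    partSize: Long,
    fetchChunk: suspend (Long, Long) -> ByteArray,
    emit: suspend (ByteArray) -> Unit,
    workers: Int = 16
) = coroutineScope {
    val ranges = Channel<Pair<Long, Long>>(workers)       // bounded queue of byte ranges
    val chunks = Channel<Pair<Long, ByteArray>>(workers)   // (offset, data), possibly out of order

    // Fixed pool of download workers: each pulls ranges until the channel is closed.
    val downloadJobs = List(workers) {
        launch(Dispatchers.IO) {
            for ((from, to) in ranges) {
                chunks.send(from to fetchChunk(from, to))
            }
        }
    }

    // Single writer reorders chunks by offset before emitting them.
    val writer = launch {
        val pending = mutableMapOf<Long, ByteArray>()
        var next = start
        for ((offset, data) in chunks) {
            pending[offset] = data
            // Flush every chunk that is now contiguous with what was already sent.
            while (true) {
                val ready = pending.remove(next) ?: break
                emit(ready)
                next += ready.size
            }
        }
    }

    // Producer: split [start, end] into partSize-sized ranges and hand them out.
    var cursor = start
    while (cursor <= end) {
        val to = minOf(cursor + partSize - 1, end)
        ranges.send(cursor to to)
        cursor = to + 1
    }
    ranges.close()                       // no more work for the pool
    downloadJobs.forEach { it.join() }   // wait until every chunk has been produced
    chunks.close()                       // lets the writer drain and finish
    writer.join()
}

The bounded channels cap memory by keeping at most 16 ranges and 16 chunks queued at a time, and the pending map lets out-of-order completions be flushed strictly sequentially, which a single byte-range HTTP response requires.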


@@ -68,101 +68,96 @@ object ProxyServer {
         response: AdvancedHttpServer.Response
     ) {
         runBlocking {
-            val channels = List(THREAD_NUM) { Channel<ByteArray>() }
+            // Fixed-size channel queues that keep 16 concurrent downloads in flight
+            val downloadChannel = Channel<Pair<Long, Long>>(16)
+            val dataChannel = Channel<Pair<Long, ByteArray>>(16)
             SpiderDebug.log("--proxyAsync url: $url")
             SpiderDebug.log("--proxyAsync headers: ${Json.toJson(headers)}")
             try {
-                SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM")
+                // Parse the range request
                 var rangeHeader = request.headers["Range"]
-                // No Range header
                 if (rangeHeader.isNullOrEmpty()) {
-                    // Handle the initial request
                     rangeHeader = "bytes=0-"
                 }
-                headers.toMutableMap().apply {
-                    put("Range", rangeHeader)
-                }
-                // Parse the range request
-                val (startPoint, endPoint) = parseRangePoint(
-                    rangeHeader
-                )
-                // Cache the response header
+                val (startPoint, endPoint) = parseRangePoint(rangeHeader)
+                // Fetch the content info
                 var info = infos[url]
                 if (info == null) {
                     info = getInfo(url, headers)
                     infos[url] = info
                 }
-                SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint")
                 val contentLength = getContentLength(info)
-                SpiderDebug.log("contentLength: $contentLength")
                 val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint
+                SpiderDebug.log("startPoint: $startPoint; endPoint: $finalEndPoint")
+                SpiderDebug.log("contentLength: $contentLength")
+                // Set the response headers
                 response.setContentType("text/html")
                 response.setHeader("Connection", "keep-alive")
-                response.setHeader(
-                    "Content-Length", (finalEndPoint - startPoint + 1).toString()
-                )
-                response.setHeader(
-                    "Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength"
-                )
+                response.setHeader("Content-Length", (finalEndPoint - startPoint + 1).toString())
+                response.setHeader("Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength")
                 info["Content-Type"]?.get(0)?.let { response.setHeader("Content-Type", it) }
                 response.setStatusCode(206)
                 response.start()
-                // Stream the response
-                var currentStart = startPoint
-                // Launch producer coroutines to download the data
-                val producerJob = mutableListOf<Job>()
-                while (currentStart <= finalEndPoint) {
-                    producerJob.clear()
-                    // Create channels for receiving the data chunks
-                    for (i in 0 until THREAD_NUM) {
-                        if (currentStart > finalEndPoint) break
-                        val chunkStart = currentStart
-                        val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint)
-                        producerJob += CoroutineScope(Dispatchers.IO).launch {
-                            // Download the data chunk asynchronously
-                            val data = getVideoStream(chunkStart, chunkEnd, url, headers)
-                            channels[i].send(data)
-                        }
-                        currentStart = chunkEnd + 1
-                    }
-                    for ((index, _) in producerJob.withIndex()) {
-                        val data = channels[index].receive()
-                        SpiderDebug.log("Received chunk: ${data.size} bytes")
-                        response.write(data)
-                    }
-                }
-                channels.forEach { it.close() }
+                // Launch 16 fixed download coroutines
+                val downloadJobs = List(16) {
+                    CoroutineScope(Dispatchers.IO).launch {
+                        for (range in downloadChannel) {
+                            val data = getVideoStream(range.first, range.second, url, headers)
+                            dataChannel.send(Pair(range.first, data))
+                        }
+                    }
+                }
+                // Launch the coroutine that sends data back to the client
+                val sendJob = CoroutineScope(Dispatchers.IO).launch {
+                    val receivedData = mutableMapOf<Long, ByteArray>()
+                    var nextExpectedStart = startPoint
+                    for (dataPair in dataChannel) {
+                        receivedData[dataPair.first] = dataPair.second
+                        // Send the data in order
+                        while (receivedData.containsKey(nextExpectedStart)) {
+                            val data = receivedData.remove(nextExpectedStart)!!
+                            SpiderDebug.log("Sending chunk: ${data.size} bytes at $nextExpectedStart")
+                            response.write(data)
+                            nextExpectedStart += data.size
+                        }
+                    }
+                }
+                // Dispatch the download tasks
+                var currentStart = startPoint
+                while (currentStart <= finalEndPoint) {
+                    val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint)
+                    downloadChannel.send(Pair(currentStart, chunkEnd))
+                    currentStart = chunkEnd + 1
+                }
+                // Close the download channel and wait for all downloads to finish
+                downloadChannel.close()
+                downloadJobs.forEach { it.join() }
+                // Close the data channel
+                dataChannel.close()
+                sendJob.join()
             } catch (e: Exception) {
                 SpiderDebug.log("proxyAsync error: ${e.message}")
                 e.printStackTrace()
                 response.write("proxyAsync error: ${e.message}")
-            } finally {
-                // channels.forEach { it.close() }
             }
         }
     }

     private fun queryToMap(query: String?): Map<String, String>? {
         if (query == null) {