diff --git a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt index adab0022..043728ee 100644 --- a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt +++ b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt @@ -68,102 +68,97 @@ object ProxyServer { response: AdvancedHttpServer.Response ) { runBlocking { - val channels = List(THREAD_NUM) { Channel() } + // 创建固定大小的通道队列,保持16个并发下载 + val downloadChannel = Channel>(16) + val dataChannel = Channel>(16) + SpiderDebug.log("--proxyAsync url: $url") SpiderDebug.log("--proxyAsync headers: ${Json.toJson(headers)}") try { - SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM") - - + // 解析范围请求 var rangeHeader = request.headers["Range"] - //没有range头 if (rangeHeader.isNullOrEmpty()) { - // 处理初始请求 rangeHeader = "bytes=0-" } - headers.toMutableMap().apply { - put("Range", rangeHeader) - } - // 解析范围请求 - val (startPoint, endPoint) = parseRangePoint( - rangeHeader - ) + val (startPoint, endPoint) = parseRangePoint(rangeHeader) - //缓存response header + // 获取内容信息 var info = infos[url] if (info == null) { info = getInfo(url, headers) infos[url] = info } - SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint") val contentLength = getContentLength(info) - SpiderDebug.log("contentLength: $contentLength") val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint + + SpiderDebug.log("startPoint: $startPoint; endPoint: $finalEndPoint") + SpiderDebug.log("contentLength: $contentLength") + + // 设置响应头 response.setContentType("text/html") - - - response.setHeader("Connection", "keep-alive") - response.setHeader( - "Content-Length", (finalEndPoint - startPoint + 1).toString() - ) - response.setHeader( - "Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength" - ) + response.setHeader("Content-Length", (finalEndPoint - startPoint + 1).toString()) + response.setHeader("Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength") info["Content-Type"]?.get(0)?.let { response.setHeader("Content-Type", it) } - response.setStatusCode(206) response.start() - // 使用流式响应 - - var currentStart = startPoint - - - // 启动生产者协程下载数据 - - val producerJob = mutableListOf() - - while (currentStart <= finalEndPoint) { - producerJob.clear() - // 创建通道用于接收数据块 - - for (i in 0 until THREAD_NUM) { - - if (currentStart > finalEndPoint) break - val chunkStart = currentStart - val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) - producerJob += CoroutineScope(Dispatchers.IO).launch { - // 异步下载数据块 - val data = getVideoStream(chunkStart, chunkEnd, url, headers) - channels[i].send(data) + // 启动16个固定的下载协程 + val downloadJobs = List(16) { + CoroutineScope(Dispatchers.IO).launch { + for (range in downloadChannel) { + val data = getVideoStream(range.first, range.second, url, headers) + dataChannel.send(Pair(range.first, data)) } - currentStart = chunkEnd + 1 } - for ((index, _) in producerJob.withIndex()) { - - val data = channels[index].receive() - SpiderDebug.log("Received chunk: ${data.size} bytes") - response.write(data) - } - } - channels.forEach { it.close() } + + // 启动数据发送协程 + val sendJob = CoroutineScope(Dispatchers.IO).launch { + val receivedData = mutableMapOf() + var nextExpectedStart = startPoint + + for (dataPair in dataChannel) { + receivedData[dataPair.first] = dataPair.second + + // 按顺序发送数据 + while (receivedData.containsKey(nextExpectedStart)) { + val data = receivedData.remove(nextExpectedStart)!! + SpiderDebug.log("Sending chunk: ${data.size} bytes at $nextExpectedStart") + response.write(data) + nextExpectedStart += data.size + } + } + } + + // 分发下载任务 + var currentStart = startPoint + while (currentStart <= finalEndPoint) { + val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) + downloadChannel.send(Pair(currentStart, chunkEnd)) + currentStart = chunkEnd + 1 + } + + // 关闭下载通道并等待所有下载完成 + downloadChannel.close() + downloadJobs.forEach { it.join() } + + // 关闭数据通道 + dataChannel.close() + sendJob.join() + } catch (e: Exception) { SpiderDebug.log("proxyAsync error: ${e.message}") e.printStackTrace() response.write("proxyAsync error: ${e.message}") - - } finally { - // channels.forEach { it.close() } - } } } + private fun queryToMap(query: String?): Map? { if (query == null) { return null