diff --git a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt index 043728ee..7ea8b6d3 100644 --- a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt +++ b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt @@ -68,92 +68,98 @@ object ProxyServer { response: AdvancedHttpServer.Response ) { runBlocking { - // 创建固定大小的通道队列,保持16个并发下载 - val downloadChannel = Channel>(16) - val dataChannel = Channel>(16) - + val channels = List(THREAD_NUM) { Channel() } SpiderDebug.log("--proxyAsync url: $url") SpiderDebug.log("--proxyAsync headers: ${Json.toJson(headers)}") try { - // 解析范围请求 + SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM") + + var rangeHeader = request.headers["Range"] + //没有range头 if (rangeHeader.isNullOrEmpty()) { + // 处理初始请求 rangeHeader = "bytes=0-" } + headers.toMutableMap().apply { + put("Range", rangeHeader) + } - val (startPoint, endPoint) = parseRangePoint(rangeHeader) + // 解析范围请求 + val (startPoint, endPoint) = parseRangePoint( + rangeHeader + ) - // 获取内容信息 + //缓存response header var info = infos[url] if (info == null) { info = getInfo(url, headers) infos[url] = info } + SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint") val contentLength = getContentLength(info) - val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint - - SpiderDebug.log("startPoint: $startPoint; endPoint: $finalEndPoint") SpiderDebug.log("contentLength: $contentLength") - - // 设置响应头 + val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint response.setContentType("text/html") + + + response.setHeader("Connection", "keep-alive") - response.setHeader("Content-Length", (finalEndPoint - startPoint + 1).toString()) - response.setHeader("Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength") + response.setHeader( + "Content-Length", (finalEndPoint - startPoint + 1).toString() + ) + response.setHeader( + "Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength" + ) info["Content-Type"]?.get(0)?.let { response.setHeader("Content-Type", it) } + response.setStatusCode(206) response.start() + // 使用流式响应 - // 启动16个固定的下载协程 - val downloadJobs = List(16) { - CoroutineScope(Dispatchers.IO).launch { - for (range in downloadChannel) { - val data = getVideoStream(range.first, range.second, url, headers) - dataChannel.send(Pair(range.first, data)) - } - } - } - - // 启动数据发送协程 - val sendJob = CoroutineScope(Dispatchers.IO).launch { - val receivedData = mutableMapOf() - var nextExpectedStart = startPoint - - for (dataPair in dataChannel) { - receivedData[dataPair.first] = dataPair.second - - // 按顺序发送数据 - while (receivedData.containsKey(nextExpectedStart)) { - val data = receivedData.remove(nextExpectedStart)!! - SpiderDebug.log("Sending chunk: ${data.size} bytes at $nextExpectedStart") - response.write(data) - nextExpectedStart += data.size - } - } - } - - // 分发下载任务 var currentStart = startPoint + + + // 启动生产者协程下载数据 + + val producerJob = mutableListOf() + while (currentStart <= finalEndPoint) { - val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) - downloadChannel.send(Pair(currentStart, chunkEnd)) - currentStart = chunkEnd + 1 + producerJob.clear() + // 创建通道用于接收数据块 + + for (i in 0 until THREAD_NUM) { + + if (currentStart > finalEndPoint) break + val chunkStart = currentStart + val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) + producerJob += CoroutineScope(Dispatchers.IO).launch { + // 异步下载数据块 + val data = getVideoStream(chunkStart, chunkEnd, url, headers) + channels[i].send(data) + + } + currentStart = chunkEnd + 1 + } + for ((index, _) in producerJob.withIndex()) { + + val data = channels[index].receive() + SpiderDebug.log("Received chunk: ${data.size} bytes") + response.write(data) + } + } - - // 关闭下载通道并等待所有下载完成 - downloadChannel.close() - downloadJobs.forEach { it.join() } - - // 关闭数据通道 - dataChannel.close() - sendJob.join() - + channels.forEach { it.close() } } catch (e: Exception) { SpiderDebug.log("proxyAsync error: ${e.message}") e.printStackTrace() response.write("proxyAsync error: ${e.message}") + + } finally { + // channels.forEach { it.close() } + } } }