diff --git a/app/src/main/java/com/github/catvod/utils/DownloadMT.kt b/app/src/main/java/com/github/catvod/utils/DownloadMT.kt index 29a5463a..48192b7c 100644 --- a/app/src/main/java/com/github/catvod/utils/DownloadMT.kt +++ b/app/src/main/java/com/github/catvod/utils/DownloadMT.kt @@ -2,54 +2,68 @@ package com.github.catvod.utils import com.github.catvod.crawler.SpiderDebug import com.github.catvod.net.OkHttp -import com.github.catvod.utils.ProxyVideo.getInfo import com.github.catvod.utils.ProxyVideo.getMimeType import com.github.catvod.utils.ProxyVideo.parseRange import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job import kotlinx.coroutines.async -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.runBlocking import okhttp3.Response import org.apache.commons.lang3.StringUtils -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream +import java.io.InputStream +import java.io.SequenceInputStream +import java.util.Vector import kotlin.math.min object DownloadMT { - val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2 + private val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2 private val infos = mutableMapOf>(); fun proxyMultiThread(url: String, headers: Map): Array? = runBlocking { - proxy(url, headers) + proxyAsync(url, headers) } - suspend fun proxy(url: String, headers: Map): Array? { + /** + * 获取是否分片信息,顺带请求一个1MB块 + */ + @Throws(java.lang.Exception::class) + fun getInfo(url: String?, headers: Map): Array { + val newHeaders: MutableMap = java.util.HashMap(headers) + newHeaders["Range"] = "bytes=0-" + (1024 * 1024 - 1) + newHeaders["range"] = "bytes=0-" + (1024 * 1024 - 1) + val info = ProxyVideo.proxy(url, newHeaders) + return info + } + + private suspend fun proxyAsync(url: String, headers: Map): Array? { /* val service = Executors.newFixedThreadPool(THREAD_NUM)*/ - SpiderDebug.log("--proxyMultiThread: THREAD_NUM " + THREAD_NUM) + SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM") try { //缓存,避免每次都请求total等信息 - - var info = infos[url] + //第一次请求,请求是否支持range,顺带返回一个1MB块 if (info == null) { infos.clear() - info = getInfo(url, headers) + info = CoroutineScope(Dispatchers.IO).async { getInfo(url, headers) }.await() infos[url] = info + //支持分片,先返回这个1MB块 + if (info[0] as Int == 206) { + return info + } } - val code = info?.get(0) as Int - SpiderDebug.log("-----------code:" + code) + val code = info[0] as Int + SpiderDebug.log("-----------code:$code") + if (code != 206) { - return proxy(url, headers) + return ProxyVideo.proxy(url, headers) } val resHeader = info[3] as MutableMap val contentRange = @@ -76,57 +90,30 @@ object DownloadMT { val partList = generatePart(rangeObj, total) // 存储执行结果的List - val jobs = mutableListOf() - val channels = List(THREAD_NUM) { Channel() } + val jobs = mutableListOf>() + val inputStreams = mutableListOf() for ((index, part) in partList.withIndex()) { val newRange = "bytes=" + part[0] + "-" + part[1] - SpiderDebug.log("下载开始;newRange:$newRange") - val headerNew: MutableMap = HashMap(headers) headerNew["range"] = newRange headerNew["Range"] = newRange - jobs += CoroutineScope(Dispatchers.IO).launch { + jobs += CoroutineScope(Dispatchers.IO).async { val res = downloadRange(url, headerNew) - - if (res != null) { - val buffer = ByteArray(1024) - var bytesRead: Int = 0 - - while (res.body()?.byteStream()?.read(buffer).also { - if (it != null) { - bytesRead = it - } - } != -1) { - // 处理读取的数据 - channels[index].send(buffer.copyOfRange(0, bytesRead)) - - } - channels[index].close() // 发送完成后关闭通道 - SpiderDebug.log("---第" + index + "块下载完成" + ";Content-Range:" + res.headers()["Content-Range"]) - } + SpiderDebug.log( + ("---第" + index + "块下载完成" + ";Content-Range:" + (res?.headers())?.get( + "Content-Range" + )) + ) + res?.body()!!.byteStream() } } + inputStreams += jobs.awaitAll() - val outputStream = ByteArrayOutputStream(); - var pipedInputStream: ByteArrayInputStream? = null var contentType: String? = "" - val res = CoroutineScope(Dispatchers.Default).async { - repeat(jobs.size) { index -> - - for (bytes in channels[index]) { - // 处理读取的数据 - outputStream.write(bytes); - } - - } - // 等待所有下载完成 - jobs.joinAll() - } - res.await() // SpiderDebug.log(" ++proxy res data:" + Json.toJson(response.body())); contentType = resHeader["Content-Type"] @@ -153,10 +140,10 @@ object DownloadMT { SpiderDebug.log("----proxy res contentType:$contentType") // SpiderDebug.log("++proxy res body:" + response.body()); SpiderDebug.log("----proxy res respHeaders:" + Json.toJson(resHeader)) - pipedInputStream = ByteArrayInputStream(outputStream.toByteArray()); - outputStream.close() - return arrayOf(206, contentType, pipedInputStream, resHeader) + return arrayOf( + 206, contentType, SequenceInputStream(Vector(inputStreams).elements()), resHeader + ) } catch (e: Exception) { @@ -174,14 +161,13 @@ object DownloadMT { var start = rangeObj["start"]!!.toLong() var end = - if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!! - .toLong() + if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!.toLong() end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong() val length = end - start + 1 - val size = length /THREAD_NUM + val size = length / THREAD_NUM val partList: MutableList = ArrayList() for (i in 0.. headers) throws Exception { - SpiderDebug.log("--proxyMultiThread: start "); - Map newHeaders = new HashMap<>(headers); - newHeaders.put("range", "bytes=0-0"); - newHeaders.put("Range", "bytes=0-0"); - Object[] info = proxy(url, newHeaders); - return info; - } public static Object[] proxyMultiThread(String url, Map headers) { return DownloadMT.INSTANCE.proxyMultiThread(url, headers);