Merge branch 'multiThreadkt' into multiThread

This commit is contained in:
lushunming 2025-07-14 13:58:03 +08:00
commit 8e5a2be92a
2 changed files with 46 additions and 68 deletions

View File

@ -2,54 +2,68 @@ package com.github.catvod.utils
import com.github.catvod.crawler.SpiderDebug
import com.github.catvod.net.OkHttp
import com.github.catvod.utils.ProxyVideo.getInfo
import com.github.catvod.utils.ProxyVideo.getMimeType
import com.github.catvod.utils.ProxyVideo.parseRange
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import okhttp3.Response
import org.apache.commons.lang3.StringUtils
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.SequenceInputStream
import java.util.Vector
import kotlin.math.min
object DownloadMT {
// Number of parts a ranged download is split into: twice the available processor count.
// NOTE(review): THREAD_NUM appears twice in this view (public, then private) — this looks like
// the before/after lines of a diff; confirm that only one declaration exists in the real file.
val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2
private val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2
// Cache of the probe result per URL, so the probe request is not repeated on every call.
// Plain mutable map with no synchronization visible here.
private val infos = mutableMapOf<String, Array<Any>>();
/**
 * Blocking entry point for the multi-part download.
 *
 * Runs the suspending implementation inside [runBlocking], so the calling thread waits until
 * the block finishes, and returns the value of the block's last expression.
 *
 * @param url address to fetch; passed through unchanged.
 * @param headers request headers; passed through unchanged.
 * @return the array produced by the suspending implementation, or `null`.
 */
fun proxyMultiThread(url: String, headers: Map<String, String>): Array<out Any?>? =
runBlocking {
// NOTE(review): two consecutive calls are visible here (`proxy`, then `proxyAsync`). This looks
// like the before/after lines of a diff — confirm which one belongs in the real file.
// As written, only the result of the last call is returned from runBlocking.
proxy(url, headers)
proxyAsync(url, headers)
}
suspend fun proxy(url: String, headers: Map<String, String>): Array<out Any?>? {
/**
 * Probes [url] with a ranged request for the first 1 MiB (bytes 0..1048575).
 *
 * The probe both reveals whether the server honours range requests and fetches the first
 * chunk. The range value is stored under both the "Range" and "range" keys of a copy of
 * [headers]; the caller's map is left untouched. The request is delegated to
 * [ProxyVideo.proxy] and its result is returned as-is.
 *
 * @param url address to probe.
 * @param headers base request headers to copy.
 * @return the array returned by [ProxyVideo.proxy]; callers in this file read element 0 as
 * the HTTP status code.
 */
@Throws(java.lang.Exception::class)
fun getInfo(url: String?, headers: Map<String, String>): Array<Any> {
    val firstChunkRange = "bytes=0-${1024 * 1024 - 1}"
    val probeHeaders: MutableMap<String, String> =
        java.util.HashMap<String, String>(headers).apply {
            put("Range", firstChunkRange)
            put("range", firstChunkRange)
        }
    return ProxyVideo.proxy(url, probeHeaders)
}
private suspend fun proxyAsync(url: String, headers: Map<String, String>): Array<out Any?>? {
/* val service = Executors.newFixedThreadPool(THREAD_NUM)*/
SpiderDebug.log("--proxyMultiThread: THREAD_NUM " + THREAD_NUM)
SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM")
try {
//缓存避免每次都请求total等信息
var info = infos[url]
//第一次请求请求是否支持range顺带返回一个1MB块
if (info == null) {
infos.clear()
info = getInfo(url, headers)
info = CoroutineScope(Dispatchers.IO).async { getInfo(url, headers) }.await()
infos[url] = info
//支持分片先返回这个1MB块
if (info[0] as Int == 206) {
return info
}
}
val code = info?.get(0) as Int
SpiderDebug.log("-----------code:" + code)
val code = info[0] as Int
SpiderDebug.log("-----------code:$code")
if (code != 206) {
return proxy(url, headers)
return ProxyVideo.proxy(url, headers)
}
val resHeader = info[3] as MutableMap<String, String>
val contentRange =
@ -76,57 +90,30 @@ object DownloadMT {
val partList = generatePart(rangeObj, total)
// 存储执行结果的List
val jobs = mutableListOf<Job>()
val channels = List(THREAD_NUM) { Channel<ByteArray>() }
val jobs = mutableListOf<Deferred<InputStream>>()
val inputStreams = mutableListOf<InputStream>()
for ((index, part) in partList.withIndex()) {
val newRange = "bytes=" + part[0] + "-" + part[1]
SpiderDebug.log("下载开始;newRange:$newRange")
val headerNew: MutableMap<String, String> = HashMap(headers)
headerNew["range"] = newRange
headerNew["Range"] = newRange
jobs += CoroutineScope(Dispatchers.IO).launch {
jobs += CoroutineScope(Dispatchers.IO).async {
val res = downloadRange(url, headerNew)
if (res != null) {
val buffer = ByteArray(1024)
var bytesRead: Int = 0
while (res.body()?.byteStream()?.read(buffer).also {
if (it != null) {
bytesRead = it
}
} != -1) {
// 处理读取的数据
channels[index].send(buffer.copyOfRange(0, bytesRead))
}
channels[index].close() // 发送完成后关闭通道
SpiderDebug.log("---第" + index + "块下载完成" + ";Content-Range:" + res.headers()["Content-Range"])
}
SpiderDebug.log(
("---第" + index + "块下载完成" + ";Content-Range:" + (res?.headers())?.get(
"Content-Range"
))
)
res?.body()!!.byteStream()
}
}
inputStreams += jobs.awaitAll()
val outputStream = ByteArrayOutputStream();
var pipedInputStream: ByteArrayInputStream? = null
var contentType: String? = ""
val res = CoroutineScope(Dispatchers.Default).async {
repeat(jobs.size) { index ->
for (bytes in channels[index]) {
// 处理读取的数据
outputStream.write(bytes);
}
}
// 等待所有下载完成
jobs.joinAll()
}
res.await()
// SpiderDebug.log(" ++proxy res data:" + Json.toJson(response.body()));
contentType = resHeader["Content-Type"]
@ -153,10 +140,10 @@ object DownloadMT {
SpiderDebug.log("----proxy res contentType:$contentType")
// SpiderDebug.log("++proxy res body:" + response.body());
SpiderDebug.log("----proxy res respHeaders:" + Json.toJson(resHeader))
pipedInputStream = ByteArrayInputStream(outputStream.toByteArray());
outputStream.close()
return arrayOf(206, contentType, pipedInputStream, resHeader)
return arrayOf(
206, contentType, SequenceInputStream(Vector(inputStreams).elements()), resHeader
)
} catch (e: Exception) {
@ -174,14 +161,13 @@ object DownloadMT {
var start = rangeObj["start"]!!.toLong()
var end =
if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!
.toLong()
if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!.toLong()
end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong()
val length = end - start + 1
val size = length /THREAD_NUM
val size = length / THREAD_NUM
val partList: MutableList<LongArray> = ArrayList()
for (i in 0..<THREAD_NUM) {
val partEnd = min((start + size).toDouble(), end.toDouble()).toLong()

View File

@ -86,15 +86,7 @@ public class ProxyVideo {
return new Object[]{response.code(), contentType, response.body().byteStream(), respHeaders};
}
/**
 * Probes {@code url} with a one-byte ranged request ({@code bytes=0-0}).
 *
 * The range value is written under both the "range" and "Range" keys of a copy of
 * {@code headers}, so the caller's map is not modified. The request is delegated to
 * {@code proxy} and its result is returned unchanged.
 *
 * @param url     address to probe
 * @param headers base request headers to copy
 * @return whatever {@code proxy} returns for the ranged request
 * @throws Exception propagated from {@code proxy}
 */
public static Object[] getInfo(String url, Map<String, String> headers) throws Exception {
    SpiderDebug.log("--proxyMultiThread: start ");
    final String firstByteRange = "bytes=0-0";
    Map<String, String> probeHeaders = new HashMap<>(headers);
    probeHeaders.put("range", firstByteRange);
    probeHeaders.put("Range", firstByteRange);
    return proxy(url, probeHeaders);
}
public static Object[] proxyMultiThread(String url, Map<String, String> headers) {
return DownloadMT.INSTANCE.proxyMultiThread(url, headers);