kt线程和分片优化

This commit is contained in:
lushunming 2025-07-14 10:27:38 +08:00
parent 946ff4a6f9
commit 4f47bd68c6
6 changed files with 29 additions and 50 deletions

View File

@ -5,7 +5,6 @@ import com.github.catvod.net.OkHttp
import com.github.catvod.utils.ProxyVideo.getInfo import com.github.catvod.utils.ProxyVideo.getInfo
import com.github.catvod.utils.ProxyVideo.getMimeType import com.github.catvod.utils.ProxyVideo.getMimeType
import com.github.catvod.utils.ProxyVideo.parseRange import com.github.catvod.utils.ProxyVideo.parseRange
import com.github.catvod.utils.ProxyVideo.proxy
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
@ -21,24 +20,25 @@ import java.io.ByteArrayOutputStream
import kotlin.math.min import kotlin.math.min
object DownloadMT { object DownloadMT {
val CORE_NUM: Int = Runtime.getRuntime().availableProcessors() val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2
private val infos = mutableMapOf<String, Array<Any>>(); private val infos = mutableMapOf<String, Array<Any>>();
fun proxyMultiThread(url: String, headers: Map<String, String>): Array<out Any?>? = fun proxyMultiThread(url: String, headers: Map<String, String>): Array<out Any?>? =
runBlocking { runBlocking {
proxyAsync(url, headers) proxy(url, headers)
} }
suspend fun proxyAsync(url: String, headers: Map<String, String>): Array<out Any?>? { suspend fun proxy(url: String, headers: Map<String, String>): Array<out Any?>? {
/* val service = Executors.newFixedThreadPool(THREAD_NUM)*/ /* val service = Executors.newFixedThreadPool(THREAD_NUM)*/
SpiderDebug.log("--proxyMultiThread: CORE_NUM " + CORE_NUM) SpiderDebug.log("--proxyMultiThread: THREAD_NUM " + THREAD_NUM)
//默认线程数核心数两倍
var threadNum = CORE_NUM * 2
try { try {
//缓存避免每次都请求total等信息 //缓存避免每次都请求total等信息
var info = infos[url] var info = infos[url]
if (info == null) { if (info == null) {
infos.clear() infos.clear()
@ -47,7 +47,7 @@ object DownloadMT {
} }
val code = info?.get(0) as Int val code = info?.get(0) as Int
SpiderDebug.log("-----------code:$code") SpiderDebug.log("-----------code:" + code)
if (code != 206) { if (code != 206) {
return proxy(url, headers) return proxy(url, headers)
} }
@ -60,19 +60,10 @@ object DownloadMT {
val total = StringUtils.split(contentRange, "/")[1] val total = StringUtils.split(contentRange, "/")[1]
SpiderDebug.log("--文件总大小:$total") SpiderDebug.log("--文件总大小:$total")
//如果文件小于50MB也不走代理 //如果文件太小,也不走代理
if (total.toLong() < 1024 * 1024 * 50L) { /* if (total.toLong() < 1024 * 1024 * 100) {
return proxy(url, headers) return proxy(url, headers)
} else if (total.toLong() < 1024 * 1024 * 1024L * 10L) { }*/
//10GB以下
threadNum = CORE_NUM * 3
} else if (total.toLong() < 1024 * 1024 * 1024L * 40L) {
//40GB以下
threadNum = CORE_NUM * 4
} else {
//40GB以上
threadNum = CORE_NUM * 5
}
var range = var range =
if (StringUtils.isAllBlank(headers["range"])) headers["Range"] else headers["range"] if (StringUtils.isAllBlank(headers["range"])) headers["Range"] else headers["range"]
if (StringUtils.isAllBlank(range)) range = "bytes=0-"; if (StringUtils.isAllBlank(range)) range = "bytes=0-";
@ -80,16 +71,13 @@ object DownloadMT {
val rangeObj = parseRange( val rangeObj = parseRange(
range!! range!!
) )
//没有range,无需分割
//视频开始,加大线程数 val partList = generatePart(rangeObj, total)
if (rangeObj["start"]!!.toLong() == 0L) {
threadNum = CORE_NUM * 4
}
val partList = generatePart(rangeObj, total, threadNum)
// 存储执行结果的List // 存储执行结果的List
val jobs = mutableListOf<Job>() val jobs = mutableListOf<Job>()
val channels = List(threadNum) { Channel<ByteArray>() } val channels = List(THREAD_NUM) { Channel<ByteArray>() }
for ((index, part) in partList.withIndex()) { for ((index, part) in partList.withIndex()) {
@ -154,11 +142,11 @@ object DownloadMT {
/* respHeaders.put("Access-Control-Allow-Credentials", "true"); /* respHeaders.put("Access-Control-Allow-Credentials", "true");
respHeaders.put("Access-Control-Allow-Origin", "*");*/ respHeaders.put("Access-Control-Allow-Origin", "*");*/
resHeader["Content-Length"] = resHeader["Content-Length"] =
(partList[threadNum - 1][1] - partList[0][0] + 1).toString() (partList[THREAD_NUM - 1][1] - partList[0][0] + 1).toString()
resHeader.remove("content-length") resHeader.remove("content-length")
resHeader["Content-Range"] = String.format( resHeader["Content-Range"] = String.format(
"bytes %s-%s/%s", partList[0][0], partList[threadNum - 1][1], total "bytes %s-%s/%s", partList[0][0], partList[THREAD_NUM - 1][1], total
) )
resHeader.remove("content-range") resHeader.remove("content-range")
@ -178,33 +166,24 @@ object DownloadMT {
} }
} }
private fun generatePart( fun generatePart(rangeObj: Map<String?, String>, total: String): List<LongArray> {
rangeObj: Map<String?, String>, total: String, threadNum: Int
): List<LongArray> {
val totalSize = total.toLong() val totalSize = total.toLong()
//超过10GB分块是80Mb不然是16MB
//默认8MB val partSize =
var partSize = 1024 * 1024 * 8L if (totalSize > 8L * 1024L * 1024L * 1024L * 10L) 1024 * 1024 * 8 * 10L else 1024 * 1024 * 8 * 2L
if (totalSize < 1024 * 1024 * 1024L * 10L) {
//10GB以下分片8MB
partSize = 1024 * 1024 * 8L
} else {
//40GB以下分片64MB
partSize = 1024 * 1024 * 8L * 8
}
var start = rangeObj["start"]!!.toLong() var start = rangeObj["start"]!!.toLong()
var end = var end =
if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!.toLong() if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!
.toLong()
end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong() end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong()
val length = end - start + 1 val length = end - start + 1
val size = length / threadNum val size = length /THREAD_NUM
val partList: MutableList<LongArray> = ArrayList() val partList: MutableList<LongArray> = ArrayList()
for (i in 0..<threadNum) { for (i in 0..<THREAD_NUM) {
val partEnd = min((start + size).toDouble(), end.toDouble()).toLong() val partEnd = min((start + size).toDouble(), end.toDouble()).toLong()
partList.add(longArrayOf(start, partEnd)) partList.add(longArrayOf(start, partEnd))

View File

@ -16,7 +16,7 @@ public class ProxyVideoTest {
Server.get().start(); Server.get().start();
String url = ProxyVideo.buildCommonProxyUrl( String url = ProxyVideo.buildCommonProxyUrl(
// "https://js.shipin520.com/pc/images/new/banner20250225.mp4", new HashMap<>()); // "https://js.shipin520.com/pc/images/new/banner20250225.mp4", new HashMap<>());
"https://video.shipin520.com/videos/42/33/21/b_hsTXjZv04HeM1613423321_v1.mp4", new HashMap<>()); "http://172.16.1.217:18089/ng-grid/video.mp4", new HashMap<>());
System.out.println(url); System.out.println(url);
while (true) { while (true) {

Binary file not shown.

View File

@ -1 +1 @@
8b6de828883275e559c19d37c8dff9d1 d62dc3073c5c2ff1bff14a5128c5d4dd

View File

@ -1,5 +1,5 @@
{ {
"spider": "https://andoridspidermt.netlify.app/jar/custom_spider.jar;md5;8b6de828883275e559c19d37c8dff9d1", "spider": "https://andoridspidermt.netlify.app/jar/custom_spider.jar;md5;d62dc3073c5c2ff1bff14a5128c5d4dd",
"lives": [ "lives": [
{ {
"name": "电视直播", "name": "电视直播",

View File

@ -1,5 +1,5 @@
{ {
"spider": "https://ghproxy.net/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThreadkt/jar/custom_spider.jar;md5;8b6de828883275e559c19d37c8dff9d1", "spider": "https://ghproxy.net/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThreadkt/jar/custom_spider.jar;md5;d62dc3073c5c2ff1bff14a5128c5d4dd",
"lives": [ "lives": [
{ {
"name": "电视直播", "name": "电视直播",