kt线程和分片优化
This commit is contained in:
parent
4f47bd68c6
commit
295bc67569
|
|
@ -2,54 +2,68 @@ package com.github.catvod.utils
|
||||||
|
|
||||||
import com.github.catvod.crawler.SpiderDebug
|
import com.github.catvod.crawler.SpiderDebug
|
||||||
import com.github.catvod.net.OkHttp
|
import com.github.catvod.net.OkHttp
|
||||||
import com.github.catvod.utils.ProxyVideo.getInfo
|
|
||||||
import com.github.catvod.utils.ProxyVideo.getMimeType
|
import com.github.catvod.utils.ProxyVideo.getMimeType
|
||||||
import com.github.catvod.utils.ProxyVideo.parseRange
|
import com.github.catvod.utils.ProxyVideo.parseRange
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.Deferred
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.Job
|
|
||||||
import kotlinx.coroutines.async
|
import kotlinx.coroutines.async
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.awaitAll
|
||||||
import kotlinx.coroutines.joinAll
|
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import okhttp3.Response
|
import okhttp3.Response
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
import java.io.ByteArrayInputStream
|
import java.io.InputStream
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.SequenceInputStream
|
||||||
|
import java.util.Vector
|
||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
|
|
||||||
object DownloadMT {
|
object DownloadMT {
|
||||||
val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2
|
private val THREAD_NUM: Int = Runtime.getRuntime().availableProcessors() * 2
|
||||||
|
|
||||||
private val infos = mutableMapOf<String, Array<Any>>();
|
private val infos = mutableMapOf<String, Array<Any>>();
|
||||||
|
|
||||||
fun proxyMultiThread(url: String, headers: Map<String, String>): Array<out Any?>? =
|
fun proxyMultiThread(url: String, headers: Map<String, String>): Array<out Any?>? =
|
||||||
runBlocking {
|
runBlocking {
|
||||||
proxy(url, headers)
|
proxyAsync(url, headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun proxy(url: String, headers: Map<String, String>): Array<out Any?>? {
|
/**
|
||||||
|
* 获取是否分片信息,顺带请求一个1MB块
|
||||||
|
*/
|
||||||
|
@Throws(java.lang.Exception::class)
|
||||||
|
fun getInfo(url: String?, headers: Map<String, String>): Array<Any> {
|
||||||
|
val newHeaders: MutableMap<String, String> = java.util.HashMap(headers)
|
||||||
|
newHeaders["Range"] = "bytes=0-" + (1024 * 1024 - 1)
|
||||||
|
newHeaders["range"] = "bytes=0-" + (1024 * 1024 - 1)
|
||||||
|
val info = ProxyVideo.proxy(url, newHeaders)
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun proxyAsync(url: String, headers: Map<String, String>): Array<out Any?>? {
|
||||||
|
|
||||||
/* val service = Executors.newFixedThreadPool(THREAD_NUM)*/
|
/* val service = Executors.newFixedThreadPool(THREAD_NUM)*/
|
||||||
SpiderDebug.log("--proxyMultiThread: THREAD_NUM " + THREAD_NUM)
|
SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM")
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
//缓存,避免每次都请求total等信息
|
//缓存,避免每次都请求total等信息
|
||||||
|
|
||||||
|
|
||||||
var info = infos[url]
|
var info = infos[url]
|
||||||
|
//第一次请求,请求是否支持range,顺带返回一个1MB块
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
infos.clear()
|
infos.clear()
|
||||||
info = getInfo(url, headers)
|
info = CoroutineScope(Dispatchers.IO).async { getInfo(url, headers) }.await()
|
||||||
infos[url] = info
|
infos[url] = info
|
||||||
|
//支持分片,先返回这个1MB块
|
||||||
|
if (info[0] as Int == 206) {
|
||||||
|
return info
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val code = info?.get(0) as Int
|
val code = info[0] as Int
|
||||||
SpiderDebug.log("-----------code:" + code)
|
SpiderDebug.log("-----------code:$code")
|
||||||
|
|
||||||
if (code != 206) {
|
if (code != 206) {
|
||||||
return proxy(url, headers)
|
return ProxyVideo.proxy(url, headers)
|
||||||
}
|
}
|
||||||
val resHeader = info[3] as MutableMap<String, String>
|
val resHeader = info[3] as MutableMap<String, String>
|
||||||
val contentRange =
|
val contentRange =
|
||||||
|
|
@ -76,57 +90,30 @@ object DownloadMT {
|
||||||
val partList = generatePart(rangeObj, total)
|
val partList = generatePart(rangeObj, total)
|
||||||
|
|
||||||
// 存储执行结果的List
|
// 存储执行结果的List
|
||||||
val jobs = mutableListOf<Job>()
|
val jobs = mutableListOf<Deferred<InputStream>>()
|
||||||
val channels = List(THREAD_NUM) { Channel<ByteArray>() }
|
val inputStreams = mutableListOf<InputStream>()
|
||||||
for ((index, part) in partList.withIndex()) {
|
for ((index, part) in partList.withIndex()) {
|
||||||
|
|
||||||
|
|
||||||
val newRange = "bytes=" + part[0] + "-" + part[1]
|
val newRange = "bytes=" + part[0] + "-" + part[1]
|
||||||
SpiderDebug.log("下载开始;newRange:$newRange")
|
|
||||||
|
|
||||||
val headerNew: MutableMap<String, String> = HashMap(headers)
|
val headerNew: MutableMap<String, String> = HashMap(headers)
|
||||||
|
|
||||||
headerNew["range"] = newRange
|
headerNew["range"] = newRange
|
||||||
headerNew["Range"] = newRange
|
headerNew["Range"] = newRange
|
||||||
jobs += CoroutineScope(Dispatchers.IO).launch {
|
jobs += CoroutineScope(Dispatchers.IO).async {
|
||||||
val res = downloadRange(url, headerNew)
|
val res = downloadRange(url, headerNew)
|
||||||
|
SpiderDebug.log(
|
||||||
if (res != null) {
|
("---第" + index + "块下载完成" + ";Content-Range:" + (res?.headers())?.get(
|
||||||
val buffer = ByteArray(1024)
|
"Content-Range"
|
||||||
var bytesRead: Int = 0
|
))
|
||||||
|
)
|
||||||
while (res.body()?.byteStream()?.read(buffer).also {
|
res?.body()!!.byteStream()
|
||||||
if (it != null) {
|
|
||||||
bytesRead = it
|
|
||||||
}
|
|
||||||
} != -1) {
|
|
||||||
// 处理读取的数据
|
|
||||||
channels[index].send(buffer.copyOfRange(0, bytesRead))
|
|
||||||
|
|
||||||
}
|
|
||||||
channels[index].close() // 发送完成后关闭通道
|
|
||||||
SpiderDebug.log("---第" + index + "块下载完成" + ";Content-Range:" + res.headers()["Content-Range"])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
inputStreams += jobs.awaitAll()
|
||||||
|
|
||||||
val outputStream = ByteArrayOutputStream();
|
|
||||||
var pipedInputStream: ByteArrayInputStream? = null
|
|
||||||
var contentType: String? = ""
|
var contentType: String? = ""
|
||||||
|
|
||||||
val res = CoroutineScope(Dispatchers.Default).async {
|
|
||||||
repeat(jobs.size) { index ->
|
|
||||||
|
|
||||||
for (bytes in channels[index]) {
|
|
||||||
// 处理读取的数据
|
|
||||||
outputStream.write(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
// 等待所有下载完成
|
|
||||||
jobs.joinAll()
|
|
||||||
}
|
|
||||||
res.await()
|
|
||||||
|
|
||||||
// SpiderDebug.log(" ++proxy res data:" + Json.toJson(response.body()));
|
// SpiderDebug.log(" ++proxy res data:" + Json.toJson(response.body()));
|
||||||
contentType = resHeader["Content-Type"]
|
contentType = resHeader["Content-Type"]
|
||||||
|
|
@ -153,10 +140,10 @@ object DownloadMT {
|
||||||
SpiderDebug.log("----proxy res contentType:$contentType")
|
SpiderDebug.log("----proxy res contentType:$contentType")
|
||||||
// SpiderDebug.log("++proxy res body:" + response.body());
|
// SpiderDebug.log("++proxy res body:" + response.body());
|
||||||
SpiderDebug.log("----proxy res respHeaders:" + Json.toJson(resHeader))
|
SpiderDebug.log("----proxy res respHeaders:" + Json.toJson(resHeader))
|
||||||
pipedInputStream = ByteArrayInputStream(outputStream.toByteArray());
|
|
||||||
outputStream.close()
|
|
||||||
|
|
||||||
return arrayOf(206, contentType, pipedInputStream, resHeader)
|
return arrayOf(
|
||||||
|
206, contentType, SequenceInputStream(Vector(inputStreams).elements()), resHeader
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
|
@ -174,14 +161,13 @@ object DownloadMT {
|
||||||
|
|
||||||
var start = rangeObj["start"]!!.toLong()
|
var start = rangeObj["start"]!!.toLong()
|
||||||
var end =
|
var end =
|
||||||
if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!
|
if (StringUtils.isAllBlank(rangeObj["end"])) start + partSize else rangeObj["end"]!!.toLong()
|
||||||
.toLong()
|
|
||||||
|
|
||||||
|
|
||||||
end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong()
|
end = min(end.toDouble(), (totalSize - 1).toDouble()).toLong()
|
||||||
val length = end - start + 1
|
val length = end - start + 1
|
||||||
|
|
||||||
val size = length /THREAD_NUM
|
val size = length / THREAD_NUM
|
||||||
val partList: MutableList<LongArray> = ArrayList()
|
val partList: MutableList<LongArray> = ArrayList()
|
||||||
for (i in 0..<THREAD_NUM) {
|
for (i in 0..<THREAD_NUM) {
|
||||||
val partEnd = min((start + size).toDouble(), end.toDouble()).toLong()
|
val partEnd = min((start + size).toDouble(), end.toDouble()).toLong()
|
||||||
|
|
|
||||||
|
|
@ -86,15 +86,7 @@ public class ProxyVideo {
|
||||||
return new Object[]{response.code(), contentType, response.body().byteStream(), respHeaders};
|
return new Object[]{response.code(), contentType, response.body().byteStream(), respHeaders};
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Object[] getInfo(String url, Map<String, String> headers) throws Exception {
|
|
||||||
SpiderDebug.log("--proxyMultiThread: start ");
|
|
||||||
Map<String, String> newHeaders = new HashMap<>(headers);
|
|
||||||
newHeaders.put("range", "bytes=0-0");
|
|
||||||
newHeaders.put("Range", "bytes=0-0");
|
|
||||||
Object[] info = proxy(url, newHeaders);
|
|
||||||
return info;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Object[] proxyMultiThread(String url, Map<String, String> headers) {
|
public static Object[] proxyMultiThread(String url, Map<String, String> headers) {
|
||||||
return DownloadMT.INSTANCE.proxyMultiThread(url, headers);
|
return DownloadMT.INSTANCE.proxyMultiThread(url, headers);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue