多线程加速优化

This commit is contained in:
lushunming 2025-07-07 11:35:57 +08:00
parent 272d9db20c
commit 619403dbc6
4 changed files with 113 additions and 82 deletions

View File

@ -2,31 +2,37 @@ package com.github.catvod.utils;
import android.os.SystemClock; import android.os.SystemClock;
import android.text.TextUtils; import android.text.TextUtils;
import com.github.catvod.crawler.SpiderDebug; import com.github.catvod.crawler.SpiderDebug;
import com.github.catvod.net.OkHttp; import com.github.catvod.net.OkHttp;
import com.github.catvod.spider.Proxy; import com.github.catvod.spider.Proxy;
import com.google.gson.Gson; import com.google.gson.Gson;
import okhttp3.Response;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject; import org.json.JSONObject;
import java.io.ByteArrayInputStream; import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import okhttp3.Response;
public class ProxyVideo { public class ProxyVideo {
private static final String GO_SERVER = "http://127.0.0.1:7777/"; private static final String GO_SERVER = "http://127.0.0.1:7777/";
//线程数4 //线程数4
private static final int THREAD_NUM = 16; private static final int THREAD_NUM = 8;
private static Map<String, Object[]> infos = new HashMap<>();
private static final ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);
public static String buildCommonProxyUrl(String url, Map<String, String> headers) { public static String buildCommonProxyUrl(String url, Map<String, String> headers) {
@ -80,97 +86,122 @@ public class ProxyVideo {
return new Object[]{response.code(), contentType, response.body().byteStream(), respHeaders}; return new Object[]{response.code(), contentType, response.body().byteStream(), respHeaders};
} }
public static Object[] getInfo(String url, Map<String, String> headers) throws Exception {
public static Object[] proxyMultiThread(String url, Map<String, String> headers) throws Exception {
SpiderDebug.log("--proxyMultiThread: start "); SpiderDebug.log("--proxyMultiThread: start ");
Map<String, String> newHeaders = new HashMap<>(headers); Map<String, String> newHeaders = new HashMap<>(headers);
newHeaders.put("range", "bytes=0-0"); newHeaders.put("range", "bytes=0-0");
newHeaders.put("Range", "bytes=0-0"); newHeaders.put("Range", "bytes=0-0");
Object[] info = proxy(url, newHeaders); Object[] info = proxy(url, newHeaders);
int code = (int) info[0]; return info;
if (code != 206) {
return proxy(url, headers); }
}
String contentRange = StringUtils.isAllBlank(((Map<String, String>) info[3]).get("Content-Range")) ? ((Map<String, String>) info[3]).get("content-range") : ((Map<String, String>) info[3]).get("Content-Range"); public static Object[] proxyMultiThread(String url, Map<String, String> headers) {
SpiderDebug.log("--contentRange:" + contentRange); ExecutorService service = Executors.newFixedThreadPool(THREAD_NUM);
//文件总大小
String total = StringUtils.split(contentRange, "/")[1]; SequenceInputStream in = null;
SpiderDebug.log("--文件总大小:" + total); try {
String range = StringUtils.isAllBlank(headers.get("range")) ? headers.get("Range") : headers.get("range"); //缓存避免每次都请求total等信息
SpiderDebug.log("---proxyMultiThread,Range:" + range); Object[] info = infos.get(url);
Map<String, String> rangeObj = parseRange(range); if (info == null) {
//没有range,无需分割 infos.clear();
if (rangeObj == null) { info = getInfo(url, headers);
SpiderDebug.log("没有range,无需分割"); infos.put(url, info);
return proxy(url, headers);
} else {
List<long[]> partList = generatePart(rangeObj, total);
// 存储执行结果的List
List<Future<Response>> results = new ArrayList<>();
for (long[] part : partList) {
String newRange = "bytes=" + part[0] + "-" + part[1];
SpiderDebug.log("下载开始" + ";newRange:" + newRange);
Map<String, String> headerNew = new HashMap<>(headers);
headerNew.put("range", newRange);
headerNew.put("Range", newRange);
Future<Response> result = service.submit(() -> {
try {
return OkHttp.newCall(url, headerNew);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
results.add(result);
} }
byte[] bytes = null;
Response response = null;
for (int i = 0; i < THREAD_NUM; i++) {
// 获取包含返回结果的future对象
Future<Response> future = results.get(i);
// 从future中取出执行结果若尚未返回结果则get方法被阻塞直到结果被返回为止
response = future.get();
bytes = ArrayUtils.addAll(bytes, response.body().bytes());
SpiderDebug.log("---第" + i + "块下载完成" + ";Content-Range:" + response.headers().get("Content-Range"));
SpiderDebug.log("---第" + i + "块下载完成" + ";content-range:" + response.headers().get("content-range"));
int code = (int) info[0];
if (code != 206) {
return proxy(url, headers);
} }
service.shutdown(); Map<String, String> resHeader = (Map<String, String>) info[3];
String contentType = response.headers().get("Content-Type"); String contentRange = StringUtils.isAllBlank(resHeader.get("Content-Range")) ? resHeader.get("content-range") : resHeader.get("Content-Range");
String contentDisposition = response.headers().get("Content-Disposition"); SpiderDebug.log("--contentRange:" + contentRange);
if (contentDisposition != null) contentType = getMimeType(contentDisposition); //文件总大小
Map<String, String> respHeaders = new HashMap<>(); String total = StringUtils.split(contentRange, "/")[1];
/* respHeaders.put("Access-Control-Allow-Credentials", "true"); SpiderDebug.log("--文件总大小:" + total);
String range = StringUtils.isAllBlank(headers.get("range")) ? headers.get("Range") : headers.get("range");
SpiderDebug.log("---proxyMultiThread,Range:" + range);
Map<String, String> rangeObj = parseRange(range);
//没有range,无需分割
if (rangeObj == null) {
SpiderDebug.log("没有range,无需分割");
return proxy(url, headers);
} else {
List<long[]> partList = generatePart(rangeObj, total);
// 存储执行结果的List
List<Future<Response>> results = new ArrayList<>();
for (long[] part : partList) {
String newRange = "bytes=" + part[0] + "-" + part[1];
SpiderDebug.log("下载开始" + ";newRange:" + newRange);
Map<String, String> headerNew = new HashMap<>(headers);
headerNew.put("range", newRange);
headerNew.put("Range", newRange);
Future<Response> result = service.submit(() -> {
try {
return OkHttp.newCall(url, headerNew);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
results.add(result);
}
List<InputStream> inputStreams = new ArrayList<>();
for (int i = 0; i < THREAD_NUM; i++) {
// 获取包含返回结果的future对象
Future<Response> future = results.get(i);
// 从future中取出执行结果若尚未返回结果则get方法被阻塞直到结果被返回为止
Response response = future.get();
okhttp3.ResponseBody body = response.body();
/* int bytesRead;
while ((bytesRead = body.byteStream().read(bytes)) != -1) {
out.write(bytes, 0, bytesRead);
}*/
inputStreams.add(body.byteStream());
SpiderDebug.log("---第" + i + "块下载完成" + ";Content-Range:" + response.headers().get("Content-Range"));
SpiderDebug.log("---第" + i + "块下载完成" + ";content-range:" + response.headers().get("content-range"));
}
in = new SequenceInputStream(new Vector<>(inputStreams).elements());
service.shutdown();
String contentType = resHeader.get("Content-Type");
String contentDisposition = resHeader.get("Content-Disposition");
if (contentDisposition != null) contentType = getMimeType(contentDisposition);
/* respHeaders.put("Access-Control-Allow-Credentials", "true");
respHeaders.put("Access-Control-Allow-Origin", "*");*/ respHeaders.put("Access-Control-Allow-Origin", "*");*/
for (String key : response.headers().names()) {
respHeaders.put(key, response.headers().get(key)); resHeader.put("Content-Length", String.valueOf(partList.get(THREAD_NUM - 1)[1] - partList.get(0)[0] + 1));
// respHeaders.put("content-length", String.valueOf(bytes.length));
resHeader.put("Content-Range", String.format("bytes %s-%s/%s", partList.get(0)[0], partList.get(THREAD_NUM - 1)[1], total));
// respHeaders.put("content-range", String.format("bytes %s-%s/%s", partList.get(0)[0], partList.get(THREAD_NUM - 1)[1], total));
SpiderDebug.log("++proxy res contentType:" + contentType);
// SpiderDebug.log("++proxy res body:" + response.body());
SpiderDebug.log("++proxy res respHeaders:" + Json.toJson(resHeader));
return new Object[]{206, contentType, in, resHeader};
} }
respHeaders.put("Content-Length", String.valueOf(bytes.length)); } catch (Exception e) {
respHeaders.put("content-length", String.valueOf(bytes.length)); SpiderDebug.log("proxyMultiThread error:" + e.getMessage());
respHeaders.put("Content-Range", String.format("bytes %s-%s/%s", partList.get(0)[0], partList.get(THREAD_NUM - 1)[1], total)); e.printStackTrace();
respHeaders.put("content-range", String.format("bytes %s-%s/%s", partList.get(0)[0], partList.get(THREAD_NUM - 1)[1], total)); return null;
SpiderDebug.log("++proxy res contentType:" + contentType);
// SpiderDebug.log("++proxy res body:" + response.body());
SpiderDebug.log("++proxy res respHeaders:" + Json.toJson(respHeaders));
return new Object[]{response.code(), contentType, new ByteArrayInputStream(bytes), respHeaders};
} }
} }
private static List<long[]> generatePart(Map<String, String> rangeObj, String total) { private static List<long[]> generatePart(Map<String, String> rangeObj, String total) {
long start = Long.parseLong(rangeObj.get("start")); long start = Long.parseLong(rangeObj.get("start"));
long end = StringUtils.isAllBlank(rangeObj.get("end")) ? start + 1024 * 1024 *8 : Long.parseLong(rangeObj.get("end")); long end = StringUtils.isAllBlank(rangeObj.get("end")) ? start + 1024 * 1024 * 8 : Long.parseLong(rangeObj.get("end"));
long totalSize = Long.parseLong(total); long totalSize = Long.parseLong(total);

Binary file not shown.

View File

@ -1 +1 @@
5abe7e76705934af62d214ed64044353 2f2f43db60e7493b3af54c85a8172218

View File

@ -1,5 +1,5 @@
{ {
"spider": "https://ghproxy.net/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThread/jar/custom_spider.jar;md5;5abe7e76705934af62d214ed64044353", "spider": "https://ghproxy.net/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThread/jar/custom_spider.jar;md5;2f2f43db60e7493b3af54c85a8172218",
"lives": [ "lives": [
{ {
"name": "电视直播", "name": "电视直播",