Remove java multi thread

This commit is contained in:
FongMi 2023-12-13 11:39:05 +08:00
parent 390f76b1b7
commit 444053491c
5 changed files with 11 additions and 350 deletions

View File

@ -34,7 +34,6 @@ import com.github.catvod.net.OkHttp;
import com.github.catvod.net.OkResult; import com.github.catvod.net.OkResult;
import com.github.catvod.spider.Init; import com.github.catvod.spider.Init;
import com.github.catvod.spider.Proxy; import com.github.catvod.spider.Proxy;
import com.github.catvod.utils.MultiThread;
import com.github.catvod.utils.Path; import com.github.catvod.utils.Path;
import com.github.catvod.utils.ProxyVideo; import com.github.catvod.utils.ProxyVideo;
import com.github.catvod.utils.QRCode; import com.github.catvod.utils.QRCode;
@ -511,10 +510,11 @@ public class AliYun {
headers.remove("templateId"); headers.remove("templateId");
headers.remove("remote-addr"); headers.remove("remote-addr");
headers.remove("http-client-ip"); headers.remove("http-client-ip");
if (thread == 1) { if (thread == 1) {
return new Object[]{ProxyVideo.proxy(downloadUrl, headers)}; return new Object[]{ProxyVideo.proxy(downloadUrl, headers)};
} else { } else {
return new Object[]{ProxyVideo.proxy(MultiThread.go(downloadUrl, thread), headers)}; return new Object[]{ProxyVideo.multi(downloadUrl, headers, thread)};
} }
} }

View File

@ -3,7 +3,6 @@ package com.github.catvod.spider;
import com.github.catvod.crawler.Spider; import com.github.catvod.crawler.Spider;
import com.github.catvod.crawler.SpiderDebug; import com.github.catvod.crawler.SpiderDebug;
import com.github.catvod.net.OkHttp; import com.github.catvod.net.OkHttp;
import com.github.catvod.utils.MultiThread;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.util.Map; import java.util.Map;
@ -16,8 +15,6 @@ public class Proxy extends Spider {
switch (params.get("do")) { switch (params.get("do")) {
case "ck": case "ck":
return new Object[]{200, "text/plain; charset=utf-8", new ByteArrayInputStream("ok".getBytes("UTF-8"))}; return new Object[]{200, "text/plain; charset=utf-8", new ByteArrayInputStream("ok".getBytes("UTF-8"))};
case "multi":
return MultiThread.proxy(params);
case "ali": case "ali":
return Ali.proxy(params); return Ali.proxy(params);
case "bili": case "bili":

View File

@ -1,37 +0,0 @@
package com.github.catvod.utils;
import com.github.catvod.spider.Proxy;
import java.net.URLEncoder;
import java.util.Map;
import java.util.TreeMap;
/**
 * Helpers for multi-threaded downloads: builds the URLs that request one and
 * serves the {@code do=multi} proxy action through {@link MultiThreadedDownloader}.
 */
public class MultiThread {

    /**
     * Builds a URL on the spider proxy that triggers a multi-threaded download.
     *
     * @param url    the resource to download; it is URL-encoded as UTF-8
     * @param thread the number of download threads to request
     * @return {@code Proxy.getUrl()} followed by the {@code do=multi} query string
     */
    public static String url(String url, int thread) {
        // Plain concatenation: the proxy base URL is never interpreted as a format
        // string, and the int is always rendered with ASCII digits.
        return Proxy.getUrl() + "?do=multi&url=" + encode(url) + "&thread=" + thread;
    }

    /**
     * Builds a URL on the local service listening on 127.0.0.1:7777.
     * NOTE(review): that service is not defined in this file - presumably an
     * external multi-threaded downloader; confirm against the deployment.
     *
     * @param url    the resource to download; it is URL-encoded as UTF-8
     * @param thread the number of download threads to request
     * @return the local download URL
     */
    public static String go(String url, int thread) {
        return "http://127.0.0.1:7777/?url=" + encode(url) + "&thread=" + thread;
    }

    /**
     * Handles the {@code do=multi} proxy action.
     *
     * @param params request parameters; {@code url} and {@code thread} are read
     *               explicitly, every remaining entry (minus the ones dropped by
     *               {@link #removeHeaders(Map)}) is forwarded as a request header
     * @return a one-element array holding the response produced by the downloader
     * @throws Exception if {@code thread} is not a valid integer or the download
     *                   cannot be started
     */
    public static Object[] proxy(Map<String, String> params) throws Exception {
        String url = params.get("url");
        int thread = Integer.parseInt(params.get("thread"));
        // Case-insensitive copy so header lookups such as "Range" match any casing.
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (Map.Entry<String, String> entry : params.entrySet()) {
            headers.put(entry.getKey(), entry.getValue());
        }
        MultiThreadedDownloader downloader = new MultiThreadedDownloader(url, removeHeaders(headers), thread);
        return new Object[]{downloader.start()};
    }

    /**
     * Removes the entries that are proxy parameters or connection metadata rather
     * than headers to forward. The given map is modified in place and returned.
     */
    private static Map<String, String> removeHeaders(Map<String, String> headers) {
        headers.remove("do");
        headers.remove("url");
        headers.remove("host");
        headers.remove("thread");
        headers.remove("remote-addr");
        headers.remove("http-client-ip");
        return headers;
    }

    /**
     * URL-encodes a value as UTF-8 instead of relying on the platform charset used
     * by the deprecated single-argument {@code URLEncoder.encode}.
     */
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (java.io.UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every Java platform.
            throw new IllegalStateException("UTF-8 is not supported", e);
        }
    }
}

View File

@ -1,303 +0,0 @@
package com.github.catvod.utils;
import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse;
import android.os.SystemClock;
import com.github.catvod.net.OkHttp;
import com.github.catvod.spider.Init;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import fi.iki.elonen.NanoHTTPD;
import okhttp3.Response;
/**
 * Downloads a single HTTP resource with several concurrent ranged requests and
 * exposes the result as one {@link NanoHTTPD.Response} whose body is fed, in
 * offset order, through a pipe.
 *
 * <p>An instance is meant for a single {@link #start()} call: once {@link #stop()}
 * or {@link #destory()} has run, the worker pool is shut down.
 */
public class MultiThreadedDownloader {

    // Compiled once; both patterns are used on every start().
    private static final Pattern RANGE_PATTERN = Pattern.compile("bytes=(\\d+)-(\\d+)?");
    private static final Pattern CONTENT_RANGE_PATTERN = Pattern.compile(".*/(\\d+)");

    // Maximum number of chunks a worker may run ahead of the reader.
    private static final int MAX_BUFFERED_CHUNK = 1024;
    // Seconds the reader waits for the next chunk to be scheduled.
    private static final int TIMEOUT = 10;

    // Chunks whose download has been scheduled, in ascending offset order.
    private final BlockingQueue<Chunk> readyChunkQueue;
    private final Map<String, String> headers;
    private final ExecutorService executor;
    private final String url;
    // Guards nextChunkStartOffset and the matching enqueue.
    private final Lock lock;
    // Number of worker threads.
    private final int numThreads;
    // Bytes fetched by one worker per request.
    private int chunkSize = 1024 * 64;
    // First byte offset to download.
    private long startOffset = -1;
    // Last byte offset to download (inclusive); -1 until known.
    private long endOffset = -1;
    // Offset of the next byte the reader will hand out. Written by the reader
    // thread only, read by the workers, hence volatile.
    private volatile long currentOffset = -1;
    // Start offset of the next chunk to hand to a worker.
    private long nextChunkStartOffset = -1;
    // Whether the download is active. Read and written from several threads.
    private volatile boolean running;

    /**
     * @param url        the resource to download
     * @param headers    request headers; a {@code Range} entry is added by
     *                   {@link #start()} when absent
     * @param numThreads number of worker threads
     */
    public MultiThreadedDownloader(String url, Map<String, String> headers, int numThreads) {
        this.url = url;
        this.headers = headers;
        this.numThreads = numThreads;
        this.lock = new ReentrantLock();
        this.readyChunkQueue = new LinkedBlockingQueue<>();
        this.executor = Executors.newFixedThreadPool(numThreads);
    }

    /**
     * Probes the resource, starts the workers and returns a response whose body
     * streams the downloaded bytes.
     *
     * @return a fixed-length response: 206 when the caller supplied a
     *         {@code Range} header, 200 otherwise
     * @throws Exception if the {@code Range} header is malformed, the probe
     *                   request does not return a 2xx status, or a required
     *                   response header is missing or malformed
     */
    public NanoHTTPD.Response start() throws Exception {
        // Make sure the request carries a Range header.
        NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT;
        if (!headers.containsKey("Range")) {
            headers.put("Range", "bytes=0-");
            status = NanoHTTPD.Response.Status.OK;
        }

        // Derive the start and (optional) end offset from the Range header.
        Matcher matcher = RANGE_PATTERN.matcher(headers.get("Range"));
        if (!matcher.find()) {
            throw new Exception("invalid Range: " + headers.get("Range"));
        }
        startOffset = Long.parseLong(matcher.group(1));
        if (matcher.group(2) != null) {
            endOffset = Long.parseLong(matcher.group(2));
        }
        nextChunkStartOffset = startOffset;
        currentOffset = startOffset;

        // Probe request: only the status and headers are needed, so the body is
        // closed right away.
        Response response = OkHttp.newCall(getDownloadUrl(), headers);
        if (response.body() != null) response.body().close();

        int responseCode = response.code();
        if (responseCode < 200 || responseCode >= 300) {
            throw new Exception("response code: " + responseCode);
        }

        String contentType = response.headers().get("Content-Type");
        String contentDisposition = response.headers().get("Content-Disposition");
        if (contentDisposition != null) contentType = Utils.getMimeType(contentDisposition);
        if (contentType == null) throw new Exception("missing response header: Content-Type");
        String hContentLength = response.headers().get("Content-Length");
        if (hContentLength == null) throw new Exception("missing response header: Content-Length");
        long contentLength = Long.parseLong(hContentLength);

        // No explicit end in the request: take it from Content-Range's total size.
        // "< 0" (not "<= 0") so an explicit "bytes=0-0" request keeps its end of 0.
        if (endOffset < 0) {
            String hContentRange = response.headers().get("Content-Range");
            if (hContentRange == null) {
                throw new Exception("missing response header: Content-Range");
            }
            matcher = CONTENT_RANGE_PATTERN.matcher(hContentRange);
            if (!matcher.find()) {
                throw new Exception("invalid `Content-Range`: " + hContentRange);
            }
            endOffset = Long.parseLong(matcher.group(1)) - 1;
        }

        // For small downloads shrink the chunk so each worker needs a single round.
        long downloadSize = endOffset - startOffset + 1;
        long onetimeChunkSize = downloadSize / numThreads + 1;
        if (chunkSize > onetimeChunkSize) {
            chunkSize = (int) onetimeChunkSize;
        }

        // Start the workers.
        running = true;
        for (int i = 0; i < numThreads; ++i) {
            executor.execute(this::worker);
        }

        // Build the outgoing response; upstream headers are copied except the two
        // that newFixedLengthResponse already sets.
        PipedInputStream input = new PipedInputStream();
        PipedOutputStream output = new PipedOutputStream(input);
        NanoHTTPD.Response mResponse = newFixedLengthResponse(status, contentType, input, contentLength);
        for (String key : response.headers().names()) {
            String value = response.headers().get(key);
            if (!key.equalsIgnoreCase("Content-Type") && !key.equalsIgnoreCase("Content-Length")) {
                mResponse.addHeader(key, value);
            }
        }

        // Pump downloaded chunks into the pipe until the end or a failure.
        Init.execute(() -> {
            try {
                while (true) {
                    byte[] buffer = read();
                    if (buffer == null || buffer.length == 0) {
                        break;
                    }
                    output.write(buffer);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    stop();
                    output.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        return mResponse;
    }

    /**
     * Returns the bytes of the next chunk in offset order, waiting for its
     * download to finish.
     *
     * @return the chunk data, or {@code null} when everything has been read or
     *         the download was stopped
     * @throws Exception if no chunk is scheduled within the timeout
     */
    private byte[] read() throws Exception {
        // Everything requested has been handed out.
        if (currentOffset > endOffset) {
            stop();
            return null;
        }
        Chunk currentChunk = readyChunkQueue.poll(TIMEOUT, TimeUnit.SECONDS);
        if (currentChunk == null) {
            stop();
            throw new Exception("read timeout");
        }
        while (running) {
            byte[] buffer = currentChunk.get();
            if (buffer != null) {
                // Single writer (this reader thread), so a plain read-modify-write
                // on the volatile field is safe.
                currentOffset = currentOffset + buffer.length;
                return buffer;
            }
            SystemClock.sleep(100);
        }
        return null;
    }

    /** Worker loop: claim the next chunk, wait if too far ahead, download it. */
    private void worker() {
        while (running) {
            // Honour interruption (e.g. from destory()).
            if (Thread.interrupted()) break;
            Chunk chunk = nextChunk();
            // Every chunk has already been claimed.
            if (chunk == null) break;
            waitForReader(chunk);
            download(chunk);
        }
    }

    /**
     * Claims the next chunk and enqueues it for the reader in the same critical
     * section, so the queue stays in offset order.
     *
     * @return the claimed chunk, or {@code null} when the range is exhausted
     */
    private Chunk nextChunk() {
        lock.lock();
        try {
            long begin = nextChunkStartOffset;
            nextChunkStartOffset += chunkSize;
            if (begin > endOffset) {
                return null;
            }
            long end = Math.min(begin + chunkSize - 1, endOffset);
            Chunk chunk = new Chunk(begin, end);
            readyChunkQueue.add(chunk);
            return chunk;
        } finally {
            // Always release, even if chunk creation or enqueueing throws.
            lock.unlock();
        }
    }

    /** Pauses while the chunk is too far ahead of the reader, to bound memory use. */
    private void waitForReader(Chunk chunk) {
        while (running && chunk.startOffset - currentOffset >= (long) chunkSize * MAX_BUFFERED_CHUNK) {
            SystemClock.sleep(1000);
        }
    }

    /** Downloads one chunk, retrying once per second until it succeeds or the download stops. */
    private void download(Chunk chunk) {
        while (running) {
            Response response = null;
            try {
                Map<String, String> mHeaders = new HashMap<>();
                for (Map.Entry<String, String> entry : headers.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase("Range")) {
                        // Locale.ROOT keeps the digits ASCII whatever the device locale is.
                        mHeaders.put("Range", String.format(Locale.ROOT, "bytes=%d-%d", chunk.startOffset, chunk.endOffset));
                    } else {
                        mHeaders.put(entry.getKey(), entry.getValue());
                    }
                }
                response = OkHttp.newCall(getDownloadUrl(), mHeaders);
                int responseCode = response.code();
                if (responseCode < 200 || responseCode >= 300) {
                    throw new Exception("response code: " + responseCode);
                }
                if (response.body() != null) {
                    chunk.put(response.body().bytes());
                }
                return;
            } catch (Exception e) {
                e.printStackTrace();
                SystemClock.sleep(1000);
            } finally {
                // Release the connection on every path, including error statuses.
                if (response != null && response.body() != null) {
                    response.body().close();
                }
            }
        }
    }

    /**
     * Returns the URL to request. For URLs containing {@code /proxy?} the result
     * of {@code OkHttp.string(url)} is used instead.
     * NOTE(review): presumably that call resolves the real download link - confirm.
     */
    private String getDownloadUrl() {
        if (url.contains("/proxy?")) {
            return OkHttp.string(url);
        } else {
            return url;
        }
    }

    /**
     * Stops the download. In-flight requests are allowed to finish; the worker
     * pool is shut down so its threads can terminate. Safe to call repeatedly.
     */
    public void stop() {
        running = false;
        executor.shutdown();
    }

    /** Stops the download and interrupts the workers immediately. */
    public void destory() {
        running = false;
        executor.shutdownNow();
    }

    /** A byte range of the resource together with its data once downloaded. */
    private static class Chunk {

        private final long startOffset;
        private final long endOffset;
        // Written by a worker, polled by the reader.
        private volatile byte[] buffer;

        public Chunk(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }

        public byte[] get() {
            return buffer;
        }

        public void put(byte[] buffer) {
            this.buffer = buffer;
        }
    }
}

View File

@ -1,9 +1,12 @@
package com.github.catvod.utils; package com.github.catvod.utils;
import static fi.iki.elonen.NanoHTTPD.Response.Status;
import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse; import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse;
import com.github.catvod.net.OkHttp; import com.github.catvod.net.OkHttp;
import java.net.URLEncoder;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import fi.iki.elonen.NanoHTTPD; import fi.iki.elonen.NanoHTTPD;
@ -11,12 +14,13 @@ import okhttp3.Response;
public class ProxyVideo { public class ProxyVideo {
/**
 * Proxies a video through the service listening on 127.0.0.1:7777, passing the
 * real URL and the requested thread count as query parameters.
 * NOTE(review): that service is not defined in this file - presumably an
 * external multi-threaded downloader; confirm against the deployment.
 *
 * @param url     the video URL; it is URL-encoded as UTF-8
 * @param headers request headers, handed to {@code proxy} unchanged
 * @param thread  the number of download threads to request
 * @return the response produced by {@code proxy} for the local URL
 * @throws Exception if the request fails
 */
public static NanoHTTPD.Response multi(String url, Map<String, String> headers, int thread) throws Exception {
    // Locale.ROOT keeps %d in ASCII digits on every device locale; the explicit
    // charset replaces the deprecated platform-default URLEncoder.encode(String).
    String target = String.format(Locale.ROOT, "http://127.0.0.1:7777/?url=%s&thread=%d", URLEncoder.encode(url, "UTF-8"), thread);
    return proxy(target, headers);
}
public static NanoHTTPD.Response proxy(String url, Map<String, String> headers) throws Exception { public static NanoHTTPD.Response proxy(String url, Map<String, String> headers) throws Exception {
NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT; Status status = headers.containsKey("Range") ? Status.PARTIAL_CONTENT : Status.OK;
if (!headers.containsKey("Range")) { if (!headers.containsKey("Range")) headers.put("Range", "bytes=0-");
headers.put("Range", "bytes=0-");
status = NanoHTTPD.Response.Status.OK;
}
Response response = OkHttp.newCall(url, headers); Response response = OkHttp.newCall(url, headers);
String contentType = response.headers().get("Content-Type"); String contentType = response.headers().get("Content-Type");
String hContentLength = response.headers().get("Content-Length"); String hContentLength = response.headers().get("Content-Length");