add multi thread download from 小明
This commit is contained in:
parent
5754026910
commit
3e405d754d
|
|
@ -44,4 +44,5 @@ dependencies {
|
|||
implementation 'com.google.zxing:core:3.3.0'
|
||||
//implementation 'com.orhanobut:logger:2.2.0'
|
||||
implementation 'org.jsoup:jsoup:1.15.3'
|
||||
implementation 'org.nanohttpd:nanohttpd:2.3.1'
|
||||
}
|
||||
|
|
@ -44,4 +44,7 @@
|
|||
-keepclassmembers enum * {
|
||||
public static **[] values();
|
||||
public static ** valueOf(java.lang.String);
|
||||
}
|
||||
}
|
||||
|
||||
# Nano
|
||||
-keep class fi.iki.elonen.** {*;}
|
||||
|
|
@ -32,6 +32,7 @@ import com.github.catvod.net.OkHttp;
|
|||
import com.github.catvod.net.OkResult;
|
||||
import com.github.catvod.spider.Init;
|
||||
import com.github.catvod.spider.Proxy;
|
||||
import com.github.catvod.utils.MultiThread;
|
||||
import com.github.catvod.utils.Path;
|
||||
import com.github.catvod.utils.QRCode;
|
||||
import com.github.catvod.utils.Utils;
|
||||
|
|
@ -368,7 +369,7 @@ public class AliYun {
|
|||
}
|
||||
|
||||
public String playerContent(String[] ids, boolean original) {
|
||||
if (original) return Result.get().url(getDownloadUrl(ids[0], ids[1])).octet().subs(getSubs(ids)).header(getHeader()).string();
|
||||
if (original) return Result.get().url(getMultiThreadedDownloadUrl(ids[0], ids[1])).octet().subs(getSubs(ids)).header(getHeader()).string();
|
||||
else return getPreviewContent(ids);
|
||||
}
|
||||
|
||||
|
|
@ -421,6 +422,12 @@ public class AliYun {
|
|||
return resp.getResponse().getStatus() == 404;
|
||||
}
|
||||
|
||||
public String getMultiThreadedDownloadUrl(String shareId, String fileId) {
|
||||
String url = getDownloadUrl(shareId, fileId);
|
||||
url = MultiThread.proxyUrl(url, 20);
|
||||
return url;
|
||||
}
|
||||
|
||||
public Object[] proxySub(Map<String, String> params) throws Exception {
|
||||
String fileId = params.get("fileId");
|
||||
String shareId = params.get("shareId");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,330 @@
|
|||
package com.github.catvod.downloader;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse;
|
||||
|
||||
import com.github.catvod.net.OkHttp;
|
||||
|
||||
import java.io.PipedInputStream;
|
||||
import java.io.PipedOutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import fi.iki.elonen.NanoHTTPD;
|
||||
import okhttp3.Response;
|
||||
|
||||
// 多线程内存下载器
|
||||
public class MultiThreadedMemoryDownloader {
|
||||
private String url; // 资源URL
|
||||
private Map<String, String> headers; // HTTP Headers. 主要关注`Range-Bytes`这个字段.
|
||||
private int chunkSize = 1024 * 128; // 每个线程每轮下载的字节数.
|
||||
private int maxBufferedChunk = 1024; // 最多缓存多少个未被取走的chunk. 避免内存溢出. 过多chunk未被取走时,下载线程可以暂时休息等待.
|
||||
private int numThreads; // 线程数.
|
||||
private int timeout = 10; // 读超时(秒)
|
||||
private boolean running = false; // 多线程是否在运行中.
|
||||
private long startOffset = -1; // 下载起始的偏移量.
|
||||
private long endOffset = -1; // 下载结束的偏移量.
|
||||
private long currentOffset = -1; // 当前读取的偏移量.
|
||||
private long nextChunkStartOffset = -1; // 下一个chunk的起始偏移量.
|
||||
private BlockingQueue<Chunk> readyChunkQueue = new LinkedBlockingQueue<>(); // 已开始下载的chunk队列(有序).
|
||||
private Lock lock = new ReentrantLock(); // 锁.
|
||||
|
||||
public MultiThreadedMemoryDownloader(String url, Map<String, String> headers, int numThreads) {
|
||||
this.url = url;
|
||||
this.headers = headers;
|
||||
this.numThreads = numThreads;
|
||||
}
|
||||
|
||||
// 开始下载
|
||||
public NanoHTTPD.Response start() throws Exception {
|
||||
// 确保请求header包含Range
|
||||
NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT;
|
||||
if (!this.headers.containsKey("Range")) {
|
||||
this.headers.put("Range", "bytes=0-");
|
||||
status = NanoHTTPD.Response.Status.OK;
|
||||
}
|
||||
|
||||
// 尝试从Range判断下载的起始偏移量和结束偏移量
|
||||
Matcher matcher = Pattern.compile("bytes=(\\d+)-(\\d+)?").matcher(this.headers.get("Range"));
|
||||
if (matcher.find()) {
|
||||
this.startOffset = Long.parseLong(matcher.group(1));
|
||||
if (matcher.group(2) != null) {
|
||||
this.endOffset = Long.parseLong(matcher.group(2));
|
||||
}
|
||||
} else {
|
||||
throw new Exception("invalid Range: " + this.headers.get("Range"));
|
||||
}
|
||||
|
||||
this.nextChunkStartOffset = this.startOffset;
|
||||
this.currentOffset = this.startOffset;
|
||||
|
||||
// 建立连接
|
||||
Map<String, String> mHeaders = new HashMap<>();
|
||||
for (Map.Entry<String, String> entry : this.headers.entrySet()) {
|
||||
if (!shouldFilterRequestHeaderKey(entry.getKey())) {
|
||||
mHeaders.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
Response response = OkHttp.newCall(getDownloadUrl(), mHeaders);
|
||||
|
||||
// 尽早关闭连接,我们不需要body的数据
|
||||
response.body().close();
|
||||
|
||||
// 检查状态码
|
||||
int responseCode = response.code();
|
||||
if (responseCode < 200 || responseCode >= 300) {
|
||||
throw new Exception("response code: " + responseCode);
|
||||
}
|
||||
|
||||
// 获取header
|
||||
String contentType = response.headers().get("Content-Type");
|
||||
String contentDisposition = response.headers().get("Content-Disposition");
|
||||
if (contentDisposition != null) {
|
||||
if (contentDisposition.endsWith(".mp4")) {
|
||||
contentType = "video/mp4";
|
||||
} else if (contentDisposition.endsWith(".webm")) {
|
||||
contentType = "video/webm";
|
||||
} else if (contentDisposition.endsWith(".avi")) {
|
||||
contentType = "video/x-msvideo";
|
||||
} else if (contentDisposition.endsWith(".wmv")) {
|
||||
contentType = "video/x-ms-wmv";
|
||||
} else if (contentDisposition.endsWith(".flv")) {
|
||||
contentType = "video/x-flv";
|
||||
} else if (contentDisposition.endsWith(".mov")) {
|
||||
contentType = "video/quicktime";
|
||||
} else if (contentDisposition.endsWith(".mkv")) {
|
||||
contentType = "video/x-matroska";
|
||||
} else if (contentDisposition.endsWith(".mpeg")) {
|
||||
contentType = "video/mpeg";
|
||||
} else if (contentDisposition.endsWith(".3gp")) {
|
||||
contentType = "video/3gpp";
|
||||
}
|
||||
}
|
||||
if (contentType == null) {
|
||||
throw new Exception("missing response header: Content-Type");
|
||||
}
|
||||
|
||||
String hContentLength = response.headers().get("Content-Length");
|
||||
if (hContentLength == null) {
|
||||
throw new Exception("missing response header: Content-Length");
|
||||
}
|
||||
Long contentLength = Long.parseLong(hContentLength);
|
||||
|
||||
// 尝试从Content-Range获取下载结束的偏移量
|
||||
if (this.endOffset <= 0) {
|
||||
String hContentRange = response.headers().get("Content-Range");
|
||||
if (hContentRange != null) {
|
||||
matcher = Pattern.compile(".*/(\\d+)").matcher(hContentRange);
|
||||
if (matcher.find()) {
|
||||
this.endOffset = Long.parseLong(matcher.group(1)) - 1;
|
||||
} else {
|
||||
throw new Exception("invalid `Content-Range`: " + hContentRange);
|
||||
}
|
||||
} else {
|
||||
throw new Exception("missing response header: Content-Range");
|
||||
}
|
||||
}
|
||||
|
||||
// 如果下载的内容过小,那么减少chunkSize,使得每个线程刚好只需要下载一轮
|
||||
long downloadSize = this.endOffset - this.startOffset + 1;
|
||||
long onetimeChunkSize = downloadSize / this.numThreads + 1;
|
||||
if (this.chunkSize > onetimeChunkSize) {
|
||||
this.chunkSize = (int) onetimeChunkSize;
|
||||
}
|
||||
|
||||
// 开启多线程下载
|
||||
this.running = true;
|
||||
for (int i = 0; i < this.numThreads; ++i) {
|
||||
new Thread(() -> {
|
||||
MultiThreadedMemoryDownloader.this.worker();
|
||||
}).start();
|
||||
}
|
||||
|
||||
// 构造response
|
||||
PipedInputStream input = new PipedInputStream();
|
||||
PipedOutputStream output = new PipedOutputStream(input);
|
||||
NanoHTTPD.Response mResponse = newFixedLengthResponse(status, contentType, input, contentLength);
|
||||
for (String key : response.headers().names()) {
|
||||
String value = response.headers().get(key);
|
||||
if (key != null && !key.equalsIgnoreCase("Content-Type") && !key.equalsIgnoreCase("Content-Length")) {
|
||||
mResponse.addHeader(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// 搬运数据流
|
||||
new Thread(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
byte[] buffer = read();
|
||||
if (buffer == null || buffer.length == 0) {
|
||||
break;
|
||||
}
|
||||
output.write(buffer);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
try {
|
||||
stop();
|
||||
output.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
return mResponse;
|
||||
}
|
||||
|
||||
// 读取文件内容
|
||||
private byte[] read() throws Exception {
|
||||
// 判断文件是否下载结束
|
||||
if (this.currentOffset > this.endOffset) {
|
||||
this.stop();
|
||||
return null;
|
||||
}
|
||||
|
||||
// 获取当前的chunk的数据
|
||||
Chunk currentChunk = this.readyChunkQueue.poll(this.timeout, TimeUnit.SECONDS);
|
||||
if (currentChunk == null) {
|
||||
this.stop();
|
||||
throw new Exception("read timeout");
|
||||
}
|
||||
|
||||
while (this.running) {
|
||||
byte[] buffer = currentChunk.get();
|
||||
if (buffer != null) {
|
||||
this.currentOffset += buffer.length;
|
||||
return buffer;
|
||||
} else {
|
||||
try {
|
||||
sleep(100);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void worker() {
|
||||
while (this.running) {
|
||||
// 生成下一个chunk
|
||||
Chunk chunk = null;
|
||||
this.lock.lock();
|
||||
long startOffset = this.nextChunkStartOffset;
|
||||
this.nextChunkStartOffset += this.chunkSize;
|
||||
if (startOffset <= this.endOffset) {
|
||||
long endOffset = startOffset + this.chunkSize - 1;
|
||||
if (endOffset > this.endOffset) {
|
||||
endOffset = this.endOffset;
|
||||
}
|
||||
chunk = new Chunk(startOffset, endOffset);
|
||||
this.readyChunkQueue.add(chunk);
|
||||
}
|
||||
this.lock.unlock();
|
||||
|
||||
// 所有chunk已下载完
|
||||
if (chunk == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
while (this.running) {
|
||||
// 过多的数据未被取走,先休息一下,避免内存溢出
|
||||
if (chunk.startOffset - this.currentOffset >= this.chunkSize * this.maxBufferedChunk) {
|
||||
try {
|
||||
sleep(1000);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while (this.running) {
|
||||
try {
|
||||
// 建立连接
|
||||
Map<String, String> mHeaders = new HashMap<>();
|
||||
for (Map.Entry<String, String> entry : this.headers.entrySet()) {
|
||||
if (!shouldFilterRequestHeaderKey(entry.getKey())) {
|
||||
if (entry.getKey().equalsIgnoreCase("Range")) {
|
||||
// 设置下载范围
|
||||
mHeaders.put("Range", String.format("bytes=%d-%d", chunk.startOffset, chunk.endOffset));
|
||||
} else {
|
||||
mHeaders.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
Response response = OkHttp.newCall(getDownloadUrl(), mHeaders);
|
||||
|
||||
// 检查状态码
|
||||
int responseCode = response.code();
|
||||
if (responseCode < 200 || responseCode >= 300) {
|
||||
throw new Exception("response code: " + responseCode);
|
||||
}
|
||||
|
||||
// 接收数据
|
||||
byte[] buffer = response.body().bytes();
|
||||
chunk.put(buffer);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
try {
|
||||
sleep(1000);
|
||||
} catch (Exception e1) {
|
||||
e1.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getDownloadUrl() {
|
||||
if (this.url.contains("/proxy?")) {
|
||||
return OkHttp.string(this.url);
|
||||
} else {
|
||||
return this.url;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private boolean shouldFilterRequestHeaderKey(String key) {
|
||||
if (key == null) {
|
||||
return true;
|
||||
}
|
||||
key = key.toLowerCase();
|
||||
return key.equals("host") || key.equals("http-client-ip") || key.equals("remote-addr");
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.running = false;
|
||||
}
|
||||
|
||||
private class Chunk {
|
||||
private long startOffset;
|
||||
private long endOffset;
|
||||
private byte[] buffer;
|
||||
|
||||
public Chunk(long startOffset, long endOffset) {
|
||||
this.startOffset = startOffset;
|
||||
this.endOffset = endOffset;
|
||||
}
|
||||
|
||||
public byte[] get() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public void put(byte[] buffer) throws InterruptedException {
|
||||
this.buffer = buffer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,6 +3,7 @@ package com.github.catvod.spider;
|
|||
import com.github.catvod.crawler.Spider;
|
||||
import com.github.catvod.crawler.SpiderDebug;
|
||||
import com.github.catvod.net.OkHttp;
|
||||
import com.github.catvod.utils.MultiThread;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.Map;
|
||||
|
|
@ -15,6 +16,8 @@ public class Proxy extends Spider {
|
|||
switch (params.get("do")) {
|
||||
case "ck":
|
||||
return new Object[]{200, "text/plain; charset=utf-8", new ByteArrayInputStream("ok".getBytes("UTF-8"))};
|
||||
case "multi":
|
||||
return MultiThread.proxy(params);
|
||||
case "ali":
|
||||
return Ali.proxy(params);
|
||||
case "bili":
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
package com.github.catvod.utils;
|
||||
|
||||
import com.github.catvod.downloader.MultiThreadedMemoryDownloader;
|
||||
import com.github.catvod.spider.Proxy;
|
||||
|
||||
import java.net.URLEncoder;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import fi.iki.elonen.NanoHTTPD;
|
||||
|
||||
public class MultiThread {
|
||||
|
||||
public static String proxyUrl(String url, int thread) {
|
||||
return String.format(Proxy.getUrl() + "?do=multi&url=%s&thread=%d", URLEncoder.encode(url), thread);
|
||||
}
|
||||
|
||||
public static Object[] proxy(Map<String, String> params) throws Exception {
|
||||
String url = params.get("url");
|
||||
int threadNum = Integer.parseInt(params.get("thread"));
|
||||
Map<String, String> reqHeaders = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
|
||||
for (Map.Entry<String, String> entry : params.entrySet()) {
|
||||
if (entry.getKey() != null) {
|
||||
reqHeaders.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
if (reqHeaders.containsKey("do")) reqHeaders.remove("do");
|
||||
if (reqHeaders.containsKey("url")) reqHeaders.remove("url");
|
||||
if (reqHeaders.containsKey("thread")) reqHeaders.remove("thread");
|
||||
MultiThreadedMemoryDownloader downloader = new MultiThreadedMemoryDownloader(url, reqHeaders, threadNum);
|
||||
NanoHTTPD.Response response = downloader.start();
|
||||
Object[] result = new Object[1];
|
||||
result[0] = response;
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue