diff --git a/app/src/main/java/com/github/catvod/api/AliYun.java b/app/src/main/java/com/github/catvod/api/AliYun.java index 0d0cb2f9..81f4c47d 100644 --- a/app/src/main/java/com/github/catvod/api/AliYun.java +++ b/app/src/main/java/com/github/catvod/api/AliYun.java @@ -364,7 +364,7 @@ public class AliYun { } else if (flag.split("#")[0].equals("原畫")) { return Result.get().url(getDownloadUrl(ids[0], ids[1])).octet().subs(getSubs(ids)).header(getHeader()).string(); } else if (flag.split("#")[0].equals("極速")) { - return Result.get().url(MultiThread.url(getDownloadUrl(ids[0], ids[1]), 4)).octet().subs(getSubs(ids)).header(getHeader()).string(); + return Result.get().url(MultiThread.url(getDownloadUrl(ids[0], ids[1]), 5)).octet().subs(getSubs(ids)).header(getHeader()).string(); } else { return ""; } diff --git a/app/src/main/java/com/github/catvod/utils/MultiThread.java b/app/src/main/java/com/github/catvod/utils/MultiThread.java index b8c51c19..08d2bf5d 100644 --- a/app/src/main/java/com/github/catvod/utils/MultiThread.java +++ b/app/src/main/java/com/github/catvod/utils/MultiThread.java @@ -17,13 +17,20 @@ public class MultiThread { public static Object[] proxy(Map params) throws Exception { String url = params.get("url"); int thread = Integer.parseInt(params.get("thread")); - Map reqHeaders = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (String key : params.keySet()) reqHeaders.put(key, params.get(key)); - if (reqHeaders.containsKey("do")) reqHeaders.remove("do"); - if (reqHeaders.containsKey("url")) reqHeaders.remove("url"); - if (reqHeaders.containsKey("thread")) reqHeaders.remove("thread"); - MultiThreadedDownloader downloader = new MultiThreadedDownloader(url, reqHeaders, thread); + Map headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + for (String key : params.keySet()) headers.put(key, params.get(key)); + MultiThreadedDownloader downloader = new MultiThreadedDownloader(url, removeHeaders(headers), thread); NanoHTTPD.Response response = downloader.start(); return new Object[]{response}; } + + private static Map removeHeaders(Map headers) { + headers.remove("do"); + headers.remove("url"); + headers.remove("host"); + headers.remove("thread"); + headers.remove("remote-addr"); + headers.remove("http-client-ip"); + return headers; + } } diff --git a/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java b/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java index d29cdf49..d0874b90 100644 --- a/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java +++ b/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java @@ -1,13 +1,15 @@ package com.github.catvod.utils; -import static java.lang.Thread.sleep; import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse; +import android.os.SystemClock; + import com.github.catvod.net.OkHttp; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -20,71 +22,76 @@ import java.util.regex.Pattern; import fi.iki.elonen.NanoHTTPD; import okhttp3.Response; -// 多线程内存下载器 public class MultiThreadedDownloader { - private String url; // 资源URL - private Map headers; // HTTP Headers. 主要关注`Range-Bytes`这个字段. - private int chunkSize = 1024 * 128; // 每个线程每轮下载的字节数. - private int maxBufferedChunk = 1024; // 最多缓存多少个未被取走的chunk. 避免内存溢出. 过多chunk未被取走时,下载线程可以暂时休息等待. - private int numThreads; // 线程数. - private int timeout = 10; // 读超时(秒) - private boolean running = false; // 多线程是否在运行中. - private long startOffset = -1; // 下载起始的偏移量. - private long endOffset = -1; // 下载结束的偏移量. - private long currentOffset = -1; // 当前读取的偏移量. - private long nextChunkStartOffset = -1; // 下一个chunk的起始偏移量. - private BlockingQueue readyChunkQueue = new LinkedBlockingQueue<>(); // 已开始下载的chunk队列(有序). - private Lock lock = new ReentrantLock(); // 锁. + //已开始下载的 chunk 队列 + private final BlockingQueue readyChunkQueue; + private final Map headers; + private final String url; + private final Lock lock; + //最多缓存多少个未被取走的chunk + private final int maxBufferedChunk = 1024; + //线程数 + private final int numThreads; + //读超时 + private final int timeout = 10; + //每个线程每轮下载的字节数 + private int chunkSize = 1024 * 64; + //下载起始的偏移量 + private long startOffset = -1; + //下载结束的偏移量 + private long endOffset = -1; + //当前读取的偏移量 + private long currentOffset = -1; + //下一个 chunk 的起始偏移量 + private long nextChunkStartOffset = -1; + //多线程是否在运行中 + private boolean running; public MultiThreadedDownloader(String url, Map headers, int numThreads) { this.url = url; this.headers = headers; this.numThreads = numThreads; + this.lock = new ReentrantLock(); + this.readyChunkQueue = new LinkedBlockingQueue<>(); } - // 开始下载 + //开始下载 public NanoHTTPD.Response start() throws Exception { - // 确保请求header包含Range + //确保请求header包含Range NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT; - if (!this.headers.containsKey("Range")) { - this.headers.put("Range", "bytes=0-"); + if (!headers.containsKey("Range")) { + headers.put("Range", "bytes=0-"); status = NanoHTTPD.Response.Status.OK; } - // 尝试从Range判断下载的起始偏移量和结束偏移量 - Matcher matcher = Pattern.compile("bytes=(\\d+)-(\\d+)?").matcher(this.headers.get("Range")); + //尝试从Range判断下载的起始偏移量和结束偏移量 + Matcher matcher = Pattern.compile("bytes=(\\d+)-(\\d+)?").matcher(headers.get("Range")); if (matcher.find()) { - this.startOffset = Long.parseLong(matcher.group(1)); + startOffset = Long.parseLong(matcher.group(1)); if (matcher.group(2) != null) { - this.endOffset = Long.parseLong(matcher.group(2)); + endOffset = Long.parseLong(matcher.group(2)); } } else { - throw new Exception("invalid Range: " + this.headers.get("Range")); + throw new Exception("invalid Range: " + headers.get("Range")); } - this.nextChunkStartOffset = this.startOffset; - this.currentOffset = this.startOffset; + nextChunkStartOffset = startOffset; + currentOffset = startOffset; - // 建立连接 - Map mHeaders = new HashMap<>(); - for (Map.Entry entry : this.headers.entrySet()) { - if (!shouldFilterRequestHeaderKey(entry.getKey())) { - mHeaders.put(entry.getKey(), entry.getValue()); - } - } - Response response = OkHttp.newCall(getDownloadUrl(), mHeaders); + //建立连接 + Response response = OkHttp.newCall(getDownloadUrl(), headers); - // 尽早关闭连接,我们不需要body的数据 - response.body().close(); + //尽早关闭连接,我们不需要body的数据 + if (response.body() != null) response.body().close(); - // 检查状态码 + //检查状态码 int responseCode = response.code(); if (responseCode < 200 || responseCode >= 300) { throw new Exception("response code: " + responseCode); } - // 获取header + //获取header String contentType = response.headers().get("Content-Type"); String contentDisposition = response.headers().get("Content-Disposition"); if (contentDisposition != null) { @@ -118,13 +125,13 @@ public class MultiThreadedDownloader { } long contentLength = Long.parseLong(hContentLength); - // 尝试从Content-Range获取下载结束的偏移量 - if (this.endOffset <= 0) { + //尝试从Content-Range获取下载结束的偏移量 + if (endOffset <= 0) { String hContentRange = response.headers().get("Content-Range"); if (hContentRange != null) { matcher = Pattern.compile(".*/(\\d+)").matcher(hContentRange); if (matcher.find()) { - this.endOffset = Long.parseLong(matcher.group(1)) - 1; + endOffset = Long.parseLong(matcher.group(1)) - 1; } else { throw new Exception("invalid `Content-Range`: " + hContentRange); } @@ -133,20 +140,20 @@ public class MultiThreadedDownloader { } } - // 如果下载的内容过小,那么减少chunkSize,使得每个线程刚好只需要下载一轮 - long downloadSize = this.endOffset - this.startOffset + 1; - long onetimeChunkSize = downloadSize / this.numThreads + 1; - if (this.chunkSize > onetimeChunkSize) { - this.chunkSize = (int) onetimeChunkSize; + //如果下载的内容过小,那么减少chunkSize,使得每个线程刚好只需要下载一轮 + long downloadSize = endOffset - startOffset + 1; + long onetimeChunkSize = downloadSize / numThreads + 1; + if (chunkSize > onetimeChunkSize) { + chunkSize = (int) onetimeChunkSize; } - // 开启多线程下载 - this.running = true; - for (int i = 0; i < this.numThreads; ++i) { + //开启多线程下载 + running = true; + for (int i = 0; i < numThreads; ++i) { new Thread(MultiThreadedDownloader.this::worker).start(); } - // 构造response + //构造response PipedInputStream input = new PipedInputStream(); PipedOutputStream output = new PipedOutputStream(input); NanoHTTPD.Response mResponse = newFixedLengthResponse(status, contentType, input, contentLength); @@ -157,7 +164,7 @@ public class MultiThreadedDownloader { } } - // 搬运数据流 + //搬运数据流 new Thread(() -> { try { while (true) { @@ -182,32 +189,28 @@ public class MultiThreadedDownloader { return mResponse; } - // 读取文件内容 + //读取文件内容 private byte[] read() throws Exception { - // 判断文件是否下载结束 - if (this.currentOffset > this.endOffset) { - this.stop(); + //判断文件是否下载结束 + if (currentOffset > endOffset) { + stop(); return null; } - // 获取当前的chunk的数据 - Chunk currentChunk = this.readyChunkQueue.poll(this.timeout, TimeUnit.SECONDS); + //获取当前的chunk的数据 + Chunk currentChunk = readyChunkQueue.poll(timeout, TimeUnit.SECONDS); if (currentChunk == null) { - this.stop(); + stop(); throw new Exception("read timeout"); } - while (this.running) { + while (running) { byte[] buffer = currentChunk.get(); if (buffer != null) { - this.currentOffset += buffer.length; + currentOffset += buffer.length; return buffer; } else { - try { - sleep(100); - } catch (Exception e) { - e.printStackTrace(); - } + SystemClock.sleep(100); } } @@ -215,94 +218,79 @@ public class MultiThreadedDownloader { } private void worker() { - while (this.running) { - // 生成下一个chunk + while (running) { + //生成下一个chunk Chunk chunk = null; - this.lock.lock(); - long startOffset = this.nextChunkStartOffset; - this.nextChunkStartOffset += this.chunkSize; - if (startOffset <= this.endOffset) { - long endOffset = startOffset + this.chunkSize - 1; + lock.lock(); + long startOffset = nextChunkStartOffset; + nextChunkStartOffset += chunkSize; + if (startOffset <= endOffset) { + long endOffset = startOffset + chunkSize - 1; if (endOffset > this.endOffset) { endOffset = this.endOffset; } chunk = new Chunk(startOffset, endOffset); - this.readyChunkQueue.add(chunk); + readyChunkQueue.add(chunk); } - this.lock.unlock(); + lock.unlock(); - // 所有chunk已下载完 + //所有chunk已下载完 if (chunk == null) { break; } - while (this.running) { - // 过多的数据未被取走,先休息一下,避免内存溢出 - if (chunk.startOffset - this.currentOffset >= this.chunkSize * this.maxBufferedChunk) { - try { - sleep(1000); - } catch (Exception e) { - e.printStackTrace(); - } + while (running) { + //过多的数据未被取走,先休息一下,避免内存溢出 + if (chunk.startOffset - currentOffset >= (long) chunkSize * maxBufferedChunk) { + SystemClock.sleep(1000); } else { break; } } - while (this.running) { + while (running) { try { - // 建立连接 + //建立连接 Map mHeaders = new HashMap<>(); - for (Map.Entry entry : this.headers.entrySet()) { - if (!shouldFilterRequestHeaderKey(entry.getKey())) { - if (entry.getKey().equalsIgnoreCase("Range")) { - // 设置下载范围 - mHeaders.put("Range", String.format("bytes=%d-%d", chunk.startOffset, chunk.endOffset)); - } else { - mHeaders.put(entry.getKey(), entry.getValue()); - } + for (Map.Entry entry : headers.entrySet()) { + if (entry.getKey().equalsIgnoreCase("Range")) { + //设置下载范围 + mHeaders.put("Range", String.format(Locale.getDefault(), "bytes=%d-%d", chunk.startOffset, chunk.endOffset)); + } else { + mHeaders.put(entry.getKey(), entry.getValue()); } } Response response = OkHttp.newCall(getDownloadUrl(), mHeaders); - // 检查状态码 + //检查状态码 int responseCode = response.code(); if (responseCode < 200 || responseCode >= 300) { throw new Exception("response code: " + responseCode); } + //接收数据 + if (response.body() != null) { + chunk.put(response.body().bytes()); + } - // 接收数据 - byte[] buffer = response.body().bytes(); - chunk.put(buffer); break; } catch (Exception e) { e.printStackTrace(); - try { - sleep(1000); - } catch (Exception e1) { - e1.printStackTrace(); - } + SystemClock.sleep(1000); } } } } private String getDownloadUrl() { - if (this.url.contains("/proxy?")) { - return OkHttp.string(this.url); + if (url.contains("/proxy?")) { + return OkHttp.string(url); } else { - return this.url; + return url; } } - private boolean shouldFilterRequestHeaderKey(String key) { - if (key == null) return true; - key = key.toLowerCase(); - return key.equals("host") || key.equals("http-client-ip") || key.equals("remote-addr"); - } - public void stop() { - this.running = false; + running = false; } private static class Chunk { diff --git a/jar/custom_spider.jar b/jar/custom_spider.jar index 139e1842..e28eb9eb 100644 Binary files a/jar/custom_spider.jar and b/jar/custom_spider.jar differ diff --git a/jar/custom_spider.jar.md5 b/jar/custom_spider.jar.md5 index 996e7f5d..d89e595a 100644 --- a/jar/custom_spider.jar.md5 +++ b/jar/custom_spider.jar.md5 @@ -1 +1 @@ -41ef471afad9198e7273a12f9912578a +4bc4aada582cd00fafe20d12508dc2a7