diff --git a/app/src/main/java/com/github/catvod/api/AliYun.java b/app/src/main/java/com/github/catvod/api/AliYun.java index 2e14d94a..c5b9f444 100644 --- a/app/src/main/java/com/github/catvod/api/AliYun.java +++ b/app/src/main/java/com/github/catvod/api/AliYun.java @@ -34,7 +34,6 @@ import com.github.catvod.net.OkHttp; import com.github.catvod.net.OkResult; import com.github.catvod.spider.Init; import com.github.catvod.spider.Proxy; -import com.github.catvod.utils.MultiThread; import com.github.catvod.utils.Path; import com.github.catvod.utils.ProxyVideo; import com.github.catvod.utils.QRCode; @@ -511,10 +510,11 @@ public class AliYun { headers.remove("templateId"); headers.remove("remote-addr"); headers.remove("http-client-ip"); + if (thread == 1) { return new Object[]{ProxyVideo.proxy(downloadUrl, headers)}; } else { - return new Object[]{ProxyVideo.proxy(MultiThread.go(downloadUrl, thread), headers)}; + return new Object[]{ProxyVideo.multi(downloadUrl, headers, thread)}; } } diff --git a/app/src/main/java/com/github/catvod/spider/Proxy.java b/app/src/main/java/com/github/catvod/spider/Proxy.java index 1c3e1d2b..59edd457 100644 --- a/app/src/main/java/com/github/catvod/spider/Proxy.java +++ b/app/src/main/java/com/github/catvod/spider/Proxy.java @@ -3,7 +3,6 @@ package com.github.catvod.spider; import com.github.catvod.crawler.Spider; import com.github.catvod.crawler.SpiderDebug; import com.github.catvod.net.OkHttp; -import com.github.catvod.utils.MultiThread; import java.io.ByteArrayInputStream; import java.util.Map; @@ -16,8 +15,6 @@ public class Proxy extends Spider { switch (params.get("do")) { case "ck": return new Object[]{200, "text/plain; charset=utf-8", new ByteArrayInputStream("ok".getBytes("UTF-8"))}; - case "multi": - return MultiThread.proxy(params); case "ali": return Ali.proxy(params); case "bili": diff --git a/app/src/main/java/com/github/catvod/utils/MultiThread.java b/app/src/main/java/com/github/catvod/utils/MultiThread.java deleted file mode 100644 index b545d206..00000000 --- a/app/src/main/java/com/github/catvod/utils/MultiThread.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.github.catvod.utils; - -import com.github.catvod.spider.Proxy; - -import java.net.URLEncoder; -import java.util.Map; -import java.util.TreeMap; - -public class MultiThread { - - public static String url(String url, int thread) { - return String.format(Proxy.getUrl() + "?do=multi&url=%s&thread=%d", URLEncoder.encode(url), thread); - } - - public static String go(String url, int thread) { - return String.format("http://127.0.0.1:7777/?url=%s&thread=%d", URLEncoder.encode(url), thread); - } - - public static Object[] proxy(Map params) throws Exception { - String url = params.get("url"); - int thread = Integer.parseInt(params.get("thread")); - Map headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (String key : params.keySet()) headers.put(key, params.get(key)); - MultiThreadedDownloader downloader = new MultiThreadedDownloader(url, removeHeaders(headers), thread); - return new Object[]{downloader.start()}; - } - - private static Map removeHeaders(Map headers) { - headers.remove("do"); - headers.remove("url"); - headers.remove("host"); - headers.remove("thread"); - headers.remove("remote-addr"); - headers.remove("http-client-ip"); - return headers; - } -} diff --git a/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java b/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java deleted file mode 100644 index 5099c76f..00000000 --- a/app/src/main/java/com/github/catvod/utils/MultiThreadedDownloader.java +++ /dev/null @@ -1,303 +0,0 @@ -package com.github.catvod.utils; - -import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse; - -import android.os.SystemClock; - -import com.github.catvod.net.OkHttp; -import com.github.catvod.spider.Init; - -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import fi.iki.elonen.NanoHTTPD; -import okhttp3.Response; - -public class MultiThreadedDownloader { - - //已开始下载的 chunk 队列 - private final BlockingQueue readyChunkQueue; - private final Map headers; - private final ExecutorService executor; - private final String url; - private final Lock lock; - //最多缓存多少个未被取走的chunk - private final int maxBufferedChunk = 1024; - //线程数 - private final int numThreads; - //读超时 - private final int timeout = 10; - //每个线程每轮下载的字节数 - private int chunkSize = 1024 * 64; - //下载起始的偏移量 - private long startOffset = -1; - //下载结束的偏移量 - private long endOffset = -1; - //当前读取的偏移量 - private long currentOffset = -1; - //下一个 chunk 的起始偏移量 - private long nextChunkStartOffset = -1; - //多线程是否在运行中 - private boolean running; - - public MultiThreadedDownloader(String url, Map headers, int numThreads) { - this.url = url; - this.headers = headers; - this.numThreads = numThreads; - this.lock = new ReentrantLock(); - this.readyChunkQueue = new LinkedBlockingQueue<>(); - this.executor = Executors.newFixedThreadPool(numThreads); - } - - //开始下载 - public NanoHTTPD.Response start() throws Exception { - //确保请求header包含Range - NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT; - if (!headers.containsKey("Range")) { - headers.put("Range", "bytes=0-"); - status = NanoHTTPD.Response.Status.OK; - } - - //尝试从Range判断下载的起始偏移量和结束偏移量 - Matcher matcher = Pattern.compile("bytes=(\\d+)-(\\d+)?").matcher(headers.get("Range")); - if (matcher.find()) { - startOffset = Long.parseLong(matcher.group(1)); - if (matcher.group(2) != null) { - endOffset = Long.parseLong(matcher.group(2)); - } - } else { - throw new Exception("invalid Range: " + headers.get("Range")); - } - - nextChunkStartOffset = startOffset; - currentOffset = startOffset; - - //建立连接 - Response response = OkHttp.newCall(getDownloadUrl(), headers); - - //尽早关闭连接,我们不需要body的数据 - if (response.body() != null) response.body().close(); - - //检查状态码 - int responseCode = response.code(); - if (responseCode < 200 || responseCode >= 300) { - throw new Exception("response code: " + responseCode); - } - - //获取header - String contentType = response.headers().get("Content-Type"); - String contentDisposition = response.headers().get("Content-Disposition"); - if (contentDisposition != null) contentType = Utils.getMimeType(contentDisposition); - if (contentType == null) throw new Exception("missing response header: Content-Type"); - String hContentLength = response.headers().get("Content-Length"); - if (hContentLength == null) throw new Exception("missing response header: Content-Length"); - long contentLength = Long.parseLong(hContentLength); - - //尝试从Content-Range获取下载结束的偏移量 - if (endOffset <= 0) { - String hContentRange = response.headers().get("Content-Range"); - if (hContentRange != null) { - matcher = Pattern.compile(".*/(\\d+)").matcher(hContentRange); - if (matcher.find()) { - endOffset = Long.parseLong(matcher.group(1)) - 1; - } else { - throw new Exception("invalid `Content-Range`: " + hContentRange); - } - } else { - throw new Exception("missing response header: Content-Range"); - } - } - - //如果下载的内容过小,那么减少chunkSize,使得每个线程刚好只需要下载一轮 - long downloadSize = endOffset - startOffset + 1; - long onetimeChunkSize = downloadSize / numThreads + 1; - if (chunkSize > onetimeChunkSize) { - chunkSize = (int) onetimeChunkSize; - } - - //开启多线程下载 - running = true; - for (int i = 0; i < numThreads; ++i) { - executor.execute(this::worker); - } - - //构造response - PipedInputStream input = new PipedInputStream(); - PipedOutputStream output = new PipedOutputStream(input); - NanoHTTPD.Response mResponse = newFixedLengthResponse(status, contentType, input, contentLength); - for (String key : response.headers().names()) { - String value = response.headers().get(key); - if (!key.equalsIgnoreCase("Content-Type") && !key.equalsIgnoreCase("Content-Length")) { - mResponse.addHeader(key, value); - } - } - - //搬运数据流 - Init.execute(() -> { - try { - while (true) { - byte[] buffer = read(); - if (buffer == null || buffer.length == 0) { - break; - } - output.write(buffer); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - stop(); - output.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - return mResponse; - } - - //读取文件内容 - private byte[] read() throws Exception { - //判断文件是否下载结束 - if (currentOffset > endOffset) { - stop(); - return null; - } - - //获取当前的chunk的数据 - Chunk currentChunk = readyChunkQueue.poll(timeout, TimeUnit.SECONDS); - if (currentChunk == null) { - stop(); - throw new Exception("read timeout"); - } - - while (running) { - byte[] buffer = currentChunk.get(); - if (buffer != null) { - currentOffset += buffer.length; - return buffer; - } else { - SystemClock.sleep(100); - } - } - - return null; - } - - private void worker() { - while (running) { - //打斷技能 - if (Thread.interrupted()) break; - - //生成下一个chunk - Chunk chunk = null; - lock.lock(); - long startOffset = nextChunkStartOffset; - nextChunkStartOffset += chunkSize; - if (startOffset <= endOffset) { - long endOffset = startOffset + chunkSize - 1; - if (endOffset > this.endOffset) { - endOffset = this.endOffset; - } - chunk = new Chunk(startOffset, endOffset); - readyChunkQueue.add(chunk); - } - lock.unlock(); - - //所有chunk已下载完 - if (chunk == null) { - break; - } - - while (running) { - //过多的数据未被取走,先休息一下,避免内存溢出 - if (chunk.startOffset - currentOffset >= (long) chunkSize * maxBufferedChunk) { - SystemClock.sleep(1000); - } else { - break; - } - } - - while (running) { - try { - //建立连接 - Map mHeaders = new HashMap<>(); - for (Map.Entry entry : headers.entrySet()) { - if (entry.getKey().equalsIgnoreCase("Range")) { - //设置下载范围 - mHeaders.put("Range", String.format(Locale.getDefault(), "bytes=%d-%d", chunk.startOffset, chunk.endOffset)); - } else { - mHeaders.put(entry.getKey(), entry.getValue()); - } - } - Response response = OkHttp.newCall(getDownloadUrl(), mHeaders); - - //检查状态码 - int responseCode = response.code(); - if (responseCode < 200 || responseCode >= 300) { - throw new Exception("response code: " + responseCode); - } - //接收数据 - if (response.body() != null) { - chunk.put(response.body().bytes()); - } - - break; - } catch (Exception e) { - e.printStackTrace(); - SystemClock.sleep(1000); - } - } - } - } - - private String getDownloadUrl() { - if (url.contains("/proxy?")) { - return OkHttp.string(url); - } else { - return url; - } - } - - public void stop() { - running = false; - } - - public void destory() { - running = false; - executor.shutdownNow(); - } - - private static class Chunk { - - private final long startOffset; - private final long endOffset; - private byte[] buffer; - - public Chunk(long startOffset, long endOffset) { - this.startOffset = startOffset; - this.endOffset = endOffset; - } - - public byte[] get() { - return buffer; - } - - public void put(byte[] buffer) { - this.buffer = buffer; - } - } -} \ No newline at end of file diff --git a/app/src/main/java/com/github/catvod/utils/ProxyVideo.java b/app/src/main/java/com/github/catvod/utils/ProxyVideo.java index 29dc61ea..98fc94fe 100644 --- a/app/src/main/java/com/github/catvod/utils/ProxyVideo.java +++ b/app/src/main/java/com/github/catvod/utils/ProxyVideo.java @@ -1,9 +1,12 @@ package com.github.catvod.utils; +import static fi.iki.elonen.NanoHTTPD.Response.Status; import static fi.iki.elonen.NanoHTTPD.newFixedLengthResponse; import com.github.catvod.net.OkHttp; +import java.net.URLEncoder; +import java.util.Locale; import java.util.Map; import fi.iki.elonen.NanoHTTPD; @@ -11,12 +14,13 @@ import okhttp3.Response; public class ProxyVideo { + public static NanoHTTPD.Response multi(String url, Map headers, int thread) throws Exception { + return proxy(String.format(Locale.getDefault(), "http://127.0.0.1:7777/?url=%s&thread=%d", URLEncoder.encode(url), thread), headers); + } + public static NanoHTTPD.Response proxy(String url, Map headers) throws Exception { - NanoHTTPD.Response.Status status = NanoHTTPD.Response.Status.PARTIAL_CONTENT; - if (!headers.containsKey("Range")) { - headers.put("Range", "bytes=0-"); - status = NanoHTTPD.Response.Status.OK; - } + Status status = headers.containsKey("Range") ? Status.PARTIAL_CONTENT : Status.OK; + if (!headers.containsKey("Range")) headers.put("Range", "bytes=0-"); Response response = OkHttp.newCall(url, headers); String contentType = response.headers().get("Content-Type"); String hContentLength = response.headers().get("Content-Length");