diff --git a/app/src/main/java/com/github/catvod/utils/BufferedHttpServer.kt b/app/src/main/java/com/github/catvod/utils/BufferedHttpServer.kt new file mode 100644 index 00000000..5ee12b91 --- /dev/null +++ b/app/src/main/java/com/github/catvod/utils/BufferedHttpServer.kt @@ -0,0 +1,360 @@ +package com.github.catvod.utils + +import com.github.catvod.crawler.SpiderDebug +import java.io.BufferedOutputStream +import java.io.BufferedReader +import java.io.IOException +import java.io.InputStreamReader +import java.net.ServerSocket +import java.net.Socket +import java.net.URLDecoder +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.text.Charsets.UTF_8 + +class AdvancedHttpServer(private val port: Int) { + private val serverSocket: ServerSocket + private val threadPool: ExecutorService + private var isRunning = false + private val routes = mutableMapOf Unit>() + + init { + serverSocket = ServerSocket(port) + threadPool = Executors.newFixedThreadPool(10) + } + + fun addRoutes(path: String, handler: (Request, Response) -> Unit) { + routes[path] = handler + } + + + fun start() { + isRunning = true + SpiderDebug.log("服务器已启动,监听端口: $port") + + while (isRunning) { + try { + val clientSocket = serverSocket.accept() + threadPool.execute { handleRequest(clientSocket) } + } catch (e: IOException) { + e.printStackTrace();SpiderDebug.log("出错:" + e.message) + if (isRunning) e.printStackTrace();SpiderDebug.log("出错:" + e.message) + + } + } + } + + private fun handleRequest(clientSocket: Socket) { + try { + val reader = BufferedReader(InputStreamReader(clientSocket.inputStream, UTF_8)) + val writer = BufferedOutputStream(clientSocket.outputStream) + + // 解析请求行 + val requestLine = reader.readLine() ?: "" + val (method, path, _) = parseRequestLine(requestLine) + + // 解析路径和查询参数 + val (basePath, queryParams) = parsePath(path) + + // 读取请求头 + val headers = mutableMapOf() + var line: String? + while (reader.readLine().also { line = it } != null && line!!.isNotEmpty()) { + val colonIndex = line!!.indexOf(':') + if (colonIndex > 0) { + headers[line!!.substring(0, colonIndex).trim()] = + line!!.substring(colonIndex + 1).trim() + } + } + + // 解析请求体参数 + val contentLength = headers["Content-Length"]?.toIntOrNull() ?: 0 + val requestBody = if (contentLength > 0) { + buildString { + repeat(contentLength) { + val char = reader.read().takeIf { it != -1 }?.toChar() ?: return@buildString + append(char) + } + } + } else "" + + val contentType = headers["Content-Type"] ?: "" + val bodyParams = parseRequestBody(contentType, requestBody) + + // 创建请求对象 + val request = Request( + method = method, + path = basePath, + queryParams = queryParams, + headers = headers, + body = requestBody, + bodyParams = bodyParams + ) + + // 创建响应处理器 + val response = Response(writer) + + // 路由处理 + routeRequest(request, response) + + response.flush() + } catch (e: IOException) { + e.printStackTrace() + SpiderDebug.log("出错:" + e.message) + } finally { + clientSocket.close() + } + } + + private fun routeRequest(request: Request, response: Response) { + val route = routes[request.path] + if (route == null) { + handleNotFound(response) + } else { + route.invoke(request, response) + } + } + + private fun handleRoot(request: Request, response: Response) { + response.setContentType("text/html") + response.start() + + response.write("") + response.write("

高级HTTP服务器

") + response.write("

支持参数解析和分块响应

") + response.write("

功能演示

") + response.write("") + + // 演示表单提交 + response.write("

表单提交测试

") + response.write("
") + response.write("Name:
") + response.write("Age:
") + response.write("") + response.write("
") + + response.write("") + } + + private fun handleEcho(request: Request, response: Response) { + response.setContentType("text/html") + response.start() + + response.write("") + response.write("

Echo服务

") + + // 输出请求信息 + response.write("

请求信息

") + response.write("

方法: ${request.method}

") + response.write("

路径: ${request.path}

") + + // 输出查询参数 + response.write("

查询参数

") + response.write("") + + // 输出请求头 + response.write("

请求头

") + response.write("") + + // 输出请求体 + response.write("

请求体

") + response.write("
${request.body}
") + + // 输出解析后的参数 + response.write("

解析后的参数

") + response.write("") + + response.write("") + } + + private fun handleStreamResponse(response: Response) { + response.setContentType("text/plain") + response.start() + + for (i in 1..5) { + response.write("这是第 $i 部分内容\n") + Thread.sleep(500) + } + } + + private fun handleSlowResponse(response: Response) { + response.setContentType("text/event-stream") + response.setHeader("Cache-Control", "no-cache") + response.setHeader("Connection", "keep-alive") + response.start() + + for (i in 1..10) { + response.write("data: 消息 $i\n\n") + response.flush() + Thread.sleep(1000) + } + } + + private fun handleNotFound(response: Response) { + response.setStatusCode(404) + response.setContentType("text/html") + response.start() + response.write("

404 Not Found

") + } + + private fun parseRequestLine(requestLine: String): Triple { + val parts = requestLine.split(" ", limit = 3) + return when { + parts.size >= 3 -> Triple(parts[0], parts[1], parts[2]) + parts.size == 2 -> Triple(parts[0], parts[1], "HTTP/1.1") + else -> Triple("GET", "/", "HTTP/1.1") + } + } + + private fun parsePath(path: String): Pair> { + val queryIndex = path.indexOf('?') + return if (queryIndex >= 0) { + Pair(path.substring(0, queryIndex), parseQueryString(path.substring(queryIndex + 1))) + } else { + Pair(path, emptyMap()) + } + } + + private fun parseQueryString(queryString: String): Map { + return queryString.split("&").filter { it.isNotEmpty() }.map { param -> + val equalsIndex = param.indexOf('=') + if (equalsIndex >= 0) { + val key = URLDecoder.decode(param.substring(0, equalsIndex), "UTF-8") + val value = URLDecoder.decode(param.substring(equalsIndex + 1), "UTF-8") + key to value + } else { + URLDecoder.decode(param, "UTF-8") to "" + } + }.toMap() + } + + private fun parseRequestBody(contentType: String, body: String): Map { + return when { + contentType.contains("application/x-www-form-urlencoded") -> parseQueryString(body) + + contentType.contains("multipart/form-data") -> parseMultipartFormData(contentType, body) + + else -> emptyMap() + } + } + + private fun parseMultipartFormData(contentType: String, body: String): Map { + // 简化的multipart/form-data解析,实际实现需要处理boundary等 + return emptyMap() + } + + fun stop() { + isRunning = false + threadPool.shutdown() + serverSocket.close() + } + + data class Request( + val method: String, + val path: String, + val queryParams: Map, + val headers: Map, + val body: String, + val bodyParams: Map + ) + + class Response(private val writer: BufferedOutputStream) { + private val headers = mutableMapOf() + private var contentType = "text/plain" + private var statusCode = 200 + private var started = AtomicBoolean(false) + + fun setContentType(contentType: String) { + this.contentType = contentType + } + + fun setHeader(name: String, value: String) { + headers[name] = value + } + + fun setStatusCode(statusCode: Int) { + this.statusCode = statusCode + } + + private val codeMap = mutableMapOf( + 206 to "Partial Content", + 200 to "OK", + 404 to "NOT FOUND", + 400 to "BAD REQUEST", + 401 to "UNAUTHORIZED", + 403 to "FORBIDDEN", + 405 to "METHOD NOT ALLOWED", + 408 to "REQUEST TIMEOUT", + 413 to "PAYLOAD TOO LARGE", + 414 to "URI TOO LONG", + 415 to "UNSUPPORTED MEDIA TYPE", + 429 to "TOO MANY REQUESTS", + 500 to "INTERNAL SERVER ERROR", + 501 to "NOT IMPLEMENTED", + 503 to "SERVICE UNAVAILABLE", + 504 to "GATEWAY TIMEOUT", + 505 to "HTTP VERSION NOT SUPPORTED", + 507 to "INSUFFICIENT STORAGE", + 511 to "NETWORK AUTHENTICATION REQUIRED" + ) + + fun start() { + if (started.compareAndSet(false, true)) { + writer.write("HTTP/1.1 $statusCode ${codeMap[statusCode]}\r\n".toByteArray(UTF_8)) + + writer.write("Content-Type: $contentType; charset=utf-8\r\n".toByteArray(UTF_8)) + + headers.forEach { (name, value) -> + writer.write("$name: $value\r\n".toByteArray(UTF_8)) + } + + writer.write("\r\n".toByteArray(UTF_8)) + writer.flush() + } + } + + fun write(content: ByteArray) { + if (!started.get()) { + start() + } + writer.write(content) + } + + fun write(content: String) { + if (!started.get()) { + start() + } + writer.write(content.toByteArray(UTF_8)) + } + + fun flush() { + writer.flush() + } + } +} + diff --git a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt index a50dd566..685cab3f 100644 --- a/app/src/main/java/com/github/catvod/utils/ProxyServer.kt +++ b/app/src/main/java/com/github/catvod/utils/ProxyServer.kt @@ -3,16 +3,12 @@ package com.github.catvod.utils import com.github.catvod.crawler.SpiderDebug import com.github.catvod.net.OkHttp import com.google.gson.Gson -import com.sun.net.httpserver.HttpExchange -import com.sun.net.httpserver.HttpServer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import java.io.BufferedOutputStream -import java.io.OutputStreamWriter -import java.net.InetSocketAddress +import kotlinx.coroutines.runBlocking import java.nio.charset.Charset @@ -20,157 +16,142 @@ object ProxyServer { private val THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2 private const val partSize = 1024 * 1024 * 1 private var port = 12345 - private var httpServer: HttpServer? = null + private var httpServer: AdvancedHttpServer? = null private val infos = mutableMapOf>>(); fun stop() { - httpServer?.stop(1_000) + httpServer?.stop() } fun start() { try { - httpServer = HttpServer.create(InetSocketAddress(port), 100); - httpServer?.createContext("/") { httpExchange -> + httpServer = AdvancedHttpServer(port) + httpServer!!.addRoutes("/") { _, response -> run { - httpExchange.sendResponseHeaders(200, "server running ".length.toLong()); - - val os = httpExchange.responseBody - val writer = OutputStreamWriter(os, Charset.defaultCharset()) - writer.write("server running ") - writer.close() - os.close() - httpExchange.close() + response.setContentType("text/html") + response.start() + response.write("Hello, world!") } - } - httpServer?.createContext("/proxy") { httpExchange -> + }; + httpServer!!.addRoutes("/proxy") { req, response -> run { - val params = queryToMap(httpExchange.requestURI.query) - - val url = Util.base64Decode(params?.get("url")) + val url = Util.base64Decode(req.queryParams["url"]) val header: Map = Gson().fromJson>( - Util.base64Decode(params?.get("headers")), MutableMap::class.java + Util.base64Decode(req.queryParams["headers"]), MutableMap::class.java ) - CoroutineScope(Dispatchers.IO).launch { - proxyAsync( - url, header, httpExchange - ) - } - + proxyAsync(url, header, req, response) } } - httpServer?.executor = null; - - httpServer?.start(); - + httpServer!!.start() } catch (e: Exception) { SpiderDebug.log("start server e:" + e.message) e.printStackTrace() - httpServer?.stop(1000) + httpServer?.stop() } - port = httpServer?.address?.port!! - SpiderDebug.log("ktorServer start on " + port) + + SpiderDebug.log("Server start on $port") } - private suspend fun proxyAsync( - url: String, headers: Map, httpExchange: HttpExchange + private fun proxyAsync( + url: String, + headers: Map, + request: AdvancedHttpServer.Request, + response: AdvancedHttpServer.Response ) { - val channels = List(THREAD_NUM) { Channel() } - val outputStream = httpExchange.responseBody - - val bufferedOutputStream = BufferedOutputStream(outputStream) - - try { - SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM") + runBlocking { + val channels = List(THREAD_NUM) { Channel() } - var rangeHeader = httpExchange.requestHeaders.getFirst("Range") - //没有range头 - if (rangeHeader.isNullOrEmpty()) { - // 处理初始请求 - rangeHeader = "bytes=0-" - } - headers.toMutableMap().apply { - put("Range", rangeHeader) - } - - // 解析范围请求 - val (startPoint, endPoint) = parseRangePoint( - rangeHeader - ) - - //缓存response header - var info = infos[url] - if (info == null) { - info = getInfo(url, headers) - infos[url] = info - } - - SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint") - val contentLength = getContentLength(info) - SpiderDebug.log("contentLength: $contentLength") - val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint - - httpExchange.responseHeaders.apply { - set("Connection", "keep-alive") - set("Content-Length", (finalEndPoint - startPoint + 1).toString()) - set("Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength") - set("Content-Type", info["Content-Type"]?.get(0)) - } - httpExchange.sendResponseHeaders(206, 0) - - // 使用流式响应 - - var currentStart = startPoint + try { + SpiderDebug.log("--proxyMultiThread: THREAD_NUM: $THREAD_NUM") - // 启动生产者协程下载数据 + var rangeHeader = request.headers["Range"] + //没有range头 + if (rangeHeader.isNullOrEmpty()) { + // 处理初始请求 + rangeHeader = "bytes=0-" + } + headers.toMutableMap().apply { + put("Range", rangeHeader) + } - val producerJob = mutableListOf() + // 解析范围请求 + val (startPoint, endPoint) = parseRangePoint( + rangeHeader + ) - while (currentStart <= finalEndPoint) { - producerJob.clear() - // 创建通道用于接收数据块 + //缓存response header + var info = infos[url] + if (info == null) { + info = getInfo(url, headers) + infos[url] = info + } - for (i in 0 until THREAD_NUM) { + SpiderDebug.log("startPoint: $startPoint; endPoint: $endPoint") + val contentLength = getContentLength(info) + SpiderDebug.log("contentLength: $contentLength") + val finalEndPoint = if (endPoint == -1L) contentLength - 1 else endPoint + response.setContentType("text/html") - if (currentStart > finalEndPoint) break - val chunkStart = currentStart - val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) - producerJob += CoroutineScope(Dispatchers.IO).launch { - // 异步下载数据块 - val data = getVideoStream(chunkStart, chunkEnd, url, headers) - channels[i].send(data) + response.setHeader("Connection", "keep-alive") + response.setHeader("Content-Length", (finalEndPoint - startPoint + 1).toString()) + response.setHeader( + "Content-Range", "bytes $startPoint-$finalEndPoint/$contentLength" + ) + info["Content-Type"]?.get(0)?.let { response.setHeader("Content-Type", it) } + + response.setStatusCode(206) + response.start() + // 使用流式响应 + + var currentStart = startPoint + + + // 启动生产者协程下载数据 + + val producerJob = mutableListOf() + + while (currentStart <= finalEndPoint) { + producerJob.clear() + // 创建通道用于接收数据块 + + for (i in 0 until THREAD_NUM) { + + if (currentStart > finalEndPoint) break + val chunkStart = currentStart + val chunkEnd = minOf(currentStart + partSize - 1, finalEndPoint) + producerJob += CoroutineScope(Dispatchers.IO).launch { + // 异步下载数据块 + val data = getVideoStream(chunkStart, chunkEnd, url, headers) + channels[i].send(data) + + } + currentStart = chunkEnd + 1 } - currentStart = chunkEnd + 1 - } - for ((index, _) in producerJob.withIndex()) { + for ((index, _) in producerJob.withIndex()) { - val data = channels[index].receive() - SpiderDebug.log("Received chunk: ${data.size} bytes") - - bufferedOutputStream.write(data) - bufferedOutputStream.flush() + val data = channels[index].receive() + SpiderDebug.log("Received chunk: ${data.size} bytes") + response.write(data) + } } + } catch (e: Exception) { + SpiderDebug.log("proxyAsync error: ${e.message}") + e.printStackTrace() + response.write("proxyAsync error: ${e.message}") + + } finally { + channels.forEach { it.close() } } - } catch (e: Exception) { - SpiderDebug.log("error: ${e.message}") - - outputStream.write("error: ${e.message}".toByteArray()) - - } finally { - channels.forEach { it.close() } - bufferedOutputStream.close() - outputStream.close() - - httpExchange.close() } } diff --git a/jar/custom_spider.jar b/jar/custom_spider.jar index d2a40406..4c87d7cc 100644 Binary files a/jar/custom_spider.jar and b/jar/custom_spider.jar differ diff --git a/jar/custom_spider.jar.md5 b/jar/custom_spider.jar.md5 index d9d372ba..cb9e3149 100644 --- a/jar/custom_spider.jar.md5 +++ b/jar/custom_spider.jar.md5 @@ -1 +1 @@ -567abc0625af2feb5541627c7505dd4e +c1590ab280fefce3b7d9ebf4d8b029a5 diff --git a/json/test.json b/json/test.json index a3e60006..961cbc0f 100644 --- a/json/test.json +++ b/json/test.json @@ -1,5 +1,5 @@ { - "spider": "https://gh.llkk.cc/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThreadNew/jar/custom_spider.jar;md5;567abc0625af2feb5541627c7505dd4e", + "spider": "https://gh.llkk.cc/https://raw.githubusercontent.com/lushunming/AndroidCatVodSpider/multiThreadNew/jar/custom_spider.jar;md5;c1590ab280fefce3b7d9ebf4d8b029a5", "lives": [ { "name": "电视直播",