Add http pool to reduce connection overhead
This commit is contained in:
		
							parent
							
								
									a180e5649c
								
							
						
					
					
						commit
						3bd0488c66
					
				| 
						 | 
				
			
			@ -1,9 +1,11 @@
 | 
			
		|||
import httpclient, asyncdispatch, options, times, strutils, uri
 | 
			
		||||
import packedjson
 | 
			
		||||
import types, tokens, consts, parserutils
 | 
			
		||||
import types, tokens, consts, parserutils, http_pool
 | 
			
		||||
 | 
			
		||||
const rl = "x-rate-limit-"
 | 
			
		||||
 | 
			
		||||
var pool {.threadvar.}: HttpPool
 | 
			
		||||
 | 
			
		||||
proc genParams*(pars: openarray[(string, string)] = @[]; cursor="";
 | 
			
		||||
                count="20"; ext=true): seq[(string, string)] =
 | 
			
		||||
  result = timelineParams
 | 
			
		||||
| 
						 | 
				
			
			@ -18,6 +20,7 @@ proc genParams*(pars: openarray[(string, string)] = @[]; cursor="";
 | 
			
		|||
 | 
			
		||||
proc genHeaders*(token: Token = nil): HttpHeaders =
 | 
			
		||||
  result = newHttpHeaders({
 | 
			
		||||
    "connection": "keep-alive",
 | 
			
		||||
    "authorization": auth,
 | 
			
		||||
    "content-type": "application/json",
 | 
			
		||||
    "x-guest-token": if token == nil: "" else: token.tok,
 | 
			
		||||
| 
						 | 
				
			
			@ -29,14 +32,17 @@ proc genHeaders*(token: Token = nil): HttpHeaders =
 | 
			
		|||
  })
 | 
			
		||||
 | 
			
		||||
proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} =
 | 
			
		||||
  once:
 | 
			
		||||
    pool = HttpPool()
 | 
			
		||||
 | 
			
		||||
  var token = await getToken()
 | 
			
		||||
  if token.tok.len == 0:
 | 
			
		||||
    result = newJNull()
 | 
			
		||||
 | 
			
		||||
  var client = newAsyncHttpClient(headers=genHeaders(token))
 | 
			
		||||
  let headers = genHeaders(token)
 | 
			
		||||
  try:
 | 
			
		||||
    let
 | 
			
		||||
      resp = await client.get($url)
 | 
			
		||||
      resp = pool.use(headers): await c.get($url)
 | 
			
		||||
      body = await resp.body
 | 
			
		||||
 | 
			
		||||
    if body.startsWith('{') or body.startsWith('['):
 | 
			
		||||
| 
						 | 
				
			
			@ -54,5 +60,3 @@ proc fetch*(url: Uri; oldApi=false): Future[JsonNode] {.async.} =
 | 
			
		|||
  except Exception:
 | 
			
		||||
    echo "error: ", url
 | 
			
		||||
    result = newJNull()
 | 
			
		||||
  finally:
 | 
			
		||||
    client.close()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,37 @@
 | 
			
		|||
import asyncdispatch, httpclient
 | 
			
		||||
 | 
			
		||||
type
 | 
			
		||||
  HttpPool* = ref object
 | 
			
		||||
    conns*: seq[AsyncHttpClient]
 | 
			
		||||
 | 
			
		||||
var maxConns {.threadvar.}: int
 | 
			
		||||
 | 
			
		||||
let keepAlive* = newHttpHeaders({
 | 
			
		||||
  "Connection": "Keep-Alive"
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
proc setMaxHttpConns*(n: int) =
 | 
			
		||||
  maxConns = n
 | 
			
		||||
 | 
			
		||||
proc release*(pool: HttpPool; client: AsyncHttpClient) =
 | 
			
		||||
  if pool.conns.len >= maxConns:
 | 
			
		||||
    client.close()
 | 
			
		||||
  elif client != nil:
 | 
			
		||||
    pool.conns.insert(client)
 | 
			
		||||
 | 
			
		||||
template use*(pool: HttpPool; heads: HttpHeaders; body: untyped): untyped =
 | 
			
		||||
  var c {.inject.}: AsyncHttpClient
 | 
			
		||||
 | 
			
		||||
  if pool.conns.len == 0:
 | 
			
		||||
    c = newAsyncHttpClient(headers=heads)
 | 
			
		||||
  else:
 | 
			
		||||
    c = pool.conns.pop()
 | 
			
		||||
    c.headers = heads
 | 
			
		||||
 | 
			
		||||
  try:
 | 
			
		||||
    body
 | 
			
		||||
  except ProtocolError:
 | 
			
		||||
    # Twitter closed the connection, retry
 | 
			
		||||
    body
 | 
			
		||||
  finally:
 | 
			
		||||
    pool.release(c)
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@ from net import Port
 | 
			
		|||
 | 
			
		||||
import jester
 | 
			
		||||
 | 
			
		||||
import types, config, prefs, formatters, redis_cache, tokens
 | 
			
		||||
import types, config, prefs, formatters, redis_cache, http_pool, tokens
 | 
			
		||||
import views/[general, about]
 | 
			
		||||
import routes/[
 | 
			
		||||
  preferences, timeline, status, media, search, rss, list,
 | 
			
		||||
| 
						 | 
				
			
			@ -25,6 +25,7 @@ updateDefaultPrefs(fullCfg)
 | 
			
		|||
setCacheTimes(cfg)
 | 
			
		||||
setHmacKey(cfg.hmacKey)
 | 
			
		||||
setProxyEncoding(cfg.base64Media)
 | 
			
		||||
setMaxHttpConns(100)
 | 
			
		||||
 | 
			
		||||
waitFor initRedisPool(cfg)
 | 
			
		||||
stdout.write &"Connected to Redis at {cfg.redisHost}:{cfg.redisPort}\n"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -114,7 +114,8 @@ proc parseVideo(js: JsonNode): Video =
 | 
			
		|||
    views: js{"ext", "mediaStats", "r", "ok", "viewCount"}.getStr,
 | 
			
		||||
    available: js{"ext_media_availability", "status"}.getStr == "available",
 | 
			
		||||
    title: js{"ext_alt_text"}.getStr,
 | 
			
		||||
    durationMs: js{"duration_millis"}.getInt
 | 
			
		||||
    durationMs: js{"duration_millis"}.getInt,
 | 
			
		||||
    playbackType: mp4
 | 
			
		||||
  )
 | 
			
		||||
 | 
			
		||||
  with title, js{"additional_media_info", "title"}:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,8 @@
 | 
			
		|||
import asyncdispatch, httpclient, times, sequtils, strutils, json
 | 
			
		||||
import types, agents, consts
 | 
			
		||||
import types, agents, consts, http_pool
 | 
			
		||||
 | 
			
		||||
var
 | 
			
		||||
  clientPool {.threadvar.}: HttpPool
 | 
			
		||||
  tokenPool {.threadvar.}: seq[Token]
 | 
			
		||||
  lastFailed: Time
 | 
			
		||||
  minFail = initDuration(seconds=10)
 | 
			
		||||
| 
						 | 
				
			
			@ -10,22 +11,20 @@ proc fetchToken(): Future[Token] {.async.} =
 | 
			
		|||
  if getTime() - lastFailed < minFail:
 | 
			
		||||
    return Token()
 | 
			
		||||
 | 
			
		||||
  let
 | 
			
		||||
    headers = newHttpHeaders({
 | 
			
		||||
      "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
 | 
			
		||||
      "accept-language": "en-US,en;q=0.5",
 | 
			
		||||
      "connection": "keep-alive",
 | 
			
		||||
      "user-agent": getAgent(),
 | 
			
		||||
      "authorization": auth
 | 
			
		||||
    })
 | 
			
		||||
    client = newAsyncHttpClient(headers=headers)
 | 
			
		||||
  let headers = newHttpHeaders({
 | 
			
		||||
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
 | 
			
		||||
    "accept-language": "en-US,en;q=0.5",
 | 
			
		||||
    "connection": "keep-alive",
 | 
			
		||||
    "user-agent": getAgent(),
 | 
			
		||||
    "authorization": auth
 | 
			
		||||
  })
 | 
			
		||||
 | 
			
		||||
  var
 | 
			
		||||
    resp: string
 | 
			
		||||
    tok: string
 | 
			
		||||
 | 
			
		||||
  try:
 | 
			
		||||
    resp = await client.postContent(activate)
 | 
			
		||||
    resp = clientPool.use(headers): await c.postContent(activate)
 | 
			
		||||
    tok = parseJson(resp)["guest_token"].getStr
 | 
			
		||||
 | 
			
		||||
    let time = getTime()
 | 
			
		||||
| 
						 | 
				
			
			@ -35,8 +34,6 @@ proc fetchToken(): Future[Token] {.async.} =
 | 
			
		|||
    lastFailed = getTime()
 | 
			
		||||
    echo "fetching token failed: ", e.msg
 | 
			
		||||
    result = Token()
 | 
			
		||||
  finally:
 | 
			
		||||
    client.close()
 | 
			
		||||
 | 
			
		||||
proc expired(token: Token): bool {.inline.} =
 | 
			
		||||
  const
 | 
			
		||||
| 
						 | 
				
			
			@ -74,6 +71,8 @@ proc poolTokens*(amount: int) {.async.} =
 | 
			
		|||
    release(await token)
 | 
			
		||||
 | 
			
		||||
proc initTokenPool*(cfg: Config) {.async.} =
 | 
			
		||||
  clientPool = HttpPool()
 | 
			
		||||
 | 
			
		||||
  while true:
 | 
			
		||||
    if tokenPool.countIt(not it.isLimited) < cfg.minTokens:
 | 
			
		||||
      await poolTokens(min(4, cfg.minTokens - tokenPool.len))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue