From 181ef3bca7b9d8708297a321732e24d5da7dc7ad Mon Sep 17 00:00:00 2001 From: Zed Date: Sat, 6 Jun 2020 09:27:25 +0200 Subject: [PATCH] Use snappy for rss compression, refactor --- nitter.nimble | 1 + src/nitter.nim | 2 +- src/redis_cache.nim | 35 ++++++++++++------- src/routes/rss.nim | 85 ++++++++++++++++++++++++--------------------- src/types.nim | 3 ++ 5 files changed, 73 insertions(+), 53 deletions(-) diff --git a/nitter.nimble b/nitter.nimble index 6ee8d5e..261b512 100644 --- a/nitter.nimble +++ b/nitter.nimble @@ -21,6 +21,7 @@ requires "https://github.com/zedeus/redis#head" requires "redpool#head" requires "msgpack4nim >= 0.3.1" requires "packedjson" +requires "snappy#head" # Tasks diff --git a/src/nitter.nim b/src/nitter.nim index f4eb8a2..0ca4cae 100644 --- a/src/nitter.nim +++ b/src/nitter.nim @@ -16,7 +16,7 @@ updateDefaultPrefs(fullCfg) setCacheTimes(cfg) setHmacKey(cfg.hmacKey) -initRedisPool(cfg) +waitFor initRedisPool(cfg) asyncCheck initTokenPool(cfg) createUnsupportedRouter(cfg) diff --git a/src/redis_cache.nim b/src/redis_cache.nim index f598046..4173a93 100644 --- a/src/redis_cache.nim +++ b/src/redis_cache.nim @@ -16,10 +16,20 @@ proc setCacheTimes*(cfg: Config) = rssCacheTime = cfg.rssCacheTime * 60 listCacheTime = cfg.listCacheTime * 60 -proc initRedisPool*(cfg: Config) = +proc initRedisPool*(cfg: Config) {.async.} = try: - pool = waitFor newRedisPool(cfg.redisConns, maxConns=cfg.redisMaxConns, - host=cfg.redisHost, port=cfg.redisPort) + pool = await newRedisPool(cfg.redisConns, maxConns=cfg.redisMaxConns, + host=cfg.redisHost, port=cfg.redisPort) + + pool.withAcquire(r): + let snappyRss = await r.get("snappyRss") + if snappyRss == redisNil: + let list = await r.scan(newCursor(0), "rss:*", 10000) + r.startPipelining() + for rss in list: + discard await r.del(rss) + discard await r.flushPipeline() + await r.setk("snappyRss", "true") except OSError: echo "Failed to connect to Redis." quit(1) @@ -60,12 +70,12 @@ proc cacheProfileId*(username, id: string) {.async.} = pool.withAcquire(r): discard await r.hset("p:", toLower(username), id) -proc cacheRss*(query, rss, cursor: string) {.async.} = +proc cacheRss*(query: string; rss: Rss) {.async.} = let key = "rss:" & query pool.withAcquire(r): r.startPipelining() - discard await r.hset(key, "rss", rss) - discard await r.hset(key, "min", cursor) + discard await r.hset(key, "rss", rss.feed) + discard await r.hset(key, "min", rss.cursor) discard await r.expire(key, rssCacheTime) discard await r.flushPipeline() @@ -104,10 +114,11 @@ proc getCachedList*(username=""; name=""; id=""): Future[List] {.async.} = result = await getGraphList(username, name) await cache(result) -proc getCachedRss*(key: string): Future[(string, string)] {.async.} = - var res: Table[string, string] +proc getCachedRss*(key: string): Future[Rss] {.async.} = + let k = "rss:" & key pool.withAcquire(r): - res = await r.hgetall("rss:" & key) - - if "rss" in res: - result = (res["rss"], res.getOrDefault("min")) + result.cursor = await r.hget(k, "min") + if result.cursor.len > 2: + result.feed = await r.hget(k, "rss") + else: + result.cursor.setLen 0 diff --git a/src/routes/rss.nim b/src/routes/rss.nim index 8072997..1ff87c6 100644 --- a/src/routes/rss.nim +++ b/src/routes/rss.nim @@ -1,15 +1,15 @@ -import asyncdispatch, strutils, tables, times, sequtils, hashes +import asyncdispatch, strutils, tables, times, sequtils, hashes, snappy import jester import router_utils, timeline -import ".."/[redis_cache, query], ../views/general +import ../query, ../views/general include "../views/rss.nimf" -export times, hashes +export times, hashes, snappy -proc showRss*(req: Request; hostname: string; query: Query): Future[(string, string)] {.async.} = +proc showRss*(req: Request; hostname: string; query: Query): Future[Rss] {.async.} = var profile: Profile var timeline: Timeline let @@ -31,19 +31,21 @@ proc showRss*(req: Request; hostname: string; query: Query): Future[(string, str ) if profile.suspended: - return (profile.username, "suspended") + return Rss(feed: profile.username, cursor: "suspended") if profile.fullname.len > 0: - let rss = renderTimelineRss(timeline, profile, hostname, multi=(names.len > 1)) - return (rss, timeline.bottom) + let rss = compress renderTimelineRss(timeline, profile, hostname, + multi=(names.len > 1)) + return Rss(feed: rss, cursor: timeline.bottom) -template respRss*(rss, minId) = - if rss.len == 0: +template respRss*(rss) = + if rss.cursor.len == 0: resp Http404, showError("User \"" & @"name" & "\" not found", cfg) - elif minId == "suspended": - resp Http404, showError(getSuspended(rss), cfg) - let headers = {"Content-Type": "application/rss+xml; charset=utf-8", "Min-Id": minId} - resp Http200, headers, rss + elif rss.cursor.len == 9 and rss.cursor == "suspended": + resp Http404, showError(getSuspended(rss.feed), cfg) + let headers = {"Content-Type": "application/rss+xml; charset=utf-8", + "Min-Id": rss.cursor} + resp Http200, headers, uncompress rss.feed proc createRssRouter*(cfg: Config) = router rss: @@ -58,33 +60,34 @@ proc createRssRouter*(cfg: Config) = let cursor = getCursor() key = $hash(genQueryUrl(query)) & cursor - (cRss, cCursor) = await getCachedRss(key) - if cRss.len > 0: - respRss(cRss, cCursor) + var rss = await getCachedRss(key) + if rss.cursor.len > 0: + respRss(rss) - let - tweets = await getSearch[Tweet](query, cursor) - rss = renderSearchRss(tweets.content, query.text, genQueryUrl(query), cfg.hostname) + let tweets = await getSearch[Tweet](query, cursor) + rss.cursor = tweets.bottom + rss.feed = compress renderSearchRss(tweets.content, query.text, + genQueryUrl(query), cfg.hostname) - await cacheRss(key, rss, tweets.bottom) - respRss(rss, tweets.bottom) + await cacheRss(key, rss) + respRss(rss) get "/@name/rss": cond '.' notin @"name" let cursor = getCursor() name = @"name" - (cRss, cCursor) = await getCachedRss(name & cursor) + key = name & cursor - if cRss.len > 0: - respRss(cRss, cCursor) + var rss = await getCachedRss(key) + if rss.cursor.len > 0: + respRss(rss) - let (rss, rssCursor) = await showRss(request, cfg.hostname, - Query(fromUser: @[name])) + rss = await showRss(request, cfg.hostname, Query(fromUser: @[name])) - await cacheRss(name & cursor, rss, rssCursor) - respRss(rss, rssCursor) + await cacheRss(key, rss) + respRss(rss) get "/@name/@tab/rss": cond '.' notin @"name" @@ -102,28 +105,30 @@ proc createRssRouter*(cfg: Config) = key &= hash(genQueryUrl(query)) key &= getCursor() - let (cRss, cCursor) = await getCachedRss(key) - if cRss.len > 0: - respRss(cRss, cCursor) + var rss = await getCachedRss(key) + if rss.cursor.len > 0: + respRss(rss) - let (rss, rssCursor) = await showRss(request, cfg.hostname, query) - await cacheRss(key, rss, rssCursor) - respRss(rss, rssCursor) + rss = await showRss(request, cfg.hostname, query) + + await cacheRss(key, rss) + respRss(rss) get "/@name/lists/@list/rss": cond '.' notin @"name" let cursor = getCursor() key = @"name" & "/" & @"list" & cursor - (cRss, cCursor) = await getCachedRss(key) - if cRss.len > 0: - respRss(cRss, cCursor) + var rss = await getCachedRss(key) + if rss.cursor.len > 0: + respRss(rss) let list = await getCachedList(@"name", @"list") timeline = await getListTimeline(list.id, cursor) - rss = renderListRss(timeline.content, list, cfg.hostname) + rss.cursor = timeline.bottom + rss.feed = compress renderListRss(timeline.content, list, cfg.hostname) - await cacheRss(key, rss, timeline.bottom) - respRss(rss, timeline.bottom) + await cacheRss(key, rss) + respRss(rss) diff --git a/src/types.nim b/src/types.nim index d2e8c45..b8849b0 100644 --- a/src/types.nim +++ b/src/types.nim @@ -212,5 +212,8 @@ type redisConns*: int redisMaxConns*: int + Rss* = object + feed*, cursor*: string + proc contains*(thread: Chain; tweet: Tweet): bool = thread.content.anyIt(it.id == tweet.id)