nitter/src/redis_cache.nim

193 lines
5.7 KiB
Nim
Raw Normal View History

2021-12-27 01:37:38 +00:00
# SPDX-License-Identifier: AGPL-3.0-only
import asyncdispatch, times, strformat, strutils, tables, hashes
import redis, redpool, flatty, supersnappy
2020-06-01 00:16:24 +00:00
import types, api
2022-01-02 06:00:44 +00:00
const
  # Two NUL characters; cached reads in this module are compared against
  # this value to detect a missing key or hash field.
  redisNil = "\0\0"
  # Expiry passed to `setEx` for users, tweets, photo rails and usernames
  # (60 * 60 = 3600; presumably seconds -- confirm against the Redis client).
  baseCacheTime = 60 * 60
2020-06-01 00:16:24 +00:00
var
  # Shared connection pool; assigned in `initRedisPool`.
  pool: RedisPool
  # Expiry applied to RSS hashes in `cacheRss`; assigned in `setCacheTimes`.
  rssCacheTime: int
  # Expiry applied to cached lists; exported, assigned in `setCacheTimes`.
  listCacheTime*: int
template dawait(future) =
  ## Awaits `future` and discards the value it completes with.
  ## Only usable where `await` itself is usable.
  discard (await future)
# flatty has no built-in DateTime support, so the hook is defined here.
proc toFlatty*(s: var string, x: DateTime) =
  ## Serializes `x` into the buffer `s` as its Unix timestamp, reusing the
  ## integer serializer that is already in scope.
  let stamp = x.toTime.toUnix
  s.toFlatty(stamp)
proc fromFlatty*(s: string, i: var int, x: var DateTime) =
  ## Reads an `int64` Unix timestamp from `s` at offset `i` (the integer
  ## reader advances `i`) and stores it in `x` as a UTC `DateTime`.
  var stamp: int64
  s.fromFlatty(i, stamp)
  x = stamp.fromUnix.utc
2020-06-01 00:16:24 +00:00
proc setCacheTimes*(cfg: Config) =
  ## Copies the RSS and list cache durations from `cfg` into the module
  ## globals, each multiplied by 60.
  # presumably the config values are minutes and the globals seconds --
  # confirm against the Config definition
  const factor = 60
  listCacheTime = factor * cfg.listCacheTime
  rssCacheTime = factor * cfg.rssCacheTime
proc migrate*(key, match: string) {.async.} =
  ## Runs a one-off cache purge guarded by the marker `key`.
  ##
  ## When `key` is not present in Redis, the keys returned by a scan for the
  ## pattern `match` are deleted and `key` is set to "true", all within one
  ## pipeline. When `key` already exists nothing happens.
  pool.withAcquire(conn):
    let marker = await conn.get(key)
    if marker == redisNil:
      # NOTE(review): only one SCAN call is issued (count hint 100000);
      # confirm this is enough to reach every matching key.
      let staleKeys = await conn.scan(newCursor(0), match, 100000)
      conn.startPipelining()
      for stale in staleKeys:
        dawait conn.del(stale)
      await conn.setk(key, "true")
      dawait conn.flushPipeline()
proc initRedisPool*(cfg: Config) {.async.} =
  ## Creates the module-wide connection pool from `cfg`, runs the cache
  ## migrations in order and applies a Redis config tweak.
  ##
  ## On `OSError` a message is written to stdout and the process exits
  ## with status 1.
  const migrations = [
    # (marker key, pattern of keys purged when the marker is absent)
    ("flatty", "*:*"),
    ("snappyRss", "rss:*"),
    ("userBuckets", "p:*"),
    ("profileDates", "p:*"),
    ("profileStats", "p:*"),
    ("userType", "p:*")
  ]

  try:
    pool = await newRedisPool(cfg.redisConns, cfg.redisMaxConns,
                              host=cfg.redisHost, port=cfg.redisPort,
                              password=cfg.redisPassword)

    for step in migrations:
      await migrate(step[0], step[1])

    pool.withAcquire(conn):
      # optimize memory usage for user ID buckets
      await conn.configSet("hash-max-ziplist-entries", "1000")

  except OSError:
    stdout.write "Failed to connect to Redis.\n"
    stdout.flushFile
    quit(1)
2020-06-01 00:16:24 +00:00
template uidKey(name: string): string =
  ## Key of the hash bucket that stores the user ID for `name`:
  ## "pid:" followed by `hash(name)` integer-divided by 1_000_000.
  "pid:" & $(name.hash div 1_000_000)
template userKey(name: string): string =
  ## Cache key for a user profile: `name` prefixed with "p:".
  "p:" & name
2021-10-02 08:13:56 +00:00
template listKey(l: List): string =
  ## Cache key for a list: the list's `id` prefixed with "l:".
  "l:" & l.id
template tweetKey(id: int64): string =
  ## Cache key for a tweet: the decimal `id` prefixed with "t:".
  "t:" & $id
2020-06-01 00:16:24 +00:00
proc get(query: string): Future[string] {.async.} =
  ## Reads the value stored under `query` through a pooled connection and
  ## returns whatever the Redis client's `get` yields.
  pool.withAcquire(conn):
    result = await conn.get(query)
2022-01-06 02:57:14 +00:00
proc setEx(key: string; time: int; data: string) {.async.} =
  ## Stores `data` under `key` with expiry `time` through a pooled
  ## connection; the client's reply is discarded.
  pool.withAcquire(conn):
    dawait conn.setEx(key, time, data)
2020-06-01 00:16:24 +00:00
proc cacheUserId(username, id: string) {.async.} =
  ## Records `id` for the lower-cased `username` in that name's `uidKey`
  ## hash. Does nothing when either argument is empty.
  if username.len > 0 and id.len > 0:
    let lowered = toLower(username)
    pool.withAcquire(conn):
      dawait conn.hSet(lowered.uidKey, lowered, id)
2020-06-02 20:36:02 +00:00
proc cache*(data: List) {.async.} =
  ## Caches `data` under its list key for `listCacheTime`, stored as the
  ## compressed flatty serialization.
  let payload = compress(toFlatty(data))
  await setEx(data.listKey, listCacheTime, payload)
2020-06-01 00:16:24 +00:00
2020-06-16 22:20:34 +00:00
proc cache*(data: PhotoRail; name: string) {.async.} =
  ## Caches the photo rail `data` under "pr:" plus the lower-cased `name`
  ## for `baseCacheTime`, stored as the compressed flatty serialization.
  let
    railKey = "pr:" & toLower(name)
    payload = compress(toFlatty(data))
  await setEx(railKey, baseCacheTime, payload)
2020-06-01 00:16:24 +00:00
proc cache*(data: User) {.async.} =
  ## Caches the profile `data` under its lower-cased username for
  ## `baseCacheTime`, after recording the username -> ID mapping.
  ## Does nothing when the username is empty.
  if data.username.len > 0:
    let lowered = toLower(data.username)
    await cacheUserId(lowered, data.id)
    pool.withAcquire(conn):
      let payload = compress(toFlatty(data))
      dawait conn.setEx(lowered.userKey, baseCacheTime, payload)
2020-06-01 00:16:24 +00:00
proc cache*(data: Tweet) {.async.} =
  ## Caches `data` under its tweet key for `baseCacheTime`.
  ## Does nothing when `data` is nil or has an ID of 0.
  if not data.isNil and data.id != 0:
    pool.withAcquire(conn):
      let payload = compress(toFlatty(data))
      dawait conn.setEx(data.id.tweetKey, baseCacheTime, payload)
2020-06-02 22:03:41 +00:00
proc cacheRss*(query: string; rss: Rss) {.async.} =
  ## Stores an RSS result in the hash "rss:" & `query`.
  ##
  ## The cursor always goes into field "min". The compressed feed goes into
  ## field "rss" unless the cursor equals "suspended". The hash is then given
  ## an expiry of `rssCacheTime`.
  let hashKey = "rss:" & query
  pool.withAcquire(conn):
    dawait conn.hSet(hashKey, "min", rss.cursor)
    let hasFeed = rss.cursor != "suspended"
    if hasFeed:
      dawait conn.hSet(hashKey, "rss", compress(rss.feed))
    dawait conn.expire(hashKey, rssCacheTime)
2020-06-01 00:16:24 +00:00
template deserialize(data, T) =
  ## Uncompresses `data`, decodes it as a `T` and assigns it to the caller's
  ## `result`. Any failure is reported with `echo` and `result` is left
  ## untouched.
  try:
    result = data.uncompress.fromFlatty(T)
  except:
    # Deliberately catches everything: a bad cache entry must not propagate.
    echo "Decompression failed($#): '$#'" % [astToStr(T), data]
proc getUserId*(username: string): Future[string] {.async.} =
  ## Resolves `username` to a user ID.
  ##
  ## The cached mapping for the lower-cased name is returned when present.
  ## Otherwise the user is fetched with `getUser`: a suspended user yields
  ## the literal "suspended" (not cached), any other user has its ID cached
  ## and returned.
  let lowered = toLower(username)
  pool.withAcquire(conn):
    result = await conn.hGet(lowered.uidKey, lowered)
    if result == redisNil:
      let fetched = await getUser(username)
      if not fetched.suspended:
        await cacheUserId(lowered, fetched.id)
        return fetched.id
      return "suspended"
proc getCachedUser*(username: string; fetch=true): Future[User] {.async.} =
  ## Returns the cached profile for `username` when one exists.
  ##
  ## On a cache miss with `fetch` true, the user ID is resolved, the profile
  ## is fetched with `getGraphUser` and the result is cached. On a miss with
  ## `fetch` false the default `User` is returned.
  let cached = await get(toLower(username).userKey)
  if cached != redisNil:
    cached.deserialize(User)
  elif fetch:
    # NOTE(review): getUserId may return the literal "suspended"; confirm
    # that getGraphUser handles that value.
    let userId = await getUserId(username)
    result = await getGraphUser(userId)
    await cache(result)
2020-06-01 00:16:24 +00:00
proc getCachedUsername*(userId: string): Future[string] {.async.} =
  ## Returns the username cached under "i:" & `userId`. On a miss the user
  ## is fetched with `getUserById` and its username is cached for
  ## `baseCacheTime` before being returned.
  let cacheKey = "i:" & userId
  result = await get(cacheKey)
  if result == redisNil:
    let fetched = await getUserById(userId)
    result = fetched.username
    await setEx(cacheKey, baseCacheTime, result)
proc getCachedTweet*(id: int64): Future[Tweet] {.async.} =
  ## Returns the tweet with the given `id`, preferring the cached copy.
  ##
  ## An `id` of 0 returns the default (nil) result without touching Redis.
  ## On a cache miss the tweet is fetched with `getStatus` and, when the
  ## fetch produced a tweet, stored in the cache for later calls.
  if id == 0: return

  let cached = await get(id.tweetKey)
  if cached != redisNil:
    cached.deserialize(Tweet)
  else:
    result = await getStatus($id)
    # Only a successfully fetched tweet is cached. The previous condition
    # was inverted (`if result.isNil`), so fetched tweets were never stored.
    if not result.isNil:
      await cache(result)
2020-06-16 22:20:34 +00:00
proc getCachedPhotoRail*(name: string): Future[PhotoRail] {.async.} =
  ## Returns the photo rail for `name`, preferring the cached copy under
  ## "pr:" plus the lower-cased name. On a miss the rail is fetched with
  ## `getPhotoRail` and cached. An empty `name` returns the default result.
  if name.len > 0:
    let cached = await get("pr:" & toLower(name))
    if cached == redisNil:
      result = await getPhotoRail(name)
      await cache(result, name)
    else:
      cached.deserialize(PhotoRail)
2020-06-01 00:16:24 +00:00
2021-10-02 08:13:56 +00:00
proc getCachedList*(username=""; slug=""; id=""): Future[List] {.async.} =
  ## Returns a list, preferring the cached copy.
  ##
  ## The cache is consulted only when `id` is non-empty (key "l:" & `id`).
  ## On a miss the list is fetched by `id` when one was given, otherwise by
  ## `username` and `slug`, and the fetched list is cached.
  let hasId = id.len > 0

  var cached = redisNil
  if hasId:
    cached = await get("l:" & id)

  if cached != redisNil:
    cached.deserialize(List)
    return

  if hasId:
    result = await getGraphList(id)
  else:
    result = await getGraphListBySlug(username, slug)
  await cache(result)
2020-06-01 00:16:24 +00:00
proc getCachedRss*(key: string): Future[Rss] {.async.} =
  ## Loads a cached RSS result from the hash "rss:" & `key`.
  ##
  ## A stored cursor of length 2 or less (which includes the nil marker) is
  ## cleared to the empty string. A cursor equal to "suspended" is returned
  ## without a feed. For any other cursor the "rss" field is read and
  ## uncompressed into `feed`; a failure there is reported with `echo` and
  ## leaves `feed` empty.
  let hashKey = "rss:" & key
  pool.withAcquire(conn):
    result.cursor = await conn.hGet(hashKey, "min")
    if result.cursor.len <= 2:
      result.cursor.setLen 0
    elif result.cursor != "suspended":
      let packed = await conn.hGet(hashKey, "rss")
      if packed.len > 0 and packed != redisNil:
        try:
          result.feed = uncompress(packed)
        except:
          echo "Decompressing RSS failed: ", packed