Files
chatapp/backend/src/messenger/cache.rs
T

86 lines
2.5 KiB
Rust

use redis::AsyncCommands;
use rocket_db_pools::Connection;
use crate::{
db::{Postgres, Redis},
messenger::ChatMsg,
};
// Helper function to cache message in Redis
pub async fn insert(
cache: &mut Connection<Redis>,
channel_id: i32,
msg: &super::ChatMsg,
) -> Result<(), redis::RedisError> {
let key = format!("messages:{}", channel_id);
let msg_json = serde_json::to_string(msg).unwrap();
redis::pipe()
.atomic()
.lpush(&key, &msg_json)
.ltrim(&key, 0, 99)
.expire(&key, 86400) // 24h expiry
.query_async(&mut ***cache)
.await
}
// Helper function to get cached messages from Redis
pub async fn get(
redis: &mut Connection<Redis>,
channel_id: i32,
) -> Result<Vec<super::ChatMsg>, redis::RedisError> {
let key = format!("messages:{}", channel_id);
// query last 100 messages
let messages: Vec<ChatMsg> = redis
.lrange::<_, Vec<String>>(&key, 0, 99)
.await?
.into_iter()
.rev() // Reverse because LPUSH adds to front, so newest is first
.filter_map(|msg_str| serde_json::from_str(&msg_str).ok())
.collect();
Ok(messages)
}
// Helper function to initialize cache from Postgres if empty
pub async fn initialise(
cache: &mut Connection<Redis>,
db: &mut Connection<Postgres>,
channel_id: i32,
) -> Result<(), Box<dyn std::error::Error>> {
let key = format!("messages:{}", channel_id);
// less than 100 messages in cache?
if cache.llen::<_, i32>(&key).await? < 100 {
// Fetch from Postgres
let messages = sqlx::query!(
"SELECT u.username, u.display_name, u.id, m.content, m.created_at
FROM messages m
JOIN users u ON m.user_id = u.id
WHERE m.channel_id = $1
ORDER BY m.created_at DESC
LIMIT 100",
channel_id
)
.fetch_all(&mut ***db)
.await?;
// Populate cache (in reverse order so oldest is at the end)
for msg in messages.into_iter().rev() {
let msg_json = serde_json::to_string(&ChatMsg {
display_name: Some(msg.display_name.unwrap_or(msg.username)),
user_id: msg.id as usize,
text: msg.content,
timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize,
})?;
cache.lpush::<_, _, ()>(&key, &msg_json).await?;
}
cache.expire::<_, ()>(&key, 86400).await?;
}
Ok(())
}