minor refactoring
This commit is contained in:
@@ -0,0 +1,87 @@
|
||||
use redis::AsyncCommands;
|
||||
use rocket_db_pools::Connection;
|
||||
|
||||
use crate::{
|
||||
db::{Postgres, Redis},
|
||||
messenger::ChatMsg,
|
||||
};
|
||||
|
||||
// Helper function to cache message in Redis
|
||||
pub async fn insert(
|
||||
cache: &mut Connection<Redis>,
|
||||
channel_id: i32,
|
||||
msg: &super::ChatMsg,
|
||||
) -> Result<(), redis::RedisError> {
|
||||
let key = format!("messages:{}", channel_id);
|
||||
let msg_json = serde_json::to_string(msg).unwrap();
|
||||
|
||||
redis::pipe()
|
||||
.atomic()
|
||||
.lpush(&key, &msg_json)
|
||||
.ltrim(&key, 0, 99)
|
||||
.expire(&key, 86400) // 24h expiry
|
||||
.query_async(&mut ***cache)
|
||||
.await
|
||||
}
|
||||
|
||||
// Helper function to get cached messages from Redis
|
||||
pub async fn get(
|
||||
redis: &mut Connection<Redis>,
|
||||
channel_id: i32,
|
||||
) -> Result<Vec<super::ChatMsg>, redis::RedisError> {
|
||||
let key = format!("messages:{}", channel_id);
|
||||
|
||||
// query last 100 messages
|
||||
let messages: Vec<ChatMsg> = redis
|
||||
.lrange::<_, Vec<String>>(&key, 0, 99)
|
||||
.await?
|
||||
.into_iter()
|
||||
.rev() // Reverse because LPUSH adds to front, so newest is first
|
||||
.filter_map(|msg_str| serde_json::from_str(&msg_str).ok())
|
||||
.collect();
|
||||
|
||||
Ok(messages)
|
||||
}
|
||||
|
||||
// Helper function to initialize cache from Postgres if empty
|
||||
pub async fn initialise(
|
||||
cache: &mut Connection<Redis>,
|
||||
db: &mut Connection<Postgres>,
|
||||
channel_id: i32,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let key = format!("messages:{}", channel_id);
|
||||
|
||||
let length: usize = cache.llen(&key).await?;
|
||||
if length < 100 {
|
||||
// Fetch from Postgres
|
||||
let messages = sqlx::query!(
|
||||
"SELECT u.username, u.display_name, u.id, m.content, m.created_at
|
||||
FROM messages m
|
||||
JOIN users u ON m.user_id = u.id
|
||||
WHERE m.channel_id = $1
|
||||
ORDER BY m.created_at DESC
|
||||
LIMIT 100",
|
||||
channel_id
|
||||
)
|
||||
.fetch_all(&mut ***db)
|
||||
.await?;
|
||||
|
||||
// Populate cache (in reverse order so oldest is at the end)
|
||||
for msg in messages.into_iter().rev() {
|
||||
let chat_msg = ChatMsg {
|
||||
display_name: Some(msg.display_name.unwrap_or(msg.username)),
|
||||
user_id: msg.id as usize,
|
||||
text: msg.content,
|
||||
timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize,
|
||||
};
|
||||
|
||||
let msg_json = serde_json::to_string(&chat_msg)?;
|
||||
|
||||
cache.lpush::<_, _, ()>(&key, &msg_json).await?;
|
||||
}
|
||||
|
||||
cache.expire::<_, ()>(&key, 86400).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user