caching implementation

This commit is contained in:
2025-10-20 00:53:27 +01:00
parent 561999f4f1
commit f6d2999b96
16 changed files with 372 additions and 64 deletions
+1
View File
@@ -3,3 +3,4 @@
.log*
Cargo.lock
.cargo/
docker-compose*
+2
View File
@@ -7,6 +7,7 @@ edition = "2024"
argon2 = "0.5.3"
chrono = { version = "0.4.42", features = ["serde"] }
futures-util = "0.3.31"
image = "0.25.8"
rand = "0.9.2"
redis = { version = "0.25.4", features = ["tokio-comp"] }
reqwest = { version = "0.12.23", features = ["json"] }
@@ -15,6 +16,7 @@ rocket_cors = "0.6.0"
rocket_db_pools = { version = "0.2.0", features = ["deadpool_redis", "sqlx_macros", "sqlx_postgres"] }
rocket_dyn_templates = { version = "0.2.0", features = ["tera"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
sha2 = "0.10.9"
sqlx = { version = "0.7.4", features = ["macros", "time"] }
tokio = { version = "1.47.1", features = ["full"] }
+4
View File
@@ -9,6 +9,10 @@ url = "postgresql://chatapp:chatapp@100.118.108.58:5432/chatapp"
[default.databases.redis_cache]
url = "redis://chatapp_redis:6379"
[debug.databases.redis_cache]
url = "redis://localhost:6379"
[default] # run inside a docker container or pod
address = "0.0.0.0"
port = 8000
Binary file not shown.

After

Width:  |  Height:  |  Size: 7.2 KiB

Before

Width:  |  Height:  |  Size: 93 KiB

After

Width:  |  Height:  |  Size: 93 KiB

Before

Width:  |  Height:  |  Size: 15 KiB

After

Width:  |  Height:  |  Size: 15 KiB

Before

Width:  |  Height:  |  Size: 548 B

After

Width:  |  Height:  |  Size: 548 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 KiB

+119 -4
View File
@@ -1,6 +1,18 @@
use std::path::{Path, PathBuf};
use std::{
fs,
path::{Path, PathBuf},
};
use rocket::fs::NamedFile;
use image::{ImageFormat, imageops::FilterType};
use rocket::{
form::Form,
fs::{NamedFile, TempFile},
http::Status,
serde::json::Json,
};
use serde::Serialize;
use tokio::io::AsyncReadExt;
pub fn routes() -> Vec<rocket::Route> {
routes![profile_pic, general]
@@ -9,11 +21,15 @@ pub fn routes() -> Vec<rocket::Route> {
#[get("/profile/<user_id>")]
pub async fn profile_pic(user_id: usize) -> Option<NamedFile> {
if let Ok(image) =
NamedFile::open(Path::new("./cdn/profiles/").join(format!("{}.jpg", user_id))).await
NamedFile::open(Path::new("./cdn/profiles/full/").join(format!("{}.jpg", user_id))).await
{
Some(image)
} else {
Some(NamedFile::open("./cdn/profiles/default.svg").await.ok()?)
Some(
NamedFile::open("./cdn/profiles/full/default.svg")
.await
.ok()?,
)
}
}
@@ -23,3 +39,102 @@ pub async fn general(file_name: PathBuf) -> Option<NamedFile> {
.await
.ok()
}
#[derive(Serialize)]
pub struct UploadResponse {
success: bool,
message: String,
url: Option<String>,
}
// Upload endpoint - handles image upload and creates multiple sizes
#[post("/profile/<user_id>/upload", data = "<file>")]
pub async fn upload_profile_pic(
user_id: usize,
mut file: Form<TempFile<'_>>,
) -> Result<Json<UploadResponse>, Status> {
const MAX_FILE_SIZE: u64 = 5 * 1024 * 1024;
if file.len() > MAX_FILE_SIZE {
return Ok(Json(UploadResponse {
success: false,
message: "File size exceeds 5MB limit".to_string(),
url: None,
}));
}
// Read file contents into buffer
let mut buffer = Vec::new();
file.open()
.await
.map_err(|e| {
eprintln!("Failed to open file: {}", e);
Status::BadRequest
})?
.read_to_end(&mut buffer)
.await
.map_err(|e| {
eprintln!("Failed to read file: {}", e);
Status::BadRequest
})?;
// Check if buffer is empty
if buffer.is_empty() {
return Ok(Json(UploadResponse {
success: false,
message: "Uploaded file is empty".to_string(),
url: None,
}));
}
// Validate format from buffer
let format = image::guess_format(&buffer).map_err(|e| {
eprintln!("Failed to guess format: {}", e);
Status::BadRequest
})?;
// Only allow specific image formats
let allowed_formats = [
ImageFormat::Jpeg,
ImageFormat::Png,
ImageFormat::WebP,
ImageFormat::Gif,
];
if !allowed_formats.contains(&format) {
return Ok(Json(UploadResponse {
success: false,
message: "Unsupported image format. Only JPEG, PNG, WebP, and GIF are allowed."
.to_string(),
url: None,
}));
}
// Decode and validate the image before persisting anything
let img = image::load_from_memory(&buffer).map_err(|e| {
eprintln!("Image decode error: {}", e);
Status::BadRequest
})?;
// Now that we know it's valid, ensure directories exist
let base_path = Path::new("./cdn/profiles");
fs::create_dir_all(base_path.join("thumb")).map_err(|_| Status::InternalServerError)?;
fs::create_dir_all(base_path.join("full")).map_err(|_| Status::InternalServerError)?;
// Create thumbnail (64x64) for chat lists
let thumb = img.resize_to_fill(64, 64, FilterType::Lanczos3);
let thumb_path = base_path.join("thumb").join(format!("{}.jpg", user_id));
thumb
.save_with_format(&thumb_path, ImageFormat::Jpeg)
.map_err(|_| Status::InternalServerError)?;
// Create full size (256x256) for profile views
let full = img.resize_to_fill(256, 256, FilterType::Lanczos3);
let full_path = base_path.join("full").join(format!("{}.jpg", user_id));
full.save_with_format(&full_path, ImageFormat::Jpeg)
.map_err(|_| Status::InternalServerError)?;
Ok(Json(UploadResponse {
success: true,
message: "Profile picture uploaded successfully".to_string(),
url: Some(format!("/cdn/profile/{}", user_id)),
}))
}
+1 -1
View File
@@ -29,7 +29,7 @@ impl LlmWorker {
// Build the request body
let payload = LlmRequest {
model: "gemma2-9b-it".into(), // whatever model you run locally
model: "gpt-oss-20b".into(), // whatever model you run locally
messages: vec![Message {
role: "user".into(),
content: message.text.clone().into(),
+52 -12
View File
@@ -2,6 +2,7 @@
#[macro_use]
extern crate rocket;
use redis::cmd;
use rocket::fs::{FileServer, NamedFile};
use rocket::http::Method;
use rocket::serde::json::Json;
@@ -13,7 +14,6 @@ use std::sync::Arc;
use crate::auth::Session;
use crate::db::{Postgres, Redis};
use crate::messages::ChatBroadcaster;
pub mod auth;
pub mod cdn;
@@ -35,20 +35,18 @@ async fn users(_ag: Session, mut db: Connection<Postgres>) -> Json<Vec<i32>> {
}
#[get("/users/<id>", rank = 1)]
async fn display_name(id: usize, _ag: Session, mut db: Connection<Postgres>) -> String {
sqlx::query!(
"SELECT display_name, username FROM users WHERE id = $1",
id as i32
)
.fetch_one(&mut **db)
.await
.map(|row| row.display_name.unwrap_or(row.username))
.unwrap_or_else(|_| "User not found".to_string())
async fn display_name(
id: usize,
_ag: Session,
mut pgsql_conn: Connection<Postgres>,
mut redis_conn: Connection<Redis>,
) -> String {
UserCache::username(id, &mut redis_conn, &mut pgsql_conn).await
}
#[launch]
fn rocket() -> Rocket<Build> {
let chat = Arc::new(ChatBroadcaster::new(32));
let chat = Arc::new(crate::messages::ChatBroadcaster::new(32));
let cors = CorsOptions::default()
.allowed_origins(AllowedOrigins::all())
@@ -73,7 +71,6 @@ fn rocket() -> Rocket<Build> {
routes![
favicon,
messages::chat_page,
messages::chat_page_preview,
auth::signup_page,
auth::login_page,
auth::mfa_page,
@@ -83,6 +80,7 @@ fn rocket() -> Rocket<Build> {
.mount(
"/api",
routes![
cdn::upload_profile_pic,
messages::get_messages,
messages::post_message,
messages::event_stream,
@@ -109,3 +107,45 @@ fn rocket() -> Rocket<Build> {
async fn favicon() -> NamedFile {
NamedFile::open("static/favicon.ico").await.unwrap()
}
pub struct UserCache {}
impl UserCache {
pub async fn username(
id: usize,
redis_conn: &mut Connection<Redis>,
pgsql_conn: &mut Connection<Postgres>,
) -> String {
if let Ok(val) = cmd("GET")
.arg(&[format!("users:{id}")])
.query_async(&mut **redis_conn)
.await
{
return val;
}
if let Ok(v) = sqlx::query!("SELECT username FROM users WHERE id = $1", id as i32)
.fetch_one(&mut ***pgsql_conn)
.await
{
let username = v.username;
Self::insert(id, &username, redis_conn).await;
username
} else {
unimplemented!()
}
}
pub async fn insert(id: usize, username: &str, conn: &mut Connection<Redis>) {
cmd("SET")
.arg(&[
format!("users:{id}"),
username.to_string(),
"EX".to_string(),
"1800".to_string(),
])
.query_async(&mut **conn)
.await
.expect("failed to insert key")
}
}
+87
View File
@@ -0,0 +1,87 @@
use redis::AsyncCommands;
use rocket_db_pools::Connection;
use crate::{
db::{Postgres, Redis},
messages::ChatMsg,
};
// Helper function to cache message in Redis
pub async fn insert(
cache: &mut Connection<Redis>,
channel_id: i32,
msg: &super::ChatMsg,
) -> Result<(), redis::RedisError> {
let key = format!("messages:{}", channel_id);
let msg_json = serde_json::to_string(msg).unwrap();
redis::pipe()
.atomic()
.lpush(&key, &msg_json)
.ltrim(&key, 0, 99)
.expire(&key, 86400) // 24h expiry
.query_async(&mut ***cache)
.await
}
// Helper function to get cached messages from Redis
pub async fn get(
redis: &mut Connection<Redis>,
channel_id: i32,
) -> Result<Vec<super::ChatMsg>, redis::RedisError> {
let key = format!("messages:{}", channel_id);
// query last 100 messages
let messages: Vec<ChatMsg> = redis
.lrange::<_, Vec<String>>(&key, 0, 99)
.await?
.into_iter()
.rev() // Reverse because LPUSH adds to front, so newest is first
.filter_map(|msg_str| serde_json::from_str(&msg_str).ok())
.collect();
Ok(messages)
}
// Helper function to initialize cache from Postgres if empty
pub async fn initialise(
cache: &mut Connection<Redis>,
db: &mut Connection<Postgres>,
channel_id: i32,
) -> Result<(), Box<dyn std::error::Error>> {
let key = format!("messages:{}", channel_id);
let length: usize = cache.llen(&key).await?;
if length < 100 {
// Fetch from Postgres
let messages = sqlx::query!(
"SELECT u.username, u.display_name, u.id, m.content, m.created_at
FROM messages m
JOIN users u ON m.user_id = u.id
WHERE m.channel_id = $1
ORDER BY m.created_at DESC
LIMIT 100",
channel_id
)
.fetch_all(&mut ***db)
.await?;
// Populate cache (in reverse order so oldest is at the end)
for msg in messages.into_iter().rev() {
let chat_msg = ChatMsg {
display_name: Some(msg.display_name.unwrap_or(msg.username)),
user_id: msg.id as usize,
text: msg.content,
timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize,
};
let msg_json = serde_json::to_string(&chat_msg)?;
cache.lpush::<_, _, ()>(&key, &msg_json).await?;
}
cache.expire::<_, ()>(&key, 86400).await?;
}
Ok(())
}
@@ -1,12 +1,13 @@
use std::sync::Arc;
use redis::cmd;
use redis::{AsyncCommands, cmd};
use rocket::{
Shutdown,
response::stream::{Event, EventStream},
serde::json::Json,
time::OffsetDateTime,
};
use rocket_cors::CorsOptions;
use rocket_db_pools::Connection;
use rocket_dyn_templates::{Template, context};
use serde::{Deserialize, Serialize};
@@ -52,10 +53,12 @@ pub struct ChatMsg {
pub async fn post_message(
mut msg: Json<ChatMsg>,
chat: &rocket::State<Arc<ChatBroadcaster>>,
mut db: Connection<Postgres>,
mut postgres: Connection<Postgres>,
mut cache: Connection<Redis>,
session: Session,
) -> Result<(), String> {
const CHANNEL_ID: i32 = 1;
let channel_id = CHANNEL_ID;
const LMSTUDIO_URI: &'static str = "http://127.0.0.1:1234/v1/chat/completions";
let chat = chat.inner().clone();
@@ -64,7 +67,7 @@ pub async fn post_message(
"SELECT display_name, username FROM users WHERE id = $1",
session.user_id as i32
)
.fetch_one(&mut **db)
.fetch_one(&mut **postgres)
.await
.map(|row| row.display_name.unwrap_or(row.username))
.unwrap_or_else(|_| "Unknown".to_string());
@@ -80,16 +83,22 @@ pub async fn post_message(
msg.text,
OffsetDateTime::from_unix_timestamp_nanos(msg.timestamp as i128 * 1_000_000).unwrap()
)
.execute(&mut **db)
.execute(&mut **postgres)
.await
.map_err(|_| "Failed".to_string())?;
super::cache::insert(&mut cache, channel_id, &msg)
.await
.map_err(|_| "Redis cache failed".to_string())?;
// get response
tokio::spawn(async move {
let response = LlmWorker::new(LMSTUDIO_URI.to_string()).query(&msg).await;
if let Ok(reply) = response {
chat.publish(reply.clone()).await;
super::cache::insert(&mut cache, CHANNEL_ID, &reply)
.await
.ok();
sqlx::query!(
"INSERT INTO messages (channel_id, user_id, content, created_at) VALUES ($1, $2, $3, $4)",
@@ -98,7 +107,7 @@ pub async fn post_message(
reply.text,
OffsetDateTime::from_unix_timestamp_nanos(reply.timestamp as i128 * 1_000_000).unwrap()
)
.execute(&mut **db)
.execute(&mut **postgres)
.await
.map_err(|_| "Failed".to_string())
.unwrap();
@@ -109,31 +118,58 @@ pub async fn post_message(
}
#[get("/messages")]
pub async fn get_messages(mut db: Connection<Postgres>, _session: Session) -> Json<Vec<ChatMsg>> {
Json(
sqlx::query!(
"SELECT u.username, u.display_name, u.id, m.content, m.created_at FROM messages m JOIN users u ON m.user_id = u.id ORDER BY m.created_at DESC LIMIT 100"
)
.fetch_all(&mut **db)
.await
.unwrap_or_else(|_| Vec::new())
.into_iter()
.rev()
.map(|msg| ChatMsg {
display_name: Some(msg.display_name.unwrap_or(msg.username)),
user_id: msg.id as usize,
text: msg.content,
timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize,
})
.collect(),
pub async fn get_messages(
mut db: Connection<Postgres>,
mut redis: Connection<Redis>,
_session: Session,
) -> Json<Vec<ChatMsg>> {
const CHANNEL_ID: i32 = 1;
let channel_id = CHANNEL_ID;
if let Ok(messages) = super::cache::get(&mut redis, channel_id).await
&& !messages.is_empty()
{
return Json(messages);
};
if let Err(x) = super::cache::initialise(&mut redis, &mut db, channel_id).await {
eprintln!("WARN: {x:?}");
}
if let Ok(messages) = super::cache::get(&mut redis, channel_id).await
&& !messages.is_empty()
{
return Json(messages);
};
let res = sqlx::query!(
"SELECT u.username, u.display_name, u.id, m.content, m.created_at
FROM messages m
JOIN users u ON m.user_id = u.id
WHERE m.channel_id = $1
ORDER BY m.created_at DESC LIMIT 100",
channel_id
)
// Json(vec![])
.fetch_all(&mut **db)
.await
.unwrap_or_else(|_| Vec::new())
.into_iter()
.rev()
.map(|msg| ChatMsg {
display_name: Some(msg.display_name.unwrap_or(msg.username)),
user_id: msg.id as usize,
text: msg.content,
timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize,
})
.collect();
Json(res)
}
#[get("/events")]
pub async fn event_stream(
chat: &rocket::State<Arc<ChatBroadcaster>>,
db: Connection<Postgres>,
postgres: Connection<Postgres>,
cache: Connection<Redis>,
ag: Session,
mut shutdown: Shutdown,
) -> EventStream![] {
@@ -141,7 +177,7 @@ pub async fn event_stream(
EventStream! {
// Initialize the stream with the last 100 messages
for msg in get_messages(db, ag).await.0 {
for msg in get_messages(postgres, cache, ag).await.0 {
yield Event::json(&msg);
}
@@ -171,25 +207,3 @@ pub async fn chat_page(session: Session) -> Template {
pub async fn chat_page_preview(session: Session) -> Template {
Template::render("chatpreview", context!(user_id: session.user_id))
}
pub struct UserCache {}
impl UserCache {
pub async fn username(&mut self, id: usize, redis_conn: &mut Connection<Redis>) -> String {
if let Ok(val) = cmd("GET")
.arg(&[format!("users:{id}")])
.query_async(&mut **redis_conn)
.await
{
return val;
}
}
pub async fn insert(id: usize, username: &str, conn: &mut Connection<Redis>) {
cmd("SET")
.arg(&[format!("users:{id}"), username.to_owned()])
.query_async(&mut **conn)
.await
.expect("failed to insert key")
}
}
+4
View File
@@ -0,0 +1,4 @@
mod cache;
mod messages;
pub use messages::{ChatBroadcaster, ChatMsg, chat_page, event_stream, get_messages, post_message};
Executable
+4
View File
@@ -0,0 +1,4 @@
source backend/.env
echo "Building against DB: $DATABASE_URL"
podman build --build-arg DATABASE_URL=$DATABASE_URL -t git.zxq5.dev/zxq5/chatapp-backend:$1 ./backend
podman push git.zxq5.dev/zxq5/chatapp-backend:$1
+37
View File
@@ -0,0 +1,37 @@
- **Frontend Service** (serving React frontend)
- Communicates with backend APIs via HTTPS
- Authenticates users via JWT tokens
- **Messenger**
- Handles realtime messaging (WebSocket or long polling)
- Stores message metadata in Postgres
- Uses Redis for message caching
- **CDN Service** (user uploaded media)
- Receives uploads, stores files on object storage
- Generates signed URLs with built in auth for secure access
- **LLM Service**
- Exposes languagemodel inference API for scripts
- Runs model in a containerized environment
- Stores usage logs in Postgres
- **Auth Service** (user authentication & authorization)
- Validates credentials and issues JWTs
- Manages refresh tokens and token revocation
- Persists session state and audit logs in PostgreSQL
- **Script Runner**
- Executes user or system scripts on demand
- Isolates execution in sandbox containers
- Provides access to "Tables" - SQLite files kept in object storage
- Persists logs to Postgres
- **Redis Cache**
- Provides distributed caching for session data and messages
- **PostgreSQL Service** (with volumes)
- Stores relational data: users, messages, media metadata, script logs
- Uses Docker volume for persistent storage
- Backed up via scheduled snapshots
```