diff --git a/.gitignore b/.gitignore index 467b666..d956887 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .log* Cargo.lock .cargo/ +docker-compose* diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 1f1fd71..22207d7 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" argon2 = "0.5.3" chrono = { version = "0.4.42", features = ["serde"] } futures-util = "0.3.31" +image = "0.25.8" rand = "0.9.2" redis = { version = "0.25.4", features = ["tokio-comp"] } reqwest = { version = "0.12.23", features = ["json"] } @@ -15,6 +16,7 @@ rocket_cors = "0.6.0" rocket_db_pools = { version = "0.2.0", features = ["deadpool_redis", "sqlx_macros", "sqlx_postgres"] } rocket_dyn_templates = { version = "0.2.0", features = ["tera"] } serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" sha2 = "0.10.9" sqlx = { version = "0.7.4", features = ["macros", "time"] } tokio = { version = "1.47.1", features = ["full"] } diff --git a/backend/Rocket.toml b/backend/Rocket.toml index edba168..c55efe8 100644 --- a/backend/Rocket.toml +++ b/backend/Rocket.toml @@ -9,6 +9,10 @@ url = "postgresql://chatapp:chatapp@100.118.108.58:5432/chatapp" [default.databases.redis_cache] url = "redis://chatapp_redis:6379" +[debug.databases.redis_cache] +url = "redis://localhost:6379" + + [default] # run inside a docker container or pod address = "0.0.0.0" port = 8000 diff --git a/backend/cdn/profiles/full/0.jpg b/backend/cdn/profiles/full/0.jpg new file mode 100644 index 0000000..f72a082 Binary files /dev/null and b/backend/cdn/profiles/full/0.jpg differ diff --git a/backend/cdn/profiles/8.jpg b/backend/cdn/profiles/full/8.jpg similarity index 100% rename from backend/cdn/profiles/8.jpg rename to backend/cdn/profiles/full/8.jpg diff --git a/backend/cdn/profiles/default.jpg b/backend/cdn/profiles/full/default.jpg similarity index 100% rename from backend/cdn/profiles/default.jpg rename to backend/cdn/profiles/full/default.jpg diff --git a/backend/cdn/profiles/default.svg b/backend/cdn/profiles/full/default.svg similarity index 100% rename from backend/cdn/profiles/default.svg rename to backend/cdn/profiles/full/default.svg diff --git a/backend/cdn/profiles/thumb/0.jpg b/backend/cdn/profiles/thumb/0.jpg new file mode 100644 index 0000000..aa5cd72 Binary files /dev/null and b/backend/cdn/profiles/thumb/0.jpg differ diff --git a/backend/src/cdn.rs b/backend/src/cdn.rs index 9c9b59b..35ced68 100644 --- a/backend/src/cdn.rs +++ b/backend/src/cdn.rs @@ -1,6 +1,18 @@ -use std::path::{Path, PathBuf}; +use std::{ + fs, + path::{Path, PathBuf}, +}; -use rocket::fs::NamedFile; +use image::{ImageFormat, imageops::FilterType}; +use rocket::{ + form::Form, + fs::{NamedFile, TempFile}, + http::Status, + serde::json::Json, +}; + +use serde::Serialize; +use tokio::io::AsyncReadExt; pub fn routes() -> Vec { routes![profile_pic, general] @@ -9,11 +21,15 @@ pub fn routes() -> Vec { #[get("/profile/")] pub async fn profile_pic(user_id: usize) -> Option { if let Ok(image) = - NamedFile::open(Path::new("./cdn/profiles/").join(format!("{}.jpg", user_id))).await + NamedFile::open(Path::new("./cdn/profiles/full/").join(format!("{}.jpg", user_id))).await { Some(image) } else { - Some(NamedFile::open("./cdn/profiles/default.svg").await.ok()?) + Some( + NamedFile::open("./cdn/profiles/full/default.svg") + .await + .ok()?, + ) } } @@ -23,3 +39,102 @@ pub async fn general(file_name: PathBuf) -> Option { .await .ok() } + +#[derive(Serialize)] +pub struct UploadResponse { + success: bool, + message: String, + url: Option, +} +// Upload endpoint - handles image upload and creates multiple sizes +#[post("/profile//upload", data = "")] +pub async fn upload_profile_pic( + user_id: usize, + mut file: Form>, +) -> Result, Status> { + const MAX_FILE_SIZE: u64 = 5 * 1024 * 1024; + if file.len() > MAX_FILE_SIZE { + return Ok(Json(UploadResponse { + success: false, + message: "File size exceeds 5MB limit".to_string(), + url: None, + })); + } + + // Read file contents into buffer + let mut buffer = Vec::new(); + file.open() + .await + .map_err(|e| { + eprintln!("Failed to open file: {}", e); + Status::BadRequest + })? + .read_to_end(&mut buffer) + .await + .map_err(|e| { + eprintln!("Failed to read file: {}", e); + Status::BadRequest + })?; + + // Check if buffer is empty + if buffer.is_empty() { + return Ok(Json(UploadResponse { + success: false, + message: "Uploaded file is empty".to_string(), + url: None, + })); + } + + // Validate format from buffer + let format = image::guess_format(&buffer).map_err(|e| { + eprintln!("Failed to guess format: {}", e); + Status::BadRequest + })?; + + // Only allow specific image formats + let allowed_formats = [ + ImageFormat::Jpeg, + ImageFormat::Png, + ImageFormat::WebP, + ImageFormat::Gif, + ]; + + if !allowed_formats.contains(&format) { + return Ok(Json(UploadResponse { + success: false, + message: "Unsupported image format. Only JPEG, PNG, WebP, and GIF are allowed." + .to_string(), + url: None, + })); + } + + // Decode and validate the image before persisting anything + let img = image::load_from_memory(&buffer).map_err(|e| { + eprintln!("Image decode error: {}", e); + Status::BadRequest + })?; + + // Now that we know it's valid, ensure directories exist + let base_path = Path::new("./cdn/profiles"); + fs::create_dir_all(base_path.join("thumb")).map_err(|_| Status::InternalServerError)?; + fs::create_dir_all(base_path.join("full")).map_err(|_| Status::InternalServerError)?; + + // Create thumbnail (64x64) for chat lists + let thumb = img.resize_to_fill(64, 64, FilterType::Lanczos3); + let thumb_path = base_path.join("thumb").join(format!("{}.jpg", user_id)); + thumb + .save_with_format(&thumb_path, ImageFormat::Jpeg) + .map_err(|_| Status::InternalServerError)?; + + // Create full size (256x256) for profile views + let full = img.resize_to_fill(256, 256, FilterType::Lanczos3); + let full_path = base_path.join("full").join(format!("{}.jpg", user_id)); + full.save_with_format(&full_path, ImageFormat::Jpeg) + .map_err(|_| Status::InternalServerError)?; + + Ok(Json(UploadResponse { + success: true, + message: "Profile picture uploaded successfully".to_string(), + url: Some(format!("/cdn/profile/{}", user_id)), + })) +} diff --git a/backend/src/llm.rs b/backend/src/llm.rs index f1aa988..5a5388a 100644 --- a/backend/src/llm.rs +++ b/backend/src/llm.rs @@ -29,7 +29,7 @@ impl LlmWorker { // Build the request body let payload = LlmRequest { - model: "gemma2-9b-it".into(), // whatever model you run locally + model: "gpt-oss-20b".into(), // whatever model you run locally messages: vec![Message { role: "user".into(), content: message.text.clone().into(), diff --git a/backend/src/main.rs b/backend/src/main.rs index 733757a..a1a298c 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate rocket; +use redis::cmd; use rocket::fs::{FileServer, NamedFile}; use rocket::http::Method; use rocket::serde::json::Json; @@ -13,7 +14,6 @@ use std::sync::Arc; use crate::auth::Session; use crate::db::{Postgres, Redis}; -use crate::messages::ChatBroadcaster; pub mod auth; pub mod cdn; @@ -35,20 +35,18 @@ async fn users(_ag: Session, mut db: Connection) -> Json> { } #[get("/users/", rank = 1)] -async fn display_name(id: usize, _ag: Session, mut db: Connection) -> String { - sqlx::query!( - "SELECT display_name, username FROM users WHERE id = $1", - id as i32 - ) - .fetch_one(&mut **db) - .await - .map(|row| row.display_name.unwrap_or(row.username)) - .unwrap_or_else(|_| "User not found".to_string()) +async fn display_name( + id: usize, + _ag: Session, + mut pgsql_conn: Connection, + mut redis_conn: Connection, +) -> String { + UserCache::username(id, &mut redis_conn, &mut pgsql_conn).await } #[launch] fn rocket() -> Rocket { - let chat = Arc::new(ChatBroadcaster::new(32)); + let chat = Arc::new(crate::messages::ChatBroadcaster::new(32)); let cors = CorsOptions::default() .allowed_origins(AllowedOrigins::all()) @@ -73,7 +71,6 @@ fn rocket() -> Rocket { routes![ favicon, messages::chat_page, - messages::chat_page_preview, auth::signup_page, auth::login_page, auth::mfa_page, @@ -83,6 +80,7 @@ fn rocket() -> Rocket { .mount( "/api", routes![ + cdn::upload_profile_pic, messages::get_messages, messages::post_message, messages::event_stream, @@ -109,3 +107,45 @@ fn rocket() -> Rocket { async fn favicon() -> NamedFile { NamedFile::open("static/favicon.ico").await.unwrap() } + +pub struct UserCache {} + +impl UserCache { + pub async fn username( + id: usize, + redis_conn: &mut Connection, + pgsql_conn: &mut Connection, + ) -> String { + if let Ok(val) = cmd("GET") + .arg(&[format!("users:{id}")]) + .query_async(&mut **redis_conn) + .await + { + return val; + } + + if let Ok(v) = sqlx::query!("SELECT username FROM users WHERE id = $1", id as i32) + .fetch_one(&mut ***pgsql_conn) + .await + { + let username = v.username; + Self::insert(id, &username, redis_conn).await; + username + } else { + unimplemented!() + } + } + + pub async fn insert(id: usize, username: &str, conn: &mut Connection) { + cmd("SET") + .arg(&[ + format!("users:{id}"), + username.to_string(), + "EX".to_string(), + "1800".to_string(), + ]) + .query_async(&mut **conn) + .await + .expect("failed to insert key") + } +} diff --git a/backend/src/messages/cache.rs b/backend/src/messages/cache.rs new file mode 100644 index 0000000..445f06f --- /dev/null +++ b/backend/src/messages/cache.rs @@ -0,0 +1,87 @@ +use redis::AsyncCommands; +use rocket_db_pools::Connection; + +use crate::{ + db::{Postgres, Redis}, + messages::ChatMsg, +}; + +// Helper function to cache message in Redis +pub async fn insert( + cache: &mut Connection, + channel_id: i32, + msg: &super::ChatMsg, +) -> Result<(), redis::RedisError> { + let key = format!("messages:{}", channel_id); + let msg_json = serde_json::to_string(msg).unwrap(); + + redis::pipe() + .atomic() + .lpush(&key, &msg_json) + .ltrim(&key, 0, 99) + .expire(&key, 86400) // 24h expiry + .query_async(&mut ***cache) + .await +} + +// Helper function to get cached messages from Redis +pub async fn get( + redis: &mut Connection, + channel_id: i32, +) -> Result, redis::RedisError> { + let key = format!("messages:{}", channel_id); + + // query last 100 messages + let messages: Vec = redis + .lrange::<_, Vec>(&key, 0, 99) + .await? + .into_iter() + .rev() // Reverse because LPUSH adds to front, so newest is first + .filter_map(|msg_str| serde_json::from_str(&msg_str).ok()) + .collect(); + + Ok(messages) +} + +// Helper function to initialize cache from Postgres if empty +pub async fn initialise( + cache: &mut Connection, + db: &mut Connection, + channel_id: i32, +) -> Result<(), Box> { + let key = format!("messages:{}", channel_id); + + let length: usize = cache.llen(&key).await?; + if length < 100 { + // Fetch from Postgres + let messages = sqlx::query!( + "SELECT u.username, u.display_name, u.id, m.content, m.created_at + FROM messages m + JOIN users u ON m.user_id = u.id + WHERE m.channel_id = $1 + ORDER BY m.created_at DESC + LIMIT 100", + channel_id + ) + .fetch_all(&mut ***db) + .await?; + + // Populate cache (in reverse order so oldest is at the end) + for msg in messages.into_iter().rev() { + let chat_msg = ChatMsg { + display_name: Some(msg.display_name.unwrap_or(msg.username)), + user_id: msg.id as usize, + text: msg.content, + timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, + }; + + let msg_json = serde_json::to_string(&chat_msg)?; + + cache.lpush::<_, _, ()>(&key, &msg_json).await?; + } + + cache.expire::<_, ()>(&key, 86400).await?; + } + + Ok(()) +} diff --git a/backend/src/messages.rs b/backend/src/messages/messages.rs similarity index 68% rename from backend/src/messages.rs rename to backend/src/messages/messages.rs index 0a35fd8..3723c70 100644 --- a/backend/src/messages.rs +++ b/backend/src/messages/messages.rs @@ -1,12 +1,13 @@ use std::sync::Arc; -use redis::cmd; +use redis::{AsyncCommands, cmd}; use rocket::{ Shutdown, response::stream::{Event, EventStream}, serde::json::Json, time::OffsetDateTime, }; +use rocket_cors::CorsOptions; use rocket_db_pools::Connection; use rocket_dyn_templates::{Template, context}; use serde::{Deserialize, Serialize}; @@ -52,10 +53,12 @@ pub struct ChatMsg { pub async fn post_message( mut msg: Json, chat: &rocket::State>, - mut db: Connection, + mut postgres: Connection, + mut cache: Connection, session: Session, ) -> Result<(), String> { const CHANNEL_ID: i32 = 1; + let channel_id = CHANNEL_ID; const LMSTUDIO_URI: &'static str = "http://127.0.0.1:1234/v1/chat/completions"; let chat = chat.inner().clone(); @@ -64,7 +67,7 @@ pub async fn post_message( "SELECT display_name, username FROM users WHERE id = $1", session.user_id as i32 ) - .fetch_one(&mut **db) + .fetch_one(&mut **postgres) .await .map(|row| row.display_name.unwrap_or(row.username)) .unwrap_or_else(|_| "Unknown".to_string()); @@ -80,16 +83,22 @@ pub async fn post_message( msg.text, OffsetDateTime::from_unix_timestamp_nanos(msg.timestamp as i128 * 1_000_000).unwrap() ) - .execute(&mut **db) + .execute(&mut **postgres) .await .map_err(|_| "Failed".to_string())?; + super::cache::insert(&mut cache, channel_id, &msg) + .await + .map_err(|_| "Redis cache failed".to_string())?; // get response tokio::spawn(async move { let response = LlmWorker::new(LMSTUDIO_URI.to_string()).query(&msg).await; if let Ok(reply) = response { chat.publish(reply.clone()).await; + super::cache::insert(&mut cache, CHANNEL_ID, &reply) + .await + .ok(); sqlx::query!( "INSERT INTO messages (channel_id, user_id, content, created_at) VALUES ($1, $2, $3, $4)", @@ -98,7 +107,7 @@ pub async fn post_message( reply.text, OffsetDateTime::from_unix_timestamp_nanos(reply.timestamp as i128 * 1_000_000).unwrap() ) - .execute(&mut **db) + .execute(&mut **postgres) .await .map_err(|_| "Failed".to_string()) .unwrap(); @@ -109,31 +118,58 @@ pub async fn post_message( } #[get("/messages")] -pub async fn get_messages(mut db: Connection, _session: Session) -> Json> { - Json( - sqlx::query!( - "SELECT u.username, u.display_name, u.id, m.content, m.created_at FROM messages m JOIN users u ON m.user_id = u.id ORDER BY m.created_at DESC LIMIT 100" - ) - .fetch_all(&mut **db) - .await - .unwrap_or_else(|_| Vec::new()) - .into_iter() - .rev() - .map(|msg| ChatMsg { - display_name: Some(msg.display_name.unwrap_or(msg.username)), - user_id: msg.id as usize, - text: msg.content, - timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, - }) - .collect(), +pub async fn get_messages( + mut db: Connection, + mut redis: Connection, + _session: Session, +) -> Json> { + const CHANNEL_ID: i32 = 1; + let channel_id = CHANNEL_ID; + + if let Ok(messages) = super::cache::get(&mut redis, channel_id).await + && !messages.is_empty() + { + return Json(messages); + }; + + if let Err(x) = super::cache::initialise(&mut redis, &mut db, channel_id).await { + eprintln!("WARN: {x:?}"); + } + + if let Ok(messages) = super::cache::get(&mut redis, channel_id).await + && !messages.is_empty() + { + return Json(messages); + }; + + let res = sqlx::query!( + "SELECT u.username, u.display_name, u.id, m.content, m.created_at + FROM messages m + JOIN users u ON m.user_id = u.id + WHERE m.channel_id = $1 + ORDER BY m.created_at DESC LIMIT 100", + channel_id ) - // Json(vec![]) + .fetch_all(&mut **db) + .await + .unwrap_or_else(|_| Vec::new()) + .into_iter() + .rev() + .map(|msg| ChatMsg { + display_name: Some(msg.display_name.unwrap_or(msg.username)), + user_id: msg.id as usize, + text: msg.content, + timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, + }) + .collect(); + Json(res) } #[get("/events")] pub async fn event_stream( chat: &rocket::State>, - db: Connection, + postgres: Connection, + cache: Connection, ag: Session, mut shutdown: Shutdown, ) -> EventStream![] { @@ -141,7 +177,7 @@ pub async fn event_stream( EventStream! { // Initialize the stream with the last 100 messages - for msg in get_messages(db, ag).await.0 { + for msg in get_messages(postgres, cache, ag).await.0 { yield Event::json(&msg); } @@ -171,25 +207,3 @@ pub async fn chat_page(session: Session) -> Template { pub async fn chat_page_preview(session: Session) -> Template { Template::render("chatpreview", context!(user_id: session.user_id)) } - -pub struct UserCache {} - -impl UserCache { - pub async fn username(&mut self, id: usize, redis_conn: &mut Connection) -> String { - if let Ok(val) = cmd("GET") - .arg(&[format!("users:{id}")]) - .query_async(&mut **redis_conn) - .await - { - return val; - } - } - - pub async fn insert(id: usize, username: &str, conn: &mut Connection) { - cmd("SET") - .arg(&[format!("users:{id}"), username.to_owned()]) - .query_async(&mut **conn) - .await - .expect("failed to insert key") - } -} diff --git a/backend/src/messages/mod.rs b/backend/src/messages/mod.rs new file mode 100644 index 0000000..556635c --- /dev/null +++ b/backend/src/messages/mod.rs @@ -0,0 +1,4 @@ +mod cache; +mod messages; + +pub use messages::{ChatBroadcaster, ChatMsg, chat_page, event_stream, get_messages, post_message}; diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..c6d2364 --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +source backend/.env +echo "Building against DB: $DATABASE_URL" +podman build --build-arg DATABASE_URL=$DATABASE_URL -t git.zxq5.dev/zxq5/chatapp-backend:$1 ./backend +podman push git.zxq5.dev/zxq5/chatapp-backend:$1 diff --git a/doc/arch.md b/doc/arch.md new file mode 100644 index 0000000..05c4770 --- /dev/null +++ b/doc/arch.md @@ -0,0 +1,37 @@ +- **Frontend Service** (serving React frontend) + - Communicates with backend APIs via HTTPS + - Authenticates users via JWT tokens + +- **Messenger** + - Handles real‑time messaging (WebSocket or long polling) + - Stores message metadata in Postgres + - Uses Redis for message caching + +- **CDN Service** (user uploaded media) + - Receives uploads, stores files on object storage + - Generates signed URLs with built in auth for secure access + +- **LLM Service** + - Exposes language‑model inference API for scripts + - Runs model in a containerized environment + - Stores usage logs in Postgres + +- **Auth Service** (user authentication & authorization) + - Validates credentials and issues JWTs + - Manages refresh tokens and token revocation + - Persists session state and audit logs in PostgreSQL + +- **Script Runner** + - Executes user or system scripts on demand + - Isolates execution in sandbox containers + - Provides access to "Tables" - SQLite files kept in object storage + - Persists logs to Postgres + +- **Redis Cache** + - Provides distributed caching for session data and messages + +- **PostgreSQL Service** (with volumes) + - Stores relational data: users, messages, media metadata, script logs + - Uses Docker volume for persistent storage + - Backed up via scheduled snapshots +```