From f6d2999b96543cb3447c90b92dde24b9b78f0fa3 Mon Sep 17 00:00:00 2001 From: zxq5 Date: Mon, 20 Oct 2025 00:53:27 +0100 Subject: [PATCH] caching implementation --- .gitignore | 1 + backend/Cargo.toml | 2 + backend/Rocket.toml | 4 + backend/cdn/profiles/full/0.jpg | Bin 0 -> 7408 bytes backend/cdn/profiles/{ => full}/8.jpg | Bin backend/cdn/profiles/{ => full}/default.jpg | Bin backend/cdn/profiles/{ => full}/default.svg | 0 backend/cdn/profiles/thumb/0.jpg | Bin 0 -> 1735 bytes backend/src/cdn.rs | 123 +++++++++++++++++++- backend/src/llm.rs | 2 +- backend/src/main.rs | 64 ++++++++-- backend/src/messages/cache.rs | 87 ++++++++++++++ backend/src/{ => messages}/messages.rs | 108 +++++++++-------- backend/src/messages/mod.rs | 4 + build.sh | 4 + doc/arch.md | 37 ++++++ 16 files changed, 372 insertions(+), 64 deletions(-) create mode 100644 backend/cdn/profiles/full/0.jpg rename backend/cdn/profiles/{ => full}/8.jpg (100%) rename backend/cdn/profiles/{ => full}/default.jpg (100%) rename backend/cdn/profiles/{ => full}/default.svg (100%) create mode 100644 backend/cdn/profiles/thumb/0.jpg create mode 100644 backend/src/messages/cache.rs rename backend/src/{ => messages}/messages.rs (68%) create mode 100644 backend/src/messages/mod.rs create mode 100755 build.sh create mode 100644 doc/arch.md diff --git a/.gitignore b/.gitignore index 467b666..d956887 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .log* Cargo.lock .cargo/ +docker-compose* diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 1f1fd71..22207d7 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" argon2 = "0.5.3" chrono = { version = "0.4.42", features = ["serde"] } futures-util = "0.3.31" +image = "0.25.8" rand = "0.9.2" redis = { version = "0.25.4", features = ["tokio-comp"] } reqwest = { version = "0.12.23", features = ["json"] } @@ -15,6 +16,7 @@ rocket_cors = "0.6.0" rocket_db_pools = { version = "0.2.0", features = ["deadpool_redis", "sqlx_macros", "sqlx_postgres"] } rocket_dyn_templates = { version = "0.2.0", features = ["tera"] } serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" sha2 = "0.10.9" sqlx = { version = "0.7.4", features = ["macros", "time"] } tokio = { version = "1.47.1", features = ["full"] } diff --git a/backend/Rocket.toml b/backend/Rocket.toml index edba168..c55efe8 100644 --- a/backend/Rocket.toml +++ b/backend/Rocket.toml @@ -9,6 +9,10 @@ url = "postgresql://chatapp:chatapp@100.118.108.58:5432/chatapp" [default.databases.redis_cache] url = "redis://chatapp_redis:6379" +[debug.databases.redis_cache] +url = "redis://localhost:6379" + + [default] # run inside a docker container or pod address = "0.0.0.0" port = 8000 diff --git a/backend/cdn/profiles/full/0.jpg b/backend/cdn/profiles/full/0.jpg new file mode 100644 index 0000000000000000000000000000000000000000..f72a08279c52678626ec89fa1c92cdcc326b530a GIT binary patch literal 7408 zcmbVRc|26#`#)psjXey}glth6jbt6wCu85Kh%6P87E(xQD1}Osq%>hhWC=;gmaLx? z$-btLEM?7P88hboUU$^z`+5ET`7QU4xzD-h+-J^n&hwn-{k$*K2Mq&J3sZAbfZzfE z0T2K{H9(9XmXHWBz$J!2iXos+U_apJ<>lk$;pgMy6A<7RL~Rs8ZP|S08doMo4z>V6G@oD@@`wD9%9opx&?(#l&RX|Wm8Z9HMtfH!>z740dTX&D1{{FuX zm=es)EgYSkT@D^P?CNvu_z7P>;>n=%7lK1VFNR%d@QVMr)(!h_c8LKl1Q$0qlA8~<3&9lv4-n(#QNZ$w@3rUi3Y1XPj^y9CFFviX zRzPW&!@Q*TxpqORt+)~8pRj2h%l>zUUHyMq_K#uzv1q{Zs#&qg}q3Y@#mZL~F@DJLBNgx39OYxE61pk$kQl7X<7O zCY!Fv&5wPozBty%tUM0^ckJYoiuojSZ{_2qE`NDn_VUI5c?lXchMDTusF{e*a;c zV>I9$Aj9y4a450=N9qR=+xa`U*5cC1 z`bf!N84Sj$Z@~pYh#O0|wK1a6M~i!vx>IG5?V;--1qVFhTs2_^zo z-V!GK)<{`vpp1=EiHtSjw+6~cS!*qE&YDNYQh)_7j^e3|6osW@q1UY0?VZ&jV)uS% z8|KM*hCG+67n=Eg>DhuC;>&o2uvjs;zwUal|9L7SIeKsuVBsT5fuD8wq3_oaaVQJ0 z&kv!7?jS~R7O9-(Eny;H@-_jU+`44KZ=-(u`};g$gn9(WSF_0)QE`^oNsuvMF@2)J zL!w97fKHRtpg}<*RiaVlmmpBtN@ZV;5bvXv$f3_@7=E%zV@o zi9U%)?#|+>MqBTTT_UW`Tme>5)?P-I0hdPJDgE+5~25<)C z#8?RM8^hyA>FG?<=zy^k;S?tfj9@yO%;x4XlR4@X1`IKLQMr+@Al9YB(p*eH2H+!5$#fC2=Sj$?-(nFwZc2WC{Ox zr%;-w@~zVRP`yM|+2l2u;?2%aTKtLce2e5k>ehX%u+v5C>9h6mdm$(rI~uEhv7^dp)u z$sx(|eDDHKutVGv9z}mUj)TH3&xwEtQ3|iJmVQ{xuqXxgdj%ZoLC@?Z^&bBawb8Qd zWF?*Et+?~?p-lTYuGe1g=8lLrtLvs+o(^GYZkl5ZtCNm*&Kd`+vNSd37{hAp=Bz=z zKI;67IS5Qdq%(I=1IDG-9j zV5Za4`xNe4=d^4~q z!T$OD=!|~-m;_sLDRFI-u$1Dp5Y5t<(O|wM&*;}SZDSd9kblbJOSWB)Ce>AR;9ryU zj}9bWts~WycNke?1`?ya@FPcvm@oJg%s}GRS_lXZnX!MeCfQwiQX(rZtArfZB)dD0 zi&Xcjea&`)u$1o=3jx7TghhM`e#W(ipj&UW{cg-twa;;T#=9N65sfF`Y-Bf;FJ!*w zn=Q=`a|%rD(KYc+Q4w+d^vTSdbZqYw;niLK^D+FxU&*Hx9+ke@>HjfAHUF&OO%sO} zKdc%;{WcMY=)h+OspDu5Ka*-c;S5yH*YDqV_TWrYrMk-=eD0v^a}(d zx!xRVQv)=v2xa+grW@g0$^+JKTlA>1b>xiLO>=3(y}?}8PA@iJ>O1%nyUV13#=mo; zQSP-)$$;Rya^-W2&qrdde1c4bXufvV-T|D{$no)56Crp58#1{cluwJJUIPNPeSsrWN1UKxSEFLz(wZsAZ;jI-*dk3b-`9RfqnElo^taUKG#eh`>@ zrgzj+#2CYv!K^J3G%LYkc@zYiIXy>SPGi4Z#?uGz%t?o_@Cd^d(nK+7xh*|$3H+mN z-<^Yf$ku4~B?1J(1^=V$m)h9FKZ@7o;{6aXdIEtrf6Kj`Ms9umD%R4d!HaM-$*9Qib~v1O14`+lCdO2|p>^^Owp$D+MDDq@>~l+r>bGa30mIrJjcU zpGmE+smm=pLYL7)g})ne^RwVpZ_K%PtE@gTg5l=rV@X|WJMga5&lkbEwA1pPx7f3= zBUg&&EZ=dz&G{a<+RiLch>bhQP9?eGFMQ+jafUz^wTJxqBI=CJ(bbD~B>r*U-mFbJUNwwum-CUO1Jo zz>-=y4Lxdrhzf==idkDw8xOBBOd?I(CM~zfwU+TEh_HT=*^G_u%V5C^0;yOC3@LKd zMzcd3A#nMo9yLJX-S38=9yMS~MdztGMQqDi8NT*FXThRt6`mI}D>LFW?EUs#lVN5_ z&W=o7I&=iplDpQZ$Q*^oJjP z-gaCPoX}ZEDDZhlZ^*=|Au{05zF#wKy!i-WbC#i%NeHuKY&`x9#jh;&;b8AoJ3E94 z7nO2@yF^J4aKakCWgtxC`0l6>x)kHgG^t@#x8H2vTbP`q^94RNA|L0KN!8jqy4ktj zu6sw z&Q;gM5m&#IjCM7Rkpd^(o2m0JepS!PHZyl*AF*9IL^-%Dgz3LP2*i(ge?K~iBOUMF za0erfOY0WeIsGgxkTi1aJN`3{blhb_G)psIm6=JNepaK((#%)li1i{Xhr5PI~MJMUvM*Mp}301(oIReFv>PpXA*zv9^Pcvi1Go zfbNe^3it3PmSTvm{ z3Pzgl5TGg$db3Gd)2b|mia8cTkmQlZs`bK;pn6FY@9Y;>F(0HvS9W=<51b@vy`}Q{ z57ED>t*(iTBk&^|daEZIIKz1S3sC-2{uTa;(p{+TdTG-zbyV8Y1~TKD##!R#Vb#7J zq=~!f1B9}-Ygb3v7X^(nE=BadEZdtPy|LTuO}fTr1LueVUS*_s68}HG64!zhhvCfb zB>BBMku|#`Q~0j^is~*2oIuq}Ya03Al6UXO%A6){ei;+L#jM@=U<4>gK3y8@XMl^? zWg@7zhbF)$sU!{OaU>3$h(NU=C}5WvZ-wSf`!4NNenraZlM(cz8;Q3vfA}_`Z`zpr zDhLvA?z&N#mUN+u>qKO%73#D{zoW#Kw_&~3GDD`j6id-?aeydB?So_OPBFneS4%%v1oLC}-Iz>UykCvZgj2J6%NKpg zb3-i6Try)}%^yFV1OcsC%>2t){6zC&I#UJzL0W<><=)6*Tq!0UYh+ygN{;PSVjqc~ zuw*LYKNv)_wr?Oa7BDH8elfOZ8L6(l40aAi`&v8(_6Xt;QfmxrlKqZMNA3S?|uMM~m^!1b&^2pB8Tvwn{~`Fm_5nGQ&G-LN7tM*A8WGcMz>{PpzO2h%-iV!k;o zL|gSN>EKx@y{umMzM>t67Pt1Etk1w^xt(Q<V&`uE|h5;o}EJ4(k;nr!a>rj~1lS!SlN zuS2hfE7QU5$Oqp`S*+bcTvan4^D%kwoKc5!uJ;Bm&$Fr{p`L5+UzBpvN;!;6k zv1IWjCxZb5t`oB$&{~_$bjk9%gF)fwJ>NRmJvn_yw&ax*2;f!7Ghrza_>qIeA^L(J zyEmr~ULW@?QtSJ4rbiYB)s@;nfSN#9sx?@ zg|Ju@e94dC4fp5XNY)if6<0JzE{FSDT&%3hi2e|muyWbWs;$YQB3aBMInc0a@`+OV zfwk2upCh6V++zM(t3)@pI}-aDtS?BOI^m>mF7ZM%wA0#D0TZojdv&2+MHs6xQiJ5qZwgZJ~%viwP-Q#jA@ikln~ zWqs>lIwi%(Gl1V(i^Gz@SRI^NIqJr)?|yIs0uB?x8V{wTm%3)Ud<$JqN!GJB zp%%L7yR!Ben4=1-@DmxN0M%1j{4HU%55|VK zjB9rMfXX(Ny#G9&W`eV}jH&fkIVyeDRU)eJ6M3ZNZmIAH zYxtjjKi1y#c*@P}c9E^@Gy3wgbQ_0YGm#xXg`ew$n?H5nwyX>Cn3W~Jyk7AjJ|X@G zUVjoDMYd|Q(PzFQr!rHww0oqJaLv_4UGBCm%! zFTr$x0Q*V?Y{rrk6Ho^#qFHP2ArS7kgkz#?XHFB?SH5w2ad7V{dA-+unz)3(mn>o! ztOWLIdSB->HtS#)X=1^a`Gf3Z`$AauLEfl_PVMIYr_(t*V!YPU@<^?2)OYRElS?M5 zOE(H3V04D0L?}w{7uhFsMn}E6N2mS@Z;mNtpk_b&(c1g!xdi7~lRi8+;rA%uI_Q&*g3v4W+`ZVQ>%-SCaZdzavhPy81pmty1{ zz#)soIh0$H5Y5|;i$g1Qt`|BmV6yV{8a^wSS!&?!PX6Ub{>)yySbk%B`;o%Wu|?f! z9n;%K#s2n3+jnYsa6juxs62Kr{AMpZ9LD(vY>TM@64Nf#^cm^31+}GwS+4x+Y51R< z)&S@zj91`GDVNDuyG zuXOaPT}ImnA%_Onvh&Qi)r0QalAAtcd>H71ODURgGNoZy9F8(hZ44u74voc;I6#^^ z6DzX+7fr-tg=ly#G_;}YOZ2!T{ZWQ%ez2v11^us$zz&%{mI`Lu;_d1gQLn#W- z{)Rqk7GAkCRbTq%JRWy?xwEIdaF1VaT zFij7aoCCgjL12SH{Sdv2?L<2M)qqb)WaaRe%@9b9nKx7S_k>dkF`pDc`1Not@8Q*|P1U_OGA!2pMuIE{_5 z25|O=xe5gw=zw|1!A^Aa=4%Ty?mfRSiO5ZjG*&i3*obe;$~$lpzYP%K2fs&*wq0>T zS$N{wuxbb)q8QOHk zb5(YAz}d@1Fd)QAz&wBbJHddD1iTvPs7{1BjL=}J77X}^a_DRtZ-Nykz&Qnv`g&(n z+aKu>?O2et{A+8?{3uMmuS=dcS2CPaj4tcvGADZgPXNOiJwoVPKxX+skpJ!e`fp8<S}CAc7ow>h*rr(|8{u^V7V=X~UVLjips47O=;0uQc_IF;LP329FAQK1-w zQVzSF6zM6-ciHGUVc)N?`(CAPo*i{b9OD?ZP?xC*y-gZ zXD;x(kSc{_4GxNzo%B;JO(aCpw}#0Pjc!h7%!NB&2; zNxa8@`(^T$AO89!UKYYZ!DgGx-5>Spe-oQokZyT5Vd(w(wP}#4)RH9y0+Gqjo)$(2 kT{8MIq}R+dX{0exh;Hh;b%}5ZZDHeOV literal 0 HcmV?d00001 diff --git a/backend/cdn/profiles/8.jpg b/backend/cdn/profiles/full/8.jpg similarity index 100% rename from backend/cdn/profiles/8.jpg rename to backend/cdn/profiles/full/8.jpg diff --git a/backend/cdn/profiles/default.jpg b/backend/cdn/profiles/full/default.jpg similarity index 100% rename from backend/cdn/profiles/default.jpg rename to backend/cdn/profiles/full/default.jpg diff --git a/backend/cdn/profiles/default.svg b/backend/cdn/profiles/full/default.svg similarity index 100% rename from backend/cdn/profiles/default.svg rename to backend/cdn/profiles/full/default.svg diff --git a/backend/cdn/profiles/thumb/0.jpg b/backend/cdn/profiles/thumb/0.jpg new file mode 100644 index 0000000000000000000000000000000000000000..aa5cd7235a72a0d817a61a096031f7c1a7d00bd3 GIT binary patch literal 1735 zcmbWzYc$k(7{Kx0fBthZE?Z+J*U>Nt(I}UQ+S!!b9yP{U$zSd1DDhr{F5)HO+3ni?9K`r5>KBqIaL zLL&n*+1QL`VNA6#C6g_ktZbGpb8v8=Sgcs(Y`2nT?_f7~1j6I-ni`sVT3UK`CS()4 z{~U53Pym21&>w-XK`0oF!Q#~L>Ke#>aD6^NK`@L$!x#)24FGfqLcst{z-XH+Ta6`d z48~FSk?fKM=haNzsv8#gJ()1G4~b5}tLy0M=^L0YvaqyT?BKZE$$5p#nzim8o?hOJ zO`Dl4w*Qs@PAE4l{QHQ=nEkN_;^Ge`r2cp$?Pz+&vE1V)^72m>2n&lY6ql5im0zq7 zU%hty#?4zbwf7qDH#N7kK6v=_S!Y*wPj6rU@XL``qpxMZj=hn8_&E7#N}>EbJ?DY| z{3i>7zRLdLA^;SGLZe|cZq5av;^r0zXpG4+toG`SxZr(6s$H@g$xU#+x&d!$?>Dg^ zB>IWEj+w)-xqOcHhwR_MQvR3hFWBF%exM0M07ZZafDT4(TS?z0T5rD78RA{_3d(oV z_dmL^;my#)x=i?FGhtJC)zB~k(6TcggM#i{P%l;g#BUW|JRu-DV*E1-g)C}Vsoz>! zpWbgiBJ$1jBv-9A+t_e}mc56sQu%n?v!2gAf}BSjKU#@-gl)?#-m}P_<2ZIYd?#0G zNsHQXcI)T3*5vm5!{x2*`j;1$Mp#OkJULunH=dH!&8NRIiABH<-B)M3&wVJmQ*dySTkkFWdT2ddGfd-eF2wq>-gl+yVuuYX9u~ zn}_yQ)%A&ntTK1^UZ`2@wQ4p`Hd0!?HCH3Z)Jt@Rp=3SDaVf1tKyu~wm}cM4rV8B< ztJ_*7j|WQcbQVW(Y!CQ;By|U~T-^^fa9lS}V2|pYTZw?kTF)#EYj5XZ!9ZC;e?E7c z#N$zr$=Sr^!l-KdJ+a2&=C$lABE~3L%LQ}F6(tp_I*sfnk8X0+%tOxsH% zNwe>ju-KG0FWx%h`P9#xT-zLq@{!a$fRP*C6qFqaU7dIKlr~bs}uH;W7^jy{N z64#BA2OBMg_oKGJMrEDs&^@~t?qQMQT-O{e+DY2AOp!QqS5}rxed`V1vH)pQZNw@DnV~*^1XSJqlJH@L904y>nPQkL{RRQO2v8__S+)@cyu^1#0>i$fRsC^1 zPVx{jW@b#~)2JHS+rF|UXRWSwx({}VEO^P|=kjTGwQj&eMUAKHUJj9-AAKr&sN{gh zRA(_etip<_6UjB(ui2H+ED#MR-l07U^p1`1s!UmmIUX%KU>5g2u(y?&XYH48B6~7E zwSKsc6Pzt#+utJ~Wy8ozI+R^mh5#uA0i1PjdJ#asJF7O{Oi#a!x%SF` zX^Zieg*u-Wjg)>wf)`@~vJ8J3dDa5;ikaPm@E& z`xm!V@b0xubTqTxj!gy6I!bxmmq~T<=xT}F^WMOEzp-1YyIis=w&Y7*#s^hPYr8{j z&n5Gke6NxCCxgNb7K7*s>%Q!T5oQw7mbO5K*w?h^H&%$-G&_o!m!)mok>tIg4ZrIk zwZ#QgjPEt01O>^qFA6GXE@rw^Jq^m4A@W~@w=<^Hy^a(L*ZOom^vd`;kDMZrIFW^7 ncf&O>HZM&BzK4KMZ=2g Vec { routes![profile_pic, general] @@ -9,11 +21,15 @@ pub fn routes() -> Vec { #[get("/profile/")] pub async fn profile_pic(user_id: usize) -> Option { if let Ok(image) = - NamedFile::open(Path::new("./cdn/profiles/").join(format!("{}.jpg", user_id))).await + NamedFile::open(Path::new("./cdn/profiles/full/").join(format!("{}.jpg", user_id))).await { Some(image) } else { - Some(NamedFile::open("./cdn/profiles/default.svg").await.ok()?) + Some( + NamedFile::open("./cdn/profiles/full/default.svg") + .await + .ok()?, + ) } } @@ -23,3 +39,102 @@ pub async fn general(file_name: PathBuf) -> Option { .await .ok() } + +#[derive(Serialize)] +pub struct UploadResponse { + success: bool, + message: String, + url: Option, +} +// Upload endpoint - handles image upload and creates multiple sizes +#[post("/profile//upload", data = "")] +pub async fn upload_profile_pic( + user_id: usize, + mut file: Form>, +) -> Result, Status> { + const MAX_FILE_SIZE: u64 = 5 * 1024 * 1024; + if file.len() > MAX_FILE_SIZE { + return Ok(Json(UploadResponse { + success: false, + message: "File size exceeds 5MB limit".to_string(), + url: None, + })); + } + + // Read file contents into buffer + let mut buffer = Vec::new(); + file.open() + .await + .map_err(|e| { + eprintln!("Failed to open file: {}", e); + Status::BadRequest + })? + .read_to_end(&mut buffer) + .await + .map_err(|e| { + eprintln!("Failed to read file: {}", e); + Status::BadRequest + })?; + + // Check if buffer is empty + if buffer.is_empty() { + return Ok(Json(UploadResponse { + success: false, + message: "Uploaded file is empty".to_string(), + url: None, + })); + } + + // Validate format from buffer + let format = image::guess_format(&buffer).map_err(|e| { + eprintln!("Failed to guess format: {}", e); + Status::BadRequest + })?; + + // Only allow specific image formats + let allowed_formats = [ + ImageFormat::Jpeg, + ImageFormat::Png, + ImageFormat::WebP, + ImageFormat::Gif, + ]; + + if !allowed_formats.contains(&format) { + return Ok(Json(UploadResponse { + success: false, + message: "Unsupported image format. Only JPEG, PNG, WebP, and GIF are allowed." + .to_string(), + url: None, + })); + } + + // Decode and validate the image before persisting anything + let img = image::load_from_memory(&buffer).map_err(|e| { + eprintln!("Image decode error: {}", e); + Status::BadRequest + })?; + + // Now that we know it's valid, ensure directories exist + let base_path = Path::new("./cdn/profiles"); + fs::create_dir_all(base_path.join("thumb")).map_err(|_| Status::InternalServerError)?; + fs::create_dir_all(base_path.join("full")).map_err(|_| Status::InternalServerError)?; + + // Create thumbnail (64x64) for chat lists + let thumb = img.resize_to_fill(64, 64, FilterType::Lanczos3); + let thumb_path = base_path.join("thumb").join(format!("{}.jpg", user_id)); + thumb + .save_with_format(&thumb_path, ImageFormat::Jpeg) + .map_err(|_| Status::InternalServerError)?; + + // Create full size (256x256) for profile views + let full = img.resize_to_fill(256, 256, FilterType::Lanczos3); + let full_path = base_path.join("full").join(format!("{}.jpg", user_id)); + full.save_with_format(&full_path, ImageFormat::Jpeg) + .map_err(|_| Status::InternalServerError)?; + + Ok(Json(UploadResponse { + success: true, + message: "Profile picture uploaded successfully".to_string(), + url: Some(format!("/cdn/profile/{}", user_id)), + })) +} diff --git a/backend/src/llm.rs b/backend/src/llm.rs index f1aa988..5a5388a 100644 --- a/backend/src/llm.rs +++ b/backend/src/llm.rs @@ -29,7 +29,7 @@ impl LlmWorker { // Build the request body let payload = LlmRequest { - model: "gemma2-9b-it".into(), // whatever model you run locally + model: "gpt-oss-20b".into(), // whatever model you run locally messages: vec![Message { role: "user".into(), content: message.text.clone().into(), diff --git a/backend/src/main.rs b/backend/src/main.rs index 733757a..a1a298c 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate rocket; +use redis::cmd; use rocket::fs::{FileServer, NamedFile}; use rocket::http::Method; use rocket::serde::json::Json; @@ -13,7 +14,6 @@ use std::sync::Arc; use crate::auth::Session; use crate::db::{Postgres, Redis}; -use crate::messages::ChatBroadcaster; pub mod auth; pub mod cdn; @@ -35,20 +35,18 @@ async fn users(_ag: Session, mut db: Connection) -> Json> { } #[get("/users/", rank = 1)] -async fn display_name(id: usize, _ag: Session, mut db: Connection) -> String { - sqlx::query!( - "SELECT display_name, username FROM users WHERE id = $1", - id as i32 - ) - .fetch_one(&mut **db) - .await - .map(|row| row.display_name.unwrap_or(row.username)) - .unwrap_or_else(|_| "User not found".to_string()) +async fn display_name( + id: usize, + _ag: Session, + mut pgsql_conn: Connection, + mut redis_conn: Connection, +) -> String { + UserCache::username(id, &mut redis_conn, &mut pgsql_conn).await } #[launch] fn rocket() -> Rocket { - let chat = Arc::new(ChatBroadcaster::new(32)); + let chat = Arc::new(crate::messages::ChatBroadcaster::new(32)); let cors = CorsOptions::default() .allowed_origins(AllowedOrigins::all()) @@ -73,7 +71,6 @@ fn rocket() -> Rocket { routes![ favicon, messages::chat_page, - messages::chat_page_preview, auth::signup_page, auth::login_page, auth::mfa_page, @@ -83,6 +80,7 @@ fn rocket() -> Rocket { .mount( "/api", routes![ + cdn::upload_profile_pic, messages::get_messages, messages::post_message, messages::event_stream, @@ -109,3 +107,45 @@ fn rocket() -> Rocket { async fn favicon() -> NamedFile { NamedFile::open("static/favicon.ico").await.unwrap() } + +pub struct UserCache {} + +impl UserCache { + pub async fn username( + id: usize, + redis_conn: &mut Connection, + pgsql_conn: &mut Connection, + ) -> String { + if let Ok(val) = cmd("GET") + .arg(&[format!("users:{id}")]) + .query_async(&mut **redis_conn) + .await + { + return val; + } + + if let Ok(v) = sqlx::query!("SELECT username FROM users WHERE id = $1", id as i32) + .fetch_one(&mut ***pgsql_conn) + .await + { + let username = v.username; + Self::insert(id, &username, redis_conn).await; + username + } else { + unimplemented!() + } + } + + pub async fn insert(id: usize, username: &str, conn: &mut Connection) { + cmd("SET") + .arg(&[ + format!("users:{id}"), + username.to_string(), + "EX".to_string(), + "1800".to_string(), + ]) + .query_async(&mut **conn) + .await + .expect("failed to insert key") + } +} diff --git a/backend/src/messages/cache.rs b/backend/src/messages/cache.rs new file mode 100644 index 0000000..445f06f --- /dev/null +++ b/backend/src/messages/cache.rs @@ -0,0 +1,87 @@ +use redis::AsyncCommands; +use rocket_db_pools::Connection; + +use crate::{ + db::{Postgres, Redis}, + messages::ChatMsg, +}; + +// Helper function to cache message in Redis +pub async fn insert( + cache: &mut Connection, + channel_id: i32, + msg: &super::ChatMsg, +) -> Result<(), redis::RedisError> { + let key = format!("messages:{}", channel_id); + let msg_json = serde_json::to_string(msg).unwrap(); + + redis::pipe() + .atomic() + .lpush(&key, &msg_json) + .ltrim(&key, 0, 99) + .expire(&key, 86400) // 24h expiry + .query_async(&mut ***cache) + .await +} + +// Helper function to get cached messages from Redis +pub async fn get( + redis: &mut Connection, + channel_id: i32, +) -> Result, redis::RedisError> { + let key = format!("messages:{}", channel_id); + + // query last 100 messages + let messages: Vec = redis + .lrange::<_, Vec>(&key, 0, 99) + .await? + .into_iter() + .rev() // Reverse because LPUSH adds to front, so newest is first + .filter_map(|msg_str| serde_json::from_str(&msg_str).ok()) + .collect(); + + Ok(messages) +} + +// Helper function to initialize cache from Postgres if empty +pub async fn initialise( + cache: &mut Connection, + db: &mut Connection, + channel_id: i32, +) -> Result<(), Box> { + let key = format!("messages:{}", channel_id); + + let length: usize = cache.llen(&key).await?; + if length < 100 { + // Fetch from Postgres + let messages = sqlx::query!( + "SELECT u.username, u.display_name, u.id, m.content, m.created_at + FROM messages m + JOIN users u ON m.user_id = u.id + WHERE m.channel_id = $1 + ORDER BY m.created_at DESC + LIMIT 100", + channel_id + ) + .fetch_all(&mut ***db) + .await?; + + // Populate cache (in reverse order so oldest is at the end) + for msg in messages.into_iter().rev() { + let chat_msg = ChatMsg { + display_name: Some(msg.display_name.unwrap_or(msg.username)), + user_id: msg.id as usize, + text: msg.content, + timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, + }; + + let msg_json = serde_json::to_string(&chat_msg)?; + + cache.lpush::<_, _, ()>(&key, &msg_json).await?; + } + + cache.expire::<_, ()>(&key, 86400).await?; + } + + Ok(()) +} diff --git a/backend/src/messages.rs b/backend/src/messages/messages.rs similarity index 68% rename from backend/src/messages.rs rename to backend/src/messages/messages.rs index 0a35fd8..3723c70 100644 --- a/backend/src/messages.rs +++ b/backend/src/messages/messages.rs @@ -1,12 +1,13 @@ use std::sync::Arc; -use redis::cmd; +use redis::{AsyncCommands, cmd}; use rocket::{ Shutdown, response::stream::{Event, EventStream}, serde::json::Json, time::OffsetDateTime, }; +use rocket_cors::CorsOptions; use rocket_db_pools::Connection; use rocket_dyn_templates::{Template, context}; use serde::{Deserialize, Serialize}; @@ -52,10 +53,12 @@ pub struct ChatMsg { pub async fn post_message( mut msg: Json, chat: &rocket::State>, - mut db: Connection, + mut postgres: Connection, + mut cache: Connection, session: Session, ) -> Result<(), String> { const CHANNEL_ID: i32 = 1; + let channel_id = CHANNEL_ID; const LMSTUDIO_URI: &'static str = "http://127.0.0.1:1234/v1/chat/completions"; let chat = chat.inner().clone(); @@ -64,7 +67,7 @@ pub async fn post_message( "SELECT display_name, username FROM users WHERE id = $1", session.user_id as i32 ) - .fetch_one(&mut **db) + .fetch_one(&mut **postgres) .await .map(|row| row.display_name.unwrap_or(row.username)) .unwrap_or_else(|_| "Unknown".to_string()); @@ -80,16 +83,22 @@ pub async fn post_message( msg.text, OffsetDateTime::from_unix_timestamp_nanos(msg.timestamp as i128 * 1_000_000).unwrap() ) - .execute(&mut **db) + .execute(&mut **postgres) .await .map_err(|_| "Failed".to_string())?; + super::cache::insert(&mut cache, channel_id, &msg) + .await + .map_err(|_| "Redis cache failed".to_string())?; // get response tokio::spawn(async move { let response = LlmWorker::new(LMSTUDIO_URI.to_string()).query(&msg).await; if let Ok(reply) = response { chat.publish(reply.clone()).await; + super::cache::insert(&mut cache, CHANNEL_ID, &reply) + .await + .ok(); sqlx::query!( "INSERT INTO messages (channel_id, user_id, content, created_at) VALUES ($1, $2, $3, $4)", @@ -98,7 +107,7 @@ pub async fn post_message( reply.text, OffsetDateTime::from_unix_timestamp_nanos(reply.timestamp as i128 * 1_000_000).unwrap() ) - .execute(&mut **db) + .execute(&mut **postgres) .await .map_err(|_| "Failed".to_string()) .unwrap(); @@ -109,31 +118,58 @@ pub async fn post_message( } #[get("/messages")] -pub async fn get_messages(mut db: Connection, _session: Session) -> Json> { - Json( - sqlx::query!( - "SELECT u.username, u.display_name, u.id, m.content, m.created_at FROM messages m JOIN users u ON m.user_id = u.id ORDER BY m.created_at DESC LIMIT 100" - ) - .fetch_all(&mut **db) - .await - .unwrap_or_else(|_| Vec::new()) - .into_iter() - .rev() - .map(|msg| ChatMsg { - display_name: Some(msg.display_name.unwrap_or(msg.username)), - user_id: msg.id as usize, - text: msg.content, - timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, - }) - .collect(), +pub async fn get_messages( + mut db: Connection, + mut redis: Connection, + _session: Session, +) -> Json> { + const CHANNEL_ID: i32 = 1; + let channel_id = CHANNEL_ID; + + if let Ok(messages) = super::cache::get(&mut redis, channel_id).await + && !messages.is_empty() + { + return Json(messages); + }; + + if let Err(x) = super::cache::initialise(&mut redis, &mut db, channel_id).await { + eprintln!("WARN: {x:?}"); + } + + if let Ok(messages) = super::cache::get(&mut redis, channel_id).await + && !messages.is_empty() + { + return Json(messages); + }; + + let res = sqlx::query!( + "SELECT u.username, u.display_name, u.id, m.content, m.created_at + FROM messages m + JOIN users u ON m.user_id = u.id + WHERE m.channel_id = $1 + ORDER BY m.created_at DESC LIMIT 100", + channel_id ) - // Json(vec![]) + .fetch_all(&mut **db) + .await + .unwrap_or_else(|_| Vec::new()) + .into_iter() + .rev() + .map(|msg| ChatMsg { + display_name: Some(msg.display_name.unwrap_or(msg.username)), + user_id: msg.id as usize, + text: msg.content, + timestamp: (msg.created_at.unwrap().unix_timestamp_nanos() / 1_000_000) as usize, + }) + .collect(); + Json(res) } #[get("/events")] pub async fn event_stream( chat: &rocket::State>, - db: Connection, + postgres: Connection, + cache: Connection, ag: Session, mut shutdown: Shutdown, ) -> EventStream![] { @@ -141,7 +177,7 @@ pub async fn event_stream( EventStream! { // Initialize the stream with the last 100 messages - for msg in get_messages(db, ag).await.0 { + for msg in get_messages(postgres, cache, ag).await.0 { yield Event::json(&msg); } @@ -171,25 +207,3 @@ pub async fn chat_page(session: Session) -> Template { pub async fn chat_page_preview(session: Session) -> Template { Template::render("chatpreview", context!(user_id: session.user_id)) } - -pub struct UserCache {} - -impl UserCache { - pub async fn username(&mut self, id: usize, redis_conn: &mut Connection) -> String { - if let Ok(val) = cmd("GET") - .arg(&[format!("users:{id}")]) - .query_async(&mut **redis_conn) - .await - { - return val; - } - } - - pub async fn insert(id: usize, username: &str, conn: &mut Connection) { - cmd("SET") - .arg(&[format!("users:{id}"), username.to_owned()]) - .query_async(&mut **conn) - .await - .expect("failed to insert key") - } -} diff --git a/backend/src/messages/mod.rs b/backend/src/messages/mod.rs new file mode 100644 index 0000000..556635c --- /dev/null +++ b/backend/src/messages/mod.rs @@ -0,0 +1,4 @@ +mod cache; +mod messages; + +pub use messages::{ChatBroadcaster, ChatMsg, chat_page, event_stream, get_messages, post_message}; diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..c6d2364 --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +source backend/.env +echo "Building against DB: $DATABASE_URL" +podman build --build-arg DATABASE_URL=$DATABASE_URL -t git.zxq5.dev/zxq5/chatapp-backend:$1 ./backend +podman push git.zxq5.dev/zxq5/chatapp-backend:$1 diff --git a/doc/arch.md b/doc/arch.md new file mode 100644 index 0000000..05c4770 --- /dev/null +++ b/doc/arch.md @@ -0,0 +1,37 @@ +- **Frontend Service** (serving React frontend) + - Communicates with backend APIs via HTTPS + - Authenticates users via JWT tokens + +- **Messenger** + - Handles real‑time messaging (WebSocket or long polling) + - Stores message metadata in Postgres + - Uses Redis for message caching + +- **CDN Service** (user uploaded media) + - Receives uploads, stores files on object storage + - Generates signed URLs with built in auth for secure access + +- **LLM Service** + - Exposes language‑model inference API for scripts + - Runs model in a containerized environment + - Stores usage logs in Postgres + +- **Auth Service** (user authentication & authorization) + - Validates credentials and issues JWTs + - Manages refresh tokens and token revocation + - Persists session state and audit logs in PostgreSQL + +- **Script Runner** + - Executes user or system scripts on demand + - Isolates execution in sandbox containers + - Provides access to "Tables" - SQLite files kept in object storage + - Persists logs to Postgres + +- **Redis Cache** + - Provides distributed caching for session data and messages + +- **PostgreSQL Service** (with volumes) + - Stores relational data: users, messages, media metadata, script logs + - Uses Docker volume for persistent storage + - Backed up via scheduled snapshots +```