use crate::api::auth::Session; use crate::error::ApiResult; use crate::svc::chat_svc::ChatService; use chrono::{DateTime, Utc}; use rocket::response::stream::Event; use rocket::serde::json::Json; use rocket::serde::{Deserialize, Serialize}; use rocket::{Shutdown, State, ___internal_EventStream as EventStream}; use sqlx::FromRow; use tokio::select; use tokio::sync::broadcast; /// ---------- Rocket routes ---------- #[derive(Debug, Serialize, Deserialize, Clone, FromRow)] pub struct ChatMsg { pub display_name: Option, pub user_id: i64, pub text: String, pub timestamp: DateTime, } #[post("/chat/", format = "json", data = "")] pub async fn post_message( msg: Json, chat: &State, session: Session, channel_id: i64, ) -> ApiResult<()> { chat.send(channel_id, session.uid, &msg.text, Utc::now()).await } #[get("/events/")] pub async fn event_stream( chat: &State, s: Session, mut shutdown: Shutdown, channel_id: i64, ) -> ApiResult { let messages = chat.get_messages(channel_id, 100) .await?; // if get message returned err, inform user. let mut rx = chat.subscribe(channel_id).await; let id = s.uid; Ok(EventStream! { for msg in messages { yield Event::json(&msg); } loop { select!{ _ = &mut shutdown => break, // exit early on shutdown msg = rx.recv() => match msg { Ok(msg) => { tracing::info!("yielding message!"); yield Event::json(&msg) }, Err(broadcast::error::RecvError::Lagged(n)) => { tracing::warn!("Receiver lagging on channel {channel_id} by {n} events",); yield Event::comment("RecvError::Lagged"); } Err(broadcast::error::RecvError::Closed) => { tracing::info!("Broadcaster hung up on channel {channel_id}!"); break }, }, } } }) }