bda1ef251a
calling this v0.4.0
70 lines
2.2 KiB
Rust
70 lines
2.2 KiB
Rust
use crate::api::auth::Session;
|
|
use crate::error::ApiResult;
|
|
use crate::svc::chat_svc::ChatService;
|
|
use chrono::{DateTime, Utc};
|
|
use rocket::response::stream::Event;
|
|
use rocket::serde::json::Json;
|
|
use rocket::serde::{Deserialize, Serialize};
|
|
use rocket::{Shutdown, State, ___internal_EventStream as EventStream};
|
|
use sqlx::FromRow;
|
|
use tokio::select;
|
|
use tokio::sync::broadcast;
|
|
|
|
/// ---------- Rocket routes ----------
|
|
#[derive(Debug, Serialize, Deserialize, Clone, FromRow)]
|
|
pub struct ChatMsg {
|
|
pub display_name: Option<String>,
|
|
pub user_id: i64,
|
|
pub text: String,
|
|
pub timestamp: DateTime<Utc>,
|
|
}
|
|
|
|
#[post("/chat/<channel_id>", format = "json", data = "<msg>")]
|
|
pub async fn post_message(
|
|
msg: Json<ChatMsg>,
|
|
chat: &State<ChatService>,
|
|
session: Session,
|
|
channel_id: i64,
|
|
) -> ApiResult<()> {
|
|
chat.send(channel_id, session.uid, &msg.text, Utc::now()).await
|
|
}
|
|
|
|
#[get("/events/<channel_id>")]
|
|
pub async fn event_stream(
|
|
chat: &State<ChatService>,
|
|
s: Session,
|
|
mut shutdown: Shutdown,
|
|
channel_id: i64,
|
|
) -> ApiResult<EventStream![]> {
|
|
let messages = chat.get_messages(channel_id, 100)
|
|
.await?; // if get message returned err, inform user.
|
|
|
|
let mut rx = chat.subscribe(channel_id).await;
|
|
let id = s.uid;
|
|
|
|
Ok(EventStream! {
|
|
for msg in messages {
|
|
yield Event::json(&msg);
|
|
}
|
|
|
|
loop {
|
|
select!{
|
|
_ = &mut shutdown => break, // exit early on shutdown
|
|
msg = rx.recv() => match msg {
|
|
Ok(msg) => {
|
|
tracing::info!("yielding message!");
|
|
yield Event::json(&msg)
|
|
},
|
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
tracing::warn!("Receiver lagging on channel {channel_id} by {n} events",);
|
|
yield Event::comment("RecvError::Lagged");
|
|
}
|
|
Err(broadcast::error::RecvError::Closed) => {
|
|
tracing::info!("Broadcaster hung up on channel {channel_id}!");
|
|
break
|
|
},
|
|
},
|
|
}
|
|
}
|
|
})
|
|
} |