use backend::rocket_builder; use backend::repo::mock::{MockUserRepo, MockTokenRepo}; use backend::repo::message_repo::MessageRepository; use backend::svc::chat_svc::ChatService; use backend::repo::{Repo, AccessTokenRepoTrait}; use rocket::local::asynchronous::Client; use rocket::http::{Status, ContentType, Header}; use serde_json::{json, Value}; use std::sync::{Arc, Mutex}; use sqlx::PgPool; use chrono::Utc; use backend::svc::llm_service::LlmService; async fn setup_client_with_svc(chat_service: ChatService, users: Arc, tokens: Arc) -> (Client, String) { let client = Client::tracked(rocket_builder(users.clone(), tokens.clone(), chat_service)).await.expect("valid rocket instance"); // Create a user and get JWT let token_code = "valid-token"; tokens.create_new(1, "test", token_code, 1, Utc::now(), Utc::now()).await.unwrap(); let jwt = { let signup_res = client.post("/api/signup") .header(ContentType::JSON) .body(json!({ "email": "test@example.com", "username": "testuser", "password": "password123", "access_token": token_code }).to_string()) .dispatch() .await; assert_eq!(signup_res.status(), Status::Ok); let login_res = client.post("/api/login") .header(ContentType::JSON) .body(json!({ "username": "testuser", "password": "password123" }).to_string()) .dispatch() .await; assert_eq!(login_res.status(), Status::Ok, "Login failed"); let body = login_res.into_string().await.expect("login body"); let auth_resp: serde_json::Value = serde_json::from_str(&body).unwrap(); auth_resp["token"].as_str().unwrap().to_string() }; (client, jwt) } #[rocket::async_test] async fn test_chat_event_stream_consistency() { unsafe { std::env::set_var("JWT_SECRET", "test_secret"); } let pool = PgPool::connect_lazy("postgres://localhost/unused").unwrap(); let messages = ::new(pool.clone()); let users_repo = Arc::new(MockUserRepo { users: Mutex::new(vec![]) }); let tokens_repo = Arc::new(MockTokenRepo { tokens: Mutex::new(vec![]) }); let llm_service = LlmService::new(); let chat_service = ChatService::new(1024, messages, users_repo.clone(), llm_service); let (client, jwt) = setup_client_with_svc(chat_service.clone(), users_repo.clone(), tokens_repo.clone()).await; // Use the same client for sender but with a different user (or the same, doesn't matter for broadcast) // Actually, to simulate another user, we should sign up another user. let jwt_sender = { let token_code = "valid-token-2"; tokens_repo.create_new(1, "test2", token_code, 1, Utc::now(), Utc::now() + chrono::Duration::days(1)).await.unwrap(); let signup_res = client.post("/api/signup") .header(ContentType::JSON) .body(json!({ "email": "test2@example.com", "username": "testuser2", "password": "password123", "access_token": token_code }).to_string()) .dispatch() .await; assert_eq!(signup_res.status(), Status::Ok); let login_res = client.post("/api/login") .header(ContentType::JSON) .body(json!({ "username": "testuser2", "password": "password123" }).to_string()) .dispatch() .await; let body = login_res.into_string().await.unwrap(); let auth_resp: serde_json::Value = serde_json::from_str(&body).unwrap(); auth_resp["token"].as_str().unwrap().to_string() }; let channel_id = 1; // Start listening to the event stream let mut response = client.get(format!("/api/events/{}", channel_id)) .header(Header::new("Authorization", format!("Bearer {}", jwt))) .dispatch() .await; assert_eq!(response.status(), Status::Ok); let num_messages = 5; // Reduced for faster debugging let mut received_count = 0; let jwt_clone = jwt.clone(); tokio::spawn(async move { for i in 0..num_messages { let msg = format!("Message {}", i); let res = sender_client.post(format!("/api/chat/{}", channel_id)) .header(ContentType::JSON) .header(Header::new("Authorization", format!("Bearer {}", jwt_clone))) .body(json!({ "display_name": "testuser", "user_id": 1, "text": msg, "timestamp": Utc::now() }).to_string()) .dispatch() .await; assert_eq!(res.status(), Status::Ok); tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } }); // Wait a bit for messages to be posted tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // Consume the stream let text = response.into_string().await.unwrap(); println!("Received chunk: {}", text); let mut received_count = 0; for line in text.lines() { if line.starts_with("data:") { received_count += 1; } } assert_eq!(received_count, num_messages, "Should receive all posted messages. Received: {}. Full text: {}", received_count, text); }