//! Allows creation of asynchronous IO bound tasks. //! //! Written by @zxq5 for the most part with code from //! [here](https://github.com/phil-opp/blog_os/). //! use crate::prelude::*; use alloc::collections::BTreeMap; use alloc::sync::Arc; use alloc::task::Wake; use core::{ future::Future, pin::Pin, sync::atomic::AtomicU64, task::{Context, Poll, Waker}, }; use crossbeam::queue::ArrayQueue; use x86_64::instructions::interrupts::{self, enable_and_hlt}; pub struct Task { id: TaskId, future: Pin>>, } impl Task { pub fn new(future: impl Future + 'static) -> Self { Self { id: TaskId::new(), future: Box::pin(future), } } fn poll(&mut self, context: &mut Context) -> Poll<()> { self.future.as_mut().poll(context) } } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] struct TaskId(u64); impl TaskId { fn new() -> Self { static NEXT: AtomicU64 = AtomicU64::new(0); Self(NEXT.fetch_add(1, core::sync::atomic::Ordering::Relaxed)) } } pub struct Executor { tasks: BTreeMap, task_queue: Arc>, waker_cache: BTreeMap, } impl Executor { pub fn new() -> Self { Self { tasks: BTreeMap::new(), task_queue: Arc::new(ArrayQueue::new(100)), waker_cache: BTreeMap::new(), } } pub fn spawn(&mut self, task: Task) { let task_id = task.id; if self.tasks.insert(task.id, task).is_some() { panic!("task with same id already in tasks"); } self.task_queue.push(task_id).expect("queue full"); } fn run_ready_tasks(&mut self) { // destructure `self` to avoid borrow checker errors let Self { tasks, task_queue, waker_cache, } = self; while let Some(task_id) = task_queue.pop() { let task = match tasks.get_mut(&task_id) { Some(task) => task, None => continue, // task no longer exists }; let waker = waker_cache .entry(task_id) .or_insert_with(|| TaskWaker::new_waker(task_id, task_queue.clone())); let mut context = Context::from_waker(waker); match task.poll(&mut context) { Poll::Ready(()) => { // task done -> remove it and its cached waker tasks.remove(&task_id); waker_cache.remove(&task_id); } Poll::Pending => {} } } } pub fn run(&mut self) -> ! { loop { self.run_ready_tasks(); self.sleep_if_idle(); } } fn sleep_if_idle(&self) { interrupts::disable(); if self.task_queue.is_empty() { enable_and_hlt(); } else { interrupts::enable(); } } } impl Default for Executor { fn default() -> Self { Self::new() } } struct TaskWaker { task_id: TaskId, task_queue: Arc>, } impl TaskWaker { fn wake_task(&self) { self.task_queue.push(self.task_id).expect("task_queue full"); } fn new_waker(task_id: TaskId, task_queue: Arc>) -> Waker { Waker::from(Arc::new(Self { task_id, task_queue, })) } } impl Wake for TaskWaker { fn wake(self: Arc) { self.wake_task(); } fn wake_by_ref(self: &Arc) { self.wake_task(); } }