async works!

This commit is contained in:
2025-02-23 19:27:13 +00:00
parent efe3034ad0
commit 7ff33659fe
9 changed files with 73 additions and 91 deletions
+1 -1
View File
@@ -79,7 +79,7 @@ extern "x86-interrupt" fn double_fault_handler(
} }
extern "x86-interrupt" fn keyboard_interrupt_handler(_stack_frame: InterruptStackFrame) { extern "x86-interrupt" fn keyboard_interrupt_handler(_stack_frame: InterruptStackFrame) {
use pc_keyboard::{layouts, HandleControl, Keyboard, ScancodeSet1}; use pc_keyboard::{HandleControl, Keyboard, ScancodeSet1, layouts};
// use pc_keyboard::DecodedKey; // use pc_keyboard::DecodedKey;
use spin::Mutex; use spin::Mutex;
use x86_64::instructions::port::Port; use x86_64::instructions::port::Port;
+1 -1
View File
@@ -6,7 +6,7 @@ extern crate alloc;
use core::arch::asm; use core::arch::asm;
use limine::BaseRevision; use limine::BaseRevision;
use libk::alloc::init_heap; use libk::kalloc::init_heap;
use libk::prelude::*; use libk::prelude::*;
use x86_64::VirtAddr; use x86_64::VirtAddr;
+4 -3
View File
@@ -4,9 +4,10 @@
extern crate alloc; extern crate alloc;
use libk::{ use libk::{
io, io::{self, keyboard},
prelude::*, prelude::*,
scheduling::task::{Executor, Task}, scheduling::task::{Executor, Task},
// scheduling::task::{Executor, Task},
}; };
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
@@ -46,8 +47,8 @@ extern "C" fn kmain() -> ! {
); );
let mut executor = Executor::new(); let mut executor = Executor::new();
executor.spawn(Task::new(io::keyboard::print_keypresses())); executor.spawn(Task::new(keyboard::print_keypresses()));
executor.try_run(); executor.run();
loop {} loop {}
} }
+22 -38
View File
@@ -1,89 +1,73 @@
extern crate alloc;
use core::{ use core::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use crate::prelude::*;
use crossbeam::queue::ArrayQueue; use crossbeam::queue::ArrayQueue;
use futures_util::{Stream, task::AtomicWaker}; use futures_util::{Stream, StreamExt, task::AtomicWaker};
use pc_keyboard::{DecodedKey, HandleControl, Keyboard, ScancodeSet1, layouts};
use spin::Once; use spin::Once;
use crate::println;
use super::print;
static KBD_QUEUE: Once<ArrayQueue<u8>> = Once::new(); static KBD_QUEUE: Once<ArrayQueue<u8>> = Once::new();
static WAKER: AtomicWaker = AtomicWaker::new(); static WAKER: AtomicWaker = AtomicWaker::new();
pub fn add_scancode(scancode: u8) { pub fn add_scancode(scancode: u8) {
if let Some(queue) = KBD_QUEUE.get() { if let Some(queue) = KBD_QUEUE.get() {
if queue.push(scancode).is_err() { if let Err(_) = queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input"); println!("WARNING: scancode queue full; dropping keyboard input");
} else { } else {
println!("waking waker");
WAKER.wake(); WAKER.wake();
} }
} else { } else {
println!("WARNING: scancode queue not initialised"); println!("WARNING: scancode queue not initialized");
} }
} }
pub struct ScanCodeStream { pub struct ScancodeStream {
_private: (), _private: (),
} }
impl ScanCodeStream { impl ScancodeStream {
pub fn new() -> Self { pub fn new() -> Self {
KBD_QUEUE.call_once(|| ArrayQueue::new(5)); KBD_QUEUE.call_once(|| ArrayQueue::new(5));
ScancodeStream { _private: () }
Self { _private: () }
} }
} }
impl Default for ScanCodeStream { impl Stream for ScancodeStream {
fn default() -> Self {
Self::new()
}
}
impl Stream for ScanCodeStream {
type Item = u8; type Item = u8;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let queue = KBD_QUEUE.get().expect("scancode queue not initialized"); let queue = KBD_QUEUE.get().unwrap();
serial_println!("This is called once.");
WAKER.register(cx.waker());
// fast path
if let Some(scancode) = queue.pop() { if let Some(scancode) = queue.pop() {
return Poll::Ready(Some(scancode)); return Poll::Ready(Some(scancode));
} }
queue.pop().map_or_else( WAKER.register(&cx.waker());
|| {
print!("Returning"); match queue.pop() {
Poll::Pending Some(scancode) => {
},
|scancode| {
print!("Scancode found");
WAKER.take(); WAKER.take();
Poll::Ready(Some(scancode)) Poll::Ready(Some(scancode))
}, }
) None => Poll::Pending,
}
} }
} }
use futures_util::stream::StreamExt;
use pc_keyboard::{DecodedKey, HandleControl, Keyboard, ScancodeSet1, layouts};
pub async fn print_keypresses() { pub async fn print_keypresses() {
let mut scancodes = ScanCodeStream::new(); let mut scancodes = ScancodeStream::new();
let mut keyboard = Keyboard::new( let mut keyboard = Keyboard::new(
ScancodeSet1::new(), ScancodeSet1::new(),
layouts::Us104Key, layouts::Uk105Key,
HandleControl::Ignore, HandleControl::Ignore,
); );
serial_println!("Printing keypresses.");
while let Some(scancode) = scancodes.next().await { while let Some(scancode) = scancodes.next().await {
if let Ok(Some(key_event)) = keyboard.add_byte(scancode) { if let Ok(Some(key_event)) = keyboard.add_byte(scancode) {
if let Some(key) = keyboard.process_keyevent(key_event) { if let Some(key) = keyboard.process_keyevent(key_event) {
@@ -1,9 +1,9 @@
use linked_list_allocator::LockedHeap; use linked_list_allocator::LockedHeap;
use x86_64::{ use x86_64::{
structures::paging::{
mapper::MapToError, FrameAllocator, Mapper, Page, PageTableFlags, Size4KiB,
},
VirtAddr, VirtAddr,
structures::paging::{
FrameAllocator, Mapper, Page, PageTableFlags, Size4KiB, mapper::MapToError,
},
}; };
/// We are currently using a linked list heap allocator which uses our underlying page allocator. /// We are currently using a linked list heap allocator which uses our underlying page allocator.
+3 -1
View File
@@ -6,8 +6,10 @@
// ????? // ?????
// scheduling / tasks : async // scheduling / tasks : async
pub mod alloc; extern crate alloc;
pub mod io; pub mod io;
pub mod kalloc;
pub mod scheduling; pub mod scheduling;
/// Re-exports most of the IO macros. /// Re-exports most of the IO macros.
+39 -44
View File
@@ -1,12 +1,13 @@
extern crate alloc; use alloc::boxed::Box;
use alloc::{boxed::Box, collections::BTreeMap, sync::Arc, task::Wake}; use alloc::collections::{BTreeMap, VecDeque};
use core::{ use alloc::sync::Arc;
future::Future, use alloc::task::Wake;
pin::Pin, use core::sync::atomic::AtomicU64;
sync::atomic::{AtomicU64, Ordering}, use core::task::{Context, Poll};
task::{Context, Poll, Waker}, use core::task::{RawWaker, Waker};
}; use core::{future::Future, pin::Pin};
use crossbeam::queue::ArrayQueue; use crossbeam::queue::ArrayQueue;
use x86_64::instructions::interrupts::{self, enable_and_hlt};
pub struct Task { pub struct Task {
id: TaskId, id: TaskId,
@@ -14,8 +15,8 @@ pub struct Task {
} }
impl Task { impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Self { pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Self { Task {
id: TaskId::new(), id: TaskId::new(),
future: Box::pin(future), future: Box::pin(future),
} }
@@ -26,21 +27,25 @@ impl Task {
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);
impl TaskId {
fn new() -> Self {
static NEXT: AtomicU64 = AtomicU64::new(0);
TaskId(NEXT.fetch_add(1, core::sync::atomic::Ordering::Relaxed))
}
}
pub struct Executor { pub struct Executor {
tasks: BTreeMap<TaskId, Task>, tasks: BTreeMap<TaskId, Task>,
task_queue: Arc<ArrayQueue<TaskId>>, task_queue: Arc<ArrayQueue<TaskId>>,
waker_cache: BTreeMap<TaskId, Waker>, waker_cache: BTreeMap<TaskId, Waker>,
} }
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}
impl Executor { impl Executor {
pub fn new() -> Self { pub fn new() -> Self {
Self { Executor {
tasks: BTreeMap::new(), tasks: BTreeMap::new(),
task_queue: Arc::new(ArrayQueue::new(100)), task_queue: Arc::new(ArrayQueue::new(100)),
waker_cache: BTreeMap::new(), waker_cache: BTreeMap::new(),
@@ -50,12 +55,13 @@ impl Executor {
pub fn spawn(&mut self, task: Task) { pub fn spawn(&mut self, task: Task) {
let task_id = task.id; let task_id = task.id;
if self.tasks.insert(task.id, task).is_some() { if self.tasks.insert(task.id, task).is_some() {
panic!("a task with this id has already been allocated"); panic!("task with same id already in tasks");
} }
self.task_queue.push(task_id).expect("task queue is full"); self.task_queue.push(task_id).expect("queue full");
} }
fn run_ready_tasks(&mut self) { fn run_ready_tasks(&mut self) {
// destructure `self` to avoid borrow checker errors
let Self { let Self {
tasks, tasks,
task_queue, task_queue,
@@ -65,14 +71,15 @@ impl Executor {
while let Some(task_id) = task_queue.pop() { while let Some(task_id) = task_queue.pop() {
let task = match tasks.get_mut(&task_id) { let task = match tasks.get_mut(&task_id) {
Some(task) => task, Some(task) => task,
None => continue, None => continue, // task no longer exists
}; };
let waker = waker_cache let waker = waker_cache
.entry(task_id) .entry(task_id)
.or_insert_with(|| TaskWaker::waker(task_id, task_queue.clone())); .or_insert_with(|| TaskWaker::new(task_id, task_queue.clone()));
let mut context = Context::from_waker(waker); let mut context = Context::from_waker(waker);
match task.poll(&mut context) { match task.poll(&mut context) {
Poll::Ready(()) => { Poll::Ready(()) => {
// task done -> remove it and its cached waker
tasks.remove(&task_id); tasks.remove(&task_id);
waker_cache.remove(&task_id); waker_cache.remove(&task_id);
} }
@@ -81,14 +88,14 @@ impl Executor {
} }
} }
pub fn try_run(&mut self) { pub fn run(&mut self) -> ! {
self.run_ready_tasks(); loop {
self.sleep_if_idle(); self.run_ready_tasks();
self.sleep_if_idle();
}
} }
fn sleep_if_idle(&self) { fn sleep_if_idle(&self) {
use x86_64::instructions::interrupts::{self, enable_and_hlt};
interrupts::disable(); interrupts::disable();
if self.task_queue.is_empty() { if self.task_queue.is_empty() {
enable_and_hlt(); enable_and_hlt();
@@ -104,18 +111,16 @@ struct TaskWaker {
} }
impl TaskWaker { impl TaskWaker {
fn waker(task_id: TaskId, task_queue: Arc<ArrayQueue<TaskId>>) -> Waker { fn wake_task(&self) {
Waker::from(Arc::new(Self { self.task_queue.push(self.task_id).expect("task_queue full");
}
fn new(task_id: TaskId, task_queue: Arc<ArrayQueue<TaskId>>) -> Waker {
Waker::from(Arc::new(TaskWaker {
task_id, task_id,
task_queue, task_queue,
})) }))
} }
fn wake_task(&self) {
self.task_queue
.push(self.task_id)
.expect("task queue is full!");
}
} }
impl Wake for TaskWaker { impl Wake for TaskWaker {
@@ -127,13 +132,3 @@ impl Wake for TaskWaker {
self.wake_task(); self.wake_task();
} }
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);
impl TaskId {
fn new() -> Self {
static NEXT: AtomicU64 = AtomicU64::new(0);
Self(NEXT.fetch_add(1, Ordering::Relaxed))
}
}