diff --git a/src/input_pin_manager.rs b/src/input_pin_manager.rs new file mode 100644 index 0000000..2bb2981 --- /dev/null +++ b/src/input_pin_manager.rs @@ -0,0 +1,20 @@ +use std::sync::{mpsc, Arc, Mutex}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct PinConfiguration { + pub id: usize, + pub gpio: usize, +} +#[derive(Debug)] +pub struct PulseInfo { + pub timestamp_ns: u64, + pub pin_id: usize, + pub level: bool, +} + +pub trait InputPinManager { + fn set_input_config(&mut self, input_pins: Vec); + fn get_channel_recv(&self) -> &Arc>>; + fn get_pins(&self) -> &Vec; + fn close_inputs(&mut self); +} diff --git a/src/main.rs b/src/main.rs index 9ad9353..6002246 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate clap; mod cfg_reader; +mod pulse_counter; const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION"); const DEFAULT_CONFIG_FILE_NAME: &str = "/etc/s0_logger.cfg"; diff --git a/src/pulse_counter.rs b/src/pulse_counter.rs new file mode 100644 index 0000000..960a9f8 --- /dev/null +++ b/src/pulse_counter.rs @@ -0,0 +1,113 @@ +// #![allow(dead_code)] + +#[cfg(test)] +#[path = "./pulse_counter_test.rs"] +mod pulse_counter_test; + +#[path = "./input_pin_manager.rs"] +mod input_pin_manager; +use input_pin_manager::{InputPinManager, PinConfiguration, PulseInfo}; + +use std::collections::{hash_map::HashMap, VecDeque}; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; + +struct PulseCounter { + pin_mgr: Box, + pulses: Arc>>>, +} + +#[derive(Debug)] +struct PulseCounterError { + msg: String, +} + +impl PulseCounter { + fn new(pin_mgr: Box, input_pins: Vec) -> Self { + let channel_lists_arc: Arc>>> = + Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len()))); + let mut channel_lists = channel_lists_arc.lock().unwrap(); + for pin in &input_pins { + channel_lists.insert(pin.id, VecDeque::::new()); + } + let mut myself = Self { + pin_mgr: pin_mgr, + pulses: channel_lists_arc.clone(), + }; + myself.pin_mgr.set_input_config(input_pins); + myself + } + + fn start(&mut self) -> Result<(), PulseCounterError> { + let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.get_channel_recv()); + let (err_tx, err_rx) = mpsc::channel::>(); + let pulse_list = self.pulses.clone(); + //TODO Give this thread a name + thread::spawn(move || { + let msg_channel_recv_lock = msg_channel_recv_arc.try_lock(); + match msg_channel_recv_lock { + Ok(msg_channel_recv) => { + { + err_tx.send(Ok(())).unwrap(); + } + // let pulses = pulse_list.deref(); + loop { + match msg_channel_recv.recv() { + Ok(pulse_info) => { + Self::process_pulse_info(&pulse_list, pulse_info); + } + Err(_) => { + break; + } + } + } + } + Err(_) => { + let result: Result<(), PulseCounterError> = Err(PulseCounterError { + msg: String::from("Thread is already running"), + }); + err_tx.send(result).unwrap(); + } + } + }); + err_rx.recv().unwrap() + } + + fn stop(&mut self) { + self.pin_mgr.close_inputs(); + } + + fn process_pulse_info( + pulses: &Arc>>>, + pulse: PulseInfo, + ) { + //TODO Do not exceed a maximum length of the list + match pulses.lock().unwrap().get_mut(&pulse.pin_id) { + Some(pulse_list) => { + // println!("Pushing pulse {:?} to list", pulse); + pulse_list.push_back(pulse); + } + None => println!("Could not push pulse {:?} to list", pulse), + }; + } + + fn get_pulses_count_by_channel(&self, channel: usize) -> usize { + match self.pulses.lock().unwrap().get(&channel) { + Some(channel_pulses) => channel_pulses.len(), + None => 0, + } + } + + fn get_pulses_by_channel(&mut self, channel: usize) -> Vec { + let mut result: Vec = Vec::new(); + match self.pulses.lock().unwrap().get_mut(&channel) { + Some(pulse_list) => { + while !pulse_list.is_empty() { + result.push(pulse_list.pop_front().unwrap()); + } + } + None => println!("Cannot get pulses of channel {}", channel), + } + result + } +} diff --git a/src/pulse_counter_test.rs b/src/pulse_counter_test.rs new file mode 100644 index 0000000..0beb296 --- /dev/null +++ b/src/pulse_counter_test.rs @@ -0,0 +1,222 @@ +use super::*; + +struct TestPinManager { + input_pins: Vec, + pulse_recv: Arc>>, +} + +impl InputPinManager for TestPinManager { + fn set_input_config(&mut self, input_pins: Vec) { + *&mut self.input_pins = input_pins; + } + + fn get_channel_recv(&self) -> &Arc>> { + &self.pulse_recv + } + + fn get_pins(&self) -> &Vec { + &self.input_pins + } + + fn close_inputs(&mut self) {} +} + +impl TestPinManager { + fn new(cmd_rx: mpsc::Receiver) -> TestPinManager { + let (pulse_tx, pulse_rx) = mpsc::channel::(); + thread::spawn(move || loop { + match cmd_rx.recv() { + Ok(cmd) => { + if cmd.timestamp_ns == 0u64 { + break; + } else { + &pulse_tx.send(cmd); + } + } + Err(_) => break, + } + }); + Self { + input_pins: vec![], + pulse_recv: Arc::new(Mutex::new(pulse_rx)), + } + } +} + +#[test] +fn new_creates_instance() { + let (_, cmd_rx) = mpsc::channel(); + let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + + let mut input_pins = Vec::new(); + input_pins.push(PinConfiguration { id: 1, gpio: 11 }); + input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + + let input_pins_copy = &input_pins.clone(); + + let testee = PulseCounter::new(test_pin_manager_box, input_pins); + + let testee_pins = &testee.pin_mgr.get_pins(); + assert_eq!(testee_pins.len(), input_pins_copy.len()); + assert_eq!(testee_pins[0], input_pins_copy[0]); + assert_eq!(testee_pins[1], input_pins_copy[1]); +} + +#[test] +fn start_and_stop_thread() { + let (_, cmd_rx) = mpsc::channel(); + let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + + let mut input_pins = Vec::new(); + input_pins.push(PinConfiguration { id: 1, gpio: 11 }); + input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + + let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + assert!(testee.start().is_ok()); + std::thread::sleep(std::time::Duration::from_millis(10)); + testee.stop(); +} + +#[test] +fn start_thread_twice() { + // ATTENTION: Do NOT use _ as name of the tx channel because the channel would be closed + // immediately and the thread would exit before it will be started the second time! + let (_usused_but_required_tx, cmd_rx) = mpsc::channel(); + let test_pin_manager_box = Box::new(TestPinManager::new(cmd_rx)); + + let mut input_pins = Vec::new(); + input_pins.push(PinConfiguration { id: 1, gpio: 11 }); + input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + + let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + assert!(testee.start().is_ok()); + std::thread::sleep(std::time::Duration::from_millis(10)); + let result = testee.start(); + assert!(&result.is_err()); + std::thread::sleep(std::time::Duration::from_millis(10)); +} + +#[test] +fn start_thread_and_send_pulses() { + let millis_to_wait = 1u64; + let (cmd_tx, cmd_rx) = mpsc::channel(); + let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + + let mut input_pins = Vec::new(); + input_pins.push(PinConfiguration { id: 1, gpio: 11 }); + input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + + let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + assert!(testee.start().is_ok()); + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + + assert_eq!(testee.get_pulses_count_by_channel(1), 0); + assert_eq!(testee.get_pulses_count_by_channel(3), 0); + + &cmd_tx.send(PulseInfo { + timestamp_ns: 1234u64, + pin_id: 1, + level: true, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 1); + assert_eq!(testee.get_pulses_count_by_channel(3), 0); + + &cmd_tx.send(PulseInfo { + timestamp_ns: 1235u64, + pin_id: 1, + level: false, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 0); + + &cmd_tx.send(PulseInfo { + timestamp_ns: 1280u64, + pin_id: 3, + level: true, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 1); + + &cmd_tx.send(PulseInfo { + timestamp_ns: 1288u64, + pin_id: 3, + level: false, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 2); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + testee.stop(); +} + +#[test] +fn retrieve_pulses_by_channel() { + let millis_to_wait = 1u64; + let (cmd_tx, cmd_rx) = mpsc::channel(); + let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + + let mut input_pins = Vec::new(); + input_pins.push(PinConfiguration { id: 1, gpio: 11 }); + input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + + let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + assert!(testee.start().is_ok()); + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + + &cmd_tx.send(PulseInfo { + timestamp_ns: 1234u64, + pin_id: 1, + level: true, + }); + &cmd_tx.send(PulseInfo { + timestamp_ns: 1235u64, + pin_id: 1, + level: false, + }); + &cmd_tx.send(PulseInfo { + timestamp_ns: 1280u64, + pin_id: 3, + level: true, + }); + &cmd_tx.send(PulseInfo { + timestamp_ns: 1288u64, + pin_id: 3, + level: false, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 2); + + let pulses_ch1 = testee.get_pulses_by_channel(1); + assert_eq!(pulses_ch1.len(), 2); + assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64); + assert_eq!(pulses_ch1[0].pin_id, 1); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1235u64); + assert_eq!(pulses_ch1[1].pin_id, 1); + assert_eq!(pulses_ch1[1].level, false); + + let pulses_ch1 = testee.get_pulses_by_channel(1); + assert_eq!(pulses_ch1.len(), 0); + + let pulses_ch1 = testee.get_pulses_by_channel(3); + assert_eq!(pulses_ch1.len(), 2); + assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64); + assert_eq!(pulses_ch1[0].pin_id, 3); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1288u64); + assert_eq!(pulses_ch1[1].pin_id, 3); + assert_eq!(pulses_ch1[1].level, false); + + let pulses_ch1 = testee.get_pulses_by_channel(3); + assert_eq!(pulses_ch1.len(), 0); +}