Implement pulse_counter

This commit is contained in:
Harald Kube 2021-02-25 21:59:09 +01:00
parent 1070815482
commit 2ab7fcbe08
4 changed files with 356 additions and 0 deletions

20
src/input_pin_manager.rs Normal file
View file

@ -0,0 +1,20 @@
use std::sync::{mpsc, Arc, Mutex};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PinConfiguration {
pub id: usize,
pub gpio: usize,
}
#[derive(Debug)]
pub struct PulseInfo {
pub timestamp_ns: u64,
pub pin_id: usize,
pub level: bool,
}
pub trait InputPinManager {
fn set_input_config(&mut self, input_pins: Vec<PinConfiguration>);
fn get_channel_recv(&self) -> &Arc<Mutex<mpsc::Receiver<PulseInfo>>>;
fn get_pins(&self) -> &Vec<PinConfiguration>;
fn close_inputs(&mut self);
}

View file

@ -2,6 +2,7 @@
extern crate clap;
mod cfg_reader;
mod pulse_counter;
const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION");
const DEFAULT_CONFIG_FILE_NAME: &str = "/etc/s0_logger.cfg";

113
src/pulse_counter.rs Normal file
View file

@ -0,0 +1,113 @@
// #![allow(dead_code)]
#[cfg(test)]
#[path = "./pulse_counter_test.rs"]
mod pulse_counter_test;
#[path = "./input_pin_manager.rs"]
mod input_pin_manager;
use input_pin_manager::{InputPinManager, PinConfiguration, PulseInfo};
use std::collections::{hash_map::HashMap, VecDeque};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
struct PulseCounter {
pin_mgr: Box<dyn InputPinManager>,
pulses: Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>>,
}
#[derive(Debug)]
struct PulseCounterError {
msg: String,
}
impl PulseCounter {
fn new(pin_mgr: Box<dyn InputPinManager>, input_pins: Vec<PinConfiguration>) -> Self {
let channel_lists_arc: Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>> =
Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len())));
let mut channel_lists = channel_lists_arc.lock().unwrap();
for pin in &input_pins {
channel_lists.insert(pin.id, VecDeque::<PulseInfo>::new());
}
let mut myself = Self {
pin_mgr: pin_mgr,
pulses: channel_lists_arc.clone(),
};
myself.pin_mgr.set_input_config(input_pins);
myself
}
fn start(&mut self) -> Result<(), PulseCounterError> {
let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.get_channel_recv());
let (err_tx, err_rx) = mpsc::channel::<Result<(), PulseCounterError>>();
let pulse_list = self.pulses.clone();
//TODO Give this thread a name
thread::spawn(move || {
let msg_channel_recv_lock = msg_channel_recv_arc.try_lock();
match msg_channel_recv_lock {
Ok(msg_channel_recv) => {
{
err_tx.send(Ok(())).unwrap();
}
// let pulses = pulse_list.deref();
loop {
match msg_channel_recv.recv() {
Ok(pulse_info) => {
Self::process_pulse_info(&pulse_list, pulse_info);
}
Err(_) => {
break;
}
}
}
}
Err(_) => {
let result: Result<(), PulseCounterError> = Err(PulseCounterError {
msg: String::from("Thread is already running"),
});
err_tx.send(result).unwrap();
}
}
});
err_rx.recv().unwrap()
}
fn stop(&mut self) {
self.pin_mgr.close_inputs();
}
fn process_pulse_info(
pulses: &Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>>,
pulse: PulseInfo,
) {
//TODO Do not exceed a maximum length of the list
match pulses.lock().unwrap().get_mut(&pulse.pin_id) {
Some(pulse_list) => {
// println!("Pushing pulse {:?} to list", pulse);
pulse_list.push_back(pulse);
}
None => println!("Could not push pulse {:?} to list", pulse),
};
}
fn get_pulses_count_by_channel(&self, channel: usize) -> usize {
match self.pulses.lock().unwrap().get(&channel) {
Some(channel_pulses) => channel_pulses.len(),
None => 0,
}
}
fn get_pulses_by_channel(&mut self, channel: usize) -> Vec<PulseInfo> {
let mut result: Vec<PulseInfo> = Vec::new();
match self.pulses.lock().unwrap().get_mut(&channel) {
Some(pulse_list) => {
while !pulse_list.is_empty() {
result.push(pulse_list.pop_front().unwrap());
}
}
None => println!("Cannot get pulses of channel {}", channel),
}
result
}
}

222
src/pulse_counter_test.rs Normal file
View file

@ -0,0 +1,222 @@
use super::*;
struct TestPinManager {
input_pins: Vec<PinConfiguration>,
pulse_recv: Arc<Mutex<mpsc::Receiver<PulseInfo>>>,
}
impl InputPinManager for TestPinManager {
fn set_input_config(&mut self, input_pins: Vec<PinConfiguration>) {
*&mut self.input_pins = input_pins;
}
fn get_channel_recv(&self) -> &Arc<Mutex<mpsc::Receiver<PulseInfo>>> {
&self.pulse_recv
}
fn get_pins(&self) -> &Vec<PinConfiguration> {
&self.input_pins
}
fn close_inputs(&mut self) {}
}
impl TestPinManager {
fn new(cmd_rx: mpsc::Receiver<PulseInfo>) -> TestPinManager {
let (pulse_tx, pulse_rx) = mpsc::channel::<PulseInfo>();
thread::spawn(move || loop {
match cmd_rx.recv() {
Ok(cmd) => {
if cmd.timestamp_ns == 0u64 {
break;
} else {
&pulse_tx.send(cmd);
}
}
Err(_) => break,
}
});
Self {
input_pins: vec![],
pulse_recv: Arc::new(Mutex::new(pulse_rx)),
}
}
}
#[test]
fn new_creates_instance() {
let (_, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let input_pins_copy = &input_pins.clone();
let testee = PulseCounter::new(test_pin_manager_box, input_pins);
let testee_pins = &testee.pin_mgr.get_pins();
assert_eq!(testee_pins.len(), input_pins_copy.len());
assert_eq!(testee_pins[0], input_pins_copy[0]);
assert_eq!(testee_pins[1], input_pins_copy[1]);
}
#[test]
fn start_and_stop_thread() {
let (_, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(10));
testee.stop();
}
#[test]
fn start_thread_twice() {
// ATTENTION: Do NOT use _ as name of the tx channel because the channel would be closed
// immediately and the thread would exit before it will be started the second time!
let (_usused_but_required_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box = Box::new(TestPinManager::new(cmd_rx));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(10));
let result = testee.start();
assert!(&result.is_err());
std::thread::sleep(std::time::Duration::from_millis(10));
}
#[test]
fn start_thread_and_send_pulses() {
let millis_to_wait = 1u64;
let (cmd_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 0);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
timestamp_ns: 1234u64,
pin_id: 1,
level: true,
});
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 1);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
timestamp_ns: 1235u64,
pin_id: 1,
level: false,
});
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
timestamp_ns: 1280u64,
pin_id: 3,
level: true,
});
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 1);
&cmd_tx.send(PulseInfo {
timestamp_ns: 1288u64,
pin_id: 3,
level: false,
});
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 2);
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
testee.stop();
}
#[test]
fn retrieve_pulses_by_channel() {
let millis_to_wait = 1u64;
let (cmd_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
&cmd_tx.send(PulseInfo {
timestamp_ns: 1234u64,
pin_id: 1,
level: true,
});
&cmd_tx.send(PulseInfo {
timestamp_ns: 1235u64,
pin_id: 1,
level: false,
});
&cmd_tx.send(PulseInfo {
timestamp_ns: 1280u64,
pin_id: 3,
level: true,
});
&cmd_tx.send(PulseInfo {
timestamp_ns: 1288u64,
pin_id: 3,
level: false,
});
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 2);
let pulses_ch1 = testee.get_pulses_by_channel(1);
assert_eq!(pulses_ch1.len(), 2);
assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64);
assert_eq!(pulses_ch1[0].pin_id, 1);
assert_eq!(pulses_ch1[0].level, true);
assert_eq!(pulses_ch1[1].timestamp_ns, 1235u64);
assert_eq!(pulses_ch1[1].pin_id, 1);
assert_eq!(pulses_ch1[1].level, false);
let pulses_ch1 = testee.get_pulses_by_channel(1);
assert_eq!(pulses_ch1.len(), 0);
let pulses_ch1 = testee.get_pulses_by_channel(3);
assert_eq!(pulses_ch1.len(), 2);
assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64);
assert_eq!(pulses_ch1[0].pin_id, 3);
assert_eq!(pulses_ch1[0].level, true);
assert_eq!(pulses_ch1[1].timestamp_ns, 1288u64);
assert_eq!(pulses_ch1[1].pin_id, 3);
assert_eq!(pulses_ch1[1].level, false);
let pulses_ch1 = testee.get_pulses_by_channel(3);
assert_eq!(pulses_ch1.len(), 0);
}