diff --git a/Cargo.toml b/Cargo.toml index 7d45637..dd675ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[target.armv7-unknown-linux-musleabihf.dependencies] +rppal = { version = "0.11.3", features = ["hal"] } + +[features] +rpi_gpio = ["rppal"] + [dependencies] async-std = "1.9" clap = "2.33" diff --git a/src/cfg_reader.rs b/src/cfg_reader.rs index 8ac4b12..c3118b1 100644 --- a/src/cfg_reader.rs +++ b/src/cfg_reader.rs @@ -1,3 +1,7 @@ +#[cfg(test)] +#[path = "./cfg_reader_test.rs"] +mod cfg_reader_test; + use serde_derive::Deserialize; use std::fs::File; use std::{error::Error, io::Read}; @@ -5,17 +9,20 @@ use std::{error::Error, io::Read}; extern crate toml; #[derive(Deserialize)] -pub struct Config { - channels: Vec, - // debounce_millis : Option, - // invert : Option, +pub struct S0Channel { + pub id: u8, + pub gpio: u8, } #[derive(Deserialize)] -pub struct S0Channel { - id: u8, - gpio: u8, - // debounce_millis: Option, +pub struct Config { + channel: Vec, +} + +impl Config { + pub fn channels(&self) -> &Vec { + &self.channel + } } // use std::any::type_name; @@ -46,7 +53,3 @@ fn parse_string(cfg_str: &String) -> Result> { Err(err) => Err(From::from(err)), } } - -#[cfg(test)] -#[path = "./cfg_reader_test.rs"] -mod cfg_reader_test; diff --git a/src/cfg_reader_test.rs b/src/cfg_reader_test.rs index 956a581..0851deb 100644 --- a/src/cfg_reader_test.rs +++ b/src/cfg_reader_test.rs @@ -31,16 +31,54 @@ fn parse_file_fails_if_file_is_greater_than_1mb() { #[test] #[named] -fn parse_file_succeeds() { +fn parse_file_succeeds_on_entry() { let cfg_file_name = create_cfg_file( function_name!(), r#" - [[channels]] + [[channel]] id = 1 gpio = 4 "#, ); - assert!(parse_file(&cfg_file_name).is_ok()); + let config = parse_file(&cfg_file_name); + assert!(config.is_ok()); + let config = config.unwrap(); + let channels = config.channels(); + assert_eq!(channels.len(), 1); + assert_eq!(channels[0].id, 1); + assert_eq!(channels[0].gpio, 4); + remove_cfg_file(&cfg_file_name); +} + +#[test] +#[named] +fn parse_file_succeeds_multiple_entries() { + let cfg_file_name = create_cfg_file( + function_name!(), + r#" + [[channel]] + id = 1 + gpio = 4 + [[channel]] + id = 3 + gpio = 1 + + [[channel]] + id = 2 + gpio = 9 + "#, + ); + let config = parse_file(&cfg_file_name); + assert!(config.is_ok()); + let config = config.unwrap(); + let channels = config.channels(); + assert_eq!(channels.len(), 3); + assert_eq!(channels[0].id, 1); + assert_eq!(channels[0].gpio, 4); + assert_eq!(channels[1].id, 3); + assert_eq!(channels[1].gpio, 1); + assert_eq!(channels[2].id, 2); + assert_eq!(channels[2].gpio, 9); remove_cfg_file(&cfg_file_name); } @@ -77,7 +115,7 @@ fn parse_string_fails_on_unknown_section() { fn parse_string_fails_on_section_format_error() { let cfg_str = String::from( r#" - [[channels] + [[channel] id = 1 gpio = 4 "#, @@ -86,7 +124,7 @@ fn parse_string_fails_on_section_format_error() { let cfg_str = String::from( r#" - [channels] + [channel] id = 1 gpio = 4 "#, @@ -95,7 +133,7 @@ fn parse_string_fails_on_section_format_error() { let cfg_str = String::from( r#" - [[channels]] + [[channel]] id = gpio = 4 "#, @@ -104,7 +142,7 @@ fn parse_string_fails_on_section_format_error() { let cfg_str = String::from( r#" - [[channels]] + [[channel]] id = 1 gpio = 4 2 @@ -117,7 +155,7 @@ fn parse_string_fails_on_section_format_error() { fn parse_string_fails_on_missing_field_id() { let cfg_str = String::from( r#" - [[channels]] + [[channel]] gpio = 4 "#, ); @@ -128,7 +166,7 @@ fn parse_string_fails_on_missing_field_id() { fn parse_string_fails_on_missing_field_gpio() { let cfg_str = String::from( r#" - [[channels]] + [[channel]] id = 1 "#, ); @@ -139,7 +177,7 @@ fn parse_string_fails_on_missing_field_gpio() { fn parse_string_ignores_unknown_attributes() { let cfg_str = String::from( r#" - [[channels]] + [[channel]] unknown = 1 id = 1 gpio = 4 @@ -152,11 +190,11 @@ fn parse_string_ignores_unknown_attributes() { fn parse_string_succeeds() { let cfg = String::from( r#" - [[channels]] + [[channel]] id = 1 gpio = 4 - [[channels]] + [[channel]] id = 2 gpio = 17 "#, @@ -164,9 +202,9 @@ fn parse_string_succeeds() { let config = parse_string(&cfg); assert!(&config.is_ok()); let config = config.unwrap(); - assert_eq!(config.channels.len(), 2); - assert_eq!(config.channels[0].id, 1); - assert_eq!(config.channels[0].gpio, 4); - assert_eq!(config.channels[1].id, 2); - assert_eq!(config.channels[1].gpio, 17); + assert_eq!(config.channel.len(), 2); + assert_eq!(config.channel[0].id, 1); + assert_eq!(config.channel[0].gpio, 4); + assert_eq!(config.channel[1].id, 2); + assert_eq!(config.channel[1].gpio, 17); } diff --git a/src/input_pin_manager.rs b/src/input_pin_manager.rs index 2bb2981..f8a15c8 100644 --- a/src/input_pin_manager.rs +++ b/src/input_pin_manager.rs @@ -2,8 +2,8 @@ use std::sync::{mpsc, Arc, Mutex}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct PinConfiguration { - pub id: usize, - pub gpio: usize, + pub channel_id: usize, + pub gpio_id: usize, } #[derive(Debug)] pub struct PulseInfo { diff --git a/src/main.rs b/src/main.rs index 26f01f5..e61d38e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,8 +5,18 @@ mod cfg_reader; mod pulse_counter; mod rest_api; +#[cfg(not(feature = "rppal"))] +mod input_pin_manager; +#[cfg(feature = "rppal")] +mod rpi_pin_manager; +// #[cfg(not(feature = "rppal"))] +use input_pin_manager::PinConfiguration; +#[cfg(feature = "rppal")] +use rpi_pin_manager::{input_pin_manager, RPiPinManager}; + use async_std; use async_std::task; +use std::sync::{Arc, Mutex}; const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION"); @@ -14,6 +24,36 @@ const DEFAULT_CONFIG_FILE_NAME: &str = "/etc/s0_logger.cfg"; const DEFAULT_IP_ADDRESS: &str = "127.0.0.1"; const DEFAULT_IP_PORT: &str = "6310"; +struct PulseDataProvider { + pulse_counter: pulse_counter::PulseCounter, +} + +impl rest_api::DataProvider for PulseDataProvider { + fn get_channels(&self) -> Vec { + self.pulse_counter.get_channels() + } + + fn get_pulses_by_channel( + &mut self, + channel_id: usize, + ) -> Result, std::io::Error> { + match self.pulse_counter.get_pulses_by_channel(channel_id) { + Ok(pulses) => { + let mut pulse_list: Vec = vec![]; + for p in pulses { + pulse_list.push(rest_api::PulseInfo { + timestamp_ns: p.timestamp_ns, + pin_id: p.pin_id, + level: p.level, + }) + } + Ok(pulse_list) + } + Err(e) => Err(e), + } + } +} + fn main() { let cli_args = clap_app!(s0_meter => (version: APP_VERSION) @@ -36,33 +76,67 @@ fn main() { .unwrap_or(DEFAULT_CONFIG_FILE_NAME); println!("Read the config from file '{}'", config_file_name); - if cfg!(feature = "rppal") { + let mut pin_config: Vec = vec![]; + match cfg_reader::parse_file(config_file_name) { + Ok(config) => { + println!("GPIO-Config:"); + for channel in config.channels() { + println!("Channel {} @ GPIO {}", &channel.id, &channel.gpio); + &mut pin_config.push(PinConfiguration { + channel_id: channel.id as usize, + gpio_id: channel.gpio as usize, + }); + } + println!("---"); + } + Err(err_msg) => { + // TODO Log + panic!( + "Error while parsing config file {}: {}", + config_file_name, err_msg + ); + } + }; + + #[allow(unused_assignments, unused_mut)] + let mut pin_manager: Option< + Arc>>, + > = None; + #[cfg(feature = "rppal")] + { println!("Will access GPIO pins"); - } else if cfg!(feature = "hal") { - println!("Will access HAL"); - } else { + let input_pin_manager_box: Box = + Box::new(RPiPinManager::new()); + pin_manager = Some(Arc::new(Mutex::new(input_pin_manager_box))); + } + #[cfg(not(feature = "rppal"))] + { println!("Will NOT access GPIO pins"); } let rest_ip_addr = DEFAULT_IP_ADDRESS; let rest_ip_port = DEFAULT_IP_PORT; + let mut p_counter = pulse_counter::PulseCounter::new(pin_manager.clone().unwrap(), pin_config); + match p_counter.start() { + Ok(()) => {} + Err(e) => { + // TODO Log + println!("Could not start pulse_counter thread: {:?}", e); + } + } + let dp: Arc>> = + Arc::new(Mutex::new(Box::new(PulseDataProvider { + pulse_counter: p_counter, + }))); let rest_api_config = rest_api::RestApiConfig { ip_and_port: format!("{}:{}", &rest_ip_addr, &rest_ip_port), - get_channels: fake_get_channels, - get_pulses_by_channel: fake_get_pulses_by_channel, + data_provider: dp.clone(), }; let _ = task::block_on(rest_api::start(&rest_api_config)); -} -fn fake_get_channels() -> Vec { - vec![] -} - -fn fake_get_pulses_by_channel(_: usize) -> Result, std::io::Error> { - Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "Not implemented", - )) + if pin_manager.is_some() { + &pin_manager.unwrap().lock().unwrap().close_inputs(); + } } diff --git a/src/pulse_counter.rs b/src/pulse_counter.rs index 960a9f8..08c274f 100644 --- a/src/pulse_counter.rs +++ b/src/pulse_counter.rs @@ -1,45 +1,57 @@ -// #![allow(dead_code)] - #[cfg(test)] #[path = "./pulse_counter_test.rs"] mod pulse_counter_test; #[path = "./input_pin_manager.rs"] mod input_pin_manager; -use input_pin_manager::{InputPinManager, PinConfiguration, PulseInfo}; use std::collections::{hash_map::HashMap, VecDeque}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; - -struct PulseCounter { - pin_mgr: Box, - pulses: Arc>>>, +pub struct PulseCounter { + pin_mgr: Arc>>, + pulses: Arc>>>, } #[derive(Debug)] -struct PulseCounterError { +pub struct PulseCounterError { msg: String, } impl PulseCounter { - fn new(pin_mgr: Box, input_pins: Vec) -> Self { - let channel_lists_arc: Arc>>> = - Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len()))); + pub fn new( + pin_mgr: Arc>>, + input_pins: Vec, + ) -> Self { + let channel_lists_arc: Arc< + Mutex>>, + > = Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len()))); let mut channel_lists = channel_lists_arc.lock().unwrap(); for pin in &input_pins { - channel_lists.insert(pin.id, VecDeque::::new()); + channel_lists.insert( + pin.channel_id, + VecDeque::::new(), + ); } - let mut myself = Self { + let myself = Self { pin_mgr: pin_mgr, pulses: channel_lists_arc.clone(), }; - myself.pin_mgr.set_input_config(input_pins); + myself.pin_mgr.lock().unwrap().set_input_config(input_pins); myself } - fn start(&mut self) -> Result<(), PulseCounterError> { - let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.get_channel_recv()); + pub fn get_channels(&self) -> Vec { + let mut channels = vec![]; + let pin_mgr = self.pin_mgr.lock().unwrap(); + for p in pin_mgr.get_pins() { + channels.push(p.channel_id); + } + channels + } + + pub fn start(&mut self) -> Result<(), PulseCounterError> { + let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.lock().unwrap().get_channel_recv()); let (err_tx, err_rx) = mpsc::channel::>(); let pulse_list = self.pulses.clone(); //TODO Give this thread a name @@ -50,13 +62,13 @@ impl PulseCounter { { err_tx.send(Ok(())).unwrap(); } - // let pulses = pulse_list.deref(); loop { match msg_channel_recv.recv() { Ok(pulse_info) => { Self::process_pulse_info(&pulse_list, pulse_info); } Err(_) => { + // TODO Log break; } } @@ -73,41 +85,52 @@ impl PulseCounter { err_rx.recv().unwrap() } - fn stop(&mut self) { - self.pin_mgr.close_inputs(); + #[allow(dead_code)] + pub fn stop(&mut self) { + self.pin_mgr.lock().unwrap().close_inputs(); } fn process_pulse_info( - pulses: &Arc>>>, - pulse: PulseInfo, + pulses: &Arc>>>, + pulse: crate::input_pin_manager::PulseInfo, ) { //TODO Do not exceed a maximum length of the list - match pulses.lock().unwrap().get_mut(&pulse.pin_id) { + let mut channel_lists = pulses.lock().unwrap(); + match channel_lists.get_mut(&pulse.pin_id) { Some(pulse_list) => { - // println!("Pushing pulse {:?} to list", pulse); pulse_list.push_back(pulse); } - None => println!("Could not push pulse {:?} to list", pulse), + None => { + // TODO Log + println!("Could not push pulse {:?} to list", pulse) + } }; } - fn get_pulses_count_by_channel(&self, channel: usize) -> usize { + #[allow(dead_code)] + pub fn get_pulses_count_by_channel(&self, channel: usize) -> usize { match self.pulses.lock().unwrap().get(&channel) { Some(channel_pulses) => channel_pulses.len(), None => 0, } } - fn get_pulses_by_channel(&mut self, channel: usize) -> Vec { - let mut result: Vec = Vec::new(); + pub fn get_pulses_by_channel( + &mut self, + channel: usize, + ) -> Result, std::io::Error> { match self.pulses.lock().unwrap().get_mut(&channel) { Some(pulse_list) => { + let mut result: Vec = Vec::new(); while !pulse_list.is_empty() { result.push(pulse_list.pop_front().unwrap()); } + Ok(result) } - None => println!("Cannot get pulses of channel {}", channel), + None => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Cannot get pulses of channel {}", channel), + )), } - result } } diff --git a/src/pulse_counter_test.rs b/src/pulse_counter_test.rs index 0beb296..ff3ef08 100644 --- a/src/pulse_counter_test.rs +++ b/src/pulse_counter_test.rs @@ -1,20 +1,20 @@ use super::*; struct TestPinManager { - input_pins: Vec, - pulse_recv: Arc>>, + input_pins: Vec, + pulse_recv: Arc>>, } -impl InputPinManager for TestPinManager { - fn set_input_config(&mut self, input_pins: Vec) { +impl crate::input_pin_manager::InputPinManager for TestPinManager { + fn set_input_config(&mut self, input_pins: Vec) { *&mut self.input_pins = input_pins; } - fn get_channel_recv(&self) -> &Arc>> { + fn get_channel_recv(&self) -> &Arc>> { &self.pulse_recv } - fn get_pins(&self) -> &Vec { + fn get_pins(&self) -> &Vec { &self.input_pins } @@ -22,8 +22,8 @@ impl InputPinManager for TestPinManager { } impl TestPinManager { - fn new(cmd_rx: mpsc::Receiver) -> TestPinManager { - let (pulse_tx, pulse_rx) = mpsc::channel::(); + fn new(cmd_rx: mpsc::Receiver) -> TestPinManager { + let (pulse_tx, pulse_rx) = mpsc::channel::(); thread::spawn(move || loop { match cmd_rx.recv() { Ok(cmd) => { @@ -46,17 +46,26 @@ impl TestPinManager { #[test] fn new_creates_instance() { let (_, cmd_rx) = mpsc::channel(); - let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); - let mut input_pins = Vec::new(); - input_pins.push(PinConfiguration { id: 1, gpio: 11 }); - input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); let input_pins_copy = &input_pins.clone(); - let testee = PulseCounter::new(test_pin_manager_box, input_pins); + let testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); - let testee_pins = &testee.pin_mgr.get_pins(); + let testee_pin_manager = testee.pin_mgr.lock().unwrap(); + let testee_pins = testee_pin_manager.get_pins(); assert_eq!(testee_pins.len(), input_pins_copy.len()); assert_eq!(testee_pins[0], input_pins_copy[0]); assert_eq!(testee_pins[1], input_pins_copy[1]); @@ -65,13 +74,21 @@ fn new_creates_instance() { #[test] fn start_and_stop_thread() { let (_, cmd_rx) = mpsc::channel(); - let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); - let mut input_pins = Vec::new(); - input_pins.push(PinConfiguration { id: 1, gpio: 11 }); - input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); - let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); assert!(testee.start().is_ok()); std::thread::sleep(std::time::Duration::from_millis(10)); testee.stop(); @@ -82,13 +99,21 @@ fn start_thread_twice() { // ATTENTION: Do NOT use _ as name of the tx channel because the channel would be closed // immediately and the thread would exit before it will be started the second time! let (_usused_but_required_tx, cmd_rx) = mpsc::channel(); - let test_pin_manager_box = Box::new(TestPinManager::new(cmd_rx)); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); - let mut input_pins = Vec::new(); - input_pins.push(PinConfiguration { id: 1, gpio: 11 }); - input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); - let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); assert!(testee.start().is_ok()); std::thread::sleep(std::time::Duration::from_millis(10)); let result = testee.start(); @@ -100,20 +125,28 @@ fn start_thread_twice() { fn start_thread_and_send_pulses() { let millis_to_wait = 1u64; let (cmd_tx, cmd_rx) = mpsc::channel(); - let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); - let mut input_pins = Vec::new(); - input_pins.push(PinConfiguration { id: 1, gpio: 11 }); - input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); - let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); assert!(testee.start().is_ok()); std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); assert_eq!(testee.get_pulses_count_by_channel(1), 0); assert_eq!(testee.get_pulses_count_by_channel(3), 0); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1234u64, pin_id: 1, level: true, @@ -123,7 +156,7 @@ fn start_thread_and_send_pulses() { assert_eq!(testee.get_pulses_count_by_channel(1), 1); assert_eq!(testee.get_pulses_count_by_channel(3), 0); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1235u64, pin_id: 1, level: false, @@ -133,7 +166,7 @@ fn start_thread_and_send_pulses() { assert_eq!(testee.get_pulses_count_by_channel(1), 2); assert_eq!(testee.get_pulses_count_by_channel(3), 0); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1280u64, pin_id: 3, level: true, @@ -143,7 +176,7 @@ fn start_thread_and_send_pulses() { assert_eq!(testee.get_pulses_count_by_channel(1), 2); assert_eq!(testee.get_pulses_count_by_channel(3), 1); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1288u64, pin_id: 3, level: false, @@ -161,32 +194,40 @@ fn start_thread_and_send_pulses() { fn retrieve_pulses_by_channel() { let millis_to_wait = 1u64; let (cmd_tx, cmd_rx) = mpsc::channel(); - let test_pin_manager_box: Box = Box::new(TestPinManager::new(cmd_rx)); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); - let mut input_pins = Vec::new(); - input_pins.push(PinConfiguration { id: 1, gpio: 11 }); - input_pins.push(PinConfiguration { id: 3, gpio: 33 }); + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); - let mut testee = PulseCounter::new(test_pin_manager_box, input_pins); + let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); assert!(testee.start().is_ok()); std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1234u64, pin_id: 1, level: true, }); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1235u64, pin_id: 1, level: false, }); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1280u64, pin_id: 3, level: true, }); - &cmd_tx.send(PulseInfo { + &cmd_tx.send(crate::input_pin_manager::PulseInfo { timestamp_ns: 1288u64, pin_id: 3, level: false, @@ -196,7 +237,7 @@ fn retrieve_pulses_by_channel() { assert_eq!(testee.get_pulses_count_by_channel(1), 2); assert_eq!(testee.get_pulses_count_by_channel(3), 2); - let pulses_ch1 = testee.get_pulses_by_channel(1); + let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); assert_eq!(pulses_ch1.len(), 2); assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64); assert_eq!(pulses_ch1[0].pin_id, 1); @@ -205,10 +246,10 @@ fn retrieve_pulses_by_channel() { assert_eq!(pulses_ch1[1].pin_id, 1); assert_eq!(pulses_ch1[1].level, false); - let pulses_ch1 = testee.get_pulses_by_channel(1); + let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); assert_eq!(pulses_ch1.len(), 0); - let pulses_ch1 = testee.get_pulses_by_channel(3); + let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); assert_eq!(pulses_ch1.len(), 2); assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64); assert_eq!(pulses_ch1[0].pin_id, 3); @@ -217,6 +258,6 @@ fn retrieve_pulses_by_channel() { assert_eq!(pulses_ch1[1].pin_id, 3); assert_eq!(pulses_ch1[1].level, false); - let pulses_ch1 = testee.get_pulses_by_channel(3); + let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); assert_eq!(pulses_ch1.len(), 0); } diff --git a/src/rest_api.rs b/src/rest_api.rs index ca0f23a..995a17a 100644 --- a/src/rest_api.rs +++ b/src/rest_api.rs @@ -1,29 +1,28 @@ -#[allow(dead_code)] #[cfg(test)] #[path = "./rest_api_test.rs"] mod rest_api_test; use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; use tide; use tide::prelude::json; -// struct PinConfiguration { -// channel: usize, -// gpio: usize, -// } +pub trait DataProvider { + fn get_channels(&self) -> Vec; + fn get_pulses_by_channel(&mut self, channel: usize) -> Result, std::io::Error>; +} #[derive(Serialize, Deserialize, Debug)] pub struct PulseInfo { - timestamp_ns: u64, - pin_id: usize, - level: bool, + pub timestamp_ns: u64, + pub pin_id: usize, + pub level: bool, } #[derive(Clone)] pub struct RestApiConfig { pub ip_and_port: String, - pub get_channels: fn() -> Vec, - pub get_pulses_by_channel: fn(channel: usize) -> Result, std::io::Error>, + pub data_provider: Arc>>, } pub async fn start<'a>(config: &'a RestApiConfig) -> std::io::Result<()> { @@ -57,7 +56,9 @@ async fn api_versions_get(_: tide::Request) -> tide::Result { } async fn v1_channels_get(req: tide::Request) -> tide::Result { - let channel_list = &(req.state().get_channels)(); + let data_provider = &(*req.state().data_provider).lock().unwrap(); + // let data_provider = data_provider_mut.lock().unwrap(); + let channel_list = data_provider.get_channels(); Ok(tide::Response::builder(200) .content_type(tide::http::mime::JSON) .body(json!({ "channels": channel_list })) @@ -68,16 +69,14 @@ async fn v1_channel_pulses_get(req: tide::Request) -> tide::Resul match req.param("channel_id") { Ok(channel_id) => match usize::from_str_radix(channel_id, 10) { Ok(channel_id) => { - let info = &req.state().ip_and_port; - let channel_pulses_res = &(req.state().get_pulses_by_channel)(channel_id); + let data_provider = &mut (*req.state().data_provider).lock().unwrap(); + let channel_pulses_res = data_provider.get_pulses_by_channel(channel_id); match channel_pulses_res { Ok(channel_pulses) => Ok(tide::Response::builder(200) .content_type(tide::http::mime::JSON) .body(json!({ "channel": channel_id, "pulses": channel_pulses, - "question": "channel", - "info": info, })) .build()), Err(_) => Ok(tide::Response::new(404)), diff --git a/src/rest_api_test.rs b/src/rest_api_test.rs index cb86f2e..914f8fd 100644 --- a/src/rest_api_test.rs +++ b/src/rest_api_test.rs @@ -14,12 +14,19 @@ use std::thread; static IP_AND_PORT: &str = "127.0.0.1:8123"; +struct TestDataProvider {} + lazy_static! { + // static ref TEST_DATA_PROVIDER: Arc>> = + // Arc::new(Mutex::new(Box::new(TestDataProvider{}))); static ref TESTEE_THREAD_HANDLE: Arc>>> = { let config = RestApiConfig { ip_and_port: String::from(IP_AND_PORT), - get_channels, - get_pulses_by_channel, + data_provider: Arc::new(Mutex::new(Box::new(TestDataProvider{}))), + // data_provider: TEST_DATA_PROVIDER, + // get_channels: Box::new(|| get_channels()) as Box Vec + Send>, + // get_pulses_by_channel, + // get_pulses_by_channel: Box::new(|channel| get_pulses_by_channel(channel)) as Box Result, std::io::Error> + Send>, }; let hdl = Arc::new(Mutex::new(Some(thread::spawn(move || { task::block_on(start(&config.clone())).unwrap(); @@ -32,18 +39,23 @@ lazy_static! { static ref CHANNEL_PULSES: Mutex>> = Mutex::new(HashMap::new()); } -fn get_channels() -> Vec { - CHANNEL_LIST.lock().unwrap().to_vec() -} +impl DataProvider for TestDataProvider { + fn get_channels(&self) -> Vec { + CHANNEL_LIST.lock().unwrap().to_vec() + } -fn get_pulses_by_channel(channel_id: usize) -> Result, std::io::Error> { - let mut pulse_channel_map = CHANNEL_PULSES.lock().unwrap(); - match pulse_channel_map.get_mut(&channel_id) { - Some(pulse_list) => Ok(pulse_list.drain(0..).collect()), - None => Err(Error::new( - ErrorKind::NotFound, - format!("Channel {} does not exist", &channel_id), - )), + fn get_pulses_by_channel( + &mut self, + channel_id: usize, + ) -> Result, std::io::Error> { + let mut pulse_channel_map = CHANNEL_PULSES.lock().unwrap(); + match pulse_channel_map.get_mut(&channel_id) { + Some(pulse_list) => Ok(pulse_list.drain(0..).collect()), + None => Err(Error::new( + ErrorKind::NotFound, + format!("Channel {} does not exist", &channel_id), + )), + } } } diff --git a/src/rpi_pin_manager.rs b/src/rpi_pin_manager.rs new file mode 100644 index 0000000..94c5200 --- /dev/null +++ b/src/rpi_pin_manager.rs @@ -0,0 +1,161 @@ +#[path = "./input_pin_manager.rs"] +pub mod input_pin_manager; + +pub use self::input_pin_manager::InputPinManager; +pub use input_pin_manager::PinConfiguration; +pub use input_pin_manager::PulseInfo; + +use rppal::gpio; +use std::sync::{mpsc, Arc, Mutex}; +use std::{thread, time}; + +struct IsrInfo { + timestamp_ns: u64, + channel: usize, + level: gpio::Level, +} + +pub struct RPiPinManager { + gpio: gpio::Gpio, + pin_configuration: Vec, + gpio_handles: Vec, + pulse_recv: Arc>>, + ch_isr_send: Arc>>, + start_time: time::Instant, +} + +impl InputPinManager for RPiPinManager { + fn set_input_config(&mut self, input_pins: Vec) { + if self.pin_configuration.len() > 0 { + // TODO Log + println!("RPiPinManager::set_input_config() already set"); + } else { + println!("set_input_config:"); + let isr_start = self.start_time; + + for p in input_pins { + println!(" channel: {} > gpio: {}", &p.channel_id, &p.gpio_id); + let mut pin = self.gpio.get(p.gpio_id as u8).unwrap().into_input_pullup(); + + let ch_isr_send = self.ch_isr_send.clone(); + let pin_id = p.channel_id.clone(); + + let interrupt_service_routine = move |level: gpio::Level| { + let isr_info = IsrInfo { + timestamp_ns: isr_start.elapsed().as_nanos() as u64, + channel: pin_id, + level: level, + }; + match ch_isr_send.lock().unwrap().send(isr_info) { + Ok(_) => {} + Err(e) => { + // TODO Log + println!("Error while sending: {}", e) + } + }; + }; + pin.set_async_interrupt(gpio::Trigger::Both, interrupt_service_routine.clone()) + .unwrap(); + self.pin_configuration.push(p); + self.gpio_handles.push(pin); + } + } + } + + fn get_channel_recv(&self) -> &Arc>> { + &self.pulse_recv + } + + fn get_pins(&self) -> &Vec { + &self.pin_configuration + } + + fn close_inputs(&mut self) {} +} + +impl RPiPinManager { + pub fn new() -> Self { + println!("RPiPinManager::new()"); + let gpio = match gpio::Gpio::new() { + Ok(gpio) => gpio, + Err(e) => { + // TODO Log + println!("ERROR: Cannot access GPIO: {}", e); + panic!("Error"); + } + }; + let (pulse_tx, pulse_rx) = mpsc::channel::(); + + let (hdl_isr_send, hdl_isr_recv) = mpsc::channel::(); + + thread::spawn(move || { + let mut last_state = gpio::Level::High; + let mut timeout = time::Duration::from_secs(u64::MAX); + loop { + if Self::wait_for_isr_event_and_process_it( + &hdl_isr_recv, + &pulse_tx, + &mut last_state, + &mut timeout, + ) { + break; + } + } + }); + + let start_time = time::Instant::now(); + Self { + gpio, + pin_configuration: vec![], + gpio_handles: vec![], + pulse_recv: Arc::new(Mutex::new(pulse_rx)), + ch_isr_send: Arc::new(Mutex::new(hdl_isr_send)), + start_time, + } + } + + fn wait_for_isr_event_and_process_it( + receiver: &mpsc::Receiver, + client_channel: &mpsc::Sender, + last_state: &mut gpio::Level, + timeout: &mut time::Duration, + ) -> bool { + let mut shall_stop = false; + match receiver.recv_timeout(*timeout) { + Ok(isr_info) => { + let level = match isr_info.level { + gpio::Level::High => true, + _ => false, + }; + let pulse_info = PulseInfo { + timestamp_ns: isr_info.timestamp_ns, + pin_id: isr_info.channel, + level, + }; + match client_channel.send(pulse_info) { + Ok(_) => { + // Sending succeeded + } + Err(e) => { + // Silently ignore the sending error because + // there may be no event receive at the other end of this channel + println!("ISR: Send error {}", e); + } + }; + } + Err(e) => { + match e { + mpsc::RecvTimeoutError::Timeout => { + *timeout = time::Duration::from_secs(u64::MAX); + println!("------> {}", last_state); + } + _ => { + println!("Last sender stopped: {}", e); + shall_stop = true; + } + }; + } + }; + shall_stop + } +}