Add rpi_pin_manager and make the app working on RPi.

This commit is contained in:
Harald Kube 2021-02-28 22:22:15 +01:00
parent 38024dfc5d
commit ddd231c0eb
10 changed files with 507 additions and 150 deletions

View file

@ -6,6 +6,12 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[target.armv7-unknown-linux-musleabihf.dependencies]
rppal = { version = "0.11.3", features = ["hal"] }
[features]
rpi_gpio = ["rppal"]
[dependencies]
async-std = "1.9"
clap = "2.33"

View file

@ -1,3 +1,7 @@
#[cfg(test)]
#[path = "./cfg_reader_test.rs"]
mod cfg_reader_test;
use serde_derive::Deserialize;
use std::fs::File;
use std::{error::Error, io::Read};
@ -5,17 +9,20 @@ use std::{error::Error, io::Read};
extern crate toml;
#[derive(Deserialize)]
pub struct Config {
channels: Vec<S0Channel>,
// debounce_millis : Option<u8>,
// invert : Option<bool>,
pub struct S0Channel {
pub id: u8,
pub gpio: u8,
}
#[derive(Deserialize)]
pub struct S0Channel {
id: u8,
gpio: u8,
// debounce_millis: Option<u8>,
pub struct Config {
channel: Vec<S0Channel>,
}
impl Config {
pub fn channels(&self) -> &Vec<S0Channel> {
&self.channel
}
}
// use std::any::type_name;
@ -46,7 +53,3 @@ fn parse_string(cfg_str: &String) -> Result<Config, Box<dyn Error>> {
Err(err) => Err(From::from(err)),
}
}
#[cfg(test)]
#[path = "./cfg_reader_test.rs"]
mod cfg_reader_test;

View file

@ -31,16 +31,54 @@ fn parse_file_fails_if_file_is_greater_than_1mb() {
#[test]
#[named]
fn parse_file_succeeds() {
fn parse_file_succeeds_on_entry() {
let cfg_file_name = create_cfg_file(
function_name!(),
r#"
[[channels]]
[[channel]]
id = 1
gpio = 4
"#,
);
assert!(parse_file(&cfg_file_name).is_ok());
let config = parse_file(&cfg_file_name);
assert!(config.is_ok());
let config = config.unwrap();
let channels = config.channels();
assert_eq!(channels.len(), 1);
assert_eq!(channels[0].id, 1);
assert_eq!(channels[0].gpio, 4);
remove_cfg_file(&cfg_file_name);
}
#[test]
#[named]
fn parse_file_succeeds_multiple_entries() {
let cfg_file_name = create_cfg_file(
function_name!(),
r#"
[[channel]]
id = 1
gpio = 4
[[channel]]
id = 3
gpio = 1
[[channel]]
id = 2
gpio = 9
"#,
);
let config = parse_file(&cfg_file_name);
assert!(config.is_ok());
let config = config.unwrap();
let channels = config.channels();
assert_eq!(channels.len(), 3);
assert_eq!(channels[0].id, 1);
assert_eq!(channels[0].gpio, 4);
assert_eq!(channels[1].id, 3);
assert_eq!(channels[1].gpio, 1);
assert_eq!(channels[2].id, 2);
assert_eq!(channels[2].gpio, 9);
remove_cfg_file(&cfg_file_name);
}
@ -77,7 +115,7 @@ fn parse_string_fails_on_unknown_section() {
fn parse_string_fails_on_section_format_error() {
let cfg_str = String::from(
r#"
[[channels]
[[channel]
id = 1
gpio = 4
"#,
@ -86,7 +124,7 @@ fn parse_string_fails_on_section_format_error() {
let cfg_str = String::from(
r#"
[channels]
[channel]
id = 1
gpio = 4
"#,
@ -95,7 +133,7 @@ fn parse_string_fails_on_section_format_error() {
let cfg_str = String::from(
r#"
[[channels]]
[[channel]]
id =
gpio = 4
"#,
@ -104,7 +142,7 @@ fn parse_string_fails_on_section_format_error() {
let cfg_str = String::from(
r#"
[[channels]]
[[channel]]
id = 1
gpio = 4
2
@ -117,7 +155,7 @@ fn parse_string_fails_on_section_format_error() {
fn parse_string_fails_on_missing_field_id() {
let cfg_str = String::from(
r#"
[[channels]]
[[channel]]
gpio = 4
"#,
);
@ -128,7 +166,7 @@ fn parse_string_fails_on_missing_field_id() {
fn parse_string_fails_on_missing_field_gpio() {
let cfg_str = String::from(
r#"
[[channels]]
[[channel]]
id = 1
"#,
);
@ -139,7 +177,7 @@ fn parse_string_fails_on_missing_field_gpio() {
fn parse_string_ignores_unknown_attributes() {
let cfg_str = String::from(
r#"
[[channels]]
[[channel]]
unknown = 1
id = 1
gpio = 4
@ -152,11 +190,11 @@ fn parse_string_ignores_unknown_attributes() {
fn parse_string_succeeds() {
let cfg = String::from(
r#"
[[channels]]
[[channel]]
id = 1
gpio = 4
[[channels]]
[[channel]]
id = 2
gpio = 17
"#,
@ -164,9 +202,9 @@ fn parse_string_succeeds() {
let config = parse_string(&cfg);
assert!(&config.is_ok());
let config = config.unwrap();
assert_eq!(config.channels.len(), 2);
assert_eq!(config.channels[0].id, 1);
assert_eq!(config.channels[0].gpio, 4);
assert_eq!(config.channels[1].id, 2);
assert_eq!(config.channels[1].gpio, 17);
assert_eq!(config.channel.len(), 2);
assert_eq!(config.channel[0].id, 1);
assert_eq!(config.channel[0].gpio, 4);
assert_eq!(config.channel[1].id, 2);
assert_eq!(config.channel[1].gpio, 17);
}

View file

@ -2,8 +2,8 @@ use std::sync::{mpsc, Arc, Mutex};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PinConfiguration {
pub id: usize,
pub gpio: usize,
pub channel_id: usize,
pub gpio_id: usize,
}
#[derive(Debug)]
pub struct PulseInfo {

View file

@ -5,8 +5,18 @@ mod cfg_reader;
mod pulse_counter;
mod rest_api;
#[cfg(not(feature = "rppal"))]
mod input_pin_manager;
#[cfg(feature = "rppal")]
mod rpi_pin_manager;
// #[cfg(not(feature = "rppal"))]
use input_pin_manager::PinConfiguration;
#[cfg(feature = "rppal")]
use rpi_pin_manager::{input_pin_manager, RPiPinManager};
use async_std;
use async_std::task;
use std::sync::{Arc, Mutex};
const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION");
@ -14,6 +24,36 @@ const DEFAULT_CONFIG_FILE_NAME: &str = "/etc/s0_logger.cfg";
const DEFAULT_IP_ADDRESS: &str = "127.0.0.1";
const DEFAULT_IP_PORT: &str = "6310";
struct PulseDataProvider {
pulse_counter: pulse_counter::PulseCounter,
}
impl rest_api::DataProvider for PulseDataProvider {
fn get_channels(&self) -> Vec<usize> {
self.pulse_counter.get_channels()
}
fn get_pulses_by_channel(
&mut self,
channel_id: usize,
) -> Result<Vec<rest_api::PulseInfo>, std::io::Error> {
match self.pulse_counter.get_pulses_by_channel(channel_id) {
Ok(pulses) => {
let mut pulse_list: Vec<rest_api::PulseInfo> = vec![];
for p in pulses {
pulse_list.push(rest_api::PulseInfo {
timestamp_ns: p.timestamp_ns,
pin_id: p.pin_id,
level: p.level,
})
}
Ok(pulse_list)
}
Err(e) => Err(e),
}
}
}
fn main() {
let cli_args = clap_app!(s0_meter =>
(version: APP_VERSION)
@ -36,33 +76,67 @@ fn main() {
.unwrap_or(DEFAULT_CONFIG_FILE_NAME);
println!("Read the config from file '{}'", config_file_name);
if cfg!(feature = "rppal") {
let mut pin_config: Vec<PinConfiguration> = vec![];
match cfg_reader::parse_file(config_file_name) {
Ok(config) => {
println!("GPIO-Config:");
for channel in config.channels() {
println!("Channel {} @ GPIO {}", &channel.id, &channel.gpio);
&mut pin_config.push(PinConfiguration {
channel_id: channel.id as usize,
gpio_id: channel.gpio as usize,
});
}
println!("---");
}
Err(err_msg) => {
// TODO Log
panic!(
"Error while parsing config file {}: {}",
config_file_name, err_msg
);
}
};
#[allow(unused_assignments, unused_mut)]
let mut pin_manager: Option<
Arc<Mutex<Box<dyn input_pin_manager::InputPinManager + Send>>>,
> = None;
#[cfg(feature = "rppal")]
{
println!("Will access GPIO pins");
} else if cfg!(feature = "hal") {
println!("Will access HAL");
} else {
let input_pin_manager_box: Box<dyn input_pin_manager::InputPinManager + Send> =
Box::new(RPiPinManager::new());
pin_manager = Some(Arc::new(Mutex::new(input_pin_manager_box)));
}
#[cfg(not(feature = "rppal"))]
{
println!("Will NOT access GPIO pins");
}
let rest_ip_addr = DEFAULT_IP_ADDRESS;
let rest_ip_port = DEFAULT_IP_PORT;
let mut p_counter = pulse_counter::PulseCounter::new(pin_manager.clone().unwrap(), pin_config);
match p_counter.start() {
Ok(()) => {}
Err(e) => {
// TODO Log
println!("Could not start pulse_counter thread: {:?}", e);
}
}
let dp: Arc<Mutex<Box<dyn crate::rest_api::DataProvider + Send>>> =
Arc::new(Mutex::new(Box::new(PulseDataProvider {
pulse_counter: p_counter,
})));
let rest_api_config = rest_api::RestApiConfig {
ip_and_port: format!("{}:{}", &rest_ip_addr, &rest_ip_port),
get_channels: fake_get_channels,
get_pulses_by_channel: fake_get_pulses_by_channel,
data_provider: dp.clone(),
};
let _ = task::block_on(rest_api::start(&rest_api_config));
}
fn fake_get_channels() -> Vec<usize> {
vec![]
}
fn fake_get_pulses_by_channel(_: usize) -> Result<Vec<rest_api::PulseInfo>, std::io::Error> {
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Not implemented",
))
if pin_manager.is_some() {
&pin_manager.unwrap().lock().unwrap().close_inputs();
}
}

View file

@ -1,45 +1,57 @@
// #![allow(dead_code)]
#[cfg(test)]
#[path = "./pulse_counter_test.rs"]
mod pulse_counter_test;
#[path = "./input_pin_manager.rs"]
mod input_pin_manager;
use input_pin_manager::{InputPinManager, PinConfiguration, PulseInfo};
use std::collections::{hash_map::HashMap, VecDeque};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
struct PulseCounter {
pin_mgr: Box<dyn InputPinManager>,
pulses: Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>>,
pub struct PulseCounter {
pin_mgr: Arc<Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>>,
pulses: Arc<Mutex<HashMap<usize, VecDeque<crate::input_pin_manager::PulseInfo>>>>,
}
#[derive(Debug)]
struct PulseCounterError {
pub struct PulseCounterError {
msg: String,
}
impl PulseCounter {
fn new(pin_mgr: Box<dyn InputPinManager>, input_pins: Vec<PinConfiguration>) -> Self {
let channel_lists_arc: Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>> =
Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len())));
pub fn new(
pin_mgr: Arc<Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>>,
input_pins: Vec<crate::input_pin_manager::PinConfiguration>,
) -> Self {
let channel_lists_arc: Arc<
Mutex<HashMap<usize, VecDeque<crate::input_pin_manager::PulseInfo>>>,
> = Arc::new(Mutex::new(HashMap::with_capacity(input_pins.len())));
let mut channel_lists = channel_lists_arc.lock().unwrap();
for pin in &input_pins {
channel_lists.insert(pin.id, VecDeque::<PulseInfo>::new());
channel_lists.insert(
pin.channel_id,
VecDeque::<crate::input_pin_manager::PulseInfo>::new(),
);
}
let mut myself = Self {
let myself = Self {
pin_mgr: pin_mgr,
pulses: channel_lists_arc.clone(),
};
myself.pin_mgr.set_input_config(input_pins);
myself.pin_mgr.lock().unwrap().set_input_config(input_pins);
myself
}
fn start(&mut self) -> Result<(), PulseCounterError> {
let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.get_channel_recv());
pub fn get_channels(&self) -> Vec<usize> {
let mut channels = vec![];
let pin_mgr = self.pin_mgr.lock().unwrap();
for p in pin_mgr.get_pins() {
channels.push(p.channel_id);
}
channels
}
pub fn start(&mut self) -> Result<(), PulseCounterError> {
let msg_channel_recv_arc = Arc::clone(&self.pin_mgr.lock().unwrap().get_channel_recv());
let (err_tx, err_rx) = mpsc::channel::<Result<(), PulseCounterError>>();
let pulse_list = self.pulses.clone();
//TODO Give this thread a name
@ -50,13 +62,13 @@ impl PulseCounter {
{
err_tx.send(Ok(())).unwrap();
}
// let pulses = pulse_list.deref();
loop {
match msg_channel_recv.recv() {
Ok(pulse_info) => {
Self::process_pulse_info(&pulse_list, pulse_info);
}
Err(_) => {
// TODO Log
break;
}
}
@ -73,41 +85,52 @@ impl PulseCounter {
err_rx.recv().unwrap()
}
fn stop(&mut self) {
self.pin_mgr.close_inputs();
#[allow(dead_code)]
pub fn stop(&mut self) {
self.pin_mgr.lock().unwrap().close_inputs();
}
fn process_pulse_info(
pulses: &Arc<Mutex<HashMap<usize, VecDeque<PulseInfo>>>>,
pulse: PulseInfo,
pulses: &Arc<Mutex<HashMap<usize, VecDeque<crate::input_pin_manager::PulseInfo>>>>,
pulse: crate::input_pin_manager::PulseInfo,
) {
//TODO Do not exceed a maximum length of the list
match pulses.lock().unwrap().get_mut(&pulse.pin_id) {
let mut channel_lists = pulses.lock().unwrap();
match channel_lists.get_mut(&pulse.pin_id) {
Some(pulse_list) => {
// println!("Pushing pulse {:?} to list", pulse);
pulse_list.push_back(pulse);
}
None => println!("Could not push pulse {:?} to list", pulse),
None => {
// TODO Log
println!("Could not push pulse {:?} to list", pulse)
}
};
}
fn get_pulses_count_by_channel(&self, channel: usize) -> usize {
#[allow(dead_code)]
pub fn get_pulses_count_by_channel(&self, channel: usize) -> usize {
match self.pulses.lock().unwrap().get(&channel) {
Some(channel_pulses) => channel_pulses.len(),
None => 0,
}
}
fn get_pulses_by_channel(&mut self, channel: usize) -> Vec<PulseInfo> {
let mut result: Vec<PulseInfo> = Vec::new();
pub fn get_pulses_by_channel(
&mut self,
channel: usize,
) -> Result<Vec<crate::input_pin_manager::PulseInfo>, std::io::Error> {
match self.pulses.lock().unwrap().get_mut(&channel) {
Some(pulse_list) => {
let mut result: Vec<crate::input_pin_manager::PulseInfo> = Vec::new();
while !pulse_list.is_empty() {
result.push(pulse_list.pop_front().unwrap());
}
Ok(result)
}
None => println!("Cannot get pulses of channel {}", channel),
None => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Cannot get pulses of channel {}", channel),
)),
}
result
}
}

View file

@ -1,20 +1,20 @@
use super::*;
struct TestPinManager {
input_pins: Vec<PinConfiguration>,
pulse_recv: Arc<Mutex<mpsc::Receiver<PulseInfo>>>,
input_pins: Vec<crate::input_pin_manager::PinConfiguration>,
pulse_recv: Arc<Mutex<mpsc::Receiver<crate::input_pin_manager::PulseInfo>>>,
}
impl InputPinManager for TestPinManager {
fn set_input_config(&mut self, input_pins: Vec<PinConfiguration>) {
impl crate::input_pin_manager::InputPinManager for TestPinManager {
fn set_input_config(&mut self, input_pins: Vec<crate::input_pin_manager::PinConfiguration>) {
*&mut self.input_pins = input_pins;
}
fn get_channel_recv(&self) -> &Arc<Mutex<mpsc::Receiver<PulseInfo>>> {
fn get_channel_recv(&self) -> &Arc<Mutex<mpsc::Receiver<crate::input_pin_manager::PulseInfo>>> {
&self.pulse_recv
}
fn get_pins(&self) -> &Vec<PinConfiguration> {
fn get_pins(&self) -> &Vec<crate::input_pin_manager::PinConfiguration> {
&self.input_pins
}
@ -22,8 +22,8 @@ impl InputPinManager for TestPinManager {
}
impl TestPinManager {
fn new(cmd_rx: mpsc::Receiver<PulseInfo>) -> TestPinManager {
let (pulse_tx, pulse_rx) = mpsc::channel::<PulseInfo>();
fn new(cmd_rx: mpsc::Receiver<crate::input_pin_manager::PulseInfo>) -> TestPinManager {
let (pulse_tx, pulse_rx) = mpsc::channel::<crate::input_pin_manager::PulseInfo>();
thread::spawn(move || loop {
match cmd_rx.recv() {
Ok(cmd) => {
@ -46,17 +46,26 @@ impl TestPinManager {
#[test]
fn new_creates_instance() {
let (_, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let test_pin_manager_box: Arc<
Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>,
> = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx))));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut input_pins: Vec<crate::input_pin_manager::PinConfiguration> = Vec::new();
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 1,
gpio_id: 11,
});
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 3,
gpio_id: 33,
});
let input_pins_copy = &input_pins.clone();
let testee = PulseCounter::new(test_pin_manager_box, input_pins);
let testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins);
let testee_pins = &testee.pin_mgr.get_pins();
let testee_pin_manager = testee.pin_mgr.lock().unwrap();
let testee_pins = testee_pin_manager.get_pins();
assert_eq!(testee_pins.len(), input_pins_copy.len());
assert_eq!(testee_pins[0], input_pins_copy[0]);
assert_eq!(testee_pins[1], input_pins_copy[1]);
@ -65,13 +74,21 @@ fn new_creates_instance() {
#[test]
fn start_and_stop_thread() {
let (_, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let test_pin_manager_box: Arc<
Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>,
> = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx))));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut input_pins: Vec<crate::input_pin_manager::PinConfiguration> = Vec::new();
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 1,
gpio_id: 11,
});
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 3,
gpio_id: 33,
});
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(10));
testee.stop();
@ -82,13 +99,21 @@ fn start_thread_twice() {
// ATTENTION: Do NOT use _ as name of the tx channel because the channel would be closed
// immediately and the thread would exit before it will be started the second time!
let (_usused_but_required_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box = Box::new(TestPinManager::new(cmd_rx));
let test_pin_manager_box: Arc<
Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>,
> = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx))));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut input_pins: Vec<crate::input_pin_manager::PinConfiguration> = Vec::new();
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 1,
gpio_id: 11,
});
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 3,
gpio_id: 33,
});
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(10));
let result = testee.start();
@ -100,20 +125,28 @@ fn start_thread_twice() {
fn start_thread_and_send_pulses() {
let millis_to_wait = 1u64;
let (cmd_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let test_pin_manager_box: Arc<
Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>,
> = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx))));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut input_pins: Vec<crate::input_pin_manager::PinConfiguration> = Vec::new();
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 1,
gpio_id: 11,
});
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 3,
gpio_id: 33,
});
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
assert_eq!(testee.get_pulses_count_by_channel(1), 0);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1234u64,
pin_id: 1,
level: true,
@ -123,7 +156,7 @@ fn start_thread_and_send_pulses() {
assert_eq!(testee.get_pulses_count_by_channel(1), 1);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1235u64,
pin_id: 1,
level: false,
@ -133,7 +166,7 @@ fn start_thread_and_send_pulses() {
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 0);
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1280u64,
pin_id: 3,
level: true,
@ -143,7 +176,7 @@ fn start_thread_and_send_pulses() {
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 1);
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1288u64,
pin_id: 3,
level: false,
@ -161,32 +194,40 @@ fn start_thread_and_send_pulses() {
fn retrieve_pulses_by_channel() {
let millis_to_wait = 1u64;
let (cmd_tx, cmd_rx) = mpsc::channel();
let test_pin_manager_box: Box<TestPinManager> = Box::new(TestPinManager::new(cmd_rx));
let test_pin_manager_box: Arc<
Mutex<Box<dyn crate::input_pin_manager::InputPinManager + Send>>,
> = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx))));
let mut input_pins = Vec::new();
input_pins.push(PinConfiguration { id: 1, gpio: 11 });
input_pins.push(PinConfiguration { id: 3, gpio: 33 });
let mut input_pins: Vec<crate::input_pin_manager::PinConfiguration> = Vec::new();
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 1,
gpio_id: 11,
});
input_pins.push(crate::input_pin_manager::PinConfiguration {
channel_id: 3,
gpio_id: 33,
});
let mut testee = PulseCounter::new(test_pin_manager_box, input_pins);
let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins);
assert!(testee.start().is_ok());
std::thread::sleep(std::time::Duration::from_millis(millis_to_wait));
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1234u64,
pin_id: 1,
level: true,
});
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1235u64,
pin_id: 1,
level: false,
});
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1280u64,
pin_id: 3,
level: true,
});
&cmd_tx.send(PulseInfo {
&cmd_tx.send(crate::input_pin_manager::PulseInfo {
timestamp_ns: 1288u64,
pin_id: 3,
level: false,
@ -196,7 +237,7 @@ fn retrieve_pulses_by_channel() {
assert_eq!(testee.get_pulses_count_by_channel(1), 2);
assert_eq!(testee.get_pulses_count_by_channel(3), 2);
let pulses_ch1 = testee.get_pulses_by_channel(1);
let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap();
assert_eq!(pulses_ch1.len(), 2);
assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64);
assert_eq!(pulses_ch1[0].pin_id, 1);
@ -205,10 +246,10 @@ fn retrieve_pulses_by_channel() {
assert_eq!(pulses_ch1[1].pin_id, 1);
assert_eq!(pulses_ch1[1].level, false);
let pulses_ch1 = testee.get_pulses_by_channel(1);
let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap();
assert_eq!(pulses_ch1.len(), 0);
let pulses_ch1 = testee.get_pulses_by_channel(3);
let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap();
assert_eq!(pulses_ch1.len(), 2);
assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64);
assert_eq!(pulses_ch1[0].pin_id, 3);
@ -217,6 +258,6 @@ fn retrieve_pulses_by_channel() {
assert_eq!(pulses_ch1[1].pin_id, 3);
assert_eq!(pulses_ch1[1].level, false);
let pulses_ch1 = testee.get_pulses_by_channel(3);
let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap();
assert_eq!(pulses_ch1.len(), 0);
}

View file

@ -1,29 +1,28 @@
#[allow(dead_code)]
#[cfg(test)]
#[path = "./rest_api_test.rs"]
mod rest_api_test;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tide;
use tide::prelude::json;
// struct PinConfiguration {
// channel: usize,
// gpio: usize,
// }
pub trait DataProvider {
fn get_channels(&self) -> Vec<usize>;
fn get_pulses_by_channel(&mut self, channel: usize) -> Result<Vec<PulseInfo>, std::io::Error>;
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PulseInfo {
timestamp_ns: u64,
pin_id: usize,
level: bool,
pub timestamp_ns: u64,
pub pin_id: usize,
pub level: bool,
}
#[derive(Clone)]
pub struct RestApiConfig {
pub ip_and_port: String,
pub get_channels: fn() -> Vec<usize>,
pub get_pulses_by_channel: fn(channel: usize) -> Result<Vec<PulseInfo>, std::io::Error>,
pub data_provider: Arc<Mutex<Box<dyn DataProvider + Send>>>,
}
pub async fn start<'a>(config: &'a RestApiConfig) -> std::io::Result<()> {
@ -57,7 +56,9 @@ async fn api_versions_get(_: tide::Request<RestApiConfig>) -> tide::Result {
}
async fn v1_channels_get(req: tide::Request<RestApiConfig>) -> tide::Result {
let channel_list = &(req.state().get_channels)();
let data_provider = &(*req.state().data_provider).lock().unwrap();
// let data_provider = data_provider_mut.lock().unwrap();
let channel_list = data_provider.get_channels();
Ok(tide::Response::builder(200)
.content_type(tide::http::mime::JSON)
.body(json!({ "channels": channel_list }))
@ -68,16 +69,14 @@ async fn v1_channel_pulses_get(req: tide::Request<RestApiConfig>) -> tide::Resul
match req.param("channel_id") {
Ok(channel_id) => match usize::from_str_radix(channel_id, 10) {
Ok(channel_id) => {
let info = &req.state().ip_and_port;
let channel_pulses_res = &(req.state().get_pulses_by_channel)(channel_id);
let data_provider = &mut (*req.state().data_provider).lock().unwrap();
let channel_pulses_res = data_provider.get_pulses_by_channel(channel_id);
match channel_pulses_res {
Ok(channel_pulses) => Ok(tide::Response::builder(200)
.content_type(tide::http::mime::JSON)
.body(json!({
"channel": channel_id,
"pulses": channel_pulses,
"question": "channel",
"info": info,
}))
.build()),
Err(_) => Ok(tide::Response::new(404)),

View file

@ -14,12 +14,19 @@ use std::thread;
static IP_AND_PORT: &str = "127.0.0.1:8123";
struct TestDataProvider {}
lazy_static! {
// static ref TEST_DATA_PROVIDER: Arc<Mutex<Box<dyn super::DataProvider>>> =
// Arc::new(Mutex::new(Box::new(TestDataProvider{})));
static ref TESTEE_THREAD_HANDLE: Arc<Mutex<Option<thread::JoinHandle<()>>>> = {
let config = RestApiConfig {
ip_and_port: String::from(IP_AND_PORT),
get_channels,
get_pulses_by_channel,
data_provider: Arc::new(Mutex::new(Box::new(TestDataProvider{}))),
// data_provider: TEST_DATA_PROVIDER,
// get_channels: Box::new(|| get_channels()) as Box<dyn Fn() -> Vec<usize> + Send>,
// get_pulses_by_channel,
// get_pulses_by_channel: Box::new(|channel| get_pulses_by_channel(channel)) as Box<dyn Fn(usize) -> Result<Vec<PulseInfo>, std::io::Error> + Send>,
};
let hdl = Arc::new(Mutex::new(Some(thread::spawn(move || {
task::block_on(start(&config.clone())).unwrap();
@ -32,18 +39,23 @@ lazy_static! {
static ref CHANNEL_PULSES: Mutex<HashMap<usize, Vec<PulseInfo>>> = Mutex::new(HashMap::new());
}
fn get_channels() -> Vec<usize> {
CHANNEL_LIST.lock().unwrap().to_vec()
}
impl DataProvider for TestDataProvider {
fn get_channels(&self) -> Vec<usize> {
CHANNEL_LIST.lock().unwrap().to_vec()
}
fn get_pulses_by_channel(channel_id: usize) -> Result<Vec<PulseInfo>, std::io::Error> {
let mut pulse_channel_map = CHANNEL_PULSES.lock().unwrap();
match pulse_channel_map.get_mut(&channel_id) {
Some(pulse_list) => Ok(pulse_list.drain(0..).collect()),
None => Err(Error::new(
ErrorKind::NotFound,
format!("Channel {} does not exist", &channel_id),
)),
fn get_pulses_by_channel(
&mut self,
channel_id: usize,
) -> Result<Vec<PulseInfo>, std::io::Error> {
let mut pulse_channel_map = CHANNEL_PULSES.lock().unwrap();
match pulse_channel_map.get_mut(&channel_id) {
Some(pulse_list) => Ok(pulse_list.drain(0..).collect()),
None => Err(Error::new(
ErrorKind::NotFound,
format!("Channel {} does not exist", &channel_id),
)),
}
}
}

161
src/rpi_pin_manager.rs Normal file
View file

@ -0,0 +1,161 @@
#[path = "./input_pin_manager.rs"]
pub mod input_pin_manager;
pub use self::input_pin_manager::InputPinManager;
pub use input_pin_manager::PinConfiguration;
pub use input_pin_manager::PulseInfo;
use rppal::gpio;
use std::sync::{mpsc, Arc, Mutex};
use std::{thread, time};
struct IsrInfo {
timestamp_ns: u64,
channel: usize,
level: gpio::Level,
}
pub struct RPiPinManager {
gpio: gpio::Gpio,
pin_configuration: Vec<PinConfiguration>,
gpio_handles: Vec<gpio::InputPin>,
pulse_recv: Arc<Mutex<mpsc::Receiver<PulseInfo>>>,
ch_isr_send: Arc<Mutex<mpsc::Sender<IsrInfo>>>,
start_time: time::Instant,
}
impl InputPinManager for RPiPinManager {
fn set_input_config(&mut self, input_pins: Vec<PinConfiguration>) {
if self.pin_configuration.len() > 0 {
// TODO Log
println!("RPiPinManager::set_input_config() already set");
} else {
println!("set_input_config:");
let isr_start = self.start_time;
for p in input_pins {
println!(" channel: {} > gpio: {}", &p.channel_id, &p.gpio_id);
let mut pin = self.gpio.get(p.gpio_id as u8).unwrap().into_input_pullup();
let ch_isr_send = self.ch_isr_send.clone();
let pin_id = p.channel_id.clone();
let interrupt_service_routine = move |level: gpio::Level| {
let isr_info = IsrInfo {
timestamp_ns: isr_start.elapsed().as_nanos() as u64,
channel: pin_id,
level: level,
};
match ch_isr_send.lock().unwrap().send(isr_info) {
Ok(_) => {}
Err(e) => {
// TODO Log
println!("Error while sending: {}", e)
}
};
};
pin.set_async_interrupt(gpio::Trigger::Both, interrupt_service_routine.clone())
.unwrap();
self.pin_configuration.push(p);
self.gpio_handles.push(pin);
}
}
}
fn get_channel_recv(&self) -> &Arc<Mutex<mpsc::Receiver<PulseInfo>>> {
&self.pulse_recv
}
fn get_pins(&self) -> &Vec<PinConfiguration> {
&self.pin_configuration
}
fn close_inputs(&mut self) {}
}
impl RPiPinManager {
pub fn new() -> Self {
println!("RPiPinManager::new()");
let gpio = match gpio::Gpio::new() {
Ok(gpio) => gpio,
Err(e) => {
// TODO Log
println!("ERROR: Cannot access GPIO: {}", e);
panic!("Error");
}
};
let (pulse_tx, pulse_rx) = mpsc::channel::<PulseInfo>();
let (hdl_isr_send, hdl_isr_recv) = mpsc::channel::<IsrInfo>();
thread::spawn(move || {
let mut last_state = gpio::Level::High;
let mut timeout = time::Duration::from_secs(u64::MAX);
loop {
if Self::wait_for_isr_event_and_process_it(
&hdl_isr_recv,
&pulse_tx,
&mut last_state,
&mut timeout,
) {
break;
}
}
});
let start_time = time::Instant::now();
Self {
gpio,
pin_configuration: vec![],
gpio_handles: vec![],
pulse_recv: Arc::new(Mutex::new(pulse_rx)),
ch_isr_send: Arc::new(Mutex::new(hdl_isr_send)),
start_time,
}
}
fn wait_for_isr_event_and_process_it(
receiver: &mpsc::Receiver<IsrInfo>,
client_channel: &mpsc::Sender<PulseInfo>,
last_state: &mut gpio::Level,
timeout: &mut time::Duration,
) -> bool {
let mut shall_stop = false;
match receiver.recv_timeout(*timeout) {
Ok(isr_info) => {
let level = match isr_info.level {
gpio::Level::High => true,
_ => false,
};
let pulse_info = PulseInfo {
timestamp_ns: isr_info.timestamp_ns,
pin_id: isr_info.channel,
level,
};
match client_channel.send(pulse_info) {
Ok(_) => {
// Sending succeeded
}
Err(e) => {
// Silently ignore the sending error because
// there may be no event receive at the other end of this channel
println!("ISR: Send error {}", e);
}
};
}
Err(e) => {
match e {
mpsc::RecvTimeoutError::Timeout => {
*timeout = time::Duration::from_secs(u64::MAX);
println!("------> {}", last_state);
}
_ => {
println!("Last sender stopped: {}", e);
shall_stop = true;
}
};
}
};
shall_stop
}
}