diff --git a/src/input_pin_manager.rs b/src/input_pin_manager.rs index f8a15c8..3ef6c2e 100644 --- a/src/input_pin_manager.rs +++ b/src/input_pin_manager.rs @@ -5,7 +5,7 @@ pub struct PinConfiguration { pub channel_id: usize, pub gpio_id: usize, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PulseInfo { pub timestamp_ns: u64, pub pin_id: usize, diff --git a/src/main.rs b/src/main.rs index 0e31854..bd9998e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,20 @@ impl rest_api::DataProvider for PulseDataProvider { Err(e) => Err(e), } } + + fn delete_pulses_in_channel_upto_timestamp_ns( + &mut self, + channel_id: usize, + upto_timestamp_ns: u64, + ) -> Result<(), std::io::Error> { + match self + .pulse_counter + .delete_pulses_in_channel_upto_timestamp_ns(channel_id, upto_timestamp_ns) + { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } } fn main() { diff --git a/src/pulse_counter.rs b/src/pulse_counter.rs index 08c274f..fa03c04 100644 --- a/src/pulse_counter.rs +++ b/src/pulse_counter.rs @@ -119,12 +119,16 @@ impl PulseCounter { &mut self, channel: usize, ) -> Result, std::io::Error> { - match self.pulses.lock().unwrap().get_mut(&channel) { + match self.pulses.lock().unwrap().get(&channel) { Some(pulse_list) => { let mut result: Vec = Vec::new(); - while !pulse_list.is_empty() { - result.push(pulse_list.pop_front().unwrap()); - } + // let mut result: Vec = + // Vec::with_capacity(pulse_list.len()); + // while !pulse_list.is_empty() { + // result.push(pulse_list.pop_front().unwrap()); + // } + // result.clone_from_slice(&pulse_list[0..]); + result.extend(pulse_list.iter().cloned()); Ok(result) } None => Err(std::io::Error::new( @@ -133,4 +137,28 @@ impl PulseCounter { )), } } + + #[allow(dead_code)] + pub fn delete_pulses_in_channel_upto_timestamp_ns( + &mut self, + channel: usize, + upto_timestamp_ns: u64, + ) -> Result<(), std::io::Error> { + match self.pulses.lock().unwrap().get_mut(&channel) { + Some(pulse_list) => { + // println!("\nDeleting pulses of channel {}", channel); + while !pulse_list.is_empty() && pulse_list[0].timestamp_ns <= upto_timestamp_ns { + pulse_list.pop_front(); + } + Ok(()) + } + None => { + // println!("\nDeleting pulses - channel {} is unknown", channel); + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Cannot delete pulses of channel {}", channel), + )) + } + } + } } diff --git a/src/pulse_counter_test.rs b/src/pulse_counter_test.rs index ff3ef08..5c847fe 100644 --- a/src/pulse_counter_test.rs +++ b/src/pulse_counter_test.rs @@ -246,9 +246,6 @@ fn retrieve_pulses_by_channel() { assert_eq!(pulses_ch1[1].pin_id, 1); assert_eq!(pulses_ch1[1].level, false); - let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); - assert_eq!(pulses_ch1.len(), 0); - let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); assert_eq!(pulses_ch1.len(), 2); assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64); @@ -258,6 +255,136 @@ fn retrieve_pulses_by_channel() { assert_eq!(pulses_ch1[1].pin_id, 3); assert_eq!(pulses_ch1[1].level, false); + let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); + assert_eq!(pulses_ch1.len(), 2); + assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64); + assert_eq!(pulses_ch1[0].pin_id, 1); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1235u64); + assert_eq!(pulses_ch1[1].pin_id, 1); + assert_eq!(pulses_ch1[1].level, false); + let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); - assert_eq!(pulses_ch1.len(), 0); + assert_eq!(pulses_ch1.len(), 2); + assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64); + assert_eq!(pulses_ch1[0].pin_id, 3); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1288u64); + assert_eq!(pulses_ch1[1].pin_id, 3); + assert_eq!(pulses_ch1[1].level, false); +} + +#[test] +fn delete_pulses_in_channel_upto_timestamp_ns() { + let millis_to_wait = 1u64; + let (cmd_tx, cmd_rx) = mpsc::channel(); + let test_pin_manager_box: Arc< + Mutex>, + > = Arc::new(Mutex::new(Box::new(TestPinManager::new(cmd_rx)))); + + let mut input_pins: Vec = Vec::new(); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 1, + gpio_id: 11, + }); + input_pins.push(crate::input_pin_manager::PinConfiguration { + channel_id: 3, + gpio_id: 33, + }); + + let mut testee = PulseCounter::new(test_pin_manager_box.clone(), input_pins); + assert!(testee.start().is_ok()); + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1234u64, + pin_id: 1, + level: true, + }); + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1235u64, + pin_id: 1, + level: false, + }); + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1236u64, + pin_id: 1, + level: true, + }); + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1280u64, + pin_id: 3, + level: true, + }); + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1288u64, + pin_id: 3, + level: false, + }); + &cmd_tx.send(crate::input_pin_manager::PulseInfo { + timestamp_ns: 1289u64, + pin_id: 3, + level: true, + }); + + std::thread::sleep(std::time::Duration::from_millis(millis_to_wait)); + assert_eq!(testee.get_pulses_count_by_channel(1), 3); + assert_eq!(testee.get_pulses_count_by_channel(3), 3); + + let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); + assert_eq!(pulses_ch1.len(), 3); + assert_eq!(pulses_ch1[0].timestamp_ns, 1234u64); + assert_eq!(pulses_ch1[0].pin_id, 1); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1235u64); + assert_eq!(pulses_ch1[1].pin_id, 1); + assert_eq!(pulses_ch1[1].level, false); + assert_eq!(pulses_ch1[2].timestamp_ns, 1236u64); + assert_eq!(pulses_ch1[2].pin_id, 1); + assert_eq!(pulses_ch1[2].level, true); + + let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); + assert_eq!(pulses_ch1.len(), 3); + assert_eq!(pulses_ch1[0].timestamp_ns, 1280u64); + assert_eq!(pulses_ch1[0].pin_id, 3); + assert_eq!(pulses_ch1[0].level, true); + assert_eq!(pulses_ch1[1].timestamp_ns, 1288u64); + assert_eq!(pulses_ch1[1].pin_id, 3); + assert_eq!(pulses_ch1[1].level, false); + assert_eq!(pulses_ch1[2].timestamp_ns, 1289u64); + assert_eq!(pulses_ch1[2].pin_id, 3); + assert_eq!(pulses_ch1[2].level, true); + + assert!(testee + .delete_pulses_in_channel_upto_timestamp_ns(1, 1234u64) + .is_ok()); + + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 3); + + assert!(testee + .delete_pulses_in_channel_upto_timestamp_ns(2, 1235u64) + .is_err()); + + assert!(testee + .delete_pulses_in_channel_upto_timestamp_ns(3, 1288u64) + .is_ok()); + + assert_eq!(testee.get_pulses_count_by_channel(1), 2); + assert_eq!(testee.get_pulses_count_by_channel(3), 1); + + let pulses_ch1 = testee.get_pulses_by_channel(1).unwrap(); + assert_eq!(pulses_ch1.len(), 2); + assert_eq!(pulses_ch1[0].timestamp_ns, 1235u64); + assert_eq!(pulses_ch1[0].pin_id, 1); + assert_eq!(pulses_ch1[0].level, false); + assert_eq!(pulses_ch1[1].timestamp_ns, 1236u64); + assert_eq!(pulses_ch1[1].pin_id, 1); + assert_eq!(pulses_ch1[1].level, true); + + let pulses_ch1 = testee.get_pulses_by_channel(3).unwrap(); + assert_eq!(pulses_ch1.len(), 1); + assert_eq!(pulses_ch1[0].timestamp_ns, 1289u64); + assert_eq!(pulses_ch1[0].pin_id, 3); + assert_eq!(pulses_ch1[0].level, true); } diff --git a/src/rest_api.rs b/src/rest_api.rs index e36b523..00b61a1 100644 --- a/src/rest_api.rs +++ b/src/rest_api.rs @@ -10,15 +10,33 @@ use tide::prelude::json; pub trait DataProvider { fn get_channels(&self) -> Vec; fn get_pulses_by_channel(&mut self, channel: usize) -> Result, std::io::Error>; + fn delete_pulses_in_channel_upto_timestamp_ns( + &mut self, + channel: usize, + upto_timestamp_ns: u64, + ) -> Result<(), std::io::Error>; } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct PulseInfo { pub timestamp_ns: u64, pub channel_id: usize, pub level: bool, } +#[derive(Deserialize, Debug)] +#[serde(default)] +pub struct DeletePulsesQuery { + delete_upto_timestamp_ns: u64, +} +impl Default for DeletePulsesQuery { + fn default() -> Self { + Self { + delete_upto_timestamp_ns: 0u64, + } + } +} + #[derive(Clone)] pub struct RestApiConfig { pub ip_and_port: String, @@ -31,6 +49,8 @@ pub async fn start<'a>(config: &'a RestApiConfig) -> std::io::Result<()> { app.at("/v1/channels").get(v1_channels_get); app.at("/v1/channel/:channel_id/pulses") .get(v1_channel_pulses_get); + app.at("/v1/channel/:channel_id/delete_upto_timestamp_ns/:timestamp_ns") + .delete(v1_channel_pulses_delete2); let ip_and_port = String::from(&config.ip_and_port); match app.listen(ip_and_port).await { Ok(_) => Ok(()), @@ -59,7 +79,7 @@ async fn v1_channels_get(req: tide::Request) -> tide::Result { let data_provider = &(*req.state().data_provider).lock().unwrap(); // let data_provider = data_provider_mut.lock().unwrap(); let channel_list = data_provider.get_channels(); - Ok(tide::Response::builder(200) + Ok(tide::Response::builder(tide::http::StatusCode::Ok) .content_type(tide::http::mime::JSON) .body(json!({ "channels": channel_list })) .build()) @@ -72,18 +92,48 @@ async fn v1_channel_pulses_get(req: tide::Request) -> tide::Resul let data_provider = &mut (*req.state().data_provider).lock().unwrap(); let channel_pulses_res = data_provider.get_pulses_by_channel(channel_id); match channel_pulses_res { - Ok(channel_pulses) => Ok(tide::Response::builder(200) + Ok(channel_pulses) => Ok(tide::Response::builder(tide::http::StatusCode::Ok) .content_type(tide::http::mime::JSON) .body(json!({ "channel_id": channel_id, "pulses": channel_pulses, })) .build()), - Err(_) => Ok(tide::Response::new(404)), + Err(_) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), } } - Err(_) => Ok(tide::Response::new(405)), + Err(_) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), }, - Err(_e) => Ok(tide::Response::new(404)), + Err(_e) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), + } +} + +async fn v1_channel_pulses_delete2(req: tide::Request) -> tide::Result { + // println!("\nv1_channel_pulses_delete()"); + match req.param("channel_id") { + Ok(channel_id) => match usize::from_str_radix(channel_id, 10) { + Ok(channel_id) => match req.param("timestamp_ns") { + Ok(timestamp_ns) => match u64::from_str_radix(timestamp_ns, 10) { + Ok(timestamp_ns) => { + let data_provider = &mut (*req.state().data_provider).lock().unwrap(); + let result = data_provider + .delete_pulses_in_channel_upto_timestamp_ns(channel_id, timestamp_ns); + match result { + Ok(_) => Ok(tide::Response::new(tide::http::StatusCode::Ok)), + Err(_) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), + } + } + Err(_) => Ok(tide::Response::new(tide::http::StatusCode::NotAcceptable)), + }, + Err(_) => Ok(tide::Response::builder(tide::http::StatusCode::NotFound) + .content_type(tide::http::mime::JSON) + .body(json!({ + "Error": "Argument \"delete_upto_timestamp_ns\" required" + })) + .build()), + }, + Err(_) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), + }, + Err(_e) => Ok(tide::Response::new(tide::http::StatusCode::NotFound)), } } diff --git a/src/rest_api_test.rs b/src/rest_api_test.rs index 2d20a64..59f3d85 100644 --- a/src/rest_api_test.rs +++ b/src/rest_api_test.rs @@ -17,16 +17,10 @@ static IP_AND_PORT: &str = "127.0.0.1:8123"; struct TestDataProvider {} lazy_static! { - // static ref TEST_DATA_PROVIDER: Arc>> = - // Arc::new(Mutex::new(Box::new(TestDataProvider{}))); static ref TESTEE_THREAD_HANDLE: Arc>>> = { let config = RestApiConfig { ip_and_port: String::from(IP_AND_PORT), data_provider: Arc::new(Mutex::new(Box::new(TestDataProvider{}))), - // data_provider: TEST_DATA_PROVIDER, - // get_channels: Box::new(|| get_channels()) as Box Vec + Send>, - // get_pulses_by_channel, - // get_pulses_by_channel: Box::new(|channel| get_pulses_by_channel(channel)) as Box Result, std::io::Error> + Send>, }; let hdl = Arc::new(Mutex::new(Some(thread::spawn(move || { task::block_on(start(&config.clone())).unwrap(); @@ -48,9 +42,28 @@ impl DataProvider for TestDataProvider { &mut self, channel_id: usize, ) -> Result, std::io::Error> { + let pulse_channel_map = CHANNEL_PULSES.lock().unwrap(); + match pulse_channel_map.get(&channel_id) { + Some(pulse_list) => { + let mut result = Vec::new(); + result.extend(pulse_list.iter().cloned()); + Ok(result) + } + None => Err(Error::new( + ErrorKind::NotFound, + format!("Channel {} does not exist", &channel_id), + )), + } + } + + fn delete_pulses_in_channel_upto_timestamp_ns( + &mut self, + channel_id: usize, + _upto_timestamp_ns: u64, + ) -> Result<(), std::io::Error> { let mut pulse_channel_map = CHANNEL_PULSES.lock().unwrap(); match pulse_channel_map.get_mut(&channel_id) { - Some(pulse_list) => Ok(pulse_list.drain(0..).collect()), + Some(_) => Ok(()), None => Err(Error::new( ErrorKind::NotFound, format!("Channel {} does not exist", &channel_id), @@ -61,21 +74,22 @@ impl DataProvider for TestDataProvider { fn launch_testee() { let _hdl = &*TESTEE_THREAD_HANDLE.clone(); + set_channel_list(vec![]); } fn print_response(_context: &str, _response: &minreq::Response) { return; /* - let mut r_str = String::new(); - r_str.push_str(format!("Response in {}:\n", &_context).as_str()); - r_str.push_str(format!(" status: {}\n", &_response.status_code).as_str()); - r_str.push_str(format!(" reason: '{}'\n", &_response.reason_phrase).as_str()); - r_str.push_str(format!(" headers:\n").as_str()); - for (key, value) in _response.headers.iter() { - r_str.push_str(format!(" {}: '{}'\n", key, value).as_str()); - } - r_str.push_str(format!(" body: '{}'\n", &_response.as_str().unwrap()).as_str()); - println!("{}\n---", &r_str); + let mut r_str = String::new(); + r_str.push_str(format!("Response in {}:\n", &_context).as_str()); + r_str.push_str(format!(" status: {}\n", &_response.status_code).as_str()); + r_str.push_str(format!(" reason: '{}'\n", &_response.reason_phrase).as_str()); + r_str.push_str(format!(" headers:\n").as_str()); + for (key, value) in _response.headers.iter() { + r_str.push_str(format!(" {}: '{}'\n", key, value).as_str()); + } + r_str.push_str(format!(" body: '{}'\n", &_response.as_str().unwrap()).as_str()); + println!("{}\n---", &r_str); */ } @@ -83,6 +97,11 @@ fn set_channel_list(mut new_channel_list: Vec) { match CHANNEL_LIST.lock() { Ok(mut channel_list) => { channel_list.clear(); + match CHANNEL_PULSES.lock() { + Ok(mut pulses) => pulses.clear(), + Err(_) => assert!(false, "ERROR: Could not access CHANNEL_PULSES"), + } + channel_list.append(&mut new_channel_list); } Err(_) => assert!(false, "ERROR: Could not access CHANNEL_LIST"), @@ -97,7 +116,7 @@ fn set_channel_pulses(channel_id: usize, mut channel_pulses: Vec) { } #[test] -fn rest_api_fetch_api_versions() { +fn rest_api_get_api_versions() { launch_testee(); let response = minreq::get(format!("http://{}/api_versions", &IP_AND_PORT)).send(); assert!(&response.is_ok()); @@ -107,7 +126,7 @@ fn rest_api_fetch_api_versions() { } #[test] -fn rest_api_fetch_channels() { +fn rest_api_get_channels() { #[derive(Debug, Deserialize)] struct ChannelList { channels: Vec, @@ -115,7 +134,6 @@ fn rest_api_fetch_channels() { launch_testee(); - set_channel_list(vec![]); let response = minreq::get(format!("http://{}/v1/channels", &IP_AND_PORT)).send(); assert!(&response.is_ok()); let response = response.unwrap(); @@ -141,13 +159,14 @@ fn rest_api_fetch_channels() { } #[test] -fn rest_api_fetch_channel() { +fn rest_api_get_pulses_and_get_them_again() { #[derive(Debug, Deserialize)] struct ChannelPulsesList { channel_id: usize, pulses: Vec, } + println!("\nrest_api_get_pulses() start"); launch_testee(); let response = minreq::get(format!("http://{}/v1/channel/1/pulses", &IP_AND_PORT)).send(); @@ -202,6 +221,8 @@ fn rest_api_fetch_channel() { }, ]; set_channel_pulses(1, pulses); + + // Retrieve the pulses let response = minreq::get(format!("http://{}/v1/channel/1/pulses", &IP_AND_PORT)).send(); assert!(&response.is_ok()); let response = response.unwrap(); @@ -223,6 +244,7 @@ fn rest_api_fetch_channel() { assert_eq!(response_json.pulses.get(3).unwrap().channel_id, 1); assert_eq!(response_json.pulses.get(3).unwrap().level, false); + // Retrieve the pulses again let response = minreq::get(format!("http://{}/v1/channel/1/pulses", &IP_AND_PORT)).send(); assert!(&response.is_ok()); let response = response.unwrap(); @@ -230,5 +252,75 @@ fn rest_api_fetch_channel() { assert_eq!(response.status_code, tide::http::StatusCode::Ok as i32); let response_json = &response.json::().unwrap(); assert_eq!(response_json.channel_id, 1); - assert_eq!(response_json.pulses.len(), 0); + assert_eq!(response_json.pulses.len(), 4); + assert_eq!(response_json.pulses.get(0).unwrap().timestamp_ns, 1234u64); + assert_eq!(response_json.pulses.get(0).unwrap().channel_id, 1); + assert_eq!(response_json.pulses.get(0).unwrap().level, true); + assert_eq!(response_json.pulses.get(1).unwrap().timestamp_ns, 1256u64); + assert_eq!(response_json.pulses.get(1).unwrap().channel_id, 1); + assert_eq!(response_json.pulses.get(1).unwrap().level, false); + assert_eq!(response_json.pulses.get(2).unwrap().timestamp_ns, 1278u64); + assert_eq!(response_json.pulses.get(2).unwrap().channel_id, 1); + assert_eq!(response_json.pulses.get(2).unwrap().level, true); + assert_eq!(response_json.pulses.get(3).unwrap().timestamp_ns, 1290u64); + assert_eq!(response_json.pulses.get(3).unwrap().channel_id, 1); + assert_eq!(response_json.pulses.get(3).unwrap().level, false); +} + +#[test] +fn rest_api_delete_pulses_upto_timestamp_ns() { + #[derive(Debug, Deserialize)] + struct ChannelPulsesList { + channel_id: usize, + pulses: Vec, + } + + launch_testee(); + + let response = minreq::delete(format!( + "http://{}/v1/channel/1/delete_upto_timestamp_ns", + &IP_AND_PORT + )) + .send(); + assert!(&response.is_ok()); + let response = response.unwrap(); + print_response("\nrest_api_fetch_channel", &response); + assert_eq!( + response.status_code, + tide::http::StatusCode::NotFound as i32 + ); + + let pulses = vec![ + PulseInfo { + timestamp_ns: 1234u64, + channel_id: 1, + level: true, + }, + PulseInfo { + timestamp_ns: 1256u64, + channel_id: 1, + level: false, + }, + PulseInfo { + timestamp_ns: 1278u64, + channel_id: 1, + level: true, + }, + PulseInfo { + timestamp_ns: 1290u64, + channel_id: 1, + level: false, + }, + ]; + set_channel_pulses(1, pulses); + let response = minreq::delete(format!( + "http://{}/v1/channel/1/delete_upto_timestamp_ns/1260", + &IP_AND_PORT + )) + .send(); + assert!(&response.is_ok()); + let response = response.unwrap(); + print_response("\nrest_api_fetch_channel", &response); + assert_eq!(response.status_code, tide::http::StatusCode::Ok as i32); + // assert!(false); }