diff --git a/agent/src/discovery_handler_manager/discovery_handler_registry.rs b/agent/src/discovery_handler_manager/discovery_handler_registry.rs index 27c34466e..4c56e9fda 100644 --- a/agent/src/discovery_handler_manager/discovery_handler_registry.rs +++ b/agent/src/discovery_handler_manager/discovery_handler_registry.rs @@ -134,8 +134,10 @@ pub trait DiscoveryHandlerEndpoint: Send + Sync { /// results across the different registered handlers of that type, and generate the Instance objects for discovered /// devices. #[cfg_attr(test, automock)] +#[async_trait] pub trait DiscoveryHandlerRequest: Sync + Send { - fn get_instances(&self) -> Result, DiscoveryError>; + async fn get_instances(&self) -> Result, DiscoveryError>; + async fn set_extra_device_properties(&self, extra_device_properties: HashMap); } /// This trait is here to help with testing for code that interract with the discovery handler registry @@ -169,35 +171,60 @@ pub trait DiscoveryHandlerRegistry: Sync + Send { /// Real world implementation of the Discovery Handler Request struct DHRequestImpl { endpoints: RwLock>>>>, - notifier: watch::Sender>>, + notifier: watch::Sender, key: String, handler_name: String, details: String, properties: Vec, - extra_device_properties: HashMap, + extra_device_properties: RwLock>, kube_client: Arc, termination_notifier: Arc, } +#[async_trait] impl DiscoveryHandlerRequest for DHRequestImpl { - fn get_instances(&self) -> Result, DiscoveryError> { + async fn get_instances(&self) -> Result, DiscoveryError> { + let properties = self.extra_device_properties.read().await; Ok(self - .notifier - .borrow() + .endpoints + .read() + .await .iter() - .map(|i| self.device_to_instance(i)) + .flat_map(|r| r.borrow().clone().into_iter()) + .map(|i| self.device_to_instance(i.as_ref(), &properties)) .collect()) } + + async fn set_extra_device_properties(&self, extra_device_properties: HashMap) { + let mut current = self.extra_device_properties.write().await; + if extra_device_properties != *current { + let edit = extra_device_properties + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect(); + *current = extra_device_properties; + self.notifier + .send_modify(|k| k.container_edits.first_mut().unwrap().env = edit); + } + } } impl DHRequestImpl { - fn device_to_instance(&self, dev: &DiscoveredDevice) -> Instance { + fn device_to_instance( + &self, + dev: &DiscoveredDevice, + extra_device_properties: &HashMap, + ) -> Instance { let (rdev, shared) = match dev { DiscoveredDevice::LocalDevice(d, _) => (d, false), DiscoveredDevice::SharedDevice(d) => (d, true), }; let mut properties = rdev.properties.clone(); - properties.extend(self.extra_device_properties.clone()); + properties.extend( + extra_device_properties + .iter() + .map(|(k, v)| (k.clone(), v.clone())), + ); Instance { spec: InstanceSpec { cdi_name: self.get_device_cdi_fqdn(dev), @@ -255,13 +282,27 @@ impl DHRequestImpl { .await .iter_mut() .flat_map(|r| r.borrow_and_update().clone().into_iter()) + .unique_by(|d| self.get_device_cdi_fqdn(d)) .collect(); - self.notifier.send_replace( - devices - .into_iter() - .unique_by(|d| self.get_device_cdi_fqdn(d)) - .collect(), - ); + self.notifier + .send_replace(crate::device_manager::cdi::Kind { + kind: format!("{}/{}", AKRI_PREFIX, self.key), + annotations: Default::default(), + devices: devices + .into_iter() + .map(|d| d.as_ref().clone().into()) + .collect(), + container_edits: vec![ContainerEdit { + env: self + .extra_device_properties + .read() + .await + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect(), + ..Default::default() + }], + }); } } @@ -324,41 +365,23 @@ impl DHRegistryImpl { } async fn handle_request( - mut req_notifier: watch::Receiver>>, - key: String, + mut req_notifier: watch::Receiver, + key: &String, namespace: &String, cdi_sender: Arc>>>, local_config_sender: mpsc::Sender>, - extra_device_properties: HashMap, ) { let cdi_kind = format!("{}/{}", AKRI_PREFIX, key); loop { match req_notifier.changed().await { Ok(_) => { - cdi_sender.lock().await.send_modify(|kind| { - kind.insert( - cdi_kind.clone(), - crate::device_manager::cdi::Kind { - kind: cdi_kind.clone(), - annotations: Default::default(), - devices: req_notifier - .borrow_and_update() - .iter() - .map(|d| d.as_ref().clone().into()) - .collect(), - container_edits: vec![ContainerEdit { - env: extra_device_properties - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect(), - ..Default::default() - }], - }, - ); + let kind = req_notifier.borrow_and_update().clone(); + cdi_sender.lock().await.send_modify(|kinds| { + kinds.insert(cdi_kind.clone(), kind); }); trace!("Ask for reconciliation of {}::{}", namespace, key); let res = local_config_sender - .send(ObjectRef::::new(&key).within(namespace)) + .send(ObjectRef::::new(key).within(namespace)) .await; if res.is_err() { cdi_sender.lock().await.send_modify(|kind| { @@ -370,7 +393,7 @@ async fn handle_request( Err(_) => { trace!("Ask for reconciliation of {}::{}", namespace, key); let _ = local_config_sender - .send(ObjectRef::::new(&key).within(namespace)) + .send(ObjectRef::::new(key).within(namespace)) .await; cdi_sender.lock().await.send_modify(|kind| { kind.remove(&cdi_kind); @@ -394,7 +417,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl { ) -> Result<(), DiscoveryError> { match self.handlers.read().await.get(dh_name) { Some(handlers) => { - let (notifier, _) = watch::channel(vec![]); + let (notifier, _) = watch::channel(Default::default()); let terminated = Arc::new(Notify::new()); let mut dh_req = DHRequestImpl { endpoints: Default::default(), @@ -403,7 +426,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl { handler_name: dh_name.to_string(), details: dh_details.to_string(), properties: dh_properties.to_vec(), - extra_device_properties: extra_device_properties.clone(), + extra_device_properties: RwLock::new(extra_device_properties), kube_client: self.kube_client.clone(), termination_notifier: terminated.clone(), }; @@ -433,11 +456,10 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl { tokio::spawn(async move { handle_request( local_req_notifier, - local_key, + &local_key, &namespace, local_cdi_sender, local_config_sender, - extra_device_properties, ) .await }); @@ -608,9 +630,9 @@ mod tests { ); } - #[test] - fn test_dh_request_impl_get_instances() { - let (notifier, _) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice( + #[tokio::test] + async fn test_dh_request_impl_get_instances() { + let (_, notifier) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice( Device { id: "my_local_device".to_owned(), properties: HashMap::from([( @@ -622,23 +644,25 @@ mod tests { }, "my_node".to_owned(), ))]); + let endpoints = RwLock::new(vec![notifier]); + let (cdi_notifier, _) = watch::channel(Default::default()); let req = DHRequestImpl { - endpoints: Default::default(), - notifier, + endpoints, + notifier: cdi_notifier, key: "my_config".to_owned(), handler_name: "mock_handler".to_string(), details: Default::default(), properties: Default::default(), - extra_device_properties: HashMap::from([( + extra_device_properties: RwLock::new(HashMap::from([( "MY_EXTRA_KEY".to_owned(), "value".to_owned(), - )]), + )])), kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()), termination_notifier: Arc::new(Notify::new()), }; assert_eq!( - req.get_instances().unwrap(), + req.get_instances().await.unwrap(), vec![Instance { metadata: ObjectMeta { name: Some("my_config-e77db4".to_owned()), @@ -662,7 +686,7 @@ mod tests { #[tokio::test] async fn test_dh_request_impl_watch_devices() { - let (notifier, mut n_rec) = watch::channel(vec![]); + let (notifier, mut n_rec) = watch::channel(Default::default()); let (dh_send, dh_rec) = watch::channel(Default::default()); let req = Arc::new(DHRequestImpl { endpoints: RwLock::new(vec![dh_rec]), @@ -675,10 +699,10 @@ mod tests { value: Some("value_1".to_string()), value_from: None, }], - extra_device_properties: HashMap::from([( + extra_device_properties: RwLock::new(HashMap::from([( "MY_EXTRA_KEY".to_owned(), "value".to_owned(), - )]), + )])), kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()), termination_notifier: Arc::new(Notify::new()), }); @@ -687,7 +711,7 @@ mod tests { let (new_dh_sen, rec) = broadcast::channel(1); let task = tokio::spawn(async move { req_ref.watch_devices(rec).await }); - assert!(n_rec.borrow_and_update().is_empty()); + assert!(n_rec.borrow_and_update().devices.is_empty()); let new_device = Arc::new(DiscoveredDevice::SharedDevice(Device { id: "my_shared_device".to_owned(), @@ -698,7 +722,10 @@ mod tests { dh_send.send(vec![new_device.clone()]).unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; - assert_eq!(n_rec.borrow_and_update().clone(), vec![new_device]); + assert_eq!( + n_rec.borrow_and_update().devices.clone(), + vec![new_device.as_ref().clone().into()] + ); let mut new_dh = MockDiscoveryHandlerEndpoint::new(); let new_dh_senders = Arc::new(std::sync::Mutex::new(vec![])); diff --git a/agent/src/util/discovery_configuration_controller.rs b/agent/src/util/discovery_configuration_controller.rs index 496a783fa..21a8929aa 100644 --- a/agent/src/util/discovery_configuration_controller.rs +++ b/agent/src/util/discovery_configuration_controller.rs @@ -115,17 +115,21 @@ pub async fn reconcile( let discovered_instances: Vec = match ctx.dh_registry.get_request(&dc.name_any()).await { - Some(req) => req - .get_instances()? - .into_iter() - .map(|mut instance| { - // Add - instance.spec.nodes = vec![ctx.agent_identifier.to_owned()]; - instance.owner_references_mut().push(owner_ref.clone()); - instance.spec.capacity = dc.spec.capacity; - instance - }) - .collect(), + Some(req) => { + req.set_extra_device_properties(dc.spec.broker_properties.clone()) + .await; + req.get_instances() + .await? + .into_iter() + .map(|mut instance| { + // Add + instance.spec.nodes = vec![ctx.agent_identifier.to_owned()]; + instance.owner_references_mut().push(owner_ref.clone()); + instance.spec.capacity = dc.spec.capacity; + instance + }) + .collect() + } None => { ctx.dh_registry .new_request( @@ -474,6 +478,9 @@ mod tests { let mut registry = MockDiscoveryHandlerRegistry::new(); let mut request = MockDiscoveryHandlerRequest::new(); + request + .expect_set_extra_device_properties() + .returning(|_| {}); request.expect_get_instances().returning(|| Ok(vec![])); registry .expect_get_request()