diff --git a/docs/pages/product/apis-integrations/rest-api.mdx b/docs/pages/product/apis-integrations/rest-api.mdx index ef8706e7cfae0..98b1e1d9a0cb7 100644 --- a/docs/pages/product/apis-integrations/rest-api.mdx +++ b/docs/pages/product/apis-integrations/rest-api.mdx @@ -11,7 +11,7 @@ data applications, including but not limited to the following ones: Often, the REST API is used to enable [embedded analytics][cube-ea] and [real-time analytics][cube-rta] use cases. -See [REST API reference][ref-ref-rest-api] for the list of supported API endpoinsts. +See [REST API reference][ref-ref-rest-api] for the list of supported API endpoints. Also, check [query format][ref-rest-query-format] for details about query syntax. diff --git a/docs/pages/product/caching.mdx b/docs/pages/product/caching.mdx index 8a7eb82da2825..301bb5ef7d7e4 100644 --- a/docs/pages/product/caching.mdx +++ b/docs/pages/product/caching.mdx @@ -270,7 +270,7 @@ cache instead of being processed on the database or in Cube Store. This is the fastest query retrieval method, but it requires that the exact same query was run very recently. - **No cache.** This cache type indicates that the query was processed in the upstream -data source and was not accelrated using pre-aggregations. These queries could have +data source and was not accelerated using pre-aggregations. These queries could have a significant performance boost if pre-aggregations and Cube Store were utilized. In [Query History][ref-query-history] and throughout Cube Cloud, colored bolt diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 05d24b86f0a14..2eeafc495cdf1 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -5,7 +5,6 @@ #![feature(hash_set_entry)] #![feature(is_sorted)] #![feature(result_flattening)] -#![feature(extract_if)] // #![feature(trace_macros)] // trace_macros!(true); diff --git a/rust/cubestore/cubestore/src/metastore/listener.rs b/rust/cubestore/cubestore/src/metastore/listener.rs index cd2c53afea888..0a6a9fcee899b 100644 --- a/rust/cubestore/cubestore/src/metastore/listener.rs +++ b/rust/cubestore/cubestore/src/metastore/listener.rs @@ -9,28 +9,18 @@ use tokio::sync::Notify; #[async_trait] pub trait MetastoreListener: Send + Sync { - async fn wait_for_event( - &self, - event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError>; + async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError>; } +pub type MetastoreListenerWaitFun = Box bool + Send + Sync>; pub struct MetastoreListenerImpl { event_receiver: Mutex>, - wait_fns: Mutex< - Vec<( - Arc, - Box bool + Send + Sync>, - )>, - >, + wait_fns: Mutex, MetastoreListenerWaitFun)>>, } #[async_trait] impl MetastoreListener for MetastoreListenerImpl { - async fn wait_for_event( - &self, - event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError> { + async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> { let notify = Arc::new(Notify::new()); self.wait_fns.lock().await.push((notify.clone(), event_fn)); notify.notified().await; @@ -42,10 +32,7 @@ pub struct MockMetastoreListener; #[async_trait] impl MetastoreListener for MockMetastoreListener { - async fn wait_for_event( - &self, - _event_fn: Box bool + Send + Sync>, - ) -> Result<(), CubeError> { + async fn wait_for_event(&self, _event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> { Ok(()) } } @@ -67,7 +54,7 @@ impl MetastoreListenerImpl { pub async fn run_listener(&self) -> Result<(), CubeError> { loop { let event = self.event_receiver.lock().await.recv().await?; - let res = self.process_event(event.clone()).await; + let res = self.process_event(&event).await; if let Err(e) = res { error!("Error processing event {:?}: {}", event, e); } @@ -80,7 +67,7 @@ impl MetastoreListenerImpl { ) -> Result<(), CubeError> { loop { let event = self.event_receiver.lock().await.recv().await?; - let res = self.process_event(event.clone()).await; + let res = self.process_event(&event).await; if let Err(e) = res { error!("Error processing event {:?}: {}", event, e); } @@ -90,13 +77,22 @@ impl MetastoreListenerImpl { } } - async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> { + async fn process_event(&self, event: &MetaStoreEvent) -> Result<(), CubeError> { let mut wait_fns = self.wait_fns.lock().await; - let to_notify = wait_fns - .extract_if(|(_, wait_fn)| wait_fn(event.clone())) - .collect::>(); + let mut to_notify = Vec::new(); + + wait_fns.retain(|(notify, wait_fn)| { + if wait_fn(event) { + to_notify.push(notify.clone()); + false + } else { + true + } + }); + + drop(wait_fns); - for (notify, _) in to_notify { + for notify in to_notify { notify.notify_waiters(); } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 43fde1e356731..a94ff04a346e7 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -4954,6 +4954,11 @@ mod tests { assert_eq!(format_table_value!(s, name, String), "foo"); } + #[test] + fn test_structures_size() { + assert_eq!(std::mem::size_of::(), 680); + } + #[tokio::test] async fn schema_test() { let config = Config::test("schema_test"); diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 793cece76fab8..76c802c9d1119 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2815,6 +2815,9 @@ mod tests { println!("All partitions: {:#?}", partitions); + // TODO API to wait for all jobs to be completed and all events processed + Delay::new(Duration::from_millis(500)).await; + let plans = service .plan_query("SELECT sum(num) from foo.numbers where num = 50") .await