Skip to content

[pull] master from cube-js:master #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/pages/product/apis-integrations/rest-api.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data applications, including but not limited to the following ones:
Often, the REST API is used to enable [embedded analytics][cube-ea] and
[real-time analytics][cube-rta] use cases.

See [REST API reference][ref-ref-rest-api] for the list of supported API endpoinsts.
See [REST API reference][ref-ref-rest-api] for the list of supported API endpoints.
Also, check [query format][ref-rest-query-format] for details about query syntax.

<InfoBox>
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/product/caching.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ cache instead of being processed on the database or in Cube Store. This is the
fastest query retrieval method, but it requires that the exact same query was
run very recently.
- **No cache.** This cache type indicates that the query was processed in the upstream
data source and was not accelrated using pre-aggregations. These queries could have
data source and was not accelerated using pre-aggregations. These queries could have
a significant performance boost if pre-aggregations and Cube Store were utilized.

In [Query History][ref-query-history] and throughout Cube Cloud, colored bolt
Expand Down
1 change: 0 additions & 1 deletion rust/cubestore/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#![feature(hash_set_entry)]
#![feature(is_sorted)]
#![feature(result_flattening)]
#![feature(extract_if)]
// #![feature(trace_macros)]

// trace_macros!(true);
Expand Down
46 changes: 21 additions & 25 deletions rust/cubestore/cubestore/src/metastore/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,18 @@ use tokio::sync::Notify;

#[async_trait]
pub trait MetastoreListener: Send + Sync {
async fn wait_for_event(
&self,
event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError>;
async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError>;
}

pub type MetastoreListenerWaitFun = Box<dyn Fn(&MetaStoreEvent) -> bool + Send + Sync>;
pub struct MetastoreListenerImpl {
event_receiver: Mutex<Receiver<MetaStoreEvent>>,
wait_fns: Mutex<
Vec<(
Arc<Notify>,
Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
)>,
>,
wait_fns: Mutex<Vec<(Arc<Notify>, MetastoreListenerWaitFun)>>,
}

#[async_trait]
impl MetastoreListener for MetastoreListenerImpl {
async fn wait_for_event(
&self,
event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError> {
async fn wait_for_event(&self, event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> {
let notify = Arc::new(Notify::new());
self.wait_fns.lock().await.push((notify.clone(), event_fn));
notify.notified().await;
Expand All @@ -42,10 +32,7 @@ pub struct MockMetastoreListener;

#[async_trait]
impl MetastoreListener for MockMetastoreListener {
async fn wait_for_event(
&self,
_event_fn: Box<dyn Fn(MetaStoreEvent) -> bool + Send + Sync>,
) -> Result<(), CubeError> {
async fn wait_for_event(&self, _event_fn: MetastoreListenerWaitFun) -> Result<(), CubeError> {
Ok(())
}
}
Expand All @@ -67,7 +54,7 @@ impl MetastoreListenerImpl {
pub async fn run_listener(&self) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.lock().await.recv().await?;
let res = self.process_event(event.clone()).await;
let res = self.process_event(&event).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
Expand All @@ -80,7 +67,7 @@ impl MetastoreListenerImpl {
) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.lock().await.recv().await?;
let res = self.process_event(event.clone()).await;
let res = self.process_event(&event).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
Expand All @@ -90,13 +77,22 @@ impl MetastoreListenerImpl {
}
}

async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> {
async fn process_event(&self, event: &MetaStoreEvent) -> Result<(), CubeError> {
let mut wait_fns = self.wait_fns.lock().await;
let to_notify = wait_fns
.extract_if(|(_, wait_fn)| wait_fn(event.clone()))
.collect::<Vec<_>>();
let mut to_notify = Vec::new();

wait_fns.retain(|(notify, wait_fn)| {
if wait_fn(event) {
to_notify.push(notify.clone());
false
} else {
true
}
});

drop(wait_fns);

for (notify, _) in to_notify {
for notify in to_notify {
notify.notify_waiters();
}

Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4954,6 +4954,11 @@ mod tests {
assert_eq!(format_table_value!(s, name, String), "foo");
}

#[test]
fn test_structures_size() {
assert_eq!(std::mem::size_of::<MetaStoreEvent>(), 680);
}

#[tokio::test]
async fn schema_test() {
let config = Config::test("schema_test");
Expand Down
3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,9 @@ mod tests {

println!("All partitions: {:#?}", partitions);

// TODO API to wait for all jobs to be completed and all events processed
Delay::new(Duration::from_millis(500)).await;

let plans = service
.plan_query("SELECT sum(num) from foo.numbers where num = 50")
.await
Expand Down
Loading