diff --git a/rust/cubestore/cubestore/src/app_metrics.rs b/rust/cubestore/cubestore/src/app_metrics.rs index c4d19075f4fba..b321db25fa833 100644 --- a/rust/cubestore/cubestore/src/app_metrics.rs +++ b/rust/cubestore/cubestore/src/app_metrics.rs @@ -7,6 +7,9 @@ use crate::util::metrics::{Counter, Gauge, Histogram}; /// The number of process startups. pub static STARTUPS: Counter = metrics::counter("cs.startup"); +/// Errors in IPC. +pub static WORKER_POOL_ERROR: Counter = metrics::counter("cs.worker_pool.errors"); + /// Incoming SQL queries that do data reads. pub static DATA_QUERIES: Counter = metrics::counter("cs.sql.query.data"); pub static DATA_QUERIES_CACHE_HIT: Counter = metrics::counter("cs.sql.query.data.cache.hit"); diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index 77bc6c72b8e8e..bb55b0de0c14b 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -363,6 +363,10 @@ impl WorkerProcessing for WorkerProcessor { .ok() .unwrap_or("--sel-worker".to_string()) } + + fn process_type() -> String { + "sel-worker".to_string() + } } #[cfg(not(target_os = "windows"))] diff --git a/rust/cubestore/cubestore/src/cluster/worker_pool.rs b/rust/cubestore/cubestore/src/cluster/worker_pool.rs index edc7b3f6a2326..23b3519b4ecb2 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_pool.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_pool.rs @@ -5,7 +5,9 @@ use std::process::{Child, ExitStatus}; use std::sync::Arc; use std::time::Duration; +use crate::app_metrics; use crate::util::cancellation_token_guard::CancellationGuard; +use crate::util::metrics; use deadqueue::unlimited; use futures::future::join_all; use ipc_channel::ipc; @@ -266,6 +268,13 @@ impl WorkerProcess { if sender.send(Ok(res)).is_err() { error!("Error during worker message processing: Send Error"); + app_metrics::WORKER_POOL_ERROR.add_with_tags( + 1, + Some(&vec![ + metrics::format_tag("subprocess_type", &P::process_type()), + metrics::format_tag("error_type", "send"), + ]), + ); } args_channel = Some((a, r)); } @@ -304,6 +313,22 @@ impl WorkerProcess { args_tx.send(message)?; let (res, res_rx) = cube_ext::spawn_blocking(move || (res_rx.recv(), res_rx)).await?; + + if let Err(ipc_err) = res { + app_metrics::WORKER_POOL_ERROR.add_with_tags( + 1, + Some(&vec![ + metrics::format_tag("subprocess_type", &P::process_type()), + metrics::format_tag("error_type", "receive"), + ]), + ); + return Err(CubeError::internal(format!( + "Failed to receive response from subprocess {}: {}", + P::process_titile(), + ipc_err + ))); + } + Ok((res??, args_tx, res_rx)) } @@ -546,6 +571,10 @@ mod tests { fn process_titile() -> String { "--sel-worker".to_string() } + + fn process_type() -> String { + "sel-worker".to_string() + } } type Transport = DefaultServicesTransport; @@ -758,6 +787,10 @@ mod tests { fn process_titile() -> String { "--sel-worker".to_string() } + + fn process_type() -> String { + "sel-worker".to_string() + } } type ServTransport = DefaultServicesTransport; diff --git a/rust/cubestore/cubestore/src/cluster/worker_services.rs b/rust/cubestore/cubestore/src/cluster/worker_services.rs index e3ead5ec56a80..945304e57b005 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_services.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_services.rs @@ -54,6 +54,8 @@ pub trait WorkerProcessing: Send + Sync + 'static { fn is_single_job_process() -> bool; fn process_titile() -> String; + + fn process_type() -> String; } pub trait ServicesTransport {