Skip to content

Commit 6465961

Browse files
authored
chore(cubestore): Metrics for worker pool errors (cube-js#9835)
1 parent 704eb5e commit 6465961

File tree

4 files changed

+42
-0
lines changed

4 files changed

+42
-0
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use crate::util::metrics::{Counter, Gauge, Histogram};
77
/// The number of process startups.
88
pub static STARTUPS: Counter = metrics::counter("cs.startup");
99

10+
/// Errors in IPC.
11+
pub static WORKER_POOL_ERROR: Counter = metrics::counter("cs.worker_pool.errors");
12+
1013
/// Incoming SQL queries that do data reads.
1114
pub static DATA_QUERIES: Counter = metrics::counter("cs.sql.query.data");
1215
pub static DATA_QUERIES_CACHE_HIT: Counter = metrics::counter("cs.sql.query.data.cache.hit");

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,10 @@ impl WorkerProcessing for WorkerProcessor {
363363
.ok()
364364
.unwrap_or("--sel-worker".to_string())
365365
}
366+
367+
fn process_type() -> String {
368+
"sel-worker".to_string()
369+
}
366370
}
367371

368372
#[cfg(not(target_os = "windows"))]

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::process::{Child, ExitStatus};
55
use std::sync::Arc;
66
use std::time::Duration;
77

8+
use crate::app_metrics;
89
use crate::util::cancellation_token_guard::CancellationGuard;
10+
use crate::util::metrics;
911
use deadqueue::unlimited;
1012
use futures::future::join_all;
1113
use ipc_channel::ipc;
@@ -266,6 +268,13 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
266268
Ok((res, a, r)) => {
267269
if sender.send(Ok(res)).is_err() {
268270
error!("Error during worker message processing: Send Error");
271+
app_metrics::WORKER_POOL_ERROR.add_with_tags(
272+
1,
273+
Some(&vec![
274+
metrics::format_tag("subprocess_type", &P::process_type()),
275+
metrics::format_tag("error_type", "send"),
276+
]),
277+
);
269278
}
270279
args_channel = Some((a, r));
271280
}
@@ -304,6 +313,22 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
304313
> {
305314
args_tx.send(message)?;
306315
let (res, res_rx) = cube_ext::spawn_blocking(move || (res_rx.recv(), res_rx)).await?;
316+
317+
if let Err(ipc_err) = res {
318+
app_metrics::WORKER_POOL_ERROR.add_with_tags(
319+
1,
320+
Some(&vec![
321+
metrics::format_tag("subprocess_type", &P::process_type()),
322+
metrics::format_tag("error_type", "receive"),
323+
]),
324+
);
325+
return Err(CubeError::internal(format!(
326+
"Failed to receive response from subprocess {}: {}",
327+
P::process_titile(),
328+
ipc_err
329+
)));
330+
}
331+
307332
Ok((res??, args_tx, res_rx))
308333
}
309334

@@ -546,6 +571,10 @@ mod tests {
546571
fn process_titile() -> String {
547572
"--sel-worker".to_string()
548573
}
574+
575+
fn process_type() -> String {
576+
"sel-worker".to_string()
577+
}
549578
}
550579

551580
type Transport = DefaultServicesTransport<DefaultServicesServerProcessor>;
@@ -758,6 +787,10 @@ mod tests {
758787
fn process_titile() -> String {
759788
"--sel-worker".to_string()
760789
}
790+
791+
fn process_type() -> String {
792+
"sel-worker".to_string()
793+
}
761794
}
762795

763796
type ServTransport = DefaultServicesTransport<TestServicesServerProcessor>;

rust/cubestore/cubestore/src/cluster/worker_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub trait WorkerProcessing: Send + Sync + 'static {
5454
fn is_single_job_process() -> bool;
5555

5656
fn process_titile() -> String;
57+
58+
fn process_type() -> String;
5759
}
5860

5961
pub trait ServicesTransport {

0 commit comments

Comments
 (0)