From 760640cadfa8e6a6728608de2cc7d71431fe93ce Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 4 Aug 2025 14:32:02 +0200 Subject: [PATCH] feat(cubesql): Allow to bind float64 (support in pg-srv) (#9846) Implements float64 (FLOAT8) parameter binding in the PostgreSQL wire protocol server (pg-srv). This allows SQL clients to bind floating-point parameters in prepared statements using both text and binary formats. Changes: - Add f64 decoding support in pg-srv for both text and binary wire formats - Handle FLOAT8 type in Bind message processing - Add comprehensive tests for float64 binding in both formats - Rename test_prepare to test_prepare_autodetect for clarity This enhancement improves SQL client compatibility by supporting floating-point parameters. In prepared statements, which are commonly used by BI tools and database drivers. --- rust/cubesql/CLAUDE.md | 146 +++++++++++++++++++++ rust/cubesql/cubesql/e2e/tests/postgres.rs | 14 +- rust/cubesql/pg-srv/src/decoding.rs | 26 ++++ rust/cubesql/pg-srv/src/protocol.rs | 62 +++++++++ 4 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 rust/cubesql/CLAUDE.md diff --git a/rust/cubesql/CLAUDE.md b/rust/cubesql/CLAUDE.md new file mode 100644 index 0000000000000..759869ca9334f --- /dev/null +++ b/rust/cubesql/CLAUDE.md @@ -0,0 +1,146 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Repository Overview + +CubeSQL is a SQL proxy server that enables SQL-based access to Cube.js semantic layer. It emulates the PostgreSQL wire protocol, allowing standard SQL clients and BI tools to query Cube.js deployments as if they were traditional databases. Note: MySQL protocol support has been deprecated and is no longer available. + +This is a Rust workspace containing three crates: +- **cubesql**: Main SQL proxy server with query compilation and protocol emulation +- **cubeclient**: Rust client library for Cube.js API communication +- **pg-srv**: PostgreSQL wire protocol server implementation + +## Development Commands + +### Prerequisites +```bash +# Install required Rust toolchain (1.84.1) +rustup update + +# Install snapshot testing tool +cargo install cargo-insta +``` + +### Core Build Commands +```bash +# Build all workspace members +cargo build + +# Build release version +cargo build --release + +# Format code +cargo fmt + +# Run linting (note: many clippy rules are disabled) +cargo clippy +``` + +### Running CubeSQL Server +```bash +# Run with required environment variables +CUBESQL_CUBE_URL=$CUBE_URL/cubejs-api \ +CUBESQL_CUBE_TOKEN=$CUBE_TOKEN \ +CUBESQL_LOG_LEVEL=debug \ +CUBESQL_BIND_ADDR=0.0.0.0:4444 \ +cargo run --bin cubesqld + +# Connect via PostgreSQL client +psql -h 127.0.0.1 -p 4444 -U root +``` + +### Testing Commands +```bash +# Run all unit tests +cargo test + +# Run specific test module +cargo test test_introspection +cargo test test_udfs + +# Run integration tests (requires Cube.js instance) +cargo test --test e2e + +# Review snapshot test changes +cargo insta review + +# Run benchmarks +cargo bench +``` + +## Architecture Overview + +### Query Processing Pipeline +1. **Protocol Layer**: Accepts PostgreSQL wire protocol connections +2. **SQL Parser**: Modified sqlparser-rs parses incoming SQL queries +3. **Query Rewriter**: egg-based rewrite engine transforms SQL to Cube.js queries +4. **Compilation**: Generates Cube.js REST API calls or DataFusion execution plans +5. **Execution**: DataFusion executes queries or proxies to Cube.js +6. **Result Formatting**: Converts results back to wire protocol format + +### Key Components + +#### cubesql crate structure: +- **`/compile`**: SQL compilation and query planning + - `/engine`: DataFusion integration and query execution + - `/rewrite`: egg-based query optimization rules +- **`/sql`**: Database protocol implementations + - `/postgres`: PostgreSQL system catalog emulation + - `/database_variables`: Variable system for PostgreSQL protocol +- **`/transport`**: Network transport and session management +- **`/config`**: Configuration and service initialization + +#### Testing Approach: +- **Unit Tests**: Inline tests in source files using `#[cfg(test)]` +- **Integration Tests**: End-to-end tests in `/e2e` directory +- **Snapshot Tests**: Extensive use of `insta` for SQL compilation snapshots +- **BI Tool Tests**: Compatibility tests for Metabase, Tableau, PowerBI, etc. + +### Important Implementation Details + +1. **DataFusion Integration**: Uses forked Apache Arrow DataFusion for query execution +2. **Rewrite Rules**: Complex SQL transformations using egg e-graph library +3. **Protocol Emulation**: Implements enough of PostgreSQL protocol for BI tools +4. **System Catalogs**: Emulates pg_catalog (PostgreSQL) +5. **Variable Handling**: Supports SET/SHOW commands for protocol compatibility + +## Common Development Tasks + +### Adding New SQL Support +1. Add parsing support in `/compile/parser` +2. Create rewrite rules in `/compile/rewrite/rules` +3. Add tests with snapshot expectations +4. Update protocol-specific handling if needed + +### Debugging Query Compilation +```bash +# Enable detailed logging +CUBESQL_LOG_LEVEL=trace cargo run --bin cubesqld + +# Check rewrite traces in logs +# Look for "Rewrite" entries showing transformation steps +``` + +### Working with Snapshots +```bash +# After making changes that affect SQL compilation +cargo test +cargo insta review # Review and accept/reject changes +``` + +## Key Dependencies + +- **DataFusion**: Query execution engine (forked version with custom modifications) +- **sqlparser-rs**: SQL parser (forked with CubeSQL-specific extensions) +- **egg**: E-graph library for query optimization +- **tokio**: Async runtime for network and I/O operations +- **pgwire**: PostgreSQL wire protocol implementation + +## Important Notes + +- This codebase uses heavily modified forks of DataFusion and sqlparser-rs +- Many clippy lints are disabled due to code generation and complex patterns +- Integration tests require a running Cube.js instance +- The rewrite engine is performance-critical and uses advanced optimization techniques +- Protocol compatibility is paramount for BI tool support \ No newline at end of file diff --git a/rust/cubesql/cubesql/e2e/tests/postgres.rs b/rust/cubesql/cubesql/e2e/tests/postgres.rs index c70237b889074..46987943f5e0f 100644 --- a/rust/cubesql/cubesql/e2e/tests/postgres.rs +++ b/rust/cubesql/cubesql/e2e/tests/postgres.rs @@ -407,19 +407,17 @@ impl PostgresIntegrationTestSuite { Ok(()) } - async fn test_prepare(&self) -> RunResult<()> { + async fn test_prepare_autodetect(&self) -> RunResult<()> { // Unknown variables will be detected as TEXT // LIMIT has a typehint for i64 let stmt = self .client .prepare("SELECT $1 as t1, $2 as t2 LIMIT $3") - .await - .unwrap(); + .await?; self.client .query(&stmt, &[&"test1", &"test2", &0_i64]) - .await - .unwrap(); + .await?; Ok(()) } @@ -444,9 +442,9 @@ impl PostgresIntegrationTestSuite { } async fn test_prepare_empty_query(&self) -> RunResult<()> { - let stmt = self.client.prepare("").await.unwrap(); + let stmt = self.client.prepare("").await?; - self.client.query(&stmt, &[]).await.unwrap(); + self.client.query(&stmt, &[]).await?; Ok(()) } @@ -1170,7 +1168,7 @@ impl AsyncTestSuite for PostgresIntegrationTestSuite { async fn run(&mut self) -> RunResult<()> { self.test_cancel_simple_query().await?; self.test_cancel_execute_prepared().await?; - self.test_prepare().await?; + self.test_prepare_autodetect().await?; self.test_extended_error().await?; self.test_prepare_empty_query().await?; self.test_stream_all().await?; diff --git a/rust/cubesql/pg-srv/src/decoding.rs b/rust/cubesql/pg-srv/src/decoding.rs index 94c565d63013f..afbbc5948ae03 100644 --- a/rust/cubesql/pg-srv/src/decoding.rs +++ b/rust/cubesql/pg-srv/src/decoding.rs @@ -102,6 +102,26 @@ impl FromProtocolValue for bool { } } +impl FromProtocolValue for f64 { + fn from_text(raw: &[u8]) -> Result { + let as_str = std::str::from_utf8(raw).map_err(|err| ProtocolError::ErrorResponse { + source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()), + backtrace: Backtrace::capture(), + })?; + + as_str + .parse::() + .map_err(|err| ProtocolError::ErrorResponse { + source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()), + backtrace: Backtrace::capture(), + }) + } + + fn from_binary(raw: &[u8]) -> Result { + Ok(BigEndian::read_f64(raw)) + } +} + #[cfg(test)] mod tests { use crate::*; @@ -132,6 +152,9 @@ mod tests { assert_test_decode(false, Format::Text)?; assert_test_decode(1_i64, Format::Text)?; assert_test_decode(100_i64, Format::Text)?; + assert_test_decode(std::f64::consts::PI, Format::Text)?; + assert_test_decode(-std::f64::consts::E, Format::Text)?; + assert_test_decode(0.0_f64, Format::Text)?; Ok(()) } @@ -143,6 +166,9 @@ mod tests { assert_test_decode(false, Format::Binary)?; assert_test_decode(1_i64, Format::Binary)?; assert_test_decode(100_i64, Format::Binary)?; + assert_test_decode(std::f64::consts::PI, Format::Binary)?; + assert_test_decode(-std::f64::consts::E, Format::Binary)?; + assert_test_decode(0.0_f64, Format::Binary)?; Ok(()) } diff --git a/rust/cubesql/pg-srv/src/protocol.rs b/rust/cubesql/pg-srv/src/protocol.rs index 5b4755477ae8d..cd9ff1f609b01 100644 --- a/rust/cubesql/pg-srv/src/protocol.rs +++ b/rust/cubesql/pg-srv/src/protocol.rs @@ -783,6 +783,9 @@ impl Bind { PgTypeId::INT8 => { BindValue::Int64(i64::from_protocol(raw_value, param_format)?) } + PgTypeId::FLOAT8 => { + BindValue::Float64(f64::from_protocol(raw_value, param_format)?) + } _ => { return Err(ErrorResponse::error( ErrorCode::FeatureNotSupported, @@ -1279,6 +1282,65 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_frontend_message_parse_bind_float64() -> Result<(), ProtocolError> { + // Test text format float64 + let buffer = parse_hex_dump( + r#" + 42 00 00 00 1a 00 73 30 00 00 01 00 00 00 01 00 B.....s0........ + 00 00 05 32 36 2e 31 31 00 00 00 00 ...26.11.... + "# + .to_string(), + ); + let mut cursor = Cursor::new(buffer); + + let message = read_message(&mut cursor, MessageTagParserDefaultImpl::with_arc()).await?; + match message { + FrontendMessage::Bind(body) => { + assert_eq!( + body, + Bind { + portal: "".to_string(), + statement: "s0".to_string(), + parameter_formats: vec![Format::Text], + parameter_values: vec![Some(vec![50, 54, 46, 49, 49])], // "26.11" + result_formats: vec![] + }, + ); + + assert_eq!( + body.to_bind_values(&ParameterDescription::new(vec![PgTypeId::FLOAT8]))?, + vec![BindValue::Float64(26.11)] + ); + } + _ => panic!("Wrong message, must be Bind"), + } + + // Test binary format float64 + let buffer = parse_hex_dump( + r#" + 42 00 00 00 1e 00 73 30 00 00 01 00 01 00 01 00 B.....s0........ + 00 00 08 40 3a 1c 28 f5 c2 8f 5c 00 00 00 00 ...@:.(....\... + "# + .to_string(), + ); + let mut cursor = Cursor::new(buffer); + + let message = read_message(&mut cursor, MessageTagParserDefaultImpl::with_arc()).await?; + match message { + FrontendMessage::Bind(body) => { + assert_eq!(body.parameter_formats, vec![Format::Binary]); + assert_eq!( + body.to_bind_values(&ParameterDescription::new(vec![PgTypeId::FLOAT8]))?, + vec![BindValue::Float64(26.11)] + ); + } + _ => panic!("Wrong message, must be Bind"), + } + + Ok(()) + } + #[tokio::test] async fn test_frontend_message_parse_describe() -> Result<(), ProtocolError> { let buffer = parse_hex_dump(