[pull] master from cube-js:master #74

Merged 1 commit on Aug 4, 2025

146 changes: 146 additions & 0 deletions rust/cubesql/CLAUDE.md
@@ -0,0 +1,146 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Repository Overview

CubeSQL is a SQL proxy server that enables SQL-based access to the Cube.js semantic layer. It emulates the PostgreSQL wire protocol, allowing standard SQL clients and BI tools to query Cube.js deployments as if they were traditional databases. Note: MySQL protocol support has been deprecated and is no longer available.

This is a Rust workspace containing three crates:
- **cubesql**: Main SQL proxy server with query compilation and protocol emulation
- **cubeclient**: Rust client library for Cube.js API communication
- **pg-srv**: PostgreSQL wire protocol server implementation

## Development Commands

### Prerequisites
```bash
# Keep the Rust toolchain up to date (the workspace targets 1.84.1)
rustup update

# Install snapshot testing tool
cargo install cargo-insta
```

### Core Build Commands
```bash
# Build all workspace members
cargo build

# Build release version
cargo build --release

# Format code
cargo fmt

# Run linting (note: many clippy rules are disabled)
cargo clippy
```

### Running CubeSQL Server
```bash
# Run with required environment variables
CUBESQL_CUBE_URL=$CUBE_URL/cubejs-api \
CUBESQL_CUBE_TOKEN=$CUBE_TOKEN \
CUBESQL_LOG_LEVEL=debug \
CUBESQL_BIND_ADDR=0.0.0.0:4444 \
cargo run --bin cubesqld

# Connect via PostgreSQL client
psql -h 127.0.0.1 -p 4444 -U root
```

### Testing Commands
```bash
# Run all unit tests
cargo test

# Run specific test module
cargo test test_introspection
cargo test test_udfs

# Run integration tests (requires Cube.js instance)
cargo test --test e2e

# Review snapshot test changes
cargo insta review

# Run benchmarks
cargo bench
```

## Architecture Overview

### Query Processing Pipeline
1. **Protocol Layer**: Accepts PostgreSQL wire protocol connections
2. **SQL Parser**: Modified sqlparser-rs parses incoming SQL queries
3. **Query Rewriter**: egg-based rewrite engine transforms SQL to Cube.js queries
4. **Compilation**: Generates Cube.js REST API calls or DataFusion execution plans (see the sketch after this list)
5. **Execution**: DataFusion executes queries or proxies to Cube.js
6. **Result Formatting**: Converts results back to wire protocol format
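
For a rough illustration of steps 3-4, a simple aggregation that can be pushed down to Cube.js ends up as a REST API load query. This is a minimal sketch of that mapping only; the cube and member names (`orders.status`, `orders.count`) are hypothetical and just show the shape of the generated payload:

```rust
use serde_json::json;

fn main() {
    // Incoming SQL (steps 1-2), as received over the wire protocol.
    let _sql = "SELECT status, COUNT(*) FROM orders GROUP BY status";

    // After rewriting and compilation (steps 3-4), the same query is expressed
    // as a Cube.js REST API load query with measures and dimensions.
    let load_query = json!({
        "measures": ["orders.count"],
        "dimensions": ["orders.status"]
    });

    println!("{load_query}");
}
```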

### Key Components

#### cubesql crate structure:
- **`/compile`**: SQL compilation and query planning
- `/engine`: DataFusion integration and query execution
- `/rewrite`: egg-based query optimization rules
- **`/sql`**: Database protocol implementations
- `/postgres`: PostgreSQL system catalog emulation
- `/database_variables`: Variable system for PostgreSQL protocol
- **`/transport`**: Network transport and session management
- **`/config`**: Configuration and service initialization

#### Testing Approach:
- **Unit Tests**: Inline tests in source files using `#[cfg(test)]`
- **Integration Tests**: End-to-end tests in the `/e2e` directory
- **Snapshot Tests**: Extensive use of `insta` for SQL compilation snapshots (see the sketch after this list)
- **BI Tool Tests**: Compatibility tests for Metabase, Tableau, PowerBI, etc.
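
A minimal sketch of what a snapshot test looks like with `insta`; the helper `compile_to_plan_string` below is a stand-in, not an actual CubeSQL function:

```rust
#[cfg(test)]
mod tests {
    // Hypothetical helper standing in for the real compilation entry point.
    fn compile_to_plan_string(sql: &str) -> String {
        format!("LogicalPlan for: {sql}")
    }

    #[test]
    fn test_simple_select_snapshot() {
        let plan = compile_to_plan_string("SELECT 1");
        // The first `cargo test` run records the value as a pending snapshot;
        // `cargo insta review` accepts or rejects it, and the accepted .snap
        // file is committed alongside the code.
        insta::assert_snapshot!(plan);
    }
}
```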

### Important Implementation Details

1. **DataFusion Integration**: Uses a forked Apache Arrow DataFusion for query execution
2. **Rewrite Rules**: Complex SQL transformations built with the egg e-graph library
3. **Protocol Emulation**: Implements enough of the PostgreSQL protocol for BI tools to work
4. **System Catalogs**: Emulates the PostgreSQL `pg_catalog` system schema
5. **Variable Handling**: Supports SET/SHOW commands for protocol compatibility (see the example after this list)
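
For example, a PostgreSQL client can set a session variable and read it back over the wire protocol. This is a minimal sketch using the `tokio-postgres` client; the connection parameters and the `application_name` variable are illustrative, not documented CubeSQL configuration:

```rust
use tokio_postgres::{NoTls, SimpleQueryMessage};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Connection parameters are illustrative; point them at your CubeSQL instance.
    let (client, connection) =
        tokio_postgres::connect("host=127.0.0.1 port=4444 user=root", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // SET is accepted for protocol compatibility...
    client
        .simple_query("SET application_name TO 'my_bi_tool'")
        .await?;

    // ...and SHOW reads the session variable back.
    for message in client.simple_query("SHOW application_name").await? {
        if let SimpleQueryMessage::Row(row) = message {
            println!("application_name = {:?}", row.get(0));
        }
    }

    Ok(())
}
```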

## Common Development Tasks

### Adding New SQL Support
1. Add parsing support in `/compile/parser`
2. Create rewrite rules in `/compile/rewrite/rules` (see the sketch after this list)
3. Add tests with snapshot expectations
4. Update protocol-specific handling if needed
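
The rewrite rules are built on egg's pattern-based rewriting. This is a generic sketch of what a rule and a rewrite run look like in egg itself, using the library's `SymbolLang` toy language rather than CubeSQL's actual language definition under `/compile/rewrite`:

```rust
use egg::{rewrite, AstSize, Extractor, Rewrite, Runner, SymbolLang};

fn rules() -> Vec<Rewrite<SymbolLang, ()>> {
    vec![
        // Each rule pairs a pattern with a replacement; CubeSQL defines many
        // such rules over its own expression language instead of SymbolLang.
        rewrite!("commute-and"; "(and ?a ?b)" => "(and ?b ?a)"),
        rewrite!("double-not"; "(not (not ?a))" => "?a"),
    ]
}

fn main() {
    let expr: egg::RecExpr<SymbolLang> = "(not (not (and x y)))".parse().unwrap();

    // Saturate the e-graph with the rules, then extract the smallest equivalent term.
    let runner = Runner::default().with_expr(&expr).run(&rules());
    let extractor = Extractor::new(&runner.egraph, AstSize);
    let (_cost, best) = extractor.find_best(runner.roots[0]);

    println!("{best}"); // prints the simplified term, e.g. "(and x y)"
}
```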

### Debugging Query Compilation
```bash
# Enable detailed logging
CUBESQL_LOG_LEVEL=trace cargo run --bin cubesqld

# Check rewrite traces in logs
# Look for "Rewrite" entries showing transformation steps
```

### Working with Snapshots
```bash
# After making changes that affect SQL compilation
cargo test
cargo insta review # Review and accept/reject changes
```

## Key Dependencies

- **DataFusion**: Query execution engine (forked version with custom modifications)
- **sqlparser-rs**: SQL parser (forked with CubeSQL-specific extensions)
- **egg**: E-graph library for query optimization
- **tokio**: Async runtime for network and I/O operations
- **pgwire**: PostgreSQL wire protocol implementation

## Important Notes

- This codebase uses heavily modified forks of DataFusion and sqlparser-rs
- Many clippy lints are disabled due to code generation and complex patterns
- Integration tests require a running Cube.js instance
- The rewrite engine is performance-critical and uses advanced optimization techniques
- Protocol compatibility is paramount for BI tool support
14 changes: 6 additions & 8 deletions rust/cubesql/cubesql/e2e/tests/postgres.rs
@@ -407,19 +407,17 @@ impl PostgresIntegrationTestSuite {
Ok(())
}

async fn test_prepare(&self) -> RunResult<()> {
async fn test_prepare_autodetect(&self) -> RunResult<()> {
// Unknown variables will be detected as TEXT
// LIMIT has a typehint for i64
let stmt = self
.client
.prepare("SELECT $1 as t1, $2 as t2 LIMIT $3")
.await
.unwrap();
.await?;

self.client
.query(&stmt, &[&"test1", &"test2", &0_i64])
.await
.unwrap();
.await?;

Ok(())
}
@@ -444,9 +442,9 @@ impl PostgresIntegrationTestSuite {
}

async fn test_prepare_empty_query(&self) -> RunResult<()> {
let stmt = self.client.prepare("").await.unwrap();
let stmt = self.client.prepare("").await?;

self.client.query(&stmt, &[]).await.unwrap();
self.client.query(&stmt, &[]).await?;

Ok(())
}
@@ -1170,7 +1168,7 @@ impl AsyncTestSuite for PostgresIntegrationTestSuite {
async fn run(&mut self) -> RunResult<()> {
self.test_cancel_simple_query().await?;
self.test_cancel_execute_prepared().await?;
self.test_prepare().await?;
self.test_prepare_autodetect().await?;
self.test_extended_error().await?;
self.test_prepare_empty_query().await?;
self.test_stream_all().await?;
26 changes: 26 additions & 0 deletions rust/cubesql/pg-srv/src/decoding.rs
@@ -102,6 +102,26 @@ impl FromProtocolValue for bool {
}
}

impl FromProtocolValue for f64 {
fn from_text(raw: &[u8]) -> Result<Self, ProtocolError> {
let as_str = std::str::from_utf8(raw).map_err(|err| ProtocolError::ErrorResponse {
source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()),
backtrace: Backtrace::capture(),
})?;

as_str
.parse::<f64>()
.map_err(|err| ProtocolError::ErrorResponse {
source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()),
backtrace: Backtrace::capture(),
})
}

fn from_binary(raw: &[u8]) -> Result<Self, ProtocolError> {
Ok(BigEndian::read_f64(raw))
}
}

#[cfg(test)]
mod tests {
use crate::*;
@@ -132,6 +152,9 @@ mod tests {
assert_test_decode(false, Format::Text)?;
assert_test_decode(1_i64, Format::Text)?;
assert_test_decode(100_i64, Format::Text)?;
assert_test_decode(std::f64::consts::PI, Format::Text)?;
assert_test_decode(-std::f64::consts::E, Format::Text)?;
assert_test_decode(0.0_f64, Format::Text)?;

Ok(())
}
@@ -143,6 +166,9 @@
assert_test_decode(false, Format::Binary)?;
assert_test_decode(1_i64, Format::Binary)?;
assert_test_decode(100_i64, Format::Binary)?;
assert_test_decode(std::f64::consts::PI, Format::Binary)?;
assert_test_decode(-std::f64::consts::E, Format::Binary)?;
assert_test_decode(0.0_f64, Format::Binary)?;

Ok(())
}
62 changes: 62 additions & 0 deletions rust/cubesql/pg-srv/src/protocol.rs
@@ -783,6 +783,9 @@ impl Bind {
PgTypeId::INT8 => {
BindValue::Int64(i64::from_protocol(raw_value, param_format)?)
}
PgTypeId::FLOAT8 => {
BindValue::Float64(f64::from_protocol(raw_value, param_format)?)
}
_ => {
return Err(ErrorResponse::error(
ErrorCode::FeatureNotSupported,
@@ -1279,6 +1282,65 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_frontend_message_parse_bind_float64() -> Result<(), ProtocolError> {
// Test text format float64
let buffer = parse_hex_dump(
r#"
42 00 00 00 1a 00 73 30 00 00 01 00 00 00 01 00 B.....s0........
00 00 05 32 36 2e 31 31 00 00 00 00 ...26.11....
"#
.to_string(),
);
let mut cursor = Cursor::new(buffer);

let message = read_message(&mut cursor, MessageTagParserDefaultImpl::with_arc()).await?;
match message {
FrontendMessage::Bind(body) => {
assert_eq!(
body,
Bind {
portal: "".to_string(),
statement: "s0".to_string(),
parameter_formats: vec![Format::Text],
parameter_values: vec![Some(vec![50, 54, 46, 49, 49])], // "26.11"
result_formats: vec![]
},
);

assert_eq!(
body.to_bind_values(&ParameterDescription::new(vec![PgTypeId::FLOAT8]))?,
vec![BindValue::Float64(26.11)]
);
}
_ => panic!("Wrong message, must be Bind"),
}

// Test binary format float64
let buffer = parse_hex_dump(
r#"
42 00 00 00 1e 00 73 30 00 00 01 00 01 00 01 00 B.....s0........
00 00 08 40 3a 1c 28 f5 c2 8f 5c 00 00 00 00 ...@:.(....\...
"#
.to_string(),
);
let mut cursor = Cursor::new(buffer);

let message = read_message(&mut cursor, MessageTagParserDefaultImpl::with_arc()).await?;
match message {
FrontendMessage::Bind(body) => {
assert_eq!(body.parameter_formats, vec![Format::Binary]);
assert_eq!(
body.to_bind_values(&ParameterDescription::new(vec![PgTypeId::FLOAT8]))?,
vec![BindValue::Float64(26.11)]
);
}
_ => panic!("Wrong message, must be Bind"),
}

Ok(())
}

#[tokio::test]
async fn test_frontend_message_parse_describe() -> Result<(), ProtocolError> {
let buffer = parse_hex_dump(