Introduction
In this blog post, I’ll walk you through a real-time crypto data pipeline I built using a powerful stack:
🔹 PostgreSQL for structured storage, serving as the data source
🔹 Debezium + Confluent Kafka for Change Data Capture (CDC)
🔹 Apache Spark for stream processing
🔹 Apache Cassandra for fast NoSQL storage, hosted on an Azure Ubuntu virtual machine and used as the data sink
This architecture enables near real-time data extraction and transformation, ideal for live dashboards, analytics, distributed systems, and downstream uses such as building machine learning models.
This project is divided into four phases:
Phase 1.
Extracting data from the Binance API endpoints, transforming and loading the data into a PostgreSQL database using Python scripts.
Phase 2.
Using a Debezium source connector to capture changes from the PostgreSQL database and store the change logs in Kafka topics.
Phase 3.
Using a Spark Streaming job as a sink to send the replicated data from the Kafka topics into a Cassandra keyspace, binance_test.
Phase 4.
Visualizing the data in Apache Cassandra on a Grafana dashboard using a PostgreSQL bridge to pull data from Cassandra.
For more technical details on the project, check the linked GitHub repository.
What is Change Data Capture (CDC)?
Change Data Capture (CDC) is a technique used in data pipelines to identify and track changes made to data in a database. It captures these changes (INSERT, UPDATE, and DELETE) and makes them available for use in other systems, often in real time or near real time. CDC helps improve data efficiency and consistency across systems by replicating only the changed data, rather than the entire dataset.
There are three types of CDC mechanisms, namely:
- Log-based CDC: This captures changes by reading the database's transaction logs, which are used internally to ensure durability and consistency. For example, PostgreSQL uses Write-Ahead Logs (WALs) to record all changes before they're applied to the actual database. Tools like Debezium tap into these logs using logical decoding and convert them into structured JSON messages. These include:
  - before: The state of the record before the change.
  - after: The new state after the change.
  - source: Metadata about the origin of the change, e.g., database, table, LSN, timestamp.
Here is an example of a change log captured from Postgres into a Kafka topic via a Debezium connector, in JSON:
{
  "before": null,
  "after": {
    "id": 101,
    "symbol": "BTCUSDT",
    "price": "61550.00",
    "time": 1724822831000
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "binance",
    "ts_ms": 1724822831200,
    "snapshot": "false",
    "db": "crypto_db",
    "sequence": "[null,\"240300112\"]",
    "schema": "binance",
    "table": "latest_prices",
    "txId": 78321,
    "lsn": 240300112,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1724822831210,
  "transaction": null
}
- Query-based CDC: This method involves periodically querying the source database to identify changes, usually by comparing last-updated timestamps or using ROWVERSION fields (a minimal Python polling sketch follows this list). Below is an example query:
SELECT * FROM orders WHERE updated_at > '2025-07-02 10:00:00';
- Trigger-based CDC: This method uses database triggers to detect changes and write them to an audit/log table. For example:
CREATE TRIGGER audit_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION log_user_changes();
The log_user_changes() function would write the old and new row values to a separate user_audit_log table that stores the changes.
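As referenced in the query-based item above, here is a minimal polling sketch in Python using SQLAlchemy from the project requirements. The orders table, its updated_at column, and the connection string are illustrative placeholders, not part of the project code.
# Minimal query-based CDC polling sketch (illustrative placeholders throughout)
import time
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/crypto_db")
last_seen = "2025-07-02 10:00:00"  # high-water mark: the last processed timestamp

while True:
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT * FROM orders WHERE updated_at > :last_seen ORDER BY updated_at"),
            {"last_seen": last_seen},
        ).fetchall()

    for row in rows:
        # Hand the changed row to a downstream consumer (printed here for brevity)
        print(row)

    if rows:
        # Advance the high-water mark to the newest change processed so far
        last_seen = rows[-1]._mapping["updated_at"]

    time.sleep(30)  # poll every 30 seconds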
Project Requirements
This project requires the following:
python 3.x
pyspark 3.5.x
cassandra-driver
confluent-kafka
sqlalchemy
psycopg2-binary
- Confluent Cloud cluster
- Debezium's Postgres CDC Source V2 connector, available on Confluent Cloud
- A Microsoft Azure Virtual Machine to host Apache Cassandra (you can host it locally if you prefer)
Project Workflow
Project Walkthrough
Phase 1.
The Python script extracts and transforms 5 types of data from the Binance REST API endpoints:
Recent Trades
Order Book Depth
30-minute Klines / Candlesticks
24h Daily Ticker prices
Latest Prices
Here's an example of the extraction, transformation, and loading of the latest_prices data:
import time

import pandas as pd
import requests
from sqlalchemy import create_engine


def get_latest_prices(BASE_URL, symbols, DB_URI):
    coin_data = []
    url = f"{BASE_URL}/api/v3/ticker/price"

    # Extract: fetch the latest price for each symbol
    for s in symbols:
        params = {'symbol': s}
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            coin_data.append({
                'symbol': s,
                'price': data["price"],
                'time': int(time.time() * 1000)
            })
        else:
            print(f"Error fetching latest price: {response.status_code}, {response.text}")

    # Transform: cast price to float and convert epoch millis to a timestamp
    df = pd.DataFrame(coin_data)
    df["price"] = df["price"].astype(float)
    df["time"] = pd.to_datetime(df["time"], unit='ms')

    # Load: append the rows to the binance.latest_prices table
    try:
        engine = create_engine(DB_URI)
        df.to_sql('latest_prices', con=engine, schema='binance', index=False, if_exists='append')
        print("Latest price data loaded successfully!")
    except Exception as e:
        print(f"Error loading latest prices to db: {e}")
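For reference, a call to this function might look like the following; the symbol list and connection string are placeholder values.
# Illustrative invocation; symbols and DB_URI are placeholders
BASE_URL = "https://api.binance.com"
symbols = ["BTCUSDT", "ETHUSDT", "XRPUSDT"]
DB_URI = "postgresql+psycopg2://user:password@localhost:5432/crypto_db"

get_latest_prices(BASE_URL, symbols, DB_URI)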
After transformation, the script loads the data into the binance schema of the PostgreSQL database. The tables created are:
binance.recent_trades
binance.order_book
binance.klines
binance.daily_ticker
binance.latest_prices
Phase 2
Head over to Confluent Cloud; if you do not have an account, create a free one to claim the $400 in free credits, then create an environment and a cluster. You may be prompted to add a payment method.
After creating a cluster, select Connectors on the left-hand side, search for the Postgres CDC Source V2 (Debezium) connector, and start the setup.
Here's the connector receiving data from the Postgres database and storing it into Kafka topics.
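To confirm that the change logs are landing in Kafka, you can read a few messages back with the confluent-kafka client from the project requirements. The bootstrap server, API key/secret, and topic name below are placeholders for your cluster's values.
# Quick sanity check: consume a few CDC messages from the Confluent Cloud topic.
# Bootstrap server, API key/secret, and topic name are placeholders.
from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "<bootstrap-server>:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<api-key>",
    "sasl.password": "<api-secret>",
    "group.id": "cdc-sanity-check",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(conf)
consumer.subscribe(["<topic-prefix>.binance.latest_prices"])  # topic name is an assumption

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Each message value is the Debezium JSON envelope shown earlier
        print(msg.value().decode("utf-8"))
finally:
    consumer.close()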
Phase 3
A Python script containing a Spark Streaming job acts as the data sink into Apache Cassandra.
It consumes the change logs from the various Kafka topics as JSON, flattens the after, source, op, ts_ms, and transaction fields, transforms them, and loads them into Cassandra hosted on the Azure Ubuntu VM.
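Here is a minimal sketch of what such a sink can look like for the latest_prices topic. The Kafka topic name, package versions, credentials, Cassandra host, and the simplified schema are assumptions for illustration; the job in the repository handles all five topics.
# Minimal Spark Structured Streaming sink sketch for the latest_prices CDC topic.
# Topic name, credentials, host, and schema are placeholders, not the project's exact values.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

spark = (SparkSession.builder
         .appName("binance-cdc-sink")
         # connector versions are assumptions; match them to your Spark/Scala build
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
                 "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
         .config("spark.cassandra.connection.host", "<azure-vm-ip>")
         .getOrCreate())

# Simplified Debezium envelope for binance.latest_prices (JSON output without schemas)
row_schema = StructType([
    StructField("id", IntegerType()),
    StructField("symbol", StringType()),
    StructField("price", StringType()),
    StructField("time", LongType()),
])
envelope_schema = StructType([
    StructField("before", row_schema),
    StructField("after", row_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
])

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "<bootstrap-server>:9092")
       .option("kafka.security.protocol", "SASL_SSL")
       .option("kafka.sasl.mechanism", "PLAIN")
       .option("kafka.sasl.jaas.config",
               "org.apache.kafka.common.security.plain.PlainLoginModule required "
               "username='<api-key>' password='<api-secret>';")
       .option("subscribe", "<topic-prefix>.binance.latest_prices")  # topic name is an assumption
       .option("startingOffsets", "latest")
       .load())

# Parse the JSON envelope and flatten the "after" record plus op/ts_ms
flattened = (raw.selectExpr("CAST(value AS STRING) AS json")
             .select(from_json(col("json"), envelope_schema).alias("e"))
             .select(col("e.after.id").alias("id"),
                     col("e.after.symbol").alias("symbol"),
                     col("e.after.price").cast("double").alias("price"),
                     col("e.after.time").alias("time"),
                     col("e.op").alias("op"),
                     col("e.ts_ms").alias("ts_ms")))

def write_to_cassandra(batch_df, _epoch_id):
    # Append each micro-batch to the binance_test keyspace
    (batch_df.write
     .format("org.apache.spark.sql.cassandra")
     .mode("append")
     .options(keyspace="binance_test", table="latest_prices")
     .save())

(flattened.writeStream
 .foreachBatch(write_to_cassandra)
 .option("checkpointLocation", "/tmp/checkpoints/latest_prices")
 .start()
 .awaitTermination())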
The data received in Cassandra is as follows for the klines table:
Phase 4.
After streaming data into Cassandra, we need to visualize this data on a Grafana dashboard to derive insights from the data.
For this project, I used Grafana Cloud. There is a Cassandra data source plugin, but it only supports Cassandra 3.0.0 and older, and I am running 4.1.9, so I ran into issues connecting Grafana directly.
Instead, I built a simple Cassandra-to-PostgreSQL bridge that copies the data into a PostgreSQL database, which Grafana supports natively, to draw insights from the data.
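As a rough sketch (not the exact script in the repository), the bridge can simply read a table with the Python cassandra-driver and write it to Postgres with pandas and SQLAlchemy; the host, credentials, and target table name below are placeholders.
# Rough Cassandra-to-PostgreSQL bridge sketch; host, credentials, and target table are placeholders
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import create_engine

# Read the replicated rows out of the binance_test keyspace
auth = PlainTextAuthProvider(username="cassandra", password="<password>")
cluster = Cluster(["<azure-vm-ip>"], auth_provider=auth)
session = cluster.connect("binance_test")
rows = session.execute("SELECT id, symbol, price, time FROM latest_prices")
df = pd.DataFrame(rows.all())

# Write them to a Postgres table that Grafana can query natively
engine = create_engine("postgresql+psycopg2://user:password@<pg-host>:5432/crypto_db")
df.to_sql("latest_prices_grafana", con=engine, schema="grafana", index=False, if_exists="replace")

cluster.shutdown()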
NOTE: By default, Cassandra ships with the authenticator set to AllowAllAuthenticator and the authorizer set to AllowAllAuthorizer, which disables authentication and lets any user access Cassandra's shell, cqlsh. Replace the default configuration and restart the Cassandra service as follows:
sudo nano /etc/cassandra/cassandra.yaml # Cassandra's config file
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer
Then exit the editor and run the following command to restart the Cassandra service:
sudo systemctl restart cassandra
Insights gained from the data
The dashboards reveal several valuable insights into crypto trading patterns. The first chart shows that XRPUSDT, AVAXUSDT, BTCUSDT, and DOGEUSDT are among the most frequently traded symbols in the trading day, indicating high liquidity or market interest.
The ETHUSDT volume trend indicates significant fluctuations, with notable spikes between 13:00 and 14:00, suggesting peak trading hours possibly influenced by market news or institutional actions.
The candlestick chart for BTCUSDT displays hourly open, high, low, and close prices, offering a useful tool for technical analysis and identifying market trends or reversal patterns.
Additionally, the quantity of coins ordered throughout the trading day highlights DOGEUSDT and ADAUSDT as dominant in trade volume, with DOGE exceeding 270,000 units, likely due to its lower price and retail trader appeal. Overall, the visualizations suggest strong market activity around a few key coins, with volume and price data providing critical insights for real-time monitoring, trading strategy, or machine learning applications.
Conclusion
This project was a rewarding journey through the world of real-time data engineering. It demonstrates how Change Data Capture can save time and resources while updating our databases for streaming applications and data pipelines. This stack can easily be used for downstream use cases e.g. data analytics or even real-time ML predictions for crypto coins.
Do you have any questions? Don't forget to like, comment and follow for more data engineering tutorials and content!