Denzel Kanyeki


Building a Real-Time Crypto Pipeline with Binance APIs, PostgreSQL, Debezium, Kafka, Spark & Cassandra

Introduction

In this blog post, I’ll walk you through a real-time crypto data pipeline I built using a powerful stack:

🔹 PostgreSQL for structured storage and as the pipeline's data source

🔹 Debezium + Confluent Kafka for Change Data Capture (CDC)

🔹 Apache Spark for stream processing

🔹 Apache Cassandra, hosted on an Azure Ubuntu virtual machine, for fast NoSQL storage as the data sink

This architecture enables near real-time data extraction and transformation, which is ideal for live dashboards, analytics, distributed systems, and downstream uses such as building machine learning models.

This project is divided into four phases:

Phase 1.

Extracting data from the Binance API endpoints, transforming and loading the data into a PostgreSQL database using Python scripts.

Phase 2.

Using a Debezium source connector to capture changes from the PostgreSQL database and stream them into Kafka topics as change logs.

Phase 3.

Using a Spark Streaming job as a sink to send the replicated data from the Kafka topics into the Cassandra keyspace binance_test.

Phase 4.

Visualizing the data in Apache Cassandra on a Grafana dashboard using a PostgreSQL bridge to pull data from Cassandra.

For more technical details on the project, check the linked GitHub repository.

What is Change Data Capture (CDC)?

Change Data Capture (CDC) is a technique used in data pipelines to identify and track changes made to data in a database. It captures these changes (INSERTs, UPDATEs, and DELETEs) and makes them available to other systems, often in real time or near real time. CDC improves efficiency and consistency across systems by replicating only the changed data rather than the entire dataset.

There are three main types of CDC mechanisms:

  • Log-based CDC: This captures changes by reading the database’s transaction logs, which are used internally to ensure durability and consistency. For example, PostgreSQL uses Write-Ahead Logs (WALs) to record all changes before they're applied to the actual database. Tools like Debezium tap into these logs using logical decoding and convert them into structured JSON messages. These include:
    • before: The state of the record before the change.
    • after: The new state after the change.
    • source: Metadata about the origin of the change e.g., database, table, LSN, timestamp.

An example of a change log captured from Postgres into a Kafka topic via a Debezium connector, in JSON:

{
  "before": null,
  "after": {
    "id": 101,
    "symbol": "BTCUSDT",
    "price": "61550.00",
    "time": 1724822831000
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "binance",
    "ts_ms": 1724822831200,
    "snapshot": "false",
    "db": "crypto_db",
    "sequence": "[null,\"240300112\"]",
    "schema": "binance",
    "table": "latest_prices",
    "txId": 78321,
    "lsn": 240300112,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1724822831210,
  "transaction": null
}
  • Query-based CDC: This method involves periodically querying the source database to identify changes, usually by comparing the last updated timestamps or using ROWVERSION fields. Below is an example:
SELECT * FROM orders WHERE updated_at > '2025-07-02 10:00:00';
  • Trigger-based CDC: This method uses database triggers to detect changes and write them to an audit/log table. An example includes:
CREATE TRIGGER audit_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION log_user_changes();

log_user_changes() would write the old and new row values to a separate user_audit_log table that stores the change history.

Project Requirements

This project requires the following:

  • python 3.x
  • pyspark 3.5.x
  • cassandra-driver
  • confluent-kafka
  • sqlalchemy
  • psycopg2-binary
  • Confluent Cloud cluster
  • Debezium's PostgresCDCConnectorV2 connector, available on Confluent Cloud
  • A Microsoft Azure virtual machine to host Apache Cassandra (you can also host it locally)

Project Workflow

Project Workflow

Project Walkthrough

Phase 1.

The Python script extracts and transforms 5 types of data from the Binance REST API endpoints:

  • Recent Trades

  • Order Book Depth

  • 30-minute Klines / Candlesticks

  • 24h Daily Ticker prices

  • Latest Prices

Here's an example of extracting, transforming, and loading the latest_prices data:

import time

import pandas as pd
import requests
from sqlalchemy import create_engine


def get_latest_prices(BASE_URL, symbols, DB_URI):
    """Fetch the latest price for each symbol and append it to binance.latest_prices."""
    coin_data = []
    url = f"{BASE_URL}/api/v3/ticker/price"
    for s in symbols:
        params = {'symbol': s}
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            coin_data.append({
                'symbol': s,
                'price': data["price"],
                'time': int(time.time() * 1000)  # current time in milliseconds
            })
        else:
            print(f"Error fetching latest price for {s}: {response.status_code}, {response.text}")
    df = pd.DataFrame(coin_data)
    df["price"] = df["price"].astype(float)
    df["time"] = pd.to_datetime(df["time"], unit='ms')
    try:
        engine = create_engine(DB_URI)
        df.to_sql('latest_prices', con=engine, schema='binance', index=False, if_exists='append')
        print("Latest price data loaded successfully!")
    except Exception as e:
        print(f"Error loading latest prices to db: {e}")
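
A hypothetical invocation of the function might look like this; the symbols list and the database URI are placeholders to adapt to your own setup:

BASE_URL = "https://api.binance.com"  # Binance public REST API
symbols = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "DOGEUSDT"]
DB_URI = "postgresql+psycopg2://user:password@localhost:5432/crypto_db"

get_latest_prices(BASE_URL, symbols, DB_URI)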

After transformation, the script loads the data into the PostgreSQL database's binance schema. The PostgreSQL tables created are:

  • binance.recent_trades

  • binance.order_book

  • binance.klines

  • binance.daily_ticker

  • binance.latest_prices
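
To quickly verify the load, you can read a few rows back from the binance schema with pandas. A small check, assuming a placeholder connection string:

import pandas as pd
from sqlalchemy import create_engine

# Same connection string used by the loading script (placeholder values)
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/crypto_db")
sample = pd.read_sql("SELECT * FROM binance.latest_prices ORDER BY time DESC LIMIT 5;", engine)
print(sample)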

Phase 2.

Head over to Confluent Cloud; if you do not have an account, create a free one to claim the $400 in free credits, then create an environment and a cluster. You may be prompted to add a payment method during sign-up.

Cluster Setup

After creating a cluster, select Connectors on the left-hand side, search for the Postgres CDC Source V2 (Debezium) connector, and start the setup.

CDC Setup

CDC Setup

Here's the connector receiving data from the Postgres database and storing it into Kafka topics.

Connecting to Kafka
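
To confirm that the change events are landing in the topic, you can read a few messages with the confluent-kafka client from the requirements list. This is a minimal sketch: the bootstrap servers and API credentials are placeholders, and the topic name is an assumption based on Debezium's default <topic.prefix>.<schema>.<table> naming:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<confluent-bootstrap-servers>",  # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<api-key>",      # placeholder Confluent Cloud API key
    "sasl.password": "<api-secret>",   # placeholder Confluent Cloud API secret
    "group.id": "cdc-inspector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["binance.binance.latest_prices"])  # assumed topic name

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Each message value is a Debezium change event in JSON
        print(msg.value().decode("utf-8"))
finally:
    consumer.close()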

Phase 3.

This phase uses a script containing a Spark Streaming job that acts as the data sink into Apache Cassandra.

It consumes the change logs from the various Kafka topics as JSON, flattens the after, source, op, ts_ms, and transaction fields, transforms them, and loads them into Cassandra hosted on the Azure Ubuntu VM.
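
A minimal sketch of such a streaming job for the latest_prices topic is shown below. The Kafka bootstrap servers, topic name, and Cassandra host are placeholders, it assumes the Spark Kafka integration and spark-cassandra-connector packages are on the classpath, and the Confluent Cloud SASL options are omitted for brevity:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

spark = (
    SparkSession.builder
    .appName("binance-cdc-cassandra-sink")
    .config("spark.cassandra.connection.host", "<cassandra-vm-ip>")  # placeholder
    .getOrCreate()
)

# Simplified Debezium envelope for the latest_prices change events
after_schema = StructType([
    StructField("id", IntegerType()),
    StructField("symbol", StringType()),
    StructField("price", StringType()),
    StructField("time", LongType()),
])
envelope = StructType([
    StructField("after", after_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
])

# Read the raw change events from the Kafka topic
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<confluent-bootstrap-servers>")  # placeholder
    .option("subscribe", "<latest-prices-topic>")                        # placeholder
    .option("startingOffsets", "earliest")
    .load()
)

# Parse the JSON value and flatten the "after" payload
flattened = (
    raw.selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), envelope).alias("e"))
    .select("e.after.*", "e.op", "e.ts_ms")
)

def write_to_cassandra(batch_df, batch_id):
    # Append each micro-batch to the binance_test keyspace
    (
        batch_df.write
        .format("org.apache.spark.sql.cassandra")
        .options(keyspace="binance_test", table="latest_prices")
        .mode("append")
        .save()
    )

query = (
    flattened.writeStream
    .foreachBatch(write_to_cassandra)
    .option("checkpointLocation", "/tmp/checkpoints/latest_prices")
    .start()
)
query.awaitTermination()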

The data received in Cassandra is as follows for the klines table:

Klines example

Phase 4.

After streaming data into Cassandra, we need to visualize it on a Grafana dashboard to derive insights.

For this project, I used Grafana Cloud. There is a Cassandra plugin that can be used as a data source, but it only supports Cassandra 3.0.0 and older, and I am running 4.1.9, so I ran into issues connecting Grafana directly to Cassandra.

Instead, I built a simple Postgres-Cassandra bridge that copies the data into a PostgreSQL database, which Grafana supports natively.

NOTE: By default, Cassandra ships with the authenticator set to AllowAllAuthenticator and the authorizer set to AllowAllAuthorizer, which disables authentication and allows any user to access Cassandra's shell, cqlsh. Replace the defaults with PasswordAuthenticator and CassandraAuthorizer in the config file, then restart the Cassandra service:

sudo nano /etc/cassandra/cassandra.yaml # Cassandra's config file
authenticator: PasswordAuthenticator

authorizer: CassandraAuthorizer

Then exit the editor and run the following command to restart the Cassandra service:

sudo systemctl restart cassandra
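
With authentication enabled, the bridge itself can be a small Python script that reads rows from the binance_test keyspace with cassandra-driver and appends them to PostgreSQL using pandas and SQLAlchemy. The sketch below is illustrative only: the Cassandra host, the credentials, the bridge database URI, and the assumption of a latest_prices table with symbol, price, and time columns are all placeholders:

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import create_engine

# Connect to Cassandra with the credentials created after enabling PasswordAuthenticator
auth = PlainTextAuthProvider(username="<cassandra-user>", password="<cassandra-password>")
cluster = Cluster(["<cassandra-vm-ip>"], auth_provider=auth)
session = cluster.connect("binance_test")

# Pull the replicated rows (assumed table and columns)
rows = session.execute("SELECT symbol, price, time FROM latest_prices")
df = pd.DataFrame(list(rows))

# Append them to a PostgreSQL database that Grafana can query natively
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/grafana_bridge")  # placeholder URI
df.to_sql("latest_prices", con=engine, index=False, if_exists="append")

cluster.shutdown()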

Dashboard image 1

Dashboard image 2

Insights gained from the data

The dashboards reveal several valuable insights into crypto trading patterns. The first chart shows that XRPUSDT, AVAXUSDT, BTCUSDT, and DOGEUSDT are among the most frequently traded symbols in the trading day, indicating high liquidity or market interest.

The ETHUSDT volume trend indicates significant fluctuations, with notable spikes between 13:00 and 14:00, suggesting peak trading hours possibly influenced by market news or institutional actions.

The candlestick chart for BTCUSDT displays hourly open, high, low, and close prices, offering a useful tool for technical analysis and identifying market trends or reversal patterns.

Additionally, the quantity of coins ordered throughout the trading day highlights DOGEUSDT and ADAUSDT as dominant in trade volume, with DOGE exceeding 270,000 units, likely due to its lower price and retail trader appeal. Overall, the visualizations suggest strong market activity around a few key coins, with volume and price data providing critical insights for real-time monitoring, trading strategy, or machine learning applications.

Conclusion

This project was a rewarding journey through the world of real-time data engineering. It demonstrates how Change Data Capture can save time and resources while keeping our databases up to date for streaming applications and data pipelines. This stack can easily be reused for downstream use cases such as data analytics or even real-time ML predictions for crypto coins.

Do you have any questions? Don't forget to like, comment and follow for more data engineering tutorials and content!
