Building a Financial Data Pipeline: NYSE to Power BI

October 2025

Built an ETL pipeline to pull NYSE data, transform it, load it into PostgreSQL, and visualize it in Power BI. Standard data engineering work with proper security controls. Here's what actually matters.

Data Extraction

Used Alpha Vantage API for stock data. Free tier gives 25 requests per day, which is enough for testing but useless for production. Alternative sources: Yahoo Finance, Quandl, or direct FTP from NYSE archives.

API key goes in environment variables. Never hardcode credentials. Basic security that somehow still needs to be stated.

import os
API_KEY = os.environ.get('ALPHA_VANTAGE_KEY')

def extract_stock_data(symbol):
    params = {
        'function': 'TIME_SERIES_DAILY',
        'symbol': symbol,
        'outputsize': 'full',
        'apikey': API_KEY
    }
    response = requests.get(BASE_URL, params=params)
    return response.content

Added checksum logging for data integrity. SHA-256 hash of each file gets written to an audit log. If data gets corrupted or tampered with, you'll know.

Transformation

Raw financial data is filthy. Missing values, inconsistent formats, encoding issues. Don't trust the source.

Load data with explicit dtypes for performance. Pandas will guess types otherwise and get it wrong:

df = pd.read_csv(raw_file,
    parse_dates=['timestamp'],
    dtype={
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'int64'
    })

Handle nulls strategically. Forward fill for prices (last known value makes sense), drop rows with missing volume (no volume means no trade, unusable data).

Feature engineering is where the value is. Raw close prices tell you nothing. Calculate returns, volatility, moving averages:

df['daily_return'] = df['close'].pct_change()
df['volatility'] = df['daily_return'].rolling(window=30).std()
df['moving_avg_50'] = df['close'].rolling(window=50).mean()
df['moving_avg_200'] = df['close'].rolling(window=200).mean()

Outlier detection catches trading halts and data errors. Anything beyond 3 standard deviations gets flagged:

df['is_outlier'] = np.abs(
    (df['close'] - df['close'].mean()) / df['close'].std()
) > 3

Database Setup

PostgreSQL 15. Superior performance for time-series data compared to MySQL. Better indexing, better window functions.

Schema design matters. Created separate schema for market data, fact table for daily prices, dimension table for stock metadata, audit table for data lineage.

CREATE SCHEMA market_data;

CREATE TABLE market_data.stock_prices (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(10) NOT NULL,
    trade_date DATE NOT NULL,
    close_price NUMERIC(12,4),
    volume BIGINT,
    daily_return NUMERIC(8,6),
    volatility NUMERIC(8,6),
    moving_avg_50 NUMERIC(12,4),
    moving_avg_200 NUMERIC(12,4),
    is_outlier BOOLEAN DEFAULT FALSE,
    checksum VARCHAR(64),
    UNIQUE(symbol, trade_date)
);

Indexes on symbol and date for query performance. Without these, queries take minutes instead of milliseconds:

CREATE INDEX idx_symbol_date 
ON market_data.stock_prices(symbol, trade_date DESC);

Loading Data

DBeaver works for manual imports. Right-click table, import CSV, map columns. Fine for one-off analysis.

For production, write scripts. Manual processes don't scale and create security gaps. Programmatic loading with upsert logic:

insert_query = """
INSERT INTO market_data.stock_prices 
(symbol, trade_date, close_price, volume, daily_return, 
 volatility, moving_avg_50, moving_avg_200, is_outlier, checksum)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (symbol, trade_date) DO UPDATE SET
    close_price = EXCLUDED.close_price,
    volume = EXCLUDED.volume
"""

execute_batch(cursor, insert_query, records, page_size=1000)

Batch inserts are significantly faster than individual inserts. 1000 records per batch is the sweet spot.

Data Validation

Run validation queries before visualization. Check for gaps, anomalies, quality issues:

-- Data completeness check
SELECT symbol, COUNT(*) as record_count,
    MIN(trade_date) as earliest,
    MAX(trade_date) as latest
FROM market_data.stock_prices
GROUP BY symbol;

-- Anomaly detection
SELECT symbol, trade_date, close_price
FROM market_data.stock_prices
WHERE is_outlier = TRUE;

Sudden spikes in outliers or null values could indicate data tampering. These queries double as intrusion detection.

Power BI Integration

Direct Query mode for sensitive financial data. Import mode is faster but less secure. Direct Query enables row-level security and real-time data.

Connection: Get Data → PostgreSQL, enter server details, use DirectQuery mode. Encrypt the connection. Don't embed credentials in .pbix files.

Create a date table for time intelligence functions:

DateTable = 
ADDCOLUMNS(
    CALENDAR(DATE(2020,1,1), DATE(2025,12,31)),
    "Year", YEAR([Date]),
    "Month", FORMAT([Date], "MMM"),
    "Quarter", "Q" & FORMAT([Date], "Q")
)

Key measures for financial analysis:

YoY_Return = 
DIVIDE(
    [Avg_Close_Price] - 
    CALCULATE([Avg_Close_Price], SAMEPERIODLASTYEAR(DateTable[Date])),
    CALCULATE([Avg_Close_Price], SAMEPERIODLASTYEAR(DateTable[Date]))
)

Sharpe_Ratio = 
VAR AvgReturn = AVERAGE(stock_prices[daily_return])
VAR StdDev = STDEV.P(stock_prices[daily_return])
VAR RiskFreeRate = 0.045 / 252
RETURN DIVIDE(AvgReturn - RiskFreeRate, StdDev)

Performance Optimization

Direct Query is slow. Accept it or build aggregation layers. Created materialized view for summary data:

CREATE MATERIALIZED VIEW market_data.monthly_summary AS
SELECT symbol, DATE_TRUNC('month', trade_date) as month,
    AVG(close_price) as avg_price,
    SUM(volume) as total_volume,
    STDDEV(daily_return) as volatility
FROM market_data.stock_prices
GROUP BY symbol, DATE_TRUNC('month', trade_date);

Refresh nightly via cron. Power BI queries this view instead of raw table. Queries drop from 20 seconds to under 2.

What Actually Matters

Data is never clean on first pass. Budget 60% of your time for transformation, not extraction.

Security isn't optional. Encrypted connections, credential management, audit trails. All non-negotiable.

DBeaver is a tool for analysis, not production ETL. Good for exploration, terrible for automation.

Direct Query in Power BI is inherently slow. Build aggregations or accept the performance hit.

Documentation saves time. When this breaks at 2 AM, detailed logs make the difference between 10 minutes and 2 hours of troubleshooting.

Pipeline is production-ready with monitoring. Query response under 5 seconds for 5-year dataset. Maintenance overhead approximately 2 hours per week.