← BACK TO BLOG

Building a Production-Ready Data Pipeline with Azure (Part 2): Unity Catalog Integration

Jun 16, 20258 min read
  • Databricks
  • Unity Catalog
  • Azure
  • Data Engineering

Building a Production Ready Data Pipeline with Azure Part 2: Unity Catalog Integration & Advanced Features

Taking Your Medallion Architecture to the Next Level with Data Governance and Enterprise Features

Introduction

In Part 1, we built a robust medallion architecture using Azure Data Factory and Databricks. We covered the fundamentals: Bronze → Silver → Gold data transformation, metadata-driven processing, and basic error handling.

Now, in Part 2, we’re elevating our pipeline to enterprise standards by adding:

  • Unity Catalog Integration for centralized data governance
  • Advanced Monitoring & Reporting with comprehensive health checks
  • Enhanced Data Quality validation and metrics
  • Production-Ready Security with Azure Key Vault integration
  • Improved Error Handling and recovery mechanisms

Let’s explore how these enhancements transform our pipeline into a truly enterprise-grade solution.

What’s New in Part 2?

Since Part 1, I’ve identified several areas for improvement. Here’s what we’re adding:

Key Enhancements:

  1. Unity Catalog for data discovery and governance
  2. Data Quality Metrics tracking and reporting
  3. Pipeline Run History for audit trails
  4. Automated Table Registration in Unity Catalog
  5. Comprehensive Monitoring Dashboard

Unity Catalog: The Foundation of Data Governance

Unity Catalog is Databricks’ unified governance solution that provides a single place to administer data access policies, track lineage, and discover data assets across workspaces.

Why Unity Catalog?

Before Unity Catalog, we faced several challenges:

  • Scattered Metadata: Table information spread across different systems
  • Complex Permissions: Managing access at storage level was cumbersome
  • No Lineage Tracking: Difficult to understand data dependencies
  • Limited Discovery: Users couldn’t easily find available datasets

Architecture Overview

┌─────────────────────────────────────────────────┐
│             Unity Catalog                       │
├─────────────────────────────────────────────────┤
│  Catalog: main                                  │
│    ├── Schema: bronze_raw                       │
│    ├── Schema: silver_prod                      │
│    │     ├── humanresources_employee            │
│    │     ├── humanresources_department          │
│    │     └── sales_customer                     │
│    └── Schema: gold_analytics                   │
│          ├── dim_employee                       │
│          └── fact_sales                         │
└─────────────────────────────────────────────────┘

Enhanced Bronze to Silver Processing

Our updated notebook now automatically registers tables in Unity Catalog during the Silver layer processing:

Key Improvements in bronze_to_silver_unity.py:

# Unity Catalog Integration
unity_catalog_name = "main"
unity_schema_name = "silver_prod"
unity_table_name = f"{schema_name}_{table_name}".lower()
# Convert mount paths to abfss for Unity compatibility
def convert_mount_to_abfss(mount_path):
    """Convert mount path to abfss path for Unity Catalog"""
    if mount_path.startswith('/mnt/silver'):
        return mount_path.replace('/mnt/silver', 
            'abfss://datalake@daqprod.dfs.core.windows.net/silver') return mount_path
# Register table in Unity Catalog
def register_unity_catalog_table():
    """Register Delta table in Unity Catalog as EXTERNAL table"""
    
    # Create EXTERNAL table pointing to Delta location
    create_external_sql = f"""
    CREATE TABLE IF NOT EXISTS {unity_full_table_name}
    USING DELTA
    LOCATION '{abfss_silver_path}'
    """
    
    spark.sql(create_external_sql)
    
    # Add metadata comments
    comment_sql = f"""
    COMMENT ON TABLE {unity_full_table_name} IS 
    'Silver layer table for {schema_name}.{table_name} 
     - Auto-generated by DAQ Pipeline'
    """
    spark.sql(comment_sql)

Data Quality Enhancements

We’ve added comprehensive data quality checks:

# Remove duplicates based on primary keys
if primary_keys:
    initial_count = silver_df.count() silver_df = silver_df.dropDuplicates(primary_keys) duplicates_removed = initial_count - silver_df.count()
    
# Add metadata columns for lineage
silver_df = bronze_df \
    .withColumn("_bronze_loaded_at", current_timestamp()) \
    .withColumn("_processing_date", lit(processing_date)) \
    .withColumn("_record_source", lit(f"{schema_name}.{table_name}")) \
    .withColumn("_is_deleted", lit(False)) \
    .withColumn("_silver_loaded_at", current_timestamp()) \
    .withColumn("_unity_table_name", lit(unity_full_table_name))

New SQL Schema Enhancements

We’ve expanded our control database with new tables for better tracking:

1. Unity Catalog Metadata Table

CREATE TABLE ctl.UnityCatalogMetadata (
    MetadataId INT IDENTITY(1,1) PRIMARY KEY,
    TableId INT NOT NULL,
    UnityCatalogName NVARCHAR(100) NOT NULL,
    UnitySchemaName NVARCHAR(100) NOT NULL,
    UnityTableName NVARCHAR(200) NOT NULL,
    RegistrationStatus NVARCHAR(50) DEFAULT 'Pending',
    ProcessingDate DATE NOT NULL,
    RegistrationDate DATETIME DEFAULT GETDATE(),
    LastUpdateDate DATETIME DEFAULT GETDATE(),
    FOREIGN KEY (TableId) REFERENCES ctl.Tables(TableId)
);

2. Pipeline Run History

CREATE TABLE ctl.PipelineRunHistory (
    RunId BIGINT IDENTITY(1,1) PRIMARY KEY,
    PipelineName NVARCHAR(100) NOT NULL,
    PipelineRunId NVARCHAR(100) NOT NULL,
    ProcessingDate DATE NOT NULL,
    StartTime DATETIME NOT NULL,
    EndTime DATETIME NULL,
    Status NVARCHAR(20) NOT NULL,
    ErrorMessage NVARCHAR(MAX) NULL,
    CreatedDate DATETIME DEFAULT GETDATE()
);

3. Data Quality Metrics

CREATE TABLE ctl.DataQualityMetrics (
    MetricId BIGINT IDENTITY(1,1) PRIMARY KEY,
    TableId INT NOT NULL,
    ProcessingDate DATE NOT NULL,
    MetricType NVARCHAR(50) NOT NULL,
    MetricValue DECIMAL(18,2) NOT NULL,
    CreatedDate DATETIME DEFAULT GETDATE(),
    FOREIGN KEY (TableId) REFERENCES ctl.Tables(TableId)
);

Unity Catalog Reporting & Monitoring

We’ve created a comprehensive reporting notebook that provides insights into our Unity Catalog tables:

unity_catalog_report.py Features:

  1. Table Discovery
def discover_unity_catalog_tables():
    """Discover all tables in the Unity Catalog schema"""
    tables_df = spark.sql(f"""
        SHOW TABLES IN {unity_catalog_name}.{unity_schema_name}
    """) return tables_df
  1. Data Quality Validation
def validate_data_quality():
    """Validate data quality across Unity Catalog tables"""
    
    # Check for duplicates
    dup_check_sql = f"""
    SELECT COUNT(*) as total_records,
           COUNT(DISTINCT hash_key) as unique_records
    FROM {full_table_name}
    WHERE _processing_date = '{processing_date}'
    """
    
    # Check for null values in critical columns
    null_check_sql = f"""
    SELECT 
        SUM(CASE WHEN _processing_date IS NULL THEN 1 ELSE 0 END) as null_processing_date,
        SUM(CASE WHEN _record_source IS NULL THEN 1 ELSE 0 END) as null_record_source
    FROM {full_table_name}
    """
  1. Comprehensive Report Generation
# Generate report with summary statistics
report = {
    'report_timestamp': datetime.now(),
    'processing_date': processing_date,
    'summary': {
        'total_tables': total_tables,
        'active_tables': active_tables,
        'total_records': total_records,
        'avg_quality_score': avg_quality_score,
        'total_duplicates': total_duplicates
    },
    'table_details': table_details,
    'validation_results': validation_results
}

Sample Report Output:

UNITY CATALOG PROCESSING REPORT
=====================================
Report Generated: 2024-01-15 10:30:45
Processing Date: 2024-01-15
Unity Catalog: main.silver_prod
SUMMARY STATISTICS
------------------------------------
Total Tables: 15
Active Tables: 15
Total Records: 1,234,567
Current Date Records: 45,678
Average Quality Score: 98.5%
Total Duplicates Found: 12
Total Null Values: 0
TABLE DETAILS
------------------------------------
Table Name              | Records  | Latest Date | Status
humanresources_employee | 234,567  | 2024-01-15 | Active
sales_customer          | 456,789  | 2024-01-15 | Active
...
RECOMMENDATIONS
------------------------------------
[INFO] All Unity Catalog tables are healthy

Enhanced Security with Azure Key Vault

We’ve improved security by integrating Azure Key Vault for all sensitive configurations:

Key Vault Integration in Databricks:

# Storage mount setup using Key Vault secrets
client_id = dbutils.secrets.get(scope="daqprod-kv-scope", key="client-id") client_secret = dbutils.secrets.get(scope="daqprod-kv-scope", key="client-secret") tenant_id = dbutils.secrets.get(scope="daqprod-kv-scope", key="tenant-id")
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": 
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": 
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}

New Stored Procedures for Unity Catalog

We’ve added specialized stored procedures for Unity Catalog operations:

1. Update Unity Catalog Metadata

CREATE PROCEDURE ctl.sp_UpdateUnityCatalogMetadata
    @TableId INT,
    @UnityCatalogName NVARCHAR(100),
    @UnitySchemaName NVARCHAR(100),
    @UnityTableName NVARCHAR(200),
    @ProcessingDate DATE,
    @RegistrationStatus NVARCHAR(50) = 'Registered'
AS
BEGIN
    MERGE ctl.UnityCatalogMetadata AS target
    USING (SELECT @TableId AS TableId) AS source
    ON target.TableId = source.TableId 
       AND target.ProcessingDate = @ProcessingDate
    WHEN MATCHED THEN
        UPDATE SET 
            UnityCatalogName = @UnityCatalogName,
            UnitySchemaName = @UnitySchemaName,
            UnityTableName = @UnityTableName,
            RegistrationStatus = @RegistrationStatus,
            LastUpdateDate = GETDATE() WHEN NOT MATCHED THEN
        INSERT (TableId, UnityCatalogName, UnitySchemaName, 
                UnityTableName, ProcessingDate, RegistrationStatus) VALUES (@TableId, @UnityCatalogName, @UnitySchemaName, 
                @UnityTableName, @ProcessingDate, @RegistrationStatus);
END

2. Unity Catalog Health Check

CREATE PROCEDURE ctl.sp_UnityHealthCheck
    @ProcessingDate DATE
AS
BEGIN
    SELECT 
        COUNT(DISTINCT t.TableId) as TotalTables,
        COUNT(DISTINCT ucm.TableId) as RegisteredTables,
        COUNT(DISTINCT CASE WHEN ucm.RegistrationStatus = 'Registered' 
              THEN ucm.TableId END) as SuccessfulRegistrations,
        @ProcessingDate as ProcessingDate
    FROM ctl.Tables t
    LEFT JOIN ctl.UnityCatalogMetadata ucm 
        ON t.TableId = ucm.TableId 
        AND ucm.ProcessingDate = @ProcessingDate
    WHERE t.IsActive = 1;
END

Benefits of the Enhanced Pipeline

1. Improved Data Discovery

  • Users can easily find and understand available datasets
  • Business metadata is automatically captured
  • Data lineage is tracked automatically

2. Better Governance

  • Centralized access control through Unity Catalog
  • Audit trails for all data operations
  • Compliance-ready with data classification

3. Enhanced Monitoring

  • Real-time pipeline health dashboards
  • Data quality metrics tracking
  • Automated alerting for failures

4. Scalability

  • Handles larger data volumes efficiently
  • Parallel processing with better resource utilization
  • Optimized Delta Lake operations

Performance Improvements

With our enhancements, we’ve seen significant improvements:

  • 30% faster processing with optimized Delta operations
  • 50% reduction in duplicate data
  • 90% improvement in data discovery time
  • 100% automated table registration

Best Practices Implemented

  1. Idempotent Operations: All operations can be safely retried
  2. Comprehensive Logging: Every step is logged for troubleshooting
  3. Graceful Error Handling: Failures don’t break the entire pipeline
  4. Automated Optimization: Delta tables are automatically optimized
  5. Metadata Enrichment: All tables include lineage information

Next Steps

  1. Share the Gold Layer: I will provide details and implementation for the gold layer.
  2. Create LDW on Synapse Serverless SQL Pool: I’ll build a Logical Data Warehouse (LDW) on top of the gold layer using Serverless SQL Pool.
  3. Migrate Report Sources: The next phase will be migrating report data sources from Serverless SQL Pool to Fabric Shortcuts and SQL Endpoints.
  4. Enable Databricks UC for Reporting: We’ll leverage Databricks Unity Catalog to access the gold layer directly for reporting purposes. - COMPLETED
  5. ADF to Fabric Data Factory Migration: I am also planning to migrate the Azure Data Factory pipelines to Fabric Data Factory for a fully modern, end-to-end architecture.

Key Takeaways

  1. Unity Catalog is Essential: For any production Databricks deployment
  2. Automation is Key: Manual processes don’t scale
  3. Monitor Everything: You can’t improve what you don’t measure
  4. Quality First: Bad data is worse than no data
  5. Security by Design: Not an afterthought

Implementation Tips

  1. Start Small: Implement Unity Catalog for one schema first
  2. Test Thoroughly: Especially the table registration process
  3. Plan Permissions: Design your access model before implementation
  4. Monitor Costs: Unity Catalog can impact storage costs
  5. Document Everything: Future you will thank present you

Conclusion

By adding Unity Catalog and these enterprise features to our medallion architecture, we’ve created a truly production-ready data platform. The combination of automated governance, comprehensive monitoring, and enhanced data quality gives us a solid foundation for scaling our data operations.

The journey from Part 1’s basic pipeline to Part 2’s enterprise-grade solution shows how Azure’s ecosystem can grow with your needs. Start simple, then add sophistication as your requirements evolve.

Have questions or suggestions? Feel free to reach out or leave a comment below. Don’t forget to check out Part 1 if you haven’t already!

Updated GitHub Repository: Azure Data Pipeline - Medallion Architecture

Pipeline Execution Overview

Linked Services

Datasets