# Week 3C: Building Data Preparation Pipelines

## ISM6251: Machine Learning for Business Applications

### Learning Objectives
By the end of this notebook, you will be able to:
- Design and implement complete data preparation pipelines
- Apply data cleaning, transformation, and validation in sequence
- Create reusable data processing functions
- Handle real-world data challenges systematically
- Build pipelines that scale to production environments

---

## Introduction: Why Data Pipelines?

Data pipelines are essential for:
- **Reproducibility**: The same raw input always produces the same prepared output
- **Scalability**: The same code handles growing data volumes
- **Maintainability**: Each step is isolated, so it is easy to update and debug
- **Automation**: Processing runs end to end without manual intervention
- **Quality Control**: Data is validated systematically at each step

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
import json
from typing import Dict, List, Tuple, Optional, Any
import logging

# Configure display and warnings
pd.set_option('display.max_columns', None)
pd.set_option('display.precision', 2)
warnings.filterwarnings('ignore')

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set visualization style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')

print("Libraries loaded successfully!")

## Part 1: Creating a Realistic Dataset

Let's create a comprehensive e-commerce dataset with intentional data quality issues:

In [None]:
def create_ecommerce_dataset(n_records: int = 2000, seed: int = 42) -> pd.DataFrame:
    """
    Create a realistic e-commerce dataset with various data quality issues.
    """
    np.random.seed(seed)
    
    # Generate base data
    data = {
        'order_id': [f'ORD{str(i).zfill(6)}' for i in range(1, n_records + 1)],
        'customer_id': np.random.choice(range(1000, 2000), n_records),
        'order_date': [datetime(2023, 1, 1) + timedelta(days=np.random.randint(0, 365)) 
                      for _ in range(n_records)],
        'product_name': np.random.choice(
            ['iPhone 14', 'Samsung Galaxy S23', 'iPad Pro', 'MacBook Pro', 'AirPods Pro',
             'Dell XPS 13', 'Sony WH-1000XM5', 'Nintendo Switch', 'PS5', 'Xbox Series X'],
            n_records
        ),
        'category': np.random.choice(
            ['Electronics', 'electronics', 'ELECTRONICS', 'Electrnics', 'Electronic'],  # Intentional inconsistency
            n_records
        ),
        'quantity': np.random.randint(1, 10, n_records),
        'unit_price': np.random.uniform(50, 2500, n_records),
        'discount_percent': np.random.choice([0, 5, 10, 15, 20, 25, 100, -10], n_records, p=[0.3, 0.2, 0.2, 0.15, 0.1, 0.03, 0.01, 0.01]),  # Invalid values
        'shipping_cost': np.random.uniform(0, 50, n_records),
        'payment_method': np.random.choice(
            ['Credit Card', 'credit card', 'PayPal', 'paypal', 'Debit Card', 'Cash', None],
            n_records, p=[0.3, 0.1, 0.2, 0.05, 0.2, 0.1, 0.05]
        ),
        'shipping_address': np.random.choice(
            ['123 Main St, New York, NY 10001', '456 Oak Ave, Los Angeles, CA 90001',
             '789 Pine Rd, Chicago, IL 60601', '', None, 'Invalid Address'],
            n_records, p=[0.4, 0.3, 0.2, 0.05, 0.03, 0.02]
        ),
        'customer_email': [f'customer{i}@example.com' if np.random.random() > 0.05 
                          else np.random.choice(['invalid-email', '', None])
                          for i in np.random.choice(range(100, 500), n_records)],
        'order_status': np.random.choice(
            ['Completed', 'completed', 'Pending', 'Cancelled', 'Shipped', None],
            n_records, p=[0.4, 0.1, 0.2, 0.1, 0.15, 0.05]
        ),
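        'customer_age': np.random.choice(
            list(range(18, 80)) + [-1, 150, 999],  # Invalid ages
            # p needs one probability per value: the 62 valid ages share 97%, each invalid age gets 1%
            n_records, p=[0.97 / 62] * 62 + [0.01, 0.01, 0.01]
        ),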
        'review_score': np.random.choice(
            [1, 2, 3, 4, 5, -1, 10, None],  # Invalid scores
            n_records, p=[0.05, 0.1, 0.15, 0.3, 0.35, 0.01, 0.01, 0.03]
        )
    }
    
    df = pd.DataFrame(data)
    
    # Add calculated fields with potential errors
    df['total_before_discount'] = df['quantity'] * df['unit_price']
    df['discount_amount'] = df['total_before_discount'] * df['discount_percent'] / 100
    df['total_amount'] = df['total_before_discount'] - df['discount_amount'] + df['shipping_cost']
    
    # Introduce some duplicates
    duplicate_indices = np.random.choice(df.index, size=50, replace=False)
    duplicates = df.loc[duplicate_indices].copy()
    df = pd.concat([df, duplicates], ignore_index=True)
    
    # Introduce some missing values
    for col in ['quantity', 'unit_price', 'shipping_cost']:
        missing_indices = np.random.choice(df.index, size=30, replace=False)
        df.loc[missing_indices, col] = np.nan
    
    return df.sample(frac=1).reset_index(drop=True)  # Shuffle the data

# Create the dataset
raw_data = create_ecommerce_dataset(2000)
print(f"Dataset created with {len(raw_data)} records and {len(raw_data.columns)} columns")
print(f"\nData types:")
print(raw_data.dtypes)
print(f"\nFirst 5 records:")
raw_data.head()
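Several of the injected problems are spelling variants of the same label (`'Credit Card'` vs `'credit card'`, `'PayPal'` vs `'paypal'`). A minimal sketch of how to surface them is to group raw values by a normalized key (trimmed and lower-cased). The helper `group_variants` is hypothetical and is shown on a small hand-made Series rather than `raw_data`; note that it only catches case and whitespace variants, so a typo such as `'Electrnics'` still needs an explicit mapping.

```python
import pandas as pd

def group_variants(series: pd.Series) -> dict:
    """Map each normalized key to the raw spellings that collapse onto it."""
    non_null = series.dropna().astype(str)
    keys = non_null.str.strip().str.lower()
    groups = {}
    for key, raw in zip(keys, non_null):
        groups.setdefault(key, set()).add(raw)
    # Keep only keys that appear under more than one spelling
    return {key: sorted(spellings) for key, spellings in groups.items() if len(spellings) > 1}

payment = pd.Series(['Credit Card', 'credit card', 'PayPal', 'paypal', 'Cash', None])
print(group_variants(payment))
# {'credit card': ['Credit Card', 'credit card'], 'paypal': ['PayPal', 'paypal']}
```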

## Part 2: Data Quality Assessment Functions

In [None]:
class DataQualityAssessor:
    """
    A comprehensive data quality assessment tool.
    """
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.report = {}
    
    def check_missing_values(self) -> pd.DataFrame:
        """Check for missing values in the dataset."""
        missing_count = self.df.isnull().sum()
        missing_percent = (missing_count / len(self.df)) * 100
        
        missing_report = pd.DataFrame({
            'Missing_Count': missing_count,
            'Missing_Percentage': missing_percent
        })
        
        self.report['missing_values'] = missing_report[missing_report['Missing_Count'] > 0]
        return self.report['missing_values']
    
    def check_duplicates(self) -> Dict:
        """Check for duplicate records."""
        duplicates = self.df.duplicated()
        duplicate_count = duplicates.sum()
        
        self.report['duplicates'] = {
            'total_duplicates': duplicate_count,
            'duplicate_percentage': (duplicate_count / len(self.df)) * 100,
            'duplicate_indices': self.df[duplicates].index.tolist()[:10]  # First 10
        }
        return self.report['duplicates']
    
    def check_data_types(self) -> List[Dict]:
        """Check for potential data type issues."""
        type_issues = []
        
        for col in self.df.columns:
            if self.df[col].dtype == 'object':
                # Check if column should be numeric
                try:
                    pd.to_numeric(self.df[col].dropna(), errors='raise')
                    type_issues.append({
                        'column': col,
                        'current_type': str(self.df[col].dtype),
                        'suggested_type': 'numeric'
                    })
                except (ValueError, TypeError):  # not convertible, so leave it as text
                    pass
                
                # Check if column should be datetime
                if 'date' in col.lower() or 'time' in col.lower():
                    type_issues.append({
                        'column': col,
                        'current_type': str(self.df[col].dtype),
                        'suggested_type': 'datetime'
                    })
        
        self.report['type_issues'] = type_issues
        return type_issues
    
    def check_outliers(self, columns: Optional[List[str]] = None, method: str = 'iqr') -> Dict:
        """Check for outliers in numeric columns."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        outlier_report = {}
        
        for col in columns:
            if col in self.df.columns and pd.api.types.is_numeric_dtype(self.df[col]):
                data = self.df[col].dropna()
                
                if method == 'iqr':
                    Q1 = data.quantile(0.25)
                    Q3 = data.quantile(0.75)
                    IQR = Q3 - Q1
                    lower_bound = Q1 - 1.5 * IQR
                    upper_bound = Q3 + 1.5 * IQR
                    outliers = data[(data < lower_bound) | (data > upper_bound)]
                
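                elif method == 'zscore':
                    z_scores = np.abs((data - data.mean()) / data.std())
                    outliers = data[z_scores > 3]
                
                else:
                    # Without this, an unknown method leaves `outliers` undefined (NameError)
                    raise ValueError(f"Unknown outlier method: {method!r} (use 'iqr' or 'zscore')")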
                
                if len(outliers) > 0:
                    outlier_report[col] = {
                        'count': len(outliers),
                        'percentage': (len(outliers) / len(data)) * 100,
                        'min_outlier': outliers.min(),
                        'max_outlier': outliers.max()
                    }
        
        self.report['outliers'] = outlier_report
        return outlier_report
    
    def check_data_consistency(self) -> Dict:
        """Check for data consistency issues."""
        consistency_issues = []
        
        # Check for inconsistent categorical values
        for col in self.df.select_dtypes(include=['object']).columns:
            unique_values = self.df[col].dropna().unique()
            
            # Check for case inconsistencies
            lower_values = [str(v).lower() for v in unique_values]
            if len(unique_values) != len(set(lower_values)):
                consistency_issues.append({
                    'column': col,
                    'issue': 'case_inconsistency',
                    'unique_values': unique_values.tolist()[:10]
                })
        
        # Check for invalid values
        if 'discount_percent' in self.df.columns:
            invalid_discounts = self.df[
                (self.df['discount_percent'] < 0) | 
                (self.df['discount_percent'] > 100)
            ]
            if len(invalid_discounts) > 0:
                consistency_issues.append({
                    'column': 'discount_percent',
                    'issue': 'invalid_range',
                    'count': len(invalid_discounts)
                })
        
        self.report['consistency_issues'] = consistency_issues
        return consistency_issues
    
    def generate_full_report(self) -> Dict:
        """Generate a comprehensive data quality report."""
        print("===== DATA QUALITY ASSESSMENT REPORT =====")
        print(f"\nDataset Shape: {self.df.shape}")
        
        print("\n1. MISSING VALUES:")
        missing = self.check_missing_values()
        if not missing.empty:
            print(missing)
        else:
            print("No missing values found.")
        
        print("\n2. DUPLICATES:")
        duplicates = self.check_duplicates()
        print(f"Total duplicates: {duplicates['total_duplicates']} ({duplicates['duplicate_percentage']:.2f}%)")
        
        print("\n3. DATA TYPE ISSUES:")
        type_issues = self.check_data_types()
        if type_issues:
            for issue in type_issues:
                print(f"  - {issue['column']}: {issue['current_type']} → {issue['suggested_type']}")
        else:
            print("No data type issues found.")
        
        print("\n4. OUTLIERS:")
        outliers = self.check_outliers()
        if outliers:
            for col, stats in outliers.items():
                print(f"  - {col}: {stats['count']} outliers ({stats['percentage']:.2f}%)")
        else:
            print("No outliers found.")
        
        print("\n5. CONSISTENCY ISSUES:")
        consistency = self.check_data_consistency()
        if consistency:
            for issue in consistency:
                print(f"  - {issue['column']}: {issue['issue']}")
        else:
            print("No consistency issues found.")
        
        return self.report

# Run assessment
assessor = DataQualityAssessor(raw_data)
quality_report = assessor.generate_full_report()
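To see what the `'iqr'` rule in `check_outliers` computes, here is a small worked example on a hand-made Series (the helper `iqr_bounds` is hypothetical and mirrors that rule). It also shows a limitation of the `'zscore'` option: on very small samples no point can reach |z| > 3, so that rule cannot flag anything.

```python
import pandas as pd

def iqr_bounds(values: pd.Series, k: float = 1.5) -> tuple:
    """Return Tukey's fences (Q1 - k*IQR, Q3 + k*IQR), the same rule check_outliers uses."""
    q1 = values.quantile(0.25)
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

sample = pd.Series([1, 2, 3, 4, 100])
lower, upper = iqr_bounds(sample)  # Q1 = 2, Q3 = 4, IQR = 2
flagged = sample[(sample < lower) | (sample > upper)]
print(lower, upper)      # -1.0 7.0
print(flagged.tolist())  # [100]

# The z-score rule flags nothing here: with n = 5 points, no value can lie more than
# (n - 1) / sqrt(n) ≈ 1.79 sample standard deviations from the mean
z = (sample - sample.mean()) / sample.std()
print((z.abs() > 3).sum())  # 0
```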

## Part 3: Building the Data Pipeline

Now let's build a comprehensive data pipeline to clean and prepare our data:

In [None]:
class DataPipeline:
    """
    A comprehensive data preparation pipeline.
    """
    
    def __init__(self, verbose: bool = True):
        self.verbose = verbose
        self.steps_executed = []
        self.original_shape = None
        self.final_shape = None
    
    def _log(self, message: str):
        """Log message if verbose mode is enabled."""
        if self.verbose:
            logger.info(message)
    
    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Remove duplicate records."""
        self._log("Removing duplicates...")
        initial_count = len(df)
        df_clean = df.drop_duplicates()
        removed_count = initial_count - len(df_clean)
        self._log(f"  Removed {removed_count} duplicate records")
        self.steps_executed.append('remove_duplicates')
        return df_clean
    
    def standardize_text_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize text columns (case, whitespace, etc.)."""
        self._log("Standardizing text columns...")
        df_clean = df.copy()
        
        text_columns = df_clean.select_dtypes(include=['object']).columns
        
        for col in text_columns:
            # Strip whitespace and standardize case for specific columns
            if col in ['category', 'payment_method', 'order_status']:
                df_clean[col] = df_clean[col].str.strip().str.title()
                self._log(f"  Standardized {col}")
        
        # Fix specific inconsistencies
        if 'category' in df_clean.columns:
            df_clean['category'] = df_clean['category'].replace({
                'Electrnics': 'Electronics',
                'Electronic': 'Electronics'
            })
        
        self.steps_executed.append('standardize_text')
        return df_clean
    
    def handle_missing_values(self, df: pd.DataFrame, strategy: Optional[Dict[str, str]] = None) -> pd.DataFrame:
        """Handle missing values based on specified strategies."""
        self._log("Handling missing values...")
        df_clean = df.copy()
        
        if strategy is None:
            strategy = {
                'quantity': 'median',
                'unit_price': 'median',
                'shipping_cost': 'mean',
                'payment_method': 'mode',
                'shipping_address': 'drop',
                'order_status': 'fill_pending',
                'review_score': 'drop'
            }
        
        for col, method in strategy.items():
            if col in df_clean.columns:
                missing_count = df_clean[col].isnull().sum()
                
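                if missing_count > 0:
                    # Assign the result back rather than calling fillna(inplace=True) on a column:
                    # that is chained assignment, which stops updating df_clean under Copy-on-Write
                    if method == 'mean':
                        df_clean[col] = df_clean[col].fillna(df_clean[col].mean())
                    elif method == 'median':
                        df_clean[col] = df_clean[col].fillna(df_clean[col].median())
                    elif method == 'mode':
                        modes = df_clean[col].mode()
                        mode_value = modes.iloc[0] if not modes.empty else 'Unknown'
                        df_clean[col] = df_clean[col].fillna(mode_value)
                    elif method == 'fill_pending':
                        df_clean[col] = df_clean[col].fillna('Pending')
                    elif method == 'drop':
                        df_clean = df_clean.dropna(subset=[col])
                    else:
                        raise ValueError(f"Unknown missing-value method for {col}: {method!r}")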
                    
                    self._log(f"  {col}: {method} (handled {missing_count} missing values)")
        
        self.steps_executed.append('handle_missing')
        return df_clean
    
    def fix_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert columns to appropriate data types."""
        self._log("Fixing data types...")
        df_clean = df.copy()
        
        # Convert date columns
        date_columns = [col for col in df_clean.columns if 'date' in col.lower()]
        for col in date_columns:
            if df_clean[col].dtype == 'object':
                df_clean[col] = pd.to_datetime(df_clean[col], errors='coerce')
                self._log(f"  Converted {col} to datetime")
        
        # Ensure numeric columns are float/int
        numeric_columns = ['quantity', 'unit_price', 'discount_percent', 
                          'shipping_cost', 'customer_age', 'review_score']
        for col in numeric_columns:
            if col in df_clean.columns:
                df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
        
        self.steps_executed.append('fix_data_types')
        return df_clean
    
    def validate_and_clean_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate and clean invalid values."""
        self._log("Validating and cleaning values...")
        df_clean = df.copy()
        
        # Clean discount percentages
        if 'discount_percent' in df_clean.columns:
            invalid_discounts = (df_clean['discount_percent'] < 0) | (df_clean['discount_percent'] > 100)
            df_clean.loc[invalid_discounts, 'discount_percent'] = df_clean.loc[~invalid_discounts, 'discount_percent'].median()
            self._log(f"  Fixed {invalid_discounts.sum()} invalid discount values")
        
        # Clean customer age
        if 'customer_age' in df_clean.columns:
            invalid_ages = (df_clean['customer_age'] < 18) | (df_clean['customer_age'] > 120)
            df_clean.loc[invalid_ages, 'customer_age'] = df_clean.loc[~invalid_ages, 'customer_age'].median()
            self._log(f"  Fixed {invalid_ages.sum()} invalid age values")
        
        # Clean review scores
        if 'review_score' in df_clean.columns:
            invalid_scores = (df_clean['review_score'] < 1) | (df_clean['review_score'] > 5)
            df_clean.loc[invalid_scores, 'review_score'] = np.nan
            self._log(f"  Set {invalid_scores.sum()} invalid review scores to NaN")
        
        # Validate email addresses
        if 'customer_email' in df_clean.columns:
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            invalid_emails = ~df_clean['customer_email'].str.match(email_pattern, na=False)
            df_clean.loc[invalid_emails, 'customer_email'] = 'invalid@example.com'
            self._log(f"  Fixed {invalid_emails.sum()} invalid email addresses")
        
        self.steps_executed.append('validate_values')
        return df_clean
    
    def create_derived_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create new features from existing data."""
        self._log("Creating derived features...")
        df_clean = df.copy()
        
        # Recalculate totals
        if all(col in df_clean.columns for col in ['quantity', 'unit_price', 'discount_percent', 'shipping_cost']):
            df_clean['total_before_discount'] = df_clean['quantity'] * df_clean['unit_price']
            df_clean['discount_amount'] = df_clean['total_before_discount'] * df_clean['discount_percent'] / 100
            df_clean['total_amount'] = df_clean['total_before_discount'] - df_clean['discount_amount'] + df_clean['shipping_cost']
            self._log("  Recalculated order totals")
        
        # Extract date components
        if 'order_date' in df_clean.columns:
            df_clean['order_year'] = df_clean['order_date'].dt.year
            df_clean['order_month'] = df_clean['order_date'].dt.month
            df_clean['order_day'] = df_clean['order_date'].dt.day
            df_clean['order_dayofweek'] = df_clean['order_date'].dt.dayofweek
            df_clean['order_quarter'] = df_clean['order_date'].dt.quarter
            self._log("  Created date-based features")
        
        # Bucket orders by size (include_lowest keeps a total of exactly 0 in 'Small')
        if 'total_amount' in df_clean.columns:
            df_clean['order_size'] = pd.cut(
                df_clean['total_amount'],
                bins=[0, 100, 500, 1000, float('inf')],
                labels=['Small', 'Medium', 'Large', 'Extra Large'],
                include_lowest=True
            )
            self._log("  Created order size categories")
        
        # Flag high-value customers: top 20% of customers by total spend.
        # The threshold is computed over customers, not rows, so frequent buyers are not over-weighted.
        if 'customer_id' in df_clean.columns and 'total_amount' in df_clean.columns:
            spend_per_customer = df_clean.groupby('customer_id')['total_amount'].sum()
            threshold = spend_per_customer.quantile(0.8)
            df_clean['is_high_value_customer'] = df_clean['customer_id'].map(spend_per_customer) > threshold
            self._log("  Flagged high-value customers")
        
        self.steps_executed.append('create_features')
        return df_clean
    
    def run_pipeline(self, df: pd.DataFrame, steps: Optional[List[str]] = None) -> pd.DataFrame:
        """Run the complete pipeline."""
        self.original_shape = df.shape
        self._log(f"\nStarting pipeline with {self.original_shape[0]} rows and {self.original_shape[1]} columns")
        self._log("="*60)
        
        if steps is None:
            steps = [
                'remove_duplicates',
                'standardize_text',
                'fix_data_types',
                'validate_values',
                'handle_missing',
                'create_features'
            ]
        
        df_processed = df.copy()
        
        for step in steps:
            if step == 'remove_duplicates':
                df_processed = self.remove_duplicates(df_processed)
            elif step == 'standardize_text':
                df_processed = self.standardize_text_columns(df_processed)
            elif step == 'fix_data_types':
                df_processed = self.fix_data_types(df_processed)
            elif step == 'validate_values':
                df_processed = self.validate_and_clean_values(df_processed)
            elif step == 'handle_missing':
                df_processed = self.handle_missing_values(df_processed)
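            elif step == 'create_features':
                df_processed = self.create_derived_features(df_processed)
            else:
                # Fail loudly so a misspelled step name is not silently skipped
                raise ValueError(f"Unknown pipeline step: {step!r}")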
        
        self.final_shape = df_processed.shape
        self._log("="*60)
        self._log(f"Pipeline completed with {self.final_shape[0]} rows and {self.final_shape[1]} columns")
        self._log(f"Rows removed: {self.original_shape[0] - self.final_shape[0]}")
        self._log(f"Columns added: {self.final_shape[1] - self.original_shape[1]}")
        
        return df_processed

# Run the pipeline
pipeline = DataPipeline(verbose=True)
cleaned_data = pipeline.run_pipeline(raw_data)
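The `if/elif` chain in `run_pipeline` is a dispatch from step names to functions, applied in order. Below is a minimal sketch of the same idea as a dictionary registry folded with `functools.reduce`. The names `run_steps` and `registry` are hypothetical, and the toy steps stand in for the real methods. The sketch also shows why step order matters: `DataPipeline` removes duplicates *before* standardizing text, so rows that differ only in case or whitespace survive deduplication.

```python
from functools import reduce
from typing import Callable, Dict, List

import pandas as pd

Step = Callable[[pd.DataFrame], pd.DataFrame]

def run_steps(df: pd.DataFrame, registry: Dict[str, Step], steps: List[str]) -> pd.DataFrame:
    """Apply registered steps left to right, rejecting unknown step names up front."""
    unknown = [name for name in steps if name not in registry]
    if unknown:
        raise ValueError(f"Unknown pipeline steps: {unknown}")
    return reduce(lambda acc, name: registry[name](acc), steps, df)

# Hypothetical two-step registry operating on a toy 'status' column
registry = {
    'remove_duplicates': lambda d: d.drop_duplicates(),
    'standardize_text': lambda d: d.assign(status=d['status'].str.strip().str.title()),
}

toy = pd.DataFrame({'status': [' completed', 'Completed', ' completed']})

dedupe_first = run_steps(toy, registry, ['remove_duplicates', 'standardize_text'])
clean_first = run_steps(toy, registry, ['standardize_text', 'remove_duplicates'])
print(dedupe_first['status'].tolist())  # ['Completed', 'Completed']
print(clean_first['status'].tolist())   # ['Completed']
```

A registry keeps the list of valid step names in one place, which makes it easy to validate a configuration before any data is touched.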

## Part 4: Validate Pipeline Results

In [None]:
# Compare before and after
print("\n" + "="*60)
print("BEFORE AND AFTER COMPARISON")
print("="*60)

# Run quality assessment on cleaned data
print("\nQuality Assessment After Cleaning:")
assessor_after = DataQualityAssessor(cleaned_data)
quality_report_after = assessor_after.generate_full_report()

In [None]:
# Visualize the improvements
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 1. Missing values comparison
missing_before = raw_data.isnull().sum().sum()
missing_after = cleaned_data.isnull().sum().sum()

axes[0, 0].bar(['Before', 'After'], [missing_before, missing_after], color=['red', 'green'])
axes[0, 0].set_title('Missing Values')
axes[0, 0].set_ylabel('Count')

# 2. Duplicate records
duplicates_before = raw_data.duplicated().sum()
duplicates_after = cleaned_data.duplicated().sum()

axes[0, 1].bar(['Before', 'After'], [duplicates_before, duplicates_after], color=['red', 'green'])
axes[0, 1].set_title('Duplicate Records')
axes[0, 1].set_ylabel('Count')

# 3. Dataset size
axes[0, 2].bar(['Before', 'After'], 
               [len(raw_data), len(cleaned_data)], 
               color=['blue', 'navy'])
axes[0, 2].set_title('Total Records')
axes[0, 2].set_ylabel('Count')

# 4. Category distribution (bars keyed by label so Before/After line up)
if 'category' in raw_data.columns:
    raw_categories = raw_data['category'].value_counts()
    clean_categories = cleaned_data['category'].value_counts()
    
    axes[1, 0].bar(raw_categories.index.astype(str), raw_categories.values, alpha=0.5, label='Before')
    axes[1, 0].bar(clean_categories.index.astype(str), clean_categories.values, alpha=0.5, label='After')
    axes[1, 0].set_title('Category Distribution')
    axes[1, 0].tick_params(axis='x', rotation=45)
    axes[1, 0].legend()

# 5. Discount percentage distribution
if 'discount_percent' in cleaned_data.columns:
    axes[1, 1].hist([raw_data['discount_percent'].dropna(), 
                     cleaned_data['discount_percent'].dropna()],
                    bins=20, alpha=0.5, label=['Before', 'After'])
    axes[1, 1].set_title('Discount Percentage Distribution')
    axes[1, 1].legend()

# 6. Feature count
axes[1, 2].bar(['Before', 'After'], 
               [raw_data.shape[1], cleaned_data.shape[1]], 
               color=['orange', 'green'])
axes[1, 2].set_title('Number of Features')
axes[1, 2].set_ylabel('Count')

plt.tight_layout()
plt.show()

## Part 5: Advanced Pipeline Features

### 5.1 Pipeline with Configuration

In [None]:
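from functools import partial

class ConfigurablePipeline:
    """
    A pipeline that can be configured via JSON/dictionary.
    """
    
    def __init__(self, config: Dict):
        self.config = config
        self.pipeline = DataPipeline(verbose=config.get('verbose', True))
        
        # Bind the configured missing-value strategy so the 'handle_missing' step uses it;
        # otherwise the 'missing_value_strategy' key would be silently ignored
        strategy = config.get('missing_value_strategy')
        if strategy is not None:
            self.pipeline.handle_missing_values = partial(
                self.pipeline.handle_missing_values, strategy=strategy
            )
    
    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run pipeline based on configuration."""
        # A missing 'steps' key yields None, so run_pipeline falls back to its default steps
        # (an empty list would run no steps at all)
        steps = self.config.get('steps')
        return self.pipeline.run_pipeline(df, steps)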
    
    @classmethod
    def from_json(cls, json_path: str):
        """Load configuration from JSON file."""
        with open(json_path, 'r') as f:
            config = json.load(f)
        return cls(config)

# Example configuration
pipeline_config = {
    'name': 'ecommerce_cleaning_pipeline',
    'version': '1.0.0',
    'verbose': True,
    'steps': [
        'remove_duplicates',
        'standardize_text',
        'fix_data_types',
        'validate_values',
        'handle_missing',
        'create_features'
    ],
    'missing_value_strategy': {
        'quantity': 'median',
        'unit_price': 'median',
        'shipping_cost': 'mean'
    }
}

# Run configurable pipeline
config_pipeline = ConfigurablePipeline(pipeline_config)
result = config_pipeline.run(raw_data.head(100))  # Test on subset
print(f"Configured pipeline result: {result.shape}")
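Because `from_json` loads the configuration from a file, a typo in a step name would only surface once the pipeline runs. A minimal sketch of checking a JSON config up front is shown below. `load_config` and `KNOWN_STEPS` are hypothetical names; the step names are copied from `run_pipeline`, and the JSON is passed as a string so the example needs no file.

```python
import json

# Step names accepted by DataPipeline.run_pipeline
KNOWN_STEPS = {'remove_duplicates', 'standardize_text', 'fix_data_types',
               'validate_values', 'handle_missing', 'create_features'}

def load_config(text: str) -> dict:
    """Parse a JSON pipeline config and reject unknown steps before anything runs."""
    config = json.loads(text)
    steps = config.get('steps', [])
    unknown = [step for step in steps if step not in KNOWN_STEPS]
    if unknown:
        raise ValueError(f"Config lists unknown steps: {unknown}")
    return config

raw = json.dumps({'name': 'demo', 'version': '1.0.0',
                  'steps': ['remove_duplicates', 'handle_missing']})
print(load_config(raw)['steps'])  # ['remove_duplicates', 'handle_missing']

try:
    load_config('{"steps": ["remove_duplicate"]}')  # note the typo in the step name
except ValueError as err:
    print(err)  # Config lists unknown steps: ['remove_duplicate']
```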

### 5.2 Pipeline with Validation

In [None]:
class DataValidator:
    """
    Validates data against predefined rules.
    """
    
    def __init__(self, rules: Dict):
        self.rules = rules
        self.validation_results = {}
    
    def validate(self, df: pd.DataFrame) -> Tuple[bool, Dict]:
        """Validate dataframe against rules."""
        self.validation_results = {}  # reset so repeated calls don't carry over old results
        
        for rule_name, rule_config in self.rules.items():
            column = rule_config['column']
            rule_type = rule_config['type']
            
            if column not in df.columns:
                is_valid = False  # a rule on a missing column cannot pass
            
            elif rule_type == 'not_null':
                is_valid = df[column].notna().all()
                
            elif rule_type == 'unique':
                is_valid = df[column].nunique() == len(df)
                
            elif rule_type == 'range':
                min_val = rule_config.get('min', float('-inf'))
                max_val = rule_config.get('max', float('inf'))
                is_valid = df[column].between(min_val, max_val).all()
                
            elif rule_type == 'in_list':
                is_valid = df[column].isin(rule_config['values']).all()
            
            else:
                # Fail loudly instead of silently passing an unrecognized rule type
                raise ValueError(f"Unknown rule type: {rule_type!r}")
            
            self.validation_results[rule_name] = bool(is_valid)
        
        all_valid = all(self.validation_results.values())
        return all_valid, self.validation_results

# Define validation rules
validation_rules = {
    'order_id_unique': {
        'type': 'unique',
        'column': 'order_id'
    },
    'quantity_positive': {
        'type': 'range',
        'column': 'quantity',
        'min': 1,
        'max': 100
    },
    'discount_valid': {
        'type': 'range',
        'column': 'discount_percent',
        'min': 0,
        'max': 100
    },
    'category_valid': {
        'type': 'in_list',
        'column': 'category',
        'values': ['Electronics', 'Accessories']
    }
}

# Validate cleaned data
validator = DataValidator(validation_rules)
is_valid, validation_results = validator.validate(cleaned_data)

print("Validation Results:")
print("="*40)
for rule, passed in validation_results.items():
    status = "✓ PASSED" if passed else "✗ FAILED"
    print(f"{rule}: {status}")
print(f"\nOverall validation: {'PASSED' if is_valid else 'FAILED'}")
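A pass/fail flag says *that* a rule broke but not *how badly*. A minimal sketch of an extension that counts the offending rows per rule is shown below, using the same rule dictionaries. `count_violations` is a hypothetical helper, not part of `DataValidator`. It marks every copy of a repeated ID as a violation (`keep=False`), and it treats missing values as out of range, consistent with `between`.

```python
import pandas as pd

def count_violations(df: pd.DataFrame, rules: dict) -> dict:
    """Return the number of rows breaking each rule (0 means the rule passes)."""
    counts = {}
    for name, rule in rules.items():
        col = df[rule['column']]
        if rule['type'] == 'not_null':
            bad = col.isna()
        elif rule['type'] == 'unique':
            bad = col.duplicated(keep=False)  # flag every copy of a repeated value
        elif rule['type'] == 'range':
            bad = ~col.between(rule.get('min', float('-inf')), rule.get('max', float('inf')))
        elif rule['type'] == 'in_list':
            bad = ~col.isin(rule['values'])
        else:
            raise ValueError(f"Unknown rule type: {rule['type']}")
        counts[name] = int(bad.sum())
    return counts

orders = pd.DataFrame({'order_id': ['A1', 'A2', 'A2'], 'discount_percent': [0, 150, 10]})
rules = {
    'order_id_unique': {'type': 'unique', 'column': 'order_id'},
    'discount_valid': {'type': 'range', 'column': 'discount_percent', 'min': 0, 'max': 100},
}
print(count_violations(orders, rules))  # {'order_id_unique': 2, 'discount_valid': 1}
```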

### Practice Exercise 1: Custom Pipeline Step

Create a custom pipeline step that detects and handles outliers:

In [None]:
# TODO: Create a function that:
# 1. Detects outliers using the IQR method
# 2. Offers options to: cap, remove, or transform outliers
# 3. Logs the number of outliers handled

def handle_outliers(df: pd.DataFrame, columns: List[str], method: str = 'cap') -> pd.DataFrame:
    """
    Handle outliers in specified columns.
    
    Parameters:
    - df: Input dataframe
    - columns: List of columns to check for outliers
    - method: 'cap' (cap at bounds), 'remove' (remove rows), or 'transform' (log transform)
    """
    # Your code here:
    pass

# Test your function
# test_data = cleaned_data.copy()
# result = handle_outliers(test_data, ['total_amount', 'quantity'], method='cap')

## Part 6: Production-Ready Pipeline

### 6.1 Error Handling and Logging

In [None]:
class RobustPipeline:
    """
    A production-ready pipeline with error handling and detailed logging.
    """
    
    def __init__(self, log_file: str = 'pipeline.log'):
        self.setup_logging(log_file)
        self.errors = []
        self.warnings = []
    
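    def setup_logging(self, log_file: str):
        """Set up file and console logging."""
        self.logger = logging.getLogger('RobustPipeline')
        self.logger.setLevel(logging.DEBUG)  # let the file handler actually receive DEBUG records
        self.logger.propagate = False  # avoid duplicate console lines via the root logger
        
        # getLogger returns the same object every time, so remove handlers left over from a
        # previous instance; otherwise each re-run of this cell duplicates every log line
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
            handler.close()
        
        # File handler
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.DEBUG)
        
        # Console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        
        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        
        self.logger.addHandler(fh)
        self.logger.addHandler(ch)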
    
    def safe_execute(self, func, *args, **kwargs):
        """Safely execute a function with error handling."""
        try:
            result = func(*args, **kwargs)
            self.logger.info(f"Successfully executed: {func.__name__}")
            return result
        except Exception as e:
            error_msg = f"Error in {func.__name__}: {str(e)}"
            self.logger.error(error_msg)
            self.errors.append(error_msg)
            return args[0] if args else None  # Return original data on error
    
    def run(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
        """Run the pipeline with comprehensive error handling."""
        self.errors, self.warnings = [], []  # start each run with a clean slate
        self.logger.info("="*60)
        self.logger.info("Starting Robust Pipeline")
        self.logger.info(f"Input shape: {df.shape}")
        
        # Create pipeline instance
        pipeline = DataPipeline(verbose=False)
        
        # Execute each step safely
        df_processed = df.copy()
        
        steps = [
            (pipeline.remove_duplicates, 'Removing duplicates'),
            (pipeline.standardize_text_columns, 'Standardizing text'),
            (pipeline.fix_data_types, 'Fixing data types'),
            (pipeline.validate_and_clean_values, 'Validating values'),
            (pipeline.handle_missing_values, 'Handling missing values'),
            (pipeline.create_derived_features, 'Creating features')
        ]
        
        for step_func, step_name in steps:
            self.logger.info(f"Executing: {step_name}")
            df_processed = self.safe_execute(step_func, df_processed)
            
            # Validate after each step
            if df_processed is None or df_processed.empty:
                self.logger.error(f"Pipeline failed at: {step_name}")
                break
        
        # Generate summary
        summary = {
            'input_shape': df.shape,
            'output_shape': df_processed.shape if df_processed is not None else None,
            'errors': self.errors,
            'warnings': self.warnings,
            'success': len(self.errors) == 0
        }
        
        self.logger.info(f"Pipeline completed. Success: {summary['success']}")
        self.logger.info(f"Output shape: {summary['output_shape']}")
        self.logger.info("="*60)
        
        return df_processed, summary

# Test robust pipeline
robust_pipeline = RobustPipeline('pipeline_test.log')
result_df, execution_summary = robust_pipeline.run(raw_data.head(100))

print("\nExecution Summary:")
print(f"Success: {execution_summary['success']}")
print(f"Input shape: {execution_summary['input_shape']}")
print(f"Output shape: {execution_summary['output_shape']}")
print(f"Errors: {len(execution_summary['errors'])}")
print(f"Warnings: {len(execution_summary['warnings'])}")
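The error-handling pattern in `safe_execute` can be tried in isolation, without `DataPipeline` or `raw_data`. The sketch below is a simplified standalone version. It assumes every step takes and returns a DataFrame. The step functions `add_line_total` and `parse_ship_date` and the logger name are hypothetical. One step fails on purpose, so you can see the error being recorded while the run continues with the last good DataFrame.

```python
import logging
from typing import Callable, List

import pandas as pd

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger("safe_execute_demo")  # hypothetical logger name

Step = Callable[[pd.DataFrame], pd.DataFrame]


def safe_execute(step: Step, df: pd.DataFrame, errors: List[str]) -> pd.DataFrame:
    """Run one step; on failure, log it, record it, and return the step's input."""
    try:
        result = step(df)
        logger.info("Successfully executed: %s", step.__name__)
        return result
    except Exception as exc:  # deliberately broad: one bad step should not abort the run
        msg = f"Error in {step.__name__}: {exc!r}"
        logger.error(msg)
        errors.append(msg)
        return df


def add_line_total(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step: derive a per-row total
    return df.assign(line_total=df["quantity"] * df["price"])


def parse_ship_date(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step that fails: 'shipped_at' is not a column in this frame
    return df.assign(ship_date=pd.to_datetime(df["shipped_at"]))


orders = pd.DataFrame({"quantity": [1, 2, 3], "price": [10.0, 5.0, 2.5]})
errors: List[str] = []
for step in (add_line_total, parse_ship_date):
    orders = safe_execute(step, orders, errors)

print(orders)
print(errors)
success = len(errors) == 0  # mirrors the `success` flag in the summary dict
print("Success:", success)
```

A failed step hands its input to the next step. Later steps therefore run on data that skipped a transformation. This is why the `success` flag should decide whether the result is usable, not the output shape alone.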

### 6.2 Pipeline Performance Monitoring

In [None]:
import time
from functools import wraps

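def timer_decorator(func):
    """Decorator to time a method and record the result on its instance."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()  # high-resolution clock intended for timing
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        # When decorating a method, args[0] is `self`; if it has a
        # performance_metrics dict (as MonitoredPipeline does), store the timing
        metrics = getattr(args[0], 'performance_metrics', None) if args else None
        if isinstance(metrics, dict):
            metrics[func.__name__] = execution_time
        print(f"{func.__name__} took {execution_time:.4f} seconds")
        return result
    return wrapper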

class MonitoredPipeline(DataPipeline):
    """
    Pipeline with performance monitoring.
    """
    
    def __init__(self, verbose: bool = True):
        super().__init__(verbose)
        self.performance_metrics = {}
    
    @timer_decorator
    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        return super().remove_duplicates(df)
    
    @timer_decorator
    def standardize_text_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        return super().standardize_text_columns(df)
    
    @timer_decorator
    def handle_missing_values(self, df: pd.DataFrame, strategy: Optional[Dict[str, str]] = None) -> pd.DataFrame:
        return super().handle_missing_values(df, strategy)

# Test monitored pipeline
monitored_pipeline = MonitoredPipeline(verbose=False)
print("Performance Monitoring:")
print("="*40)
result = monitored_pipeline.run_pipeline(raw_data.head(500), 
                                         ['remove_duplicates', 'standardize_text', 'handle_missing'])
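The decorator above times individual methods. To time an arbitrary block, such as a whole pipeline run or a before/after comparison, a context manager is a lightweight alternative. The sketch below is self-contained. The helper `timed` and the synthetic `price` column are hypothetical. It compares a Python loop with a vectorized pandas expression. Timings depend on the machine, so no figures are shown.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator

import numpy as np
import pandas as pd


@contextmanager
def timed(label: str, metrics: Dict[str, float]) -> Iterator[None]:
    """Hypothetical helper: store the elapsed seconds of the `with` block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so a failing step still gets a timing
        metrics[label] = time.perf_counter() - start


metrics: Dict[str, float] = {}
rng = np.random.default_rng(0)
df = pd.DataFrame({"price": rng.uniform(1, 100, 100_000)})

with timed("python_loop", metrics):
    # Row by row: one Python-level multiplication per element
    loop_total = sum(p * 1.07 for p in df["price"])

with timed("vectorized", metrics):
    # Whole-column arithmetic executed inside NumPy
    vec_total = (df["price"] * 1.07).sum()

for label, seconds in metrics.items():
    print(f"{label:<12} {seconds:.4f} s")
```

Both blocks compute the same total. The vectorized version is typically much faster because the loop over elements runs in compiled code rather than the Python interpreter.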

### Practice Exercise 2: End-to-End Pipeline

Create a complete pipeline for a new dataset:

In [None]:
# TODO: Create a pipeline that:
# 1. Generates (or loads) customer churn data
# 2. Performs quality assessment
# 3. Cleans and prepares the data
# 4. Creates features for ML
# 5. Validates the output
# 6. Saves the processed data

def create_churn_pipeline():
    """
    Create a complete pipeline for customer churn analysis.
    """
    # Step 1: Generate sample churn data
    # Your code here:
    
    # Step 2: Quality assessment
    # Your code here:
    
    # Step 3: Data cleaning
    # Your code here:
    
    # Step 4: Feature engineering
    # Your code here:
    
    # Step 5: Validation
    # Your code here:
    
    # Step 6: Save results
    # Your code here:
    
    pass

# Test your pipeline
# create_churn_pipeline()

## Summary and Best Practices

### Key Takeaways:

1. **Pipeline Design Principles:**
   - Modular and reusable components
   - Clear separation of concerns
   - Consistent error handling
   - Comprehensive logging

2. **Data Quality Steps:**
   - Always start with assessment
   - Handle duplicates early
   - Standardize formats (text, data types) before validating or imputing
   - Validate at multiple stages

3. **Performance Considerations:**
   - Monitor execution time
   - Optimize data types
   - Use vectorized operations
   - Consider chunking for large datasets

4. **Production Readiness:**
   - Configuration management
   - Error recovery mechanisms
   - Detailed logging and monitoring
   - Version control for pipelines
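Two of the performance points above, optimizing data types and chunking, can be sketched in a few lines of pandas. The example uses synthetic data, and the columns and the 0.5 cardinality threshold are assumptions. `optimize_dtypes` is a hypothetical helper: it downcasts integer columns and converts low-cardinality text columns to `category`. The chunked loop aggregates revenue from a CSV without holding the whole file in memory at once. Float columns are deliberately left alone, because downcasting prices to `float32` can introduce rounding error.

```python
import io

import numpy as np
import pandas as pd


def optimize_dtypes(df: pd.DataFrame, max_unique_ratio: float = 0.5) -> pd.DataFrame:
    """Hypothetical helper: shrink integer columns and categorize repetitive text."""
    out = df.copy()
    for col in out.columns:
        s = out[col]
        if pd.api.types.is_integer_dtype(s):
            # Pick the smallest signed integer type that still holds every value
            out[col] = pd.to_numeric(s, downcast="integer")
        elif pd.api.types.is_object_dtype(s) or pd.api.types.is_string_dtype(s):
            # Few distinct values relative to rows: store as category codes
            if len(s) and s.nunique() / len(s) <= max_unique_ratio:
                out[col] = s.astype("category")
    return out


rng = np.random.default_rng(42)
n = 50_000
df = pd.DataFrame({
    "order_id": np.arange(n, dtype="int64"),
    "category": rng.choice(["Electronics", "Clothing", "Home", "Books"], n),
    "quantity": rng.integers(1, 10, n),
    "price": rng.uniform(1, 500, n),
})

optimized = optimize_dtypes(df)
before = int(df.memory_usage(deep=True).sum())
after = int(optimized.memory_usage(deep=True).sum())
print(f"Memory: {before:,} -> {after:,} bytes")
print(optimized.dtypes)

# Chunked processing: aggregate each chunk, then combine the partial results
buffer = io.StringIO()
df.to_csv(buffer, index=False)
buffer.seek(0)

partials = []
for chunk in pd.read_csv(buffer, chunksize=10_000):
    revenue = (chunk["quantity"] * chunk["price"]).groupby(chunk["category"]).sum()
    partials.append(revenue)
revenue_by_category = pd.concat(partials).groupby(level=0).sum()
print(revenue_by_category)
```

Chunking works here because a sum can be split into partial sums and recombined. Statistics such as medians cannot be combined this way and need a different approach.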

### Best Practices Checklist:

- [ ] Document all assumptions and decisions
- [ ] Create unit tests for each pipeline step
- [ ] Implement data validation rules
- [ ] Set up monitoring and alerting
- [ ] Version control your pipeline code
- [ ] Create rollback mechanisms
- [ ] Document data lineage
- [ ] Implement incremental processing where possible
- [ ] Create data quality dashboards
- [ ] Establish SLAs for pipeline execution
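For the unit-test item in the checklist, a test for one pipeline step can be small. Build a tiny input by hand, state the exact expected output, and compare the two with `pandas.testing.assert_frame_equal`. In the sketch below, `remove_duplicate_orders` is a hypothetical stand-in for a step such as `DataPipeline.remove_duplicates`. That method is defined earlier in the notebook and may behave differently. The test functions follow pytest's `test_*` naming, so pytest could collect them, but they also run as plain function calls in a notebook cell.

```python
import pandas as pd
import pandas.testing as pdt


def remove_duplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical pipeline step: keep the first row for each order_id."""
    return df.drop_duplicates(subset="order_id", keep="first").reset_index(drop=True)


def test_keeps_first_occurrence():
    raw = pd.DataFrame({
        "order_id": ["ORD000001", "ORD000002", "ORD000001"],
        "amount": [10.0, 20.0, 99.0],
    })
    expected = pd.DataFrame({
        "order_id": ["ORD000001", "ORD000002"],
        "amount": [10.0, 20.0],
    })
    pdt.assert_frame_equal(remove_duplicate_orders(raw), expected)


def test_does_not_mutate_input():
    raw = pd.DataFrame({"order_id": ["A", "A"], "amount": [1.0, 2.0]})
    snapshot = raw.copy()
    remove_duplicate_orders(raw)
    pdt.assert_frame_equal(raw, snapshot)  # the caller's frame is unchanged


def test_handles_empty_frame():
    empty = pd.DataFrame({"order_id": pd.Series(dtype=object), "amount": pd.Series(dtype=float)})
    assert remove_duplicate_orders(empty).empty


# pytest would collect these automatically; calling them directly also works
test_keeps_first_occurrence()
test_does_not_mutate_input()
test_handles_empty_frame()
print("All tests passed")
```

Each test pins down one behavior a later refactor could silently break: which duplicate survives, whether the input is mutated, and the empty-input edge case.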

## Final Exercise: Complete Pipeline Implementation

Build a complete, production-ready pipeline for one of the scenarios below, or adapt one to your own use case:

In [None]:
# TODO: Choose one of the following scenarios and build a complete pipeline:

# Scenario 1: Financial Transaction Pipeline
# - Detect and flag suspicious transactions
# - Handle currency conversions
# - Create risk scores
# - Generate compliance reports

# Scenario 2: Healthcare Data Pipeline
# - Anonymize patient information
# - Standardize medical codes
# - Handle missing lab results
# - Create patient risk profiles

# Scenario 3: Social Media Analytics Pipeline
# - Clean and normalize text data
# - Extract mentions and hashtags
# - Calculate engagement metrics
# - Identify trending topics

# Your implementation here:


## Conclusion

In this notebook, we've covered:

- **Building modular data pipelines** with reusable components
- **Implementing data quality checks** at each stage
- **Creating configurable pipelines** for different use cases
- **Adding error handling and logging** for production readiness
- **Monitoring and optimizing pipeline performance**

These skills are essential for:
- Preparing data for machine learning models
- Ensuring reproducible data processing
- Building scalable data solutions
- Maintaining data quality in production

### Next Steps:
1. Apply these concepts to your own datasets
2. Explore pipeline orchestration tools (Airflow, Prefect)
3. Learn about streaming data pipelines
4. Study MLOps and model deployment pipelines