Thursday, 30 October 2025

Building a Batch Feature Store for Machine Learning with AWS EMR and DynamoDB (2025 Guide)

[Figure: Batch feature store architecture with AWS EMR for computation and DynamoDB for serving machine learning features]

In 2025, feature stores are a core part of many production machine learning systems, letting teams manage, version, and serve features consistently across training and inference. While real-time feature stores grab headlines, batch feature processing remains crucial for historical data, model retraining, and cost-effective feature engineering at scale. This guide shows how to build a batch feature store using AWS EMR for distributed processing and DynamoDB for low-latency serving. It covers patterns for feature computation, versioning, monitoring, and integration with ML pipelines that process terabytes of data while keeping feature retrieval latency in the millisecond range.

🚀 Why Batch Feature Stores Are Essential in 2025

Batch feature stores provide the foundation for reliable, reproducible machine learning systems. They solve critical challenges in ML operations by providing consistent feature definitions, efficient computation, and scalable serving infrastructure.

  • Feature Consistency: Ensure identical feature computation during training and inference
  • Historical Point-in-Time: Accurately recreate feature values as they existed at prediction time
  • Cost Optimization: Process large datasets efficiently using distributed computing
  • Reproducibility: Version features and maintain lineage for model audits
  • Team Collaboration: Share and reuse features across multiple ML projects
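
To make the point-in-time idea concrete, here is a minimal sketch in plain Python. It assumes a hypothetical in-memory history of (timestamp, value) pairs for one feature of one entity; the name `value_as_of` is illustrative and not part of any library. The lookup returns the value that was valid at prediction time and never a later one, which is what prevents label leakage in training sets.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Optional, Tuple

# Hypothetical history for one entity: (valid_from, value), sorted by time.
FeatureHistory = List[Tuple[datetime, float]]


def value_as_of(history: FeatureHistory, as_of: datetime) -> Optional[float]:
    """Return the latest value whose timestamp is <= as_of, or None."""
    timestamps = [ts for ts, _ in history]
    # Index of the first entry strictly after as_of
    idx = bisect_right(timestamps, as_of)
    return history[idx - 1][1] if idx > 0 else None


history = [
    (datetime(2025, 1, 1), 10.0),
    (datetime(2025, 1, 15), 12.5),
    (datetime(2025, 2, 1), 20.0),
]

print(value_as_of(history, datetime(2025, 1, 20)))   # 12.5
print(value_as_of(history, datetime(2024, 12, 31)))  # None
```

A prediction made on 20 January sees the 15 January value, even though a newer value exists in the store.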

🔧 Architecture Overview: EMR + DynamoDB Feature Store

Our batch feature store architecture uses AWS EMR for distributed feature computation and DynamoDB for high-performance feature serving. EMR runs the heavy, scheduled Spark jobs, while DynamoDB answers key-based lookups with low latency at inference time.

  • AWS EMR: Distributed Spark processing for feature computation
  • DynamoDB: NoSQL database for low-latency feature serving
  • S3: Data lake for raw data and computed feature storage
  • Glue Data Catalog: Central metadata repository
  • Step Functions: Orchestration of feature computation pipelines
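
As a rough sketch of what flows from the computation side to the serving side, the record below mirrors the table layout used later in this guide: `entity_id` as partition key, `feature_timestamp` as sort key, and `expiry_time` as the TTL attribute. The helper name `build_feature_item` is hypothetical, and deriving the TTL from the feature time (rather than the write time) is an assumption made here to keep the example deterministic.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict


def build_feature_item(entity_id: str, feature_type: str, feature_version: str,
                       feature_time: datetime, values: Dict[str, float],
                       ttl_days: int = 365) -> Dict[str, Any]:
    """Shape one serving record: partition key, sort key, payload, TTL."""
    return {
        "entity_id": entity_id,                              # partition key
        "feature_timestamp": int(feature_time.timestamp()),  # sort key, epoch seconds
        "feature_type": feature_type,
        "feature_version": feature_version,
        # Numeric payload stored as Decimal, built from the float's string form
        "feature_values": {k: Decimal(str(v)) for k, v in values.items()},
        # Assumption: TTL counted from the feature time, in epoch seconds
        "expiry_time": int((feature_time + timedelta(days=ttl_days)).timestamp()),
    }


item = build_feature_item(
    "user_12345", "user_behavior", "v1.2",
    datetime(2025, 1, 25, tzinfo=timezone.utc),
    {"total_spend_30d": 1250.75},
)
print(item["entity_id"], item["feature_timestamp"])
```

Keeping the timestamp in the sort key is what allows range queries such as "latest record" or "history between two dates" for a single entity.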

💻 Infrastructure as Code: Terraform Configuration

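Let's start with the core Terraform configuration for our batch feature store infrastructure: the EMR cluster, the DynamoDB table, S3 storage, the Glue database, and the Step Functions pipeline. The configuration references networking, IAM, Lambda, and variable definitions (for example aws_subnet.private, aws_iam_role.emr_service_role, aws_lambda_function.validate_input, and var.environment) that are assumed to be defined elsewhere in the project.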


# main.tf - Batch Feature Store Infrastructure
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# EMR Cluster for feature computation
resource "aws_emr_cluster" "feature_store" {
  name          = "feature-store-cluster"
  release_label = "emr-7.0.0"
  applications  = ["Spark", "Hive", "Livy"]
  
  ec2_attributes {
    subnet_id                         = aws_subnet.private.id
    emr_managed_master_security_group = aws_security_group.emr_master.id
    emr_managed_slave_security_group  = aws_security_group.emr_slave.id
    instance_profile                  = aws_iam_instance_profile.emr_ec2_profile.arn
  }
  
  master_instance_group {
    instance_type = "m5.2xlarge"
    instance_count = 1
  }
  
  core_instance_group {
    instance_type  = "m5.4xlarge"
    instance_count = 4
    ebs_config {
      size                 = 256
      type                 = "gp3"
      volumes_per_instance = 1
    }
  }
  
  configurations_json = jsonencode([
    {
      "Classification" : "spark-defaults",
      "Properties" : {
        "spark.sql.adaptive.enabled" : "true",
        "spark.sql.adaptive.coalescePartitions.enabled" : "true",
        "spark.sql.adaptive.skewJoin.enabled" : "true",
        "spark.dynamicAllocation.enabled" : "true",
        "spark.serializer" : "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.catalogImplementation" : "hive",
        "spark.hadoop.hive.metastore.client.factory.class" : "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
      }
    }
  ])
  
  service_role = aws_iam_role.emr_service_role.arn
  autoscaling_role = aws_iam_role.emr_autoscaling_role.arn
  
  tags = {
    Project     = "feature-store"
    Environment = "production"
  }
}

# DynamoDB tables for feature serving
resource "aws_dynamodb_table" "feature_store" {
  name           = "feature-store"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "entity_id"
  range_key      = "feature_timestamp"
  
  attribute {
    name = "entity_id"
    type = "S"
  }
  
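  attribute {
    name = "feature_timestamp"
    type = "N"
  }
  
  # Attributes used as GSI keys must also be declared
  attribute {
    name = "feature_type"
    type = "S"
  }
  
  attribute {
    name = "feature_version"
    type = "S"
  }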
  
  # GSI for feature type queries
  global_secondary_index {
    name               = "feature_type-index"
    hash_key           = "feature_type"
    range_key          = "feature_timestamp"
    projection_type    = "INCLUDE"
    non_key_attributes = ["entity_id", "feature_values", "feature_version"]
  }
  
  # GSI for feature version queries
  global_secondary_index {
    name               = "feature_version-index"
    hash_key           = "feature_version"
    range_key          = "feature_timestamp"
    projection_type    = "ALL"
  }
  
  ttl {
    attribute_name = "expiry_time"
    enabled        = true
  }
  
  point_in_time_recovery {
    enabled = true
  }
  
  tags = {
    Project     = "feature-store"
    Environment = "production"
  }
}

# S3 buckets for raw data and features
resource "aws_s3_bucket" "feature_store" {
  bucket = "feature-store-${var.environment}-${random_id.bucket_suffix.hex}"
  
  tags = {
    Project     = "feature-store"
    Environment = var.environment
  }
}

resource "aws_s3_bucket_versioning" "feature_store" {
  bucket = aws_s3_bucket.feature_store.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "feature_store" {
  bucket = aws_s3_bucket.feature_store.id
  
  rule {
    id     = "raw-data-transition"
    status = "Enabled"
    
    filter {
      prefix = "raw/"
    }
    
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
  }
  
  rule {
    id     = "feature-data-retention"
    status = "Enabled"
    
    filter {
      prefix = "features/"
    }
    
    expiration {
      days = 730  # 2 years retention
    }
  }
}

# Glue Data Catalog database
resource "aws_glue_catalog_database" "feature_store" {
  name = "feature_store"
  
  parameters = {
    description = "Feature store metadata database"
  }
}

# Step Functions for pipeline orchestration
resource "aws_sfn_state_machine" "feature_pipeline" {
  name     = "feature-pipeline"
  role_arn = aws_iam_role.step_functions.arn
  
  definition = jsonencode({
    "Comment" : "Batch Feature Computation Pipeline",
    "StartAt" : "ValidateInput",
    "States" : {
      "ValidateInput" : {
        "Type" : "Task",
        "Resource" : "arn:aws:states:::lambda:invoke",
        "Parameters" : {
          "FunctionName" : "${aws_lambda_function.validate_input.arn}",
          "Payload" : {
            "input.$" : "$"
          }
        },
        "Next" : "ComputeFeatures"
      },
      "ComputeFeatures" : {
        "Type" : "Task",
        "Resource" : "arn:aws:states:::elasticmapreduce:addStep.sync",
        "Parameters" : {
          "ClusterId" : aws_emr_cluster.feature_store.id,
          "Step" : {
            "Name" : "ComputeFeatures",
            "ActionOnFailure" : "TERMINATE_CLUSTER",
            "HadoopJarStep" : {
              "Jar" : "command-runner.jar",
              "Args" : [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--class",
                "com.featurestore.BatchFeatureComputation",
                "s3://${aws_s3_bucket.feature_store.id}/jobs/feature-computation.jar",
                "--input-path",
                "s3://${aws_s3_bucket.feature_store.id}/raw/",
                "--output-path",
                "s3://${aws_s3_bucket.feature_store.id}/features/",
                "--feature-version",
                "v1.0"
              ]
            }
          }
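        },
        # Discard the EMR step result so the ValidateInput output passes through
        "ResultPath" : null,
        "Next" : "LoadToDynamoDB"
      },
      "LoadToDynamoDB" : {
        "Type" : "Task",
        "Resource" : "arn:aws:states:::lambda:invoke",
        "Parameters" : {
          "FunctionName" : "${aws_lambda_function.load_to_dynamodb.arn}",
          "Payload" : {
            # lambda:invoke wraps the function's return value under "Payload";
            # this assumes validate_input returns OutputPath and FeatureVersion
            "feature_path.$" : "$.Payload.OutputPath",
            "feature_version.$" : "$.Payload.FeatureVersion"
          }
        },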
        "End" : true
      }
    }
  })
  
  tags = {
    Project     = "feature-store"
    Environment = "production"
  }
}
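
The two S3 lifecycle rules above are easy to misread, so here is a small Python sketch of their intended effect. It is only an approximation: the function name `storage_state` is hypothetical, and real lifecycle actions are applied asynchronously by S3 rather than at the exact day boundary.

```python
from typing import Optional


def storage_state(key: str, age_days: int) -> Optional[str]:
    """Approximate the storage class the lifecycle rules would assign.

    Returns None once the object is past its expiration.
    """
    if key.startswith("raw/"):
        # raw-data-transition rule: 30 days -> STANDARD_IA, 90 days -> GLACIER
        if age_days >= 90:
            return "GLACIER"
        if age_days >= 30:
            return "STANDARD_IA"
        return "STANDARD"
    if key.startswith("features/"):
        # feature-data-retention rule: expire after 730 days
        return None if age_days >= 730 else "STANDARD"
    return "STANDARD"  # no rule matches other prefixes


print(storage_state("raw/transactions/part-0.parquet", 45))
print(storage_state("features/user_features/part-0.parquet", 800))
```

Because versioning is enabled on the bucket, expiring the current version adds a delete marker rather than removing the data; cleaning up noncurrent versions would need an additional rule.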

  

🛠️ Advanced Spark Feature Computation

Here's the core Spark application for distributed feature computation with support for point-in-time correctness, feature versioning, and efficient window operations.


# feature_computation.py - Advanced Spark Feature Computation
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import json
from typing import Dict, List, Any

class BatchFeatureComputation:
    def __init__(self, spark_session: SparkSession):
        self.spark = spark_session
        self.feature_registry = {}
        
    def register_feature_definition(self, feature_name: str, 
                                  computation_func, 
                                  dependencies: List[str] = None):
        """Register feature computation logic with dependencies"""
        self.feature_registry[feature_name] = {
            'computation': computation_func,
            'dependencies': dependencies or []
        }
    
    def compute_user_features(self, users_df, transactions_df, 
                            feature_date: str, feature_version: str):
        """Compute comprehensive user features with point-in-time correctness"""
        
        # Filter data for point-in-time correctness
        transactions_pit = transactions_df.filter(
            col("transaction_timestamp") <= feature_date
        )
        
        users_pit = users_df.filter(
            col("created_at") <= feature_date
        )
        
        # User demographic features
        demographic_features = users_pit.select(
            col("user_id").alias("entity_id"),
            lit(feature_date).cast("timestamp").alias("feature_timestamp"),
            col("age"),
            col("gender"),
            col("location"),
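            # Measure account age against feature_date, not the run date,
            # so backfills stay point-in-time correct
            (year(lit(feature_date).cast("date")) - year(col("created_at"))).alias("account_age_years"),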
            when(col("premium_member") == True, 1).otherwise(0).alias("is_premium_member"),
            lit(feature_version).alias("feature_version"),
            lit("user_demographic").alias("feature_type")
        )
        
        # Transaction behavior features (last 30 days)
        thirty_days_ago = (datetime.strptime(feature_date, "%Y-%m-%d") - 
                          timedelta(days=30)).strftime("%Y-%m-%d")
        
        recent_transactions = transactions_pit.filter(
            col("transaction_timestamp") >= thirty_days_ago
        )
        
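        # Aggregate transaction behavior per user
        transaction_features = recent_transactions.groupBy("user_id").agg(
            count("*").alias("transaction_count_30d"),
            sum("amount").alias("total_spend_30d"),
            avg("amount").alias("avg_transaction_amount_30d"),
            stddev("amount").alias("std_transaction_amount_30d"),
            countDistinct("merchant_category").alias("unique_categories_30d"),
            sum(when(col("amount") > 100, 1).otherwise(0)).alias("large_transactions_30d"),
            
            # Time-based features
            datediff(
                lit(feature_date), 
                max("transaction_timestamp").cast("date")
            ).alias("days_since_last_transaction")
        ).withColumnRenamed("user_id", "entity_id")
        
        # Sequential feature: average amount of each user's 10 most recent
        # transactions. Window functions cannot be used inside groupBy().agg(),
        # so rank rows by recency first and aggregate the top 10 separately.
        recency_window = Window.partitionBy("user_id") \
            .orderBy(col("transaction_timestamp").desc())
        
        last_10_avg = recent_transactions \
            .withColumn("recency_rank", row_number().over(recency_window)) \
            .filter(col("recency_rank") <= 10) \
            .groupBy("user_id") \
            .agg(avg("amount").alias("last_10_transactions_avg")) \
            .withColumnRenamed("user_id", "entity_id")
        
        transaction_features = transaction_features.join(last_10_avg, "entity_id", "left")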
        
        # Advanced feature: Spending patterns by day of week
        spending_patterns = recent_transactions.groupBy(
            "user_id", 
            dayofweek("transaction_timestamp").alias("day_of_week")
        ).agg(
            sum("amount").alias("daily_spend"),
            count("*").alias("daily_transactions")
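        # Explicit pivot values (1 = Sunday ... 7 = Saturday) keep the schema stable
        ).groupBy("user_id").pivot("day_of_week", list(range(1, 8))).agg(
            first("daily_spend").alias("spend"),
            first("daily_transactions").alias("transactions")
        ).fillna(0).withColumnRenamed("user_id", "entity_id")  # match the join key below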
        
        # Feature: Transaction frequency changes
        current_period = recent_transactions.filter(
            col("transaction_timestamp") >= (datetime.strptime(feature_date, "%Y-%m-%d") - 
                                           timedelta(days=15)).strftime("%Y-%m-%d")
        ).groupBy("user_id").agg(
            count("*").alias("recent_transaction_count")
        )
        
        previous_period = transactions_pit.filter(
            (col("transaction_timestamp") >= (datetime.strptime(feature_date, "%Y-%m-%d") - 
                                            timedelta(days=30)).strftime("%Y-%m-%d")) &
            (col("transaction_timestamp") < (datetime.strptime(feature_date, "%Y-%m-%d") - 
                                           timedelta(days=15)).strftime("%Y-%m-%d"))
        ).groupBy("user_id").agg(
            count("*").alias("previous_transaction_count")
        )
        
        frequency_change = current_period.join(
            previous_period, "user_id", "left"
        ).fillna(0).withColumn(
            "transaction_frequency_change",
            when(col("previous_transaction_count") == 0, 0).otherwise(
                (col("recent_transaction_count") - col("previous_transaction_count")) / 
                col("previous_transaction_count")
            )
        ).select("user_id", "transaction_frequency_change")
        
        # Combine all features
        final_features = demographic_features \
            .join(transaction_features, "entity_id", "left") \
            .join(spending_patterns, "entity_id", "left") \
            .join(frequency_change.withColumnRenamed("user_id", "entity_id"), "entity_id", "left") \
            .fillna(0)
        
        return final_features
    
    def compute_rolling_window_features(self, df: DataFrame, entity_col: str, 
                                      timestamp_col: str, value_col: str,
                                      windows: List[int] = [7, 30, 90]):
        """Compute rolling window statistics for time-series features"""
        
        features = df
        
        for window_days in windows:
            window_spec = Window.partitionBy(entity_col) \
                              .orderBy(col(timestamp_col).cast("timestamp").cast("long")) \
                              .rangeBetween(-window_days * 86400, 0)
            
            features = features \
                .withColumn(f"rolling_avg_{window_days}d", 
                           avg(value_col).over(window_spec)) \
                .withColumn(f"rolling_std_{window_days}d", 
                           stddev(value_col).over(window_spec)) \
                .withColumn(f"rolling_sum_{window_days}d", 
                           sum(value_col).over(window_spec)) \
                .withColumn(f"rolling_count_{window_days}d", 
                           count(value_col).over(window_spec))
        
        return features
    
    def compute_cross_entity_features(self, primary_df: DataFrame, 
                                    secondary_df: DataFrame, 
                                    join_key: str, feature_prefix: str):
        """Compute features by joining with related entities"""
        
        # Aggregate secondary entity features
        secondary_agg = secondary_df.groupBy(join_key).agg(
            count("*").alias(f"{feature_prefix}_count"),
            sum("amount").alias(f"{feature_prefix}_total_amount"),
            avg("amount").alias(f"{feature_prefix}_avg_amount"),
            countDistinct("category").alias(f"{feature_prefix}_unique_categories")
        )
        
        # Join with primary entities
        cross_features = primary_df.join(secondary_agg, join_key, "left").fillna(0)
        
        return cross_features
    
    def save_features_to_s3(self, features_df: DataFrame, output_path: str, 
                          partition_cols: List[str] = None):
        """Save computed features to S3 with partitioning"""
        
        writer = features_df.write \
            .mode("overwrite") \
            .option("compression", "snappy")
        
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        
        writer.parquet(output_path)
        
        # Write feature metadata
        feature_metadata = {
            "feature_count": features_df.count(),
            "computation_timestamp": datetime.now().isoformat(),
            "schema": features_df.schema.json(),
            "partition_columns": partition_cols or []
        }
        
        # Save metadata
        metadata_rdd = self.spark.sparkContext.parallelize([json.dumps(feature_metadata)])
        metadata_rdd.saveAsTextFile(f"{output_path}/_metadata/")
    
    def validate_features(self, features_df: DataFrame) -> Dict[str, Any]:
        """Validate feature quality and data integrity"""
        
        validation_results = {}
        
        # Check for null values in a single pass instead of one Spark job per column
        null_row = features_df.select([
            count(when(col(column).isNull(), 1)).alias(column)
            for column in features_df.columns
        ]).collect()[0]
        null_counts = null_row.asDict()
        
        validation_results["null_counts"] = null_counts
        
        # Check for data type consistency
        schema_validation = {}
        for field in features_df.schema.fields:
            schema_validation[field.name] = {
                "data_type": str(field.dataType),
                "nullable": field.nullable
            }
        
        validation_results["schema_validation"] = schema_validation
        
        # Statistical validation
        numeric_columns = [f.name for f in features_df.schema.fields 
                          if isinstance(f.dataType, (DoubleType, FloatType, IntegerType, LongType))]
        
        stats_validation = {}
        for column in numeric_columns:
            stats = features_df.select(
                mean(col(column)).alias("mean"),
                stddev(col(column)).alias("stddev"),
                min(col(column)).alias("min"),
                max(col(column)).alias("max")
            ).collect()[0]
            
            stats_validation[column] = {
                "mean": stats["mean"],
                "stddev": stats["stddev"],
                "min": stats["min"],
                "max": stats["max"]
            }
        
        validation_results["statistical_validation"] = stats_validation
        
        return validation_results

# Main execution
def main():
    spark = SparkSession.builder \
        .appName("BatchFeatureComputation") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .getOrCreate()
    
    # Initialize feature computation
    feature_engine = BatchFeatureComputation(spark)
    
    # Load source data
    users_df = spark.read.parquet("s3://feature-store/raw/users/")
    transactions_df = spark.read.parquet("s3://feature-store/raw/transactions/")
    
    # Compute features for specific date
    feature_date = "2025-01-25"
    feature_version = "v1.2"
    
    user_features = feature_engine.compute_user_features(
        users_df, transactions_df, feature_date, feature_version
    )
    
    # Validate features
    validation_results = feature_engine.validate_features(user_features)
    print("Feature validation results:", validation_results)
    
    # Save features
    feature_engine.save_features_to_s3(
        user_features, 
        "s3://feature-store/features/user_features/",
        partition_cols=["feature_timestamp"]
    )
    
    spark.stop()

if __name__ == "__main__":
    main()
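
The rolling-window method above relies on `rangeBetween(-window_days * 86400, 0)` over epoch seconds. As a sanity check on what that frame means, here is a plain-Python sketch (no Spark required) that computes the same kind of rolling sum for one entity. The function name `rolling_sum` and the sample events are illustrative assumptions; the frame is inclusive at both ends, as it is in Spark.

```python
from datetime import datetime, timezone
from typing import List, Tuple

SECONDS_PER_DAY = 86400


def rolling_sum(events: List[Tuple[datetime, float]], window_days: int) -> List[float]:
    """For each event at time t, sum the values with timestamps in [t - window, t]."""
    ordered = sorted(events, key=lambda e: e[0])
    results = []
    for ts, _ in ordered:
        upper = ts.timestamp()
        lower = upper - window_days * SECONDS_PER_DAY
        total = sum(value for other_ts, value in ordered
                    if lower <= other_ts.timestamp() <= upper)
        results.append(total)
    return results


events = [
    (datetime(2025, 1, 1, tzinfo=timezone.utc), 10.0),
    (datetime(2025, 1, 5, tzinfo=timezone.utc), 20.0),
    (datetime(2025, 1, 10, tzinfo=timezone.utc), 30.0),
]

print(rolling_sum(events, 7))  # [10.0, 30.0, 50.0]
```

The third event only sees the second one in its 7-day window, because 1 January falls outside the range that starts on 3 January. A range-based frame like this depends on elapsed time, whereas a row-based frame (`rowsBetween`) would count a fixed number of rows regardless of gaps.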

  

🚀 DynamoDB Feature Serving Layer

The DynamoDB serving layer provides low-latency access to computed features. Here's the implementation for efficient feature retrieval and updates.


# feature_serving.py - DynamoDB Feature Serving Layer
import boto3
from botocore.config import Config
from datetime import datetime, timedelta
import json
from typing import Dict, List, Any, Optional
import pandas as pd
from decimal import Decimal

class DynamoDBFeatureStore:
    def __init__(self, table_name: str, region: str = "us-east-1"):
        self.config = Config(
            retries={
                'max_attempts': 10,
                'mode': 'adaptive'
            },
            read_timeout=300,
            connect_timeout=300
        )
        
        self.dynamodb = boto3.resource('dynamodb', region_name=region, config=self.config)
        self.table = self.dynamodb.Table(table_name)
        self.client = boto3.client('dynamodb', region_name=region, config=self.config)
    
    def _convert_floats_to_decimals(self, obj):
        """Convert float values to Decimal for DynamoDB compatibility"""
        if isinstance(obj, list):
            return [self._convert_floats_to_decimals(item) for item in obj]
        elif isinstance(obj, dict):
            return {k: self._convert_floats_to_decimals(v) for k, v in obj.items()}
        elif isinstance(obj, float):
            return Decimal(str(obj))
        else:
            return obj
    
    def store_features(self, entity_id: str, features: Dict[str, Any], 
                      feature_timestamp: datetime, feature_version: str,
                      feature_type: str, ttl_days: int = 365):
        """Store computed features in DynamoDB with TTL"""
        
        # Prepare feature item
        feature_item = {
            'entity_id': entity_id,
            'feature_timestamp': int(feature_timestamp.timestamp()),
            'feature_type': feature_type,
            'feature_version': feature_version,
            'feature_values': self._convert_floats_to_decimals(features),
            'created_at': datetime.now().isoformat(),
            'expiry_time': int((datetime.now() + timedelta(days=ttl_days)).timestamp())
        }
        
        try:
            response = self.table.put_item(Item=feature_item)
            return True
        except Exception as e:
            print(f"Error storing features for {entity_id}: {e}")
            return False
    
    def batch_store_features(self, feature_batch: List[Dict[str, Any]]):
        """Batch store features for better performance"""
        
        with self.table.batch_writer() as batch:
            for feature_item in feature_batch:
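                # Floats must be converted to Decimal here too, as in store_features
                batch.put_item(Item=self._convert_floats_to_decimals(feature_item))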
    
    def get_features(self, entity_id: str, feature_timestamp: datetime,
                    feature_type: str = None, feature_version: str = None) -> Optional[Dict[str, Any]]:
        """Retrieve features for a specific entity and timestamp"""
        
        key_conditions = {
            'entity_id': {
                'AttributeValueList': [{'S': entity_id}],
                'ComparisonOperator': 'EQ'
            },
            'feature_timestamp': {
                'AttributeValueList': [{'N': str(int(feature_timestamp.timestamp()))}],
                'ComparisonOperator': 'EQ'
            }
        }
        
        # Add filter conditions if provided
        query_kwargs = {
            'KeyConditions': key_conditions,
            'Limit': 1
        }
        
        if feature_type:
            query_kwargs['QueryFilter'] = {
                'feature_type': {
                    'AttributeValueList': [{'S': feature_type}],
                    'ComparisonOperator': 'EQ'
                }
            }
        
        try:
            response = self.client.query(
                TableName=self.table.name,
                **query_kwargs
            )
            
            if response['Items']:
                item = response['Items'][0]
                return self._convert_dynamo_to_python(item)
            else:
                return None
                
        except Exception as e:
            print(f"Error retrieving features for {entity_id}: {e}")
            return None
    
    def get_latest_features(self, entity_id: str, feature_type: str = None,
                          max_lookback_days: int = 30) -> Optional[Dict[str, Any]]:
        """Get the most recent features for an entity within lookback window"""
        
        lookback_timestamp = int((datetime.now() - timedelta(days=max_lookback_days)).timestamp())
        
        key_conditions = {
            'entity_id': {
                'AttributeValueList': [{'S': entity_id}],
                'ComparisonOperator': 'EQ'
            },
            'feature_timestamp': {
                'AttributeValueList': [{'N': str(lookback_timestamp)}],
                'ComparisonOperator': 'GT'
            }
        }
        
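        query_kwargs = {
            'KeyConditions': key_conditions,
            'ScanIndexForward': False  # Most recent first
        }
        
        if feature_type:
            # DynamoDB applies Limit before QueryFilter, so Limit=1 could drop
            # an older matching item hidden behind a newer item of another type
            query_kwargs['QueryFilter'] = {
                'feature_type': {
                    'AttributeValueList': [{'S': feature_type}],
                    'ComparisonOperator': 'EQ'
                }
            }
        else:
            query_kwargs['Limit'] = 1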
        
        try:
            response = self.client.query(
                TableName=self.table.name,
                **query_kwargs
            )
            
            if response['Items']:
                item = response['Items'][0]
                return self._convert_dynamo_to_python(item)
            else:
                return None
                
        except Exception as e:
            print(f"Error retrieving latest features for {entity_id}: {e}")
            return None
    
    def get_feature_history(self, entity_id: str, start_time: datetime,
                          end_time: datetime, feature_type: str = None) -> List[Dict[str, Any]]:
        """Get feature history for an entity within a time range"""
        
        key_conditions = {
            'entity_id': {
                'AttributeValueList': [{'S': entity_id}],
                'ComparisonOperator': 'EQ'
            },
            'feature_timestamp': {
                'AttributeValueList': [
                    {'N': str(int(start_time.timestamp()))},
                    {'N': str(int(end_time.timestamp()))}
                ],
                'ComparisonOperator': 'BETWEEN'
            }
        }
        
        query_kwargs = {
            'KeyConditions': key_conditions
        }
        
        if feature_type:
            query_kwargs['QueryFilter'] = {
                'feature_type': {
                    'AttributeValueList': [{'S': feature_type}],
                    'ComparisonOperator': 'EQ'
                }
            }
        
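        try:
            features = []
            while True:
                response = self.client.query(
                    TableName=self.table.name,
                    **query_kwargs
                )
                
                for item in response['Items']:
                    features.append(self._convert_dynamo_to_python(item))
                
                # A single Query returns at most 1 MB; follow LastEvaluatedKey
                last_key = response.get('LastEvaluatedKey')
                if not last_key:
                    break
                query_kwargs['ExclusiveStartKey'] = last_key
            
            return features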
            
        except Exception as e:
            print(f"Error retrieving feature history for {entity_id}: {e}")
            return []
    
    def batch_get_features(self, entity_ids: List[str], feature_timestamp: datetime,
                          feature_type: str = None) -> Dict[str, Any]:
        """Batch retrieve features for multiple entities"""
        
        timestamp_value = str(int(feature_timestamp.timestamp()))
        keys = [
            {'entity_id': {'S': entity_id}, 'feature_timestamp': {'N': timestamp_value}}
            for entity_id in entity_ids
        ]
        features = {}
        
        try:
            # BatchGetItem accepts at most 100 keys per request
            for start in range(0, len(keys), 100):
                pending = {self.table.name: {'Keys': keys[start:start + 100]}}
                
                # Re-request any keys DynamoDB reports as unprocessed
                while pending:
                    response = self.client.batch_get_item(RequestItems=pending)
                    
                    for item in response['Responses'].get(self.table.name, []):
                        record = self._convert_dynamo_to_python(item)
                        # BatchGetItem has no FilterExpression, so filter client-side
                        if feature_type and record.get('feature_type') != feature_type:
                            continue
                        features[record['entity_id']] = record
                    
                    pending = response.get('UnprocessedKeys') or None
            
            return features
            
        except Exception as e:
            print(f"Error in batch get features: {e}")
            return {}
    
    def _convert_dynamo_to_python(self, dynamo_item: Dict) -> Dict[str, Any]:
        """Convert a DynamoDB item (attribute name -> typed value) to Python native types"""
        return {key: self._convert_value(value) for key, value in dynamo_item.items()}
    
    def _convert_value(self, value: Dict[str, Any]) -> Any:
        """Convert one typed DynamoDB attribute value, e.g. {'N': '1.5'}"""
        if 'S' in value:
            return value['S']
        elif 'N' in value:
            # Integers stay int; anything with a fraction or exponent becomes float
            num_str = value['N']
            if '.' in num_str or 'e' in num_str.lower():
                return float(num_str)
            return int(num_str)
        elif 'M' in value:
            return self._convert_dynamo_to_python(value['M'])
        elif 'L' in value:
            # List elements are typed values themselves, not whole items
            return [self._convert_value(item) for item in value['L']]
        elif 'BOOL' in value:
            return value['BOOL']
        elif 'NULL' in value:
            return None
        else:
            return value
    
    def get_feature_statistics(self, feature_type: str, 
                             start_time: datetime, end_time: datetime) -> Dict[str, Any]:
        """Get statistics about stored features for monitoring"""
        
        # Use GSI for feature type queries
        response = self.client.query(
            TableName=self.table.name,
            IndexName='feature_type-index',
            KeyConditions={
                'feature_type': {
                    'AttributeValueList': [{'S': feature_type}],
                    'ComparisonOperator': 'EQ'
                },
                'feature_timestamp': {
                    'AttributeValueList': [
                        {'N': str(int(start_time.timestamp()))},
                        {'N': str(int(end_time.timestamp()))}
                    ],
                    'ComparisonOperator': 'BETWEEN'
                }
            },
            Select='COUNT'
        )
        
        stats = {
            'feature_count': response['Count'],
            'scanned_count': response['ScannedCount'],
            'feature_type': feature_type,
            'time_range': {
                'start': start_time.isoformat(),
                'end': end_time.isoformat()
            }
        }
        
        return stats

# Example usage
def example_usage():
    feature_store = DynamoDBFeatureStore("feature-store")
    
    # Store features
    features = {
        "transaction_count_30d": 45,
        "total_spend_30d": 1250.75,
        "avg_transaction_amount": 27.79,
        "is_premium_member": True
    }
    
    feature_store.store_features(
        entity_id="user_12345",
        features=features,
        feature_timestamp=datetime.now(),
        feature_version="v1.2",
        feature_type="user_behavior"
    )
    
    # Retrieve features
    latest_features = feature_store.get_latest_features("user_12345")
    print("Latest features:", latest_features)
    
    # Batch retrieval
    entity_ids = ["user_12345", "user_67890", "user_11111"]
    batch_features = feature_store.batch_get_features(
        entity_ids, 
        datetime.now() - timedelta(days=1)
    )
    print("Batch features:", batch_features)

if __name__ == "__main__":
    example_usage()
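
The type conversion in `_convert_dynamo_to_python` can also be written as a standalone helper that converts one typed attribute value at a time, which keeps nested lists and maps simple. The sketch below is an illustration under assumptions, not the implementation used above: the names `attribute_to_python` and `item_to_python` are hypothetical, only the S, N, BOOL, NULL, L and M types are covered, and `Decimal` is used so that number strings in exponent form are handled as well as plain digits.

```python
from decimal import Decimal
from typing import Any, Dict


def attribute_to_python(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed value, e.g. {'N': '42'}, to a Python value."""
    if 'S' in attr:
        return attr['S']
    if 'N' in attr:
        number = Decimal(attr['N'])
        # A non-negative exponent means there are no fractional digits.
        if number.as_tuple().exponent >= 0:
            return int(number)
        return float(number)
    if 'BOOL' in attr:
        return attr['BOOL']
    if 'NULL' in attr:
        return None
    if 'L' in attr:
        return [attribute_to_python(element) for element in attr['L']]
    if 'M' in attr:
        return {name: attribute_to_python(value) for name, value in attr['M'].items()}
    # Types not handled in this sketch (sets, binary) are returned unchanged.
    return attr


def item_to_python(item: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Convert a whole item (attribute name -> typed value) to a plain dict."""
    return {name: attribute_to_python(value) for name, value in item.items()}
```

Converting per attribute rather than per item means list elements and map values go through the same code path, so no wrapping or special casing is needed for nested structures.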

  

📊 Monitoring and Data Quality Framework

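Production feature stores need continuous monitoring and data quality checks. The implementation below checks feature freshness and completeness, publishes the results to CloudWatch, and builds a dashboard. A distribution check is declared in the configuration but not implemented in this listing.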


# monitoring.py - Feature Store Monitoring and Data Quality
import boto3
from datetime import datetime, timedelta
import json
import pandas as pd
from typing import Dict, List, Any
import logging
from dataclasses import dataclass

@dataclass
class DataQualityCheck:
    name: str
    check_type: str  # 'completeness', 'freshness', 'distribution', 'schema'
    threshold: float
    description: str

class FeatureStoreMonitor:
    def __init__(self, dynamodb_table: str, cloudwatch_namespace: str = "FeatureStore"):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(dynamodb_table)
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = cloudwatch_namespace
        self.logger = logging.getLogger(__name__)
        
        # Define data quality checks
        self.quality_checks = [
            DataQualityCheck(
                name="feature_freshness",
                check_type="freshness",
                threshold=24,  # hours
                description="Features should be updated within 24 hours"
            ),
            DataQualityCheck(
                name="feature_completeness",
                check_type="completeness", 
                threshold=0.95,  # 95% completeness
                description="At least 95% of expected features should be available"
            ),
            DataQualityCheck(
                name="value_distribution",
                check_type="distribution",
                threshold=0.01,  # 1% outlier threshold
                description="Feature values should be within expected distribution"
            )
        ]
    
    def check_feature_freshness(self, feature_type: str, 
                              expected_update_frequency_hours: int = 24) -> Dict[str, Any]:
        """Check if features are being updated as expected"""
        
        # Get the most recent feature timestamp
        response = self.table.query(
            IndexName='feature_type-index',
            KeyConditionExpression='feature_type = :ft',
            ExpressionAttributeValues={':ft': feature_type},
            ScanIndexForward=False,  # Most recent first
            Limit=1
        )
        
        if not response['Items']:
            return {
                'check_name': 'feature_freshness',
                'status': 'FAIL',
                'message': f'No features found for type: {feature_type}',
                'last_update': None,
                'hours_since_update': None
            }
        
        latest_item = response['Items'][0]
        last_update_timestamp = latest_item['feature_timestamp']
        last_update_time = datetime.fromtimestamp(int(last_update_timestamp))
        hours_since_update = (datetime.now() - last_update_time).total_seconds() / 3600
        
        status = 'PASS' if hours_since_update <= expected_update_frequency_hours else 'FAIL'
        
        return {
            'check_name': 'feature_freshness',
            'status': status,
            'message': f'Last update: {hours_since_update:.1f} hours ago',
            'last_update': last_update_time.isoformat(),
            'hours_since_update': hours_since_update
        }
    
    def check_feature_completeness(self, feature_type: str, 
                                 expected_entity_count: int) -> Dict[str, Any]:
        """Check if all expected entities have features"""
        
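        # Count items of this feature type via the GSI.
        # Note: this counts stored items, not distinct entities, so it overstates
        # coverage when an entity has several feature rows. A single Query call
        # reads at most 1 MB, so follow LastEvaluatedKey to get the full count.
        query_kwargs = {
            'IndexName': 'feature_type-index',
            'KeyConditionExpression': 'feature_type = :ft',
            'ExpressionAttributeValues': {':ft': feature_type},
            'Select': 'COUNT'
        }
        feature_count = 0
        while True:
            response = self.table.query(**query_kwargs)
            feature_count += response['Count']
            if 'LastEvaluatedKey' not in response:
                break
            query_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
        
        # Guard against division by zero when no entities are expected
        completeness_ratio = feature_count / expected_entity_count if expected_entity_count else 0.0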
        
        status = 'PASS' if completeness_ratio >= 0.95 else 'FAIL'
        
        return {
            'check_name': 'feature_completeness',
            'status': status,
            'message': f'Completeness: {completeness_ratio:.2%} ({feature_count}/{expected_entity_count})',
            'completeness_ratio': completeness_ratio,
            'feature_count': feature_count,
            'expected_count': expected_entity_count
        }
    
    def run_data_quality_checks(self, feature_type: str, 
                              expected_entity_count: int = None) -> List[Dict[str, Any]]:
        """Run all data quality checks for a feature type"""
        
        results = []
        
        for check in self.quality_checks:
            if check.check_type == 'freshness':
                result = self.check_feature_freshness(feature_type)
            elif check.check_type == 'completeness' and expected_entity_count:
                result = self.check_feature_completeness(feature_type, expected_entity_count)
            else:
                continue
            
            results.append(result)
            
            # Publish to CloudWatch
            self._publish_metric(
                metric_name=f"{check.name}_{feature_type}",
                value=1 if result['status'] == 'PASS' else 0,
                dimensions={'FeatureType': feature_type, 'CheckName': check.name}
            )
        
        return results
    
    def monitor_feature_serving_latency(self):
        """Monitor feature retrieval latency"""
        # This would integrate with your serving layer
        # For example, you could use X-Ray or custom timing
        pass
    
    def track_feature_usage(self, feature_type: str, entity_count: int):
        """Track feature usage patterns"""
        
        self._publish_metric(
            metric_name="feature_usage_count",
            value=entity_count,
            dimensions={'FeatureType': feature_type}
        )
    
    def _publish_metric(self, metric_name: str, value: float, 
                       dimensions: Dict[str, str] = None):
        """Publish custom metric to CloudWatch"""
        
        metric_data = {
            'MetricName': metric_name,
            'Value': value,
            'Unit': 'Count',
            # Use a timezone-aware timestamp; botocore treats naive datetimes as UTC
            'Timestamp': datetime.now().astimezone()
        }
        
        if dimensions:
            metric_data['Dimensions'] = [
                {'Name': k, 'Value': v} for k, v in dimensions.items()
            ]
        
        try:
            self.cloudwatch.put_metric_data(
                Namespace=self.namespace,
                MetricData=[metric_data]
            )
        except Exception as e:
            self.logger.error(f"Failed to publish metric {metric_name}: {e}")
    
    def create_dashboard(self, feature_types: List[str]):
        """Create CloudWatch dashboard for feature store monitoring"""
        
        dashboard_body = {
            "widgets": []
        }
        
        for feature_type in feature_types:
            # Add freshness widget
            dashboard_body["widgets"].append({
                "type": "metric",
                "properties": {
                    "metrics": [
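                        # Must match the name and dimensions used in run_data_quality_checks
                        [self.namespace, f"feature_freshness_{feature_type}",
                         "FeatureType", feature_type, "CheckName", "feature_freshness"]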
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "us-east-1",
                    "title": f"{feature_type} - Freshness Status",
                    "yAxis": {
                        "left": {
                            "min": 0,
                            "max": 1
                        }
                    }
                }
            })
            
            # Add completeness widget
            dashboard_body["widgets"].append({
                "type": "metric", 
                "properties": {
                    "metrics": [
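                        # run_data_quality_checks publishes a 1/0 pass flag, not the ratio itself
                        [self.namespace, f"feature_completeness_{feature_type}",
                         "FeatureType", feature_type, "CheckName", "feature_completeness"]
                    ],
                    "period": 300,
                    "stat": "Average",
                    "region": "us-east-1",
                    "title": f"{feature_type} - Completeness Status"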
                }
            })
        
        try:
            self.cloudwatch.put_dashboard(
                DashboardName="FeatureStore-Monitoring",
                DashboardBody=json.dumps(dashboard_body)
            )
            self.logger.info("CloudWatch dashboard created successfully")
        except Exception as e:
            self.logger.error(f"Failed to create dashboard: {e}")

# Example usage
def monitor_example():
    monitor = FeatureStoreMonitor("feature-store")
    
    # Run data quality checks
    results = monitor.run_data_quality_checks(
        feature_type="user_behavior",
        expected_entity_count=10000
    )
    
    for result in results:
        print(f"Check: {result['check_name']}, Status: {result['status']}")
    
    # Create monitoring dashboard
    monitor.create_dashboard(["user_behavior", "product_features", "transaction_features"])

if __name__ == "__main__":
    monitor_example()
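
The `value_distribution` check is declared in `quality_checks` with a 1% outlier threshold, but `run_data_quality_checks` skips it. One possible way to fill that gap is sketched below. It is an assumption-laden illustration, not part of the original monitor: the function name `check_value_distribution`, the use of a z-score cutoff, and the baseline mean and standard deviation arguments (which you would derive from a trusted reference window) are all hypothetical choices.

```python
from typing import Any, Dict, Sequence


def check_value_distribution(values: Sequence[float],
                             baseline_mean: float,
                             baseline_std: float,
                             outlier_threshold: float = 0.01,
                             z_cutoff: float = 3.0) -> Dict[str, Any]:
    """Fail when too many values lie far from a known baseline.

    A value counts as an outlier when it is more than `z_cutoff` baseline
    standard deviations away from the baseline mean.
    """
    if baseline_std <= 0:
        raise ValueError("baseline_std must be positive")

    if not values:
        return {
            'check_name': 'value_distribution',
            'status': 'FAIL',
            'message': 'No values to check',
            'outlier_ratio': None
        }

    outliers = sum(
        1 for value in values
        if abs(value - baseline_mean) / baseline_std > z_cutoff
    )
    outlier_ratio = outliers / len(values)
    status = 'PASS' if outlier_ratio <= outlier_threshold else 'FAIL'

    return {
        'check_name': 'value_distribution',
        'status': status,
        'message': f'Outlier ratio: {outlier_ratio:.2%} ({outliers}/{len(values)})',
        'outlier_ratio': outlier_ratio
    }
```

The result dictionary uses the same `check_name`, `status` and `message` keys as the freshness and completeness checks, so it could be appended to the same results list and published through `_publish_metric` in the same way.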

  

⚡ Key Takeaways

  1. Architecture Matters: EMR for computation + DynamoDB for serving pairs scalable batch processing with low-latency reads
  2. Point-in-Time Correctness: Essential for model training and evaluation to avoid data leakage
  3. Feature Versioning: Critical for model reproducibility and A/B testing
  4. Data Quality Monitoring: Automated checks ensure feature reliability in production
  5. Scalable Design: Horizontal scaling with proper partitioning handles growing data volumes
  6. Cost Optimization: Use Spot Instances for EMR and auto scaling for DynamoDB to control costs
  7. Operational Excellence: Comprehensive monitoring and alerting for production reliability

❓ Frequently Asked Questions

When should I use a batch feature store vs. a real-time feature store?
Use a batch feature store for historical data, model training, and features that don't require real-time computation. Use a real-time feature store for low-latency online inference and features that change frequently. Many organizations use both: batch for training and historical features, real-time for fresh features during inference.
How do I handle feature schema evolution without breaking existing models?
Implement feature versioning and backward compatibility. When adding new features, create new feature versions while maintaining old versions for existing models. Use feature flags to gradually roll out new features. Always test new feature versions with shadow deployment before full rollout.
What's the optimal data partitioning strategy for DynamoDB feature storage?
Partition by entity ID (user_id, product_id, etc.) with the feature timestamp as the sort key. This enables efficient point-in-time lookups and time-range queries. Use Global Secondary Indexes for querying by feature type or version. Monitor for hot partitions and apply write sharding (a random or calculated suffix) to partition keys that receive disproportionate traffic, such as low-cardinality index keys.
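
The key layout and suffixing described in this answer can be sketched in a few lines. This is a minimal illustration under assumptions: the attribute name `entity_id`, the `#` separator, the shard count of 10, and the helper names are hypothetical, and the suffix is calculated from a hash (rather than drawn at random) so the same entity always maps to the same shard.

```python
import hashlib
from datetime import datetime
from typing import Any, Dict, List


def build_primary_key(entity_id: str, feature_timestamp: datetime) -> Dict[str, Any]:
    """Partition key = entity ID, sort key = feature timestamp in epoch seconds."""
    return {
        'entity_id': entity_id,
        'feature_timestamp': int(feature_timestamp.timestamp())
    }


def sharded_key(base_key: str, shard_source: str, shard_count: int = 10) -> str:
    """Spread one hot key value over `shard_count` partition key values.

    The suffix is derived from a stable hash of `shard_source`, so writes
    for the same entity always land on the same shard.
    """
    digest = hashlib.sha256(shard_source.encode('utf-8')).digest()
    shard = int.from_bytes(digest[:8], 'big') % shard_count
    return f"{base_key}#{shard}"


def all_shard_keys(base_key: str, shard_count: int = 10) -> List[str]:
    """Every key value a reader must query to cover the whole logical key."""
    return [f"{base_key}#{shard}" for shard in range(shard_count)]
```

The trade-off is on the read side: a query for the logical key `user_behavior` now has to be issued once per value returned by `all_shard_keys("user_behavior")` and the results merged, so sharding is only worth applying to keys that are actually hot.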
How can I ensure point-in-time correctness for feature computation?
Always filter source data by the feature timestamp cutoff. Use event time from your source systems rather than processing time. Implement watermarking for late-arriving data. Store feature computation metadata including source data versions and computation parameters.
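
As a rough illustration of the first two rules in this answer, the sketch below computes a windowed count from event time only, and looks up the feature row that was current at a given moment. The record layout (`event_time`, `processed_time`), the helper names, and the in-memory history list are assumptions made for the example, not part of the pipeline described in this guide.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple


def transaction_count_as_of(events: List[Dict[str, datetime]],
                            cutoff: datetime,
                            window_days: int = 30) -> int:
    """Count events whose event time falls in (cutoff - window, cutoff].

    Processing time is deliberately ignored: a late-arriving event still
    counts if it happened before the cutoff, and an event that happened
    after the cutoff never leaks into the feature.
    """
    window_start = cutoff - timedelta(days=window_days)
    return sum(1 for event in events if window_start < event['event_time'] <= cutoff)


def features_as_of(history: List[Tuple[int, Dict[str, Any]]],
                   as_of: int) -> Optional[Dict[str, Any]]:
    """Return the latest feature row with timestamp <= as_of.

    `history` must be sorted by timestamp in ascending order.
    """
    timestamps = [timestamp for timestamp, _ in history]
    index = bisect_right(timestamps, as_of)
    return history[index - 1][1] if index > 0 else None
```

With the table layout used in this guide, a comparable server-side lookup would be a DynamoDB query on the entity's partition key with a sort key condition of `feature_timestamp <= :as_of`, `ScanIndexForward=False` and `Limit=1`.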
What monitoring and alerting should I implement for production feature stores?
Monitor feature freshness (update frequency), completeness (coverage of entities), data quality (null rates, value distributions), and serving latency. Set up alerts for pipeline failures, data quality violations, and performance degradation. Implement circuit breakers for feature serving during outages.
How do I manage costs for large-scale feature stores?
Use EMR Spot Instances for computation, apply data lifecycle policies in S3, enable DynamoDB auto scaling, and set feature TTL policies. Monitor feature usage and archive unused features. Compress stored features and optimize queries to reduce consumed DynamoDB read capacity.
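
For the TTL part of this answer, DynamoDB expects the expiry time in a Number attribute holding Unix epoch seconds. A minimal sketch of stamping items that way follows. The attribute name `expires_at`, the 90-day retention default, and the helper names are assumptions for illustration, and TTL also has to be enabled on the table for the chosen attribute.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def ttl_epoch_seconds(feature_timestamp: datetime, retention_days: int) -> int:
    """Expiry time as Unix epoch seconds, the format DynamoDB TTL reads."""
    if feature_timestamp.tzinfo is None:
        # Assumption for this sketch: naive timestamps are already in UTC.
        feature_timestamp = feature_timestamp.replace(tzinfo=timezone.utc)
    expiry = feature_timestamp + timedelta(days=retention_days)
    return int(expiry.timestamp())


def with_ttl(item: Dict[str, Any],
             feature_timestamp: datetime,
             retention_days: int = 90,
             ttl_attribute: str = 'expires_at') -> Dict[str, Any]:
    """Return a copy of `item` with the TTL attribute added."""
    stamped = dict(item)
    stamped[ttl_attribute] = ttl_epoch_seconds(feature_timestamp, retention_days)
    return stamped
```

Expired items are removed by a background process rather than at the exact expiry time, so readers that need strict cutoffs should still filter on the timestamp themselves.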

💬 Have you implemented a batch feature store in production? Share your architecture decisions, challenges, or performance optimization tips in the comments below! If you found this guide helpful, please share it with your ML engineering team or on social media.

About LK-TECH Academy — Practical tutorials & explainers on software engineering, AI, and infrastructure. Follow for concise, hands-on guides.
