MLOps & Model Deployment
Bring your ML models into production successfully. Learn proven practices for deploying, monitoring, versioning, and scaling machine learning systems for enterprise automation.
MLOps Lifecycle
Development
Model Training & Experimentation
Deployment
Production Deployment & CI/CD
Operations
Monitoring & Maintenance
What is MLOps?
MLOps (Machine Learning Operations) is a set of practices aimed at unifying ML development and operations. It combines machine learning, DevOps, and data engineering to develop, deploy, and manage ML systems reliably and efficiently.
The main difference from traditional software development lies in the complexity of ML systems: you have to manage not only code, but also data, models, and their ongoing quality.
Benefits of MLOps
- Faster time to market
- Better model performance
- Automated quality control
- Scalable ML systems
Challenges
- Model drift and data drift
- Data quality and availability
- Reproducibility (a minimal sketch follows this list)
- Compliance and governance
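Reproducibility in particular tends to erode silently. The snippet below is a minimal sketch of a deterministic training run, assuming a scikit-learn workflow; the file paths and the dataset_fingerprint helper are illustrative, not part of any specific library.

# Minimal reproducibility sketch: fixed seeds plus a fingerprint of the training data
import hashlib
import json
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

RANDOM_STATE = 42  # one seed, reused everywhere

def dataset_fingerprint(df: pd.DataFrame) -> str:
    """Hash the exact content of the training data so a run can be verified later."""
    return hashlib.sha256(
        pd.util.hash_pandas_object(df, index=True).values.tobytes()
    ).hexdigest()

df = pd.read_csv("data/train.csv")  # illustrative path
X, y = df.drop("target", axis=1), df["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE
)

model = RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE)
model.fit(X_train, y_train)

# Persist the model plus everything needed to reproduce the run
joblib.dump(model, "model/model_latest.pkl")
with open("model/model_latest.meta.json", "w") as f:
    json.dump({"random_state": RANDOM_STATE, "data_sha256": dataset_fingerprint(df)}, f)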
Model Packaging and Containerization
The first step toward production deployment is packaging your model in a reproducible container.
Docker Container for ML Models
# Dockerfile for the ML model service
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies (curl is needed for the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and code
COPY model/ ./model/
COPY src/ ./src/
COPY config/ ./config/

# Add health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Create a non-root user
RUN useradd --create-home --shell /bin/bash app
USER app

# Start the service
EXPOSE 8000
CMD ["python", "src/serve.py"]
FastAPI Model Service
# src/serve.py
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List, Dict, Any
import logging
import asyncio
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
import time

app = FastAPI(
    title="ML Model API",
    description="Production ML Model Service",
    version="1.0.0"
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus metrics
PREDICTION_COUNTER = Counter('ml_predictions_total', 'Total predictions made')
PREDICTION_DURATION = Histogram('ml_prediction_duration_seconds', 'Time spent on predictions')
ERROR_COUNTER = Counter('ml_errors_total', 'Total errors', ['error_type'])

class PredictionRequest(BaseModel):
    features: List[float]
    model_version: str = "latest"

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float
    model_version: str
    timestamp: str

class ModelManager:
    def __init__(self):
        self.models = {}
        self.load_models()

    def load_models(self):
        try:
            # Load the main model and a pinned version
            self.models['latest'] = joblib.load('model/model_latest.pkl')
            self.models['v1.0'] = joblib.load('model/model_v1.0.pkl')
            logger.info("Models loaded successfully")
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            raise

    def predict(self, features: np.ndarray, version: str = "latest"):
        if version not in self.models:
            raise ValueError(f"Model version {version} not found")

        model = self.models[version]
        prediction = model.predict(features.reshape(1, -1))[0]

        # Compute confidence (if the model supports it)
        confidence = 0.95  # Placeholder
        if hasattr(model, 'predict_proba'):
            proba = model.predict_proba(features.reshape(1, -1))[0]
            confidence = max(proba)

        return prediction, confidence

# Initialize the model manager
model_manager = ModelManager()

@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": time.time()}

@app.get("/models")
async def list_models():
    return {"available_models": list(model_manager.models.keys())}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start_time = time.time()

    try:
        # Validate input data
        if len(request.features) == 0:
            ERROR_COUNTER.labels(error_type='invalid_input').inc()
            raise HTTPException(status_code=400, detail="No features provided")

        # Run the prediction
        features = np.array(request.features)
        prediction, confidence = model_manager.predict(features, request.model_version)

        # Update metrics
        PREDICTION_COUNTER.inc()
        PREDICTION_DURATION.observe(time.time() - start_time)

        return PredictionResponse(
            prediction=float(prediction),
            confidence=float(confidence),
            model_version=request.model_version,
            timestamp=str(time.time())
        )

    except ValueError as e:
        ERROR_COUNTER.labels(error_type='model_error').inc()
        raise HTTPException(status_code=400, detail=str(e))
    except HTTPException:
        # Re-raise validation errors unchanged instead of converting them to 500s
        raise
    except Exception as e:
        ERROR_COUNTER.labels(error_type='unknown').inc()
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
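With the service running (for example via python src/serve.py or the Docker image above), it can be called from a few lines of client code. This is a minimal sketch that assumes the service listens on http://localhost:8000 and that the length of the feature vector matches what the trained model expects.

# Minimal client sketch for the /predict endpoint (assumes the service runs on localhost:8000)
import requests

payload = {
    "features": [5.1, 3.5, 1.4, 0.2],  # must match the model's expected feature count
    "model_version": "latest",
}

response = requests.post("http://localhost:8000/predict", json=payload, timeout=10)
response.raise_for_status()
result = response.json()
print(f"Prediction: {result['prediction']:.3f} (confidence {result['confidence']:.2f})")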
Requirements and Dependencies
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
scikit-learn==1.3.2
pandas==2.1.3
numpy==1.25.2
joblib==1.3.2
prometheus-client==0.19.0
python-multipart==0.0.6

# Optional dependencies for advanced features
# tensorflow==2.15.0
# torch==2.1.0
# xgboost==2.0.2
# lightgbm==4.1.0

# Development dependencies
pytest==7.4.3
black==23.11.0
flake8==6.1.0
mypy==1.7.1
Kubernetes Deployment
Scale your ML services with Kubernetes for high availability and automatic scaling.
Deployment Manifest
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-service
  labels:
    app: ml-model-service
    version: v1.0.0
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-service
  template:
    metadata:
      labels:
        app: ml-model-service
        version: v1.0.0
    spec:
      containers:
      - name: ml-model
        image: your-registry/ml-model:v1.0.0
        ports:
        - containerPort: 8000
        env:
        - name: MODEL_VERSION
          value: "v1.0.0"
        - name: LOG_LEVEL
          value: "INFO"
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: model-storage
          mountPath: /app/model
          readOnly: true
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
Ingress and Load Balancing
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ml-model-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/rewrite-target: /
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/rate-limit: "100"
    nginx.ingress.kubernetes.io/rate-limit-window: "1m"
spec:
  tls:
  - hosts:
    - ml-api.yourdomain.com
    secretName: ml-api-tls
  rules:
  - host: ml-api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: ml-model-service
            port:
              number: 80
---
# Canary deployment for A/B testing
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ml-model-canary
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/canary: "true"
    nginx.ingress.kubernetes.io/canary-weight: "10"
spec:
  rules:
  - host: ml-api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: ml-model-service-canary
            port:
              number: 80
CI/CD Pipeline for ML
Automate the training, testing, and deployment of your ML models with CI/CD pipelines.
GitHub Actions Workflow
# .github/workflows/ml-pipeline.yml
name: ML Model CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/ml-model

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run unit tests
      run: |
        pytest tests/ --cov=src --cov-report=xml

    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  model-validation:
    runs-on: ubuntu-latest
    needs: test
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Download test data
      run: |
        aws s3 cp s3://ml-data-bucket/test-data.csv ./data/
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

    - name: Validate model performance
      run: |
        python scripts/validate_model.py --threshold 0.85

    - name: Generate model report
      run: |
        python scripts/generate_report.py --output model_report.html

    - name: Upload model artifacts
      uses: actions/upload-artifact@v3
      with:
        name: model-artifacts
        path: |
          model/
          model_report.html

  build-and-push:
    runs-on: ubuntu-latest
    needs: [test, model-validation]
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v4

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3

    - name: Log in to Container Registry
      uses: docker/login-action@v3
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}

    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v5
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
        tags: |
          type=ref,event=branch
          type=sha,prefix={{branch}}-
          type=raw,value=latest,enable={{is_default_branch}}

    - name: Build and push Docker image
      uses: docker/build-push-action@v5
      with:
        context: .
        push: true
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}
        cache-from: type=gha
        cache-to: type=gha,mode=max

  deploy:
    runs-on: ubuntu-latest
    needs: build-and-push
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
    - uses: actions/checkout@v4

    - name: Set up kubectl
      uses: azure/setup-kubectl@v3
      with:
        version: 'v1.28.0'

    - name: Configure kubectl
      run: |
        echo "${{ secrets.KUBE_CONFIG }}" | base64 -d > kubeconfig
        # Persist KUBECONFIG for the following steps
        echo "KUBECONFIG=$(pwd)/kubeconfig" >> "$GITHUB_ENV"

    - name: Deploy to Kubernetes
      run: |
        envsubst < k8s/deployment.yaml | kubectl apply -f -
        kubectl rollout status deployment/ml-model-service
      env:
        IMAGE_TAG: ${{ github.sha }}

    - name: Run smoke tests
      run: |
        kubectl port-forward service/ml-model-service 8080:80 &
        sleep 10
        python scripts/smoke_tests.py --endpoint http://localhost:8080
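The workflow above calls scripts/smoke_tests.py, which is not shown here. A minimal sketch of what such a script could look like, assuming the FastAPI endpoints defined earlier (/health and /predict), an --endpoint flag, and a feature vector length that matches the deployed model:

# scripts/smoke_tests.py (illustrative sketch; the real script is project-specific)
import argparse
import sys

import requests


def run_smoke_tests(endpoint: str) -> bool:
    """Check that the deployed service answers on /health and /predict."""
    health = requests.get(f"{endpoint}/health", timeout=10)
    if health.status_code != 200:
        print(f"Health check failed: {health.status_code}")
        return False

    payload = {"features": [0.1, 0.2, 0.3, 0.4], "model_version": "latest"}
    prediction = requests.post(f"{endpoint}/predict", json=payload, timeout=10)
    if prediction.status_code != 200 or "prediction" not in prediction.json():
        print(f"Prediction check failed: {prediction.status_code}")
        return False

    print("Smoke tests passed")
    return True


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--endpoint", default="http://localhost:8080")
    args = parser.parse_args()
    sys.exit(0 if run_smoke_tests(args.endpoint) else 1)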
Model Validation Script
# scripts/validate_model.py
import argparse
import sys
import joblib
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def validate_model(model_path: str, test_data_path: str, threshold: float):
    """Validate model performance against a minimum threshold."""
    logger.info(f"Loading model from {model_path}")
    model = joblib.load(model_path)

    logger.info(f"Loading test data from {test_data_path}")
    test_data = pd.read_csv(test_data_path)

    # Separate features and target
    X_test = test_data.drop('target', axis=1)
    y_test = test_data['target']

    # Predictions
    y_pred = model.predict(X_test)

    # Compute metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    logger.info("Model performance:")
    logger.info(f"  Accuracy: {accuracy:.4f}")
    logger.info(f"  Precision: {precision:.4f}")
    logger.info(f"  Recall: {recall:.4f}")
    logger.info(f"  F1-Score: {f1:.4f}")

    # Threshold check
    if accuracy < threshold:
        logger.error(f"Model accuracy {accuracy:.4f} below threshold {threshold}")
        return False

    logger.info(f"Model validation passed! Accuracy {accuracy:.4f} >= {threshold}")
    return True


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="model/model_latest.pkl")
    parser.add_argument("--data", default="data/test-data.csv")
    parser.add_argument("--threshold", type=float, default=0.8)

    args = parser.parse_args()
    success = validate_model(args.model, args.data, args.threshold)
    sys.exit(0 if success else 1)
Model Monitoring and Observability
Monitor the performance of your ML models in production and detect problems early.
Data Drift Detection
Detect changes in the data distribution that can degrade model performance.
# monitoring/drift_detector.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, Tuple
import logging


class DataDriftDetector:
    def __init__(self, reference_data: pd.DataFrame, significance_level: float = 0.05):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_stats = self._compute_reference_stats()

    def _compute_reference_stats(self) -> Dict:
        """Compute reference statistics."""
        stats_dict = {}

        for column in self.reference_data.columns:
            if self.reference_data[column].dtype in ['int64', 'float64']:
                stats_dict[column] = {
                    'mean': self.reference_data[column].mean(),
                    'std': self.reference_data[column].std(),
                    'distribution': self.reference_data[column].values
                }
            else:
                # Categorical variables
                stats_dict[column] = {
                    'categories': self.reference_data[column].value_counts().to_dict()
                }

        return stats_dict

    def detect_drift(self, current_data: pd.DataFrame) -> Dict:
        """Data drift detection using statistical tests."""
        drift_results = {}

        for column in self.reference_data.columns:
            if column not in current_data.columns:
                continue

            if self.reference_data[column].dtype in ['int64', 'float64']:
                # Numerical variables: Kolmogorov-Smirnov test
                ref_values = self.reference_stats[column]['distribution']
                current_values = current_data[column].values

                ks_statistic, p_value = stats.ks_2samp(ref_values, current_values)

                drift_results[column] = {
                    'test': 'ks_test',
                    'statistic': ks_statistic,
                    'p_value': p_value,
                    'drift_detected': p_value < self.significance_level,
                    'severity': self._calculate_severity(ks_statistic)
                }
            else:
                # Categorical variables: chi-square test
                ref_counts = self.reference_stats[column]['categories']
                current_counts = current_data[column].value_counts().to_dict()

                # Union of all observed categories
                all_categories = set(ref_counts.keys()) | set(current_counts.keys())
                ref_freq = np.array([ref_counts.get(cat, 0) for cat in all_categories], dtype=float)
                current_freq = np.array([current_counts.get(cat, 0) for cat in all_categories], dtype=float)

                if current_freq.sum() > 0 and ref_freq.sum() > 0:
                    # Scale the reference counts so the expected frequencies sum to the
                    # observed total (required by the chi-square test); a tiny constant
                    # avoids division by zero for categories unseen in the reference data
                    expected = ref_freq + 1e-6
                    expected = expected * current_freq.sum() / expected.sum()
                    chi2_stat, p_value = stats.chisquare(current_freq, expected)

                    drift_results[column] = {
                        'test': 'chi2_test',
                        'statistic': chi2_stat,
                        'p_value': p_value,
                        'drift_detected': p_value < self.significance_level,
                        'severity': self._calculate_severity_categorical(chi2_stat)
                    }

        return drift_results

    def _calculate_severity(self, ks_statistic: float) -> str:
        """Drift severity based on the KS statistic."""
        if ks_statistic < 0.1:
            return "low"
        elif ks_statistic < 0.25:
            return "medium"
        else:
            return "high"

    def _calculate_severity_categorical(self, chi2_stat: float) -> str:
        """Severity for categorical variables."""
        if chi2_stat < 10:
            return "low"
        elif chi2_stat < 50:
            return "medium"
        else:
            return "high"


# Usage in the monitoring pipeline
def monitor_data_drift(current_batch: pd.DataFrame, reference_data: pd.DataFrame):
    detector = DataDriftDetector(reference_data)
    drift_results = detector.detect_drift(current_batch)

    # Send alerts
    for column, result in drift_results.items():
        if result['drift_detected']:
            logging.warning(f"Data drift detected in column '{column}': "
                            f"p-value={result['p_value']:.4f}, "
                            f"severity={result['severity']}")

            # send_alert is a project-specific hook (Slack, email, etc.)
            send_alert(f"Data drift in {column}", result)

    return drift_results
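A small, self-contained usage example with synthetic data shows the detector end to end; the column names, sample sizes, and injected shifts are purely illustrative.

# Illustrative usage of DataDriftDetector with synthetic data
# (assumes the module above is importable, e.g. from monitoring.drift_detector import DataDriftDetector)
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)

# Reference data: the distribution the model was trained on
reference = pd.DataFrame({
    "age": rng.normal(40, 10, 5000),
    "segment": rng.choice(["a", "b", "c"], 5000, p=[0.5, 0.3, 0.2]),
})

# Current batch: mean shift in "age" and a changed category mix in "segment"
current = pd.DataFrame({
    "age": rng.normal(47, 10, 1000),
    "segment": rng.choice(["a", "b", "c"], 1000, p=[0.2, 0.3, 0.5]),
})

detector = DataDriftDetector(reference, significance_level=0.05)
for column, result in detector.detect_drift(current).items():
    print(column, result["test"], f"p={result['p_value']:.4f}", result["severity"])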
Model Performance Monitoring
Monitor model performance continuously and respond to degradation.
# monitoring/performance_monitor.py
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import sqlite3


@dataclass
class PredictionLog:
    timestamp: datetime
    model_version: str
    features: List[float]
    prediction: float
    confidence: float
    actual_value: Optional[float] = None  # filled in later via a feedback loop


class ModelPerformanceMonitor:
    def __init__(self, db_path: str = "model_monitoring.db"):
        self.db_path = db_path
        self._setup_database()

    def _setup_database(self):
        """Set up the SQLite database for logging."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS predictions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                model_version TEXT,
                features TEXT,
                prediction REAL,
                confidence REAL,
                actual_value REAL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS performance_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date DATE,
                model_version TEXT,
                metric_name TEXT,
                metric_value REAL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        conn.commit()
        conn.close()

    def log_prediction(self, log_entry: PredictionLog):
        """Log a single prediction."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO predictions
            (timestamp, model_version, features, prediction, confidence, actual_value)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            log_entry.timestamp,
            log_entry.model_version,
            str(log_entry.features),  # feature list stored as a string
            log_entry.prediction,
            log_entry.confidence,
            log_entry.actual_value
        ))

        conn.commit()
        conn.close()

    def calculate_daily_metrics(self, date: datetime, model_version: str) -> Dict:
        """Calculate daily performance metrics."""
        conn = sqlite3.connect(self.db_path)

        # Fetch the day's predictions that already have ground truth
        query = '''
            SELECT prediction, actual_value, confidence
            FROM predictions
            WHERE DATE(timestamp) = ?
            AND model_version = ?
            AND actual_value IS NOT NULL
        '''

        df = pd.read_sql_query(query, conn, params=(date.date(), model_version))
        conn.close()

        if len(df) == 0:
            return {}

        # Compute metrics
        mae = np.mean(np.abs(df['prediction'] - df['actual_value']))
        mse = np.mean((df['prediction'] - df['actual_value']) ** 2)
        rmse = np.sqrt(mse)

        # Confidence calibration
        avg_confidence = df['confidence'].mean()
        accuracy_in_conf_range = self._calculate_confidence_accuracy(df)

        metrics = {
            'mae': mae,
            'mse': mse,
            'rmse': rmse,
            'avg_confidence': avg_confidence,
            'confidence_accuracy': accuracy_in_conf_range,
            'prediction_count': len(df)
        }

        # Store metrics in the database
        self._store_metrics(date.date(), model_version, metrics)

        return metrics

    def _calculate_confidence_accuracy(self, df: pd.DataFrame) -> float:
        """Calculate how well the confidence estimates are calibrated."""
        # Bin by confidence level
        bins = np.linspace(0, 1, 11)  # 10 bins
        df['conf_bin'] = pd.cut(df['confidence'], bins=bins)

        accuracies = []
        for bin_name, group in df.groupby('conf_bin'):
            if len(group) > 0:
                # Accuracy within this confidence range
                errors = np.abs(group['prediction'] - group['actual_value'])
                # Normalize to a 0-1 scale (adjust per problem)
                normalized_accuracy = 1 - np.mean(errors) / np.std(df['actual_value'])
                accuracies.append(normalized_accuracy)

        return np.mean(accuracies) if accuracies else 0.0

    def _store_metrics(self, date, model_version: str, metrics: Dict):
        """Store metrics in the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for metric_name, metric_value in metrics.items():
            cursor.execute('''
                INSERT INTO performance_metrics (date, model_version, metric_name, metric_value)
                VALUES (?, ?, ?, ?)
            ''', (date, model_version, metric_name, metric_value))

        conn.commit()
        conn.close()

    def detect_performance_degradation(self, model_version: str, lookback_days: int = 7) -> Dict:
        """Detect performance degradation."""
        conn = sqlite3.connect(self.db_path)

        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=lookback_days)

        query = '''
            SELECT date, metric_name, metric_value
            FROM performance_metrics
            WHERE model_version = ?
            AND date BETWEEN ? AND ?
            ORDER BY date
        '''

        df = pd.read_sql_query(query, conn, params=(model_version, start_date, end_date))
        conn.close()

        if len(df) == 0:
            return {"status": "insufficient_data"}

        # Trend analysis for the key metrics
        trends = {}
        for metric in ['mae', 'rmse', 'confidence_accuracy']:
            metric_data = df[df['metric_name'] == metric].sort_values('date')

            if len(metric_data) >= 3:
                # Simple trend calculation
                recent_avg = metric_data.tail(3)['metric_value'].mean()
                historical_avg = metric_data.head(-3)['metric_value'].mean() if len(metric_data) > 3 else recent_avg

                trend_pct = ((recent_avg - historical_avg) / historical_avg) * 100

                # For error metrics, an increase is bad
                degradation = trend_pct > 10 if metric in ['mae', 'rmse'] else trend_pct < -10

                trends[metric] = {
                    'trend_pct': trend_pct,
                    'degradation_detected': degradation,
                    'recent_avg': recent_avg,
                    'historical_avg': historical_avg
                }

        return trends


# Usage in the FastAPI service: the middleware below belongs in src/serve.py,
# where `app` is defined and `Request` can be imported from fastapi
monitor = ModelPerformanceMonitor()

@app.middleware("http")
async def log_predictions(request: Request, call_next):
    response = await call_next(request)

    # Prediction logging (only for the /predict endpoint)
    if request.url.path == "/predict" and response.status_code == 200:
        # Create and store a log entry
        # (details depend on the specific implementation)
        pass

    return response
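A short usage sketch could look like the following; the feedback values are simulated here, since real ground truth usually arrives later, and all numbers are purely illustrative.

# Illustrative usage of ModelPerformanceMonitor
# (assumes the module above is importable, e.g. from monitoring.performance_monitor import ModelPerformanceMonitor, PredictionLog)
from datetime import datetime
import numpy as np

monitor = ModelPerformanceMonitor(db_path="model_monitoring.db")

# Log a handful of predictions; actual_value normally arrives later via a feedback loop
rng = np.random.default_rng(0)
for _ in range(20):
    actual = float(rng.normal(50, 5))
    monitor.log_prediction(PredictionLog(
        timestamp=datetime.now(),
        model_version="v1.0",
        features=[0.1, 0.2, 0.3],
        prediction=actual + float(rng.normal(0, 2)),  # simulated model error
        confidence=float(rng.uniform(0.7, 0.99)),
        actual_value=actual,
    ))

# Aggregate today's metrics; degradation trends need several days of stored history
print(monitor.calculate_daily_metrics(datetime.now(), "v1.0"))
print(monitor.detect_performance_degradation("v1.0", lookback_days=7))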
MLOps Best Practices
- Versioning: always version code, data, and models (see the MLflow sketch after this list)
- Reproducibility: build deterministic pipelines
- Testing: unit, integration, and model tests
- Monitoring: set up continuous monitoring
- Rollback: implement fast rollback strategies
- Security: consider security aspects from the outset
- Documentation: maintain comprehensive documentation
- Governance: define clear roles and processes
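Versioning in particular benefits from tooling. The sketch below shows one way to do it with MLflow (listed among the tools in the next section): parameters, metrics, and a registered model version are recorded in a single run. The experiment and model names are placeholders, and a tracking server with a model-registry backend is assumed.

# Minimal model-versioning sketch with MLflow (requires a tracking server with a model registry)
import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

mlflow.set_experiment("ml-model-service")  # illustrative experiment name

X, y = make_classification(n_samples=1_000, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    accuracy = accuracy_score(y_test, model.predict(X_test))

    # Version everything together: parameters, metrics, and the model artifact
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="ml-model-service",  # creates a new registry version per run
    )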
MLOps Tools & Technologies
Experiment Tracking
- MLflow
- Weights & Biases
- Neptune AI
- TensorBoard
Model Serving
- TensorFlow Serving
- TorchServe
- Seldon Core
- KServe (KubeFlow)
Orchestration
- Apache Airflow
- Kubeflow Pipelines
- Prefect
- MLflow Pipelines
🚀 Ready for Production ML?
Let us help you bring your ML models into production successfully. From containerization to automated deployment, we support you at every step.