Registry-Daten konsumieren

Registry‑Daten konsumieren

Integrationsmuster und Best Practices zum Aufbau von Anwendungen, die MCP‑Registry‑Daten über die REST‑API konsumieren.

Grundlegende Informationen

Basis‑URL: https://registry.modelcontextprotocol.io

Authentifizierung: Für Lesezugriff ist keine Authentifizierung erforderlich

Kern‑Endpunkte:

  • GET /v0/servers – Listet alle Server seitenweise
  • GET /v0/servers/{id} – Ruft Serverdetails per UUID ab

Siehe die interaktive API‑Dokumentation für vollständige Request/Response‑Schemata.

Hinweis: Die offizielle Registry gibt keine Garantien für Verfügbarkeit oder Datenpersistenz. Entwerfen Sie Ihre Anwendung so, dass Ausfälle per Caching überbrückt werden können.

Schnellstart

Einfache Serverliste

# Zeige die ersten 10 Server
curl "https://registry.modelcontextprotocol.io/v0/servers?limit=10"
{
  "servers": [
    {
      "name": "io.modelcontextprotocol/filesystem",
      "description": "Server für Dateisystem‑Operationen", 
      "status": "active",
      "version": "1.0.2"
    }
  ],
  "metadata": {
    "count": 10,
    "next_cursor": "eyJ..."
  }
}

Server suchen

# Nach bestimmten Funktionen suchen
curl "https://registry.modelcontextprotocol.io/v0/servers?search=filesystem"

# Nach Status filtern
curl "https://registry.modelcontextprotocol.io/v0/servers?status=active"

# Kombinierte Abfragen
curl "https://registry.modelcontextprotocol.io/v0/servers?search=weather&limit=5"

Serverdetails abrufen

# Vollständige Informationen eines bestimmten Servers abrufen
curl "https://registry.modelcontextprotocol.io/v0/servers/{server-uuid}"

Sub‑Registries aufbauen

Erweiterte Registry erstellen – ETL der offiziellen Registry‑Daten und Ergänzung um eigene Metadaten wie Bewertungen, Security‑Scans oder Kompatibilität.

ETL‑Prozess

Aktuell empfehlen wir, den Endpunkt GET /v0/servers regelmäßig zu crawlen. Künftig könnte ein updated_at‑Filter (#291) verfügbar werden, um nur kürzlich geänderte Server abzurufen.

import requests
import time
from datetime import datetime

class RegistryETL:
    def __init__(self, base_url="https://registry.modelcontextprotocol.io"):
        self.base_url = base_url
        
    def fetch_all_servers(self):
        """Alle Server abrufen und paginieren"""
        servers = []
        cursor = None
        
        while True:
            params = {"limit": 100}
            if cursor:
                params["cursor"] = cursor
                
            response = requests.get(f"{self.base_url}/v0/servers", params=params)
            response.raise_for_status()
            
            data = response.json()
            servers.extend(data["servers"])
            
            # Prüfen, ob weitere Seiten vorhanden sind
            if not data.get("metadata", {}).get("next_cursor"):
                break
                
            cursor = data["metadata"]["next_cursor"]
            
        return servers
    
    def process_servers(self, servers):
        """Serverdaten verarbeiten und Metadaten ergänzen"""
        enhanced_servers = []
        
        for server in servers:
            # Inaktive Server überspringen
            if server.get("status") != "active":
                continue
                
            # Eigene Metadaten hinzufügen
            enhanced_server = {
                **server,
                "_meta": {
                    **server.get("_meta", {}),
                    "com.yourregistry/enhanced": {
                        "last_processed": datetime.utcnow().isoformat(),
                        "popularity_score": self.calculate_popularity(server),
                        "security_scan": self.perform_security_scan(server)
                    }
                }
            }
            
            enhanced_servers.append(enhanced_server)
            
        return enhanced_servers
    
    def calculate_popularity(self, server):
        """Beliebtheit berechnen (Beispiellogik)"""
        # Eigene Bewertungslogik implementieren
        base_score = 1.0
        
        # Anhand der Beschreibungsqualität anpassen
        if len(server.get("description", "")) > 100:
            base_score += 0.2
            
        # Anhand der Paketanzahl anpassen
        package_count = len(server.get("packages", []))
        if package_count > 1:
            base_score += 0.1 * package_count
            
        return min(base_score, 5.0)
    
    def perform_security_scan(self, server):
        """Sicherheitsprüfung durchführen (Beispiel)"""
        return {
            "last_scanned": datetime.utcnow().isoformat(),
            "vulnerabilities_found": 0,
            "scan_status": "passed"
        }

# Anwendungsbeispiel
etl = RegistryETL()
servers = etl.fetch_all_servers()
enhanced_servers = etl.process_servers(servers)

Statusverwaltung

Server sind in der Regel unveränderlich – mit Ausnahme des Feldes status, das auf deleted (oder andere Zustände) gesetzt werden kann. Für solche Pakete empfehlen wir, den Status ebenfalls zeitnah auf deleted zu setzen oder das Paket aus Ihrer Registry zu entfernen. Dieser Status weist meist auf einen Verstoß gegen unsere großzügigen Moderationsrichtlinien hin (z. B. illegal, Malware oder Spam).

def sync_server_status(self, local_server, remote_server):
    """Serverstatusänderungen synchronisieren"""
    if remote_server["status"] == "deleted":
        # Sofort aus der lokalen Registry entfernen oder markieren
        self.mark_server_deleted(local_server["id"])
        
    elif remote_server["status"] != local_server.get("status"):
        # Andere Statusänderungen aktualisieren
        self.update_server_status(
            local_server["id"], 
            remote_server["status"]
        )

Filterung und Anreicherung

Die offizielle Registry hat eine großzügige Moderationspolitik. Möglicherweise möchten Sie daher auf Basis der Registry‑Daten eigene Filter implementieren.

def apply_quality_filters(self, servers):
    """Qualitätsfilter anwenden"""
    filtered_servers = []
    
    for server in servers:
        # Grundlegende Qualitätsprüfung
        if not self.meets_quality_standards(server):
            continue
            
        # Sicherheitsprüfung
        if not self.passes_security_scan(server):
            continue
            
        # Lizenzprüfung
        if not self.has_acceptable_license(server):
            continue
            
        filtered_servers.append(server)
        
    return filtered_servers

def enhance_with_metadata(self, server):
    """Server mit erweiterten Metadaten anreichern"""
    enhanced = {
        **server,
        "_meta": {
            **server.get("_meta", {}),
            "com.yourregistry/enhanced": {
                "user_rating": self.get_user_rating(server["name"]),
                "download_count": self.get_download_count(server["name"]),
                "last_updated": self.get_last_update_time(server["name"]),
                "tags": self.generate_tags(server),
                "compatibility": self.check_compatibility(server)
            }
        }
    }
    
    return enhanced

API bereitstellen

Wir empfehlen, dass Ihre Sub‑Registry eine API bereitstellt, die der Registry‑API‑Spezifikation entspricht, damit Clients leicht zwischen Registries wechseln können. Siehe Registry‑API‑Dokumentation für Details.

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/v0/servers')
def list_servers():
    """Serverlisten-Endpunkt kompatibel zur offiziellen Registry"""
    
    # Query-Parameter parsen
    limit = min(int(request.args.get('limit', 20)), 100)
    cursor = request.args.get('cursor')
    search = request.args.get('search')
    status = request.args.get('status', 'active')
    
    # Filter anwenden
    servers = self.get_filtered_servers(
        limit=limit,
        cursor=cursor,
        search=search,
        status=status
    )
    
    # Nächsten Cursor berechnen
    next_cursor = None
    if len(servers) == limit:
        next_cursor = self.generate_cursor(servers[-1])
    
    return jsonify({
        "servers": servers,
        "metadata": {
            "count": len(servers),
            "next_cursor": next_cursor
        }
    })

@app.route('/v0/servers/<server_id>')
def get_server(server_id):
    """Details zu einem bestimmten Server abrufen"""
    server = self.get_server_by_id(server_id)
    
    if not server:
        return jsonify({"error": "Server not found"}), 404
        
    return jsonify(server)

MCP‑Client‑Integration

Registry‑Daten in Client‑Konfiguration umwandeln – Server abrufen und Paketinformationen in das Konfigurationsformat Ihres MCP‑Clients konvertieren.

Grundlegende Integration

import requests
from typing import List, Dict, Any

class MCPRegistryClient:
    def __init__(self, registry_url="https://registry.modelcontextprotocol.io"):
        self.registry_url = registry_url
        
    def discover_servers(self, search_query: str = None) -> List[Dict[str, Any]]:
        """Verfügbare MCP-Server entdecken"""
        params = {"status": "active", "limit": 50}
        if search_query:
            params["search"] = search_query
            
        response = requests.get(f"{self.registry_url}/v0/servers", params=params)
        response.raise_for_status()
        
        return response.json()["servers"]
    
    def get_server_details(self, server_id: str) -> Dict[str, Any]:
        """Serverdetails abrufen"""
        response = requests.get(f"{self.registry_url}/v0/servers/{server_id}")
        response.raise_for_status()
        
        return response.json()
    
    def convert_to_client_config(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Registry-Serverdaten in Clientkonfiguration umwandeln"""
        
        if "packages" in server:
            return self.convert_package_server(server)
        elif "remotes" in server:
            return self.convert_remote_server(server)
        else:
            raise ValueError(f"Server {server['name']} has no packages or remotes")
    
    def convert_package_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Server des Typs "packages" umwandeln"""
        # Bevorzugtes Paket auswählen (z. B. npm priorisieren)
        package = self.select_preferred_package(server["packages"])
        
        config = {
            "name": server["name"],
            "description": server["description"],
            "type": "package",
            "package": {
                "registry": package["registry_type"],
                "identifier": package["identifier"],
                "version": package["version"]
            }
        }
        
        # Laufzeitparameter hinzufügen
        if "package_arguments" in package:
            config["package"]["arguments"] = package["package_arguments"]
            
        if "environment_variables" in package:
            config["package"]["environment"] = package["environment_variables"]
            
        return config
    
    def convert_remote_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Server des Typs "remote" umwandeln"""
        # Bevorzugten Transport wählen (z. B. SSE priorisieren)
        remote = self.select_preferred_remote(server["remotes"])
        
        config = {
            "name": server["name"],
            "description": server["description"],
            "type": "remote",
            "remote": {
                "transport": remote["type"],
                "url": remote["url"]
            }
        }
        
        # Header hinzufügen
        if "headers" in remote:
            config["remote"]["headers"] = remote["headers"]
            
        return config
    
    def select_preferred_package(self, packages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bevorzugten Pakettyp wählen"""
        # Prioritätenreihenfolge
        priority_order = ["npm", "pypi", "nuget", "oci", "mcpb"]
        
        for registry_type in priority_order:
            for package in packages:
                if package["registry_type"] == registry_type:
                    return package
                    
        # Falls kein bevorzugter Typ gefunden wird, den ersten nehmen
        return packages[0]
    
    def select_preferred_remote(self, remotes: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bevorzugten Remote-Transport wählen"""
        # SSE bevorzugen
        for remote in remotes:
            if remote["type"] == "sse":
                return remote
                
        # Fallback: ersten verfügbaren verwenden
        return remotes[0]

# Anwendungsbeispiel
client = MCPRegistryClient()

# Server entdecken
servers = client.discover_servers("filesystem")

# In Clientkonfiguration umwandeln
configs = []
for server in servers:
    try:
        config = client.convert_to_client_config(server)
        configs.append(config)
    except ValueError as e:
        print(f"Server {server['name']} übersprungen: {e}")

print(f"{len(configs)} verfügbare Server gefunden")

Erweiterte Filterung

Wir empfehlen nachdrücklich, eine Sub‑Registry zu verwenden, statt Daten direkt aus der offiziellen Registry zu beziehen. Machen Sie dies idealerweise konfigurierbar, sodass Nutzer ihren bevorzugten Registry‑Endpunkt wählen können – insbesondere, da einige Unternehmen eine eigene Registry betreiben.

class AdvancedMCPClient:
    def __init__(self, preferred_registries=None):
        self.preferred_registries = preferred_registries or [
            "https://registry.modelcontextprotocol.io",
            "https://enterprise.registry.com/api/v0",
            "https://curated.mcpregistry.org/api/v0"
        ]
    
    def discover_servers_multi_registry(self, query: str) -> List[Dict[str, Any]]:
        """Server aus mehreren Registries entdecken"""
        all_servers = []
        
        for registry_url in self.preferred_registries:
            try:
                client = MCPRegistryClient(registry_url)
                servers = client.discover_servers(query)
                
                # Herkunftsinfo hinzufügen
                for server in servers:
                    server["_source_registry"] = registry_url
                    
                all_servers.extend(servers)
                
            except Exception as e:
                print(f"Fehler beim Abrufen von {registry_url}: {e}")
                continue
        
        # Deduplizieren (basierend auf Servernamen)
        seen = set()
        unique_servers = []
        
        for server in all_servers:
            if server["name"] not in seen:
                seen.add(server["name"])
                unique_servers.append(server)
                
        return unique_servers
    
    def filter_by_criteria(self, servers: List[Dict[str, Any]], criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Server anhand von Kriterien filtern"""
        filtered = []
        
        for server in servers:
            # Nach Status filtern
            if criteria.get("status") and server.get("status") != criteria["status"]:
                continue
                
            # Nach Pakettyp filtern
            if criteria.get("package_types"):
                server_types = {pkg["registry_type"] for pkg in server.get("packages", [])}
                if not server_types.intersection(set(criteria["package_types"])):
                    continue
            
            # Nach Bewertung filtern
            if criteria.get("min_rating"):
                rating = server.get("_meta", {}).get("user_rating", 0)
                if rating < criteria["min_rating"]:
                    continue
                    
            filtered.append(server)
            
        return filtered

Ihr Client sollte mit Registries umgehen können, die nur das Minimum der Spezifikation erfüllen – d. h. keine harte Abhängigkeit von _meta‑Feldern.

Server ausführen

Über die Felder packages oder remotes lässt sich bestimmen, wie ein Server ausgeführt wird. Details zu diesen Feldern finden Sie in der server.json‑Dokumentation.

def execute_server(self, config: Dict[str, Any]) -> subprocess.Popen:
    """MCP-Server ausführen"""
    
    if config["type"] == "package":
        return self.execute_package_server(config)
    elif config["type"] == "remote":
        return self.connect_remote_server(config)
    else:
        raise ValueError(f"Nicht unterstützter Servertyp: {config['type']}")

def execute_package_server(self, config: Dict[str, Any]) -> subprocess.Popen:
    """Server vom Typ "packages" ausführen"""
    package = config["package"]
    
    # Befehl bauen
    if package["registry"] == "npm":
        cmd = ["npx", package["identifier"]]
    elif package["registry"] == "pypi":
        cmd = ["uvx" if "runtime_hint" in package else "python", "-m", package["identifier"]]
    elif package["registry"] == "oci":
        cmd = ["docker", "run", package["identifier"]]
    else:
        raise ValueError(f"Nicht unterstützte Paket-Registry: {package['registry']}")
    
    # Argumente hinzufügen
    if "arguments" in package:
        for arg in package["arguments"]:
            if arg["type"] == "positional":
                cmd.append(arg["value"])
            elif arg["type"] == "named":
                cmd.extend([arg["name"], arg["value"]])
    
    # Umgebungsvariablen setzen
    env = os.environ.copy()
    if "environment" in package:
        for env_var in package["environment"]:
            if env_var.get("is_required") and env_var["name"] not in env:
                raise ValueError(f"Erforderliche Umgebungsvariable fehlt: {env_var['name']}")
            env[env_var["name"]] = env.get(env_var["name"], env_var.get("default", ""))
    
    return subprocess.Popen(cmd, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

Best Practices

1. Caching‑Strategie

import redis
import json
from datetime import timedelta

class CachedRegistryClient:
    def __init__(self, redis_client=None, cache_ttl=3600):
        self.redis = redis_client or redis.Redis()
        self.cache_ttl = cache_ttl
    
    def get_servers_cached(self, **params):
        """Serverabruf mit Cache"""
        cache_key = f"registry:servers:{hash(frozenset(params.items()))}"
        
        # Versuch, aus dem Cache zu holen
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Aus der Registry holen
        servers = self.fetch_servers(**params)
        
        # Ergebnis cachen
        self.redis.setex(
            cache_key, 
            timedelta(seconds=self.cache_ttl),
            json.dumps(servers)
        )
        
        return servers

2. Fehlerbehandlung

import time
import random

class RobustRegistryClient:
    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def fetch_with_retry(self, url, **kwargs):
        """Request mit Retry"""
        for attempt in range(self.max_retries):
            try:
                response = requests.get(url, timeout=30, **kwargs)
                response.raise_for_status()
                return response.json()
                
            except requests.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise
                
                # Exponentielles Backoff
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)

3. Inkrementelle Aktualisierung

class IncrementalRegistrySync:
    def __init__(self, last_sync_file="last_sync.txt"):
        self.last_sync_file = last_sync_file
    
    def get_last_sync_time(self):
        """Letzte Sync-Zeit abrufen"""
        try:
            with open(self.last_sync_file, 'r') as f:
                return datetime.fromisoformat(f.read().strip())
        except FileNotFoundError:
            return datetime.min
    
    def save_sync_time(self, sync_time):
        """Sync-Zeit speichern"""
        with open(self.last_sync_file, 'w') as f:
            f.write(sync_time.isoformat())
    
    def sync_changes(self):
        """Änderungen synchronisieren (sobald updated_at unterstützt wird)"""
        last_sync = self.get_last_sync_time()
        
        # Zukünftiger API-Aufruf
        # servers = self.fetch_servers(updated_after=last_sync)
        
        # Derzeit alle Server abrufen und lokal filtern
        all_servers = self.fetch_all_servers()
        
        self.save_sync_time(datetime.utcnow())
        return all_servers

Nächste Schritte

Vollständige API‑Spezifikation ansehen Lernen, wie Sie Ihren Server veröffentlichen Architektur der Registry verstehen Designprinzipien der Registry verstehen
Empfehlung: In Produktionsumgebungen eine Sub‑Registry statt der offiziellen Registry verwenden – so können Sie Caching, Filterung und Erweiterungen hinzufügen.