Registry-Daten konsumieren
Registry‑Daten konsumieren
Integrationsmuster und Best Practices zum Aufbau von Anwendungen, die MCP‑Registry‑Daten über die REST‑API konsumieren.
Grundlegende Informationen
Basis‑URL: https://registry.modelcontextprotocol.io
Authentifizierung: Für Lesezugriff ist keine Authentifizierung erforderlich
Kern‑Endpunkte:
GET /v0/servers– Listet alle Server seitenweiseGET /v0/servers/{id}– Ruft Serverdetails per UUID ab
Siehe die interaktive API‑Dokumentation für vollständige Request/Response‑Schemata.
Schnellstart
Einfache Serverliste
# Zeige die ersten 10 Server
curl "https://registry.modelcontextprotocol.io/v0/servers?limit=10"{
"servers": [
{
"name": "io.modelcontextprotocol/filesystem",
"description": "Server für Dateisystem‑Operationen",
"status": "active",
"version": "1.0.2"
}
],
"metadata": {
"count": 10,
"next_cursor": "eyJ..."
}
}Server suchen
# Nach bestimmten Funktionen suchen
curl "https://registry.modelcontextprotocol.io/v0/servers?search=filesystem"
# Nach Status filtern
curl "https://registry.modelcontextprotocol.io/v0/servers?status=active"
# Kombinierte Abfragen
curl "https://registry.modelcontextprotocol.io/v0/servers?search=weather&limit=5"Serverdetails abrufen
# Vollständige Informationen eines bestimmten Servers abrufen
curl "https://registry.modelcontextprotocol.io/v0/servers/{server-uuid}"Sub‑Registries aufbauen
Erweiterte Registry erstellen – ETL der offiziellen Registry‑Daten und Ergänzung um eigene Metadaten wie Bewertungen, Security‑Scans oder Kompatibilität.
ETL‑Prozess
Aktuell empfehlen wir, den Endpunkt GET /v0/servers regelmäßig zu crawlen. Künftig könnte ein updated_at‑Filter (#291) verfügbar werden, um nur kürzlich geänderte Server abzurufen.
import requests
import time
from datetime import datetime
class RegistryETL:
def __init__(self, base_url="https://registry.modelcontextprotocol.io"):
self.base_url = base_url
def fetch_all_servers(self):
"""Alle Server abrufen und paginieren"""
servers = []
cursor = None
while True:
params = {"limit": 100}
if cursor:
params["cursor"] = cursor
response = requests.get(f"{self.base_url}/v0/servers", params=params)
response.raise_for_status()
data = response.json()
servers.extend(data["servers"])
# Prüfen, ob weitere Seiten vorhanden sind
if not data.get("metadata", {}).get("next_cursor"):
break
cursor = data["metadata"]["next_cursor"]
return servers
def process_servers(self, servers):
"""Serverdaten verarbeiten und Metadaten ergänzen"""
enhanced_servers = []
for server in servers:
# Inaktive Server überspringen
if server.get("status") != "active":
continue
# Eigene Metadaten hinzufügen
enhanced_server = {
**server,
"_meta": {
**server.get("_meta", {}),
"com.yourregistry/enhanced": {
"last_processed": datetime.utcnow().isoformat(),
"popularity_score": self.calculate_popularity(server),
"security_scan": self.perform_security_scan(server)
}
}
}
enhanced_servers.append(enhanced_server)
return enhanced_servers
def calculate_popularity(self, server):
"""Beliebtheit berechnen (Beispiellogik)"""
# Eigene Bewertungslogik implementieren
base_score = 1.0
# Anhand der Beschreibungsqualität anpassen
if len(server.get("description", "")) > 100:
base_score += 0.2
# Anhand der Paketanzahl anpassen
package_count = len(server.get("packages", []))
if package_count > 1:
base_score += 0.1 * package_count
return min(base_score, 5.0)
def perform_security_scan(self, server):
"""Sicherheitsprüfung durchführen (Beispiel)"""
return {
"last_scanned": datetime.utcnow().isoformat(),
"vulnerabilities_found": 0,
"scan_status": "passed"
}
# Anwendungsbeispiel
etl = RegistryETL()
servers = etl.fetch_all_servers()
enhanced_servers = etl.process_servers(servers)Statusverwaltung
Server sind in der Regel unveränderlich – mit Ausnahme des Feldes status, das auf deleted (oder andere Zustände) gesetzt werden kann. Für solche Pakete empfehlen wir, den Status ebenfalls zeitnah auf deleted zu setzen oder das Paket aus Ihrer Registry zu entfernen. Dieser Status weist meist auf einen Verstoß gegen unsere großzügigen Moderationsrichtlinien hin (z. B. illegal, Malware oder Spam).
def sync_server_status(self, local_server, remote_server):
"""Serverstatusänderungen synchronisieren"""
if remote_server["status"] == "deleted":
# Sofort aus der lokalen Registry entfernen oder markieren
self.mark_server_deleted(local_server["id"])
elif remote_server["status"] != local_server.get("status"):
# Andere Statusänderungen aktualisieren
self.update_server_status(
local_server["id"],
remote_server["status"]
)Filterung und Anreicherung
Die offizielle Registry hat eine großzügige Moderationspolitik. Möglicherweise möchten Sie daher auf Basis der Registry‑Daten eigene Filter implementieren.
def apply_quality_filters(self, servers):
"""Qualitätsfilter anwenden"""
filtered_servers = []
for server in servers:
# Grundlegende Qualitätsprüfung
if not self.meets_quality_standards(server):
continue
# Sicherheitsprüfung
if not self.passes_security_scan(server):
continue
# Lizenzprüfung
if not self.has_acceptable_license(server):
continue
filtered_servers.append(server)
return filtered_servers
def enhance_with_metadata(self, server):
"""Server mit erweiterten Metadaten anreichern"""
enhanced = {
**server,
"_meta": {
**server.get("_meta", {}),
"com.yourregistry/enhanced": {
"user_rating": self.get_user_rating(server["name"]),
"download_count": self.get_download_count(server["name"]),
"last_updated": self.get_last_update_time(server["name"]),
"tags": self.generate_tags(server),
"compatibility": self.check_compatibility(server)
}
}
}
return enhancedAPI bereitstellen
Wir empfehlen, dass Ihre Sub‑Registry eine API bereitstellt, die der Registry‑API‑Spezifikation entspricht, damit Clients leicht zwischen Registries wechseln können. Siehe Registry‑API‑Dokumentation für Details.
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/v0/servers')
def list_servers():
"""Serverlisten-Endpunkt kompatibel zur offiziellen Registry"""
# Query-Parameter parsen
limit = min(int(request.args.get('limit', 20)), 100)
cursor = request.args.get('cursor')
search = request.args.get('search')
status = request.args.get('status', 'active')
# Filter anwenden
servers = self.get_filtered_servers(
limit=limit,
cursor=cursor,
search=search,
status=status
)
# Nächsten Cursor berechnen
next_cursor = None
if len(servers) == limit:
next_cursor = self.generate_cursor(servers[-1])
return jsonify({
"servers": servers,
"metadata": {
"count": len(servers),
"next_cursor": next_cursor
}
})
@app.route('/v0/servers/<server_id>')
def get_server(server_id):
"""Details zu einem bestimmten Server abrufen"""
server = self.get_server_by_id(server_id)
if not server:
return jsonify({"error": "Server not found"}), 404
return jsonify(server)MCP‑Client‑Integration
Registry‑Daten in Client‑Konfiguration umwandeln – Server abrufen und Paketinformationen in das Konfigurationsformat Ihres MCP‑Clients konvertieren.
Grundlegende Integration
import requests
from typing import List, Dict, Any
class MCPRegistryClient:
def __init__(self, registry_url="https://registry.modelcontextprotocol.io"):
self.registry_url = registry_url
def discover_servers(self, search_query: str = None) -> List[Dict[str, Any]]:
"""Verfügbare MCP-Server entdecken"""
params = {"status": "active", "limit": 50}
if search_query:
params["search"] = search_query
response = requests.get(f"{self.registry_url}/v0/servers", params=params)
response.raise_for_status()
return response.json()["servers"]
def get_server_details(self, server_id: str) -> Dict[str, Any]:
"""Serverdetails abrufen"""
response = requests.get(f"{self.registry_url}/v0/servers/{server_id}")
response.raise_for_status()
return response.json()
def convert_to_client_config(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""Registry-Serverdaten in Clientkonfiguration umwandeln"""
if "packages" in server:
return self.convert_package_server(server)
elif "remotes" in server:
return self.convert_remote_server(server)
else:
raise ValueError(f"Server {server['name']} has no packages or remotes")
def convert_package_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""Server des Typs "packages" umwandeln"""
# Bevorzugtes Paket auswählen (z. B. npm priorisieren)
package = self.select_preferred_package(server["packages"])
config = {
"name": server["name"],
"description": server["description"],
"type": "package",
"package": {
"registry": package["registry_type"],
"identifier": package["identifier"],
"version": package["version"]
}
}
# Laufzeitparameter hinzufügen
if "package_arguments" in package:
config["package"]["arguments"] = package["package_arguments"]
if "environment_variables" in package:
config["package"]["environment"] = package["environment_variables"]
return config
def convert_remote_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""Server des Typs "remote" umwandeln"""
# Bevorzugten Transport wählen (z. B. SSE priorisieren)
remote = self.select_preferred_remote(server["remotes"])
config = {
"name": server["name"],
"description": server["description"],
"type": "remote",
"remote": {
"transport": remote["type"],
"url": remote["url"]
}
}
# Header hinzufügen
if "headers" in remote:
config["remote"]["headers"] = remote["headers"]
return config
def select_preferred_package(self, packages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Bevorzugten Pakettyp wählen"""
# Prioritätenreihenfolge
priority_order = ["npm", "pypi", "nuget", "oci", "mcpb"]
for registry_type in priority_order:
for package in packages:
if package["registry_type"] == registry_type:
return package
# Falls kein bevorzugter Typ gefunden wird, den ersten nehmen
return packages[0]
def select_preferred_remote(self, remotes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Bevorzugten Remote-Transport wählen"""
# SSE bevorzugen
for remote in remotes:
if remote["type"] == "sse":
return remote
# Fallback: ersten verfügbaren verwenden
return remotes[0]
# Anwendungsbeispiel
client = MCPRegistryClient()
# Server entdecken
servers = client.discover_servers("filesystem")
# In Clientkonfiguration umwandeln
configs = []
for server in servers:
try:
config = client.convert_to_client_config(server)
configs.append(config)
except ValueError as e:
print(f"Server {server['name']} übersprungen: {e}")
print(f"{len(configs)} verfügbare Server gefunden")Erweiterte Filterung
Wir empfehlen nachdrücklich, eine Sub‑Registry zu verwenden, statt Daten direkt aus der offiziellen Registry zu beziehen. Machen Sie dies idealerweise konfigurierbar, sodass Nutzer ihren bevorzugten Registry‑Endpunkt wählen können – insbesondere, da einige Unternehmen eine eigene Registry betreiben.
class AdvancedMCPClient:
def __init__(self, preferred_registries=None):
self.preferred_registries = preferred_registries or [
"https://registry.modelcontextprotocol.io",
"https://enterprise.registry.com/api/v0",
"https://curated.mcpregistry.org/api/v0"
]
def discover_servers_multi_registry(self, query: str) -> List[Dict[str, Any]]:
"""Server aus mehreren Registries entdecken"""
all_servers = []
for registry_url in self.preferred_registries:
try:
client = MCPRegistryClient(registry_url)
servers = client.discover_servers(query)
# Herkunftsinfo hinzufügen
for server in servers:
server["_source_registry"] = registry_url
all_servers.extend(servers)
except Exception as e:
print(f"Fehler beim Abrufen von {registry_url}: {e}")
continue
# Deduplizieren (basierend auf Servernamen)
seen = set()
unique_servers = []
for server in all_servers:
if server["name"] not in seen:
seen.add(server["name"])
unique_servers.append(server)
return unique_servers
def filter_by_criteria(self, servers: List[Dict[str, Any]], criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Server anhand von Kriterien filtern"""
filtered = []
for server in servers:
# Nach Status filtern
if criteria.get("status") and server.get("status") != criteria["status"]:
continue
# Nach Pakettyp filtern
if criteria.get("package_types"):
server_types = {pkg["registry_type"] for pkg in server.get("packages", [])}
if not server_types.intersection(set(criteria["package_types"])):
continue
# Nach Bewertung filtern
if criteria.get("min_rating"):
rating = server.get("_meta", {}).get("user_rating", 0)
if rating < criteria["min_rating"]:
continue
filtered.append(server)
return filteredIhr Client sollte mit Registries umgehen können, die nur das Minimum der Spezifikation erfüllen – d. h. keine harte Abhängigkeit von _meta‑Feldern.
Server ausführen
Über die Felder packages oder remotes lässt sich bestimmen, wie ein Server ausgeführt wird. Details zu diesen Feldern finden Sie in der server.json‑Dokumentation.
def execute_server(self, config: Dict[str, Any]) -> subprocess.Popen:
"""MCP-Server ausführen"""
if config["type"] == "package":
return self.execute_package_server(config)
elif config["type"] == "remote":
return self.connect_remote_server(config)
else:
raise ValueError(f"Nicht unterstützter Servertyp: {config['type']}")
def execute_package_server(self, config: Dict[str, Any]) -> subprocess.Popen:
"""Server vom Typ "packages" ausführen"""
package = config["package"]
# Befehl bauen
if package["registry"] == "npm":
cmd = ["npx", package["identifier"]]
elif package["registry"] == "pypi":
cmd = ["uvx" if "runtime_hint" in package else "python", "-m", package["identifier"]]
elif package["registry"] == "oci":
cmd = ["docker", "run", package["identifier"]]
else:
raise ValueError(f"Nicht unterstützte Paket-Registry: {package['registry']}")
# Argumente hinzufügen
if "arguments" in package:
for arg in package["arguments"]:
if arg["type"] == "positional":
cmd.append(arg["value"])
elif arg["type"] == "named":
cmd.extend([arg["name"], arg["value"]])
# Umgebungsvariablen setzen
env = os.environ.copy()
if "environment" in package:
for env_var in package["environment"]:
if env_var.get("is_required") and env_var["name"] not in env:
raise ValueError(f"Erforderliche Umgebungsvariable fehlt: {env_var['name']}")
env[env_var["name"]] = env.get(env_var["name"], env_var.get("default", ""))
return subprocess.Popen(cmd, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)Best Practices
1. Caching‑Strategie
import redis
import json
from datetime import timedelta
class CachedRegistryClient:
def __init__(self, redis_client=None, cache_ttl=3600):
self.redis = redis_client or redis.Redis()
self.cache_ttl = cache_ttl
def get_servers_cached(self, **params):
"""Serverabruf mit Cache"""
cache_key = f"registry:servers:{hash(frozenset(params.items()))}"
# Versuch, aus dem Cache zu holen
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Aus der Registry holen
servers = self.fetch_servers(**params)
# Ergebnis cachen
self.redis.setex(
cache_key,
timedelta(seconds=self.cache_ttl),
json.dumps(servers)
)
return servers2. Fehlerbehandlung
import time
import random
class RobustRegistryClient:
def __init__(self, max_retries=3, base_delay=1):
self.max_retries = max_retries
self.base_delay = base_delay
def fetch_with_retry(self, url, **kwargs):
"""Request mit Retry"""
for attempt in range(self.max_retries):
try:
response = requests.get(url, timeout=30, **kwargs)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
if attempt == self.max_retries - 1:
raise
# Exponentielles Backoff
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)3. Inkrementelle Aktualisierung
class IncrementalRegistrySync:
def __init__(self, last_sync_file="last_sync.txt"):
self.last_sync_file = last_sync_file
def get_last_sync_time(self):
"""Letzte Sync-Zeit abrufen"""
try:
with open(self.last_sync_file, 'r') as f:
return datetime.fromisoformat(f.read().strip())
except FileNotFoundError:
return datetime.min
def save_sync_time(self, sync_time):
"""Sync-Zeit speichern"""
with open(self.last_sync_file, 'w') as f:
f.write(sync_time.isoformat())
def sync_changes(self):
"""Änderungen synchronisieren (sobald updated_at unterstützt wird)"""
last_sync = self.get_last_sync_time()
# Zukünftiger API-Aufruf
# servers = self.fetch_servers(updated_after=last_sync)
# Derzeit alle Server abrufen und lokal filtern
all_servers = self.fetch_all_servers()
self.save_sync_time(datetime.utcnow())
return all_servers