Registry 데이터 활용

Registry 데이터 활용

REST API를 통해 MCP Registry 데이터를 활용하는 애플리케이션을 구축할 때 유용한 통합 패턴과 모범 사례를 정리합니다.

기본 정보

Base URL: https://registry.modelcontextprotocol.io

인증(Authentication): 읽기 전용(read-only) 접근은 인증이 필요하지 않습니다.

핵심 엔드포인트:

  • GET /v0/servers - 페이지네이션으로 서버 목록 조회
  • GET /v0/servers/{id} - UUID로 서버 상세 조회

대화형 API 문서에서 전체 요청/응답 스키마를 확인할 수 있습니다.

면책 고지: 공식 Registry는 가용성(uptime)이나 데이터 지속성에 대한 보장을 제공하지 않습니다. 애플리케이션은 캐싱을 통해 서비스 중단을 견딜 수 있도록 설계해야 합니다.

빠른 시작

기본 서버 목록

# Fetch first 10 servers
curl "https://registry.modelcontextprotocol.io/v0/servers?limit=10"
{
  "servers": [
    {
      "name": "io.modelcontextprotocol/filesystem",
      "description": "Filesystem operations server",
      "status": "active",
      "version": "1.0.2"
    }
  ],
  "metadata": {
    "count": 10,
    "next_cursor": "eyJ..."
  }
}

서버 검색

# Search by keyword
curl "https://registry.modelcontextprotocol.io/v0/servers?search=filesystem"

# Filter by status
curl "https://registry.modelcontextprotocol.io/v0/servers?status=active"

# Combine filters
curl "https://registry.modelcontextprotocol.io/v0/servers?search=weather&limit=5"

서버 상세 조회

# Fetch full details for a server
curl "https://registry.modelcontextprotocol.io/v0/servers/{server-uuid}"

서브레지스트리 구축

확장 레지스트리 만들기 - 공식 Registry 데이터를 ETL로 수집하고, 평점/보안 스캔/호환성 같은 자체 메타데이터를 추가해 더 가치 있는 레지스트리를 만들 수 있습니다.

ETL 흐름

현재는 GET /v0/servers 엔드포인트를 주기적으로 수집하는 방식을 권장합니다. 향후 updated_at 필터(#291)가 제공되면, 최근 변경된 서버만 증분으로 가져올 수 있을 가능성이 있습니다.

import requests
import time
from datetime import datetime

class RegistryETL:
    def __init__(self, base_url="https://registry.modelcontextprotocol.io"):
        self.base_url = base_url
        
    def fetch_all_servers(self):
        """Fetch all servers and handle pagination."""
        servers = []
        cursor = None
        
        while True:
            params = {"limit": 100}
            if cursor:
                params["cursor"] = cursor
                
            response = requests.get(f"{self.base_url}/v0/servers", params=params)
            response.raise_for_status()
            
            data = response.json()
            servers.extend(data["servers"])
            
            # Check if there are more pages
            if not data.get("metadata", {}).get("next_cursor"):
                break
                
            cursor = data["metadata"]["next_cursor"]
            
        return servers
    
    def process_servers(self, servers):
        """Process servers and attach enrichment metadata."""
        enhanced_servers = []
        
        for server in servers:
            # Skip non-active servers
            if server.get("status") != "active":
                continue
                
            # Attach custom metadata
            enhanced_server = {
                **server,
                "_meta": {
                    **server.get("_meta", {}),
                    "com.yourregistry/enhanced": {
                        "last_processed": datetime.utcnow().isoformat(),
                        "popularity_score": self.calculate_popularity(server),
                        "security_scan": self.perform_security_scan(server)
                    }
                }
            }
            
            enhanced_servers.append(enhanced_server)
            
        return enhanced_servers
    
    def calculate_popularity(self, server):
        """Calculate a popularity score (example logic)."""
        # Implement your scoring logic
        base_score = 1.0
        
        # Adjust based on description quality
        if len(server.get("description", "")) > 100:
            base_score += 0.2
            
        # Adjust based on number of packages
        package_count = len(server.get("packages", []))
        if package_count > 1:
            base_score += 0.1 * package_count
            
        return min(base_score, 5.0)
    
    def perform_security_scan(self, server):
        """Perform a security scan (example)."""
        return {
            "last_scanned": datetime.utcnow().isoformat(),
            "vulnerabilities_found": 0,
            "scan_status": "passed"
        }

# Example usage
etl = RegistryETL()
servers = etl.fetch_all_servers()
enhanced_servers = etl.process_servers(servers)

상태 관리

서버 데이터는 일반적으로 불변(immutable)이며, 예외적으로 status 필드는 deleted(및 기타 상태)로 변경될 수 있습니다. 서버가 deleted로 표시되면, 여러분의 레지스트리에서도 해당 엔트리를 즉시 deleted로 반영하거나 제거하는 것을 권장합니다. 이 상태는 보통 (다소 완화된) 모더레이션 가이드라인 위반을 의미하며, 불법/악성코드/스팸 등의 가능성을 시사합니다.

def sync_server_status(self, local_server, remote_server):
    """Sync server status changes."""
    if remote_server["status"] == "deleted":
        # Immediately remove or mark as deleted locally
        self.mark_server_deleted(local_server["id"])
        
    elif remote_server["status"] != local_server.get("status"):
        # Update other status changes
        self.update_server_status(
            local_server["id"], 
            remote_server["status"]
        )

필터링 및 강화

공식 Registry는 완화된 모더레이션 정책을 채택하고 있습니다. 따라서 서비스 목적에 따라 Registry 데이터 위에 자체 품질/보안 필터링을 적용하고 싶을 수 있습니다.

def apply_quality_filters(self, servers):
    """Apply quality filters."""
    filtered_servers = []
    
    for server in servers:
        # Basic quality checks
        if not self.meets_quality_standards(server):
            continue
            
        # Security checks
        if not self.passes_security_scan(server):
            continue
            
        # License checks
        if not self.has_acceptable_license(server):
            continue
            
        filtered_servers.append(server)
        
    return filtered_servers

def enhance_with_metadata(self, server):
    """Attach enrichment metadata to a server."""
    enhanced = {
        **server,
        "_meta": {
            **server.get("_meta", {}),
            "com.yourregistry/enhanced": {
                "user_rating": self.get_user_rating(server["name"]),
                "download_count": self.get_download_count(server["name"]),
                "last_updated": self.get_last_update_time(server["name"]),
                "tags": self.generate_tags(server),
                "compatibility": self.check_compatibility(server)
            }
        }
    }
    
    return enhanced

API 제공

클라이언트가 레지스트리 간 전환을 쉽게 할 수 있도록, 서브레지스트리에서도 Registry API 사양과 호환되는 API를 제공하는 것을 권장합니다. 자세한 내용은 Registry API 문서를 참고하세요.

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/v0/servers')
def list_servers():
    """Implement a server list endpoint compatible with the official Registry."""
    
    # Parse query params
    limit = min(int(request.args.get('limit', 20)), 100)
    cursor = request.args.get('cursor')
    search = request.args.get('search')
    status = request.args.get('status', 'active')
    
    # Apply filters
    servers = self.get_filtered_servers(
        limit=limit,
        cursor=cursor,
        search=search,
        status=status
    )
    
    # Compute next cursor
    next_cursor = None
    if len(servers) == limit:
        next_cursor = self.generate_cursor(servers[-1])
    
    return jsonify({
        "servers": servers,
        "metadata": {
            "count": len(servers),
            "next_cursor": next_cursor
        }
    })

@app.route('/v0/servers/<server_id>')
def get_server(server_id):
    """Fetch server details."""
    server = self.get_server_by_id(server_id)
    
    if not server:
        return jsonify({"error": "Server not found"}), 404
        
    return jsonify(server)

MCP 클라이언트 통합

Registry 데이터를 클라이언트 설정으로 변환 - 서버 정보를 조회한 뒤, 패키지/원격 정보를 MCP 클라이언트가 이해하는 설정 형태로 변환합니다.

기본 통합

import requests
from typing import List, Dict, Any

class MCPRegistryClient:
    def __init__(self, registry_url="https://registry.modelcontextprotocol.io"):
        self.registry_url = registry_url
        
    def discover_servers(self, search_query: str = None) -> List[Dict[str, Any]]:
        """Discover available MCP servers."""
        params = {"status": "active", "limit": 50}
        if search_query:
            params["search"] = search_query
            
        response = requests.get(f"{self.registry_url}/v0/servers", params=params)
        response.raise_for_status()
        
        return response.json()["servers"]
    
    def get_server_details(self, server_id: str) -> Dict[str, Any]:
        """Fetch server details."""
        response = requests.get(f"{self.registry_url}/v0/servers/{server_id}")
        response.raise_for_status()
        
        return response.json()
    
    def convert_to_client_config(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a Registry server entry to a client config."""
        
        if "packages" in server:
            return self.convert_package_server(server)
        elif "remotes" in server:
            return self.convert_remote_server(server)
        else:
            raise ValueError(f"Server {server['name']} has no packages or remotes")
    
    def convert_package_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a package-based server."""
        # Select preferred package (e.g., prefer npm)
        package = self.select_preferred_package(server["packages"])
        
        config = {
            "name": server["name"],
            "description": server["description"],
            "type": "package",
            "package": {
                "registry": package["registry_type"],
                "identifier": package["identifier"],
                "version": package["version"]
            }
        }
        
        # Attach runtime parameters
        if "package_arguments" in package:
            config["package"]["arguments"] = package["package_arguments"]
            
        if "environment_variables" in package:
            config["package"]["environment"] = package["environment_variables"]
            
        return config
    
    def convert_remote_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a remote server."""
        # Select preferred transport (e.g., prefer SSE)
        remote = self.select_preferred_remote(server["remotes"])
        
        config = {
            "name": server["name"],
            "description": server["description"],
            "type": "remote",
            "remote": {
                "transport": remote["type"],
                "url": remote["url"]
            }
        }
        
        # Attach headers
        if "headers" in remote:
            config["remote"]["headers"] = remote["headers"]
            
        return config
    
    def select_preferred_package(self, packages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Select a preferred package type."""
        # Priority order
        priority_order = ["npm", "pypi", "nuget", "oci", "mcpb"]
        
        for registry_type in priority_order:
            for package in packages:
                if package["registry_type"] == registry_type:
                    return package
                    
        # Fallback to the first available
        return packages[0]
    
    def select_preferred_remote(self, remotes: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Select a preferred remote transport."""
        # Prefer SSE
        for remote in remotes:
            if remote["type"] == "sse":
                return remote
                
        # Fallback to the first available
        return remotes[0]

# Example usage
client = MCPRegistryClient()

# Discover servers
servers = client.discover_servers("filesystem")

# Convert to client configs
configs = []
for server in servers:
    try:
        config = client.convert_to_client_config(server)
        configs.append(config)
    except ValueError as e:
        print(f"Skipping server {server['name']}: {e}")

print(f"Found {len(configs)} usable servers")

고급 필터링

공식 Registry에서 직접 데이터를 가져오기보다는, 서브레지스트리를 두고 이를 통해 데이터를 제공하는 방식을 강력히 권장합니다. 또한 클라이언트 사용자가 선호하는 레지스트리를 선택할 수 있도록(예: 기업 내부 레지스트리) 레지스트리 엔드포인트를 설정 가능하게 만드는 것이 좋습니다.

class AdvancedMCPClient:
    def __init__(self, preferred_registries=None):
        self.preferred_registries = preferred_registries or [
            "https://registry.modelcontextprotocol.io",
            "https://enterprise.registry.com/api/v0",
            "https://curated.mcpregistry.org/api/v0"
        ]
    
    def discover_servers_multi_registry(self, query: str) -> List[Dict[str, Any]]:
        """Discover servers across multiple registries."""
        all_servers = []
        
        for registry_url in self.preferred_registries:
            try:
                client = MCPRegistryClient(registry_url)
                servers = client.discover_servers(query)
                
                # Attach source registry information
                for server in servers:
                    server["_source_registry"] = registry_url
                    
                all_servers.extend(servers)
                
            except Exception as e:
                print(f"Failed to fetch from {registry_url}: {e}")
                continue
        
        # Deduplicate (by server name)
        seen = set()
        unique_servers = []
        
        for server in all_servers:
            if server["name"] not in seen:
                seen.add(server["name"])
                unique_servers.append(server)
                
        return unique_servers
    
    def filter_by_criteria(self, servers: List[Dict[str, Any]], criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Filter servers based on criteria."""
        filtered = []
        
        for server in servers:
            # Status filter
            if criteria.get("status") and server.get("status") != criteria["status"]:
                continue
                
            # Package type filter
            if criteria.get("package_types"):
                server_types = {pkg["registry_type"] for pkg in server.get("packages", [])}
                if not server_types.intersection(set(criteria["package_types"])):
                    continue
            
            # Rating filter
            if criteria.get("min_rating"):
                rating = server.get("_meta", {}).get("user_rating", 0)
                if rating < criteria["min_rating"]:
                    continue
                    
            filtered.append(server)
            
        return filtered

클라이언트는 최소 사양만 만족하는 레지스트리도 우아하게 처리해야 합니다. 즉, _meta 필드에 강하게 의존하지 않도록 설계하세요.

서버 실행

packages 또는 remotes 필드를 사용해 서버를 어떤 방식으로 실행할지 결정할 수 있습니다. 필드 정의와 세부 사항은 server.json 문서를 참고하세요.

def execute_server(self, config: Dict[str, Any]) -> subprocess.Popen:
    """Execute an MCP server."""
    
    if config["type"] == "package":
        return self.execute_package_server(config)
    elif config["type"] == "remote":
        return self.connect_remote_server(config)
    else:
        raise ValueError(f"Unsupported server type: {config['type']}")

def execute_package_server(self, config: Dict[str, Any]) -> subprocess.Popen:
    """Execute a package-based server."""
    package = config["package"]
    
    # Build command
    if package["registry"] == "npm":
        cmd = ["npx", package["identifier"]]
    elif package["registry"] == "pypi":
        cmd = ["uvx" if "runtime_hint" in package else "python", "-m", package["identifier"]]
    elif package["registry"] == "oci":
        cmd = ["docker", "run", package["identifier"]]
    else:
        raise ValueError(f"Unsupported package registry: {package['registry']}")
    
    # Attach arguments
    if "arguments" in package:
        for arg in package["arguments"]:
            if arg["type"] == "positional":
                cmd.append(arg["value"])
            elif arg["type"] == "named":
                cmd.extend([arg["name"], arg["value"]])
    
    # Set environment variables
    env = os.environ.copy()
    if "environment" in package:
        for env_var in package["environment"]:
            if env_var.get("is_required") and env_var["name"] not in env:
                raise ValueError(f"Missing required environment variable: {env_var['name']}")
            env[env_var["name"]] = env.get(env_var["name"], env_var.get("default", ""))
    
    return subprocess.Popen(cmd, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

모범 사례

1. 캐싱 전략

import redis
import json
from datetime import timedelta

class CachedRegistryClient:
    def __init__(self, redis_client=None, cache_ttl=3600):
        self.redis = redis_client or redis.Redis()
        self.cache_ttl = cache_ttl
    
    def get_servers_cached(self, **params):
        """Fetch servers with caching."""
        cache_key = f"registry:servers:{hash(frozenset(params.items()))}"
        
        # Try cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Fetch from Registry
        servers = self.fetch_servers(**params)
        
        # Cache result
        self.redis.setex(
            cache_key, 
            timedelta(seconds=self.cache_ttl),
            json.dumps(servers)
        )
        
        return servers

2. 오류 처리

import time
import random

class RobustRegistryClient:
    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def fetch_with_retry(self, url, **kwargs):
        """Send a request with retries."""
        for attempt in range(self.max_retries):
            try:
                response = requests.get(url, timeout=30, **kwargs)
                response.raise_for_status()
                return response.json()
                
            except requests.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise
                
                # Exponential backoff
                delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)

3. 증분 업데이트

class IncrementalRegistrySync:
    def __init__(self, last_sync_file="last_sync.txt"):
        self.last_sync_file = last_sync_file
    
    def get_last_sync_time(self):
        """Get the last sync time."""
        try:
            with open(self.last_sync_file, 'r') as f:
                return datetime.fromisoformat(f.read().strip())
        except FileNotFoundError:
            return datetime.min
    
    def save_sync_time(self, sync_time):
        """Persist the sync time."""
        with open(self.last_sync_file, 'w') as f:
            f.write(sync_time.isoformat())
    
    def sync_changes(self):
        """Sync changes (once the Registry supports updated_at filtering)."""
        last_sync = self.get_last_sync_time()
        
        # Future API call
        # servers = self.fetch_servers(updated_after=last_sync)
        
        # For now, fetch all servers and filter locally
        all_servers = self.fetch_all_servers()
        
        self.save_sync_time(datetime.utcnow())
        return all_servers

다음 단계

전체 API 사양을 확인하세요. 자신의 서버를 Registry에 게시하는 방법을 학습하세요. Registry의 기술 아키텍처를 이해하세요. Registry의 설계 원칙을 이해하세요.
권장 사항: 프로덕션에서는 공식 Registry를 직접 사용하기보다 서브레지스트리를 사용하세요. 이렇게 하면 캐싱, 필터링, 강화(enrichment) 기능을 자연스럽게 추가할 수 있습니다.