Comment utiliser le serveur MCP Databricks

Avant Databricks MCP, explore Apidog : test API gratuit. Debug, optimisez vos flux, gagnez du temps !

Louis Dupont

Louis Dupont

5 June 2025

Comment utiliser le serveur MCP Databricks
💡
Avant de plonger dans le Databricks MCP Server, consultez Apidog—un outil gratuit conçu pour simplifier les tests et l'intégration des API. Avec l'interface intuitive d'Apidog, vous pouvez facilement déboguer et optimiser vos flux de travail API, rationalisant ainsi le processus de développement et vous faisant gagner un temps précieux. Que vous construisiez des API ou que vous dépanniez des problèmes, Apidog a tout ce dont vous avez besoin pour améliorer votre flux de travail.
button

Le Model Context Protocol (MCP) est un protocole ouvert développé à l'origine par Anthropic qui standardise la façon dont les applications fournissent un contexte aux Large Language Models (LLMs). Le Databricks MCP Server agit comme un pont entre la puissante plateforme d'analyse de données de Databricks et les modèles d'IA, permettant aux LLMs d'interagir avec votre environnement Databricks par programmation. Cette intégration permet aux assistants d'IA d'exécuter des requêtes SQL, de lister et d'obtenir des détails sur les exécutions de tâches, et d'accéder aux données de votre compte Databricks sans intervention humaine directe.

Dans ce guide complet, nous allons vous guider à travers le processus de configuration et d'utilisation du Databricks MCP Server pour des opérations et des analyses de données améliorées basées sur l'IA.

Comprendre le Model Context Protocol (MCP)

Avant de plonger dans l'implémentation spécifique à Databricks, il est important de comprendre ce qu'est le MCP et pourquoi il est important :

MCP met en œuvre une architecture client/serveur avec une communication bidirectionnelle :

Les principaux avantages qui rendent le MCP précieux incluent :

Conditions préalables pour exécuter le Databricks MCP Server

Avant de configurer le Databricks MCP Server, assurez-vous d'avoir :

  1. Un compte Databricks avec les autorisations appropriées
  2. Python 3.8+ installé sur votre système
  3. Databricks CLI configuré avec l'authentification
  4. Une compréhension de base des concepts de l'espace de travail Databricks
  5. Une familiarité avec SQL et les tâches Databricks

Étape 1 : Configuration de l'environnement

Tout d'abord, configurons notre environnement en installant les packages nécessaires :

# Créer un environnement virtuel
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate  # Sur Windows : databricks-mcp-env\\\\Scripts\\\\activate

# Installer les packages requis
pip install databricks-sdk mcp-server httpx fastapi uvicorn

Étape 2 : Création du Databricks MCP Server

Créez un nouveau fichier appelé databricks_mcp.py avec la structure de code suivante :

from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState

# Initialiser le serveur FastMCP
mcp = FastMCP("databricks")

# Initialiser le client Databricks
workspace = WorkspaceClient()

Étape 3 : Implémentation des outils MCP de base pour Databricks

Maintenant, implémentons les outils de base qui permettront aux LLMs d'interagir avec votre environnement Databricks :

@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
    """Exécuter une requête SQL dans l'entrepôt SQL Databricks.

    Args:
        query: Requête SQL à exécuter
        warehouse_id: ID optionnel de l'entrepôt SQL à utiliser

    Returns:
        Dictionnaire contenant les résultats de la requête
    """
    # Valider la requête pour la sécurité
    if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
        return {"error": "Les opérations SQL potentiellement destructrices ne sont pas autorisées"}

    try:
        # Exécuter la requête
        statement = workspace.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            statement=query,
            wait_timeout=60
        )

        # Traiter les résultats
        if statement.status.state == StatementState.SUCCEEDED:
            # Convertir les résultats dans un format plus utilisable
            columns = [col.name for col in statement.manifest.schema.columns]
            rows = []
            for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
                for row_data in chunk.data:
                    rows.append(dict(zip(columns, row_data)))

            return {
                "success": True,
                "columns": columns,
                "rows": rows,
                "row_count": len(rows)
            }
        else:
            return {
                "success": False,
                "state": statement.status.state,
                "error": statement.status.error
            }
    except Exception as e:
        return {"success": False, "error": str(e)}

Ensuite, ajoutons un outil pour lister les bases de données disponibles :

@mcp.tool()
async def list_databases() -> List[str]:
    """Lister toutes les bases de données disponibles dans l'espace de travail Databricks.

    Returns:
        Liste des noms de bases de données
    """
    try:
        # Exécuter une requête pour obtenir toutes les bases de données
        result = await run_sql_query("SHOW DATABASES")
        if result.get("success"):
            # Extraire les noms de bases de données du résultat
            return [row.get("databaseName") for row in result.get("rows", [])]
        else:
            return [f"Erreur lors de la liste des bases de données : {result.get('error')}"]
    except Exception as e:
        return [f"Erreur : {str(e)}"]

Maintenant, implémentons un outil pour obtenir le schéma de la table :

@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
    """Obtenir le schéma d'une table spécifique.

    Args:
        database_name: Nom de la base de données
        table_name: Nom de la table

    Returns:
        Dictionnaire contenant les informations du schéma de la table
    """
    try:
        # Exécuter une requête pour obtenir le schéma de la table
        result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
        if result.get("success"):
            return {
                "database": database_name,
                "table": table_name,
                "columns": result.get("rows", [])
            }
        else:
            return {"error": result.get("error")}
    except Exception as e:
        return {"error": str(e)}

Étape 4 : Ajout d'outils de gestion des tâches

Ajoutons des outils pour gérer et interroger les tâches Databricks :

@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
    """Lister les tâches dans l'espace de travail Databricks.

    Args:
        limit: Nombre maximum de tâches à renvoyer

    Returns:
        Liste des détails des tâches
    """
    try:
        jobs = workspace.jobs.list(limit=limit)
        result = []
        for job in jobs:
            result.append({
                "job_id": job.job_id,
                "name": job.settings.name,
                "created_by": job.created_by,
                "created_time": job.created_time
            })
        return result
    except Exception as e:
        return [{"error": str(e)}]

@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
    """Obtenir les exécutions récentes d'une tâche spécifique.

    Args:
        job_id: ID de la tâche
        limit: Nombre maximum d'exécutions à renvoyer

    Returns:
        Dictionnaire contenant les détails de l'exécution de la tâche
    """
    try:
        runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)

        result = {
            "job_id": job_id,
            "runs": []
        }

        for run in runs.runs:
            run_info = {
                "run_id": run.run_id,
                "state": run.state.life_cycle_state,
                "result_state": run.state.result_state,
                "start_time": run.start_time,
                "setup_duration": run.setup_duration,
                "execution_duration": run.execution_duration,
                "cleanup_duration": run.cleanup_duration
            }
            result["runs"].append(run_info)

        return result
    except Exception as e:
        return {"error": str(e)}

Étape 5 : Implémentation des outils de gestion des clusters

Ajoutons maintenant des outils pour interagir avec les clusters Databricks :

@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
    """Lister tous les clusters dans l'espace de travail Databricks.

    Returns:
        Liste des détails du cluster
    """
    try:
        clusters = workspace.clusters.list()
        result = []

        for cluster in clusters:
            cluster_info = {
                "cluster_id": cluster.cluster_id,
                "cluster_name": cluster.cluster_name,
                "state": cluster.state,
                "creator_username": cluster.creator_user_name,
                "spark_version": cluster.spark_version,
                "node_type_id": cluster.node_type_id
            }
            result.append(cluster_info)

        return result
    except Exception as e:
        return [{"error": str(e)}]

Étape 6 : Exécution du serveur MCP

Enfin, ajoutez le code pour exécuter le serveur MCP :

if __name__ == "__main__":
    # Exécuter le serveur avec le transport stdio pour la compatibilité avec Claude Desktop
    # ou HTTP pour les clients basés sur le Web
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        # Exécuter en tant que serveur HTTP
        mcp.run(transport='http', host="127.0.0.1", port=8000)
    else:
        # Par défaut à stdio pour Claude Desktop
        mcp.run(transport='stdio')

Étape 7 : Configuration du client MCP

Pour utiliser le Databricks MCP Server avec un client MCP comme Claude Desktop, vous devrez créer une configuration :

{
  "mcpServers": {
    "databricks": {
      "command": "python",
      "args": ["databricks_mcp.py"]
    }
  }
}

Si vous préférez utiliser le transport HTTP, vous pouvez exécuter le serveur dans un processus séparé et configurer le client en conséquence :

{
  "mcpServers": {
    "databricks": {
      "url": "<http://localhost:8000>"
    }
  }
}

Étape 8 : Création d'une application cliente (facultatif)

Pour une expérience plus personnalisée, vous pouvez créer une application cliente qui exploite le Databricks MCP Server. Voici un exemple de client basé sur Python qui se connecte à la fois au serveur MCP et au modèle Meta Llama de Databricks :

import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any

from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters

class DatabricksMCPClient:
    def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
        # Initialiser les objets de session et de client
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

        # Initialiser le client Databricks
        self.workspace = WorkspaceClient()
        self.llm_endpoint_name = llm_endpoint_name
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        print(f"Client compatible OpenAI initialisé pour {llm_endpoint_name}")

    async def connect_to_server(self, server_script_path: str):
        """Se connecter au serveur Databricks MCP"""
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # Lister les outils disponibles
        response = await self.session.list_tools()
        tools = response.tools
        print("\\\\nConnecté au serveur Databricks MCP avec les outils :", [tool.name for tool in tools])

        # Afficher les descriptions des outils
        print("\\\\nOutils Databricks disponibles :")
        for tool in tools:
            print(f"- {tool.name}: {tool.description}")

    # Ajouter des méthodes supplémentaires pour traiter les requêtes, etc.

async def main():
    client = DatabricksMCPClient()
    try:
        await client.connect_to_server("databricks_mcp.py")
        # Ajouter une boucle interactive ou d'autres fonctionnalités
    finally:
        await client.exit_stack.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Meilleures pratiques pour l'utilisation du Databricks MCP Server

  1. La sécurité avant tout : appliquez toujours des contrôles d'accès et validez les requêtes SQL pour éviter les opérations destructrices.
  2. Gestion des erreurs : implémentez une gestion robuste des erreurs dans les applications serveur et client.
  3. Limitation du débit : ajoutez une limitation du débit pour éviter les appels d'API excessifs vers Databricks.
  4. Journalisation : implémentez une journalisation complète à des fins de débogage et d'audit.
  5. Pagination : pour les grands ensembles de résultats, implémentez la pagination pour éviter les problèmes de mémoire.
  6. Informations d'identification sécurisées : utilisez des variables d'environnement ou un stockage d'informations d'identification sécurisé plutôt que de coder en dur les informations d'identification.
  7. Documentation des outils : rédigez des descriptions claires et détaillées pour chaque outil afin d'aider le LLM à les utiliser correctement.

Fonctionnalités avancées

Ajout de la prise en charge des Delta Live Tables

@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
    """Lister tous les pipelines Delta Live Table dans l'espace de travail.

    Returns:
        Liste des détails du pipeline
    """
    try:
        pipelines = workspace.pipelines.list()
        result = []

        for pipeline in pipelines:
            pipeline_info = {
                "pipeline_id": pipeline.pipeline_id,
                "name": pipeline.name,
                "state": pipeline.state,
                "creator": pipeline.creator,
                "target": pipeline.target
            }
            result.append(pipeline_info)

        return result
    except Exception as e:
        return [{"error": str(e)}]

Ajout de la gestion des notebooks

@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
    """Lister les notebooks dans un chemin spécifié dans l'espace de travail.

    Args:
        path: Chemin de l'espace de travail pour lister les notebooks

    Returns:
        Liste des détails du notebook
    """
    try:
        objects = workspace.workspace.list(path)
        notebooks = []

        for obj in objects:
            if obj.object_type == "NOTEBOOK":
                notebooks.append({
                    "path": obj.path,
                    "name": obj.path.split("/")[-1],
                    "language": obj.language
                })

        return notebooks
    except Exception as e:
        return [{"error": str(e)}]

Conclusion

Le Databricks MCP Server offre un moyen puissant de combler le fossé entre les modèles d'IA et l'infrastructure de données de votre organisation. En suivant ce guide complet, vous avez appris à :

  1. Configurer le Databricks MCP Server
  2. Implémenter des outils de base pour l'exécution de requêtes SQL et l'exploration de bases de données
  3. Ajouter des fonctionnalités de gestion des tâches et des clusters
  4. Configurer les clients MCP pour interagir avec votre serveur
  5. Créer éventuellement une application cliente personnalisée

Cette intégration permet à vos assistants d'IA d'exploiter directement les puissantes capacités de traitement des données de Databricks, ce qui permet une analyse des données, une surveillance et une automatisation plus sophistiquées. Au fur et à mesure que l'écosystème du Model Context Protocol continue d'évoluer, votre Databricks MCP Server peut être étendu avec des outils et des capacités supplémentaires pour répondre aux besoins spécifiques de votre organisation.

En adoptant cette intégration, vous faites un pas important vers la création de systèmes d'IA plus intelligents et conscients des données qui peuvent fournir des informations plus approfondies et une assistance plus efficace pour vos flux de travail d'analyse de données.

Explore more

Fathom-R1-14B : Modèle de raisonnement IA avancé d'Inde

Fathom-R1-14B : Modèle de raisonnement IA avancé d'Inde

L'IA en expansion rapide. Fathom-R1-14B (14,8 milliards de paramètres) excelle en raisonnement mathématique et général, conçu par Fractal AI Research.

5 June 2025

Mistral Code : L'assistant de codage le plus personnalisable basé sur l'IA pour les entreprises

Mistral Code : L'assistant de codage le plus personnalisable basé sur l'IA pour les entreprises

Découvrez Mistral Code, l'IA d'aide au code la plus personnalisable pour les entreprises.

5 June 2025

Comment Claude Code transforme le codage de l'IA en 2025

Comment Claude Code transforme le codage de l'IA en 2025

Découvrez Claude Code en 2025 : codage IA révolutionné. Fonctionnalités, démo, et pourquoi il gagne du terrain après Windsurf d'Anthropic. Indispensable !

5 June 2025

Pratiquez le Design-first d'API dans Apidog

Découvrez une manière plus simple de créer et utiliser des API