
Le Model Context Protocol (MCP) est un protocole ouvert développé à l'origine par Anthropic qui standardise la façon dont les applications fournissent un contexte aux Large Language Models (LLMs). Le Databricks MCP Server agit comme un pont entre la puissante plateforme d'analyse de données de Databricks et les modèles d'IA, permettant aux LLMs d'interagir avec votre environnement Databricks par programmation. Cette intégration permet aux assistants d'IA d'exécuter des requêtes SQL, de lister et d'obtenir des détails sur les exécutions de tâches, et d'accéder aux données de votre compte Databricks sans intervention humaine directe.
Dans ce guide complet, nous allons vous guider à travers le processus de configuration et d'utilisation du Databricks MCP Server pour des opérations et des analyses de données améliorées basées sur l'IA.
Comprendre le Model Context Protocol (MCP)
Avant de plonger dans l'implémentation spécifique à Databricks, il est important de comprendre ce qu'est le MCP et pourquoi il est important :
MCP met en œuvre une architecture client/serveur avec une communication bidirectionnelle :
- Le serveur héberge des outils que le LLM peut utiliser
- Le client orchestre les conversations entre l'utilisateur, le LLM et les outils
Les principaux avantages qui rendent le MCP précieux incluent :
- Séparation des préoccupations : les développeurs d'outils peuvent créer et mettre à jour des capacités indépendamment
- Modularité : de nouveaux outils peuvent être ajoutés de manière incrémentielle sans modifications à l'échelle du système
- Normalisation : une interface cohérente signifie moins de travail d'intégration entre les composants
- Évolutivité : les composants peuvent être mis à l'échelle indépendamment en fonction de la demande
Conditions préalables pour exécuter le Databricks MCP Server

Avant de configurer le Databricks MCP Server, assurez-vous d'avoir :
- Un compte Databricks avec les autorisations appropriées
- Python 3.8+ installé sur votre système
- Databricks CLI configuré avec l'authentification
- Une compréhension de base des concepts de l'espace de travail Databricks
- Une familiarité avec SQL et les tâches Databricks
Étape 1 : Configuration de l'environnement
Tout d'abord, configurons notre environnement en installant les packages nécessaires :
# Créer un environnement virtuel
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # Sur Windows : databricks-mcp-env\\\\Scripts\\\\activate
# Installer les packages requis
pip install databricks-sdk mcp-server httpx fastapi uvicorn
Étape 2 : Création du Databricks MCP Server
Créez un nouveau fichier appelé databricks_mcp.py
avec la structure de code suivante :
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# Initialiser le serveur FastMCP
mcp = FastMCP("databricks")
# Initialiser le client Databricks
workspace = WorkspaceClient()
Étape 3 : Implémentation des outils MCP de base pour Databricks
Maintenant, implémentons les outils de base qui permettront aux LLMs d'interagir avec votre environnement Databricks :
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""Exécuter une requête SQL dans l'entrepôt SQL Databricks.
Args:
query: Requête SQL à exécuter
warehouse_id: ID optionnel de l'entrepôt SQL à utiliser
Returns:
Dictionnaire contenant les résultats de la requête
"""
# Valider la requête pour la sécurité
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "Les opérations SQL potentiellement destructrices ne sont pas autorisées"}
try:
# Exécuter la requête
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# Traiter les résultats
if statement.status.state == StatementState.SUCCEEDED:
# Convertir les résultats dans un format plus utilisable
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
Ensuite, ajoutons un outil pour lister les bases de données disponibles :
@mcp.tool()
async def list_databases() -> List[str]:
"""Lister toutes les bases de données disponibles dans l'espace de travail Databricks.
Returns:
Liste des noms de bases de données
"""
try:
# Exécuter une requête pour obtenir toutes les bases de données
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# Extraire les noms de bases de données du résultat
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"Erreur lors de la liste des bases de données : {result.get('error')}"]
except Exception as e:
return [f"Erreur : {str(e)}"]
Maintenant, implémentons un outil pour obtenir le schéma de la table :
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""Obtenir le schéma d'une table spécifique.
Args:
database_name: Nom de la base de données
table_name: Nom de la table
Returns:
Dictionnaire contenant les informations du schéma de la table
"""
try:
# Exécuter une requête pour obtenir le schéma de la table
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
Étape 4 : Ajout d'outils de gestion des tâches
Ajoutons des outils pour gérer et interroger les tâches Databricks :
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""Lister les tâches dans l'espace de travail Databricks.
Args:
limit: Nombre maximum de tâches à renvoyer
Returns:
Liste des détails des tâches
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""Obtenir les exécutions récentes d'une tâche spécifique.
Args:
job_id: ID de la tâche
limit: Nombre maximum d'exécutions à renvoyer
Returns:
Dictionnaire contenant les détails de l'exécution de la tâche
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
Étape 5 : Implémentation des outils de gestion des clusters
Ajoutons maintenant des outils pour interagir avec les clusters Databricks :
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""Lister tous les clusters dans l'espace de travail Databricks.
Returns:
Liste des détails du cluster
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
Étape 6 : Exécution du serveur MCP
Enfin, ajoutez le code pour exécuter le serveur MCP :
if __name__ == "__main__":
# Exécuter le serveur avec le transport stdio pour la compatibilité avec Claude Desktop
# ou HTTP pour les clients basés sur le Web
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# Exécuter en tant que serveur HTTP
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# Par défaut à stdio pour Claude Desktop
mcp.run(transport='stdio')
Étape 7 : Configuration du client MCP
Pour utiliser le Databricks MCP Server avec un client MCP comme Claude Desktop, vous devrez créer une configuration :
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
Si vous préférez utiliser le transport HTTP, vous pouvez exécuter le serveur dans un processus séparé et configurer le client en conséquence :
{
"mcpServers": {
"databricks": {
"url": "<http://localhost:8000>"
}
}
}
Étape 8 : Création d'une application cliente (facultatif)
Pour une expérience plus personnalisée, vous pouvez créer une application cliente qui exploite le Databricks MCP Server. Voici un exemple de client basé sur Python qui se connecte à la fois au serveur MCP et au modèle Meta Llama de Databricks :
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# Initialiser les objets de session et de client
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Initialiser le client Databricks
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"Client compatible OpenAI initialisé pour {llm_endpoint_name}")
async def connect_to_server(self, server_script_path: str):
"""Se connecter au serveur Databricks MCP"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# Lister les outils disponibles
response = await self.session.list_tools()
tools = response.tools
print("\\\\nConnecté au serveur Databricks MCP avec les outils :", [tool.name for tool in tools])
# Afficher les descriptions des outils
print("\\\\nOutils Databricks disponibles :")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Ajouter des méthodes supplémentaires pour traiter les requêtes, etc.
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# Ajouter une boucle interactive ou d'autres fonctionnalités
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
Meilleures pratiques pour l'utilisation du Databricks MCP Server
- La sécurité avant tout : appliquez toujours des contrôles d'accès et validez les requêtes SQL pour éviter les opérations destructrices.
- Gestion des erreurs : implémentez une gestion robuste des erreurs dans les applications serveur et client.
- Limitation du débit : ajoutez une limitation du débit pour éviter les appels d'API excessifs vers Databricks.
- Journalisation : implémentez une journalisation complète à des fins de débogage et d'audit.
- Pagination : pour les grands ensembles de résultats, implémentez la pagination pour éviter les problèmes de mémoire.
- Informations d'identification sécurisées : utilisez des variables d'environnement ou un stockage d'informations d'identification sécurisé plutôt que de coder en dur les informations d'identification.
- Documentation des outils : rédigez des descriptions claires et détaillées pour chaque outil afin d'aider le LLM à les utiliser correctement.
Fonctionnalités avancées
Ajout de la prise en charge des Delta Live Tables
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""Lister tous les pipelines Delta Live Table dans l'espace de travail.
Returns:
Liste des détails du pipeline
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
Ajout de la gestion des notebooks
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""Lister les notebooks dans un chemin spécifié dans l'espace de travail.
Args:
path: Chemin de l'espace de travail pour lister les notebooks
Returns:
Liste des détails du notebook
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
Conclusion
Le Databricks MCP Server offre un moyen puissant de combler le fossé entre les modèles d'IA et l'infrastructure de données de votre organisation. En suivant ce guide complet, vous avez appris à :
- Configurer le Databricks MCP Server
- Implémenter des outils de base pour l'exécution de requêtes SQL et l'exploration de bases de données
- Ajouter des fonctionnalités de gestion des tâches et des clusters
- Configurer les clients MCP pour interagir avec votre serveur
- Créer éventuellement une application cliente personnalisée
Cette intégration permet à vos assistants d'IA d'exploiter directement les puissantes capacités de traitement des données de Databricks, ce qui permet une analyse des données, une surveillance et une automatisation plus sophistiquées. Au fur et à mesure que l'écosystème du Model Context Protocol continue d'évoluer, votre Databricks MCP Server peut être étendu avec des outils et des capacités supplémentaires pour répondre aux besoins spécifiques de votre organisation.
En adoptant cette intégration, vous faites un pas important vers la création de systèmes d'IA plus intelligents et conscients des données qui peuvent fournir des informations plus approfondies et une assistance plus efficace pour vos flux de travail d'analyse de données.