
El Protocolo de Contexto de Modelo (MCP) es un protocolo abierto desarrollado originalmente por Anthropic que estandariza cómo las aplicaciones proporcionan contexto a los Modelos de Lenguaje Grande (LLM). Databricks MCP Server actúa como un puente entre la potente plataforma de análisis de datos de Databricks y los modelos de IA, permitiendo que los LLM interactúen con tu entorno de Databricks mediante programación. Esta integración permite a los asistentes de IA ejecutar consultas SQL, listar y obtener detalles de las ejecuciones de trabajos, y acceder a los datos dentro de tu cuenta de Databricks sin intervención humana directa.
En esta guía completa, te guiaremos a través del proceso de configuración, y utilización de Databricks MCP Server para operaciones y análisis de datos mejorados impulsados por IA.
Comprensión del Protocolo de Contexto de Modelo (MCP)
Antes de sumergirte en la implementación específica de Databricks, es importante entender qué es MCP y por qué es importante:
MCP implementa una arquitectura cliente/servidor con comunicación bidireccional:
- El servidor aloja herramientas que el LLM puede utilizar
- El cliente orquesta las conversaciones entre el usuario, el LLM y las herramientas
Los beneficios clave que hacen que MCP sea valioso incluyen:
- Separación de preocupaciones: Los desarrolladores de herramientas pueden crear y actualizar capacidades de forma independiente
- Modularidad: Se pueden añadir nuevas herramientas de forma incremental sin cambios en todo el sistema
- Estandarización: Una interfaz consistente significa menos trabajo de integración entre los componentes
- Escalabilidad: Los componentes se pueden escalar de forma independiente en función de la demanda
Requisitos previos para ejecutar Databricks MCP Server

Antes de configurar el Databricks MCP Server, asegúrate de tener:
- Una cuenta de Databricks con los permisos apropiados
- Python 3.8+ instalado en tu sistema
- Databricks CLI configurado con autenticación
- Conocimientos básicos de los conceptos del espacio de trabajo de Databricks
- Familiaridad con SQL y los trabajos de Databricks
Paso 1: Configuración del entorno
Primero, vamos a configurar nuestro entorno instalando los paquetes necesarios:
# Create a virtual environment
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # On Windows: databricks-mcp-env\\\\Scripts\\\\activate
# Install the required packages
pip install databricks-sdk mcp-server httpx fastapi uvicorn
Paso 2: Creación del Databricks MCP Server
Crea un nuevo archivo llamado databricks_mcp.py
con la siguiente estructura de código:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# Initialize FastMCP server
mcp = FastMCP("databricks")
# Initialize Databricks client
workspace = WorkspaceClient()
Paso 3: Implementación de las herramientas centrales de MCP para Databricks
Ahora, vamos a implementar las herramientas centrales que permitirán a los LLM interactuar con tu entorno de Databricks:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""Run a SQL query in Databricks SQL warehouse.
Args:
query: SQL query to execute
warehouse_id: Optional ID of the SQL warehouse to use
Returns:
Dictionary containing query results
"""
# Validate the query for security
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "Potentially destructive SQL operations are not allowed"}
try:
# Execute the query
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# Process results
if statement.status.state == StatementState.SUCCEEDED:
# Convert results to a more usable format
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
A continuación, vamos a añadir una herramienta para listar las bases de datos disponibles:
@mcp.tool()
async def list_databases() -> List[str]:
"""List all available databases in the Databricks workspace.
Returns:
List of database names
"""
try:
# Run a query to get all databases
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# Extract database names from the result
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"Error listing databases: {result.get('error')}"]
except Exception as e:
return [f"Error: {str(e)}"]
Ahora, vamos a implementar una herramienta para obtener el esquema de la tabla:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""Get the schema of a specific table.
Args:
database_name: Name of the database
table_name: Name of the table
Returns:
Dictionary containing table schema information
"""
try:
# Run a query to get the table schema
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
Paso 4: Adición de herramientas de gestión de trabajos
Vamos a añadir herramientas para gestionar y consultar los trabajos de Databricks:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""List jobs in the Databricks workspace.
Args:
limit: Maximum number of jobs to return
Returns:
List of job details
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""Get recent runs for a specific job.
Args:
job_id: ID of the job
limit: Maximum number of runs to return
Returns:
Dictionary containing job run details
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
Paso 5: Implementación de herramientas de gestión de clústeres
Ahora vamos a añadir herramientas para interactuar con los clústeres de Databricks:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""List all clusters in the Databricks workspace.
Returns:
List of cluster details
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
Paso 6: Ejecución del servidor MCP
Finalmente, añade el código para ejecutar el servidor MCP:
if __name__ == "__main__":
# Run server with stdio transport for Claude Desktop compatibility
# or HTTP for web-based clients
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# Run as HTTP server
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# Default to stdio for Claude Desktop
mcp.run(transport='stdio')
Paso 7: Configuración del cliente MCP
Para utilizar el Databricks MCP Server con un cliente MCP como Claude Desktop, tendrás que crear una configuración:
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
Si prefieres utilizar el transporte HTTP, puedes ejecutar el servidor en un proceso separado y configurar el cliente en consecuencia:
{
"mcpServers": {
"databricks": {
"url": "<http://localhost:8000>"
}
}
}
Paso 8: Construcción de una aplicación cliente (opcional)
Para una experiencia más personalizada, puedes construir una aplicación cliente que aproveche el Databricks MCP Server. Aquí tienes un ejemplo de un cliente basado en Python que se conecta tanto al servidor MCP como al modelo Meta Llama de Databricks:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Initialize Databricks client
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"Initialized OpenAI-compatible client for {llm_endpoint_name}")
async def connect_to_server(self, server_script_path: str):
"""Connect to the Databricks MCP server"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\\\\nConnected to Databricks MCP server with tools:", [tool.name for tool in tools])
# Display tool descriptions
print("\\\\nAvailable Databricks Tools:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Add additional methods for processing queries, etc.
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# Add interactive loop or other functionality
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
Mejores prácticas para utilizar Databricks MCP Server
- La seguridad es lo primero: Aplica siempre controles de acceso y valida las consultas SQL para evitar operaciones destructivas.
- Manejo de errores: Implementa un manejo de errores robusto tanto en las aplicaciones del servidor como del cliente.
- Limitación de velocidad: Añade limitación de velocidad para evitar llamadas API excesivas a Databricks.
- Registro: Implementa un registro exhaustivo para fines de depuración y auditoría.
- Paginación: Para conjuntos de resultados grandes, implementa la paginación para evitar problemas de memoria.
- Credenciales seguras: Utiliza variables de entorno o almacenamiento seguro de credenciales en lugar de codificar las credenciales.
- Documentación de herramientas: Escribe descripciones claras y detalladas para cada herramienta para ayudar al LLM a utilizarlas correctamente.
Características avanzadas
Añadir soporte para Delta Live Tables
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""List all Delta Live Table pipelines in the workspace.
Returns:
List of pipeline details
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
Añadir gestión de cuadernos
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""List notebooks in a specified path in the workspace.
Args:
path: Workspace path to list notebooks from
Returns:
List of notebook details
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
Conclusión
El Databricks MCP Server proporciona una forma poderosa de cerrar la brecha entre los modelos de IA y la infraestructura de datos de tu organización. Siguiendo esta guía completa, has aprendido a:
- Configurar y configurar el Databricks MCP Server
- Implementar herramientas centrales para la ejecución de consultas SQL y la exploración de bases de datos
- Añadir capacidades de gestión de trabajos y clústeres
- Configurar clientes MCP para interactuar con tu servidor
- Opcionalmente, construir una aplicación cliente personalizada
Esta integración permite a tus asistentes de IA aprovechar directamente las potentes capacidades de procesamiento de datos de Databricks, permitiendo un análisis de datos, una monitorización y una automatización más sofisticados. A medida que el ecosistema del Protocolo de Contexto de Modelo continúa evolucionando, tu Databricks MCP Server puede ampliarse con herramientas y capacidades adicionales para satisfacer las necesidades específicas de tu organización.
Al adoptar esta integración, estás dando un paso significativo hacia la creación de sistemas de IA más inteligentes y conscientes de los datos que pueden proporcionar conocimientos más profundos y una asistencia más eficaz para tus flujos de trabajo de análisis de datos.