
El Protocolo de Contexto de Modelo (MCP) es un protocolo abierto desarrollado originalmente por Anthropic que estandariza c贸mo las aplicaciones proporcionan contexto a los Modelos de Lenguaje Grande (LLM). Databricks MCP Server act煤a como un puente entre la potente plataforma de an谩lisis de datos de Databricks y los modelos de IA, permitiendo que los LLM interact煤en con tu entorno de Databricks mediante programaci贸n. Esta integraci贸n permite a los asistentes de IA ejecutar consultas SQL, listar y obtener detalles de las ejecuciones de trabajos, y acceder a los datos dentro de tu cuenta de Databricks sin intervenci贸n humana directa.
En esta gu铆a completa, te guiaremos a trav茅s del proceso de configuraci贸n, y utilizaci贸n de Databricks MCP Server para operaciones y an谩lisis de datos mejorados impulsados por IA.
Comprensi贸n del Protocolo de Contexto de Modelo (MCP)
Antes de sumergirte en la implementaci贸n espec铆fica de Databricks, es importante entender qu茅 es MCP y por qu茅 es importante:
MCP implementa una arquitectura cliente/servidor con comunicaci贸n bidireccional:
- El servidor aloja herramientas que el LLM puede utilizar
- El cliente orquesta las conversaciones entre el usuario, el LLM y las herramientas
Los beneficios clave que hacen que MCP sea valioso incluyen:
- Separaci贸n de preocupaciones: Los desarrolladores de herramientas pueden crear y actualizar capacidades de forma independiente
- Modularidad: Se pueden a帽adir nuevas herramientas de forma incremental sin cambios en todo el sistema
- Estandarizaci贸n: Una interfaz consistente significa menos trabajo de integraci贸n entre los componentes
- Escalabilidad: Los componentes se pueden escalar de forma independiente en funci贸n de la demanda
Requisitos previos para ejecutar Databricks MCP Server

Antes de configurar el Databricks MCP Server, aseg煤rate de tener:
- Una cuenta de Databricks con los permisos apropiados
- Python 3.8+ instalado en tu sistema
- Databricks CLI configurado con autenticaci贸n
- Conocimientos b谩sicos de los conceptos del espacio de trabajo de Databricks
- Familiaridad con SQL y los trabajos de Databricks
Paso 1: Configuraci贸n del entorno
Primero, vamos a configurar nuestro entorno instalando los paquetes necesarios:
# Create a virtual environment
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # On Windows: databricks-mcp-env\\\\Scripts\\\\activate
# Install the required packages
pip install databricks-sdk mcp-server httpx fastapi uvicorn
Paso 2: Creaci贸n del Databricks MCP Server
Crea un nuevo archivo llamado databricks_mcp.py
con la siguiente estructura de c贸digo:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# Initialize FastMCP server
mcp = FastMCP("databricks")
# Initialize Databricks client
workspace = WorkspaceClient()
Paso 3: Implementaci贸n de las herramientas centrales de MCP para Databricks
Ahora, vamos a implementar las herramientas centrales que permitir谩n a los LLM interactuar con tu entorno de Databricks:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""Run a SQL query in Databricks SQL warehouse.
Args:
query: SQL query to execute
warehouse_id: Optional ID of the SQL warehouse to use
Returns:
Dictionary containing query results
"""
# Validate the query for security
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "Potentially destructive SQL operations are not allowed"}
try:
# Execute the query
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# Process results
if statement.status.state == StatementState.SUCCEEDED:
# Convert results to a more usable format
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
A continuaci贸n, vamos a a帽adir una herramienta para listar las bases de datos disponibles:
@mcp.tool()
async def list_databases() -> List[str]:
"""List all available databases in the Databricks workspace.
Returns:
List of database names
"""
try:
# Run a query to get all databases
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# Extract database names from the result
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"Error listing databases: {result.get('error')}"]
except Exception as e:
return [f"Error: {str(e)}"]
Ahora, vamos a implementar una herramienta para obtener el esquema de la tabla:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""Get the schema of a specific table.
Args:
database_name: Name of the database
table_name: Name of the table
Returns:
Dictionary containing table schema information
"""
try:
# Run a query to get the table schema
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
Paso 4: Adici贸n de herramientas de gesti贸n de trabajos
Vamos a a帽adir herramientas para gestionar y consultar los trabajos de Databricks:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""List jobs in the Databricks workspace.
Args:
limit: Maximum number of jobs to return
Returns:
List of job details
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""Get recent runs for a specific job.
Args:
job_id: ID of the job
limit: Maximum number of runs to return
Returns:
Dictionary containing job run details
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
Paso 5: Implementaci贸n de herramientas de gesti贸n de cl煤steres
Ahora vamos a a帽adir herramientas para interactuar con los cl煤steres de Databricks:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""List all clusters in the Databricks workspace.
Returns:
List of cluster details
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
Paso 6: Ejecuci贸n del servidor MCP
Finalmente, a帽ade el c贸digo para ejecutar el servidor MCP:
if __name__ == "__main__":
# Run server with stdio transport for Claude Desktop compatibility
# or HTTP for web-based clients
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# Run as HTTP server
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# Default to stdio for Claude Desktop
mcp.run(transport='stdio')
Paso 7: Configuraci贸n del cliente MCP
Para utilizar el Databricks MCP Server con un cliente MCP como Claude Desktop, tendr谩s que crear una configuraci贸n:
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
Si prefieres utilizar el transporte HTTP, puedes ejecutar el servidor en un proceso separado y configurar el cliente en consecuencia:
{
"mcpServers": {
"databricks": {
"url": "<http://localhost:8000>"
}
}
}
Paso 8: Construcci贸n de una aplicaci贸n cliente (opcional)
Para una experiencia m谩s personalizada, puedes construir una aplicaci贸n cliente que aproveche el Databricks MCP Server. Aqu铆 tienes un ejemplo de un cliente basado en Python que se conecta tanto al servidor MCP como al modelo Meta Llama de Databricks:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Initialize Databricks client
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"Initialized OpenAI-compatible client for {llm_endpoint_name}")
async def connect_to_server(self, server_script_path: str):
"""Connect to the Databricks MCP server"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\\\\nConnected to Databricks MCP server with tools:", [tool.name for tool in tools])
# Display tool descriptions
print("\\\\nAvailable Databricks Tools:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Add additional methods for processing queries, etc.
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# Add interactive loop or other functionality
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
Mejores pr谩cticas para utilizar Databricks MCP Server
- La seguridad es lo primero: Aplica siempre controles de acceso y valida las consultas SQL para evitar operaciones destructivas.
- Manejo de errores: Implementa un manejo de errores robusto tanto en las aplicaciones del servidor como del cliente.
- Limitaci贸n de velocidad: A帽ade limitaci贸n de velocidad para evitar llamadas API excesivas a Databricks.
- Registro: Implementa un registro exhaustivo para fines de depuraci贸n y auditor铆a.
- Paginaci贸n: Para conjuntos de resultados grandes, implementa la paginaci贸n para evitar problemas de memoria.
- Credenciales seguras: Utiliza variables de entorno o almacenamiento seguro de credenciales en lugar de codificar las credenciales.
- Documentaci贸n de herramientas: Escribe descripciones claras y detalladas para cada herramienta para ayudar al LLM a utilizarlas correctamente.
Caracter铆sticas avanzadas
A帽adir soporte para Delta Live Tables
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""List all Delta Live Table pipelines in the workspace.
Returns:
List of pipeline details
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
A帽adir gesti贸n de cuadernos
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""List notebooks in a specified path in the workspace.
Args:
path: Workspace path to list notebooks from
Returns:
List of notebook details
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
Conclusi贸n
El Databricks MCP Server proporciona una forma poderosa de cerrar la brecha entre los modelos de IA y la infraestructura de datos de tu organizaci贸n. Siguiendo esta gu铆a completa, has aprendido a:
- Configurar y configurar el Databricks MCP Server
- Implementar herramientas centrales para la ejecuci贸n de consultas SQL y la exploraci贸n de bases de datos
- A帽adir capacidades de gesti贸n de trabajos y cl煤steres
- Configurar clientes MCP para interactuar con tu servidor
- Opcionalmente, construir una aplicaci贸n cliente personalizada
Esta integraci贸n permite a tus asistentes de IA aprovechar directamente las potentes capacidades de procesamiento de datos de Databricks, permitiendo un an谩lisis de datos, una monitorizaci贸n y una automatizaci贸n m谩s sofisticados. A medida que el ecosistema del Protocolo de Contexto de Modelo contin煤a evolucionando, tu Databricks MCP Server puede ampliarse con herramientas y capacidades adicionales para satisfacer las necesidades espec铆ficas de tu organizaci贸n.
Al adoptar esta integraci贸n, est谩s dando un paso significativo hacia la creaci贸n de sistemas de IA m谩s inteligentes y conscientes de los datos que pueden proporcionar conocimientos m谩s profundos y una asistencia m谩s eficaz para tus flujos de trabajo de an谩lisis de datos.