Cómo usar el servidor MCP de Databricks

💡Antes de explorar Databricks MCP Server, prueba Apidog: herramienta gratuita para testear e integrar APIs. ¡Optimiza tu flujo de trabajo! El Protocolo de Contexto de Modelo (MCP) es un protocolo abierto creado por Anthropic.

Daniel Costa

Daniel Costa

5 July 2025

Cómo usar el servidor MCP de Databricks
💡
Antes de sumergirte en Databricks MCP Server, echa un vistazo a Apidog, una herramienta gratuita diseñada para simplificar las pruebas e integración de API. Con la interfaz intuitiva de Apidog, puedes depurar y optimizar fácilmente tus flujos de trabajo de API, agilizando el proceso de desarrollo y ahorrándote un tiempo valioso. Ya sea que estés construyendo API o solucionando problemas, Apidog tiene todo lo que necesitas para mejorar tu flujo de trabajo.
button

El Protocolo de Contexto de Modelo (MCP) es un protocolo abierto desarrollado originalmente por Anthropic que estandariza cómo las aplicaciones proporcionan contexto a los Modelos de Lenguaje Grande (LLM). Databricks MCP Server actúa como un puente entre la potente plataforma de análisis de datos de Databricks y los modelos de IA, permitiendo que los LLM interactúen con tu entorno de Databricks mediante programación. Esta integración permite a los asistentes de IA ejecutar consultas SQL, listar y obtener detalles de las ejecuciones de trabajos, y acceder a los datos dentro de tu cuenta de Databricks sin intervención humana directa.

En esta guía completa, te guiaremos a través del proceso de configuración, y utilización de Databricks MCP Server para operaciones y análisis de datos mejorados impulsados por IA.

Comprensión del Protocolo de Contexto de Modelo (MCP)

Antes de sumergirte en la implementación específica de Databricks, es importante entender qué es MCP y por qué es importante:

MCP implementa una arquitectura cliente/servidor con comunicación bidireccional:

Los beneficios clave que hacen que MCP sea valioso incluyen:

Requisitos previos para ejecutar Databricks MCP Server

Antes de configurar el Databricks MCP Server, asegúrate de tener:

  1. Una cuenta de Databricks con los permisos apropiados
  2. Python 3.8+ instalado en tu sistema
  3. Databricks CLI configurado con autenticación
  4. Conocimientos básicos de los conceptos del espacio de trabajo de Databricks
  5. Familiaridad con SQL y los trabajos de Databricks

Paso 1: Configuración del entorno

Primero, vamos a configurar nuestro entorno instalando los paquetes necesarios:

# Create a virtual environment
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate  # On Windows: databricks-mcp-env\\\\Scripts\\\\activate

# Install the required packages
pip install databricks-sdk mcp-server httpx fastapi uvicorn

Paso 2: Creación del Databricks MCP Server

Crea un nuevo archivo llamado databricks_mcp.py con la siguiente estructura de código:

from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState

# Initialize FastMCP server
mcp = FastMCP("databricks")

# Initialize Databricks client
workspace = WorkspaceClient()

Paso 3: Implementación de las herramientas centrales de MCP para Databricks

Ahora, vamos a implementar las herramientas centrales que permitirán a los LLM interactuar con tu entorno de Databricks:

@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
    """Run a SQL query in Databricks SQL warehouse.

    Args:
        query: SQL query to execute
        warehouse_id: Optional ID of the SQL warehouse to use

    Returns:
        Dictionary containing query results
    """
    # Validate the query for security
    if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
        return {"error": "Potentially destructive SQL operations are not allowed"}

    try:
        # Execute the query
        statement = workspace.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            statement=query,
            wait_timeout=60
        )

        # Process results
        if statement.status.state == StatementState.SUCCEEDED:
            # Convert results to a more usable format
            columns = [col.name for col in statement.manifest.schema.columns]
            rows = []
            for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
                for row_data in chunk.data:
                    rows.append(dict(zip(columns, row_data)))

            return {
                "success": True,
                "columns": columns,
                "rows": rows,
                "row_count": len(rows)
            }
        else:
            return {
                "success": False,
                "state": statement.status.state,
                "error": statement.status.error
            }
    except Exception as e:
        return {"success": False, "error": str(e)}

A continuación, vamos a añadir una herramienta para listar las bases de datos disponibles:

@mcp.tool()
async def list_databases() -> List[str]:
    """List all available databases in the Databricks workspace.

    Returns:
        List of database names
    """
    try:
        # Run a query to get all databases
        result = await run_sql_query("SHOW DATABASES")
        if result.get("success"):
            # Extract database names from the result
            return [row.get("databaseName") for row in result.get("rows", [])]
        else:
            return [f"Error listing databases: {result.get('error')}"]
    except Exception as e:
        return [f"Error: {str(e)}"]

Ahora, vamos a implementar una herramienta para obtener el esquema de la tabla:

@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
    """Get the schema of a specific table.

    Args:
        database_name: Name of the database
        table_name: Name of the table

    Returns:
        Dictionary containing table schema information
    """
    try:
        # Run a query to get the table schema
        result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
        if result.get("success"):
            return {
                "database": database_name,
                "table": table_name,
                "columns": result.get("rows", [])
            }
        else:
            return {"error": result.get("error")}
    except Exception as e:
        return {"error": str(e)}

Paso 4: Adición de herramientas de gestión de trabajos

Vamos a añadir herramientas para gestionar y consultar los trabajos de Databricks:

@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
    """List jobs in the Databricks workspace.

    Args:
        limit: Maximum number of jobs to return

    Returns:
        List of job details
    """
    try:
        jobs = workspace.jobs.list(limit=limit)
        result = []
        for job in jobs:
            result.append({
                "job_id": job.job_id,
                "name": job.settings.name,
                "created_by": job.created_by,
                "created_time": job.created_time
            })
        return result
    except Exception as e:
        return [{"error": str(e)}]

@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
    """Get recent runs for a specific job.

    Args:
        job_id: ID of the job
        limit: Maximum number of runs to return

    Returns:
        Dictionary containing job run details
    """
    try:
        runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)

        result = {
            "job_id": job_id,
            "runs": []
        }

        for run in runs.runs:
            run_info = {
                "run_id": run.run_id,
                "state": run.state.life_cycle_state,
                "result_state": run.state.result_state,
                "start_time": run.start_time,
                "setup_duration": run.setup_duration,
                "execution_duration": run.execution_duration,
                "cleanup_duration": run.cleanup_duration
            }
            result["runs"].append(run_info)

        return result
    except Exception as e:
        return {"error": str(e)}

Paso 5: Implementación de herramientas de gestión de clústeres

Ahora vamos a añadir herramientas para interactuar con los clústeres de Databricks:

@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
    """List all clusters in the Databricks workspace.

    Returns:
        List of cluster details
    """
    try:
        clusters = workspace.clusters.list()
        result = []

        for cluster in clusters:
            cluster_info = {
                "cluster_id": cluster.cluster_id,
                "cluster_name": cluster.cluster_name,
                "state": cluster.state,
                "creator_username": cluster.creator_user_name,
                "spark_version": cluster.spark_version,
                "node_type_id": cluster.node_type_id
            }
            result.append(cluster_info)

        return result
    except Exception as e:
        return [{"error": str(e)}]

Paso 6: Ejecución del servidor MCP

Finalmente, añade el código para ejecutar el servidor MCP:

if __name__ == "__main__":
    # Run server with stdio transport for Claude Desktop compatibility
    # or HTTP for web-based clients
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        # Run as HTTP server
        mcp.run(transport='http', host="127.0.0.1", port=8000)
    else:
        # Default to stdio for Claude Desktop
        mcp.run(transport='stdio')

Paso 7: Configuración del cliente MCP

Para utilizar el Databricks MCP Server con un cliente MCP como Claude Desktop, tendrás que crear una configuración:

{
  "mcpServers": {
    "databricks": {
      "command": "python",
      "args": ["databricks_mcp.py"]
    }
  }
}

Si prefieres utilizar el transporte HTTP, puedes ejecutar el servidor en un proceso separado y configurar el cliente en consecuencia:

{
  "mcpServers": {
    "databricks": {
      "url": "<http://localhost:8000>"
    }
  }
}

Paso 8: Construcción de una aplicación cliente (opcional)

Para una experiencia más personalizada, puedes construir una aplicación cliente que aproveche el Databricks MCP Server. Aquí tienes un ejemplo de un cliente basado en Python que se conecta tanto al servidor MCP como al modelo Meta Llama de Databricks:

import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any

from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters

class DatabricksMCPClient:
    def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

        # Initialize Databricks client
        self.workspace = WorkspaceClient()
        self.llm_endpoint_name = llm_endpoint_name
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        print(f"Initialized OpenAI-compatible client for {llm_endpoint_name}")

    async def connect_to_server(self, server_script_path: str):
        """Connect to the Databricks MCP server"""
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\\\\nConnected to Databricks MCP server with tools:", [tool.name for tool in tools])

        # Display tool descriptions
        print("\\\\nAvailable Databricks Tools:")
        for tool in tools:
            print(f"- {tool.name}: {tool.description}")

    # Add additional methods for processing queries, etc.

async def main():
    client = DatabricksMCPClient()
    try:
        await client.connect_to_server("databricks_mcp.py")
        # Add interactive loop or other functionality
    finally:
        await client.exit_stack.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Mejores prácticas para utilizar Databricks MCP Server

  1. La seguridad es lo primero: Aplica siempre controles de acceso y valida las consultas SQL para evitar operaciones destructivas.
  2. Manejo de errores: Implementa un manejo de errores robusto tanto en las aplicaciones del servidor como del cliente.
  3. Limitación de velocidad: Añade limitación de velocidad para evitar llamadas API excesivas a Databricks.
  4. Registro: Implementa un registro exhaustivo para fines de depuración y auditoría.
  5. Paginación: Para conjuntos de resultados grandes, implementa la paginación para evitar problemas de memoria.
  6. Credenciales seguras: Utiliza variables de entorno o almacenamiento seguro de credenciales en lugar de codificar las credenciales.
  7. Documentación de herramientas: Escribe descripciones claras y detalladas para cada herramienta para ayudar al LLM a utilizarlas correctamente.

Características avanzadas

Añadir soporte para Delta Live Tables

@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
    """List all Delta Live Table pipelines in the workspace.

    Returns:
        List of pipeline details
    """
    try:
        pipelines = workspace.pipelines.list()
        result = []

        for pipeline in pipelines:
            pipeline_info = {
                "pipeline_id": pipeline.pipeline_id,
                "name": pipeline.name,
                "state": pipeline.state,
                "creator": pipeline.creator,
                "target": pipeline.target
            }
            result.append(pipeline_info)

        return result
    except Exception as e:
        return [{"error": str(e)}]

Añadir gestión de cuadernos

@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
    """List notebooks in a specified path in the workspace.

    Args:
        path: Workspace path to list notebooks from

    Returns:
        List of notebook details
    """
    try:
        objects = workspace.workspace.list(path)
        notebooks = []

        for obj in objects:
            if obj.object_type == "NOTEBOOK":
                notebooks.append({
                    "path": obj.path,
                    "name": obj.path.split("/")[-1],
                    "language": obj.language
                })

        return notebooks
    except Exception as e:
        return [{"error": str(e)}]

Conclusión

El Databricks MCP Server proporciona una forma poderosa de cerrar la brecha entre los modelos de IA y la infraestructura de datos de tu organización. Siguiendo esta guía completa, has aprendido a:

  1. Configurar y configurar el Databricks MCP Server
  2. Implementar herramientas centrales para la ejecución de consultas SQL y la exploración de bases de datos
  3. Añadir capacidades de gestión de trabajos y clústeres
  4. Configurar clientes MCP para interactuar con tu servidor
  5. Opcionalmente, construir una aplicación cliente personalizada

Esta integración permite a tus asistentes de IA aprovechar directamente las potentes capacidades de procesamiento de datos de Databricks, permitiendo un análisis de datos, una monitorización y una automatización más sofisticados. A medida que el ecosistema del Protocolo de Contexto de Modelo continúa evolucionando, tu Databricks MCP Server puede ampliarse con herramientas y capacidades adicionales para satisfacer las necesidades específicas de tu organización.

Al adoptar esta integración, estás dando un paso significativo hacia la creación de sistemas de IA más inteligentes y conscientes de los datos que pueden proporcionar conocimientos más profundos y una asistencia más eficaz para tus flujos de trabajo de análisis de datos.

Explore more

Postman con una interfaz en español: Descargar gratis

Postman con una interfaz en español: Descargar gratis

Postman carece de una interfaz de usuario en español, lo que complica la colaboración y la eficiencia. Apidog emerge como la alternativa definitiva, ofreciendo una experiencia de desarrollo de API totalmente en español.

1 August 2025

Cómo usar Ollama: Guía Completa para Principiantes sobre LLMs Locales con Ollama

Cómo usar Ollama: Guía Completa para Principiantes sobre LLMs Locales con Ollama

El panorama de la inteligencia artificial evoluciona constantemente, y los Grandes Modelos de Lenguaje (LLM) se vuelven cada vez más potentes y accesibles. Aunque muchos interactúan con estos modelos a través de servicios basados en la nube, existe un movimiento creciente enfocado en ejecutarlos directamente en computadoras personales. Aquí es donde entra Ollama. Ollama es una herramienta potente pero fácil de usar, diseñada para simplificar drásticamente el complejo proceso de descargar, config

28 April 2025

¿Dónde Descargar Swagger UI en Español Gratis?

¿Dónde Descargar Swagger UI en Español Gratis?

¿Necesitas Swagger UI en español? Este artículo explica por qué no existe una descarga oficial gratuita y cómo habilitar la traducción. Explora las características de Swagger y por qué Apidog es la alternativa superior para diseño, pruebas y documentación API integrados.

23 April 2025

Practica el diseño de API en Apidog

Descubre una forma más fácil de construir y usar APIs