
O Protocócolo de Contexto do Modelo (MCP) é um protocolo aberto originalmente desenvolvido pela Anthropic que padroniza como as aplicações fornecem contexto aos Modelos de Linguagem de Grande Escala (LLMs). O Databricks MCP Server atua como uma ponte entre a poderosa plataforma de análise de dados do Databricks e os modelos de IA, permitindo que os LLMs interajam com seu ambiente Databricks de forma programática. Essa integração permite que assistentes de IA executem consultas SQL, listem e obtenham detalhes das execuções de tarefas e acessem dados dentro de sua conta do Databricks sem intervenção humana direta.
Neste guia abrangente, vamos percorrer o processo de configuração, configuração e utilização do Databricks MCP Server para operações e análises de dados impulsionadas por IA.
Entendendo o Protocolo de Contexto do Modelo (MCP)
Antes de mergulhar na implementação específica do Databricks, é importante entender o que é o MCP e por que ele é relevante:
O MCP implementa uma arquitetura cliente/servidor com comunicação bidirecional:
- O servidor hospeda ferramentas que o LLM pode usar
- O cliente orquestra conversas entre o usuário, o LLM e as ferramentas
Os principais benefícios que tornam o MCP valioso incluem:
- Separação de Preocupações: Os desenvolvedores de ferramentas podem criar e atualizar funcionalidades de forma independente
- Modularidade: Novas ferramentas podem ser adicionadas de forma incremental sem mudanças em todo o sistema
- Padronização: Uma interface consistente significa menos trabalho de integração entre os componentes
- Escalabilidade: Os componentes podem ser escalados independentemente com base na demanda
Pré-requisitos para Executar o Databricks MCP Server

Antes de configurar o Databricks MCP Server, certifique-se de ter:
- Uma conta Databricks com permissões apropriadas
- Python 3.8+ instalado em seu sistema
- CLI do Databricks configurado com autenticação
- Conhecimento básico dos conceitos do workspace do Databricks
- Familiaridade com SQL e tarefas do Databricks
Passo 1: Configurando o Ambiente
Primeiro, vamos configurar nosso ambiente instalando os pacotes necessários:
# Crie um ambiente virtual
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # No Windows: databricks-mcp-env\\\\Scripts\\\\activate
# Instale os pacotes necessários
pip install databricks-sdk mcp-server httpx fastapi uvicorn
Passo 2: Criando o Databricks MCP Server
Crie um novo arquivo chamado databricks_mcp.py
com a seguinte estrutura de código:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# Inicialize o servidor FastMCP
mcp = FastMCP("databricks")
# Inicialize o cliente Databricks
workspace = WorkspaceClient()
Passo 3: Implementando Ferramentas Básicas do MCP para Databricks
Agora, vamos implementar as ferramentas principais que permitirão que os LLMs interajam com seu ambiente Databricks:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""Executar uma consulta SQL no warehouse SQL do Databricks.
Args:
query: Consulta SQL a ser executada
warehouse_id: ID opcional do warehouse SQL a ser utilizado
Returns:
Dicionário contendo os resultados da consulta
"""
# Validar a consulta por segurança
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "Operações SQL potencialmente destrutivas não são permitidas"}
try:
# Executar a consulta
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# Processar resultados
if statement.status.state == StatementState.SUCCEEDED:
# Converter resultados para um formato mais utilizável
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
Em seguida, vamos adicionar uma ferramenta para listar os bancos de dados disponíveis:
@mcp.tool()
async def list_databases() -> List[str]:
"""Listar todos os bancos de dados disponíveis no workspace do Databricks.
Returns:
Lista de nomes de bancos de dados
"""
try:
# Executar uma consulta para obter todos os bancos de dados
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# Extrair nomes de bancos de dados do resultado
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"Erro ao listar bancos de dados: {result.get('error')}"]
except Exception as e:
return [f"Erro: {str(e)}"]
Agora, vamos implementar uma ferramenta para obter o esquema de uma tabela:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""Obter o esquema de uma tabela específica.
Args:
database_name: Nome do banco de dados
table_name: Nome da tabela
Returns:
Dicionário contendo informações do esquema da tabela
"""
try:
# Executar uma consulta para obter o esquema da tabela
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
Passo 4: Adicionando Ferramentas de Gerenciamento de Tarefas
Vamos adicionar ferramentas para gerenciar e consultar tarefas do Databricks:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""Listar tarefas no workspace do Databricks.
Args:
limit: Número máximo de tarefas a serem retornadas
Returns:
Lista de detalhes da tarefa
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""Obter execuções recentes de uma tarefa específica.
Args:
job_id: ID da tarefa
limit: Número máximo de execuções a serem retornadas
Returns:
Dicionário contendo detalhes da execução da tarefa
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
Passo 5: Implementando Ferramentas de Gerenciamento de Clusters
Agora vamos adicionar ferramentas para interagir com clusters do Databricks:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""Listar todos os clusters no workspace do Databricks.
Returns:
Lista de detalhes do cluster
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
Passo 6: Executando o MCP Server
Finalmente, adicione o código para executar o MCP server:
if __name__ == "__main__":
# Executar servidor com transporte stdio para compatibilidade com Claude Desktop
# ou HTTP para clientes baseados na web
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# Executar como servidor HTTP
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# Padrão para stdio para Claude Desktop
mcp.run(transport='stdio')
Passo 7: Configurando o Cliente MCP
Para usar o Databricks MCP Server com um cliente MCP como o Claude Desktop, você precisará criar uma configuração:
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
Se você prefere usar o transporte HTTP, pode executar o servidor em um processo separado e configurar o cliente de acordo:
{
"mcpServers": {
"databricks": {
"url": "<http://localhost:8000>"
}
}
}
Passo 8: Construindo um Aplicativo Cliente (Opcional)
Para uma experiência mais personalizada, você pode construir um aplicativo cliente que aproveite o Databricks MCP Server. Aqui está um exemplo de um cliente baseado em Python que se conecta tanto ao servidor MCP quanto ao modelo Meta Llama do Databricks:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# Inicialize os objetos de sessão e cliente
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Inicialize o cliente Databricks
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"Cliente compatível com OpenAI inicializado para {llm_endpoint_name}")
async def connect_to_server(self, server_script_path: str):
"""Conectar ao servidor Databricks MCP"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# Listar ferramentas disponíveis
response = await self.session.list_tools()
tools = response.tools
print("\\\\nConectado ao servidor Databricks MCP com ferramentas:", [tool.name for tool in tools])
# Exibir descrições das ferramentas
print("\\\\nFerramentas disponíveis no Databricks:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Adicionar métodos adicionais para processamento de consultas, etc.
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# Adicione um loop interativo ou outra funcionalidade
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
Melhores Práticas para Usar o Databricks MCP Server
- Segurança em Primeiro Lugar: Sempre aplique controles de acesso e valide as consultas SQL para evitar operações destrutivas.
- Tratamento de Erros: Implemente um tratamento robusto de erros tanto no servidor quanto nas aplicações clientes.
- Limitação de Taxa: Adicione limitação de taxa para evitar chamadas excessivas de API ao Databricks.
- Registro: Implemente um registro abrangente para fins de depuração e auditoria.
- Paginação: Para grandes conjuntos de resultados, implemente paginação para evitar problemas de memória.
- Credenciais Seguras: Use variáveis de ambiente ou armazenamento seguro de credenciais em vez de codificar credenciais.
- Documentação das Ferramentas: Escreva descrições claras e detalhadas para cada ferramenta para ajudar o LLM a usá-las corretamente.
Recursos Avançados
Adicionando Suporte para Delta Live Tables
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""Listar todos os pipelines de Delta Live Table no workspace.
Returns:
Lista de detalhes do pipeline
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
Adicionando Gerenciamento de Notebooks
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""Listar notebooks em um caminho especificado no workspace.
Args:
path: Caminho do workspace para listar notebooks
Returns:
Lista de detalhes do notebook
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
Conclusão
O Databricks MCP Server fornece uma maneira poderosa de conectar modelos de IA à infraestrutura de dados da sua organização. Ao seguir este guia abrangente, você aprendeu a:
- Configurar e configurar o Databricks MCP Server
- Implementar ferramentas principais para execução de consultas SQL e exploração de bancos de dados
- Adicionar capacidades de gerenciamento de tarefas e clusters
- Configurar clientes MCP para interagir com seu servidor
- Opcionalmente, construir um aplicativo cliente personalizado
Essa integração capacita seus assistentes de IA a aproveitar diretamente as poderosas capacidades de processamento de dados do Databricks, permitindo uma análise de dados mais sofisticada, monitoramento e automação. À medida que o ecossistema do Protocolo de Contexto do Modelo continua a evoluir, seu Databricks MCP Server pode ser expandido com ferramentas e capacidades adicionais para atender às necessidades específicas da sua organização.
Ao abraçar essa integração, você está dando um passo significativo na criação de sistemas de IA mais inteligentes e conscientes dos dados que podem fornecer percepções mais profundas e assistência mais eficaz para seus fluxos de trabalho de análise de dados.