
The Model Context Protocol (MCP) is an open protocol originally developed by Anthropic that standardizes how applications provide context to Large Language Models (LLMs). The Databricks MCP Server acts as a bridge between Databricks' data analytics platform and AI models, allowing LLMs to interact with your Databricks environment programmatically. This integration enables AI assistants to run SQL queries, list jobs and inspect their runs, and access data within your Databricks account without direct human intervention.
In this comprehensive guide, we'll walk through the process of setting up, configuring, and utilizing the Databricks MCP Server for enhanced AI-driven data operations and analytics.
Understanding Model Context Protocol (MCP)
Before diving into the Databricks-specific implementation, it's important to understand what MCP is and why it matters:
MCP implements a client/server architecture with bi-directional communication:
- The server hosts tools that the LLM can use
- The client orchestrates conversations between the user, the LLM, and the tools
The key benefits that make MCP valuable include:
- Separation of Concerns: Tool developers can create and update capabilities independently
- Modularity: New tools can be added incrementally without system-wide changes
- Standardization: A consistent interface means less integration work across components
- Scalability: Components can be scaled independently based on demand
Prerequisites to Run Databricks MCP Server

Before setting up the Databricks MCP Server, ensure you have:
- A Databricks account with appropriate permissions
- Python 3.8+ installed on your system
- Databricks CLI configured with authentication (a quick verification snippet follows this list)
- Basic understanding of Databricks workspace concepts
- Familiarity with SQL and Databricks jobs
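As a quick way to verify authentication before going further, the snippet below is a minimal sketch: WorkspaceClient() resolves credentials from the environment or ~/.databrickscfg, and the host and token values shown are placeholders, not real endpoints or secrets.
import os
from databricks.sdk import WorkspaceClient

# Setting DATABRICKS_HOST / DATABRICKS_TOKEN is one common option;
# replace the placeholder values with your own workspace URL and token.
os.environ.setdefault("DATABRICKS_HOST", "https://your-workspace.cloud.databricks.com")
os.environ.setdefault("DATABRICKS_TOKEN", "your-personal-access-token")

w = WorkspaceClient()
print(w.current_user.me().user_name)  # prints your username if authentication works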
Step 1: Setting Up the Environment
First, let's set up our environment by installing the necessary packages:
# Create a virtual environment
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate  # On Windows: databricks-mcp-env\Scripts\activate
# Install the required packages
pip install databricks-sdk "mcp[cli]" httpx fastapi uvicorn
Step 2: Creating the Databricks MCP Server
Create a new file called databricks_mcp.py with the following code structure:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import StatementState
# Initialize FastMCP server
mcp = FastMCP("databricks")
# Initialize Databricks client
workspace = WorkspaceClient()
Step 3: Implementing Core MCP Tools for Databricks
Now, let's implement the core tools that will allow LLMs to interact with your Databricks environment:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
    """Run a SQL query in a Databricks SQL warehouse.

    Args:
        query: SQL query to execute
        warehouse_id: Optional ID of the SQL warehouse to use

    Returns:
        Dictionary containing query results
    """
    # Validate the query for security
    if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
        return {"error": "Potentially destructive SQL operations are not allowed"}
    try:
        # Execute the query (wait_timeout is a string; the API caps it at 50 seconds)
        statement = workspace.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            statement=query,
            wait_timeout="50s"
        )
        # Process results
        if statement.status.state == StatementState.SUCCEEDED:
            # Convert the inline result (first chunk) to a more usable format;
            # use get_statement_result_chunk_n for additional chunks of large results
            columns = [col.name for col in statement.manifest.schema.columns]
            rows = [dict(zip(columns, row_data)) for row_data in (statement.result.data_array or [])]
            return {
                "success": True,
                "columns": columns,
                "rows": rows,
                "row_count": len(rows)
            }
        else:
            return {
                "success": False,
                "state": statement.status.state,
                "error": statement.status.error
            }
    except Exception as e:
        return {"success": False, "error": str(e)}
Next, let's add a tool to list available databases:
@mcp.tool()
async def list_databases() -> List[str]:
    """List all available databases in the Databricks workspace.

    Returns:
        List of database names
    """
    try:
        # Run a query to get all databases
        result = await run_sql_query("SHOW DATABASES")
        if result.get("success"):
            # Extract database names from the result
            return [row.get("databaseName") for row in result.get("rows", [])]
        else:
            return [f"Error listing databases: {result.get('error')}"]
    except Exception as e:
        return [f"Error: {str(e)}"]
Now, let's implement a tool to get table schema:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
    """Get the schema of a specific table.

    Args:
        database_name: Name of the database
        table_name: Name of the table

    Returns:
        Dictionary containing table schema information
    """
    try:
        # Run a query to get the table schema
        result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
        if result.get("success"):
            return {
                "database": database_name,
                "table": table_name,
                "columns": result.get("rows", [])
            }
        else:
            return {"error": result.get("error")}
    except Exception as e:
        return {"error": str(e)}
Step 4: Adding Job Management Tools
Let's add tools to manage and query Databricks jobs:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
    """List jobs in the Databricks workspace.

    Args:
        limit: Maximum number of jobs to return

    Returns:
        List of job details
    """
    try:
        jobs = workspace.jobs.list(limit=limit)
        result = []
        for job in jobs:
            result.append({
                "job_id": job.job_id,
                "name": job.settings.name,
                "created_by": job.creator_user_name,
                "created_time": job.created_time
            })
        return result
    except Exception as e:
        return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
    """Get recent runs for a specific job.

    Args:
        job_id: ID of the job
        limit: Maximum number of runs to return

    Returns:
        Dictionary containing job run details
    """
    try:
        # list_runs returns an iterator of runs
        runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
        result = {
            "job_id": job_id,
            "runs": []
        }
        for run in runs:
            run_info = {
                "run_id": run.run_id,
                "state": run.state.life_cycle_state,
                "result_state": run.state.result_state,
                "start_time": run.start_time,
                "setup_duration": run.setup_duration,
                "execution_duration": run.execution_duration,
                "cleanup_duration": run.cleanup_duration
            }
            result["runs"].append(run_info)
        return result
    except Exception as e:
        return {"error": str(e)}
Step 5: Implementing Cluster Management Tools
Now let's add tools to interact with Databricks clusters:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
    """List all clusters in the Databricks workspace.

    Returns:
        List of cluster details
    """
    try:
        clusters = workspace.clusters.list()
        result = []
        for cluster in clusters:
            cluster_info = {
                "cluster_id": cluster.cluster_id,
                "cluster_name": cluster.cluster_name,
                "state": cluster.state,
                "creator_username": cluster.creator_user_name,
                "spark_version": cluster.spark_version,
                "node_type_id": cluster.node_type_id
            }
            result.append(cluster_info)
        return result
    except Exception as e:
        return [{"error": str(e)}]
Step 6: Running the MCP Server
Finally, add the code to run the MCP server:
if __name__ == "__main__":
    # Run the server with stdio transport for Claude Desktop compatibility,
    # or streamable HTTP for web-based clients
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        # Run as an HTTP server; host and port (default 8000) can be set via the
        # FastMCP constructor, e.g. FastMCP("databricks", host="127.0.0.1", port=8000).
        # Older SDK versions may only support the "sse" transport here.
        mcp.run(transport="streamable-http")
    else:
        # Default to stdio for Claude Desktop
        mcp.run(transport="stdio")
Step 7: Configuring the MCP Client
To use the Databricks MCP Server with an MCP client such as Claude Desktop, add an entry to the client's configuration (for Claude Desktop, this lives in claude_desktop_config.json); using the absolute path to databricks_mcp.py in args is the most reliable option:
{
  "mcpServers": {
    "databricks": {
      "command": "python",
      "args": ["databricks_mcp.py"]
    }
  }
}
If you prefer to use the HTTP transport, you can run the server in a separate process and configure the client accordingly:
{
  "mcpServers": {
    "databricks": {
      "url": "http://localhost:8000"
    }
  }
}
Step 8: Building a Client Application (Optional)
For a more customized experience, you can build a client application that leverages the Databricks MCP Server. Here's an example of a Python-based client that connects to both the MCP server and Databricks' Meta Llama model:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional

from databricks.sdk import WorkspaceClient
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class DatabricksMCPClient:
    def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Initialize Databricks client
        self.workspace = WorkspaceClient()
        self.llm_endpoint_name = llm_endpoint_name
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        print(f"Initialized OpenAI-compatible client for {llm_endpoint_name}")

    async def connect_to_server(self, server_script_path: str):
        """Connect to the Databricks MCP server"""
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to Databricks MCP server with tools:", [tool.name for tool in tools])

        # Display tool descriptions
        print("\nAvailable Databricks Tools:")
        for tool in tools:
            print(f"- {tool.name}: {tool.description}")

    # Add additional methods for processing queries, etc.


async def main():
    client = DatabricksMCPClient()
    try:
        await client.connect_to_server("databricks_mcp.py")
        # Add interactive loop or other functionality
    finally:
        await client.exit_stack.aclose()


if __name__ == "__main__":
    asyncio.run(main())
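As a sketch of what those additional methods might look like, the process_query method below converts the MCP tools into OpenAI-style function-tool definitions, lets the model request a tool call, executes it through the MCP session, and feeds the result back for a final answer. It is an assumption-laden example rather than a canonical implementation, and it assumes the chosen serving endpoint supports OpenAI-style tool calling.
import json

async def process_query(self, query: str) -> str:
    """Add this method to DatabricksMCPClient: send a query to the LLM and
    execute any MCP tool calls it requests."""
    # Expose the MCP tools to the model as OpenAI-style function tools
    tools_response = await self.session.list_tools()
    openai_tools = [{
        "type": "function",
        "function": {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.inputSchema,
        },
    } for tool in tools_response.tools]

    messages = [{"role": "user", "content": query}]
    response = self.openai_client.chat.completions.create(
        model=self.llm_endpoint_name,
        messages=messages,
        tools=openai_tools,
    )
    message = response.choices[0].message

    # If the model requested tools, run them via the MCP session and ask the
    # model for a final answer that incorporates the results
    if message.tool_calls:
        messages.append(message)
        for tool_call in message.tool_calls:
            tool_result = await self.session.call_tool(
                tool_call.function.name,
                json.loads(tool_call.function.arguments),
            )
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(tool_result.content),
            })
        response = self.openai_client.chat.completions.create(
            model=self.llm_endpoint_name,
            messages=messages,
            tools=openai_tools,
        )
        message = response.choices[0].message

    return message.content or ""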
Best Practices for Using Databricks MCP Server
- Security First: Always apply access controls and validate SQL queries to prevent destructive operations.
- Error Handling: Implement robust error handling in both server and client applications.
- Rate Limiting: Add rate limiting to prevent excessive API calls to Databricks.
- Logging: Implement comprehensive logging for debugging and auditing purposes.
- Pagination: For large result sets, implement pagination or row caps to avoid memory and context-window issues (see the sketch after this list)
- Secure Credentials: Use environment variables or secure credential storage rather than hardcoding credentials.
- Tool Documentation: Write clear, detailed descriptions for each tool to help the LLM use them correctly.
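As a concrete example of the pagination point above, here is a minimal sketch that caps what a tool hands back to the LLM; MAX_ROWS and truncate_rows are illustrative names, not part of any SDK.
from typing import Any, Dict, List

MAX_ROWS = 500  # illustrative cap on rows returned to the LLM

def truncate_rows(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return at most MAX_ROWS rows, plus metadata the LLM can use."""
    return {
        "rows": rows[:MAX_ROWS],
        "total_rows": len(rows),
        "truncated": len(rows) > MAX_ROWS,
    }

# In run_sql_query, for example, the return value could include truncate_rows(rows)
# instead of the full row list.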
Advanced Features
Adding Support for Delta Live Tables
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
    """List all Delta Live Tables pipelines in the workspace.

    Returns:
        List of pipeline details
    """
    try:
        pipelines = workspace.pipelines.list_pipelines()
        result = []
        for pipeline in pipelines:
            pipeline_info = {
                "pipeline_id": pipeline.pipeline_id,
                "name": pipeline.name,
                "state": pipeline.state,
                "creator": pipeline.creator_user_name
            }
            result.append(pipeline_info)
        return result
    except Exception as e:
        return [{"error": str(e)}]
Adding Notebook Management
from databricks.sdk.service.workspace import ObjectType

@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
    """List notebooks in a specified path in the workspace.

    Args:
        path: Workspace path to list notebooks from

    Returns:
        List of notebook details
    """
    try:
        objects = workspace.workspace.list(path)
        notebooks = []
        for obj in objects:
            # object_type is an enum in the SDK, so compare against ObjectType.NOTEBOOK
            if obj.object_type == ObjectType.NOTEBOOK:
                notebooks.append({
                    "path": obj.path,
                    "name": obj.path.split("/")[-1],
                    "language": obj.language
                })
        return notebooks
    except Exception as e:
        return [{"error": str(e)}]
Conclusion
The Databricks MCP Server provides a powerful way to bridge the gap between AI models and your organization's data infrastructure. By following this comprehensive guide, you've learned how to:
- Set up and configure the Databricks MCP Server
- Implement core tools for SQL query execution and database exploration
- Add job and cluster management capabilities
- Configure MCP clients to interact with your server
- Optionally build a custom client application
This integration empowers your AI assistants to directly leverage Databricks' powerful data processing capabilities, enabling more sophisticated data analysis, monitoring, and automation. As the Model Context Protocol ecosystem continues to evolve, your Databricks MCP Server can be extended with additional tools and capabilities to meet your organization's specific needs.
By embracing this integration, you're taking a significant step towards creating more intelligent, data-aware AI systems that can provide deeper insights and more effective assistance for your data analytics workflows.