
モデルコンテキストプロトコル(MCP)は、Anthropicによって最初に開発されたオープンプロトコルで、アプリケーションが大規模言語モデル(LLM)にコンテキストを提供する方法を標準化します。Databricks MCPサーバーは、Databricksの強力なデータ分析プラットフォームとAIモデルとの間の橋渡しを行い、LLMがプログラムでDatabricks環境と対話できるようにします。この統合により、AIアシスタントはSQLクエリを実行し、ジョブの実行についての詳細をリストおよび取得し、直接的な人間の介入なしにDatabricksアカウント内のデータにアクセスできます。
この包括的なガイドでは、データ駆動型のAIオペレーションと分析を強化するためにDatabricks MCPサーバーをセットアップ、構成、そして利用するプロセスを説明します。
モデルコンテキストプロトコル(MCP)の理解
Databricks特有の実装に飛び込む前に、MCPとは何か、なぜ重要なのかを理解することが重要です:
MCPは、双方向の通信を持つクライアント/サーバーアーキテクチャを実装しています:
- サーバーは、LLMが使用できるツールをホストします。
- クライアントは、ユーザー、LLM、およびツールの間の会話を調整します。
MCPが価値ある理由は、次の主要な利点を提供することです:
- 関心の分離:ツール開発者は、機能を独立して作成および更新できます。
- モジュール性:システム全体の変更なしに新しいツールを段階的に追加できます。
- 標準化:一貫したインターフェースは、コンポーネント間の統合作業を減らします。
- スケーラビリティ:コンポーネントは、需要に基づいて独立してスケールできます。
Databricks MCPサーバーを実行するための前提条件

Databricks MCPサーバーをセットアップする前に、以下の条件を満たしていることを確認してください:
- 適切な権限を持つDatabricksアカウント
- システムにインストールされたPython 3.8以上
- 認証が構成されたDatabricks CLI
- Databricksワークスペースの基本概念の理解
- SQLとDatabricksジョブへの親しみ
ステップ1:環境のセットアップ
まず、必要なパッケージをインストールして環境を設定しましょう:
# 仮想環境を作成
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # Windowsの場合:databricks-mcp-env\\\\Scripts\\\\activate
# 必要なパッケージをインストール
pip install databricks-sdk mcp-server httpx fastapi uvicorn
ステップ2:Databricks MCPサーバーの作成
次に、次のコード構造でdatabricks_mcp.py
という新しいファイルを作成します:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# FastMCPサーバーの初期化
mcp = FastMCP("databricks")
# Databricksクライアントの初期化
workspace = WorkspaceClient()
ステップ3:DatabricksのためのコアMCPツールの実装
次に、LLMがDatabricks環境と対話できるようにするためのコアツールを実装しましょう:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""Databricks SQLウェアハウスでSQLクエリを実行します。
引数:
query:実行するSQLクエリ
warehouse_id:使用するSQLウェアハウスのオプションのID
戻り値:
クエリ結果を含む辞書
"""
# セキュリティのためクエリを検証
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "潜在的に破壊的なSQL操作は許可されていません"}
try:
# クエリを実行
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# 結果を処理
if statement.status.state == StatementState.SUCCEEDED:
# 結果をより使いやすい形式に変換
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
次に、利用可能なデータベースをリストするツールを追加しましょう:
@mcp.tool()
async def list_databases() -> List[str]:
"""Databricksワークスペース内のすべての利用可能なデータベースをリスト表示します。
戻り値:
データベース名のリスト
"""
try:
# すべてのデータベースを取得するためにクエリを実行
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# 結果からデータベース名を抽出
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"データベースのリスト表示エラー: {result.get('error')}"]
except Exception as e:
return [f"エラー: {str(e)}"]
次に、テーブルスキーマを取得するためのツールを実装しましょう:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""特定のテーブルのスキーマを取得します。
引数:
database_name: データベースの名前
table_name: テーブルの名前
戻り値:
テーブルスキーマ情報を含む辞書
"""
try:
# テーブルスキーマを取得するためにクエリを実行
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
ステップ4:ジョブ管理ツールの追加
Databricksジョブを管理およびクエリするためのツールを追加しましょう:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""Databricksワークスペース内のジョブをリスト表示します。
引数:
limit: 返されるジョブの最大数
戻り値:
ジョ詳細のリスト
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""特定のジョブの最近の実行を取得します。
引数:
job_id: ジョブのID
limit: 返される実行の最大数
戻り値:
ジョブ実行の詳細を含む辞書
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
ステップ5:クラスタ管理ツールの実装
次に、Databricksクラスタと対話するツールを追加しましょう:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""Databricksワークスペース内のすべてのクラスタをリスト表示します。
戻り値:
クラスタ詳細のリスト
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
ステップ6:MCPサーバーの実行
最後に、MCPサーバーを実行するコードを追加します:
if __name__ == "__main__":
# Claude Desktopとの互換性のためにstdioトランスポートでサーバーを実行します
# または、Webベースのクライアント用にHTTPを使用します
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# HTTPサーバーとして実行
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# デフォルトでClaude Desktopのためにstdioを使用
mcp.run(transport='stdio')
ステップ7:MCPクライアントの構成
Claude DesktopのようなMCPクライアントでDatabricks MCPサーバーを使用するには、構成を作成する必要があります:
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
HTTPトランスポートを使用したい場合は、サーバーを別のプロセスで実行し、クライアントを適切に構成することができます:
{
"mcpServers": {
"databricks": {
"url": ""
}
}
}
ステップ8:クライアントアプリケーションの構築(オプション)
よりカスタマイズされた体験のために、Databricks MCPサーバーを活用するクライアントアプリケーションを構築することができます。以下は、MCPサーバーおよびDatabricksのMeta Llamaモデルに接続するPythonベースのクライアントの例です:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# セッションおよびクライアントオブジェクトを初期化
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Databricksクライアントを初期化
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"{llm_endpoint_name}のためにOpenAI互換クライアントを初期化しました。")
async def connect_to_server(self, server_script_path: str):
"""Databricks MCPサーバーに接続します"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# 利用可能なツールをリスト表示
response = await self.session.list_tools()
tools = response.tools
print("\\nDatabricks MCPサーバーに接続してツールが利用可能です:", [tool.name for tool in tools])
# ツールの説明を表示
print("\\n利用可能なDatabricksツール:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# クエリ処理などのための追加のメソッドを追加
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# インタラクティブループまたは他の機能を追加
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
Databricks MCPサーバーを使用するためのベストプラクティス
- セキュリティ第一:常にアクセス制御を適用し、破壊的操作を防ぐためにSQLクエリを検証します。
- エラーハンドリング:サーバーおよびクライアントアプリケーションの両方で堅牢なエラーハンドリングを実装します。
- レート制限:過剰なAPI呼び出しを防ぐためにレート制限を追加します。
- ロギング:デバッグおよび監査目的のために包括的なロギングを実装します。
- ページネーション:大きな結果セットに対しては、メモリ問題を回避するためにページネーションを実装します。
- 安全な資格情報:資格情報をハードコーディングするのではなく、環境変数または安全な資格情報ストレージを使用します。
- ツールの文書化:各ツールの明確で詳細な説明を作成し、LLMがそれらを正しく使用できるようにします。
高度な機能
デルタライブテーブルのサポート追加
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""ワークスペース内のすべてのDelta Liveテーブルパイプラインをリスト表示します。
戻り値:
パイプライン詳細のリスト
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
ノートブック管理の追加
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""ワークスペースの指定されたパス内のノートブックをリスト表示します。
引数:
path: ノートブックをリスト表示するワークスペースのパス
戻り値:
ノートブック詳細のリスト
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
結論
Databricks MCPサーバーは、AIモデルと組織のデータインフラストラクチャ間のギャップを埋める強力な方法を提供します。この包括的なガイドに従うことで、次のことを学びました:
- Databricks MCPサーバーをセットアップおよび構成する
- SQLクエリの実行とデータベース探索のためのコアツールを実装する
- ジョブおよびクラスタ管理機能を追加する
- サーバーと対話するためにMCPクライアントを構成する
- 任意でカスタムクライアントアプリケーションを構築する
この統合により、AIアシスタントはDatabricksの強力なデータ処理機能を直接活用できるようになり、より洗練されたデータ分析、モニタリング、そして自動化が可能になります。モデルコンテキストプロトコルのエコシステムが進化し続ける中で、Databricks MCPサーバーは、組織の特定のニーズを満たすために追加のツールや機能で拡張できます。
この統合を受け入れることで、データ分析ワークフローにおけるより深い洞察とより効果的な支援を提供できる、よりインテリジェントでデータを意識したAIシステムを構築するための重要なステップを踏むことができます。