
بروتوكول سياق النموذج (MCP) هو بروتوكول مفتوح تم تطويره أصلاً بواسطة Anthropic الذي يحدد كيفية تقديم التطبيقات للسياق لنماذج اللغة الكبيرة (LLMs). يعمل خادم Databricks MCP كجسر بين منصة تحليل البيانات القوية من Databricks ونماذج الذكاء الاصطناعي، مما يسمح لـ LLMs بالتفاعل مع بيئة Databricks الخاصة بك برمجيًا. يمكّن هذا التكامل المساعدات الذكية من تنفيذ استعلامات SQL، وقائمة والحصول على تفاصيل تنفيذ المهام، والوصول إلى البيانات داخل حساب Databricks الخاص بك بدون تدخل بشري مباشر.
في هذا الدليل الشامل، سنستعرض عملية إعداد وتكوين واستخدام خادم Databricks MCP لعمليات البيانات والتحليلات المعززة بالذكاء الاصطناعي.
فهم بروتوكول سياق النموذج (MCP)
قبل الغوص في التنفيذ المحدد لـ Databricks، من المهم فهم ما هو MCP ولماذا هو مهم:
ينفذ MCP هيكلية عميل / خادم مع اتصالات ثنائية الاتجاه:
- يستضيف الخادم أدوات يمكن لـ LLM استخدامها
- يدير العميل المحادثات بين المستخدم و LLM والأدوات
تشمل فوائد MCP الرئيسية التي تجعل منه قيمة ما يلي:
- فصل المخاوف: يمكن لمطوري الأدوات إنشاء وتحديث القدرات بشكل مستقل
- الوحدانية: يمكن إضافة أدوات جديدة بشكل تدريجي دون تغيير على مستوى النظام
- التوحيد القياسي: يعني واجهة متسقة أعمال تكامل أقل عبر المكونات
- القابلية للتوسع: يمكن توسيع المكونات بشكل مستقل بناءً على الطلب
متطلبات تشغيل خادم Databricks MCP

قبل إعداد Databricks MCP Server، تأكد من أن لديك:
- حساب Databricks مع الأذونات المناسبة
- Python 3.8+ مثبتًا على نظامك
- تكوين Databricks CLI مع المصادقة
- فهم أساسي لمفاهيم مساحة عمل Databricks
- الإلمام بـ SQL و وظائف Databricks
الخطوة 1: إعداد البيئة
أولاً، دعنا نعد بيئتنا عن طريق تثبيت الحزم اللازمة:
# إنشاء بيئة افتراضية
python -m venv databricks-mcp-env
source databricks-mcp-env/bin/activate # على Windows: databricks-mcp-env\\\\Scripts\\\\activate
# تثبيت الحزم المطلوبة
pip install databricks-sdk mcp-server httpx fastapi uvicorn
الخطوة 2: إنشاء خادم Databricks MCP
قم بإنشاء ملف جديد يسمى databricks_mcp.py مع هيكل الشيفرة التالي:
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import ExecuteStatementRequest, StatementState
# تهيئة خادم FastMCP
mcp = FastMCP("databricks")
# تهيئة عميل Databricks
workspace = WorkspaceClient()
الخطوة 3: تنفيذ أدوات MCP الأساسية لـ Databricks
الآن، دعنا نقوم بتنفيذ الأدوات الأساسية التي ستسمح لـ LLMs بالتفاعل مع بيئة Databricks الخاصة بك:
@mcp.tool()
async def run_sql_query(query: str, warehouse_id: Optional[str] = None) -> Dict[str, Any]:
"""تنفيذ استعلام SQL في مستودع بيانات Databricks SQL.
args:
query: الاستعلام SQL للتنفيذ
warehouse_id: تعريف اختياري لمستودع SQL لاستخدامه
Returns:
كائن قاموس يحتوي على نتائج الاستعلام
"""
# تحقق من صحة الاستعلام لأغراض الأمان
if not query or any(keyword in query.lower() for keyword in ["drop", "delete", "truncate", "alter"]):
return {"error": "العمليات في SQL التي قد تكون مدمرة غير مسموح بها"}
try:
# تنفيذ الاستعلام
statement = workspace.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=query,
wait_timeout=60
)
# معالجة النتائج
if statement.status.state == StatementState.SUCCEEDED:
# تحويل النتائج إلى تنسيق أكثر قابلية للاستخدام
columns = [col.name for col in statement.manifest.schema.columns]
rows = []
for chunk in workspace.statement_execution.get_statement_result_chunks(statement.statement_id):
for row_data in chunk.data:
rows.append(dict(zip(columns, row_data)))
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows)
}
else:
return {
"success": False,
"state": statement.status.state,
"error": statement.status.error
}
except Exception as e:
return {"success": False, "error": str(e)}
بعد ذلك، دعنا نضيف أداة لقائمة قواعد البيانات المتاحة:
@mcp.tool()
async def list_databases() -> List[str]:
"""قائمة جميع قواعد البيانات المتاحة في مساحة عمل Databricks.
Returns:
قائمة بأسماء قواعد البيانات
"""
try:
# تنفيذ استعلام للحصول على جميع قواعد البيانات
result = await run_sql_query("SHOW DATABASES")
if result.get("success"):
# استخراج أسماء قواعد البيانات من النتيجة
return [row.get("databaseName") for row in result.get("rows", [])]
else:
return [f"خطأ في قائمة قواعد البيانات: {result.get('error')}"]
except Exception as e:
return [f"خطأ: {str(e)}"]
الآن، دعنا ننفذ أداة للحصول على مخطط الجدول:
@mcp.tool()
async def get_table_schema(database_name: str, table_name: str) -> Dict[str, Any]:
"""الحصول على مخطط جدول معين.
args:
database_name: اسم قاعدة البيانات
table_name: اسم الجدول
Returns:
كائن قاموس يحتوي على معلومات مخطط الجدول
"""
try:
# تنفيذ استعلام للحصول على مخطط الجدول
result = await run_sql_query(f"DESCRIBE {database_name}.{table_name}")
if result.get("success"):
return {
"database": database_name,
"table": table_name,
"columns": result.get("rows", [])
}
else:
return {"error": result.get("error")}
except Exception as e:
return {"error": str(e)}
الخطوة 4: إضافة أدوات إدارة الوظائف
دعنا نضيف أدوات لإدارة واستعلام وظائف Databricks:
@mcp.tool()
async def list_jobs(limit: int = 100) -> List[Dict[str, Any]]:
"""قائمة الوظائف في مساحة عمل Databricks.
args:
limit: الحد الأقصى لعدد الوظائف لإرجاعها
Returns:
قائمة بتفاصيل الوظيفة
"""
try:
jobs = workspace.jobs.list(limit=limit)
result = []
for job in jobs:
result.append({
"job_id": job.job_id,
"name": job.settings.name,
"created_by": job.created_by,
"created_time": job.created_time
})
return result
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
async def get_job_runs(job_id: int, limit: int = 10) -> Dict[str, Any]:
"""الحصول على عمليات التشغيل الحديثة لوظيفة معينة.
args:
job_id: تعريف الوظيفة
limit: الحد الأقصى لعدد عمليات التشغيل لإرجاعها
Returns:
كائن قاموس يحتوي على تفاصيل تشغيل الوظيفة
"""
try:
runs = workspace.jobs.list_runs(job_id=job_id, limit=limit)
result = {
"job_id": job_id,
"runs": []
}
for run in runs.runs:
run_info = {
"run_id": run.run_id,
"state": run.state.life_cycle_state,
"result_state": run.state.result_state,
"start_time": run.start_time,
"setup_duration": run.setup_duration,
"execution_duration": run.execution_duration,
"cleanup_duration": run.cleanup_duration
}
result["runs"].append(run_info)
return result
except Exception as e:
return {"error": str(e)}
الخطوة 5: تنفيذ أدوات إدارة الكتلة
الآن دعنا نضيف أدوات للتفاعل مع الكتل في Databricks:
@mcp.tool()
async def list_clusters() -> List[Dict[str, Any]]:
"""قائمة بجميع الكتل في مساحة عمل Databricks.
Returns:
قائمة بتفاصيل الكتلة
"""
try:
clusters = workspace.clusters.list()
result = []
for cluster in clusters:
cluster_info = {
"cluster_id": cluster.cluster_id,
"cluster_name": cluster.cluster_name,
"state": cluster.state,
"creator_username": cluster.creator_user_name,
"spark_version": cluster.spark_version,
"node_type_id": cluster.node_type_id
}
result.append(cluster_info)
return result
except Exception as e:
return [{"error": str(e)}]
الخطوة 6: تشغيل خادم MCP
أخيرًا، أضف الشيفرة لتشغيل خادم MCP:
if __name__ == "__main__":
# تشغيل الخادم مع النقل القياسي للتوافق مع Claude Desktop
# أو HTTP للعملاء المستندة إلى الويب
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# التشغيل كخادم HTTP
mcp.run(transport='http', host="127.0.0.1", port=8000)
else:
# الافتراضي إلى stdio لـ Claude Desktop
mcp.run(transport='stdio')
الخطوة 7: تكوين عميل MCP
لاستخدام خادم Databricks MCP مع عميل MCP مثل Claude Desktop، ستحتاج إلى إنشاء تكوين:
{
"mcpServers": {
"databricks": {
"command": "python",
"args": ["databricks_mcp.py"]
}
}
}
إذا كنت تفضل استخدام النقل عبر HTTP، يمكنك تشغيل الخادم في عملية منفصلة وتكوين العميل وفقًا لذلك:
{
"mcpServers": {
"databricks": {
"url": "<http://localhost:8000>"
}
}
}
الخطوة 8: بناء تطبيق عميل (اختياري)
لتجربة أكثر تخصيصًا، يمكنك بناء تطبيق عميل يستفيد من خادم Databricks MCP. إليك مثال على عميل مبني بلغة بايثون يتصل بكل من خادم MCP ونموذج Meta Llama الخاص بـ Databricks:
import asyncio
from contextlib import AsyncExitStack
from typing import Optional, Dict, Any
from databricks.sdk import WorkspaceClient
from mcp.client import ClientSession, stdio_client
from mcp.stdio import StdioServerParameters
class DatabricksMCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-llama3-70b-instruct"):
# تهيئة الجلسة وكائنات العميل
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# تهيئة عميل Databricks
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"تم تهيئة عميل متوافق مع OpenAI لـ {llm_endpoint_name}")
async def connect_to_server(self, server_script_path: str):
"""الاتصال بخادم Databricks MCP"""
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# قائمة بالأدوات المتاحة
response = await self.session.list_tools()
tools = response.tools
print("\\\\nمتصل بخادم Databricks MCP مع الأدوات:", [tool.name for tool in tools])
# عرض أوصاف الأدوات
print("\\\\nالأدوات المتاحة في Databricks:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# إضافة طرق إضافية لمعالجة الاستعلامات، إلخ.
async def main():
client = DatabricksMCPClient()
try:
await client.connect_to_server("databricks_mcp.py")
# إضافة حلقة تفاعلية أو وظائف أخرى
finally:
await client.exit_stack.aclose()
if __name__ == "__main__":
asyncio.run(main())
أهم الممارسات لاستخدام خادم Databricks MCP
- الأمان أولاً: تطبيق ضوابط الوصول والتحقق من الاستعلامات SQL لمنع العمليات المدمرة.
- التعامل مع الأخطاء: تنفيذ معالجة أخطاء قوية في كل من تطبيقات الخادم والعميل.
- تحديد السرعة: إضافة تحديد للسرعة لمنع المكالمات الزائدة لواجهة برمجة التطبيقات إلى Databricks.
- التسجيل: تنفيذ تسجيل شامل للتصحيح وغرض التدقيق.
- ترقيم الصفحات: لتنفيذ مجموعات النتائج الكبيرة، تنفيذ ترقيم الصفحات لتجنب مشاكل الذاكرة.
- البيانات ذات الأمان: استخدام متغيرات البيئة أو تخزين بيانات الاعتماد الآمنة بدلاً من تشفير بيانات الاعتماد.
- توثيق الأدوات: كتابة أوصاف واضحة ومفصلة لكل أداة لمساعدة LLM في استخدامها بشكل صحيح.
الميزات المتقدمة
إضافة دعم لجداول Delta Live
@mcp.tool()
async def list_delta_pipelines() -> List[Dict[str, Any]]:
"""قائمة جميع خطوط أنابيب Delta Live Table في مساحة العمل.
Returns:
قائمة بتفاصيل خطوط الأنابيب
"""
try:
pipelines = workspace.pipelines.list()
result = []
for pipeline in pipelines:
pipeline_info = {
"pipeline_id": pipeline.pipeline_id,
"name": pipeline.name,
"state": pipeline.state,
"creator": pipeline.creator,
"target": pipeline.target
}
result.append(pipeline_info)
return result
except Exception as e:
return [{"error": str(e)}]
إضافة إدارة دفاتر الملاحظات
@mcp.tool()
async def list_notebooks(path: str = "/") -> List[Dict[str, Any]]:
"""قائمة دفاتر الملاحظات في مسار محدد في مساحة العمل.
args:
path: مسار مساحة العمل لقائمة دفاتر الملاحظات
Returns:
قائمة بتفاصيل دفتر الملاحظات
"""
try:
objects = workspace.workspace.list(path)
notebooks = []
for obj in objects:
if obj.object_type == "NOTEBOOK":
notebooks.append({
"path": obj.path,
"name": obj.path.split("/")[-1],
"language": obj.language
})
return notebooks
except Exception as e:
return [{"error": str(e)}]
الخاتمة
يوفر خادم Databricks MCP طريقة قوية لربط الفجوة بين نماذج الذكاء الاصطناعي والبنية التحتية للبيانات في منظمتك. من خلال اتباع هذا الدليل الشامل، لقد تعلمت كيفية:
- إعداد وتكوين خادم Databricks MCP
- تنفيذ الأدوات الأساسية لتنفيذ استعلامات SQL واستكشاف قواعد البيانات
- إضافة قدرات إدارة الوظائف والكتل
- تكوين عملاء MCP للتفاعل مع خادمك
- اختياريًا بناء تطبيق عميل مخصص
يمكن لهذه التكاملات أن تمكّن مساعديك الذكيين من الاستفادة مباشرةً من قدرات معالجة البيانات القوية من Databricks، مما يتيح تحليلات بيانات أكثر تطوراً، ورصد، وأتمتة. مع استمرار تطور نظام بروتوكول سياق النموذج بشكل مستمر، يمكن توسيع خادم Databricks MCP الخاص بك بأدوات وميزات إضافية لتلبية احتياجات منظمتك الخاصة.
من خلال تبني هذا التكامل، أنت تأخذ خطوة كبيرة نحو إنشاء أنظمة ذكاء اصطناعي أكثر ذكاءً ووعياً بالبيانات يمكنها تزويدك برؤى أعمق ومساعدة أكثر فعالية لعمليات تحليل البيانات الخاصة بك.
