مقدمة
وسائل التواصل الاجتماعي سريعة الحركة. يمكن لمنشور واحد أن يطلق موجات متتالية من ردود الفعل، وإعادة التشكيل، والتحركات المضادة التي لم يتوقعها أحد. ماذا لو كان بإمكانك رؤية كيف يتطور سيناريو قبل حدوثه في العالم الحقيقي؟
هذا بالضبط ما يفعله MiroFish. إنه محرك ذكاء أسراب ينشئ عوالم رقمية متوازية تتفاعل فيها آلاف من وكلاء الذكاء الاصطناعي ذوي شخصيات وذكريات وأنماط سلوكية مميزة بحرية. تقوم بتحميل مادة أولية—مقالة إخبارية، مسودة سياسة، وحتى رواية—ويقوم MiroFish ببناء محاكاة عالية الدقة لكيفية تطور الأحداث.
يشرح هذا المنشور البنية التقنية وراء MiroFish. ستتعلم كيف يحول النظام المستندات الخام إلى محاكاة حية، وكيف يتخذ الوكلاء قراراتهم، وكيف تقوم سير العمل ذو الخطوات الخمس بتنظيم كل شيء بدءًا من بناء الرسم البياني المعرفي وصولاً إلى المراقبة في الوقت الفعلي.

نظرة عامة على النظام: سير العمل ذو الخطوات الخمس
يعالج MiroFish المحاكاة عبر خمس مراحل مميزة:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
الخطوة 1: توليد الأنطولوجيا
يحلل النظام مستندات الإدخال ومتطلبات المحاكاة الخاصة بك، ثم يستخدم نموذج لغة كبير (LLM) لتوليد أنطولوجيا مخصصة. تحدد هذه الأنطولوجيا:
- 10 أنواع كيانات (على سبيل المثال، طالب، أستاذ، جامعة، منفذ إعلامي، وكالة حكومية)
- 10 أنواع علاقات (على سبيل المثال، WORKS_FOR، COMMENTS_ON، RESPONDS_TO)
- السمات لكل نوع (مع تجنب الكلمات المحجوزة مثل
name،uuid،created_at)
يفرض مولد الأنطولوجيا بنية من مستويين: 8 أنواع محددة بناءً على المحتوى الخاص بك، بالإضافة إلى نوعين احتياطيين (Person وOrganization) لالتقاط أي شيء لا يتناسب مع أماكن أخرى.
الخطوة 2: بناء GraphRAG
يتم تقسيم المستندات إلى أجزاء (500 حرف، بتداخل 50 حرفًا) وإرسالها إلى Zep Cloud على دفعات. يقوم النظام بـ:
- إنشاء رسم بياني مستقل بمعرف فريد
- تعيين الأنطولوجيا المخصصة
- إرسال دفعات نصية لاستخراج الكيانات والعلاقات
- الانتظار حتى يعالج Zep كل حلقة
- استرداد الرسم البياني النهائي بالعقد والحواف
الخطوة 3: إعداد البيئة
يحلل مولد تكوين المحاكاة الرسم البياني المعرفي وينشئ معلمات وكيل مفصلة:
- تكوين الوقت بناءً على أنماط المنطقة الزمنية الصينية (ساعات الذروة 19-22، ساعات الخمول 0-5)
- تكوين الأحداث مع المنشورات الأولية والمواضيع الساخنة
- تكوينات نشاط الوكيل (المنشورات في الساعة، تأخيرات الاستجابة، أوزان التأثير)
- تكوينات المنصة لتويتر وريديت مع عتبات انتشار مختلفة
الخطوة 4: تشغيل المحاكاة
يستيقظ الوكلاء وفقًا لجداول نشاطهم ويبدأون في النشر والتعليق والتفاعل. يدير النظام محاكاة متوازية على تويتر وريديت، ويسجل كل إجراء في ملفات JSONL في الوقت الفعلي.
الخطوة 5: توليد التقرير
يستخدم وكيل التقارير ثلاث أدوات استرجاع أساسية لتحليل ما حدث:
- InsightForge: بحث معمق يفكك الأسئلة إلى استعلامات فرعية
- PanoramaSearch: عرض كامل النطاق يتضمن الحقائق التاريخية المنتهية الصلاحية/غير الصالحة
- InterviewAgents: مقابلات في الوقت الفعلي مع الوكلاء النشطين عبر IPC
تعمق تقني: توليد الأنطولوجيا
يتواجد مولد الأنطولوجيا في backend/app/services/ontology_generator.py. ويستخدم موجه نظام مصمم بعناية يفرض قواعد صارمة.
يتضمن موجه النظام إرشادات واسعة حول ما يعتبر كيانًا صالحًا (الأشخاص، المنظمات، المنافذ الإعلامية) مقابل ما لا يعتبر كذلك (المفاهيم المجردة، الثيمات، وجهات النظر). هذا التمييز مهم لأن المحاكاة تحتاج إلى وكلاء يمكنهم بالفعل التحدث والتصرف على وسائل التواصل الاجتماعي.
بعد أن يقوم نموذج اللغة الكبير (LLM) بتوليد الأنطولوجيا، تفرض دالة _validate_and_process القيود:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
تضمن طبقة التحقق هذه أن المخرجات تعمل دائمًا مع حدود واجهة برمجة تطبيقات Zep مع الحفاظ على البنية ذات المستويين.
بناء الرسم البياني المعرفي: تكامل Zep
تتعامل خدمة بناء الرسم البياني (backend/app/services/graph_builder.py) مع سير العمل غير المتزامن:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
توليد نموذج Pydantic الديناميكي
جزء ذكي واحد: ينشئ النظام ديناميكيًا نماذج Pydantic لكل نوع كيان في وقت التشغيل:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
يتيح ذلك لـ Zep التحقق من سمات الكيان مقابل المخطط المخصص دون الحاجة إلى نماذج معرفة مسبقًا.
التنقل عبر الرسوم البيانية الكبيرة
يعيد Zep نتائج مقسمة على صفحات. تقوم أداة zep_paging.py بجلب كل شيء:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
محاكاة نشاط الوكيل القائم على الوقت
ينشئ مولد تكوين المحاكاة (backend/app/services/simulation_config_generator.py) أنماط نشاط واقعية بناءً على سلوك المنطقة الزمنية الصينية:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
تحصل أنواع الوكلاء المختلفة على أنماط مختلفة:
| نوع الوكيل | مستوى النشاط | ساعات النشاط | تأخير الاستجابة | التأثير |
|---|---|---|---|---|
| University | 0.2 | 9-17 | 60-240 min | 3.0 |
| MediaOutlet | 0.5 | 7-23 | 5-30 min | 2.5 |
| Student | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
يستخدم مولد التكوين استدعاءات LLM لتخصيص هذه القيم بناءً على السيناريو الخاص بك، ثم يعود إلى الإعدادات الافتراضية المستندة إلى القواعد إذا فشل LLM.
تتبع الإجراءات في الوقت الفعلي
يراقب مشغل المحاكاة (backend/app/services/simulation_runner.py) نشاط الوكلاء عن طريق بث سجلات JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
يعمل هذا في خيط خلفي، ويحدث حالة المحاكاة كل ثانيتين. تستعلم الواجهة الأمامية عن هذه الحالة لعرض التقدم في الوقت الفعلي.
إدارة العمليات عبر المنصات
يتطلب إيقاف المحاكاة إدارة دقيقة للعمليات عبر أنظمة Windows و Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
يسجل معالج التنظيف (cleanup handler) معالجات الإشارات لـ SIGINT و SIGTERM و SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
يضمن هذا توقف المحاكاة بشكل سلس عند إغلاق الخادم.
توليد التقرير: استرجاع ثلاثي الطبقات
توفر خدمة أدوات Zep (backend/app/services/zep_tools.py) ثلاث وظائف استرجاع:
InsightForge (بحث معمق)
يفكك الأسئلة المعقدة إلى استعلامات فرعية، يبحث في كل منها، ثم يجمع النتائج:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (نطاق كامل)
يسترجع كل شيء بما في ذلك الحقائق التاريخية المنتهية الصلاحية/غير الصالحة:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (في الوقت الفعلي)
يستدعي واجهة برمجة تطبيقات OASIS للمقابلة الفعلية للتحدث مع الوكلاء النشطين:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
قرارات هندسية رئيسية
1. إدارة المهام غير المتزامنة
تستخدم العمليات طويلة الأمد (بناء الرسم البياني، تشغيل المحاكاة) مهامًا غير متزامنة مع تتبع التقدم:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
تستعلم الواجهة الأمامية عن حالة المهمة عبر /api/graph/task/{task_id}.
2. استدعاءات LLM المجمعة مع إعادة المحاولة
يقوم توليد التكوين بتقسيم قوائم الوكلاء الكبيرة إلى دفعات من 15:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
تتضمن كل دفعة منطق إصلاح JSON للمخرجات المقطوعة:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. محاكاة متوازية لمنصتين
يعمل تويتر وريديت بالتوازي مع قواعد بيانات وسجلات إجراءات منفصلة:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
يكتشف المشغل الاكتمال لكل منصة عبر أحداث simulation_end.
اعتبارات الأداء
إدارة الذاكرة
- يتم اقتصاص المستندات الكبيرة إلى 50 ألف حرف لسياق LLM
- ملخصات الكيانات محدودة بـ 300 حرف لكل منها
- الإجراءات الأخيرة محدودة بـ 50 في الذاكرة (السجل الكامل في ملفات JSONL)
عزل قواعد البيانات
تستخدم كل منصة قاعدة بيانات SQLite خاصة بها لتجنب تضارب الأقفال أثناء عمليات الكتابة المتوازية.
التدهور السلس
عند فشل واجهة برمجة تطبيقات Zep Search، يعود النظام إلى مطابقة الكلمات الرئيسية المحلية:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
الخاتمة
يوضح MiroFish كيفية بناء نظام محاكاة كامل متعدد الوكلاء من الصفر. يحول سير العمل ذو الخطوات الخمس المستندات الخام إلى عوالم رقمية حية تتفاعل فيها آلاف الوكلاء وفقًا لأنماط سلوكية واقعية.
النقاط الرئيسية المستخلصة:
- تصميم الأنطولوجيا مهم: تضمن البنية ذات المستويين (8 أنواع محددة + نوعين احتياطيين) التغطية دون تجاوز حدود واجهة برمجة التطبيقات.
- سير العمل غير المتزامن يتيح العمليات الطويلة: يحافظ تتبع المهام مع تحديثات التقدم على إطلاع المستخدمين أثناء العمليات التي تستغرق عدة دقائق.
- النشاط القائم على الوقت يخلق الواقعية: أنماط المنطقة الزمنية الصينية وجداول زمنية خاصة بنوع الوكيل تنتج سلوكًا واقعيًا.
- المحاكاة ثنائية المنصة توفر مقارنة: تشغيل تويتر وريديت بالتوازي يظهر كيف تؤثر ديناميكيات المنصة على النتائج.
- الاسترجاع ثلاثي الطبقات يلبي احتياجات مختلفة: InsightForge للعمق، PanoramaSearch للشمولية، InterviewAgents لوجهات النظر المباشرة.
الكود المصدري الكامل متاح على github.com/666ghj/MiroFish.
هل ترغب في تجربة MiroFish؟ قم بزيارة العرض التوضيحي المباشر لمشاهدة محاكاة حدث نقطة ساخنة في العمل.
