كيف تصنع MiroFish عوالم رقمية موازية؟

Ashley Innocent

Ashley Innocent

19 مارس 2026

كيف تصنع MiroFish عوالم رقمية موازية؟

Apidog للمؤسسات

نشر محلي

SSO & RBAC

متوافق مع SOC 2

استكشاف Apidog Enterprise

مقدمة

وسائل التواصل الاجتماعي سريعة الحركة. يمكن لمنشور واحد أن يطلق موجات متتالية من ردود الفعل، وإعادة التشكيل، والتحركات المضادة التي لم يتوقعها أحد. ماذا لو كان بإمكانك رؤية كيف يتطور سيناريو قبل حدوثه في العالم الحقيقي؟

هذا بالضبط ما يفعله MiroFish. إنه محرك ذكاء أسراب ينشئ عوالم رقمية متوازية تتفاعل فيها آلاف من وكلاء الذكاء الاصطناعي ذوي شخصيات وذكريات وأنماط سلوكية مميزة بحرية. تقوم بتحميل مادة أولية—مقالة إخبارية، مسودة سياسة، وحتى رواية—ويقوم MiroFish ببناء محاكاة عالية الدقة لكيفية تطور الأحداث.

💡
تطلب بناء MiroFish أساسًا موثوقًا لاختبار واجهات برمجة التطبيقات (API). استخدم الفريق Apidog لتصميم وتصحيح وتوثيق جميع واجهات برمجة التطبيقات الخلفية قبل كتابة منطق المحاكاة. وقد أدى ذلك إلى اكتشاف مشكلات نقاط النهاية مبكرًا والحفاظ على تزامن الواجهة الخلفية Python والواجهة الأمامية Vue طوال فترة التطوير.
زر

يشرح هذا المنشور البنية التقنية وراء MiroFish. ستتعلم كيف يحول النظام المستندات الخام إلى محاكاة حية، وكيف يتخذ الوكلاء قراراتهم، وكيف تقوم سير العمل ذو الخطوات الخمس بتنظيم كل شيء بدءًا من بناء الرسم البياني المعرفي وصولاً إلى المراقبة في الوقت الفعلي.

نظرة عامة على النظام: سير العمل ذو الخطوات الخمس

يعالج MiroFish المحاكاة عبر خمس مراحل مميزة:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

الخطوة 1: توليد الأنطولوجيا

يحلل النظام مستندات الإدخال ومتطلبات المحاكاة الخاصة بك، ثم يستخدم نموذج لغة كبير (LLM) لتوليد أنطولوجيا مخصصة. تحدد هذه الأنطولوجيا:

يفرض مولد الأنطولوجيا بنية من مستويين: 8 أنواع محددة بناءً على المحتوى الخاص بك، بالإضافة إلى نوعين احتياطيين (Person وOrganization) لالتقاط أي شيء لا يتناسب مع أماكن أخرى.

الخطوة 2: بناء GraphRAG

يتم تقسيم المستندات إلى أجزاء (500 حرف، بتداخل 50 حرفًا) وإرسالها إلى Zep Cloud على دفعات. يقوم النظام بـ:

  1. إنشاء رسم بياني مستقل بمعرف فريد
  2. تعيين الأنطولوجيا المخصصة
  3. إرسال دفعات نصية لاستخراج الكيانات والعلاقات
  4. الانتظار حتى يعالج Zep كل حلقة
  5. استرداد الرسم البياني النهائي بالعقد والحواف

الخطوة 3: إعداد البيئة

يحلل مولد تكوين المحاكاة الرسم البياني المعرفي وينشئ معلمات وكيل مفصلة:

الخطوة 4: تشغيل المحاكاة

يستيقظ الوكلاء وفقًا لجداول نشاطهم ويبدأون في النشر والتعليق والتفاعل. يدير النظام محاكاة متوازية على تويتر وريديت، ويسجل كل إجراء في ملفات JSONL في الوقت الفعلي.

الخطوة 5: توليد التقرير

يستخدم وكيل التقارير ثلاث أدوات استرجاع أساسية لتحليل ما حدث:

تعمق تقني: توليد الأنطولوجيا

يتواجد مولد الأنطولوجيا في backend/app/services/ontology_generator.py. ويستخدم موجه نظام مصمم بعناية يفرض قواعد صارمة.

يتضمن موجه النظام إرشادات واسعة حول ما يعتبر كيانًا صالحًا (الأشخاص، المنظمات، المنافذ الإعلامية) مقابل ما لا يعتبر كذلك (المفاهيم المجردة، الثيمات، وجهات النظر). هذا التمييز مهم لأن المحاكاة تحتاج إلى وكلاء يمكنهم بالفعل التحدث والتصرف على وسائل التواصل الاجتماعي.

بعد أن يقوم نموذج اللغة الكبير (LLM) بتوليد الأنطولوجيا، تفرض دالة _validate_and_process القيود:

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

تضمن طبقة التحقق هذه أن المخرجات تعمل دائمًا مع حدود واجهة برمجة تطبيقات Zep مع الحفاظ على البنية ذات المستويين.

بناء الرسم البياني المعرفي: تكامل Zep

تتعامل خدمة بناء الرسم البياني (backend/app/services/graph_builder.py) مع سير العمل غير المتزامن:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

توليد نموذج Pydantic الديناميكي

جزء ذكي واحد: ينشئ النظام ديناميكيًا نماذج Pydantic لكل نوع كيان في وقت التشغيل:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

يتيح ذلك لـ Zep التحقق من سمات الكيان مقابل المخطط المخصص دون الحاجة إلى نماذج معرفة مسبقًا.

التنقل عبر الرسوم البيانية الكبيرة

يعيد Zep نتائج مقسمة على صفحات. تقوم أداة zep_paging.py بجلب كل شيء:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

محاكاة نشاط الوكيل القائم على الوقت

ينشئ مولد تكوين المحاكاة (backend/app/services/simulation_config_generator.py) أنماط نشاط واقعية بناءً على سلوك المنطقة الزمنية الصينية:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

تحصل أنواع الوكلاء المختلفة على أنماط مختلفة:

نوع الوكيل مستوى النشاط ساعات النشاط تأخير الاستجابة التأثير
University 0.2 9-17 60-240 min 3.0
MediaOutlet 0.5 7-23 5-30 min 2.5
Student 0.8 8-12, 18-23 1-15 min 0.8
Professor 0.4 8-21 15-90 min 2.0

يستخدم مولد التكوين استدعاءات LLM لتخصيص هذه القيم بناءً على السيناريو الخاص بك، ثم يعود إلى الإعدادات الافتراضية المستندة إلى القواعد إذا فشل LLM.

تتبع الإجراءات في الوقت الفعلي

يراقب مشغل المحاكاة (backend/app/services/simulation_runner.py) نشاط الوكلاء عن طريق بث سجلات JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

يعمل هذا في خيط خلفي، ويحدث حالة المحاكاة كل ثانيتين. تستعلم الواجهة الأمامية عن هذه الحالة لعرض التقدم في الوقت الفعلي.

إدارة العمليات عبر المنصات

يتطلب إيقاف المحاكاة إدارة دقيقة للعمليات عبر أنظمة Windows و Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

يسجل معالج التنظيف (cleanup handler) معالجات الإشارات لـ SIGINT و SIGTERM و SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

يضمن هذا توقف المحاكاة بشكل سلس عند إغلاق الخادم.

توليد التقرير: استرجاع ثلاثي الطبقات

توفر خدمة أدوات Zep (backend/app/services/zep_tools.py) ثلاث وظائف استرجاع:

InsightForge (بحث معمق)

يفكك الأسئلة المعقدة إلى استعلامات فرعية، يبحث في كل منها، ثم يجمع النتائج:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (نطاق كامل)

يسترجع كل شيء بما في ذلك الحقائق التاريخية المنتهية الصلاحية/غير الصالحة:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (في الوقت الفعلي)

يستدعي واجهة برمجة تطبيقات OASIS للمقابلة الفعلية للتحدث مع الوكلاء النشطين:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

قرارات هندسية رئيسية

1. إدارة المهام غير المتزامنة

تستخدم العمليات طويلة الأمد (بناء الرسم البياني، تشغيل المحاكاة) مهامًا غير متزامنة مع تتبع التقدم:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

تستعلم الواجهة الأمامية عن حالة المهمة عبر /api/graph/task/{task_id}.

2. استدعاءات LLM المجمعة مع إعادة المحاولة

يقوم توليد التكوين بتقسيم قوائم الوكلاء الكبيرة إلى دفعات من 15:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

تتضمن كل دفعة منطق إصلاح JSON للمخرجات المقطوعة:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. محاكاة متوازية لمنصتين

يعمل تويتر وريديت بالتوازي مع قواعد بيانات وسجلات إجراءات منفصلة:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

يكتشف المشغل الاكتمال لكل منصة عبر أحداث simulation_end.

اعتبارات الأداء

إدارة الذاكرة

عزل قواعد البيانات

تستخدم كل منصة قاعدة بيانات SQLite خاصة بها لتجنب تضارب الأقفال أثناء عمليات الكتابة المتوازية.

التدهور السلس

عند فشل واجهة برمجة تطبيقات Zep Search، يعود النظام إلى مطابقة الكلمات الرئيسية المحلية:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)

الخاتمة

يوضح MiroFish كيفية بناء نظام محاكاة كامل متعدد الوكلاء من الصفر. يحول سير العمل ذو الخطوات الخمس المستندات الخام إلى عوالم رقمية حية تتفاعل فيها آلاف الوكلاء وفقًا لأنماط سلوكية واقعية.

النقاط الرئيسية المستخلصة:

  1. تصميم الأنطولوجيا مهم: تضمن البنية ذات المستويين (8 أنواع محددة + نوعين احتياطيين) التغطية دون تجاوز حدود واجهة برمجة التطبيقات.
  2. سير العمل غير المتزامن يتيح العمليات الطويلة: يحافظ تتبع المهام مع تحديثات التقدم على إطلاع المستخدمين أثناء العمليات التي تستغرق عدة دقائق.
  3. النشاط القائم على الوقت يخلق الواقعية: أنماط المنطقة الزمنية الصينية وجداول زمنية خاصة بنوع الوكيل تنتج سلوكًا واقعيًا.
  4. المحاكاة ثنائية المنصة توفر مقارنة: تشغيل تويتر وريديت بالتوازي يظهر كيف تؤثر ديناميكيات المنصة على النتائج.
  5. الاسترجاع ثلاثي الطبقات يلبي احتياجات مختلفة: InsightForge للعمق، PanoramaSearch للشمولية، InterviewAgents لوجهات النظر المباشرة.

الكود المصدري الكامل متاح على github.com/666ghj/MiroFish.

زر

هل ترغب في تجربة MiroFish؟ قم بزيارة العرض التوضيحي المباشر لمشاهدة محاكاة حدث نقطة ساخنة في العمل.

ممارسة تصميم API في Apidog

اكتشف طريقة أسهل لبناء واستخدام واجهات برمجة التطبيقات