MiroFish สร้างโลกคู่ขนานดิจิทัล ได้อย่างไร

Ashley Innocent

Ashley Innocent

19 March 2026

MiroFish สร้างโลกคู่ขนานดิจิทัล ได้อย่างไร

Apidog สำหรับองค์กร

ติดตั้งภายในองค์กร

SSO & RBAC

รองรับ SOC 2

สำรวจ Apidog Enterprise

บทนำ

โซเชียลมีเดียเคลื่อนไหวรวดเร็ว โพสต์เดียวสามารถกระตุ้นปฏิกิริยา การปรับเปลี่ยน และการเคลื่อนไหวตอบโต้ที่ไม่มีใครคาดคิดได้ จะเป็นอย่างไรถ้าคุณสามารถเห็นว่าสถานการณ์จะคลี่คลายลงอย่างไรก่อนที่จะเกิดขึ้นจริงในโลก?

MiroFish ทำเช่นนั้นได้อย่างแม่นยำ มันคือเอ็นจิ้นปัญญาแบบฝูงที่สร้างโลกดิจิทัลคู่ขนานที่ซึ่งเอเจนต์ AI นับพันตัวซึ่งมีบุคลิก ความทรงจำ และรูปแบบพฤติกรรมที่แตกต่างกันมีปฏิสัมพันธ์กันอย่างอิสระ คุณอัปโหลดข้อมูลเริ่มต้น—เช่น ข่าว บทความ ร่างนโยบาย หรือแม้แต่นวนิยาย—แล้ว MiroFish จะสร้างการจำลองสถานการณ์ที่มีความแม่นยำสูงว่าเหตุการณ์ต่างๆ อาจคลี่คลายลงอย่างไร

💡
การสร้าง MiroFish ต้องอาศัยรากฐานการทดสอบ API ที่เชื่อถือได้ ทีมงานใช้ Apidog ในการออกแบบ ดีบั๊ก และจัดทำเอกสาร API แบ็กเอนด์ทั้งหมดก่อนที่จะเขียนตรรกะการจำลอง ซึ่งช่วยให้ตรวจพบปัญหาของเอนด์พอยต์ได้ตั้งแต่เนิ่นๆ และทำให้แบ็กเอนด์ Python และฟรอนต์เอนด์ Vue ทำงานสอดคล้องกันตลอดการพัฒนา
ปุ่ม

โพสต์นี้จะเจาะลึกสถาปัตยกรรมทางเทคนิคเบื้องหลัง MiroFish คุณจะได้เรียนรู้ว่าระบบเปลี่ยนเอกสารดิบให้เป็นการจำลองที่มีชีวิตได้อย่างไร เอเจนต์ตัดสินใจได้อย่างไร และเวิร์กโฟลว์ห้าขั้นตอนจัดระเบียบทุกอย่างตั้งแต่การสร้างกราฟความรู้ไปจนถึงการตรวจสอบแบบเรียลไทม์ได้อย่างไร

ภาพรวมระบบ: เวิร์กโฟลว์ห้าขั้นตอน

MiroFish ประมวลผลการจำลองผ่านห้าขั้นตอนที่แตกต่างกัน:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

ขั้นตอนที่ 1: การสร้าง Ontology

ระบบจะวิเคราะห์เอกสารอินพุตและข้อกำหนดการจำลองของคุณ จากนั้นใช้ LLM เพื่อสร้าง Ontology ที่กำหนดเอง ซึ่งกำหนดสิ่งต่อไปนี้:

ตัวสร้าง Ontology บังคับใช้โครงสร้างสองชั้น: 8 ประเภทเฉพาะตามเนื้อหาของคุณ บวกกับ 2 ประเภทสำรอง (Person และ Organization) เพื่อครอบคลุมสิ่งที่ไม่เข้ากับประเภทอื่น

ขั้นตอนที่ 2: การสร้าง GraphRAG

เอกสารจะถูกแบ่งเป็นชิ้นย่อย (500 อักขระ, ซ้อนทับกัน 50) และส่งไปยัง Zep Cloud เป็นชุด ระบบจะดำเนินการดังนี้:

  1. สร้างกราฟอิสระด้วย ID ที่ไม่ซ้ำกัน
  2. ตั้งค่า ontology ที่กำหนดเอง
  3. ส่งชุดข้อความเพื่อแยกเอนทิตีและความสัมพันธ์
  4. รอให้ Zep ประมวลผลแต่ละตอน
  5. เรียกคืนกราฟสุดท้ายพร้อมโหนดและเอดจ์

ขั้นตอนที่ 3: การตั้งค่าสภาพแวดล้อม

ตัวสร้างการกำหนดค่าการจำลองจะวิเคราะห์กราฟความรู้และสร้างพารามิเตอร์เอเจนต์โดยละเอียด:

ขั้นตอนที่ 4: การเรียกใช้การจำลอง

เอเจนต์จะตื่นขึ้นตามตารางกิจกรรมและเริ่มโพสต์ แสดงความคิดเห็น และตอบสนอง ระบบจะเรียกใช้การจำลองแบบคู่ขนานบน Twitter และ Reddit โดยบันทึกทุกการกระทำลงในไฟล์ JSONL แบบเรียลไทม์

ขั้นตอนที่ 5: การสร้างรายงาน

เอเจนต์รายงานใช้เครื่องมือการเรียกค้นข้อมูลหลักสามอย่างเพื่อวิเคราะห์สิ่งที่เกิดขึ้น:

เจาะลึกทางเทคนิค: การสร้าง Ontology

ตัวสร้าง Ontology อยู่ที่ backend/app/services/ontology_generator.py ซึ่งใช้พร้อมต์ระบบที่สร้างขึ้นอย่างพิถีพิถันเพื่อบังคับใช้กฎที่เข้มงวด

พร้อมต์ของระบบประกอบด้วยคำแนะนำที่ครอบคลุมเกี่ยวกับสิ่งใดที่ถือเป็นเอนทิตีที่ถูกต้อง (บุคคล, องค์กร, สำนักข่าว) เทียบกับสิ่งที่ไม่ใช่ (แนวคิดเชิงนามธรรม, ธีม, มุมมอง) ความแตกต่างนี้มีความสำคัญเนื่องจากการจำลองต้องมีเอเจนต์ที่สามารถพูดและกระทำบนโซเชียลมีเดียได้จริง

หลังจาก LLM สร้าง ontology แล้ว เมธอด _validate_and_process จะบังคับใช้ข้อจำกัด:

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

เลเยอร์การตรวจสอบนี้ช่วยให้มั่นใจว่าผลลัพธ์ทำงานร่วมกับขีดจำกัด API ของ Zep ได้เสมอ ในขณะที่ยังคงรักษาสถาปัตยกรรมสองชั้นไว้

การสร้างกราฟความรู้: การผสานรวม Zep

บริการสร้างกราฟ (backend/app/services/graph_builder.py) จัดการเวิร์กโฟลว์แบบอะซิงโครนัส:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

การสร้างโมเดล Pydantic แบบไดนามิก

ส่วนหนึ่งที่ชาญฉลาด: ระบบสร้างโมเดล Pydantic แบบไดนามิกสำหรับแต่ละประเภทเอนทิตีในขณะรันไทม์:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

สิ่งนี้ช่วยให้ Zep ตรวจสอบคุณสมบัติของเอนทิตีกับสคีมาที่กำหนดเองได้ โดยไม่จำเป็นต้องมีโมเดลที่กำหนดไว้ล่วงหน้า

การแบ่งหน้าผ่านกราฟขนาดใหญ่

Zep ส่งคืนผลลัพธ์แบบแบ่งหน้า ยูทิลิตี zep_paging.py จะเรียกข้อมูลทั้งหมด:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

การจำลองกิจกรรมเอเจนต์ตามเวลา

ตัวสร้างการกำหนดค่าการจำลอง (backend/app/services/simulation_config_generator.py) สร้างรูปแบบกิจกรรมที่สมจริงตามพฤติกรรมของเขตเวลาจีน:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

ประเภทเอเจนต์ที่แตกต่างกันจะได้รับรูปแบบที่แตกต่างกัน:

ประเภทเอเจนต์ ระดับกิจกรรม ชั่วโมงที่ใช้งาน ความล่าช้าในการตอบสนอง อิทธิพล
มหาวิทยาลัย 0.2 9-17 60-240 นาที 3.0
สำนักข่าว 0.5 7-23 5-30 นาที 2.5
นักเรียน 0.8 8-12, 18-23 1-15 นาที 0.8
ศาสตราจารย์ 0.4 8-21 15-90 นาที 2.0

ตัวสร้างการกำหนดค่าใช้การเรียก LLM เพื่อปรับแต่งค่าเหล่านี้ตามสถานการณ์เฉพาะของคุณ จากนั้นจะกลับไปใช้ค่าเริ่มต้นตามกฎหาก LLM ล้มเหลว

การติดตามการกระทำแบบเรียลไทม์

ตัวเรียกใช้การจำลอง (backend/app/services/simulation_runner.py) ตรวจสอบกิจกรรมของเอเจนต์โดยการสตรีมบันทึก JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

สิ่งนี้ทำงานในเธรดเบื้องหลัง โดยอัปเดตสถานะการจำลองทุก 2 วินาที ฟรอนต์เอนด์จะดึงสถานะนี้เพื่อแสดงความคืบหน้าแบบเรียลไทม์

การจัดการกระบวนการข้ามแพลตฟอร์ม

การหยุดการจำลองต้องอาศัยการจัดการกระบวนการอย่างรอบคอบทั้งบน Windows และ Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

ตัวจัดการการล้างข้อมูลจะลงทะเบียนตัวจัดการสัญญาณสำหรับ SIGINT, SIGTERM และ SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

สิ่งนี้ทำให้แน่ใจว่าการจำลองจะหยุดทำงานอย่างราบรื่นเมื่อเซิร์ฟเวอร์ปิดตัวลง

การสร้างรายงาน: การเรียกค้นแบบสามระดับ

บริการเครื่องมือ Zep (backend/app/services/zep_tools.py) มีฟังก์ชันการเรียกค้นสามอย่าง:

InsightForge (การเจาะลึก)

แยกคำถามที่ซับซ้อนออกเป็นคำค้นย่อย ค้นหาแต่ละคำค้นย่อย จากนั้นรวบรวมข้อมูล:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (ครอบคลุมทั้งหมด)

เรียกข้อมูลทุกอย่าง รวมถึงข้อเท็จจริงทางประวัติศาสตร์ที่หมดอายุ/ไม่ถูกต้อง:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (เรียลไทม์)

เรียกใช้ OASIS interview API จริงเพื่อพูดคุยกับเอเจนต์ที่ใช้งานอยู่:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

การตัดสินใจทางวิศวกรรมที่สำคัญ

1. การจัดการงานแบบ Async

การดำเนินการที่ใช้เวลานาน (การสร้างกราฟ, การเรียกใช้การจำลอง) ใช้งายแบบอะซิงโครนัสพร้อมการติดตามความคืบหน้า:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

ฟรอนต์เอนด์จะสอบถามสถานะของงานผ่าน /api/graph/task/{task_id}

2. การเรียกใช้ LLM แบบแบทช์พร้อมการลองใหม่

การสร้างการกำหนดค่าจะแบ่งรายการเอเจนต์ขนาดใหญ่ออกเป็นชุดละ 15 รายการ:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

แต่ละชุดจะรวมตรรกะการซ่อมแซม JSON สำหรับผลลัพธ์ที่ถูกตัด:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. การจำลองแบบคู่ขนานหลายแพลตฟอร์ม

Twitter และ Reddit ทำงานแบบคู่ขนานด้วยฐานข้อมูลและบันทึกการกระทำที่แยกจากกัน:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

ตัวเรียกใช้จะตรวจจับการเสร็จสิ้นต่อแพลตฟอร์มผ่านเหตุการณ์ simulation_end

ข้อควรพิจารณาด้านประสิทธิภาพ

การจัดการหน่วยความจำ

การแยกฐานข้อมูล

แต่ละแพลตฟอร์มใช้ฐานข้อมูล SQLite ของตนเองเพื่อหลีกเลี่ยงการแย่งชิงการล็อกระหว่างการเขียนแบบขนาน

การลดทอนประสิทธิภาพอย่างนุ่มนวล

เมื่อ Zep Search API ล้มเหลว ระบบจะกลับไปใช้การจับคู่คีย์เวิร์ดในเครื่องแทน:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)

บทสรุป

MiroFish แสดงให้เห็นถึงวิธีการสร้างระบบจำลองหลายเอเจนต์ที่สมบูรณ์ตั้งแต่เริ่มต้น เวิร์กโฟลว์ห้าขั้นตอนเปลี่ยนเอกสารดิบให้เป็นโลกดิจิทัลที่มีชีวิต ซึ่งเอเจนต์นับพันมีปฏิสัมพันธ์ตามรูปแบบพฤติกรรมที่สมจริง

ประเด็นสำคัญที่ได้เรียนรู้:

  1. การออกแบบ Ontology มีความสำคัญ: โครงสร้างสองชั้น (8 ประเภทเฉพาะ + 2 ประเภทสำรอง) ช่วยให้ครอบคลุมโดยไม่เกินขีดจำกัดของ API
  2. เวิร์กโฟลว์แบบ Async ช่วยให้การดำเนินการที่ใช้เวลานานเป็นไปได้: การติดตามงานพร้อมการอัปเดตความคืบหน้าทำให้ผู้ใช้ได้รับข้อมูลระหว่างการดำเนินการหลายนาที
  3. กิจกรรมตามเวลาสร้างความสมจริง: รูปแบบเขตเวลาของจีนและตารางเวลาเฉพาะประเภทเอเจนต์ทำให้เกิดพฤติกรรมที่น่าเชื่อถือ
  4. การจำลองแบบสองแพลตฟอร์มช่วยในการเปรียบเทียบ: การเรียกใช้ Twitter และ Reddit แบบคู่ขนานแสดงให้เห็นว่าพลวัตของแพลตฟอร์มส่งผลต่อผลลัพธ์อย่างไร
  5. การเรียกค้นแบบสามระดับตอบสนองความต้องการที่แตกต่างกัน: InsightForge สำหรับความลึก, PanoramaSearch สำหรับความกว้าง, InterviewAgents สำหรับมุมมองโดยตรง

ซอร์สโค้ดฉบับเต็มมีอยู่ที่ github.com/666ghj/MiroFish

ปุ่ม

ต้องการลองใช้ MiroFish ไหม? เยี่ยมชม เดโมสด เพื่อดูการจำลองเหตุการณ์ฮอตสปอตในการทำงาน

ฝึกการออกแบบ API แบบ Design-first ใน Apidog

ค้นพบวิธีที่ง่ายขึ้นในการสร้างและใช้ API

MiroFish สร้างโลกคู่ขนานดิจิทัล ได้อย่างไร