บทนำ
โซเชียลมีเดียเคลื่อนไหวรวดเร็ว โพสต์เดียวสามารถกระตุ้นปฏิกิริยา การปรับเปลี่ยน และการเคลื่อนไหวตอบโต้ที่ไม่มีใครคาดคิดได้ จะเป็นอย่างไรถ้าคุณสามารถเห็นว่าสถานการณ์จะคลี่คลายลงอย่างไรก่อนที่จะเกิดขึ้นจริงในโลก?
MiroFish ทำเช่นนั้นได้อย่างแม่นยำ มันคือเอ็นจิ้นปัญญาแบบฝูงที่สร้างโลกดิจิทัลคู่ขนานที่ซึ่งเอเจนต์ AI นับพันตัวซึ่งมีบุคลิก ความทรงจำ และรูปแบบพฤติกรรมที่แตกต่างกันมีปฏิสัมพันธ์กันอย่างอิสระ คุณอัปโหลดข้อมูลเริ่มต้น—เช่น ข่าว บทความ ร่างนโยบาย หรือแม้แต่นวนิยาย—แล้ว MiroFish จะสร้างการจำลองสถานการณ์ที่มีความแม่นยำสูงว่าเหตุการณ์ต่างๆ อาจคลี่คลายลงอย่างไร
โพสต์นี้จะเจาะลึกสถาปัตยกรรมทางเทคนิคเบื้องหลัง MiroFish คุณจะได้เรียนรู้ว่าระบบเปลี่ยนเอกสารดิบให้เป็นการจำลองที่มีชีวิตได้อย่างไร เอเจนต์ตัดสินใจได้อย่างไร และเวิร์กโฟลว์ห้าขั้นตอนจัดระเบียบทุกอย่างตั้งแต่การสร้างกราฟความรู้ไปจนถึงการตรวจสอบแบบเรียลไทม์ได้อย่างไร

ภาพรวมระบบ: เวิร์กโฟลว์ห้าขั้นตอน
MiroFish ประมวลผลการจำลองผ่านห้าขั้นตอนที่แตกต่างกัน:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
ขั้นตอนที่ 1: การสร้าง Ontology
ระบบจะวิเคราะห์เอกสารอินพุตและข้อกำหนดการจำลองของคุณ จากนั้นใช้ LLM เพื่อสร้าง Ontology ที่กำหนดเอง ซึ่งกำหนดสิ่งต่อไปนี้:
- 10 ประเภทเอนทิตี (เช่น นักเรียน, ศาสตราจารย์, มหาวิทยาลัย, สำนักข่าว, หน่วยงานราชการ)
- 10 ประเภทความสัมพันธ์ (เช่น ทำงานให้, แสดงความคิดเห็นเกี่ยวกับ, ตอบกลับ)
- คุณสมบัติ (Attributes) สำหรับแต่ละประเภท (หลีกเลี่ยงคำสงวน เช่น
name,uuid,created_at)
ตัวสร้าง Ontology บังคับใช้โครงสร้างสองชั้น: 8 ประเภทเฉพาะตามเนื้อหาของคุณ บวกกับ 2 ประเภทสำรอง (Person และ Organization) เพื่อครอบคลุมสิ่งที่ไม่เข้ากับประเภทอื่น
ขั้นตอนที่ 2: การสร้าง GraphRAG
เอกสารจะถูกแบ่งเป็นชิ้นย่อย (500 อักขระ, ซ้อนทับกัน 50) และส่งไปยัง Zep Cloud เป็นชุด ระบบจะดำเนินการดังนี้:
- สร้างกราฟอิสระด้วย ID ที่ไม่ซ้ำกัน
- ตั้งค่า ontology ที่กำหนดเอง
- ส่งชุดข้อความเพื่อแยกเอนทิตีและความสัมพันธ์
- รอให้ Zep ประมวลผลแต่ละตอน
- เรียกคืนกราฟสุดท้ายพร้อมโหนดและเอดจ์
ขั้นตอนที่ 3: การตั้งค่าสภาพแวดล้อม
ตัวสร้างการกำหนดค่าการจำลองจะวิเคราะห์กราฟความรู้และสร้างพารามิเตอร์เอเจนต์โดยละเอียด:
- การกำหนดค่าเวลา ตามรูปแบบเขตเวลาของจีน (ชั่วโมงเร่งด่วน 19-22 น., ชั่วโมงเงียบ 0-5 น.)
- การกำหนดค่าเหตุการณ์ ด้วยโพสต์เริ่มต้นและหัวข้อที่กำลังมาแรง
- การกำหนดค่ากิจกรรมของเอเจนต์ (จำนวนโพสต์ต่อชั่วโมง, ความล่าช้าในการตอบสนอง, น้ำหนักอิทธิพล)
- การกำหนดค่าแพลตฟอร์ม สำหรับ Twitter และ Reddit ด้วยเกณฑ์การแพร่กระจายที่แตกต่างกัน
ขั้นตอนที่ 4: การเรียกใช้การจำลอง
เอเจนต์จะตื่นขึ้นตามตารางกิจกรรมและเริ่มโพสต์ แสดงความคิดเห็น และตอบสนอง ระบบจะเรียกใช้การจำลองแบบคู่ขนานบน Twitter และ Reddit โดยบันทึกทุกการกระทำลงในไฟล์ JSONL แบบเรียลไทม์
ขั้นตอนที่ 5: การสร้างรายงาน
เอเจนต์รายงานใช้เครื่องมือการเรียกค้นข้อมูลหลักสามอย่างเพื่อวิเคราะห์สิ่งที่เกิดขึ้น:
- InsightForge: การค้นหาเชิงลึกที่แยกคำถามออกเป็นคำค้นย่อย
- PanoramaSearch: มุมมองที่ครอบคลุมทั้งหมด รวมถึงข้อเท็จจริงทางประวัติศาสตร์ที่หมดอายุ/ไม่ถูกต้อง
- InterviewAgents: การสัมภาษณ์แบบเรียลไทม์กับเอเจนต์ที่ใช้งานอยู่ผ่าน IPC
เจาะลึกทางเทคนิค: การสร้าง Ontology
ตัวสร้าง Ontology อยู่ที่ backend/app/services/ontology_generator.py ซึ่งใช้พร้อมต์ระบบที่สร้างขึ้นอย่างพิถีพิถันเพื่อบังคับใช้กฎที่เข้มงวด
พร้อมต์ของระบบประกอบด้วยคำแนะนำที่ครอบคลุมเกี่ยวกับสิ่งใดที่ถือเป็นเอนทิตีที่ถูกต้อง (บุคคล, องค์กร, สำนักข่าว) เทียบกับสิ่งที่ไม่ใช่ (แนวคิดเชิงนามธรรม, ธีม, มุมมอง) ความแตกต่างนี้มีความสำคัญเนื่องจากการจำลองต้องมีเอเจนต์ที่สามารถพูดและกระทำบนโซเชียลมีเดียได้จริง
หลังจาก LLM สร้าง ontology แล้ว เมธอด _validate_and_process จะบังคับใช้ข้อจำกัด:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
เลเยอร์การตรวจสอบนี้ช่วยให้มั่นใจว่าผลลัพธ์ทำงานร่วมกับขีดจำกัด API ของ Zep ได้เสมอ ในขณะที่ยังคงรักษาสถาปัตยกรรมสองชั้นไว้
การสร้างกราฟความรู้: การผสานรวม Zep
บริการสร้างกราฟ (backend/app/services/graph_builder.py) จัดการเวิร์กโฟลว์แบบอะซิงโครนัส:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
การสร้างโมเดล Pydantic แบบไดนามิก
ส่วนหนึ่งที่ชาญฉลาด: ระบบสร้างโมเดล Pydantic แบบไดนามิกสำหรับแต่ละประเภทเอนทิตีในขณะรันไทม์:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
สิ่งนี้ช่วยให้ Zep ตรวจสอบคุณสมบัติของเอนทิตีกับสคีมาที่กำหนดเองได้ โดยไม่จำเป็นต้องมีโมเดลที่กำหนดไว้ล่วงหน้า
การแบ่งหน้าผ่านกราฟขนาดใหญ่
Zep ส่งคืนผลลัพธ์แบบแบ่งหน้า ยูทิลิตี zep_paging.py จะเรียกข้อมูลทั้งหมด:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
การจำลองกิจกรรมเอเจนต์ตามเวลา
ตัวสร้างการกำหนดค่าการจำลอง (backend/app/services/simulation_config_generator.py) สร้างรูปแบบกิจกรรมที่สมจริงตามพฤติกรรมของเขตเวลาจีน:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
ประเภทเอเจนต์ที่แตกต่างกันจะได้รับรูปแบบที่แตกต่างกัน:
| ประเภทเอเจนต์ | ระดับกิจกรรม | ชั่วโมงที่ใช้งาน | ความล่าช้าในการตอบสนอง | อิทธิพล |
|---|---|---|---|---|
| มหาวิทยาลัย | 0.2 | 9-17 | 60-240 นาที | 3.0 |
| สำนักข่าว | 0.5 | 7-23 | 5-30 นาที | 2.5 |
| นักเรียน | 0.8 | 8-12, 18-23 | 1-15 นาที | 0.8 |
| ศาสตราจารย์ | 0.4 | 8-21 | 15-90 นาที | 2.0 |
ตัวสร้างการกำหนดค่าใช้การเรียก LLM เพื่อปรับแต่งค่าเหล่านี้ตามสถานการณ์เฉพาะของคุณ จากนั้นจะกลับไปใช้ค่าเริ่มต้นตามกฎหาก LLM ล้มเหลว
การติดตามการกระทำแบบเรียลไทม์
ตัวเรียกใช้การจำลอง (backend/app/services/simulation_runner.py) ตรวจสอบกิจกรรมของเอเจนต์โดยการสตรีมบันทึก JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
สิ่งนี้ทำงานในเธรดเบื้องหลัง โดยอัปเดตสถานะการจำลองทุก 2 วินาที ฟรอนต์เอนด์จะดึงสถานะนี้เพื่อแสดงความคืบหน้าแบบเรียลไทม์
การจัดการกระบวนการข้ามแพลตฟอร์ม
การหยุดการจำลองต้องอาศัยการจัดการกระบวนการอย่างรอบคอบทั้งบน Windows และ Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
ตัวจัดการการล้างข้อมูลจะลงทะเบียนตัวจัดการสัญญาณสำหรับ SIGINT, SIGTERM และ SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
สิ่งนี้ทำให้แน่ใจว่าการจำลองจะหยุดทำงานอย่างราบรื่นเมื่อเซิร์ฟเวอร์ปิดตัวลง
การสร้างรายงาน: การเรียกค้นแบบสามระดับ
บริการเครื่องมือ Zep (backend/app/services/zep_tools.py) มีฟังก์ชันการเรียกค้นสามอย่าง:
InsightForge (การเจาะลึก)
แยกคำถามที่ซับซ้อนออกเป็นคำค้นย่อย ค้นหาแต่ละคำค้นย่อย จากนั้นรวบรวมข้อมูล:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (ครอบคลุมทั้งหมด)
เรียกข้อมูลทุกอย่าง รวมถึงข้อเท็จจริงทางประวัติศาสตร์ที่หมดอายุ/ไม่ถูกต้อง:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (เรียลไทม์)
เรียกใช้ OASIS interview API จริงเพื่อพูดคุยกับเอเจนต์ที่ใช้งานอยู่:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
การตัดสินใจทางวิศวกรรมที่สำคัญ
1. การจัดการงานแบบ Async
การดำเนินการที่ใช้เวลานาน (การสร้างกราฟ, การเรียกใช้การจำลอง) ใช้งายแบบอะซิงโครนัสพร้อมการติดตามความคืบหน้า:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
ฟรอนต์เอนด์จะสอบถามสถานะของงานผ่าน /api/graph/task/{task_id}
2. การเรียกใช้ LLM แบบแบทช์พร้อมการลองใหม่
การสร้างการกำหนดค่าจะแบ่งรายการเอเจนต์ขนาดใหญ่ออกเป็นชุดละ 15 รายการ:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
แต่ละชุดจะรวมตรรกะการซ่อมแซม JSON สำหรับผลลัพธ์ที่ถูกตัด:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. การจำลองแบบคู่ขนานหลายแพลตฟอร์ม
Twitter และ Reddit ทำงานแบบคู่ขนานด้วยฐานข้อมูลและบันทึกการกระทำที่แยกจากกัน:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
ตัวเรียกใช้จะตรวจจับการเสร็จสิ้นต่อแพลตฟอร์มผ่านเหตุการณ์ simulation_end
ข้อควรพิจารณาด้านประสิทธิภาพ
การจัดการหน่วยความจำ
- เอกสารขนาดใหญ่จะถูกตัดทอนให้เหลือ 50k ตัวอักษรสำหรับบริบทของ LLM
- สรุปเอนทิตีถูกจำกัดไว้ที่ 300 ตัวอักษรต่อรายการ
- การกระทำล่าสุดจำกัดไว้ที่ 50 รายการในหน่วยความจำ (ประวัติทั้งหมดอยู่ในไฟล์ JSONL)
การแยกฐานข้อมูล
แต่ละแพลตฟอร์มใช้ฐานข้อมูล SQLite ของตนเองเพื่อหลีกเลี่ยงการแย่งชิงการล็อกระหว่างการเขียนแบบขนาน
การลดทอนประสิทธิภาพอย่างนุ่มนวล
เมื่อ Zep Search API ล้มเหลว ระบบจะกลับไปใช้การจับคู่คีย์เวิร์ดในเครื่องแทน:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
บทสรุป
MiroFish แสดงให้เห็นถึงวิธีการสร้างระบบจำลองหลายเอเจนต์ที่สมบูรณ์ตั้งแต่เริ่มต้น เวิร์กโฟลว์ห้าขั้นตอนเปลี่ยนเอกสารดิบให้เป็นโลกดิจิทัลที่มีชีวิต ซึ่งเอเจนต์นับพันมีปฏิสัมพันธ์ตามรูปแบบพฤติกรรมที่สมจริง
ประเด็นสำคัญที่ได้เรียนรู้:
- การออกแบบ Ontology มีความสำคัญ: โครงสร้างสองชั้น (8 ประเภทเฉพาะ + 2 ประเภทสำรอง) ช่วยให้ครอบคลุมโดยไม่เกินขีดจำกัดของ API
- เวิร์กโฟลว์แบบ Async ช่วยให้การดำเนินการที่ใช้เวลานานเป็นไปได้: การติดตามงานพร้อมการอัปเดตความคืบหน้าทำให้ผู้ใช้ได้รับข้อมูลระหว่างการดำเนินการหลายนาที
- กิจกรรมตามเวลาสร้างความสมจริง: รูปแบบเขตเวลาของจีนและตารางเวลาเฉพาะประเภทเอเจนต์ทำให้เกิดพฤติกรรมที่น่าเชื่อถือ
- การจำลองแบบสองแพลตฟอร์มช่วยในการเปรียบเทียบ: การเรียกใช้ Twitter และ Reddit แบบคู่ขนานแสดงให้เห็นว่าพลวัตของแพลตฟอร์มส่งผลต่อผลลัพธ์อย่างไร
- การเรียกค้นแบบสามระดับตอบสนองความต้องการที่แตกต่างกัน: InsightForge สำหรับความลึก, PanoramaSearch สำหรับความกว้าง, InterviewAgents สำหรับมุมมองโดยตรง
ซอร์สโค้ดฉบับเต็มมีอยู่ที่ github.com/666ghj/MiroFish
ต้องการลองใช้ MiroFish ไหม? เยี่ยมชม เดโมสด เพื่อดูการจำลองเหตุการณ์ฮอตสปอตในการทำงาน
