Giới thiệu
Mạng xã hội phát triển nhanh chóng. Một bài đăng duy nhất có thể kích hoạt hàng loạt phản ứng, định hình lại và các phong trào phản đối mà không ai dự đoán được. Điều gì sẽ xảy ra nếu bạn có thể thấy một kịch bản diễn ra như thế nào trước khi nó xảy ra trong thế giới thực?
MiroFish làm chính xác điều đó. Đây là một công cụ trí tuệ bầy đàn tạo ra các thế giới song song kỹ thuật số, nơi hàng ngàn tác nhân AI với tính cách, ký ức và kiểu hành vi riêng biệt tương tác tự do. Bạn tải lên tài liệu gốc—một bài báo, một dự thảo chính sách, thậm chí là một cuốn tiểu thuyết—và MiroFish xây dựng một mô phỏng có độ chân thực cao về cách các sự kiện có thể diễn ra.
Bài viết này sẽ phân tích kiến trúc kỹ thuật đằng sau MiroFish. Bạn sẽ tìm hiểu cách hệ thống biến đổi các tài liệu thô thành các mô phỏng sống động, cách các tác nhân đưa ra quyết định và cách quy trình làm việc năm bước điều phối mọi thứ từ việc xây dựng biểu đồ tri thức đến giám sát thời gian thực.

Tổng quan hệ thống: Quy trình làm việc năm bước
MiroFish xử lý các mô phỏng thông qua năm giai đoạn riêng biệt:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Bước 1: Tạo Ontology
Hệ thống phân tích tài liệu đầu vào và yêu cầu mô phỏng của bạn, sau đó sử dụng một LLM để tạo ra một ontology tùy chỉnh. Ontology này định nghĩa:
- 10 loại thực thể (ví dụ: Student, Professor, University, MediaOutlet, GovernmentAgency)
- 10 loại mối quan hệ (ví dụ: WORKS_FOR, COMMENTS_ON, RESPONDS_TO)
- Các thuộc tính cho mỗi loại (tránh các từ dành riêng như
name,uuid,created_at)
Trình tạo ontology áp dụng cấu trúc hai cấp: 8 loại cụ thể dựa trên nội dung của bạn, cộng với 2 loại dự phòng (Person và Organization) để xử lý bất cứ thứ gì không phù hợp ở những nơi khác.
Bước 2: Xây dựng GraphRAG
Các tài liệu được chia thành từng đoạn (500 ký tự, chồng lấn 50) và gửi đến Zep Cloud theo lô. Hệ thống:
- Tạo một biểu đồ độc lập với ID duy nhất
- Thiết lập ontology tùy chỉnh
- Gửi các lô văn bản để trích xuất thực thể và mối quan hệ
- Chờ Zep xử lý từng sự kiện
- Truy xuất biểu đồ cuối cùng với các nút và cạnh
Bước 3: Thiết lập môi trường
Trình tạo cấu hình mô phỏng phân tích biểu đồ tri thức và tạo các tham số tác nhân chi tiết:
- Cấu hình thời gian dựa trên các mẫu múi giờ Trung Quốc (giờ cao điểm 19-22, giờ thấp điểm 0-5)
- Cấu hình sự kiện với các bài đăng ban đầu và các chủ đề nóng
- Cấu hình hoạt động của tác nhân (số bài đăng mỗi giờ, độ trễ phản hồi, trọng số ảnh hưởng)
- Cấu hình nền tảng cho Twitter và Reddit với các ngưỡng lan truyền khác nhau
Bước 4: Chạy mô phỏng
Các tác nhân thức dậy theo lịch trình hoạt động của họ và bắt đầu đăng bài, bình luận và phản ứng. Hệ thống chạy các mô phỏng song song trên Twitter và Reddit, ghi lại mọi hành động vào các tệp JSONL trong thời gian thực.
Bước 5: Tạo báo cáo
Tác nhân Báo cáo sử dụng ba công cụ truy xuất cốt lõi để phân tích những gì đã xảy ra:
- InsightForge: Tìm kiếm chuyên sâu phân tách các câu hỏi thành các truy vấn con
- PanoramaSearch: Chế độ xem toàn diện bao gồm các sự kiện lịch sử đã hết hạn/không hợp lệ
- InterviewAgents: Phỏng vấn thời gian thực với các tác nhân đang hoạt động thông qua IPC
Tìm hiểu sâu về kỹ thuật: Tạo Ontology
Trình tạo ontology nằm trong backend/app/services/ontology_generator.py. Nó sử dụng một lời nhắc hệ thống được tạo cẩn thận để thực thi các quy tắc nghiêm ngặt.
Lời nhắc hệ thống bao gồm hướng dẫn chi tiết về những gì được coi là một thực thể hợp lệ (người, tổ chức, cơ quan truyền thông) so với những gì không (khái niệm trừu tượng, chủ đề, quan điểm). Sự khác biệt này rất quan trọng vì mô phỏng cần các tác nhân có thể thực sự nói và hành động trên mạng xã hội.
Sau khi LLM tạo ontology, phương thức _validate_and_process thực thi các ràng buộc:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Lớp xác thực này đảm bảo đầu ra luôn hoạt động với các giới hạn API của Zep đồng thời duy trì cấu trúc hai cấp.
Xây dựng biểu đồ tri thức: Tích hợp Zep
Dịch vụ xây dựng biểu đồ (backend/app/services/graph_builder.py) xử lý quy trình làm việc không đồng bộ:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Tạo mô hình Pydantic động
Một phần thông minh: hệ thống tự động tạo các mô hình Pydantic cho từng loại thực thể trong thời gian chạy:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Điều này cho phép Zep xác thực các thuộc tính thực thể dựa trên lược đồ tùy chỉnh mà không yêu cầu các mô hình được định nghĩa trước.
Phân trang qua các biểu đồ lớn
Zep trả về kết quả được phân trang. Tiện ích zep_paging.py truy xuất mọi thứ:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Mô phỏng hoạt động của tác nhân dựa trên thời gian
Trình tạo cấu hình mô phỏng (backend/app/services/simulation_config_generator.py) tạo các mẫu hoạt động thực tế dựa trên hành vi múi giờ Trung Quốc:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人 (Gần như không có ai vào sáng sớm)
"morning_hours": [6, 7, 8], # 早间逐渐活跃 (Sáng dần trở nên sôi động)
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰 (Cao điểm buổi tối)
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Các loại tác nhân khác nhau có các mẫu khác nhau:
| Loại tác nhân | Mức độ hoạt động | Giờ hoạt động | Độ trễ phản hồi | Ảnh hưởng |
|---|---|---|---|---|
| Đại học | 0.2 | 9-17 | 60-240 min | 3.0 |
| Cơ quan truyền thông | 0.5 | 7-23 | 5-30 min | 2.5 |
| Sinh viên | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Giáo sư | 0.4 | 8-21 | 15-90 min | 2.0 |
Trình tạo cấu hình sử dụng các cuộc gọi LLM để tùy chỉnh các giá trị này dựa trên kịch bản cụ thể của bạn, sau đó quay lại các giá trị mặc định dựa trên quy tắc nếu LLM thất bại.
Theo dõi hành động theo thời gian thực
Trình chạy mô phỏng (backend/app/services/simulation_runner.py) giám sát hoạt động của tác nhân bằng cách truyền trực tuyến nhật ký JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Điều này chạy trong một luồng nền, cập nhật trạng thái mô phỏng cứ sau 2 giây. Frontend thăm dò trạng thái này để hiển thị tiến độ thời gian thực.
Quản lý tiến trình đa nền tảng
Việc dừng mô phỏng yêu cầu quản lý tiến trình cẩn thận trên cả Windows và Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: sử dụng taskkill để tắt cây tiến trình
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: tắt nhóm tiến trình (được tạo với start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Trình xử lý dọn dẹp đăng ký các trình xử lý tín hiệu cho SIGINT, SIGTERM và SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Sau đó gọi trình xử lý gốc
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Điều này đảm bảo các mô phỏng dừng lại một cách có trật tự khi máy chủ tắt.
Tạo báo cáo: Truy xuất ba cấp
Dịch vụ công cụ Zep (backend/app/services/zep_tools.py) cung cấp ba chức năng truy xuất:
InsightForge (Tìm hiểu chuyên sâu)
Phân tách các câu hỏi phức tạp thành các truy vấn con, tìm kiếm từng truy vấn, sau đó tổng hợp:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Phạm vi đầy đủ)
Truy xuất mọi thứ bao gồm các sự kiện lịch sử đã hết hạn/không hợp lệ:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Thời gian thực)
Gọi API phỏng vấn OASIS thực tế để nói chuyện với các tác nhân đang hoạt động:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Các quyết định kỹ thuật quan trọng
1. Quản lý tác vụ không đồng bộ
Các hoạt động chạy dài (xây dựng biểu đồ, chạy mô phỏng) sử dụng các tác vụ không đồng bộ với tính năng theo dõi tiến độ:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
Frontend thăm dò trạng thái tác vụ thông qua /api/graph/task/{task_id}.
2. Gọi LLM theo lô với tính năng thử lại
Việc tạo cấu hình chia danh sách tác nhân lớn thành các lô 15:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Mỗi lô bao gồm logic sửa chữa JSON cho các đầu ra bị cắt cụt:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Mô phỏng song song trên hai nền tảng
Twitter và Reddit chạy song song với các cơ sở dữ liệu và nhật ký hành động riêng biệt:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Trình chạy phát hiện hoàn thành trên mỗi nền tảng thông qua các sự kiện simulation_end.
Các cân nhắc về hiệu suất
Quản lý bộ nhớ
- Các tài liệu lớn được cắt ngắn thành 50 nghìn ký tự cho ngữ cảnh LLM
- Tóm tắt thực thể giới hạn 300 ký tự mỗi bản
- Các hành động gần đây được giới hạn ở 50 trong bộ nhớ (lịch sử đầy đủ trong các tệp JSONL)
Cách ly cơ sở dữ liệu
Mỗi nền tảng sử dụng cơ sở dữ liệu SQLite riêng để tránh tranh chấp khóa trong quá trình ghi song song.
Suy thoái có thể kiểm soát được
Khi API tìm kiếm Zep thất bại, hệ thống sẽ quay lại khớp từ khóa cục bộ:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Kết luận
MiroFish trình bày cách xây dựng một hệ thống mô phỏng đa tác nhân hoàn chỉnh từ đầu. Quy trình làm việc năm bước biến đổi các tài liệu thô thành các thế giới kỹ thuật số sống động, nơi hàng ngàn tác nhân tương tác theo các kiểu hành vi thực tế.
Những điểm chính cần rút ra:
- Thiết kế Ontology rất quan trọng: Cấu trúc hai cấp (8 loại cụ thể + 2 loại dự phòng) đảm bảo phạm vi bao phủ mà không vượt quá giới hạn API
- Quy trình làm việc không đồng bộ cho phép các hoạt động dài: Theo dõi tác vụ với cập nhật tiến độ giúp người dùng được thông báo trong các hoạt động kéo dài nhiều phút
- Hoạt động dựa trên thời gian tạo ra tính chân thực: Các mẫu múi giờ Trung Quốc và lịch trình cụ thể theo loại tác nhân tạo ra hành vi đáng tin cậy
- Mô phỏng song song trên hai nền tảng cung cấp so sánh: Chạy Twitter và Reddit song song cho thấy động lực nền tảng ảnh hưởng đến kết quả như thế nào
- Truy xuất ba cấp phục vụ các nhu cầu khác nhau: InsightForge cho chiều sâu, PanoramaSearch cho chiều rộng, InterviewAgents cho các góc nhìn trực tiếp
Mã nguồn đầy đủ có sẵn tại github.com/666ghj/MiroFish.
Bạn muốn thử MiroFish? Hãy truy cập bản demo trực tiếp để xem mô phỏng sự kiện điểm nóng đang hoạt động.
