Introduction
Social media moves fast. A single post can trigger cascades of reactions, reshares, and counter-movements that nobody predicted. What if you could see how a scenario plays out before it happens in the real world?
MiroFish does exactly that. It’s a swarm intelligence engine that creates digital parallel worlds where thousands of AI agents with distinct personalities, memories, and behavioral patterns interact freely. You upload seed material—a news article, a policy draft, even a novel—and MiroFish builds a high-fidelity simulation of how events might unfold.
This post breaks down the technical architecture behind MiroFish. You’ll learn how the system transforms raw documents into living simulations, how agents make decisions, and how the five-step workflow orchestrates everything from knowledge graph construction to real-time monitoring.

System Overview: The Five-Step Workflow
MiroFish processes simulations through five distinct phases:
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │     Env     │     │ Simulation  │     │   Report    │
│ Generation  │     │    Build    │     │    Setup    │     │     Run     │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
```
Step 1: Ontology Generation
The system analyzes your input documents and simulation requirements, then uses an LLM to generate a custom ontology. This defines:
- 10 entity types (e.g., Student, Professor, University, MediaOutlet, GovernmentAgency)
- 10 relationship types (e.g., WORKS_FOR, COMMENTS_ON, RESPONDS_TO)
- Attributes for each type (avoiding reserved words like `name`, `uuid`, `created_at`)
The ontology generator enforces a two-tier structure: 8 specific types based on your content, plus 2 fallback types (Person and Organization) to catch anything that doesn’t fit elsewhere.
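For concreteness, here is the rough shape such an ontology takes. The type and attribute names below are invented for illustration; the real generator derives them from your documents:

```python
# Illustrative ontology shape; all specific names here are hypothetical.
ontology = {
    "entity_types": [
        {
            "name": "Student",
            "description": "A student who posts and comments on social media",
            "attributes": [
                {"name": "major", "description": "Field of study"},
                {"name": "stance", "description": "Position on the hot topic"},
            ],
        },
        # ... up to 8 content-specific types ...
        {"name": "Person", "description": "Fallback for any individual", "attributes": []},
        {"name": "Organization", "description": "Fallback for any group", "attributes": []},
    ],
    "edge_types": [
        {"name": "COMMENTS_ON", "description": "An account comments on a post"},
        # ... up to 10 relationship types ...
    ],
}
```

The two fallback types are what make the ontology total: any extracted entity that matches none of the specific types still lands somewhere.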
Step 2: GraphRAG Construction
Documents get chunked into 500-character pieces with a 50-character overlap and sent to Zep Cloud in batches. The system:
- Creates a standalone graph with a unique ID
- Sets the custom ontology
- Sends text batches for entity and relationship extraction
- Waits for Zep to process each episode
- Retrieves the final graph with nodes and edges
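The chunking step can be sketched as a simple character splitter. This is an illustrative stand-in for `TextProcessor.split_text`, not the actual implementation:

```python
def split_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character chunks, each overlapping its neighbor."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap  # advance 450 chars per chunk with the defaults
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += step
    return chunks

chunks = split_text("a" * 1200)  # 3 chunks: starts at 0, 450, 900
```

The overlap matters for extraction quality: an entity mention that straddles a chunk boundary still appears intact in one of the two neighboring chunks.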
Step 3: Environment Setup
The simulation config generator analyzes the knowledge graph and creates detailed agent parameters:
- Time configuration based on Chinese timezone patterns (peak hours 19-22, dead hours 0-5)
- Event configuration with initial posts and hot topics
- Agent activity configs (posts per hour, response delays, influence weights)
- Platform configs for Twitter and Reddit with different viral thresholds
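The platform configs boil down to a dictionary of per-platform parameters. The keys and numbers below are illustrative assumptions, not the generator's actual output:

```python
# Hypothetical platform parameters; actual keys and values are generated per scenario.
PLATFORM_CONFIGS = {
    "twitter": {
        "viral_threshold": 100,    # engagement count before a post is treated as viral
        "max_post_length": 280,
    },
    "reddit": {
        "viral_threshold": 500,    # higher bar: Reddit virality is vote-driven
        "max_post_length": 10000,
    },
}
```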
Step 4: Simulation Run
Agents wake up according to their activity schedules and start posting, commenting, and reacting. The system runs parallel simulations on Twitter and Reddit, logging every action to JSONL files in real time.
Step 5: Report Generation
The Report Agent uses three core retrieval tools to analyze what happened:
- InsightForge: Deep-dive search that decomposes questions into sub-queries
- PanoramaSearch: Full-scope view including expired/invalid historical facts
- InterviewAgents: Real-time interviews with active agents via IPC
Technical Deep Dive: Ontology Generation
The ontology generator lives in backend/app/services/ontology_generator.py. It uses a carefully crafted system prompt that enforces strict rules.
The system prompt includes extensive guidance on what counts as a valid entity (people, organizations, media outlets) versus what doesn’t (abstract concepts, themes, viewpoints). This distinction matters because the simulation needs agents that can actually speak and act on social media.
After the LLM generates the ontology, the _validate_and_process method enforces constraints:
```python
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]
    result["entity_types"].extend(fallbacks_to_add)
    return result
```
This validation layer ensures the output always works with Zep’s API limits while maintaining the two-tier structure.
Knowledge Graph Construction: Zep Integration
The graph builder service (backend/app/services/graph_builder.py) handles the async workflow:
```python
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)
    # 2. Set ontology
    self.set_ontology(graph_id, ontology)
    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)
    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)
```
Dynamic Pydantic Model Generation
One clever piece: the system dynamically creates Pydantic models for each entity type at runtime:
```python
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        # Prefix attributes that would collide with Zep's reserved node fields
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        description = entity_def.get("description", "")
        attrs = {"__doc__": description}
        annotations = {}
        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attr_desc = attr_def.get("description", "")
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]
        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
```
This lets Zep validate entity attributes against the custom schema without requiring pre-defined models.
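Stripped of the Zep and Pydantic specifics, the underlying mechanism is Python's three-argument `type()` call, which builds a class from a name, bases, and a namespace dict. A minimal self-contained sketch (the class and helper names here are mine, not from the codebase):

```python
from typing import Optional

class EntityModel:
    """Stand-in base class; the real code subclasses Zep's EntityModel."""

def make_entity_class(name: str, attr_names: list[str]) -> type:
    """Build a class at runtime with Optional[str] attributes defaulting to None."""
    attrs: dict = {"__doc__": f"Dynamically generated entity type {name}"}
    annotations = {}
    for attr in attr_names:
        attrs[attr] = None                 # default value
        annotations[attr] = Optional[str]  # type annotation, read by validators
    attrs["__annotations__"] = annotations
    # type(name, bases, namespace) is the functional form of a class statement
    return type(name, (EntityModel,), attrs)

Student = make_entity_class("Student", ["major", "stance"])
```

Because the generated class carries real `__annotations__`, any annotation-driven validator (Pydantic included) treats it exactly like a hand-written model.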
Paging Through Large Graphs
Zep returns paginated results. The zep_paging.py utility fetches everything:
```python
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
```
Time-Based Agent Activity Simulation
The simulation config generator (backend/app/services/simulation_config_generator.py) creates realistic activity patterns based on Chinese timezone behavior:
```python
CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],    # almost nobody online in the small hours
    "morning_hours": [6, 7, 8],          # activity picks up through the morning
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],      # evening peak
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
```
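A per-hour lookup over this config is then straightforward. A minimal sketch, with the config re-declared so the snippet stands alone (`activity_multiplier` is an assumed helper name, not from the codebase):

```python
CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],
    "morning_hours": [6, 7, 8],
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],
    "night_hours": [23],
    "activity_multipliers": {"dead": 0.05, "morning": 0.4, "work": 0.7,
                             "peak": 1.5, "night": 0.5},
}

def activity_multiplier(hour: int, cfg: dict = CHINA_TIMEZONE_CONFIG) -> float:
    """Map an hour of day (0-23) to its time band's activity multiplier."""
    for band in ("dead", "morning", "work", "peak", "night"):
        if hour in cfg[f"{band}_hours"]:
            return cfg["activity_multipliers"][band]
    raise ValueError(f"hour out of range: {hour}")
```

A per-agent posting rate would then be something like `base_rate * activity_level * activity_multiplier(hour)`.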
Different agent types get different patterns:
| Agent Type | Activity Level | Active Hours | Response Delay | Influence |
|---|---|---|---|---|
| University | 0.2 | 9-17 | 60-240 min | 3.0 |
| MediaOutlet | 0.5 | 7-23 | 5-30 min | 2.5 |
| Student | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
The config generator uses LLM calls to customize these values based on your specific scenario, then falls back to rule-based defaults if the LLM fails.
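The rule-based fallback amounts to a lookup table keyed by agent type. A sketch using the tabulated values above (the field names are assumptions; the real defaults live in the config generator):

```python
# Fallback defaults keyed by agent type, using the values from the table above.
# Field names (activity_level, response_delay_min, influence) are illustrative.
DEFAULT_AGENT_CONFIGS = {
    "University":  {"activity_level": 0.2, "response_delay_min": (60, 240), "influence": 3.0},
    "MediaOutlet": {"activity_level": 0.5, "response_delay_min": (5, 30),   "influence": 2.5},
    "Student":     {"activity_level": 0.8, "response_delay_min": (1, 15),   "influence": 0.8},
    "Professor":   {"activity_level": 0.4, "response_delay_min": (15, 90),  "influence": 2.0},
}

def default_config(agent_type: str) -> dict:
    """Return the rule-based default for a type, or a generic middle-of-the-road profile."""
    generic = {"activity_level": 0.5, "response_delay_min": (5, 60), "influence": 1.0}
    return DEFAULT_AGENT_CONFIGS.get(agent_type, generic)
```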
Real-Time Action Tracking
The simulation runner (backend/app/services/simulation_runner.py) monitors agent activity by streaming JSONL logs:
```python
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)
        return f.tell()
```
This runs in a background thread, updating the simulation state every 2 seconds. The frontend polls this state to show real-time progress.
Cross-Platform Process Management
Stopping simulations requires careful process management across Windows and Unix:
```python
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
```
The cleanup handler registers signal handlers for SIGINT, SIGTERM, and SIGHUP:
```python
def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)
    atexit.register(cls.cleanup_all_simulations)
```
This ensures simulations stop gracefully when the server shuts down.
Report Generation: Three-Tier Retrieval
The Zep tools service (backend/app/services/zep_tools.py) provides three retrieval functions:
InsightForge (Deep Dive)
Decomposes complex questions into sub-queries, searches each, then aggregates:
```python
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
```
PanoramaSearch (Full Scope)
Retrieves everything including expired/invalid historical facts:
```python
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)
    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
```
InterviewAgents (Real-Time)
Calls the actual OASIS interview API to talk to active agents:
```python
def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
```
Key Engineering Decisions
1. Async Task Management
Long-running operations (graph build, simulation run) use async tasks with progress tracking:
```python
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()
    return task_id
```
The frontend polls task status via /api/graph/task/{task_id}.
2. Batch LLM Calls with Retry
Config generation splits large agent lists into batches of 15:
```python
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
```
Each batch includes JSON repair logic for truncated outputs:
```python
def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')
    if content and content[-1] not in '",}]':
        content += '"'  # close an unterminated string
    content += ']' * open_brackets
    content += '}' * open_braces
    return content
```
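As a concrete example, here is a module-level copy of that repair applied to a response cut off mid-string. Note the heuristic only rescues simple truncation shapes (an open string inside balanced-or-outermost brackets); anything messier still fails to parse:

```python
import json

def fix_truncated_json(content: str) -> str:
    """Close an unterminated string, then balance brackets and braces."""
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')
    if content and content[-1] not in '",}]':
        content += '"'
    content += ']' * open_brackets
    content += '}' * open_braces
    return content

truncated = '{"agents": ["Alice", "Bo'          # LLM output cut off mid-string
repaired = fix_truncated_json(truncated)       # -> '{"agents": ["Alice", "Bo"]}'
data = json.loads(repaired)
```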
3. Dual-Platform Parallel Simulation
Twitter and Reddit run in parallel with separate databases and action logs:
```
uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
```
The runner detects completion per-platform via simulation_end events.
Performance Considerations
Memory Management
- Large documents get truncated to 50k characters for LLM context
- Entity summaries limited to 300 characters each
- Recent actions capped at 50 in memory (full history in JSONL files)
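The in-memory action cap is naturally expressed with a bounded deque, which evicts the oldest entry on overflow. A sketch of the pattern (the real state class may implement it differently):

```python
from collections import deque

class SimulationState:
    """Keeps only the most recent actions in memory; full history lives in JSONL."""
    MAX_RECENT = 50

    def __init__(self):
        self.recent_actions = deque(maxlen=self.MAX_RECENT)
        self.total_actions = 0

    def add_action(self, action: dict) -> None:
        self.recent_actions.append(action)  # oldest entry is dropped automatically
        self.total_actions += 1

state = SimulationState()
for i in range(120):
    state.add_action({"id": i})
# memory holds the last 50 actions; the counter still reflects all 120
```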
Database Isolation
Each platform uses its own SQLite database to avoid lock contention during parallel writes.
Graceful Degradation
When Zep Search API fails, the system falls back to local keyword matching:
```python
try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
```
Conclusion
MiroFish demonstrates how to build a complete multi-agent simulation system from scratch. The five-step workflow transforms raw documents into living digital worlds where thousands of agents interact according to realistic behavioral patterns.
Key takeaways:
- Ontology design matters: The two-tier structure (8 specific + 2 fallback types) ensures coverage without exceeding API limits
- Async workflows enable long operations: Task tracking with progress updates keeps users informed during multi-minute operations
- Time-based activity creates realism: Chinese timezone patterns and agent-type-specific schedules produce believable behavior
- Dual-platform simulation provides comparison: Running Twitter and Reddit in parallel shows how platform dynamics affect outcomes
- Three-tier retrieval serves different needs: InsightForge for depth, PanoramaSearch for breadth, InterviewAgents for direct perspectives
The full source code is available at github.com/666ghj/MiroFish.
Want to try MiroFish? Visit the live demo to see a hotspot event simulation in action.
