How MiroFish Creates Digital Parallel Worlds

Learn how MiroFish builds multi-agent simulation systems that create digital parallel worlds for social media prediction. Complete technical breakdown of swarm intelligence architecture.

Ashley Innocent

19 March 2026

Introduction

Social media moves fast. A single post can trigger cascades of reactions, reshares, and counter-movements that nobody predicted. What if you could see how a scenario plays out before it happens in the real world?

MiroFish does exactly that. It’s a swarm intelligence engine that creates digital parallel worlds where thousands of AI agents with distinct personalities, memories, and behavioral patterns interact freely. You upload seed material—a news article, a policy draft, even a novel—and MiroFish builds a high-fidelity simulation of how events might unfold.

Building MiroFish required a reliable API testing foundation. The team used Apidog to design, debug, and document all backend APIs before writing simulation logic. This caught endpoint issues early and kept the Python backend and Vue frontend in sync throughout development.

This post breaks down the technical architecture behind MiroFish. You’ll learn how the system transforms raw documents into living simulations, how agents make decisions, and how the five-step workflow orchestrates everything from knowledge graph construction to real-time monitoring.

System Overview: The Five-Step Workflow

MiroFish processes simulations through five distinct phases:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

Step 1: Ontology Generation

The system analyzes your input documents and simulation requirements, then uses an LLM to generate a custom ontology: the entity types, their attributes, and the relationship (edge) types the knowledge graph will use.

The ontology generator enforces a two-tier structure: 8 specific types based on your content, plus 2 fallback types (Person and Organization) to catch anything that doesn’t fit elsewhere.

Step 2: GraphRAG Construction

Documents get chunked (500 characters, 50 overlap) and sent to Zep Cloud in batches. The system:

  1. Creates a standalone graph with a unique ID
  2. Sets the custom ontology
  3. Sends text batches for entity and relationship extraction
  4. Waits for Zep to process each episode
  5. Retrieves the final graph with nodes and edges
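
The chunking step can be sketched as a sliding window over the text. This is a minimal illustration that assumes plain character-based splitting; the split_text function below is a hypothetical stand-in for TextProcessor.split_text, whose real boundary handling may differ.

```python
from typing import List


def split_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into fixed-size character windows that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap  # how far the window advances each time
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


chunks = split_text("x" * 1200)
print([len(c) for c in chunks])
```

The overlap means the last 50 characters of one chunk reappear at the start of the next, so an entity mention that straddles a boundary still shows up intact in at least one chunk.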

Step 3: Environment Setup

The simulation config generator analyzes the knowledge graph and creates detailed parameters for each agent, such as activity level, active hours, response delay, and influence.

Step 4: Simulation Run

Agents wake up according to their activity schedules and start posting, commenting, and reacting. The system runs the Twitter and Reddit simulations in parallel, logging every action to JSONL files in real time.

Step 5: Report Generation

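The Report Agent uses three core retrieval tools to analyze what happened: InsightForge for deep dives, PanoramaSearch for full-scope retrieval, and InterviewAgents for talking to agents directly.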

Technical Deep Dive: Ontology Generation

The ontology generator lives in backend/app/services/ontology_generator.py. It uses a carefully crafted system prompt that enforces strict rules.

The system prompt includes extensive guidance on what counts as a valid entity (people, organizations, media outlets) versus what doesn’t (abstract concepts, themes, viewpoints). This distinction matters because the simulation needs agents that can actually speak and act on social media.

After the LLM generates the ontology, the _validate_and_process method enforces constraints:

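from typing import Any, Dict

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    entity_types = result.setdefault("entity_types", [])
    entity_names = {e["name"] for e in entity_types}

    # Ensure fallback types exist
    # (person_fallback / organization_fallback are the fallback type
    # definitions; they are not shown in this excerpt)
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim from the end of the list if adding fallbacks would exceed the limit
    current_count = len(entity_types)
    needed_slots = len(fallbacks_to_add)
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        to_remove = current_count + needed_slots - MAX_ENTITY_TYPES
        result["entity_types"] = entity_types[:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result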

This validation layer ensures the output always works with Zep’s API limits while maintaining the two-tier structure.
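
To see the trimming rule in action, here is a small standalone sketch. It is not the MiroFish method itself: ensure_fallbacks and the bare {"name": ...} entries are hypothetical simplifications, and only the cap of 10 and the two fallback names come from the excerpt above.

```python
from typing import Any, Dict, List

MAX_ENTITY_TYPES = 10
FALLBACK_NAMES = ("Person", "Organization")


def ensure_fallbacks(entity_types: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a list that ends with any missing fallback types and fits the cap."""
    present = {e["name"] for e in entity_types}
    missing = [{"name": n} for n in FALLBACK_NAMES if n not in present]
    # Drop types from the tail so the fallbacks fit under the cap.
    overflow = len(entity_types) + len(missing) - MAX_ENTITY_TYPES
    kept = entity_types[:-overflow] if overflow > 0 else list(entity_types)
    return kept + missing


# The LLM returned 10 specific types and forgot both fallbacks.
llm_output = [{"name": f"Type{i}"} for i in range(10)]
names = [e["name"] for e in ensure_fallbacks(llm_output)]
print(names)
```

With ten specific types and no fallbacks, the last two specific types are dropped to make room, which is how the 8 + 2 split falls out of the 10-type limit.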

Knowledge Graph Construction: Zep Integration

The graph builder service (backend/app/services/graph_builder.py) handles the async workflow:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

Dynamic Pydantic Model Generation

One clever piece: the system dynamically creates Pydantic models for each entity type at runtime:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    # Attribute names that collide with reserved fields get an "entity_" prefix
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        # Default description is an assumption; the excerpt does not show it
        description = entity_def.get("description", f"A {name} entity.")
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attr_desc = attr_def.get("description", attr_name)
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        # Build the model class at runtime from the collected fields
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

This lets Zep validate entity attributes against the custom schema without requiring pre-defined models.
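
If the three-argument form of type() is unfamiliar, the following dependency-free sketch shows the same idea without Pydantic or Zep. BaseEntity and build_entity_class are hypothetical names for illustration; the real code derives from Zep's EntityModel and uses Pydantic Field objects instead of plain None defaults.

```python
from typing import Dict, Optional

RESERVED_NAMES = {"uuid", "name", "group_id", "name_embedding", "summary", "created_at"}


class BaseEntity:
    """Hypothetical stand-in for Zep's EntityModel base class."""


def build_entity_class(entity_def: Dict) -> type:
    """Create a class at runtime from one ontology entry."""
    annotations: Dict[str, object] = {}
    attrs: Dict[str, object] = {"__doc__": entity_def.get("description", "")}
    for attr_def in entity_def.get("attributes", []):
        attr_name = attr_def["name"]
        if attr_name.lower() in RESERVED_NAMES:
            attr_name = f"entity_{attr_name}"  # avoid clashing with reserved fields
        annotations[attr_name] = Optional[str]
        attrs[attr_name] = None  # every attribute is optional
    attrs["__annotations__"] = annotations
    # type(name, bases, namespace) builds the class object directly
    return type(entity_def["name"], (BaseEntity,), attrs)


Student = build_entity_class({
    "name": "Student",
    "description": "A university student.",
    "attributes": [{"name": "name"}, {"name": "major"}],
})
print(Student.__name__, Student.__doc__)
print(hasattr(Student, "entity_name"), hasattr(Student, "major"))
```

Note how the attribute called name is stored as entity_name, because name is on the reserved list.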

Paging Through Large Graphs

Zep returns paginated results. The zep_paging.py utility fetches everything:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
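
The cursor loop is easy to try without a Zep account. The sketch below runs the same pattern against an in-memory fake; Page, fetch_all, and fake_fetch are hypothetical names and do not reflect the Zep client's actual response types.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Page:
    items: List[str]
    next_cursor: Optional[str]


def fetch_all(fetch_page: Callable[[Optional[str]], Page]) -> List[str]:
    """Follow next_cursor until the server reports no further page."""
    items: List[str] = []
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        items.extend(page.items)
        if not page.next_cursor:
            return items
        cursor = page.next_cursor


# In-memory fake of a paginated endpoint, two items per page.
DATA = ["n1", "n2", "n3", "n4", "n5"]


def fake_fetch(cursor: Optional[str], limit: int = 2) -> Page:
    start = int(cursor) if cursor else 0
    end = start + limit
    return Page(DATA[start:end], str(end) if end < len(DATA) else None)


print(fetch_all(fake_fetch))
```

Passing the page fetcher in as a callable keeps the loop reusable for nodes and edges alike.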

Time-Based Agent Activity Simulation

The simulation config generator (backend/app/services/simulation_config_generator.py) creates realistic activity patterns based on Chinese timezone behavior:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # almost nobody online in the early hours
    "morning_hours": [6, 7, 8],                  # activity gradually picks up
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # evening peak
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

Different agent types get different patterns:

Agent Type    Activity Level   Active Hours   Response Delay   Influence
University    0.2              9-17           60-240 min       3.0
MediaOutlet   0.5              7-23           5-30 min         2.5
Student       0.8              8-12, 18-23    1-15 min         0.8
Professor     0.4              8-21           15-90 min        2.0

The config generator uses LLM calls to customize these values based on your specific scenario, then falls back to rule-based defaults if the LLM fails.
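
To make the numbers concrete, here is a rough sketch of how an hourly multiplier and a per-agent activity level could combine into a chance of acting. The combination rule (multiply, then cap at 1.0) is an assumption for illustration; the source does not show how MiroFish or OASIS actually applies these values, and the function names are hypothetical.

```python
from typing import Dict, List

HOUR_BUCKETS: Dict[str, List[int]] = {
    "dead": [0, 1, 2, 3, 4, 5],
    "morning": [6, 7, 8],
    "work": list(range(9, 19)),
    "peak": [19, 20, 21, 22],
    "night": [23],
}
MULTIPLIERS = {"dead": 0.05, "morning": 0.4, "work": 0.7, "peak": 1.5, "night": 0.5}


def hour_multiplier(hour: int) -> float:
    """Look up the multiplier for an hour of the day (0-23)."""
    for bucket, hours in HOUR_BUCKETS.items():
        if hour in hours:
            return MULTIPLIERS[bucket]
    raise ValueError(f"hour out of range: {hour}")


def activation_probability(activity_level: float, hour: int) -> float:
    """Assumed rule: base activity level times hourly multiplier, capped at 1.0."""
    return min(1.0, activity_level * hour_multiplier(hour))


# A Student-like agent (activity level 0.8) at three times of day.
for hour in (3, 10, 20):
    print(hour, round(activation_probability(0.8, hour), 3))
```

Under this rule a highly active agent is almost silent at 3 a.m. and close to certain to act during the evening peak.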

Real-Time Action Tracking

The simulation runner (backend/app/services/simulation_runner.py) monitors agent activity by streaming JSONL logs:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)  # resume where the previous poll stopped
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    # Mark only the platform this log belongs to
                    if platform == "twitter":
                        state.twitter_completed = True
                    else:
                        state.reddit_completed = True
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        # New offset to pass in on the next poll
        return f.tell()

This runs in a background thread, updating the simulation state every 2 seconds. The frontend polls this state to show real-time progress.
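
One detail worth handling when you tail a file that another process is still writing: the last line may be only half written at the moment you read it. The sketch below shows one way to deal with that, under the assumption that the writer ends every record with a newline. read_new_records is a hypothetical helper, not part of MiroFish, and the action names in the demo are illustrative.

```python
import json
import os
import tempfile
from typing import List, Tuple


def read_new_records(log_path: str, position: int) -> Tuple[List[dict], int]:
    """Parse complete JSONL lines after `position`; return them with the new offset."""
    records: List[dict] = []
    with open(log_path, "rb") as f:
        f.seek(position)
        while True:
            line = f.readline()
            if not line.endswith(b"\n"):
                break  # EOF or a half-written line: retry on the next poll
            position = f.tell()  # only advance past complete lines
            if line.strip():
                records.append(json.loads(line))
    return records, position


# Demo: the writer finishes a record between two polls.
path = os.path.join(tempfile.mkdtemp(), "actions.jsonl")
with open(path, "w", encoding="utf-8") as f:
    f.write('{"agent_id": 1, "action_type": "CREATE_POST"}\n')
    f.write('{"agent_id": 2, "action_')  # simulated partial write
first, pos = read_new_records(path, 0)
with open(path, "a", encoding="utf-8") as f:
    f.write('type": "LIKE_POST"}\n')
second, pos = read_new_records(path, pos)
print(len(first), len(second))
```

Because the offset only moves past newline-terminated lines, the partial record is picked up whole on the second poll instead of raising a JSON error on the first.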

Cross-Platform Process Management

Stopping simulations requires careful process management across Windows and Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

The cleanup handler registers signal handlers for SIGINT, SIGTERM, and SIGHUP:

def register_cleanup(cls):
    # SIGHUP does not exist on Windows, so check before registering
    has_sighup = hasattr(signal, 'SIGHUP')

    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    # Also run cleanup on normal interpreter exit
    atexit.register(cls.cleanup_all_simulations)

This ensures simulations stop gracefully when the server shuts down.
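
The Unix branch only works if the child was launched in its own session, so here is a small end-to-end sketch of both halves: start, then stop. The sleeping child is a placeholder for a real simulation script, start_worker and stop_worker are hypothetical names, and the Windows branch is simplified to process.terminate() rather than taskkill.

```python
import os
import signal
import subprocess
import sys

IS_WINDOWS = sys.platform == "win32"


def start_worker() -> subprocess.Popen:
    """Launch a placeholder long-running child in its own session (Unix)."""
    cmd = [sys.executable, "-c", "import time; time.sleep(60)"]
    if IS_WINDOWS:
        return subprocess.Popen(cmd)
    # start_new_session=True makes the child a process-group leader,
    # so killpg() reaches it and anything it spawns.
    return subprocess.Popen(cmd, start_new_session=True)


def stop_worker(process: subprocess.Popen, timeout: float = 10.0) -> int:
    """Ask the child to exit, escalating to a hard kill if it ignores the request."""
    if IS_WINDOWS:
        process.terminate()
    else:
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
    try:
        return process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        return process.wait()


worker = start_worker()
exit_code = stop_worker(worker)
print("worker exited with", exit_code)
```

Waiting with a timeout before escalating gives the child a chance to flush its logs, which matters when those logs are what the monitor is reading.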

Report Generation: Three-Tier Retrieval

The Zep tools service (backend/app/services/zep_tools.py) provides three retrieval functions:

InsightForge (Deep Dive)

Decomposes complex questions into sub-queries, searches each, then aggregates:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
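
The core of this pattern, fanning one question out into several searches and merging the results, fits in a few lines. The sketch below uses a stubbed search function; aggregate_facts is a hypothetical helper, and the de-duplication step is an assumption that the excerpt above does not show.

```python
from typing import Callable, Dict, List


def aggregate_facts(sub_queries: List[str], search: Callable[[str], List[str]]) -> List[str]:
    """Run each sub-query and merge the results, keeping first-seen order without duplicates."""
    seen = set()
    merged: List[str] = []
    for sub_query in sub_queries:
        for fact in search(sub_query):
            if fact not in seen:
                seen.add(fact)
                merged.append(fact)
    return merged


# Stub index standing in for a graph search call.
FAKE_INDEX: Dict[str, List[str]] = {
    "who reacted first": ["Students criticized the policy", "Media covered the protest"],
    "how did media respond": ["Media covered the protest", "An editorial backed the university"],
}

facts = aggregate_facts(list(FAKE_INDEX), lambda q: FAKE_INDEX.get(q, []))
print(facts)
```

Overlapping sub-queries tend to return the same facts, so merging without de-duplication would inflate the context handed to the report writer.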

PanoramaSearch (Full Scope)

Retrieves everything including expired/invalid historical facts:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
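
The active versus historical split can be illustrated with a small data class. This is a sketch under the assumption that an edge counts as historical once it carries an end timestamp; the Edge class and partition_facts below are hypothetical stand-ins, not Zep's edge type or the MiroFish implementation.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Edge:
    fact: str
    valid_at: Optional[str] = None
    invalid_at: Optional[str] = None
    expired_at: Optional[str] = None

    @property
    def is_historical(self) -> bool:
        # Assumption: any end timestamp means the fact no longer holds.
        return self.invalid_at is not None or self.expired_at is not None


def partition_facts(edges: List[Edge]) -> Tuple[List[str], List[str]]:
    """Split edges into currently valid facts and time-stamped historical facts."""
    active: List[str] = []
    historical: List[str] = []
    for edge in edges:
        if edge.is_historical:
            end = edge.invalid_at or edge.expired_at
            historical.append(f"[{edge.valid_at or '?'} - {end}] {edge.fact}")
        else:
            active.append(edge.fact)
    return active, historical


edges = [
    Edge("The policy is under review", valid_at="2026-03-01"),
    Edge("The policy was in draft", valid_at="2026-02-01", invalid_at="2026-03-01"),
]
active, historical = partition_facts(edges)
print(active)
print(historical)
```

Keeping superseded facts with their validity window lets a report describe how a situation evolved instead of only its final state.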

InterviewAgents (Real-Time)

Calls the actual OASIS interview API to talk to active agents:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

Key Engineering Decisions

1. Async Task Management

Long-running operations (graph build, simulation run) use async tasks with progress tracking:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

The frontend polls task status via /api/graph/task/{task_id}.
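
For a feel of what sits behind that polling endpoint, here is a minimal in-memory version of the pattern. The TaskManager class below is a hypothetical sketch, not MiroFish's own task manager, and the status names are assumptions.

```python
import threading
import time
import uuid
from typing import Callable, Dict


class TaskManager:
    """Tiny thread-safe task registry (illustrative only)."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Dict] = {}
        self._lock = threading.Lock()

    def create_task(self, task_type: str) -> str:
        task_id = uuid.uuid4().hex
        with self._lock:
            self._tasks[task_id] = {"type": task_type, "status": "pending", "progress": 0}
        return task_id

    def update(self, task_id: str, **fields) -> None:
        with self._lock:
            self._tasks[task_id].update(fields)

    def get(self, task_id: str) -> Dict:
        with self._lock:
            return dict(self._tasks[task_id])  # copy, so callers cannot mutate state


def run_async(manager: TaskManager, work: Callable[[Callable[[int], None]], None]) -> str:
    """Start `work` in a daemon thread and return a task id the caller can poll."""
    task_id = manager.create_task("graph_build")

    def worker() -> None:
        manager.update(task_id, status="running")
        try:
            work(lambda pct: manager.update(task_id, progress=pct))
            manager.update(task_id, status="completed", progress=100)
        except Exception as exc:  # record the failure instead of dying silently
            manager.update(task_id, status="failed", error=str(exc))

    threading.Thread(target=worker, daemon=True).start()
    return task_id


def fake_build(report: Callable[[int], None]) -> None:
    for pct in (25, 50, 75):
        time.sleep(0.01)
        report(pct)


manager = TaskManager()
tid = run_async(manager, fake_build)
while manager.get(tid)["status"] not in ("completed", "failed"):
    time.sleep(0.01)  # a frontend would poll over HTTP instead
print(manager.get(tid))
```

Catching exceptions inside the worker matters: an uncaught error in a daemon thread would otherwise leave the task stuck in "running" forever.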

2. Batch LLM Calls with Retry

Config generation splits large agent lists into batches of 15:

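num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    # Slice out the entities that belong to this batch
    start_idx = batch_idx * self.AGENTS_PER_BATCH
    end_idx = min(start_idx + self.AGENTS_PER_BATCH, len(entities))
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)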

Each batch includes JSON repair logic for truncated outputs:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
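
Counting braces and brackets is a quick heuristic, but it appends closers in a fixed order, so it can produce invalid JSON when objects and arrays are nested. If you need something stricter, one alternative is to track the nesting with a stack. The sketch below is that alternative technique, not MiroFish's implementation, and close_truncated_json is a hypothetical name.

```python
import json


def close_truncated_json(content: str) -> str:
    """Append the closers a truncated JSON document is missing, innermost first."""
    stack = []  # closers we still owe, innermost last
    in_string = False
    escaped = False
    for ch in content:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            stack.append("}")
        elif ch == "[":
            stack.append("]")
        elif ch in "}]" and stack:
            stack.pop()
    if in_string:
        if escaped:
            content = content[:-1]  # drop a dangling backslash
        content += '"'
    return content + "".join(reversed(stack))


truncated = '{"agents": [{"id": 1, "name": "Al'
repaired = close_truncated_json(truncated)
print(repaired)
print(json.loads(repaired))
```

This still cannot rescue output that was cut right after a comma or colon, so it is best paired with a retry when json.loads fails on the repaired text.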

3. Dual-Platform Parallel Simulation

Twitter and Reddit run in parallel with separate databases and action logs:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

The runner detects completion per-platform via simulation_end events.

Performance Considerations

Memory Management

Database Isolation

Each platform uses its own SQLite database to avoid lock contention during parallel writes.

Graceful Degradation

When Zep Search API fails, the system falls back to local keyword matching:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
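
The source does not show what _local_search does internally. As a rough idea of what local keyword matching could look like, here is a sketch that ranks facts by how many query terms they contain; the function name, signature, and scoring rule are all assumptions.

```python
from typing import List


def local_search(facts: List[str], query: str, limit: int = 10) -> List[str]:
    """Rank facts by how many distinct query terms they contain."""
    terms = set(query.lower().split())
    scored = []
    for fact in facts:
        text = fact.lower()
        score = sum(1 for term in terms if term in text)
        if score:
            scored.append((score, fact))
    # sort() is stable, so facts with equal scores keep their input order
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [fact for _, fact in scored[:limit]]


facts = [
    "The university announced a new tuition policy",
    "Students organized a protest against the tuition policy",
    "A local bakery opened downtown",
]
print(local_search(facts, "student protest tuition"))
```

Substring matching is crude compared with semantic search, but it keeps report generation moving when the remote API is unavailable.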

Conclusion

MiroFish demonstrates how to build a complete multi-agent simulation system from scratch. The five-step workflow transforms raw documents into living digital worlds where thousands of agents interact according to realistic behavioral patterns.

Key takeaways:

  1. Ontology design matters: The two-tier structure (8 specific + 2 fallback types) ensures coverage without exceeding API limits
  2. Async workflows enable long operations: Task tracking with progress updates keeps users informed during multi-minute operations
  3. Time-based activity creates realism: Chinese timezone patterns and agent-type-specific schedules produce believable behavior
  4. Dual-platform simulation provides comparison: Running Twitter and Reddit in parallel shows how platform dynamics affect outcomes
  5. Three-tier retrieval serves different needs: InsightForge for depth, PanoramaSearch for breadth, InterviewAgents for direct perspectives

The full source code is available at github.com/666ghj/MiroFish.

Want to try MiroFish? Visit the live demo to see a hotspot event simulation in action.
