Comment MiroFish Crée des Mondes Parallèles Numériques ?

Ashley Innocent

Ashley Innocent

19 March 2026

Comment MiroFish Crée des Mondes Parallèles Numériques ?

Introduction

Les médias sociaux évoluent rapidement. Un simple message peut déclencher des cascades de réactions, de reformulations et de contre-mouvements que personne n'aurait prédits. Et si vous pouviez voir comment un scénario se déroule avant qu'il ne se produise dans le monde réel ?

MiroFish fait exactement cela. C'est un moteur d'intelligence collective qui crée des mondes parallèles numériques où des milliers d'agents IA dotés de personnalités, de mémoires et de modèles comportementaux distincts interagissent librement. Vous téléchargez du matériel source — un article de presse, un projet de politique, même un roman — et MiroFish construit une simulation haute fidélité de la manière dont les événements pourraient se dérouler.

💡
La construction de MiroFish a nécessité une base fiable pour les tests d'API. L'équipe a utilisé Apidog pour concevoir, déboguer et documenter toutes les API backend avant d'écrire la logique de simulation. Cela a permis de détecter les problèmes de points d'extrémité tôt et de maintenir le backend Python et le frontend Vue synchronisés tout au long du développement.
bouton

Ce billet décompose l'architecture technique derrière MiroFish. Vous apprendrez comment le système transforme des documents bruts en simulations vivantes, comment les agents prennent des décisions, et comment le flux de travail en cinq étapes orchestre tout, de la construction du graphe de connaissances à la surveillance en temps réel.

Vue d'ensemble du système : Le flux de travail en cinq étapes

MiroFish traite les simulations à travers cinq phases distinctes :

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Étape 1   │ ──► │   Étape 2   │ ──► │   Étape 3   │ ──► │   Étape 4   │ ──► │   Étape 5   │
│  Génération │     │  Construction │     │ Configuration │     │ Exécution   │     │ Génération  │
│  d'ontologie│     │   GraphRAG    │     │  Environnement│     │ Simulation  │     │ de rapports │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

Étape 1 : Génération de l'ontologie

Le système analyse vos documents d'entrée et vos exigences de simulation, puis utilise un LLM pour générer une ontologie personnalisée. Cela définit :

Le générateur d'ontologie impose une structure à deux niveaux : 8 types spécifiques basés sur votre contenu, plus 2 types de repli (Personne et Organisation) pour saisir tout ce qui ne correspond pas ailleurs.

Étape 2 : Construction du GraphRAG

Les documents sont découpés en morceaux (500 caractères, 50 de chevauchement) et envoyés à Zep Cloud par lots. Le système :

  1. Crée un graphe autonome avec un ID unique
  2. Définit l'ontologie personnalisée
  3. Envoie des lots de texte pour l'extraction d'entités et de relations
  4. Attend que Zep traite chaque épisode
  5. Récupère le graphe final avec les nœuds et les arêtes

Étape 3 : Configuration de l'environnement

Le générateur de configuration de simulation analyse le graphe de connaissances et crée des paramètres d'agent détaillés :

Étape 4 : Exécution de la simulation

Les agents se réveillent selon leurs horaires d'activité et commencent à publier, commenter et réagir. Le système exécute des simulations parallèles sur Twitter et Reddit, enregistrant chaque action dans des fichiers JSONL en temps réel.

Étape 5 : Génération de rapports

L'Agent de rapport utilise trois outils de récupération principaux pour analyser ce qui s'est passé :

Approfondissement technique : Génération de l'ontologie

Le générateur d'ontologie se trouve dans backend/app/services/ontology_generator.py. Il utilise une invite système soigneusement élaborée qui applique des règles strictes.

L'invite système comprend des directives étendues sur ce qui constitue une entité valide (personnes, organisations, médias) par rapport à ce qui ne l'est pas (concepts abstraits, thèmes, points de vue). Cette distinction est importante car la simulation a besoin d'agents capables de parler et d'agir sur les médias sociaux.

Après que le LLM a généré l'ontologie, la méthode _validate_and_process applique les contraintes :

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

Cette couche de validation garantit que la sortie fonctionne toujours avec les limites de l'API de Zep tout en conservant la structure à deux niveaux.

Construction du graphe de connaissances : Intégration Zep

Le service de construction de graphes (backend/app/services/graph_builder.py) gère le flux de travail asynchrone :

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

Génération dynamique de modèles Pydantic

Une astuce ingénieuse : le système crée dynamiquement des modèles Pydantic pour chaque type d'entité à l'exécution :

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

Cela permet à Zep de valider les attributs d'entité par rapport au schéma personnalisé sans nécessiter de modèles prédéfinis.

Pagination des grands graphes

Zep renvoie des résultats paginés. L'utilitaire zep_paging.py récupère tout :

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

Simulation d'activité d'agent basée sur le temps

Le générateur de configuration de simulation (backend/app/services/simulation_config_generator.py) crée des modèles d'activité réalistes basés sur le comportement du fuseau horaire chinois :

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

Différents types d'agents ont des modèles différents :

Type d'Agent Niveau d'activité Heures d'activité Délai de réponse Influence
Université 0.2 9-17 60-240 min 3.0
Organe de presse 0.5 7-23 5-30 min 2.5
Étudiant 0.8 8-12, 18-23 1-15 min 0.8
Professeur 0.4 8-21 15-90 min 2.0

Le générateur de configuration utilise des appels LLM pour personnaliser ces valeurs en fonction de votre scénario spécifique, puis revient aux valeurs par défaut basées sur des règles si le LLM échoue.

Suivi des actions en temps réel

L'exécuteur de simulation (backend/app/services/simulation_runner.py) surveille l'activité des agents en diffusant les journaux JSONL :

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

Cela s'exécute dans un thread d'arrière-plan, mettant à jour l'état de la simulation toutes les 2 secondes. Le frontend interroge cet état pour afficher la progression en temps réel.

Gestion des processus multiplateforme

L'arrêt des simulations nécessite une gestion rigoureuse des processus sur Windows et Unix :

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

Le gestionnaire de nettoyage enregistre les gestionnaires de signaux pour SIGINT, SIGTERM et SIGHUP :

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

Cela garantit que les simulations s'arrêtent correctement lorsque le serveur s'éteint.

Génération de rapports : Récupération à trois niveaux

Le service d'outils Zep (backend/app/services/zep_tools.py) fournit trois fonctions de récupération :

InsightForge (Analyse approfondie)

Décompose les questions complexes en sous-requêtes, recherche chacune, puis les agrège :

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (Portée complète)

Récupère tout, y compris les faits historiques expirés/invalides :

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (Temps réel)

Appelle l'API d'entretien OASIS réelle pour parler aux agents actifs :

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

Décisions d'ingénierie clés

1. Gestion des tâches asynchrones

Les opérations de longue durée (construction de graphes, exécution de simulations) utilisent des tâches asynchrones avec suivi de progression :

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

Le frontend interroge le statut de la tâche via /api/graph/task/{task_id}.

2. Appels LLM par lots avec réessai

La génération de configuration divise les grandes listes d'agents en lots de 15 :

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

Chaque lot inclut une logique de réparation JSON pour les sorties tronquées :

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. Simulation parallèle multiplateforme

Twitter et Reddit fonctionnent en parallèle avec des bases de données et des journaux d'actions séparés :

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

L'exécuteur détecte l'achèvement par plateforme via les événements simulation_end.

Considérations de performance

Gestion de la mémoire

Isolation de la base de données

Chaque plateforme utilise sa propre base de données SQLite pour éviter les conflits de verrouillage lors des écritures parallèles.

Dégradation gracieuse

Lorsque l'API de recherche Zep échoue, le système revient à la correspondance de mots-clés locale :

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)

Conclusion

MiroFish démontre comment construire un système complet de simulation multi-agents à partir de zéro. Le flux de travail en cinq étapes transforme des documents bruts en mondes numériques vivants où des milliers d'agents interagissent selon des modèles de comportement réalistes.

Points clés à retenir :

  1. La conception de l'ontologie est importante : La structure à deux niveaux (8 types spécifiques + 2 types de repli) assure une couverture sans dépasser les limites de l'API.
  2. Les flux de travail asynchrones permettent des opérations longues : Le suivi des tâches avec des mises à jour de progression tient les utilisateurs informés pendant les opérations de plusieurs minutes.
  3. L'activité basée sur le temps crée du réalisme : Les modèles de fuseau horaire chinois et les horaires spécifiques aux types d'agents produisent un comportement crédible.
  4. La simulation multiplateforme offre une comparaison : L'exécution de Twitter et Reddit en parallèle montre comment la dynamique des plateformes affecte les résultats.
  5. La récupération à trois niveaux répond à différents besoins : InsightForge pour la profondeur, PanoramaSearch pour l'étendue, InterviewAgents pour les perspectives directes.

Le code source complet est disponible sur github.com/666ghj/MiroFish.

bouton

Vous voulez essayer MiroFish ? Visitez la démo en direct pour voir une simulation d'événement hotspot en action.

Pratiquez le Design-first d'API dans Apidog

Découvrez une manière plus simple de créer et utiliser des API