Introduction
Les médias sociaux évoluent rapidement. Un simple message peut déclencher des cascades de réactions, de reformulations et de contre-mouvements que personne n'aurait prédits. Et si vous pouviez voir comment un scénario se déroule avant qu'il ne se produise dans le monde réel ?
MiroFish fait exactement cela. C'est un moteur d'intelligence collective qui crée des mondes parallèles numériques où des milliers d'agents IA dotés de personnalités, de mémoires et de modèles comportementaux distincts interagissent librement. Vous téléchargez du matériel source — un article de presse, un projet de politique, même un roman — et MiroFish construit une simulation haute fidélité de la manière dont les événements pourraient se dérouler.
Ce billet décompose l'architecture technique derrière MiroFish. Vous apprendrez comment le système transforme des documents bruts en simulations vivantes, comment les agents prennent des décisions, et comment le flux de travail en cinq étapes orchestre tout, de la construction du graphe de connaissances à la surveillance en temps réel.

Vue d'ensemble du système : Le flux de travail en cinq étapes
MiroFish traite les simulations à travers cinq phases distinctes :
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Étape 1 │ ──► │ Étape 2 │ ──► │ Étape 3 │ ──► │ Étape 4 │ ──► │ Étape 5 │
│ Génération │ │ Construction │ │ Configuration │ │ Exécution │ │ Génération │
│ d'ontologie│ │ GraphRAG │ │ Environnement│ │ Simulation │ │ de rapports │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Étape 1 : Génération de l'ontologie
Le système analyse vos documents d'entrée et vos exigences de simulation, puis utilise un LLM pour générer une ontologie personnalisée. Cela définit :
- 10 types d'entités (par exemple, Étudiant, Professeur, Université, Organe de presse, Agence gouvernementale)
- 10 types de relations (par exemple, TRAVAILLE_POUR, COMMENTE_SUR, RÉPOND_À)
- Attributs pour chaque type (en évitant les mots réservés comme
name,uuid,created_at)
Le générateur d'ontologie impose une structure à deux niveaux : 8 types spécifiques basés sur votre contenu, plus 2 types de repli (Personne et Organisation) pour saisir tout ce qui ne correspond pas ailleurs.
Étape 2 : Construction du GraphRAG
Les documents sont découpés en morceaux (500 caractères, 50 de chevauchement) et envoyés à Zep Cloud par lots. Le système :
- Crée un graphe autonome avec un ID unique
- Définit l'ontologie personnalisée
- Envoie des lots de texte pour l'extraction d'entités et de relations
- Attend que Zep traite chaque épisode
- Récupère le graphe final avec les nœuds et les arêtes
Étape 3 : Configuration de l'environnement
Le générateur de configuration de simulation analyse le graphe de connaissances et crée des paramètres d'agent détaillés :
- Configuration temporelle basée sur les modèles de fuseau horaire chinois (heures de pointe 19-22, heures creuses 0-5)
- Configuration des événements avec des publications initiales et des sujets d'actualité
- Configurations d'activité des agents (publications par heure, délais de réponse, poids d'influence)
- Configurations de plateforme pour Twitter et Reddit avec des seuils viraux différents
Étape 4 : Exécution de la simulation
Les agents se réveillent selon leurs horaires d'activité et commencent à publier, commenter et réagir. Le système exécute des simulations parallèles sur Twitter et Reddit, enregistrant chaque action dans des fichiers JSONL en temps réel.
Étape 5 : Génération de rapports
L'Agent de rapport utilise trois outils de récupération principaux pour analyser ce qui s'est passé :
- InsightForge : Recherche approfondie qui décompose les questions en sous-requêtes
- PanoramaSearch : Vue d'ensemble complète incluant les faits historiques expirés/invalides
- InterviewAgents : Entretiens en temps réel avec des agents actifs via IPC
Approfondissement technique : Génération de l'ontologie
Le générateur d'ontologie se trouve dans backend/app/services/ontology_generator.py. Il utilise une invite système soigneusement élaborée qui applique des règles strictes.
L'invite système comprend des directives étendues sur ce qui constitue une entité valide (personnes, organisations, médias) par rapport à ce qui ne l'est pas (concepts abstraits, thèmes, points de vue). Cette distinction est importante car la simulation a besoin d'agents capables de parler et d'agir sur les médias sociaux.
Après que le LLM a généré l'ontologie, la méthode _validate_and_process applique les contraintes :
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Cette couche de validation garantit que la sortie fonctionne toujours avec les limites de l'API de Zep tout en conservant la structure à deux niveaux.
Construction du graphe de connaissances : Intégration Zep
Le service de construction de graphes (backend/app/services/graph_builder.py) gère le flux de travail asynchrone :
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Génération dynamique de modèles Pydantic
Une astuce ingénieuse : le système crée dynamiquement des modèles Pydantic pour chaque type d'entité à l'exécution :
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Cela permet à Zep de valider les attributs d'entité par rapport au schéma personnalisé sans nécessiter de modèles prédéfinis.
Pagination des grands graphes
Zep renvoie des résultats paginés. L'utilitaire zep_paging.py récupère tout :
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Simulation d'activité d'agent basée sur le temps
Le générateur de configuration de simulation (backend/app/services/simulation_config_generator.py) crée des modèles d'activité réalistes basés sur le comportement du fuseau horaire chinois :
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Différents types d'agents ont des modèles différents :
| Type d'Agent | Niveau d'activité | Heures d'activité | Délai de réponse | Influence |
|---|---|---|---|---|
| Université | 0.2 | 9-17 | 60-240 min | 3.0 |
| Organe de presse | 0.5 | 7-23 | 5-30 min | 2.5 |
| Étudiant | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professeur | 0.4 | 8-21 | 15-90 min | 2.0 |
Le générateur de configuration utilise des appels LLM pour personnaliser ces valeurs en fonction de votre scénario spécifique, puis revient aux valeurs par défaut basées sur des règles si le LLM échoue.
Suivi des actions en temps réel
L'exécuteur de simulation (backend/app/services/simulation_runner.py) surveille l'activité des agents en diffusant les journaux JSONL :
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Cela s'exécute dans un thread d'arrière-plan, mettant à jour l'état de la simulation toutes les 2 secondes. Le frontend interroge cet état pour afficher la progression en temps réel.
Gestion des processus multiplateforme
L'arrêt des simulations nécessite une gestion rigoureuse des processus sur Windows et Unix :
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Le gestionnaire de nettoyage enregistre les gestionnaires de signaux pour SIGINT, SIGTERM et SIGHUP :
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Cela garantit que les simulations s'arrêtent correctement lorsque le serveur s'éteint.
Génération de rapports : Récupération à trois niveaux
Le service d'outils Zep (backend/app/services/zep_tools.py) fournit trois fonctions de récupération :
InsightForge (Analyse approfondie)
Décompose les questions complexes en sous-requêtes, recherche chacune, puis les agrège :
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Portée complète)
Récupère tout, y compris les faits historiques expirés/invalides :
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Temps réel)
Appelle l'API d'entretien OASIS réelle pour parler aux agents actifs :
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Décisions d'ingénierie clés
1. Gestion des tâches asynchrones
Les opérations de longue durée (construction de graphes, exécution de simulations) utilisent des tâches asynchrones avec suivi de progression :
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
Le frontend interroge le statut de la tâche via /api/graph/task/{task_id}.
2. Appels LLM par lots avec réessai
La génération de configuration divise les grandes listes d'agents en lots de 15 :
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Chaque lot inclut une logique de réparation JSON pour les sorties tronquées :
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Simulation parallèle multiplateforme
Twitter et Reddit fonctionnent en parallèle avec des bases de données et des journaux d'actions séparés :
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
L'exécuteur détecte l'achèvement par plateforme via les événements simulation_end.
Considérations de performance
Gestion de la mémoire
- Les grands documents sont tronqués à 50 000 caractères pour le contexte LLM
- Les résumés d'entités sont limités à 300 caractères chacun
- Les actions récentes sont plafonnées à 50 en mémoire (historique complet dans les fichiers JSONL)
Isolation de la base de données
Chaque plateforme utilise sa propre base de données SQLite pour éviter les conflits de verrouillage lors des écritures parallèles.
Dégradation gracieuse
Lorsque l'API de recherche Zep échoue, le système revient à la correspondance de mots-clés locale :
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Conclusion
MiroFish démontre comment construire un système complet de simulation multi-agents à partir de zéro. Le flux de travail en cinq étapes transforme des documents bruts en mondes numériques vivants où des milliers d'agents interagissent selon des modèles de comportement réalistes.
Points clés à retenir :
- La conception de l'ontologie est importante : La structure à deux niveaux (8 types spécifiques + 2 types de repli) assure une couverture sans dépasser les limites de l'API.
- Les flux de travail asynchrones permettent des opérations longues : Le suivi des tâches avec des mises à jour de progression tient les utilisateurs informés pendant les opérations de plusieurs minutes.
- L'activité basée sur le temps crée du réalisme : Les modèles de fuseau horaire chinois et les horaires spécifiques aux types d'agents produisent un comportement crédible.
- La simulation multiplateforme offre une comparaison : L'exécution de Twitter et Reddit en parallèle montre comment la dynamique des plateformes affecte les résultats.
- La récupération à trois niveaux répond à différents besoins : InsightForge pour la profondeur, PanoramaSearch pour l'étendue, InterviewAgents pour les perspectives directes.
Le code source complet est disponible sur github.com/666ghj/MiroFish.
Vous voulez essayer MiroFish ? Visitez la démo en direct pour voir une simulation d'événement hotspot en action.
