Introducción
Las redes sociales se mueven rápido. Una sola publicación puede desencadenar cascadas de reacciones, remodelaciones y contramovimientos que nadie predijo. ¿Qué pasaría si pudieras ver cómo se desarrolla un escenario antes de que suceda en el mundo real?
MiroFish hace exactamente eso. Es un motor de inteligencia de enjambre que crea mundos paralelos digitales donde miles de agentes de IA con personalidades, recuerdos y patrones de comportamiento distintos interactúan libremente. Subes material de origen —un artículo de noticias, un borrador de política, incluso una novela— y MiroFish construye una simulación de alta fidelidad de cómo podrían desarrollarse los eventos.
Esta publicación desglosa la arquitectura técnica detrás de MiroFish. Aprenderás cómo el sistema transforma documentos brutos en simulaciones vivas, cómo los agentes toman decisiones y cómo el flujo de trabajo de cinco pasos orquesta todo, desde la construcción del grafo de conocimiento hasta la monitorización en tiempo real.

Visión General del Sistema: El Flujo de Trabajo de Cinco Pasos
MiroFish procesa simulaciones a través de cinco fases distintas:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Paso 1 │ ──► │ Paso 2 │ ──► │ Paso 3 │ ──► │ Paso 4 │ ──► │ Paso 5 │
│ Generación │ │ Construcción│ │ Config. del │ │ Ejecución de│ │ Generación │
│ de Ontología│ │ de GraphRAG│ │ Entorno │ │ Simulación │ │ de Informe │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Paso 1: Generación de Ontología
El sistema analiza sus documentos de entrada y requisitos de simulación, luego utiliza un LLM para generar una ontología personalizada. Esto define:
- 10 tipos de entidades (por ejemplo, Estudiante, Profesor, Universidad, Medio de Comunicación, Agencia Gubernamental)
- 10 tipos de relaciones (por ejemplo, TRABAJA_PARA, COMENTA_SOBRE, RESPONDE_A)
- Atributos para cada tipo (evitando palabras reservadas como
name,uuid,created_at)
El generador de ontología aplica una estructura de dos niveles: 8 tipos específicos basados en su contenido, más 2 tipos de respaldo (Persona y Organización) para capturar cualquier cosa que no encaje en otro lugar.
Paso 2: Construcción de GraphRAG
Los documentos se dividen en fragmentos (500 caracteres, 50 de superposición) y se envían a Zep Cloud en lotes. El sistema:
- Crea un grafo independiente con un ID único
- Establece la ontología personalizada
- Envía lotes de texto para la extracción de entidades y relaciones
- Espera a que Zep procese cada episodio
- Recupera el grafo final con nodos y aristas
Paso 3: Configuración del Entorno
El generador de configuración de simulación analiza el grafo de conocimiento y crea parámetros detallados para los agentes:
- Configuración de tiempo basada en patrones de zona horaria china (horas pico 19-22, horas muertas 0-5)
- Configuración de eventos con publicaciones iniciales y temas candentes
- Configuraciones de actividad del agente (publicaciones por hora, retrasos en la respuesta, pesos de influencia)
- Configuraciones de plataforma para Twitter y Reddit con diferentes umbrales virales
Paso 4: Ejecución de la Simulación
Los agentes se activan según sus horarios de actividad y comienzan a publicar, comentar y reaccionar. El sistema ejecuta simulaciones paralelas en Twitter y Reddit, registrando cada acción en archivos JSONL en tiempo real.
Paso 5: Generación de Informes
El Agente de Informes utiliza tres herramientas de recuperación principales para analizar lo que sucedió:
- InsightForge: Búsqueda profunda que descompone las preguntas en subconsultas
- PanoramaSearch: Vista de alcance completo que incluye hechos históricos caducados/inválidos
- InterviewAgents: Entrevistas en tiempo real con agentes activos a través de IPC
Análisis Técnico Profundo: Generación de Ontología
El generador de ontología se encuentra en backend/app/services/ontology_generator.py. Utiliza un "system prompt" cuidadosamente elaborado que aplica reglas estrictas.
El "system prompt" incluye una guía exhaustiva sobre lo que cuenta como una entidad válida (personas, organizaciones, medios de comunicación) frente a lo que no (conceptos abstractos, temas, puntos de vista). Esta distinción es importante porque la simulación necesita agentes que puedan realmente hablar y actuar en las redes sociales.
Después de que el LLM genera la ontología, el método _validate_and_process aplica restricciones:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Límites de la API de Zep: máximo 10 tipos de entidades, máximo 10 tipos de aristas
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Asegurarse de que existan los tipos de respaldo
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Recortar si añadir los tipos de respaldo excedería el límite
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Esta capa de validación asegura que la salida siempre funcione con los límites de la API de Zep mientras mantiene la estructura de dos niveles.
Construcción del Grafo de Conocimiento: Integración de Zep
El servicio de construcción de grafos (backend/app/services/graph_builder.py) maneja el flujo de trabajo asíncrono:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Crear grafo
graph_id = self.create_graph(graph_name)
# 2. Establecer ontología
self.set_ontology(graph_id, ontology)
# 3. Dividir texto en fragmentos
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Enviar lotes
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Esperar procesamiento de Zep
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Recuperar grafo final
graph_info = self._get_graph_info(graph_id)
Generación Dinámica de Modelos Pydantic
Una pieza ingeniosa: el sistema crea dinámicamente modelos Pydantic para cada tipo de entidad en tiempo de ejecución:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Esto permite a Zep validar los atributos de las entidades contra el esquema personalizado sin requerir modelos predefinidos.
Paginación a Través de Grafos Grandes
Zep devuelve resultados paginados. La utilidad zep_paging.py recupera todo:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Simulación de Actividad de Agentes Basada en el Tiempo
El generador de configuración de simulación (backend/app/services/simulation_config_generator.py) crea patrones de actividad realistas basados en el comportamiento de la zona horaria china:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # Horas de poca actividad (casi nadie)
"morning_hours": [6, 7, 8], # Horas de la mañana (gradualmente activo)
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # Horas pico de la tarde
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Diferentes tipos de agentes obtienen diferentes patrones:
| Tipo de Agente | Nivel de Actividad | Horas Activas | Retraso de Respuesta | Influencia |
|---|---|---|---|---|
| Universidad | 0.2 | 9-17 | 60-240 min | 3.0 |
| Medio de Comunicación | 0.5 | 7-23 | 5-30 min | 2.5 |
| Estudiante | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Profesor | 0.4 | 8-21 | 15-90 min | 2.0 |
El generador de configuración utiliza llamadas a LLM para personalizar estos valores basándose en su escenario específico, luego recurre a los valores predeterminados basados en reglas si el LLM falla.
Seguimiento de Acciones en Tiempo Real
El ejecutor de simulación (backend/app/services/simulation_runner.py) monitoriza la actividad de los agentes mediante la transmisión de logs JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Manejar eventos
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # o reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parsear acciones de agente
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Esto se ejecuta en un hilo en segundo plano, actualizando el estado de la simulación cada 2 segundos. El frontend consulta este estado para mostrar el progreso en tiempo real.
Gestión de Procesos Multiplataforma
Detener las simulaciones requiere una gestión de procesos cuidadosa en Windows y Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: usar taskkill para detener el árbol de procesos
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: detener grupo de procesos (creado con start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
El manejador de limpieza registra manejadores de señales para SIGINT, SIGTERM y SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Luego llamar al manejador original
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Esto asegura que las simulaciones se detengan de forma segura cuando el servidor se apaga.
Generación de Informes: Recuperación de Tres Niveles
El servicio de herramientas de Zep (backend/app/services/zep_tools.py) proporciona tres funciones de recuperación:
InsightForge (Análisis Profundo)
Descompone preguntas complejas en subconsultas, busca en cada una y luego agrega:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generar subconsultas usando LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Buscar por cada subconsulta
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extraer UUIDs de entidad de los bordes
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Obtener información detallada de la entidad
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Construir cadenas de relaciones
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Alcance Completo)
Recupera todo, incluidos los hechos históricos caducados/inválidos:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Tiempo Real)
Llama a la API de entrevista real de OASIS para hablar con agentes activos:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Cargar perfiles de agente desde CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Usar LLM para seleccionar agentes relevantes
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generar preguntas de entrevista
questions = self._generate_interview_questions(...)
# 4. Llamar a la API de entrevista real (doble plataforma)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Entrevistar a ambos Twitter y Reddit
timeout=180.0
)
# 5. Parsear y formatear resultados
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Decisiones Clave de Ingeniería
1. Gestión de Tareas Asíncronas
Las operaciones de larga duración (construcción de grafos, ejecución de simulaciones) utilizan tareas asíncronas con seguimiento de progreso:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
El frontend consulta el estado de la tarea a través de /api/graph/task/{task_id}.
2. Llamadas a LLM en Lotes con Reintento
La generación de configuración divide grandes listas de agentes en lotes de 15:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Cada lote incluye lógica de reparación de JSON para salidas truncadas:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Simulación Paralela en Plataforma Doble
Twitter y Reddit se ejecutan en paralelo con bases de datos y registros de acciones separados:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
El ejecutor detecta la finalización por plataforma a través de eventos simulation_end.
Consideraciones de Rendimiento
Gestión de Memoria
- Los documentos grandes se truncan a 50k caracteres para el contexto del LLM
- Los resúmenes de entidades se limitan a 300 caracteres cada uno
- Las acciones recientes se limitan a 50 en memoria (historial completo en archivos JSONL)
Aislamiento de la Base de Datos
Cada plataforma utiliza su propia base de datos SQLite para evitar la contención de bloqueos durante escrituras paralelas.
Degradación Elegante
Cuando la API de búsqueda de Zep falla, el sistema recurre a la coincidencia local de palabras clave:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"La API de búsqueda de Zep falló, recurriendo a la búsqueda local: {e}")
return self._local_search(graph_id, query, limit, scope)
Conclusión
MiroFish demuestra cómo construir un sistema completo de simulación multiagente desde cero. El flujo de trabajo de cinco pasos transforma documentos brutos en mundos digitales vivos donde miles de agentes interactúan de acuerdo con patrones de comportamiento realistas.
Puntos clave:
- El diseño de la ontología es importante: La estructura de dos niveles (8 tipos específicos + 2 de respaldo) asegura la cobertura sin exceder los límites de la API
- Los flujos de trabajo asíncronos permiten operaciones largas: El seguimiento de tareas con actualizaciones de progreso mantiene a los usuarios informados durante operaciones de varios minutos
- La actividad basada en el tiempo crea realismo: Los patrones de zona horaria china y los horarios específicos de cada tipo de agente producen un comportamiento creíble
- La simulación de doble plataforma proporciona comparación: Ejecutar Twitter y Reddit en paralelo muestra cómo la dinámica de la plataforma afecta los resultados
- La recuperación de tres niveles satisface diferentes necesidades: InsightForge para profundidad, PanoramaSearch para amplitud, InterviewAgents para perspectivas directas
El código fuente completo está disponible en github.com/666ghj/MiroFish.
¿Quieres probar MiroFish? Visita la demostración en vivo para ver una simulación de evento de punto caliente en acción.
