Cómo MiroFish Crea Mundos Paralelos Digitales

Ashley Innocent

Ashley Innocent

19 March 2026

Cómo MiroFish Crea Mundos Paralelos Digitales

Introducción

Las redes sociales se mueven rápido. Una sola publicación puede desencadenar cascadas de reacciones, remodelaciones y contramovimientos que nadie predijo. ¿Qué pasaría si pudieras ver cómo se desarrolla un escenario antes de que suceda en el mundo real?

MiroFish hace exactamente eso. Es un motor de inteligencia de enjambre que crea mundos paralelos digitales donde miles de agentes de IA con personalidades, recuerdos y patrones de comportamiento distintos interactúan libremente. Subes material de origen —un artículo de noticias, un borrador de política, incluso una novela— y MiroFish construye una simulación de alta fidelidad de cómo podrían desarrollarse los eventos.

💡
La construcción de MiroFish requirió una base sólida para las pruebas de API. El equipo utilizó Apidog para diseñar, depurar y documentar todas las API de backend antes de escribir la lógica de simulación. Esto permitió detectar problemas en los endpoints tempranamente y mantener sincronizados el backend de Python y el frontend de Vue durante todo el desarrollo.
botón

Esta publicación desglosa la arquitectura técnica detrás de MiroFish. Aprenderás cómo el sistema transforma documentos brutos en simulaciones vivas, cómo los agentes toman decisiones y cómo el flujo de trabajo de cinco pasos orquesta todo, desde la construcción del grafo de conocimiento hasta la monitorización en tiempo real.

Diagrama de flujo de trabajo de MiroFish

Visión General del Sistema: El Flujo de Trabajo de Cinco Pasos

MiroFish procesa simulaciones a través de cinco fases distintas:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Paso 1    │ ──► │   Paso 2    │ ──► │   Paso 3    │ ──► │   Paso 4    │ ──► │   Paso 5    │
│  Generación │     │ Construcción│     │ Config. del │     │ Ejecución de│     │  Generación │
│  de Ontología│     │  de GraphRAG│     │  Entorno    │     │ Simulación  │     │ de Informe  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

Paso 1: Generación de Ontología

El sistema analiza sus documentos de entrada y requisitos de simulación, luego utiliza un LLM para generar una ontología personalizada. Esto define:

El generador de ontología aplica una estructura de dos niveles: 8 tipos específicos basados en su contenido, más 2 tipos de respaldo (Persona y Organización) para capturar cualquier cosa que no encaje en otro lugar.

Paso 2: Construcción de GraphRAG

Los documentos se dividen en fragmentos (500 caracteres, 50 de superposición) y se envían a Zep Cloud en lotes. El sistema:

  1. Crea un grafo independiente con un ID único
  2. Establece la ontología personalizada
  3. Envía lotes de texto para la extracción de entidades y relaciones
  4. Espera a que Zep procese cada episodio
  5. Recupera el grafo final con nodos y aristas

Paso 3: Configuración del Entorno

El generador de configuración de simulación analiza el grafo de conocimiento y crea parámetros detallados para los agentes:

Paso 4: Ejecución de la Simulación

Los agentes se activan según sus horarios de actividad y comienzan a publicar, comentar y reaccionar. El sistema ejecuta simulaciones paralelas en Twitter y Reddit, registrando cada acción en archivos JSONL en tiempo real.

Paso 5: Generación de Informes

El Agente de Informes utiliza tres herramientas de recuperación principales para analizar lo que sucedió:

Análisis Técnico Profundo: Generación de Ontología

El generador de ontología se encuentra en backend/app/services/ontology_generator.py. Utiliza un "system prompt" cuidadosamente elaborado que aplica reglas estrictas.

El "system prompt" incluye una guía exhaustiva sobre lo que cuenta como una entidad válida (personas, organizaciones, medios de comunicación) frente a lo que no (conceptos abstractos, temas, puntos de vista). Esta distinción es importante porque la simulación necesita agentes que puedan realmente hablar y actuar en las redes sociales.

Después de que el LLM genera la ontología, el método _validate_and_process aplica restricciones:

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Límites de la API de Zep: máximo 10 tipos de entidades, máximo 10 tipos de aristas
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Asegurarse de que existan los tipos de respaldo
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Recortar si añadir los tipos de respaldo excedería el límite
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

Esta capa de validación asegura que la salida siempre funcione con los límites de la API de Zep mientras mantiene la estructura de dos niveles.

Construcción del Grafo de Conocimiento: Integración de Zep

El servicio de construcción de grafos (backend/app/services/graph_builder.py) maneja el flujo de trabajo asíncrono:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Crear grafo
    graph_id = self.create_graph(graph_name)

    # 2. Establecer ontología
    self.set_ontology(graph_id, ontology)

    # 3. Dividir texto en fragmentos
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Enviar lotes
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Esperar procesamiento de Zep
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Recuperar grafo final
    graph_info = self._get_graph_info(graph_id)

Generación Dinámica de Modelos Pydantic

Una pieza ingeniosa: el sistema crea dinámicamente modelos Pydantic para cada tipo de entidad en tiempo de ejecución:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

Esto permite a Zep validar los atributos de las entidades contra el esquema personalizado sin requerir modelos predefinidos.

Paginación a Través de Grafos Grandes

Zep devuelve resultados paginados. La utilidad zep_paging.py recupera todo:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

Simulación de Actividad de Agentes Basada en el Tiempo

El generador de configuración de simulación (backend/app/services/simulation_config_generator.py) crea patrones de actividad realistas basados en el comportamiento de la zona horaria china:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # Horas de poca actividad (casi nadie)
    "morning_hours": [6, 7, 8],                  # Horas de la mañana (gradualmente activo)
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # Horas pico de la tarde
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

Diferentes tipos de agentes obtienen diferentes patrones:

Tipo de Agente Nivel de Actividad Horas Activas Retraso de Respuesta Influencia
Universidad 0.2 9-17 60-240 min 3.0
Medio de Comunicación 0.5 7-23 5-30 min 2.5
Estudiante 0.8 8-12, 18-23 1-15 min 0.8
Profesor 0.4 8-21 15-90 min 2.0

El generador de configuración utiliza llamadas a LLM para personalizar estos valores basándose en su escenario específico, luego recurre a los valores predeterminados basados en reglas si el LLM falla.

Seguimiento de Acciones en Tiempo Real

El ejecutor de simulación (backend/app/services/simulation_runner.py) monitoriza la actividad de los agentes mediante la transmisión de logs JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Manejar eventos
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # o reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parsear acciones de agente
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

Esto se ejecuta en un hilo en segundo plano, actualizando el estado de la simulación cada 2 segundos. El frontend consulta este estado para mostrar el progreso en tiempo real.

Gestión de Procesos Multiplataforma

Detener las simulaciones requiere una gestión de procesos cuidadosa en Windows y Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: usar taskkill para detener el árbol de procesos
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: detener grupo de procesos (creado con start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

El manejador de limpieza registra manejadores de señales para SIGINT, SIGTERM y SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Luego llamar al manejador original

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

Esto asegura que las simulaciones se detengan de forma segura cuando el servidor se apaga.

Generación de Informes: Recuperación de Tres Niveles

El servicio de herramientas de Zep (backend/app/services/zep_tools.py) proporciona tres funciones de recuperación:

InsightForge (Análisis Profundo)

Descompone preguntas complejas en subconsultas, busca en cada una y luego agrega:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generar subconsultas usando LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Buscar por cada subconsulta
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extraer UUIDs de entidad de los bordes
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Obtener información detallada de la entidad
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Construir cadenas de relaciones
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (Alcance Completo)

Recupera todo, incluidos los hechos históricos caducados/inválidos:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (Tiempo Real)

Llama a la API de entrevista real de OASIS para hablar con agentes activos:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Cargar perfiles de agente desde CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Usar LLM para seleccionar agentes relevantes
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generar preguntas de entrevista
    questions = self._generate_interview_questions(...)

    # 4. Llamar a la API de entrevista real (doble plataforma)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Entrevistar a ambos Twitter y Reddit
        timeout=180.0
    )

    # 5. Parsear y formatear resultados
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

Decisiones Clave de Ingeniería

1. Gestión de Tareas Asíncronas

Las operaciones de larga duración (construcción de grafos, ejecución de simulaciones) utilizan tareas asíncronas con seguimiento de progreso:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

El frontend consulta el estado de la tarea a través de /api/graph/task/{task_id}.

2. Llamadas a LLM en Lotes con Reintento

La generación de configuración divide grandes listas de agentes en lotes de 15:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

Cada lote incluye lógica de reparación de JSON para salidas truncadas:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. Simulación Paralela en Plataforma Doble

Twitter y Reddit se ejecutan en paralelo con bases de datos y registros de acciones separados:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

El ejecutor detecta la finalización por plataforma a través de eventos simulation_end.

Consideraciones de Rendimiento

Gestión de Memoria

Aislamiento de la Base de Datos

Cada plataforma utiliza su propia base de datos SQLite para evitar la contención de bloqueos durante escrituras paralelas.

Degradación Elegante

Cuando la API de búsqueda de Zep falla, el sistema recurre a la coincidencia local de palabras clave:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"La API de búsqueda de Zep falló, recurriendo a la búsqueda local: {e}")
    return self._local_search(graph_id, query, limit, scope)

Conclusión

MiroFish demuestra cómo construir un sistema completo de simulación multiagente desde cero. El flujo de trabajo de cinco pasos transforma documentos brutos en mundos digitales vivos donde miles de agentes interactúan de acuerdo con patrones de comportamiento realistas.

Puntos clave:

  1. El diseño de la ontología es importante: La estructura de dos niveles (8 tipos específicos + 2 de respaldo) asegura la cobertura sin exceder los límites de la API
  2. Los flujos de trabajo asíncronos permiten operaciones largas: El seguimiento de tareas con actualizaciones de progreso mantiene a los usuarios informados durante operaciones de varios minutos
  3. La actividad basada en el tiempo crea realismo: Los patrones de zona horaria china y los horarios específicos de cada tipo de agente producen un comportamiento creíble
  4. La simulación de doble plataforma proporciona comparación: Ejecutar Twitter y Reddit en paralelo muestra cómo la dinámica de la plataforma afecta los resultados
  5. La recuperación de tres niveles satisface diferentes necesidades: InsightForge para profundidad, PanoramaSearch para amplitud, InterviewAgents para perspectivas directas

El código fuente completo está disponible en github.com/666ghj/MiroFish.

botón

¿Quieres probar MiroFish? Visita la demostración en vivo para ver una simulación de evento de punto caliente en acción.

Practica el diseño de API en Apidog

Descubre una forma más fácil de construir y usar APIs