Como a MiroFish Cria Mundos Digitais Paralelos

Ashley Innocent

Ashley Innocent

19 março 2026

Como a MiroFish Cria Mundos Digitais Paralelos

Introdução

As redes sociais se movem rapidamente. Uma única publicação pode desencadear cascatas de reações, reformulações e contra-movimentos que ninguém previu. E se você pudesse ver como um cenário se desenrola antes que aconteça no mundo real?

MiroFish faz exatamente isso. É um motor de inteligência de enxame que cria mundos paralelos digitais onde milhares de agentes de IA com personalidades, memórias e padrões de comportamento distintos interagem livremente. Você faz upload de material-base — um artigo de notícia, um rascunho de política, até mesmo um romance — e o MiroFish constrói uma simulação de alta fidelidade de como os eventos podem se desenrolar.

💡
Construir o MiroFish exigiu uma base confiável para testes de API. A equipe usou o Apidog para projetar, depurar e documentar todas as APIs de backend antes de escrever a lógica de simulação. Isso identificou problemas de endpoint precocemente e manteve o backend Python e o frontend Vue em sincronia durante todo o desenvolvimento.
botão

Esta publicação detalha a arquitetura técnica por trás do MiroFish. Você aprenderá como o sistema transforma documentos brutos em simulações vivas, como os agentes tomam decisões e como o fluxo de trabalho de cinco etapas orquestra tudo, desde a construção do grafo de conhecimento até o monitoramento em tempo real.

Visão Geral do Sistema: O Fluxo de Trabalho de Cinco Etapas

MiroFish processa simulações através de cinco fases distintas:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Etapa 1   │ ──► │   Etapa 2   │ ──► │   Etapa 3   │ ──► │   Etapa 4   │ ──► │   Etapa 5   │
│  Geração de │     │ Construção  │     │ Configuração│     │  Execução   │     │  Geração de │
│  Ontologia  │     │ de GraphRAG │     │ do Ambiente │     │ da Simulação│     │   Relatório │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

Etapa 1: Geração de Ontologia

O sistema analisa seus documentos de entrada e requisitos de simulação, então usa um LLM para gerar uma ontologia personalizada. Isso define:

O gerador de ontologia impõe uma estrutura de dois níveis: 8 tipos específicos baseados no seu conteúdo, mais 2 tipos de fallback (Pessoa e Organização) para abranger tudo o que não se encaixa em outro lugar.

Etapa 2: Construção de GraphRAG

Os documentos são divididos em "chunks" (500 caracteres, 50 de sobreposição) e enviados para a Zep Cloud em lotes. O sistema:

  1. Cria um grafo autônomo com um ID único
  2. Define a ontologia personalizada
  3. Envia lotes de texto para extração de entidades e relacionamentos
  4. Aguarda a Zep processar cada episódio
  5. Recupera o grafo final com nós e arestas

Etapa 3: Configuração do Ambiente

O gerador de configuração de simulação analisa o grafo de conhecimento e cria parâmetros detalhados para os agentes:

Etapa 4: Execução da Simulação

Os agentes acordam de acordo com seus cronogramas de atividade e começam a publicar, comentar e reagir. O sistema executa simulações paralelas no Twitter e Reddit, registrando cada ação em arquivos JSONL em tempo real.

Etapa 5: Geração de Relatórios

O Agente de Relatórios usa três ferramentas principais de recuperação para analisar o que aconteceu:

Aprofundamento Técnico: Geração de Ontologia

O gerador de ontologia reside em backend/app/services/ontology_generator.py. Ele usa um prompt de sistema cuidadosamente elaborado que impõe regras rigorosas.

O prompt do sistema inclui orientações extensas sobre o que conta como uma entidade válida (pessoas, organizações, meios de comunicação) versus o que não conta (conceitos abstratos, temas, pontos de vista). Esta distinção é importante porque a simulação precisa de agentes que possam realmente falar e agir nas redes sociais.

Após o LLM gerar a ontologia, o método _validate_and_process impõe restrições:

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

Esta camada de validação garante que a saída sempre funcione com os limites da API da Zep, mantendo a estrutura de dois níveis.

Construção do Grafo de Conhecimento: Integração Zep

O serviço de construção de grafo (backend/app/services/graph_builder.py) lida com o fluxo de trabalho assíncrono:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

Geração Dinâmica de Modelos Pydantic

Uma parte inteligente: o sistema cria dinamicamente modelos Pydantic para cada tipo de entidade em tempo de execução:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

Isso permite que a Zep valide atributos de entidade contra o esquema personalizado sem a necessidade de modelos pré-definidos.

Paginação de Grafos Grandes

A Zep retorna resultados paginados. O utilitário zep_paging.py busca tudo:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

Simulação de Atividade de Agentes Baseada em Tempo

O gerador de configuração de simulação (backend/app/services/simulation_config_generator.py) cria padrões de atividade realistas baseados no comportamento do fuso horário chinês:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

Diferentes tipos de agentes recebem diferentes padrões:

Tipo de Agente Nível de Atividade Horas Ativas Atraso de Resposta Influência
Universidade 0.2 9-17 60-240 min 3.0
Meio de Comunicação 0.5 7-23 5-30 min 2.5
Estudante 0.8 8-12, 18-23 1-15 min 0.8
Professor 0.4 8-21 15-90 min 2.0

O gerador de configuração usa chamadas de LLM para personalizar esses valores com base no seu cenário específico, e então recorre a padrões baseados em regras se o LLM falhar.

Rastreamento de Ações em Tempo Real

O executor da simulação (backend/app/services/simulation_runner.py) monitora a atividade dos agentes transmitindo logs JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

Isso é executado em um thread em segundo plano, atualizando o estado da simulação a cada 2 segundos. O frontend consulta esse estado para mostrar o progresso em tempo real.

Gerenciamento de Processos Multiplataforma

Parar simulações requer um gerenciamento cuidadoso de processos entre Windows e Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

O manipulador de limpeza registra manipuladores de sinal para SIGINT, SIGTERM e SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

Isso garante que as simulações parem graciosamente quando o servidor for desligado.

Geração de Relatórios: Recuperação de Três Níveis

O serviço de ferramentas Zep (backend/app/services/zep_tools.py) oferece três funções de recuperação:

InsightForge (Análise Aprofundada)

Decompõe perguntas complexas em subconsultas, pesquisa cada uma e depois agrega:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (Escopo Total)

Recupera tudo, incluindo fatos históricos expirados/inválidos:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (Tempo Real)

Chama a API de entrevista OASIS real para conversar com agentes ativos:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

Principais Decisões de Engenharia

1. Gerenciamento de Tarefas Assíncronas

Operações de longa duração (construção de grafo, execução de simulação) usam tarefas assíncronas com rastreamento de progresso:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

O frontend consulta o status da tarefa via /api/graph/task/{task_id}.

2. Chamadas em Lote de LLM com Tentativa

A geração de configuração divide grandes listas de agentes em lotes de 15:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

Cada lote inclui lógica de reparo de JSON para saídas truncadas:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. Simulação Paralela em Duas Plataformas

Twitter e Reddit são executados em paralelo com bancos de dados e logs de ações separados:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

O executor detecta a conclusão por plataforma via eventos simulation_end.

Considerações de Desempenho

Gerenciamento de Memória

Isolamento de Banco de Dados

Cada plataforma usa seu próprio banco de dados SQLite para evitar contenção de bloqueio durante escritas paralelas.

Degradação Graciosa

Quando a API de Busca da Zep falha, o sistema recorre à correspondência local de palavras-chave:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)

Conclusão

MiroFish demonstra como construir um sistema completo de simulação multiagente do zero. O fluxo de trabalho de cinco etapas transforma documentos brutos em mundos digitais vivos onde milhares de agentes interagem de acordo com padrões de comportamento realistas.

Principais aprendizados:

  1. O design da ontologia é importante: A estrutura de dois níveis (8 tipos específicos + 2 de fallback) garante cobertura sem exceder os limites da API
  2. Fluxos de trabalho assíncronos permitem operações longas: O rastreamento de tarefas com atualizações de progresso mantém os usuários informados durante operações de vários minutos
  3. A atividade baseada em tempo cria realismo: Padrões de fuso horário chinês e cronogramas específicos de tipos de agentes produzem comportamentos críveis
  4. A simulação de duas plataformas oferece comparação: Executar Twitter e Reddit em paralelo mostra como a dinâmica da plataforma afeta os resultados
  5. A recuperação de três níveis atende a diferentes necessidades: InsightForge para profundidade, PanoramaSearch para amplitude, InterviewAgents para perspectivas diretas

O código-fonte completo está disponível em github.com/666ghj/MiroFish.

botão

Quer experimentar o MiroFish? Visite a demonstração ao vivo para ver uma simulação de evento de ponto de acesso em ação.

Pratique o design de API no Apidog

Descubra uma forma mais fácil de construir e usar APIs