Introdução
As redes sociais se movem rapidamente. Uma única publicação pode desencadear cascatas de reações, reformulações e contra-movimentos que ninguém previu. E se você pudesse ver como um cenário se desenrola antes que aconteça no mundo real?
MiroFish faz exatamente isso. É um motor de inteligência de enxame que cria mundos paralelos digitais onde milhares de agentes de IA com personalidades, memórias e padrões de comportamento distintos interagem livremente. Você faz upload de material-base — um artigo de notícia, um rascunho de política, até mesmo um romance — e o MiroFish constrói uma simulação de alta fidelidade de como os eventos podem se desenrolar.
Esta publicação detalha a arquitetura técnica por trás do MiroFish. Você aprenderá como o sistema transforma documentos brutos em simulações vivas, como os agentes tomam decisões e como o fluxo de trabalho de cinco etapas orquestra tudo, desde a construção do grafo de conhecimento até o monitoramento em tempo real.

Visão Geral do Sistema: O Fluxo de Trabalho de Cinco Etapas
MiroFish processa simulações através de cinco fases distintas:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Etapa 1 │ ──► │ Etapa 2 │ ──► │ Etapa 3 │ ──► │ Etapa 4 │ ──► │ Etapa 5 │
│ Geração de │ │ Construção │ │ Configuração│ │ Execução │ │ Geração de │
│ Ontologia │ │ de GraphRAG │ │ do Ambiente │ │ da Simulação│ │ Relatório │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Etapa 1: Geração de Ontologia
O sistema analisa seus documentos de entrada e requisitos de simulação, então usa um LLM para gerar uma ontologia personalizada. Isso define:
- 10 tipos de entidade (ex: Estudante, Professor, Universidade, Meio de Comunicação, Agência Governamental)
- 10 tipos de relacionamento (ex: TRABALHA_PARA, COMENTA_SOBRE, RESPONDE_A)
- Atributos para cada tipo (evitando palavras reservadas como
name,uuid,created_at)
O gerador de ontologia impõe uma estrutura de dois níveis: 8 tipos específicos baseados no seu conteúdo, mais 2 tipos de fallback (Pessoa e Organização) para abranger tudo o que não se encaixa em outro lugar.
Etapa 2: Construção de GraphRAG
Os documentos são divididos em "chunks" (500 caracteres, 50 de sobreposição) e enviados para a Zep Cloud em lotes. O sistema:
- Cria um grafo autônomo com um ID único
- Define a ontologia personalizada
- Envia lotes de texto para extração de entidades e relacionamentos
- Aguarda a Zep processar cada episódio
- Recupera o grafo final com nós e arestas
Etapa 3: Configuração do Ambiente
O gerador de configuração de simulação analisa o grafo de conhecimento e cria parâmetros detalhados para os agentes:
- Configuração de tempo baseada em padrões de fuso horário chinês (horas de pico 19-22, horas mortas 0-5)
- Configuração de eventos com publicações iniciais e tópicos quentes
- Configurações de atividade dos agentes (publicações por hora, atrasos de resposta, pesos de influência)
- Configurações de plataforma para Twitter e Reddit com diferentes limites de viralidade
Etapa 4: Execução da Simulação
Os agentes acordam de acordo com seus cronogramas de atividade e começam a publicar, comentar e reagir. O sistema executa simulações paralelas no Twitter e Reddit, registrando cada ação em arquivos JSONL em tempo real.
Etapa 5: Geração de Relatórios
O Agente de Relatórios usa três ferramentas principais de recuperação para analisar o que aconteceu:
- InsightForge: Busca aprofundada que decompõe perguntas em subconsultas
- PanoramaSearch: Visão de escopo total incluindo fatos históricos expirados/inválidos
- InterviewAgents: Entrevistas em tempo real com agentes ativos via IPC
Aprofundamento Técnico: Geração de Ontologia
O gerador de ontologia reside em backend/app/services/ontology_generator.py. Ele usa um prompt de sistema cuidadosamente elaborado que impõe regras rigorosas.
O prompt do sistema inclui orientações extensas sobre o que conta como uma entidade válida (pessoas, organizações, meios de comunicação) versus o que não conta (conceitos abstratos, temas, pontos de vista). Esta distinção é importante porque a simulação precisa de agentes que possam realmente falar e agir nas redes sociais.
Após o LLM gerar a ontologia, o método _validate_and_process impõe restrições:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Esta camada de validação garante que a saída sempre funcione com os limites da API da Zep, mantendo a estrutura de dois níveis.
Construção do Grafo de Conhecimento: Integração Zep
O serviço de construção de grafo (backend/app/services/graph_builder.py) lida com o fluxo de trabalho assíncrono:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Geração Dinâmica de Modelos Pydantic
Uma parte inteligente: o sistema cria dinamicamente modelos Pydantic para cada tipo de entidade em tempo de execução:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Isso permite que a Zep valide atributos de entidade contra o esquema personalizado sem a necessidade de modelos pré-definidos.
Paginação de Grafos Grandes
A Zep retorna resultados paginados. O utilitário zep_paging.py busca tudo:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Simulação de Atividade de Agentes Baseada em Tempo
O gerador de configuração de simulação (backend/app/services/simulation_config_generator.py) cria padrões de atividade realistas baseados no comportamento do fuso horário chinês:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Diferentes tipos de agentes recebem diferentes padrões:
| Tipo de Agente | Nível de Atividade | Horas Ativas | Atraso de Resposta | Influência |
|---|---|---|---|---|
| Universidade | 0.2 | 9-17 | 60-240 min | 3.0 |
| Meio de Comunicação | 0.5 | 7-23 | 5-30 min | 2.5 |
| Estudante | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
O gerador de configuração usa chamadas de LLM para personalizar esses valores com base no seu cenário específico, e então recorre a padrões baseados em regras se o LLM falhar.
Rastreamento de Ações em Tempo Real
O executor da simulação (backend/app/services/simulation_runner.py) monitora a atividade dos agentes transmitindo logs JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Isso é executado em um thread em segundo plano, atualizando o estado da simulação a cada 2 segundos. O frontend consulta esse estado para mostrar o progresso em tempo real.
Gerenciamento de Processos Multiplataforma
Parar simulações requer um gerenciamento cuidadoso de processos entre Windows e Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
O manipulador de limpeza registra manipuladores de sinal para SIGINT, SIGTERM e SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Isso garante que as simulações parem graciosamente quando o servidor for desligado.
Geração de Relatórios: Recuperação de Três Níveis
O serviço de ferramentas Zep (backend/app/services/zep_tools.py) oferece três funções de recuperação:
InsightForge (Análise Aprofundada)
Decompõe perguntas complexas em subconsultas, pesquisa cada uma e depois agrega:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Escopo Total)
Recupera tudo, incluindo fatos históricos expirados/inválidos:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Tempo Real)
Chama a API de entrevista OASIS real para conversar com agentes ativos:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Principais Decisões de Engenharia
1. Gerenciamento de Tarefas Assíncronas
Operações de longa duração (construção de grafo, execução de simulação) usam tarefas assíncronas com rastreamento de progresso:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
O frontend consulta o status da tarefa via /api/graph/task/{task_id}.
2. Chamadas em Lote de LLM com Tentativa
A geração de configuração divide grandes listas de agentes em lotes de 15:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Cada lote inclui lógica de reparo de JSON para saídas truncadas:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Simulação Paralela em Duas Plataformas
Twitter e Reddit são executados em paralelo com bancos de dados e logs de ações separados:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
O executor detecta a conclusão por plataforma via eventos simulation_end.
Considerações de Desempenho
Gerenciamento de Memória
- Documentos grandes são truncados para 50 mil caracteres para o contexto do LLM
- Resumos de entidades limitados a 300 caracteres cada
- Ações recentes limitadas a 50 na memória (histórico completo em arquivos JSONL)
Isolamento de Banco de Dados
Cada plataforma usa seu próprio banco de dados SQLite para evitar contenção de bloqueio durante escritas paralelas.
Degradação Graciosa
Quando a API de Busca da Zep falha, o sistema recorre à correspondência local de palavras-chave:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Conclusão
MiroFish demonstra como construir um sistema completo de simulação multiagente do zero. O fluxo de trabalho de cinco etapas transforma documentos brutos em mundos digitais vivos onde milhares de agentes interagem de acordo com padrões de comportamento realistas.
Principais aprendizados:
- O design da ontologia é importante: A estrutura de dois níveis (8 tipos específicos + 2 de fallback) garante cobertura sem exceder os limites da API
- Fluxos de trabalho assíncronos permitem operações longas: O rastreamento de tarefas com atualizações de progresso mantém os usuários informados durante operações de vários minutos
- A atividade baseada em tempo cria realismo: Padrões de fuso horário chinês e cronogramas específicos de tipos de agentes produzem comportamentos críveis
- A simulação de duas plataformas oferece comparação: Executar Twitter e Reddit em paralelo mostra como a dinâmica da plataforma afeta os resultados
- A recuperação de três níveis atende a diferentes necessidades: InsightForge para profundidade, PanoramaSearch para amplitude, InterviewAgents para perspectivas diretas
O código-fonte completo está disponível em github.com/666ghj/MiroFish.
Quer experimentar o MiroFish? Visite a demonstração ao vivo para ver uma simulação de evento de ponto de acesso em ação.
