소개
소셜 미디어는 빠르게 움직입니다. 단 하나의 게시물이 누구도 예측하지 못한 반응, 재구성, 반대 운동의 연쇄를 촉발할 수 있습니다. 현실에서 시나리오가 전개되기 전에 어떻게 진행될지 미리 볼 수 있다면 어떨까요?
MiroFish는 바로 그런 일을 합니다. MiroFish는 독자적인 성격, 기억, 행동 패턴을 가진 수천 개의 AI 에이전트가 자유롭게 상호 작용하는 디지털 병렬 세계를 생성하는 스웜 인텔리전스 엔진입니다. 뉴스 기사, 정책 초안, 심지어 소설과 같은 시드 자료를 업로드하면 MiroFish가 이벤트가 어떻게 전개될지 고성능 시뮬레이션을 구축합니다.
이 게시물은 MiroFish의 기술 아키텍처를 분석합니다. 이 시스템이 원본 문서를 살아있는 시뮬레이션으로 변환하는 방법, 에이전트가 결정을 내리는 방법, 그리고 5단계 워크플로가 지식 그래프 구축부터 실시간 모니터링까지 모든 것을 어떻게 조율하는지 배우게 될 것입니다.

시스템 개요: 5단계 워크플로
MiroFish는 5가지 개별 단계를 통해 시뮬레이션을 처리합니다.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
1단계: 온톨로지 생성
시스템은 입력 문서와 시뮬레이션 요구 사항을 분석한 다음 LLM을 사용하여 사용자 지정 온톨로지를 생성합니다. 이는 다음을 정의합니다.
- 10가지 엔티티 유형 (예: 학생, 교수, 대학, 미디어 매체, 정부 기관)
- 10가지 관계 유형 (예: ~에 재직, ~에 댓글 달기, ~에 응답하기)
- 각 유형의 속성 (`name`, `uuid`, `created_at`과 같은 예약어 제외)
온톨로지 생성기는 2단계 구조를 적용합니다. 콘텐츠를 기반으로 한 8가지 특정 유형과 다른 곳에 맞지 않는 모든 것을 포착하기 위한 2가지 대체 유형(사람 및 조직)입니다.
2단계: GraphRAG 구축
문서는 청크로 분할(500자, 50 중복)되어 Zep Cloud로 일괄 전송됩니다. 시스템은 다음을 수행합니다.
- 고유 ID를 가진 독립형 그래프 생성
- 사용자 지정 온톨로지 설정
- 엔티티 및 관계 추출을 위한 텍스트 일괄 전송
- Zep이 각 에피소드를 처리할 때까지 대기
- 노드와 엣지가 포함된 최종 그래프 검색
3단계: 환경 설정
시뮬레이션 구성 생성기는 지식 그래프를 분석하고 상세한 에이전트 매개변수를 생성합니다.
- 중국 시간대 패턴 기반의 시간 구성 (피크 시간 19-22시, 비활동 시간 0-5시)
- 초기 게시물 및 인기 주제를 포함한 이벤트 구성
- 에이전트 활동 구성 (시간당 게시물 수, 응답 지연, 영향력 가중치)
- 다양한 바이럴 임계값을 가진 Twitter 및 Reddit용 플랫폼 구성
4단계: 시뮬레이션 실행
에이전트는 활동 일정에 따라 활성화되어 게시, 댓글 달기, 반응하기를 시작합니다. 시스템은 Twitter와 Reddit에서 병렬 시뮬레이션을 실행하며, 모든 작업을 JSONL 파일에 실시간으로 로깅합니다.
5단계: 보고서 생성
보고서 에이전트는 발생한 일을 분석하기 위해 세 가지 핵심 검색 도구를 사용합니다.
- InsightForge: 질문을 하위 쿼리로 분해하는 심층 검색
- PanoramaSearch: 만료/유효하지 않은 과거 사실을 포함한 전체 범위 보기
- InterviewAgents: IPC를 통한 활성 에이전트와의 실시간 인터뷰
기술 심층 분석: 온톨로지 생성
온톨로지 생성기는 backend/app/services/ontology_generator.py에 있습니다. 엄격한 규칙을 적용하는 신중하게 제작된 시스템 프롬프트를 사용합니다.
시스템 프롬프트는 유효한 엔티티(사람, 조직, 미디어 매체)와 그렇지 않은 것(추상 개념, 테마, 관점)에 대한 광범위한 지침을 포함합니다. 이 구분은 시뮬레이션에 실제로 소셜 미디어에서 말하고 행동할 수 있는 에이전트가 필요하기 때문에 중요합니다.
LLM이 온톨로지를 생성한 후, _validate_and_process 메서드는 제약 조건을 적용합니다.
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
이 유효성 검사 계층은 2단계 구조를 유지하면서 Zep의 API 제한 내에서 항상 출력이 작동하도록 보장합니다.
지식 그래프 구축: Zep 통합
그래프 빌더 서비스(`backend/app/services/graph_builder.py`)는 비동기 워크플로를 처리합니다.
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
동적 Pydantic 모델 생성
한 가지 영리한 부분은 시스템이 런타임에 각 엔티티 유형에 대한 Pydantic 모델을 동적으로 생성한다는 것입니다.
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
이를 통해 Zep은 사전 정의된 모델 없이도 사용자 지정 스키마에 대해 엔티티 속성을 검증할 수 있습니다.
대규모 그래프 페이지 매김
Zep은 페이지로 구분된 결과를 반환합니다. zep_paging.py 유틸리티는 모든 것을 가져옵니다.
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
시간 기반 에이전트 활동 시뮬레이션
시뮬레이션 구성 생성기(`backend/app/services/simulation_config_generator.py`)는 중국 시간대 행동을 기반으로 현실적인 활동 패턴을 생성합니다.
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 새벽에는 거의 사람이 없음
"morning_hours": [6, 7, 8], # 아침에는 점차 활발해짐
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 저녁 피크 시간
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
다른 에이전트 유형은 다른 패턴을 가집니다.
| 에이전트 유형 | 활동 수준 | 활동 시간 | 응답 지연 | 영향력 |
|---|---|---|---|---|
| 대학 | 0.2 | 9-17 | 60-240분 | 3.0 |
| 미디어 매체 | 0.5 | 7-23 | 5-30분 | 2.5 |
| 학생 | 0.8 | 8-12, 18-23 | 1-15분 | 0.8 |
| 교수 | 0.4 | 8-21 | 15-90분 | 2.0 |
구성 생성기는 LLM 호출을 사용하여 특정 시나리오에 따라 이러한 값을 사용자 지정하며, LLM이 실패하면 규칙 기반 기본값으로 돌아갑니다.
실시간 활동 추적
시뮬레이션 실행기(`backend/app/services/simulation_runner.py`)는 JSONL 로그 스트리밍을 통해 에이전트 활동을 모니터링합니다.
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
이는 백그라운드 스레드에서 실행되며, 시뮬레이션 상태를 2초마다 업데이트합니다. 프런트엔드는 이 상태를 폴링하여 실시간 진행 상황을 표시합니다.
크로스 플랫폼 프로세스 관리
시뮬레이션을 중지하려면 Windows와 Unix 전반에 걸쳐 신중한 프로세스 관리가 필요합니다.
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
정리 핸들러는 SIGINT, SIGTERM 및 SIGHUP에 대한 신호 핸들러를 등록합니다.
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
이를 통해 서버가 종료될 때 시뮬레이션이 정상적으로 중지됩니다.
보고서 생성: 3단계 검색
Zep 도구 서비스(`backend/app/services/zep_tools.py`)는 세 가지 검색 기능을 제공합니다.
InsightForge (심층 분석)
복잡한 질문을 하위 쿼리로 분해하고, 각각을 검색한 다음, 집계합니다.
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (전체 범위)
만료되거나 유효하지 않은 과거 사실을 포함하여 모든 것을 검색합니다.
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (실시간)
활성 에이전트와 대화하기 위해 실제 OASIS 인터뷰 API를 호출합니다.
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
주요 엔지니어링 결정
1. 비동기 작업 관리
오래 실행되는 작업(그래프 구축, 시뮬레이션 실행)은 진행 상황 추적과 함께 비동기 작업을 사용합니다.
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
프런트엔드는 `/api/graph/task/{task_id}`를 통해 작업 상태를 폴링합니다.
2. 재시도 기능이 있는 배치 LLM 호출
구성 생성은 대규모 에이전트 목록을 15개씩의 배치로 분할합니다.
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
각 배치에는 잘린 출력에 대한 JSON 복구 로직이 포함됩니다.
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. 듀얼 플랫폼 병렬 시뮬레이션
Twitter와 Reddit은 별도의 데이터베이스와 활동 로그를 사용하여 병렬로 실행됩니다.
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
실행기는 `simulation_end` 이벤트를 통해 플랫폼별 완료를 감지합니다.
성능 고려 사항
메모리 관리
- 대용량 문서는 LLM 컨텍스트를 위해 50k자로 잘림
- 엔티티 요약은 각각 300자로 제한됨
- 최근 활동은 메모리에서 50개로 제한됨 (전체 기록은 JSONL 파일에 저장)
데이터베이스 격리
각 플랫폼은 병렬 쓰기 중 잠금 경합을 피하기 위해 자체 SQLite 데이터베이스를 사용합니다.
점진적 성능 저하
Zep 검색 API가 실패하면 시스템은 로컬 키워드 일치로 대체됩니다.
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
결론
MiroFish는 완전한 다중 에이전트 시뮬레이션 시스템을 처음부터 구축하는 방법을 보여줍니다. 5단계 워크플로는 원본 문서를 수천 명의 에이전트가 현실적인 행동 패턴에 따라 상호 작용하는 살아있는 디지털 세계로 변환합니다.
주요 요점:
- 온톨로지 설계가 중요합니다: 2단계 구조(8가지 특정 유형 + 2가지 대체 유형)는 API 제한을 초과하지 않으면서 포괄적인 적용 범위를 보장합니다.
- 비동기 워크플로는 장기 작업을 가능하게 합니다: 진행 상황 업데이트가 있는 작업 추적은 여러 분이 소요되는 작업 중에도 사용자에게 정보를 제공합니다.
- 시간 기반 활동은 현실감을 만듭니다: 중국 시간대 패턴과 에이전트 유형별 스케줄은 신뢰할 수 있는 행동을 생성합니다.
- 듀얼 플랫폼 시뮬레이션은 비교를 제공합니다: Twitter와 Reddit을 병렬로 실행하면 플랫폼 역학이 결과에 미치는 영향을 보여줍니다.
- 3단계 검색은 다양한 요구 사항을 충족합니다: InsightForge는 깊이 있는 분석을, PanoramaSearch는 광범위한 조사를, InterviewAgents는 직접적인 관점을 제공합니다.
전체 소스 코드는 github.com/666ghj/MiroFish에서 확인할 수 있습니다.
MiroFish를 사용해보고 싶으신가요? 라이브 데모를 방문하여 핫스팟 이벤트 시뮬레이션이 작동하는 것을 확인해보세요.
