미로피쉬, 디지털 평행세계 구축 방법

Ashley Innocent

Ashley Innocent

19 March 2026

미로피쉬, 디지털 평행세계 구축 방법

소개

소셜 미디어는 빠르게 움직입니다. 단 하나의 게시물이 누구도 예측하지 못한 반응, 재구성, 반대 운동의 연쇄를 촉발할 수 있습니다. 현실에서 시나리오가 전개되기 전에 어떻게 진행될지 미리 볼 수 있다면 어떨까요?

MiroFish는 바로 그런 일을 합니다. MiroFish는 독자적인 성격, 기억, 행동 패턴을 가진 수천 개의 AI 에이전트가 자유롭게 상호 작용하는 디지털 병렬 세계를 생성하는 스웜 인텔리전스 엔진입니다. 뉴스 기사, 정책 초안, 심지어 소설과 같은 시드 자료를 업로드하면 MiroFish가 이벤트가 어떻게 전개될지 고성능 시뮬레이션을 구축합니다.

💡
MiroFish를 구축하려면 안정적인 API 테스트 기반이 필요했습니다. 팀은 시뮬레이션 로직을 작성하기 전에 모든 백엔드 API를 설계, 디버그 및 문서화하기 위해 Apidog를 사용했습니다. 이를 통해 초기 단계에서 엔드포인트 문제를 파악하고 개발 전반에 걸쳐 Python 백엔드와 Vue 프런트엔드를 동기화할 수 있었습니다.
버튼

이 게시물은 MiroFish의 기술 아키텍처를 분석합니다. 이 시스템이 원본 문서를 살아있는 시뮬레이션으로 변환하는 방법, 에이전트가 결정을 내리는 방법, 그리고 5단계 워크플로가 지식 그래프 구축부터 실시간 모니터링까지 모든 것을 어떻게 조율하는지 배우게 될 것입니다.

시스템 개요: 5단계 워크플로

MiroFish는 5가지 개별 단계를 통해 시뮬레이션을 처리합니다.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

1단계: 온톨로지 생성

시스템은 입력 문서와 시뮬레이션 요구 사항을 분석한 다음 LLM을 사용하여 사용자 지정 온톨로지를 생성합니다. 이는 다음을 정의합니다.

온톨로지 생성기는 2단계 구조를 적용합니다. 콘텐츠를 기반으로 한 8가지 특정 유형과 다른 곳에 맞지 않는 모든 것을 포착하기 위한 2가지 대체 유형(사람 및 조직)입니다.

2단계: GraphRAG 구축

문서는 청크로 분할(500자, 50 중복)되어 Zep Cloud로 일괄 전송됩니다. 시스템은 다음을 수행합니다.

  1. 고유 ID를 가진 독립형 그래프 생성
  2. 사용자 지정 온톨로지 설정
  3. 엔티티 및 관계 추출을 위한 텍스트 일괄 전송
  4. Zep이 각 에피소드를 처리할 때까지 대기
  5. 노드와 엣지가 포함된 최종 그래프 검색

3단계: 환경 설정

시뮬레이션 구성 생성기는 지식 그래프를 분석하고 상세한 에이전트 매개변수를 생성합니다.

4단계: 시뮬레이션 실행

에이전트는 활동 일정에 따라 활성화되어 게시, 댓글 달기, 반응하기를 시작합니다. 시스템은 Twitter와 Reddit에서 병렬 시뮬레이션을 실행하며, 모든 작업을 JSONL 파일에 실시간으로 로깅합니다.

5단계: 보고서 생성

보고서 에이전트는 발생한 일을 분석하기 위해 세 가지 핵심 검색 도구를 사용합니다.

기술 심층 분석: 온톨로지 생성

온톨로지 생성기는 backend/app/services/ontology_generator.py에 있습니다. 엄격한 규칙을 적용하는 신중하게 제작된 시스템 프롬프트를 사용합니다.

시스템 프롬프트는 유효한 엔티티(사람, 조직, 미디어 매체)와 그렇지 않은 것(추상 개념, 테마, 관점)에 대한 광범위한 지침을 포함합니다. 이 구분은 시뮬레이션에 실제로 소셜 미디어에서 말하고 행동할 수 있는 에이전트가 필요하기 때문에 중요합니다.

LLM이 온톨로지를 생성한 후, _validate_and_process 메서드는 제약 조건을 적용합니다.

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result

이 유효성 검사 계층은 2단계 구조를 유지하면서 Zep의 API 제한 내에서 항상 출력이 작동하도록 보장합니다.

지식 그래프 구축: Zep 통합

그래프 빌더 서비스(`backend/app/services/graph_builder.py`)는 비동기 워크플로를 처리합니다.

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)

동적 Pydantic 모델 생성

한 가지 영리한 부분은 시스템이 런타임에 각 엔티티 유형에 대한 Pydantic 모델을 동적으로 생성한다는 것입니다.

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class

이를 통해 Zep은 사전 정의된 모델 없이도 사용자 지정 스키마에 대해 엔티티 속성을 검증할 수 있습니다.

대규모 그래프 페이지 매김

Zep은 페이지로 구분된 결과를 반환합니다. zep_paging.py 유틸리티는 모든 것을 가져옵니다.

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes

시간 기반 에이전트 활동 시뮬레이션

시뮬레이션 구성 생성기(`backend/app/services/simulation_config_generator.py`)는 중국 시간대 행동을 기반으로 현실적인 활동 패턴을 생성합니다.

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 새벽에는 거의 사람이 없음
    "morning_hours": [6, 7, 8],                  # 아침에는 점차 활발해짐
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 저녁 피크 시간
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}

다른 에이전트 유형은 다른 패턴을 가집니다.

에이전트 유형 활동 수준 활동 시간 응답 지연 영향력
대학 0.2 9-17 60-240분 3.0
미디어 매체 0.5 7-23 5-30분 2.5
학생 0.8 8-12, 18-23 1-15분 0.8
교수 0.4 8-21 15-90분 2.0

구성 생성기는 LLM 호출을 사용하여 특정 시나리오에 따라 이러한 값을 사용자 지정하며, LLM이 실패하면 규칙 기반 기본값으로 돌아갑니다.

실시간 활동 추적

시뮬레이션 실행기(`backend/app/services/simulation_runner.py`)는 JSONL 로그 스트리밍을 통해 에이전트 활동을 모니터링합니다.

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()

이는 백그라운드 스레드에서 실행되며, 시뮬레이션 상태를 2초마다 업데이트합니다. 프런트엔드는 이 상태를 폴링하여 실시간 진행 상황을 표시합니다.

크로스 플랫폼 프로세스 관리

시뮬레이션을 중지하려면 Windows와 Unix 전반에 걸쳐 신중한 프로세스 관리가 필요합니다.

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

정리 핸들러는 SIGINT, SIGTERM 및 SIGHUP에 대한 신호 핸들러를 등록합니다.

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)

이를 통해 서버가 종료될 때 시뮬레이션이 정상적으로 중지됩니다.

보고서 생성: 3단계 검색

Zep 도구 서비스(`backend/app/services/zep_tools.py`)는 세 가지 검색 기능을 제공합니다.

InsightForge (심층 분석)

복잡한 질문을 하위 쿼리로 분해하고, 각각을 검색한 다음, 집계합니다.

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)

PanoramaSearch (전체 범위)

만료되거나 유효하지 않은 과거 사실을 포함하여 모든 것을 검색합니다.

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)

InterviewAgents (실시간)

활성 에이전트와 대화하기 위해 실제 OASIS 인터뷰 API를 호출합니다.

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"

주요 엔지니어링 결정

1. 비동기 작업 관리

오래 실행되는 작업(그래프 구축, 시뮬레이션 실행)은 진행 상황 추적과 함께 비동기 작업을 사용합니다.

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id

프런트엔드는 `/api/graph/task/{task_id}`를 통해 작업 상태를 폴링합니다.

2. 재시도 기능이 있는 배치 LLM 호출

구성 생성은 대규모 에이전트 목록을 15개씩의 배치로 분할합니다.

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)

각 배치에는 잘린 출력에 대한 JSON 복구 로직이 포함됩니다.

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content

3. 듀얼 플랫폼 병렬 시뮬레이션

Twitter와 Reddit은 별도의 데이터베이스와 활동 로그를 사용하여 병렬로 실행됩니다.

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log

실행기는 `simulation_end` 이벤트를 통해 플랫폼별 완료를 감지합니다.

성능 고려 사항

메모리 관리

데이터베이스 격리

각 플랫폼은 병렬 쓰기 중 잠금 경합을 피하기 위해 자체 SQLite 데이터베이스를 사용합니다.

점진적 성능 저하

Zep 검색 API가 실패하면 시스템은 로컬 키워드 일치로 대체됩니다.

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)

결론

MiroFish는 완전한 다중 에이전트 시뮬레이션 시스템을 처음부터 구축하는 방법을 보여줍니다. 5단계 워크플로는 원본 문서를 수천 명의 에이전트가 현실적인 행동 패턴에 따라 상호 작용하는 살아있는 디지털 세계로 변환합니다.

주요 요점:

  1. 온톨로지 설계가 중요합니다: 2단계 구조(8가지 특정 유형 + 2가지 대체 유형)는 API 제한을 초과하지 않으면서 포괄적인 적용 범위를 보장합니다.
  2. 비동기 워크플로는 장기 작업을 가능하게 합니다: 진행 상황 업데이트가 있는 작업 추적은 여러 분이 소요되는 작업 중에도 사용자에게 정보를 제공합니다.
  3. 시간 기반 활동은 현실감을 만듭니다: 중국 시간대 패턴과 에이전트 유형별 스케줄은 신뢰할 수 있는 행동을 생성합니다.
  4. 듀얼 플랫폼 시뮬레이션은 비교를 제공합니다: Twitter와 Reddit을 병렬로 실행하면 플랫폼 역학이 결과에 미치는 영향을 보여줍니다.
  5. 3단계 검색은 다양한 요구 사항을 충족합니다: InsightForge는 깊이 있는 분석을, PanoramaSearch는 광범위한 조사를, InterviewAgents는 직접적인 관점을 제공합니다.

전체 소스 코드는 github.com/666ghj/MiroFish에서 확인할 수 있습니다.

버튼

MiroFish를 사용해보고 싶으신가요? 라이브 데모를 방문하여 핫스팟 이벤트 시뮬레이션이 작동하는 것을 확인해보세요.

Apidog에서 API 설계-첫 번째 연습

API를 더 쉽게 구축하고 사용하는 방법을 발견하세요