はじめに
ソーシャルメディアは変化が速いです。1つの投稿が、誰も予測しなかった反応、再形成、対抗運動の連鎖を引き起こすことがあります。現実世界でシナリオが起こる前に、それがどのように展開するかを見ることができたらどうでしょうか?
MiroFishはまさにそれを行います。これは、数千のAIエージェントがそれぞれ異なる個性、記憶、行動パターンを持ち、自由に相互作用するデジタル並行世界を構築する群知能エンジンです。ニュース記事、政策草案、さらには小説といったシードマテリアルをアップロードすると、MiroFishはイベントがどのように展開するかを高精度でシミュレーションします。
この記事では、MiroFishの技術アーキテクチャを詳しく解説します。システムがどのように生データを生き生きとしたシミュレーションに変換し、エージェントがどのように意思決定を行い、そして知識グラフの構築からリアルタイム監視まで、5段階のワークフローがどのようにすべてを調整しているのかを学びます。

システム概要: 5段階のワークフロー
MiroFishは、5つの異なるフェーズを経てシミュレーションを処理します。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
ステップ1: オントロジー生成
システムは、入力ドキュメントとシミュレーション要件を分析し、LLMを使用してカスタムオントロジーを生成します。これにより、以下が定義されます。
- 10種類のエンティティタイプ (例: 学生、教授、大学、メディア、政府機関)
- 10種類の関係タイプ (例: WORKS_FOR (勤務する), COMMENTS_ON (コメントする), RESPONDS_TO (応答する))
- 各タイプに関する属性 (
name,uuid,created_atなどの予約語は避ける)
オントロジージェネレーターは2層構造を採用しています。コンテンツに基づく8つの特定タイプと、それ以外のものを受け入れるための2つのフォールバックタイプ (Person と Organization) です。
ステップ2: GraphRAG構築
ドキュメントはチャンクに分割され(500文字、50文字の重複)、バッチでZep Cloudに送信されます。システムは以下を行います。
- 一意のIDを持つスタンドアロンのグラフを作成する
- カスタムオントロジーを設定する
- エンティティおよび関係抽出のためにテキストバッチを送信する
- Zepが各エピソードを処理するのを待機する
- ノードとエッジを含む最終的なグラフを取得する
ステップ3: 環境設定
シミュレーション構成ジェネレーターは知識グラフを分析し、詳細なエージェントパラメータを作成します。
- 中国のタイムゾーンパターンに基づいた時間設定 (ピーク時間 19-22時、閑散時間 0-5時)
- 初期投稿とホットトピックを含むイベント設定
- エージェント活動設定 (1時間あたりの投稿数、応答遅延、影響度)
- 異なるバイラルしきい値を持つTwitterおよびReddit用のプラットフォーム設定
ステップ4: シミュレーション実行
エージェントは活動スケジュールに従って目覚め、投稿、コメント、反応を開始します。システムはTwitterとRedditで並行シミュレーションを実行し、すべての行動をリアルタイムでJSONLファイルに記録します。
ステップ5: レポート生成
レポートエージェントは、発生した事象を分析するために3つの主要な検索ツールを使用します。
- InsightForge: 質問をサブクエリに分解する詳細検索
- PanoramaSearch: 期限切れ/無効な履歴的事実を含む全範囲のビュー
- InterviewAgents: IPCを介したアクティブエージェントとのリアルタイムインタビュー
技術詳細: オントロジー生成
オントロジージェネレーターはbackend/app/services/ontology_generator.pyにあります。これは、厳格なルールを適用するために注意深く作成されたシステムプロンプトを使用しています。
システムプロンプトには、有効なエンティティ(人物、組織、メディアなど)とそうでないもの(抽象的な概念、テーマ、視点など)について広範なガイダンスが含まれています。この区別は、シミュレーションがソーシャルメディア上で実際に発言し行動できるエージェントを必要とするため重要です。
LLMがオントロジーを生成した後、_validate_and_processメソッドは制約を適用します。
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep APIの制限: エンティティタイプは最大10、エッジタイプは最大10
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# フォールバックタイプが存在することを確認
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# フォールバックを追加すると制限を超過する場合、トリムする
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
この検証レイヤーにより、ZepのAPI制限内で常に機能し、2層構造を維持することが保証されます。
知識グラフ構築: Zep統合
グラフビルダーサービス(backend/app/services/graph_builder.py)は非同期ワークフローを処理します。
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. グラフを作成
graph_id = self.create_graph(graph_name)
# 2. オントロジーを設定
self.set_ontology(graph_id, ontology)
# 3. テキストをチャンク化
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. バッチを送信
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Zepの処理を待機
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. 最終的なグラフを取得
graph_info = self._get_graph_info(graph_id)
動的なPydanticモデル生成
巧妙な点の一つは、システムが実行時に各エンティティタイプに対してPydanticモデルを動的に作成することです。
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
これにより、事前に定義されたモデルを必要とせずに、Zepはカスタムスキーマに対してエンティティ属性を検証できます。
大規模グラフのページング
Zepはページングされた結果を返します。zep_paging.pyユーティリティはすべてを取得します。
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
時間ベースのエージェント活動シミュレーション
シミュレーション構成ジェネレーター(backend/app/services/simulation_config_generator.py)は、中国のタイムゾーンの行動に基づいて現実的な活動パターンを作成します。
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 深夜はほとんど人がいない
"morning_hours": [6, 7, 8], # 朝方に徐々に活発化
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 夜間ピーク
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
異なるエージェントタイプには異なるパターンが割り当てられます。
| エージェントタイプ | 活動レベル | 活動時間 | 応答遅延 | 影響力 |
|---|---|---|---|---|
| 大学 | 0.2 | 9-17 | 60-240 分 | 3.0 |
| メディア | 0.5 | 7-23 | 5-30 分 | 2.5 |
| 学生 | 0.8 | 8-12, 18-23 | 1-15 分 | 0.8 |
| 教授 | 0.4 | 8-21 | 15-90 分 | 2.0 |
構成ジェネレーターは、LLM呼び出しを使用して特定のシナリオに基づいてこれらの値をカスタマイズし、LLMが失敗した場合はルールベースのデフォルト値にフォールバックします。
リアルタイム行動追跡
シミュレーションランナー(backend/app/services/simulation_runner.py)は、JSONLログをストリーミングしてエージェントの活動を監視します。
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# イベントを処理
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# エージェントの行動を解析
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
これはバックグラウンドスレッドで実行され、2秒ごとにシミュレーション状態を更新します。フロントエンドはこの状態をポーリングしてリアルタイムの進捗を表示します。
クロスプラットフォームのプロセス管理
シミュレーションを停止するには、WindowsとUnixにわたる注意深いプロセス管理が必要です。
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: taskkill を使用してプロセスツリーを強制終了
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: プロセスグループを強制終了 (start_new_session=True で作成)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
クリーンアップハンドラーは、SIGINT、SIGTERM、SIGHUPのシグナルハンドラーを登録します。
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# その後、元のハンドラーを呼び出す
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
これにより、サーバーがシャットダウンしたときにシミュレーションが正常に停止することが保証されます。
レポート生成: 3層検索
Zepツールサービス(backend/app/services/zep_tools.py)は、3つの検索機能を提供します。
InsightForge (詳細分析)
複雑な質問をサブクエリに分解し、それぞれを検索し、その後集計します。
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. LLMを使用してサブクエリを生成
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. 各サブクエリを検索
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. エッジからエンティティUUIDを抽出
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. 詳細なエンティティ情報を取得
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. 関係チェーンを構築
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (全範囲)
期限切れ/無効な履歴的事実を含むすべてを取得します。
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (リアルタイム)
実際のOASISインタビューAPIを呼び出して、アクティブなエージェントと対話します。
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. CSV/JSONからエージェントプロファイルをロード
profiles = self._load_agent_profiles(simulation_id)
# 2. LLMを使用して関連するエージェントを選択
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. インタビュー質問を生成
questions = self._generate_interview_questions(...)
# 4. 実際のインタビューAPIを呼び出す(デュアルプラットフォーム)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. 結果を解析および整形
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
主要な設計上の決定事項
1. 非同期タスク管理
時間のかかる操作(グラフ構築、シミュレーション実行)は、進捗追跡機能を備えた非同期タスクを使用します。
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
フロントエンドは/api/graph/task/{task_id}を介してタスクの状態をポーリングします。
2. リトライ付きバッチLLM呼び出し
構成生成は、大規模なエージェントリストを15個のバッチに分割します。
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
各バッチには、切り詰められた出力のためのJSON修復ロジックが含まれています。
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. デュアルプラットフォーム並列シミュレーション
TwitterとRedditは、個別のデータベースとアクションログを使用して並行して実行されます。
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
ランナーは、simulation_endイベントを介してプラットフォームごとの完了を検出します。
パフォーマンスに関する考慮事項
メモリ管理
- 大規模なドキュメントは、LLMコンテキスト用に5万文字に切り詰められます
- エンティティの要約はそれぞれ300文字に制限されます
- 最近のアクションはメモリ上で50件に制限されます(完全な履歴はJSONLファイルに保存)
データベースの分離
各プラットフォームは独自のSQLiteデータベースを使用し、並行書き込み時のロック競合を回避します。
段階的な劣化 (Graceful Degradation)
Zep検索APIが失敗した場合、システムはローカルキーワードマッチングにフォールバックします。
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search APIが失敗しました。ローカル検索にフォールバックします: {e}")
return self._local_search(graph_id, query, limit, scope)
結論
MiroFishは、ゼロから完全なマルチエージェントシミュレーションシステムを構築する方法を示します。5段階のワークフローは、生データを、数千のエージェントが現実的な行動パターンに従って相互作用する生きたデジタル世界へと変換します。
主なポイント:
- オントロジー設計が重要: 2層構造 (8つの特定タイプ + 2つのフォールバックタイプ) により、API制限を超過することなく網羅性を確保
- 非同期ワークフローが長時間の操作を可能にする: 進捗更新を伴うタスク追跡により、数分かかる操作中もユーザーに情報を提供
- 時間ベースの活動が現実味を生み出す: 中国のタイムゾーンパターンとエージェントタイプ固有のスケジュールにより、信頼性の高い行動が生成される
- デュアルプラットフォームシミュレーションが比較を提供する: TwitterとRedditを並行して実行することで、プラットフォームのダイナミクスが結果にどのように影響するかを示す
- 3層検索が異なるニーズに対応: InsightForgeは深掘り、PanoramaSearchは広範囲、InterviewAgentsは直接的な視点を提供
完全なソースコードはgithub.com/666ghj/MiroFishで入手できます。
MiroFishを試してみたいですか?ライブデモにアクセスして、ホットスポットイベントシミュレーションが動作している様子をご覧ください。
