目標: NyaMeshP2Pライブラリ実現のための本格的P2P通信システム
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ P2PBox │◄──►│ MessageBus │◄──►│ Transport │
│ (ユーザーAPI) │ │ (ローカル配送) │ │ (送受信層) │
└─────────────┘ └──────────────┘ └─────────────┘
▲ ▲ ▲
│ │ │
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ IntentBox │ │ ハンドラ管理 │ │ InProcess │
│ (構造化MSG) │ │ ノード登録 │ │ WebSocket │
└─────────────┘ └──────────────┘ │ WebRTC │
└─────────────┘
核心思想: 「P2PBox一つにTransport一つ + 共有MessageBus」
pub struct IntentBoxData {
pub name: String, // "chat.message", "file.share"等
pub payload: serde_json::Value, // 任意のJSON data
}
pub type IntentBox = Arc<Mutex<IntentBoxData>>;
// 使用例
msg = new IntentBox("chat.message", { text: "Hello P2P!", timestamp: 12345 })
print(msg.name) // "chat.message"
print(msg.payload) // {"text":"Hello P2P!","timestamp":12345}
pub struct MessageBusData {
nodes: HashMap<String, BusEndpoint>, // ノード登録
subscribers: HashMap<String, Vec<IntentHandler>>, // "node_id:intent_name" → ハンドラー
stats: BusStatistics,
}
pub type MessageBus = Arc<Mutex<MessageBusData>>;
// シングルトンアクセス
MessageBusData::global() -> MessageBus
役割: 同プロセス内での超高速メッセージルーティング・ハンドラ管理
pub trait Transport: Send + Sync {
fn node_id(&self) -> &str;
fn send(&self, to: &str, intent: IntentBox, opts: SendOpts) -> Result<(), SendError>;
fn on_receive(&mut self, callback: Box<dyn Fn(IntentEnvelope) + Send + Sync>);
}
// 3種類の実装
pub struct InProcessTransport { ... } // 同プロセス内
pub struct WebSocketTransport { ... } // WebSocket通信
pub struct WebRTCTransport { ... } // WebRTC P2P
pub struct P2PBoxData {
node_id: String,
transport: Arc<dyn Transport>,
bus: MessageBus, // 全P2PBoxで共有
}
pub type P2PBox = Arc<Mutex<P2PBoxData>>;
impl P2PBoxData {
pub fn new(node_id: String, kind: TransportKind) -> P2PBox
pub fn on(&self, intent_name: &str, handler: IntentHandler) -> Result<(), P2PError>
pub fn send(&self, to: &str, intent: IntentBox) -> Result<(), SendError>
}
// 2つのノード作成
node_a = new P2PBox("alice", transport: "inprocess")
node_b = new P2PBox("bob", transport: "inprocess")
// 受信ハンドラ設定
node_b.on("chat.message", function(intent, from) {
console = new ConsoleBox()
console.log("From " + from + ": " + intent.payload.text)
})
// メッセージ送信
msg = new IntentBox("chat.message", { text: "Hello P2P!" })
node_a.send("bob", msg) // → "From alice: Hello P2P!"
// 文字列直送(内部でIntentBox化) - 個別送信のみ
node_a.send("bob", "hello") // → IntentBox("message", "hello")
node_a.send("bob", "chat:hello") // → IntentBox("chat", "hello")
// 注意: ブロードキャスト機能は安全性のため除外(無限ループリスク回避)
// 将来:異なるTransportでも同じAPI
local_node = new P2PBox("local", transport: "inprocess")
web_node = new P2PBox("web", transport: "websocket", { url: "ws://localhost:8080" })
p2p_node = new P2PBox("p2p", transport: "webrtc", { ice_servers: [...] })
// Transportに関係なく同じsend/onメソッド
msg = new IntentBox("file.share", { filename: "data.json" })
local_node.send("web", msg) // WebSocket経由
web_node.send("p2p", msg) // WebRTC経由
1. bus.has_node("node_b") == true?
YES → bus.route("node_b", intent) ← 同プロセス内の最速配送
NO → transport.send("node_b", intent, opts) ← WebSocket/WebRTC経由
transport.on_receive コールバックで受信
↓
1. envelope.to == self.node_id?
YES → bus.dispatch_to_local(self.node_id, intent) ← 自分宛
NO → 2へ
↓
2. bus.has_node(envelope.to)?
YES → bus.route(envelope.to, intent) ← 同プロセス内転送
NO → bus.trace_drop(envelope) ← ルート不明
src/boxes/intent_box.rs
)src/messaging/message_bus.rs
)src/transport/mod.rs
)src/transport/inprocess.rs
)src/boxes/p2p_box.rs
)// tests/phase2/p2p_basic_test.nyash
node_a = new P2PBox("alice", transport: "inprocess")
node_b = new P2PBox("bob", transport: "inprocess")
node_b.on("test.ping", function(intent, from) {
console = new ConsoleBox()
console.log("PING from " + from)
})
msg = new IntentBox("test.ping", { timestamp: 12345 })
result = node_a.send("bob", msg)
// 期待出力: "PING from alice"
send()
は同期版 fn
で実装開始async fn
化Arc<Mutex<_>>
パターンに準拠MessageBus
も Arc<Mutex<MessageBusData>>
IntentBox
も Arc<Mutex<IntentBoxData>>
#[derive(Debug, Clone)]
pub enum SendError {
NodeNotFound(String), // 宛先ノードが見つからない
NetworkError(String), // ネットワークエラー
Timeout, // 送信タイムアウト
SerializationError(String), // JSON変換エラー
BusError(String), // MessageBusエラー
}
// NyaMeshP2Pライブラリの実現
mesh = new NyaMesh("my_node")
mesh.join("chat_room")
mesh.send("Hello everyone!")
mesh.on("message", function(msg, from) {
print(from + ": " + msg)
})
📚 関連ドキュメント
📝 最終更新: 2025-08-12 | 🎓 設計協力: Gemini先生・ChatGPT先生 | 🎯 目標: Everything is Box哲学によるP2P通信革命 |