feat(relay): track peer last-seen times
The plan calls for relay switch state to remember when peers were last seen. Wrap room peers in a small entry that keeps PeerInfo alongside a monotonic Instant, and expose the timestamp through room snapshots for relay diagnostics. Joining initializes last_seen. Valid Ethernet frames with the peer's authorized source refresh the timestamp before safety filters or rate limits run, so filtered but well-formed traffic still proves the peer is active. Malformed frames and forged client source MACs do not refresh it. Room leave now removes the associated timestamp with the peer entry. That keeps membership, MAC indexes, rate-limit buckets, and diagnostics state aligned. Test Plan: - cargo fmt --check - cargo test -p lanparty-relay last_seen -- --nocapture - cargo test -p lanparty-relay - cargo clippy -p lanparty-relay --all-targets -- -D warnings - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md
This commit is contained in:
@@ -93,6 +93,7 @@ Public relay binary and relay-owned room state:
|
|||||||
- malformed peer datagram disconnect threshold
|
- malformed peer datagram disconnect threshold
|
||||||
- peer stats control events retained for relay diagnostics
|
- peer stats control events retained for relay diagnostics
|
||||||
- graceful disconnect control events propagated as peer-leave reasons
|
- graceful disconnect control events propagated as peer-leave reasons
|
||||||
|
- per-peer last-seen timestamps in relay room snapshots
|
||||||
- peer leave cleanup for room membership and MAC indexes
|
- peer leave cleanup for room membership and MAC indexes
|
||||||
|
|
||||||
## Build
|
## Build
|
||||||
|
|||||||
@@ -67,6 +67,7 @@ pub struct RoomSnapshot {
|
|||||||
effective_tap_mtu: u16,
|
effective_tap_mtu: u16,
|
||||||
gateway: Option<PeerInfo>,
|
gateway: Option<PeerInfo>,
|
||||||
clients: Vec<PeerInfo>,
|
clients: Vec<PeerInfo>,
|
||||||
|
last_seen_by_peer: HashMap<u32, Instant>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RoomSnapshot {
|
impl RoomSnapshot {
|
||||||
@@ -89,6 +90,11 @@ impl RoomSnapshot {
|
|||||||
pub fn clients(&self) -> &[PeerInfo] {
|
pub fn clients(&self) -> &[PeerInfo] {
|
||||||
&self.clients
|
&self.clients
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn last_seen(&self, peer_id: u32) -> Option<Instant> {
|
||||||
|
self.last_seen_by_peer.get(&peer_id).copied()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
@@ -287,14 +293,26 @@ struct Room {
|
|||||||
next_peer_id: u32,
|
next_peer_id: u32,
|
||||||
max_clients: usize,
|
max_clients: usize,
|
||||||
effective_tap_mtu: Option<u16>,
|
effective_tap_mtu: Option<u16>,
|
||||||
gateway: Option<PeerInfo>,
|
gateway: Option<PeerEntry>,
|
||||||
clients: HashMap<u32, PeerInfo>,
|
clients: HashMap<u32, PeerEntry>,
|
||||||
clients_by_mac: HashMap<MacAddr, u32>,
|
clients_by_mac: HashMap<MacAddr, u32>,
|
||||||
client_multicast_limits: HashMap<u32, TokenBucket>,
|
client_multicast_limits: HashMap<u32, TokenBucket>,
|
||||||
client_unknown_unicast_limits: HashMap<u32, TokenBucket>,
|
client_unknown_unicast_limits: HashMap<u32, TokenBucket>,
|
||||||
client_total_bandwidth_limits: HashMap<u32, TokenBucket>,
|
client_total_bandwidth_limits: HashMap<u32, TokenBucket>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct PeerEntry {
|
||||||
|
info: PeerInfo,
|
||||||
|
last_seen: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerEntry {
|
||||||
|
const fn new(info: PeerInfo, last_seen: Instant) -> Self {
|
||||||
|
Self { info, last_seen }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Room {
|
impl Room {
|
||||||
fn new(room_id: u64, max_clients: usize) -> Self {
|
fn new(room_id: u64, max_clients: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -337,14 +355,17 @@ impl Room {
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let joined_at = Instant::now();
|
||||||
|
|
||||||
match peer.role() {
|
match peer.role() {
|
||||||
Role::Gateway => {
|
Role::Gateway => {
|
||||||
self.gateway = Some(peer.clone());
|
self.gateway = Some(PeerEntry::new(peer.clone(), joined_at));
|
||||||
}
|
}
|
||||||
Role::Client => {
|
Role::Client => {
|
||||||
let mac = peer.mac().expect("client peer info has MAC");
|
let mac = peer.mac().expect("client peer info has MAC");
|
||||||
self.clients_by_mac.insert(mac, peer.peer_id());
|
self.clients_by_mac.insert(mac, peer.peer_id());
|
||||||
self.clients.insert(peer.peer_id(), peer.clone());
|
self.clients
|
||||||
|
.insert(peer.peer_id(), PeerEntry::new(peer.clone(), joined_at));
|
||||||
self.client_multicast_limits
|
self.client_multicast_limits
|
||||||
.insert(peer.peer_id(), client_multicast_limit());
|
.insert(peer.peer_id(), client_multicast_limit());
|
||||||
self.client_unknown_unicast_limits
|
self.client_unknown_unicast_limits
|
||||||
@@ -409,14 +430,30 @@ impl Room {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn snapshot(&self) -> RoomSnapshot {
|
fn snapshot(&self) -> RoomSnapshot {
|
||||||
let mut clients: Vec<_> = self.clients.values().cloned().collect();
|
let mut clients: Vec<_> = self
|
||||||
|
.clients
|
||||||
|
.values()
|
||||||
|
.map(|entry| entry.info.clone())
|
||||||
|
.collect();
|
||||||
clients.sort_by_key(PeerInfo::peer_id);
|
clients.sort_by_key(PeerInfo::peer_id);
|
||||||
|
let mut last_seen_by_peer =
|
||||||
|
HashMap::with_capacity(self.clients.len() + usize::from(self.gateway.is_some()));
|
||||||
|
|
||||||
|
if let Some(gateway) = &self.gateway {
|
||||||
|
last_seen_by_peer.insert(gateway.info.peer_id(), gateway.last_seen);
|
||||||
|
}
|
||||||
|
last_seen_by_peer.extend(
|
||||||
|
self.clients
|
||||||
|
.values()
|
||||||
|
.map(|client| (client.info.peer_id(), client.last_seen)),
|
||||||
|
);
|
||||||
|
|
||||||
RoomSnapshot {
|
RoomSnapshot {
|
||||||
room_id: self.room_id,
|
room_id: self.room_id,
|
||||||
effective_tap_mtu: self.effective_tap_mtu.unwrap_or_default(),
|
effective_tap_mtu: self.effective_tap_mtu.unwrap_or_default(),
|
||||||
gateway: self.gateway.clone(),
|
gateway: self.gateway.as_ref().map(|gateway| gateway.info.clone()),
|
||||||
clients,
|
clients,
|
||||||
|
last_seen_by_peer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -424,18 +461,19 @@ impl Room {
|
|||||||
if self
|
if self
|
||||||
.gateway
|
.gateway
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.is_some_and(|gateway| gateway.peer_id() == peer_id)
|
.is_some_and(|gateway| gateway.info.peer_id() == peer_id)
|
||||||
{
|
{
|
||||||
return Ok(self.gateway.take().expect("gateway exists"));
|
return Ok(self.gateway.take().expect("gateway exists").info);
|
||||||
}
|
}
|
||||||
|
|
||||||
let peer =
|
let entry =
|
||||||
self.clients
|
self.clients
|
||||||
.remove(&peer_id)
|
.remove(&peer_id)
|
||||||
.ok_or_else(|| ForwardingError::UnknownIngressPeer {
|
.ok_or_else(|| ForwardingError::UnknownIngressPeer {
|
||||||
room: room_code.clone(),
|
room: room_code.clone(),
|
||||||
peer_id,
|
peer_id,
|
||||||
})?;
|
})?;
|
||||||
|
let peer = entry.info;
|
||||||
let mac = peer.mac().expect("client peer info has MAC");
|
let mac = peer.mac().expect("client peer info has MAC");
|
||||||
self.clients_by_mac.remove(&mac);
|
self.clients_by_mac.remove(&mac);
|
||||||
self.client_multicast_limits.remove(&peer.peer_id());
|
self.client_multicast_limits.remove(&peer.peer_id());
|
||||||
@@ -463,13 +501,14 @@ impl Room {
|
|||||||
peer_id: ingress_peer_id,
|
peer_id: ingress_peer_id,
|
||||||
})?;
|
})?;
|
||||||
let ingress_role = ingress.role();
|
let ingress_role = ingress.role();
|
||||||
|
let ingress_mac = ingress.mac();
|
||||||
let frame = match EthernetFrame::parse(frame_bytes) {
|
let frame = match EthernetFrame::parse(frame_bytes) {
|
||||||
Ok(frame) => frame,
|
Ok(frame) => frame,
|
||||||
Err(_) => return Ok(ForwardingDecision::dropped(DropReason::Malformed)),
|
Err(_) => return Ok(ForwardingDecision::dropped(DropReason::Malformed)),
|
||||||
};
|
};
|
||||||
|
|
||||||
if ingress_role == Role::Client {
|
if ingress_role == Role::Client {
|
||||||
let expected_source = ingress.mac().expect("client peers have MAC addresses");
|
let expected_source = ingress_mac.expect("client peers have MAC addresses");
|
||||||
if frame.source() != expected_source {
|
if frame.source() != expected_source {
|
||||||
return Ok(ForwardingDecision::dropped(
|
return Ok(ForwardingDecision::dropped(
|
||||||
DropReason::UnauthorizedSourceMac,
|
DropReason::UnauthorizedSourceMac,
|
||||||
@@ -477,6 +516,8 @@ impl Room {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.mark_peer_seen(ingress_peer_id, now);
|
||||||
|
|
||||||
if let Some(drop_reason) = safety_drop_reason(ingress_role, frame) {
|
if let Some(drop_reason) = safety_drop_reason(ingress_role, frame) {
|
||||||
return Ok(ForwardingDecision::dropped(drop_reason));
|
return Ok(ForwardingDecision::dropped(drop_reason));
|
||||||
}
|
}
|
||||||
@@ -546,8 +587,24 @@ impl Room {
|
|||||||
fn peer(&self, peer_id: u32) -> Option<&PeerInfo> {
|
fn peer(&self, peer_id: u32) -> Option<&PeerInfo> {
|
||||||
self.gateway
|
self.gateway
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.filter(|peer| peer.peer_id() == peer_id)
|
.filter(|peer| peer.info.peer_id() == peer_id)
|
||||||
.or_else(|| self.clients.get(&peer_id))
|
.map(|peer| &peer.info)
|
||||||
|
.or_else(|| self.clients.get(&peer_id).map(|peer| &peer.info))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mark_peer_seen(&mut self, peer_id: u32, seen_at: Instant) {
|
||||||
|
if let Some(gateway) = self
|
||||||
|
.gateway
|
||||||
|
.as_mut()
|
||||||
|
.filter(|peer| peer.info.peer_id() == peer_id)
|
||||||
|
{
|
||||||
|
gateway.last_seen = seen_at;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(client) = self.clients.get_mut(&peer_id) {
|
||||||
|
client.last_seen = seen_at;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_peer_ids_except(&self, ingress_peer_id: u32) -> Vec<u32> {
|
fn all_peer_ids_except(&self, ingress_peer_id: u32) -> Vec<u32> {
|
||||||
@@ -555,9 +612,9 @@ impl Room {
|
|||||||
Vec::with_capacity(self.clients.len() + usize::from(self.gateway.is_some()));
|
Vec::with_capacity(self.clients.len() + usize::from(self.gateway.is_some()));
|
||||||
|
|
||||||
if let Some(gateway) = &self.gateway
|
if let Some(gateway) = &self.gateway
|
||||||
&& gateway.peer_id() != ingress_peer_id
|
&& gateway.info.peer_id() != ingress_peer_id
|
||||||
{
|
{
|
||||||
peer_ids.push(gateway.peer_id());
|
peer_ids.push(gateway.info.peer_id());
|
||||||
}
|
}
|
||||||
|
|
||||||
peer_ids.extend(
|
peer_ids.extend(
|
||||||
@@ -573,8 +630,8 @@ impl Room {
|
|||||||
fn gateway_peer_id_except(&self, ingress_peer_id: u32) -> Vec<u32> {
|
fn gateway_peer_id_except(&self, ingress_peer_id: u32) -> Vec<u32> {
|
||||||
self.gateway
|
self.gateway
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.filter(|gateway| gateway.peer_id() != ingress_peer_id)
|
.filter(|gateway| gateway.info.peer_id() != ingress_peer_id)
|
||||||
.map_or_else(Vec::new, |gateway| vec![gateway.peer_id()])
|
.map_or_else(Vec::new, |gateway| vec![gateway.info.peer_id()])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -826,6 +883,8 @@ mod tests {
|
|||||||
assert_eq!(snapshot.gateway().unwrap().peer_id(), 1);
|
assert_eq!(snapshot.gateway().unwrap().peer_id(), 1);
|
||||||
assert_eq!(snapshot.clients().len(), 1);
|
assert_eq!(snapshot.clients().len(), 1);
|
||||||
assert_eq!(snapshot.effective_tap_mtu(), 1200);
|
assert_eq!(snapshot.effective_tap_mtu(), 1200);
|
||||||
|
assert!(snapshot.last_seen(gateway.peer().peer_id()).is_some());
|
||||||
|
assert!(snapshot.last_seen(client.peer().peer_id()).is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -904,6 +963,7 @@ mod tests {
|
|||||||
assert_eq!(result.peer(), client.peer());
|
assert_eq!(result.peer(), client.peer());
|
||||||
assert!(!result.room_removed());
|
assert!(!result.room_removed());
|
||||||
assert_eq!(snapshot.clients().len(), 1);
|
assert_eq!(snapshot.clients().len(), 1);
|
||||||
|
assert_eq!(snapshot.last_seen(client.peer().peer_id()), None);
|
||||||
assert!(registry.join(client_hello(1)).is_ok());
|
assert!(registry.join(client_hello(1)).is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -932,6 +992,7 @@ mod tests {
|
|||||||
assert_eq!(result.peer(), gateway.peer());
|
assert_eq!(result.peer(), gateway.peer());
|
||||||
assert!(!result.room_removed());
|
assert!(!result.room_removed());
|
||||||
assert!(snapshot.gateway().is_none());
|
assert!(snapshot.gateway().is_none());
|
||||||
|
assert_eq!(snapshot.last_seen(gateway.peer().peer_id()), None);
|
||||||
assert_eq!(snapshot.clients().len(), 1);
|
assert_eq!(snapshot.clients().len(), 1);
|
||||||
assert!(registry.join(gateway_hello()).is_ok());
|
assert!(registry.join(gateway_hello()).is_ok());
|
||||||
}
|
}
|
||||||
@@ -1021,6 +1082,74 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn refreshes_peer_last_seen_after_valid_frames() {
|
||||||
|
let mut registry = RoomRegistry::default();
|
||||||
|
let gateway = registry.join(gateway_hello()).unwrap();
|
||||||
|
let client = registry.join(client_hello(1)).unwrap();
|
||||||
|
let client_seen_at = Instant::now() + Duration::from_secs(5);
|
||||||
|
let gateway_seen_at = client_seen_at + Duration::from_secs(1);
|
||||||
|
let client_frame = ethernet(MacAddr::BROADCAST, mac(1));
|
||||||
|
let gateway_frame = ethernet(mac(1), physical_mac());
|
||||||
|
|
||||||
|
registry
|
||||||
|
.forward_ethernet_at(
|
||||||
|
&room(),
|
||||||
|
client.peer().peer_id(),
|
||||||
|
&client_frame,
|
||||||
|
client_seen_at,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
registry
|
||||||
|
.forward_ethernet_at(
|
||||||
|
&room(),
|
||||||
|
gateway.peer().peer_id(),
|
||||||
|
&gateway_frame,
|
||||||
|
gateway_seen_at,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let snapshot = registry.snapshot(&room()).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
snapshot.last_seen(client.peer().peer_id()),
|
||||||
|
Some(client_seen_at)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
snapshot.last_seen(gateway.peer().peer_id()),
|
||||||
|
Some(gateway_seen_at)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn keeps_last_seen_unchanged_for_unauthorized_client_source() {
|
||||||
|
let mut registry = RoomRegistry::default();
|
||||||
|
let client = registry.join(client_hello(1)).unwrap();
|
||||||
|
let before = registry
|
||||||
|
.snapshot(&room())
|
||||||
|
.unwrap()
|
||||||
|
.last_seen(client.peer().peer_id())
|
||||||
|
.unwrap();
|
||||||
|
let frame = ethernet(MacAddr::BROADCAST, mac(2));
|
||||||
|
|
||||||
|
let decision = registry
|
||||||
|
.forward_ethernet_at(
|
||||||
|
&room(),
|
||||||
|
client.peer().peer_id(),
|
||||||
|
&frame,
|
||||||
|
before + Duration::from_secs(5),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_dropped(&decision, DropReason::UnauthorizedSourceMac);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
registry
|
||||||
|
.snapshot(&room())
|
||||||
|
.unwrap()
|
||||||
|
.last_seen(client.peer().peer_id()),
|
||||||
|
Some(before)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rate_limits_client_broadcast_after_burst() {
|
fn rate_limits_client_broadcast_after_burst() {
|
||||||
let mut registry = RoomRegistry::default();
|
let mut registry = RoomRegistry::default();
|
||||||
|
|||||||
Reference in New Issue
Block a user