diff --git a/nodescraper/plugins/inband/network/network_collector.py b/nodescraper/plugins/inband/network/network_collector.py index 0f96e7c8..21f571da 100644 --- a/nodescraper/plugins/inband/network/network_collector.py +++ b/nodescraper/plugins/inband/network/network_collector.py @@ -31,11 +31,25 @@ from nodescraper.models import TaskResult from .networkdata import ( + BroadcomNicDevice, + BroadcomNicQos, + BroadcomNicQosAppEntry, EthtoolInfo, IpAddress, Neighbor, NetworkDataModel, NetworkInterface, + PensandoNicCard, + PensandoNicDcqcn, + PensandoNicEnvironment, + PensandoNicPcieAts, + PensandoNicPort, + PensandoNicQos, + PensandoNicQosScheduling, + PensandoNicRdmaStatistic, + PensandoNicRdmaStatistics, + PensandoNicVersionFirmware, + PensandoNicVersionHostSoftware, Route, RoutingRule, ) @@ -49,7 +63,26 @@ class NetworkCollector(InBandDataCollector[NetworkDataModel, None]): CMD_ROUTE = "ip route show" CMD_RULE = "ip rule show" CMD_NEIGHBOR = "ip neighbor show" - CMD_ETHTOOL_TEMPLATE = "sudo ethtool {interface}" + CMD_ETHTOOL_TEMPLATE = "ethtool {interface}" + + # LLDP commands + CMD_LLDPCLI_NEIGHBOR = "lldpcli show neighbor" + CMD_LLDPCTL = "lldpctl" + + # Broadcom NIC commands + CMD_NICCLI_LISTDEV = "niccli --list_devices" + CMD_NICCLI_GETQOS_TEMPLATE = "niccli --dev {device_num} qos --ets --show" + + # Pensando NIC commands + CMD_NICCTL_CARD = "nicctl show card" + CMD_NICCTL_DCQCN = "nicctl show dcqcn" + CMD_NICCTL_ENVIRONMENT = "nicctl show environment" + CMD_NICCTL_PCIE_ATS = "nicctl show pcie ats" + CMD_NICCTL_PORT = "nicctl show port" + CMD_NICCTL_QOS = "nicctl show qos" + CMD_NICCTL_RDMA_STATISTICS = "nicctl show rdma statistics" + CMD_NICCTL_VERSION_HOST_SOFTWARE = "nicctl show version host-software" + CMD_NICCTL_VERSION_FIRMWARE = "nicctl show version firmware" def _parse_ip_addr(self, output: str) -> List[NetworkInterface]: """Parse 'ip addr show' output into NetworkInterface objects. @@ -431,6 +464,920 @@ def _parse_ethtool(self, interface: str, output: str) -> EthtoolInfo: return ethtool_info + def _parse_niccli_listdev(self, output: str) -> List[BroadcomNicDevice]: + """Parse 'niccli --list_devices' output into BroadcomNicDevice objects. + + Args: + output: Raw output from 'niccli --list_devices' command + + Returns: + List of BroadcomNicDevice objects + """ + devices = [] + current_device = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check if this is a device header line + match = re.match(r"^(\d+)\s*\)\s*(.+?)(?:\s+\((.+?)\))?$", line_stripped) + if match: + device_num_str = match.group(1) + model = match.group(2).strip() if match.group(2) else None + adapter_port = match.group(3).strip() if match.group(3) else None + + try: + device_num = int(device_num_str) + except ValueError: + continue + + current_device = BroadcomNicDevice( + device_num=device_num, + model=model, + adapter_port=adapter_port, + ) + devices.append(current_device) + + # Check for Device Interface Name line + elif "Device Interface Name" in line and current_device: + parts = line_stripped.split(":") + if len(parts) >= 2: + current_device.interface_name = parts[1].strip() + + # Check for MAC Address line + elif "MAC Address" in line and current_device: + parts = line_stripped.split(":") + if len(parts) >= 2: + # MAC address has colons, so rejoin the parts after first split + mac = ":".join(parts[1:]).strip() + current_device.mac_address = mac + + # Check for PCI Address line + elif "PCI Address" in line and current_device: + parts = line_stripped.split(":") + if len(parts) >= 2: + # PCI address also has colons, rejoin + pci = ":".join(parts[1:]).strip() + current_device.pci_address = pci + + return devices + + def _parse_nicctl_card(self, output: str) -> List[PensandoNicCard]: + """Parse 'nicctl show card' output into PensandoNicCard objects. + + Args: + output: Raw output from 'nicctl show card' command + + Returns: + List of PensandoNicCard objects + """ + cards = [] + + # Skip header lines and separator lines + in_data_section = False + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Skip header line (starts with "Id") + if line_stripped.startswith("Id"): + in_data_section = True + continue + + # Skip separator lines (mostly dashes) + if re.match(r"^-+$", line_stripped): + continue + + # Parse data lines after header + if in_data_section: + # Split by whitespace + parts = line_stripped.split() + + # Expected format: Id PCIe_BDF ASIC F/W_partition Serial_number + if len(parts) >= 2: + card = PensandoNicCard( + id=parts[0], + pcie_bdf=parts[1], + asic=parts[2] if len(parts) > 2 else None, + fw_partition=parts[3] if len(parts) > 3 else None, + serial_number=parts[4] if len(parts) > 4 else None, + ) + cards.append(card) + + return cards + + def _parse_nicctl_dcqcn(self, output: str) -> List[PensandoNicDcqcn]: + """Parse 'nicctl show dcqcn' output into PensandoNicDcqcn objects. + + Args: + output: Raw output from 'nicctl show dcqcn' command + + Returns: + List of PensandoNicDcqcn objects + """ + dcqcn_entries = [] + current_entry = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check for NIC line + if line_stripped.startswith("NIC :"): + # Save previous entry if exists + if current_entry: + dcqcn_entries.append(current_entry) + + # Parse NIC ID and PCIe BDF + # Format: "NIC : ()" + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + nic_id = match.group(1) + pcie_bdf = match.group(2) + current_entry = PensandoNicDcqcn( + nic_id=nic_id, + pcie_bdf=pcie_bdf, + ) + continue + + # Skip separator lines (dashes or asterisks) + if re.match(r"^[-*]+$", line_stripped): + continue + + # Parse fields within current entry + if current_entry and ":" in line_stripped: + parts = line_stripped.split(":", 1) + if len(parts) == 2: + key = parts[0].strip() + value = parts[1].strip() + + if key == "Lif id": + current_entry.lif_id = value + elif key == "ROCE device": + current_entry.roce_device = value + elif key == "DCQCN profile id": + current_entry.dcqcn_profile_id = value + elif key == "Status": + current_entry.status = value + + # Add the last entry if exists + if current_entry: + dcqcn_entries.append(current_entry) + + return dcqcn_entries + + def _parse_nicctl_environment(self, output: str) -> List[PensandoNicEnvironment]: + """Parse 'nicctl show environment' output into PensandoNicEnvironment objects. + + Args: + output: Raw output from 'nicctl show environment' command + + Returns: + List of PensandoNicEnvironment objects + """ + environment_entries = [] + current_entry = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check for NIC line + if line_stripped.startswith("NIC :"): + # Save previous entry if exists + if current_entry: + environment_entries.append(current_entry) + + # Parse NIC ID and PCIe BDF + # Format: "NIC : ()" + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + nic_id = match.group(1) + pcie_bdf = match.group(2) + current_entry = PensandoNicEnvironment( + nic_id=nic_id, + pcie_bdf=pcie_bdf, + ) + continue + + # Skip separator lines (dashes) + if re.match(r"^-+$", line_stripped): + continue + + # Skip section headers (Power(W):, Temperature(C):, etc.) + if line_stripped.endswith("):"): + continue + + # Parse fields within current entry + if current_entry and ":" in line_stripped: + parts = line_stripped.split(":", 1) + if len(parts) == 2: + key = parts[0].strip() + value_str = parts[1].strip() + + # Try to parse the value as float + try: + value = float(value_str) + except ValueError: + continue + + # Map keys to fields + if key == "Total power drawn (pin)" or key == "Total power drawn": + current_entry.total_power_drawn = value + elif key == "Core power (pout1)" or key == "Core power": + current_entry.core_power = value + elif key == "ARM power (pout2)" or key == "ARM power": + current_entry.arm_power = value + elif key == "Local board temperature": + current_entry.local_board_temperature = value + elif key == "Die temperature": + current_entry.die_temperature = value + elif key == "Input voltage": + current_entry.input_voltage = value + elif key == "Core voltage": + current_entry.core_voltage = value + elif key == "Core frequency": + current_entry.core_frequency = value + elif key == "CPU frequency": + current_entry.cpu_frequency = value + elif key == "P4 stage frequency": + current_entry.p4_stage_frequency = value + + # Add the last entry if exists + if current_entry: + environment_entries.append(current_entry) + + return environment_entries + + def _parse_nicctl_pcie_ats(self, output: str) -> List[PensandoNicPcieAts]: + """Parse 'nicctl show pcie ats' output into PensandoNicPcieAts objects. + + Args: + output: Raw output from 'nicctl show pcie ats' command + + Returns: + List of PensandoNicPcieAts objects + """ + pcie_ats_entries = [] + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Parse line format: "NIC : () : " + if line_stripped.startswith("NIC :"): + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)\s*:\s*(\w+)", + line_stripped, + re.IGNORECASE, + ) + if match: + nic_id = match.group(1) + pcie_bdf = match.group(2) + status = match.group(3) + entry = PensandoNicPcieAts( + nic_id=nic_id, + pcie_bdf=pcie_bdf, + status=status, + ) + pcie_ats_entries.append(entry) + + return pcie_ats_entries + + def _parse_nicctl_port(self, output: str) -> List[PensandoNicPort]: + """Parse 'nicctl show port' output into PensandoNicPort objects. + + Args: + output: Raw output from 'nicctl show port' command + + Returns: + List of PensandoNicPort objects + """ + port_entries = [] + current_entry = None + current_section = None # 'spec' or 'status' + current_nic_id = None + current_pcie_bdf = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check for NIC line + if line_stripped.startswith("NIC") and ":" in line_stripped: + # Save previous entry if exists + if current_entry: + port_entries.append(current_entry) + current_entry = None + + # Parse NIC ID and PCIe BDF + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + current_nic_id = match.group(1) + current_pcie_bdf = match.group(2) + continue + + # Check for Port line + if ( + line_stripped.startswith("Port") + and ":" in line_stripped + and current_nic_id + and current_pcie_bdf + ): + # Save previous entry if exists + if current_entry: + port_entries.append(current_entry) + + # Parse Port ID and Port name + match = re.match( + r"Port\s*:\s*([a-f0-9\-]+)\s*\(([^\)]+)\)", line_stripped, re.IGNORECASE + ) + if match: + port_id = match.group(1) + port_name = match.group(2) + current_entry = PensandoNicPort( + nic_id=current_nic_id, + pcie_bdf=current_pcie_bdf, + port_id=port_id, + port_name=port_name, + ) + continue + + # Skip separator lines (dashes) + if re.match(r"^-+$", line_stripped): + continue + + # Check for section headers + if line_stripped.endswith(":"): + if line_stripped == "Spec:": + current_section = "spec" + elif line_stripped == "Status:": + current_section = "status" + continue + + # Parse fields within current entry and section + if current_entry and current_section and ":" in line_stripped: + parts = line_stripped.split(":", 1) + if len(parts) == 2: + key = parts[0].strip() + value = parts[1].strip() + + if current_section == "spec": + if key == "Ifindex": + current_entry.spec_ifindex = value + elif key == "Type": + current_entry.spec_type = value + elif key == "speed": + current_entry.spec_speed = value + elif key == "Admin state": + current_entry.spec_admin_state = value + elif key == "FEC type": + current_entry.spec_fec_type = value + elif key == "Pause type": + current_entry.spec_pause_type = value + elif key == "Number of lanes": + try: + current_entry.spec_num_lanes = int(value) + except ValueError: + pass + elif key == "MTU": + try: + current_entry.spec_mtu = int(value) + except ValueError: + pass + elif key == "TX pause": + current_entry.spec_tx_pause = value + elif key == "RX pause": + current_entry.spec_rx_pause = value + elif key == "Auto negotiation": + current_entry.spec_auto_negotiation = value + elif current_section == "status": + if key == "Physical port": + try: + current_entry.status_physical_port = int(value) + except ValueError: + pass + elif key == "Operational status": + current_entry.status_operational_status = value + elif key == "Link FSM state": + current_entry.status_link_fsm_state = value + elif key == "FEC type": + current_entry.status_fec_type = value + elif key == "Cable type": + current_entry.status_cable_type = value + elif key == "Number of lanes": + try: + current_entry.status_num_lanes = int(value) + except ValueError: + pass + elif key == "speed": + current_entry.status_speed = value + elif key == "Auto negotiation": + current_entry.status_auto_negotiation = value + elif key == "MAC ID": + try: + current_entry.status_mac_id = int(value) + except ValueError: + pass + elif key == "MAC channel": + try: + current_entry.status_mac_channel = int(value) + except ValueError: + pass + elif key == "MAC address": + current_entry.status_mac_address = value + elif key == "Transceiver type": + current_entry.status_transceiver_type = value + elif key == "Transceiver state": + current_entry.status_transceiver_state = value + elif key == "Transceiver PID": + current_entry.status_transceiver_pid = value + + # Add the last entry if exists + if current_entry: + port_entries.append(current_entry) + + return port_entries + + def _parse_nicctl_qos(self, output: str) -> List[PensandoNicQos]: + """Parse 'nicctl show qos' output into PensandoNicQos objects. + + Args: + output: Raw output from 'nicctl show qos' command + + Returns: + List of PensandoNicQos objects + """ + qos_entries = [] + current_entry = None + current_nic_id = None + current_pcie_bdf = None + in_scheduling_table = False + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check for NIC line: "NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0)" + if line_stripped.startswith("NIC") and ":" in line_stripped: + # Save previous entry if exists + if current_entry: + qos_entries.append(current_entry) + current_entry = None + + # Parse NIC ID and PCIe BDF + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + current_nic_id = match.group(1) + current_pcie_bdf = match.group(2) + in_scheduling_table = False + continue + + # Check for Port line: "Port : 0490814a-6c40-4242-4242-000011010000" + if ( + line_stripped.startswith("Port") + and ":" in line_stripped + and current_nic_id + and current_pcie_bdf + ): + # Save previous entry if exists + if current_entry: + qos_entries.append(current_entry) + + # Parse Port ID + parts = line_stripped.split(":") + if len(parts) >= 2: + port_id = parts[1].strip() + current_entry = PensandoNicQos( + nic_id=current_nic_id, + pcie_bdf=current_pcie_bdf, + port_id=port_id, + ) + in_scheduling_table = False + continue + + # Skip separator lines (dashes) but don't reset scheduling table flag + if re.match(r"^-+$", line_stripped): + continue + + # Check for section headers + if current_entry: + # Classification type + if "Classification type" in line: + parts = line_stripped.split(":") + if len(parts) >= 2: + current_entry.classification_type = parts[1].strip() + + # DSCP bitmap + elif "DSCP bitmap" in line and "==>" in line: + parts = line_stripped.split("==>") + if len(parts) >= 2: + bitmap_part = parts[0].split(":") + if len(bitmap_part) >= 2: + current_entry.dscp_bitmap = bitmap_part[1].strip() + priority_part = parts[1].split(":") + if len(priority_part) >= 2: + try: + current_entry.dscp_priority = int(priority_part[1].strip()) + except ValueError: + pass + + # DSCP range + elif line_stripped.startswith("DSCP") and "==>" in line and "bitmap" not in line: + parts = line_stripped.split("==>") + if len(parts) >= 2: + dscp_part = parts[0].split(":") + if len(dscp_part) >= 2: + current_entry.dscp_range = dscp_part[1].strip() + priority_part = parts[1].split(":") + if len(priority_part) >= 2: + try: + current_entry.dscp_priority = int(priority_part[1].strip()) + except ValueError: + pass + + # PFC priority bitmap + elif "PFC priority bitmap" in line: + parts = line_stripped.split(":") + if len(parts) >= 2: + current_entry.pfc_priority_bitmap = parts[1].strip() + + # PFC no-drop priorities + elif "PFC no-drop priorities" in line: + parts = line_stripped.split(":") + if len(parts) >= 2: + current_entry.pfc_no_drop_priorities = parts[1].strip() + + # Scheduling table header + elif "Priority" in line and "Scheduling" in line: + in_scheduling_table = True + continue + + # Parse scheduling table entries + elif in_scheduling_table and not line_stripped.startswith("---"): + # Try to parse scheduling entry + # Format: "0 DWRR 0 N/A" + parts = line_stripped.split() + if len(parts) >= 2: + try: + priority = int(parts[0]) + scheduling_type = parts[1] if len(parts) > 1 else None + bandwidth = None + rate_limit = None + if len(parts) > 2: + try: + bandwidth = int(parts[2]) + except ValueError: + pass + if len(parts) > 3: + rate_limit = parts[3] + + sched_entry = PensandoNicQosScheduling( + priority=priority, + scheduling_type=scheduling_type, + bandwidth=bandwidth, + rate_limit=rate_limit, + ) + current_entry.scheduling.append(sched_entry) + except (ValueError, IndexError): + pass + + # Add the last entry if exists + if current_entry: + qos_entries.append(current_entry) + + return qos_entries + + def _parse_nicctl_rdma_statistics(self, output: str) -> List[PensandoNicRdmaStatistics]: + """Parse 'nicctl show rdma statistics' output into PensandoNicRdmaStatistics objects. + + Args: + output: Raw output from 'nicctl show rdma statistics' command + + Returns: + List of PensandoNicRdmaStatistics objects + """ + rdma_stats_entries = [] + current_entry = None + in_statistics_table = False + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Check for NIC line: "NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0)" + if line_stripped.startswith("NIC") and ":" in line_stripped: + # Save previous entry if exists + if current_entry: + rdma_stats_entries.append(current_entry) + + # Parse NIC ID and PCIe BDF + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + nic_id = match.group(1) + pcie_bdf = match.group(2) + current_entry = PensandoNicRdmaStatistics( + nic_id=nic_id, + pcie_bdf=pcie_bdf, + ) + in_statistics_table = False + continue + + # Skip separator lines (dashes) + if re.match(r"^-+$", line_stripped): + continue + + # Check for table header + if "Name" in line and "Count" in line: + in_statistics_table = True + continue + + # Parse statistics entries + if current_entry and in_statistics_table: + # The format is: "Queue pair create 1" + # We need to split from the right to get the count + parts = line_stripped.rsplit(None, 1) # Split from right, max 1 split + if len(parts) == 2: + name = parts[0].strip() + count_str = parts[1].strip() + try: + count = int(count_str) + stat_entry = PensandoNicRdmaStatistic( + name=name, + count=count, + ) + current_entry.statistics.append(stat_entry) + except ValueError: + pass + + # Add the last entry if exists + if current_entry: + rdma_stats_entries.append(current_entry) + + return rdma_stats_entries + + def _parse_nicctl_version_host_software( + self, output: str + ) -> Optional[PensandoNicVersionHostSoftware]: + """Parse 'nicctl show version host-software' output into PensandoNicVersionHostSoftware object. + + Args: + output: Raw output from 'nicctl show version host-software' command + + Returns: + PensandoNicVersionHostSoftware object or None if no data found + """ + version_info = PensandoNicVersionHostSoftware() + found_data = False + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped or ":" not in line_stripped: + continue + + # Split on the first colon to get key and value + parts = line_stripped.split(":", 1) + if len(parts) != 2: + continue + + key = parts[0].strip().lower() + value = parts[1].strip() + + if "nicctl" in key: + version_info.nicctl = value + found_data = True + elif "ipc driver" in key or "ipc_driver" in key: + version_info.ipc_driver = value + found_data = True + elif "ionic driver" in key or "ionic_driver" in key: + version_info.ionic_driver = value + found_data = True + + return version_info if found_data else None + + def _parse_nicctl_version_firmware(self, output: str) -> List[PensandoNicVersionFirmware]: + """Parse 'nicctl show version firmware' output into PensandoNicVersionFirmware objects. + + Args: + output: Raw output from 'nicctl show version firmware' command + + Returns: + List of PensandoNicVersionFirmware objects + """ + firmware_entries = [] + current_entry = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Skip separator lines (dashes) + if re.match(r"^-+$", line_stripped): + # Save previous entry when we hit a separator + if current_entry: + firmware_entries.append(current_entry) + current_entry = None + continue + + # Check for NIC line + if line_stripped.startswith("NIC") and ":" in line_stripped: + # Save previous entry if exists + if current_entry: + firmware_entries.append(current_entry) + + # Parse NIC ID and PCIe BDF + match = re.match( + r"NIC\s*:\s*([a-f0-9\-]+)\s*\(([0-9a-f:\.]+)\)", line_stripped, re.IGNORECASE + ) + if match: + nic_id = match.group(1) + pcie_bdf = match.group(2) + current_entry = PensandoNicVersionFirmware( + nic_id=nic_id, + pcie_bdf=pcie_bdf, + ) + continue + + # Parse version fields + if current_entry and ":" in line_stripped: + parts = line_stripped.split(":", 1) + if len(parts) == 2: + key = parts[0].strip().lower() + value = parts[1].strip() + + if "cpld" in key: + current_entry.cpld = value + elif "boot0" in key: + current_entry.boot0 = value + elif "uboot-a" in key or "uboot_a" in key: + current_entry.uboot_a = value + elif "firmware-a" in key or "firmware_a" in key: + current_entry.firmware_a = value + elif ( + "device config-a" in key + or "device_config_a" in key + or "device config" in key + ): + current_entry.device_config_a = value + + # Add the last entry if exists + if current_entry: + firmware_entries.append(current_entry) + + return firmware_entries + + def _parse_niccli_qos(self, device_num: int, output: str) -> BroadcomNicQos: + """Parse 'niccli --dev X qos --ets --show' output into BroadcomNicQos object. + + Args: + device_num: Device number + output: Raw output from 'niccli --dev X qos --ets --show' command + + Returns: + BroadcomNicQos object with parsed data + """ + qos_info = BroadcomNicQos(device_num=device_num, raw_output=output) + + current_app_entry = None + + for line in output.splitlines(): + line_stripped = line.strip() + if not line_stripped: + continue + + # Parse PRIO_MAP: "PRIO_MAP: 0:0 1:0 2:0 3:1 4:0 5:0 6:0 7:2" + if "PRIO_MAP:" in line: + parts = line.split("PRIO_MAP:") + if len(parts) >= 2: + prio_entries = parts[1].strip().split() + for entry in prio_entries: + if ":" in entry: + prio, tc = entry.split(":") + try: + qos_info.prio_map[int(prio)] = int(tc) + except ValueError: + pass + + # Parse TC Bandwidth: "TC Bandwidth: 50% 50% 0%" + elif "TC Bandwidth:" in line: + parts = line.split("TC Bandwidth:") + if len(parts) >= 2: + bandwidth_entries = parts[1].strip().split() + for bw in bandwidth_entries: + bw_clean = bw.rstrip("%") + try: + qos_info.tc_bandwidth.append(int(bw_clean)) + except ValueError: + pass + + # Parse TSA_MAP: "TSA_MAP: 0:ets 1:ets 2:strict" + elif "TSA_MAP:" in line: + parts = line.split("TSA_MAP:") + if len(parts) >= 2: + tsa_entries = parts[1].strip().split() + for entry in tsa_entries: + if ":" in entry: + tc, tsa = entry.split(":", 1) + try: + qos_info.tsa_map[int(tc)] = tsa + except ValueError: + pass + + # Parse PFC enabled: "PFC enabled: 3" + elif "PFC enabled:" in line: + parts = line.split("PFC enabled:") + if len(parts) >= 2: + try: + qos_info.pfc_enabled = int(parts[1].strip()) + except ValueError: + pass + + # Parse APP entries - detect start of new APP entry + elif line_stripped.startswith("APP#"): + # Save previous entry if exists + if current_app_entry: + qos_info.app_entries.append(current_app_entry) + current_app_entry = BroadcomNicQosAppEntry() + + # Parse Priority within APP entry + elif "Priority:" in line and current_app_entry is not None: + parts = line.split("Priority:") + if len(parts) >= 2: + try: + current_app_entry.priority = int(parts[1].strip()) + except ValueError: + pass + + # Parse Sel within APP entry + elif "Sel:" in line and current_app_entry is not None: + parts = line.split("Sel:") + if len(parts) >= 2: + try: + current_app_entry.sel = int(parts[1].strip()) + except ValueError: + pass + + # Parse DSCP within APP entry + elif "DSCP:" in line and current_app_entry is not None: + parts = line.split("DSCP:") + if len(parts) >= 2: + try: + current_app_entry.dscp = int(parts[1].strip()) + except ValueError: + pass + + # Parse protocol and port (e.g., "UDP or DCCP: 4791") + elif ( + "UDP" in line or "TCP" in line or "DCCP" in line + ) and current_app_entry is not None: + if ":" in line: + parts = line.split(":") + if len(parts) >= 2: + current_app_entry.protocol = parts[0].strip() + try: + current_app_entry.port = int(parts[1].strip()) + except ValueError: + pass + + # Parse TC Rate Limit: "TC Rate Limit: 100% 100% 100% 0% 0% 0% 0% 0%" + elif "TC Rate Limit:" in line: + parts = line.split("TC Rate Limit:") + if len(parts) >= 2: + rate_entries = parts[1].strip().split() + for rate in rate_entries: + rate_clean = rate.rstrip("%") + try: + qos_info.tc_rate_limit.append(int(rate_clean)) + except ValueError: + pass + + # Add the last APP entry if exists + if current_app_entry: + qos_info.app_entries.append(current_app_entry) + + return qos_info + def _collect_ethtool_info(self, interfaces: List[NetworkInterface]) -> Dict[str, EthtoolInfo]: """Collect ethtool information for all network interfaces. @@ -444,7 +1391,7 @@ def _collect_ethtool_info(self, interfaces: List[NetworkInterface]) -> Dict[str, for iface in interfaces: cmd = self.CMD_ETHTOOL_TEMPLATE.format(interface=iface.name) - res_ethtool = self._run_sut_cmd(cmd) + res_ethtool = self._run_sut_cmd(cmd, sudo=True) if res_ethtool.exit_code == 0: ethtool_info = self._parse_ethtool(iface.name, res_ethtool.stdout) @@ -464,6 +1411,303 @@ def _collect_ethtool_info(self, interfaces: List[NetworkInterface]) -> Dict[str, return ethtool_data + def _collect_lldp_info(self) -> None: + """Collect LLDP information using lldpcli and lldpctl commands.""" + # Run lldpcli show neighbor + res_lldpcli = self._run_sut_cmd(self.CMD_LLDPCLI_NEIGHBOR, sudo=True) + if res_lldpcli.exit_code == 0: + self._log_event( + category=EventCategory.NETWORK, + description="Collected LLDP neighbor information (lldpcli)", + priority=EventPriority.INFO, + ) + else: + self._log_event( + category=EventCategory.NETWORK, + description="LLDP neighbor collection failed or lldpcli not available", + data={"command": res_lldpcli.command, "exit_code": res_lldpcli.exit_code}, + priority=EventPriority.INFO, + ) + + # Run lldpctl + res_lldpctl = self._run_sut_cmd(self.CMD_LLDPCTL, sudo=True) + if res_lldpctl.exit_code == 0: + self._log_event( + category=EventCategory.NETWORK, + description="Collected LLDP information (lldpctl)", + priority=EventPriority.INFO, + ) + else: + self._log_event( + category=EventCategory.NETWORK, + description="LLDP collection failed or lldpctl not available", + data={"command": res_lldpctl.command, "exit_code": res_lldpctl.exit_code}, + priority=EventPriority.INFO, + ) + + def _collect_broadcom_nic_info( + self, + ) -> Tuple[List[BroadcomNicDevice], Dict[int, BroadcomNicQos]]: + """Collect Broadcom NIC information using niccli commands. + + Returns: + Tuple of (list of BroadcomNicDevice, dict mapping device number to BroadcomNicQos) + """ + devices = [] + qos_data = {} + + # First, list devices + res_listdev = self._run_sut_cmd(self.CMD_NICCLI_LISTDEV, sudo=True) + if res_listdev.exit_code == 0: + # Parse device list + devices = self._parse_niccli_listdev(res_listdev.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Broadcom NIC device list: {len(devices)} devices", + priority=EventPriority.INFO, + ) + + # Collect QoS info for each device + for device in devices: + cmd = self.CMD_NICCLI_GETQOS_TEMPLATE.format(device_num=device.device_num) + res_qos = self._run_sut_cmd(cmd, sudo=True) + if res_qos.exit_code == 0: + qos_info = self._parse_niccli_qos(device.device_num, res_qos.stdout) + qos_data[device.device_num] = qos_info + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Broadcom NIC QoS info for device {device.device_num}", + priority=EventPriority.INFO, + ) + else: + self._log_event( + category=EventCategory.NETWORK, + description=f"Failed to collect QoS info for device {device.device_num}", + data={"command": res_qos.command, "exit_code": res_qos.exit_code}, + priority=EventPriority.WARNING, + ) + + if qos_data: + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Broadcom NIC QoS info for {len(qos_data)} devices", + priority=EventPriority.INFO, + ) + else: + self._log_event( + category=EventCategory.NETWORK, + description="Broadcom NIC collection failed or niccli not available", + data={"command": res_listdev.command, "exit_code": res_listdev.exit_code}, + priority=EventPriority.INFO, + ) + + return devices, qos_data + + def _collect_pensando_nic_info( + self, + ) -> Tuple[ + List[PensandoNicCard], + List[PensandoNicDcqcn], + List[PensandoNicEnvironment], + List[PensandoNicPcieAts], + List[PensandoNicPort], + List[PensandoNicQos], + List[PensandoNicRdmaStatistics], + Optional[PensandoNicVersionHostSoftware], + List[PensandoNicVersionFirmware], + ]: + """Collect Pensando NIC information using nicctl commands. + + Returns: + Tuple of (list of PensandoNicCard, list of PensandoNicDcqcn, + list of PensandoNicEnvironment, list of PensandoNicPcieAts, + list of PensandoNicPort, list of PensandoNicQos, + list of PensandoNicRdmaStatistics, + PensandoNicVersionHostSoftware object, + list of PensandoNicVersionFirmware) + """ + cards = [] + dcqcn_entries = [] + environment_entries = [] + pcie_ats_entries = [] + port_entries = [] + qos_entries = [] + rdma_statistics_entries = [] + version_host_software = None + version_firmware_entries = [] + collected_count = 0 + + # Track which commands succeeded and which failed + collected_commands = [] + uncollected_commands = [] + + # Parse nicctl show card output + res_card = self._run_sut_cmd(self.CMD_NICCTL_CARD, sudo=True) + if res_card.exit_code == 0: + cards = self._parse_nicctl_card(res_card.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC card list: {len(cards)} cards", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_CARD) + else: + uncollected_commands.append(self.CMD_NICCTL_CARD) + + # Parse nicctl show dcqcn output + res_dcqcn = self._run_sut_cmd(self.CMD_NICCTL_DCQCN, sudo=True) + if res_dcqcn.exit_code == 0: + dcqcn_entries = self._parse_nicctl_dcqcn(res_dcqcn.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC DCQCN info: {len(dcqcn_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_DCQCN) + else: + uncollected_commands.append(self.CMD_NICCTL_DCQCN) + + # Parse nicctl show environment output + res_environment = self._run_sut_cmd(self.CMD_NICCTL_ENVIRONMENT, sudo=True) + if res_environment.exit_code == 0: + environment_entries = self._parse_nicctl_environment(res_environment.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC environment info: {len(environment_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_ENVIRONMENT) + else: + uncollected_commands.append(self.CMD_NICCTL_ENVIRONMENT) + + # Parse nicctl show pcie ats output + res_pcie_ats = self._run_sut_cmd(self.CMD_NICCTL_PCIE_ATS, sudo=True) + if res_pcie_ats.exit_code == 0: + pcie_ats_entries = self._parse_nicctl_pcie_ats(res_pcie_ats.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC PCIe ATS info: {len(pcie_ats_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_PCIE_ATS) + else: + uncollected_commands.append(self.CMD_NICCTL_PCIE_ATS) + + # Parse nicctl show port output + res_port = self._run_sut_cmd(self.CMD_NICCTL_PORT, sudo=True) + if res_port.exit_code == 0: + port_entries = self._parse_nicctl_port(res_port.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC port info: {len(port_entries)} ports", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_PORT) + else: + uncollected_commands.append(self.CMD_NICCTL_PORT) + + # Parse nicctl show qos output + res_qos = self._run_sut_cmd(self.CMD_NICCTL_QOS, sudo=True) + if res_qos.exit_code == 0: + qos_entries = self._parse_nicctl_qos(res_qos.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC QoS info: {len(qos_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_QOS) + else: + uncollected_commands.append(self.CMD_NICCTL_QOS) + + # Parse nicctl show rdma statistics output + res_rdma_stats = self._run_sut_cmd(self.CMD_NICCTL_RDMA_STATISTICS, sudo=True) + if res_rdma_stats.exit_code == 0: + rdma_statistics_entries = self._parse_nicctl_rdma_statistics(res_rdma_stats.stdout) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC RDMA statistics: {len(rdma_statistics_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_RDMA_STATISTICS) + else: + uncollected_commands.append(self.CMD_NICCTL_RDMA_STATISTICS) + + # Parse nicctl show version host-software output + res_version_host = self._run_sut_cmd(self.CMD_NICCTL_VERSION_HOST_SOFTWARE, sudo=True) + if res_version_host.exit_code == 0: + version_host_software = self._parse_nicctl_version_host_software( + res_version_host.stdout + ) + if version_host_software: + self._log_event( + category=EventCategory.NETWORK, + description="Collected Pensando NIC host software version", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_VERSION_HOST_SOFTWARE) + else: + uncollected_commands.append(self.CMD_NICCTL_VERSION_HOST_SOFTWARE) + else: + uncollected_commands.append(self.CMD_NICCTL_VERSION_HOST_SOFTWARE) + + # Parse nicctl show version firmware output + res_version_firmware = self._run_sut_cmd(self.CMD_NICCTL_VERSION_FIRMWARE, sudo=True) + if res_version_firmware.exit_code == 0: + version_firmware_entries = self._parse_nicctl_version_firmware( + res_version_firmware.stdout + ) + self._log_event( + category=EventCategory.NETWORK, + description=f"Collected Pensando NIC firmware versions: {len(version_firmware_entries)} entries", + priority=EventPriority.INFO, + ) + collected_count += 1 + collected_commands.append(self.CMD_NICCTL_VERSION_FIRMWARE) + else: + uncollected_commands.append(self.CMD_NICCTL_VERSION_FIRMWARE) + + # Log summary of collected and uncollected commands + if collected_commands: + self._log_event( + category=EventCategory.NETWORK, + description=f"Successfully collected {len(collected_commands)} nicctl commands: {', '.join(collected_commands)}", + priority=EventPriority.INFO, + ) + + if uncollected_commands: + self._log_event( + category=EventCategory.NETWORK, + description=f"Failed to collect {len(uncollected_commands)} nicctl commands: {', '.join(uncollected_commands)}", + priority=EventPriority.WARNING, + ) + + if not collected_commands and not uncollected_commands: + self._log_event( + category=EventCategory.NETWORK, + description="Pensando NIC collection failed or nicctl not available", + priority=EventPriority.INFO, + ) + + return ( + cards, + dcqcn_entries, + environment_entries, + pcie_ats_entries, + port_entries, + qos_entries, + rdma_statistics_entries, + version_host_software, + version_firmware_entries, + ) + def collect_data( self, args=None, @@ -479,6 +1723,17 @@ def collect_data( rules = [] neighbors = [] ethtool_data = {} + broadcom_devices: List[BroadcomNicDevice] = [] + broadcom_qos_data: Dict[int, BroadcomNicQos] = {} + pensando_cards: List[PensandoNicCard] = [] + pensando_dcqcn: List[PensandoNicDcqcn] = [] + pensando_environment: List[PensandoNicEnvironment] = [] + pensando_pcie_ats: List[PensandoNicPcieAts] = [] + pensando_ports: List[PensandoNicPort] = [] + pensando_qos: List[PensandoNicQos] = [] + pensando_rdma_statistics: List[PensandoNicRdmaStatistics] = [] + pensando_version_host_software: Optional[PensandoNicVersionHostSoftware] = None + pensando_version_firmware: List[PensandoNicVersionFirmware] = [] # Collect interface/address information res_addr = self._run_sut_cmd(self.CMD_ADDR) @@ -558,22 +1813,55 @@ def collect_data( priority=EventPriority.WARNING, ) - if interfaces or routes or rules or neighbors: - network_data = NetworkDataModel( - interfaces=interfaces, - routes=routes, - rules=rules, - neighbors=neighbors, - ethtool_info=ethtool_data, - ) - self.result.message = ( - f"Collected network data: {len(interfaces)} interfaces, " - f"{len(routes)} routes, {len(rules)} rules, {len(neighbors)} neighbors, " - f"{len(ethtool_data)} ethtool entries" - ) - self.result.status = ExecutionStatus.OK - return self.result, network_data - else: - self.result.message = "Failed to collect network data" - self.result.status = ExecutionStatus.ERROR - return self.result, None + # Collect LLDP information + self._collect_lldp_info() + + # Collect Broadcom NIC information + broadcom_devices, broadcom_qos_data = self._collect_broadcom_nic_info() + + # Collect Pensando NIC information + ( + pensando_cards, + pensando_dcqcn, + pensando_environment, + pensando_pcie_ats, + pensando_ports, + pensando_qos, + pensando_rdma_statistics, + pensando_version_host_software, + pensando_version_firmware, + ) = self._collect_pensando_nic_info() + + network_data = NetworkDataModel( + interfaces=interfaces, + routes=routes, + rules=rules, + neighbors=neighbors, + ethtool_info=ethtool_data, + broadcom_nic_devices=broadcom_devices, + broadcom_nic_qos=broadcom_qos_data, + pensando_nic_cards=pensando_cards, + pensando_nic_dcqcn=pensando_dcqcn, + pensando_nic_environment=pensando_environment, + pensando_nic_pcie_ats=pensando_pcie_ats, + pensando_nic_ports=pensando_ports, + pensando_nic_qos=pensando_qos, + pensando_nic_rdma_statistics=pensando_rdma_statistics, + pensando_nic_version_host_software=pensando_version_host_software, + pensando_nic_version_firmware=pensando_version_firmware, + ) + self.result.message = ( + f"Collected network data: {len(interfaces)} interfaces, " + f"{len(routes)} routes, {len(rules)} rules, {len(neighbors)} neighbors, " + f"{len(ethtool_data)} ethtool entries, {len(broadcom_devices)} Broadcom NICs, " + f"{len(pensando_cards)} Pensando NICs, {len(pensando_dcqcn)} Pensando DCQCN entries, " + f"{len(pensando_environment)} Pensando environment entries, " + f"{len(pensando_pcie_ats)} Pensando PCIe ATS entries, " + f"{len(pensando_ports)} Pensando ports, " + f"{len(pensando_qos)} Pensando QoS entries, " + f"{len(pensando_rdma_statistics)} Pensando RDMA statistics, " + f"Pensando host software version: {'Yes' if pensando_version_host_software else 'No'}, " + f"{len(pensando_version_firmware)} Pensando firmware versions" + ) + self.result.status = ExecutionStatus.OK + return self.result, network_data diff --git a/nodescraper/plugins/inband/network/networkdata.py b/nodescraper/plugins/inband/network/networkdata.py index 5e94efc2..34d1f63e 100644 --- a/nodescraper/plugins/inband/network/networkdata.py +++ b/nodescraper/plugins/inband/network/networkdata.py @@ -105,6 +105,195 @@ class EthtoolInfo(BaseModel): link_detected: Optional[str] = None # Link detection status (e.g., "yes", "no") +class BroadcomNicDevice(BaseModel): + """Broadcom NIC device information from niccli --list_devices""" + + device_num: int # Device number (1, 2, 3, etc.) + model: Optional[str] = None # e.g., "Broadcom BCM57608 1x400G QSFP-DD PCIe Ethernet NIC" + adapter_port: Optional[str] = None # e.g., "Adp#1 Port#1" + interface_name: Optional[str] = None # e.g., "benic1p1" + mac_address: Optional[str] = None # e.g., "8C:84:74:37:C3:70" + pci_address: Optional[str] = None # e.g., "0000:06:00.0" + + +class BroadcomNicQosAppEntry(BaseModel): + """APP TLV entry in Broadcom NIC QoS configuration""" + + priority: Optional[int] = None + sel: Optional[int] = None + dscp: Optional[int] = None + protocol: Optional[str] = None # "UDP or DCCP", etc. + port: Optional[int] = None + + +class BroadcomNicQos(BaseModel): + """Broadcom NIC QoS information from niccli --dev X qos --ets --show""" + + device_num: int # Device number this QoS info belongs to + raw_output: str # Raw command output + # ETS Configuration + prio_map: Dict[int, int] = Field( + default_factory=dict + ) # Priority to TC mapping {0: 0, 1: 0, ...} + tc_bandwidth: List[int] = Field( + default_factory=list + ) # TC bandwidth percentages [50, 50, 0, ...] + tsa_map: Dict[int, str] = Field( + default_factory=dict + ) # TC to TSA mapping {0: "ets", 1: "ets", ...} + # PFC Configuration + pfc_enabled: Optional[int] = None # Bitmap of PFC enabled priorities + # APP TLV entries + app_entries: List[BroadcomNicQosAppEntry] = Field(default_factory=list) + # TC Rate Limit + tc_rate_limit: List[int] = Field(default_factory=list) # TC rate limits [100, 100, 100, ...] + + +class PensandoNicCard(BaseModel): + """Pensando NIC card information from nicctl show card""" + + id: str # Card ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + asic: Optional[str] = None # ASIC type (e.g., "salina") + fw_partition: Optional[str] = None # Firmware partition (e.g., "A") + serial_number: Optional[str] = None # Serial number (e.g., "FPL25330294") + + +class PensandoNicDcqcn(BaseModel): + """Pensando NIC DCQCN information from nicctl show dcqcn""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + lif_id: Optional[str] = None # Lif ID (UUID format) + roce_device: Optional[str] = None # ROCE device name (e.g., "rocep9s0") + dcqcn_profile_id: Optional[str] = None # DCQCN profile id (e.g., "1") + status: Optional[str] = None # Status (e.g., "Disabled") + + +class PensandoNicEnvironment(BaseModel): + """Pensando NIC environment information from nicctl show environment""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + # Power measurements in Watts + total_power_drawn: Optional[float] = None # Total power drawn (pin) + core_power: Optional[float] = None # Core power (pout1) + arm_power: Optional[float] = None # ARM power (pout2) + # Temperature measurements in Celsius + local_board_temperature: Optional[float] = None # Local board temperature + die_temperature: Optional[float] = None # Die temperature + # Voltage measurements in millivolts + input_voltage: Optional[float] = None # Input voltage + core_voltage: Optional[float] = None # Core voltage + # Frequency measurements in MHz + core_frequency: Optional[float] = None # Core frequency + cpu_frequency: Optional[float] = None # CPU frequency + p4_stage_frequency: Optional[float] = None # P4 stage frequency + + +class PensandoNicPcieAts(BaseModel): + """Pensando NIC PCIe ATS information from nicctl show pcie ats""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + status: str # Status (e.g., "Disabled", "Enabled") + + +class PensandoNicPort(BaseModel): + """Pensando NIC port information from nicctl show port""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + port_id: str # Port ID (UUID format) + port_name: str # Port name (e.g., "eth1/1") + # Spec fields + spec_ifindex: Optional[str] = None + spec_type: Optional[str] = None + spec_speed: Optional[str] = None + spec_admin_state: Optional[str] = None + spec_fec_type: Optional[str] = None + spec_pause_type: Optional[str] = None + spec_num_lanes: Optional[int] = None + spec_mtu: Optional[int] = None + spec_tx_pause: Optional[str] = None + spec_rx_pause: Optional[str] = None + spec_auto_negotiation: Optional[str] = None + # Status fields + status_physical_port: Optional[int] = None + status_operational_status: Optional[str] = None + status_link_fsm_state: Optional[str] = None + status_fec_type: Optional[str] = None + status_cable_type: Optional[str] = None + status_num_lanes: Optional[int] = None + status_speed: Optional[str] = None + status_auto_negotiation: Optional[str] = None + status_mac_id: Optional[int] = None + status_mac_channel: Optional[int] = None + status_mac_address: Optional[str] = None + status_transceiver_type: Optional[str] = None + status_transceiver_state: Optional[str] = None + status_transceiver_pid: Optional[str] = None + + +class PensandoNicQosScheduling(BaseModel): + """QoS Scheduling entry""" + + priority: int + scheduling_type: Optional[str] = None # e.g., "DWRR" + bandwidth: Optional[int] = None # Bandwidth in percentage + rate_limit: Optional[str] = None # Rate limit (e.g., "N/A" or value in Gbps) + + +class PensandoNicQos(BaseModel): + """Pensando NIC QoS information from nicctl show qos""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + port_id: str # Port ID (UUID format) + classification_type: Optional[str] = None # e.g., "DSCP" + dscp_bitmap: Optional[str] = None # DSCP bitmap + dscp_range: Optional[str] = None # DSCP range (e.g., "0-63") + dscp_priority: Optional[int] = None # Priority mapped from DSCP + pfc_priority_bitmap: Optional[str] = None # PFC priority bitmap + pfc_no_drop_priorities: Optional[str] = None # PFC no-drop priorities + scheduling: List[PensandoNicQosScheduling] = Field(default_factory=list) # Scheduling entries + + +class PensandoNicRdmaStatistic(BaseModel): + """RDMA statistic entry""" + + name: str # Statistic name + count: int # Count value + + +class PensandoNicRdmaStatistics(BaseModel): + """Pensando NIC RDMA statistics from nicctl show rdma statistics""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + statistics: List[PensandoNicRdmaStatistic] = Field(default_factory=list) # Statistics entries + + +class PensandoNicVersionHostSoftware(BaseModel): + """Pensando NIC host software version from nicctl show version host-software""" + + nicctl: Optional[str] = None # nicctl version + ipc_driver: Optional[str] = None # IPC driver version + ionic_driver: Optional[str] = None # ionic driver version + + +class PensandoNicVersionFirmware(BaseModel): + """Pensando NIC firmware version from nicctl show version firmware""" + + nic_id: str # NIC ID (UUID format) + pcie_bdf: str # PCIe Bus:Device.Function (e.g., "0000:06:00.0") + cpld: Optional[str] = None # CPLD version + boot0: Optional[str] = None # Boot0 version + uboot_a: Optional[str] = None # Uboot-A version + firmware_a: Optional[str] = None # Firmware-A version + device_config_a: Optional[str] = None # Device config-A version + + class NetworkDataModel(DataModel): """Complete network configuration data""" @@ -115,3 +304,16 @@ class NetworkDataModel(DataModel): ethtool_info: Dict[str, EthtoolInfo] = Field( default_factory=dict ) # Interface name -> EthtoolInfo mapping + broadcom_nic_devices: List[BroadcomNicDevice] = Field(default_factory=list) + broadcom_nic_qos: Dict[int, BroadcomNicQos] = Field( + default_factory=dict + ) # Device number -> QoS info mapping + pensando_nic_cards: List[PensandoNicCard] = Field(default_factory=list) + pensando_nic_dcqcn: List[PensandoNicDcqcn] = Field(default_factory=list) + pensando_nic_environment: List[PensandoNicEnvironment] = Field(default_factory=list) + pensando_nic_pcie_ats: List[PensandoNicPcieAts] = Field(default_factory=list) + pensando_nic_ports: List[PensandoNicPort] = Field(default_factory=list) + pensando_nic_qos: List[PensandoNicQos] = Field(default_factory=list) + pensando_nic_rdma_statistics: List[PensandoNicRdmaStatistics] = Field(default_factory=list) + pensando_nic_version_host_software: Optional[PensandoNicVersionHostSoftware] = None + pensando_nic_version_firmware: List[PensandoNicVersionFirmware] = Field(default_factory=list) diff --git a/test/unit/plugin/test_network_collector.py b/test/unit/plugin/test_network_collector.py index 9d7e7546..ba5a151d 100644 --- a/test/unit/plugin/test_network_collector.py +++ b/test/unit/plugin/test_network_collector.py @@ -32,11 +32,20 @@ from nodescraper.models.systeminfo import OSFamily from nodescraper.plugins.inband.network.network_collector import NetworkCollector from nodescraper.plugins.inband.network.networkdata import ( + BroadcomNicDevice, + BroadcomNicQos, EthtoolInfo, IpAddress, Neighbor, NetworkDataModel, NetworkInterface, + PensandoNicCard, + PensandoNicDcqcn, + PensandoNicEnvironment, + PensandoNicPcieAts, + PensandoNicPort, + PensandoNicQos, + PensandoNicQosScheduling, Route, RoutingRule, ) @@ -293,7 +302,7 @@ def test_collect_data_success(collector, conn_mock): collector.system_info.os_family = OSFamily.LINUX # Mock successful command execution - def run_sut_cmd_side_effect(cmd): + def run_sut_cmd_side_effect(cmd, **kwargs): if "addr show" in cmd: return MagicMock(exit_code=0, stdout=IP_ADDR_OUTPUT, command=cmd) elif "route show" in cmd: @@ -305,6 +314,15 @@ def run_sut_cmd_side_effect(cmd): elif "ethtool" in cmd: # Fail ethtool commands (simulating no sudo or not supported) return MagicMock(exit_code=1, stdout="", command=cmd) + elif "lldpcli" in cmd or "lldpctl" in cmd: + # LLDP commands fail (not available) + return MagicMock(exit_code=1, stdout="", command=cmd) + elif "niccli" in cmd: + # Broadcom NIC commands fail (not available) + return MagicMock(exit_code=1, stdout="", command=cmd) + elif "nicctl" in cmd: + # Pensando NIC commands fail (not available) + return MagicMock(exit_code=1, stdout="", command=cmd) return MagicMock(exit_code=1, stdout="", command=cmd) collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) @@ -330,9 +348,9 @@ def test_collect_data_addr_failure(collector, conn_mock): collector.system_info.os_family = OSFamily.LINUX # Mock failed addr command but successful others - def run_sut_cmd_side_effect(cmd): + def run_sut_cmd_side_effect(cmd, **kwargs): if "addr show" in cmd: - return MagicMock(exit_code=1, stdout="", command=cmd) + return MagicMock(exit_code=1, command=cmd) elif "route show" in cmd: return MagicMock(exit_code=0, stdout=IP_ROUTE_OUTPUT, command=cmd) elif "rule show" in cmd: @@ -340,8 +358,17 @@ def run_sut_cmd_side_effect(cmd): elif "neighbor show" in cmd: return MagicMock(exit_code=0, stdout=IP_NEIGHBOR_OUTPUT, command=cmd) elif "ethtool" in cmd: - return MagicMock(exit_code=1, stdout="", command=cmd) - return MagicMock(exit_code=1, stdout="", command=cmd) + return MagicMock(exit_code=1, command=cmd) + elif "lldpcli" in cmd or "lldpctl" in cmd: + # LLDP commands fail (not available) + return MagicMock(exit_code=1, command=cmd) + elif "niccli" in cmd: + # Broadcom NIC commands fail (not available) + return MagicMock(exit_code=1, command=cmd) + elif "nicctl" in cmd: + # Pensando NIC commands fail (not available) + return MagicMock(exit_code=1, command=cmd) + return MagicMock(exit_code=1, command=cmd) collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) @@ -362,16 +389,20 @@ def test_collect_data_all_failures(collector, conn_mock): """Test collection when all commands fail""" collector.system_info.os_family = OSFamily.LINUX - # Mock all commands failing (including ethtool) - def run_sut_cmd_side_effect(cmd): - return MagicMock(exit_code=1, stdout="", command=cmd) + # Mock all commands failing (including ethtool, LLDP, Broadcom, Pensando) + def run_sut_cmd_side_effect(cmd, **kwargs): + return MagicMock(exit_code=1, command=cmd) collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) result, data = collector.collect_data() - assert result.status == ExecutionStatus.ERROR - assert data is None + assert result.status == ExecutionStatus.OK + assert data is not None + assert len(data.interfaces) == 0 + assert len(data.routes) == 0 + assert len(data.rules) == 0 + assert len(data.neighbors) == 0 assert len(result.events) > 0 @@ -542,3 +573,1288 @@ def test_network_data_model_creation(collector): assert len(data.ethtool_info) == 1 assert data.interfaces[0].name == "ethmock123" assert data.ethtool_info["ethmock123"].speed == "1000mockMb/s" + + +# Sample Broadcom NIC command outputs for testing +NICCLI_LISTDEV_OUTPUT = """ +1 ) Broadcom BCM57608 1x400G QSFP-DD PCIe Ethernet NIC (Adp#1 Port#1) + Device Interface Name : abcd1p1 + MAC Address : 81:82:83:84:85:88 + PCI Address : 0000:22:00.0 +""" + +NICCLI_QOS_OUTPUT = """ +IEEE 8021QAZ ETS Configuration TLV: + PRIO_MAP: 0:0 1:0 2:0 3:1 4:0 5:0 6:0 7:2 + TC Bandwidth: 50% 50% 0% + TSA_MAP: 0:ets 1:ets 2:strict +IEEE 8021QAZ PFC TLV: + PFC enabled: 3 +IEEE 8021QAZ APP TLV: + APP#0: + Priority: 7 + Sel: 5 + DSCP: 48 + + APP#1: + Priority: 3 + Sel: 5 + DSCP: 26 + + APP#2: + Priority: 3 + Sel: 3 + UDP or DCCP: 4791 + +TC Rate Limit: 100% 100% 100% 0% 0% 0% 0% 0% +""" + +NICCLI_QOS_MINIMAL_OUTPUT = """IEEE 8021QAZ ETS Configuration TLV: + PRIO_MAP: 0:0 1:1 + TC Bandwidth: 50% 50% + TSA_MAP: 0:ets 1:strict +IEEE 8021QAZ PFC TLV: + PFC enabled: 1 +TC Rate Limit: 100% 100% +""" + +# Sample Pensando NIC command outputs for testing +NICCTL_SHOW_CARD_OUTPUT = """ +--------------------------------------------------------------------------------------------- +Id PCIe BDF ASIC F/W partition Serial number +--------------------------------------------------------------------------------------------- +1111111-4c32-3533-3330-12345000000 0000:06:00.0 test1 A ABC1234 +2222222-4c32-3533-3731-78901500000 0000:16:00.0 test2 A DEF5678 +""" + +NICCTL_SHOW_DCQCN_OUTPUT = """ +NIC : 1111111-4c32-3533-3330-12345000000 (0000:06:00.0) +------------------------------------------------------------------------------------------ + +Lif id : 1111111-4c32-3533-3330-12345000000 +ROCE device : sample + DCQCN profile id : 1 + Status : Disabled +****************************************************************************************** +""" + +NICCTL_SHOW_ENVIRONMENT_OUTPUT = """ +NIC : 1111111-4c32-3533-3330-12345000000 (0000:06:00.0) + + Power(W): + Total power drawn (pin) : 29.437 + Core power (pout1) : 12.375 + ARM power (pout2) : 0.788 + Temperature(C): + Local board temperature : 44.12 + Die temperature : 45.59 + Voltage(mV): + Input voltage : 12078 + Core voltage : 725 + Frequency(MHz): + Core frequency : 1100 + CPU frequency : 1500 + P4 stage frequency : 1500 +------------------------------------------------------------------------------------- +""" + +NICCTL_SHOW_PCIE_ATS_OUTPUT = """ +NIC : 1111111-4c32-3533-3330-12345000000 (0000:06:00.0) : Disabled +""" + +NICCTL_SHOW_PORT_OUTPUT = """ +NIC : 1111111-4c32-3533-3330-12345000000 (0000:06:00.0) + +Port : 555555a-6c40-4242-4242-000011010000 (eth1/1) + Spec: + Ifindex : 0x11010000 + Type : ETH + speed : 400G + Admin state : UP + FEC type : RS + Pause type : PFC + Number of lanes : 4 + MTU : 9216 + TX pause : enabled + RX pause : enabled + Auto negotiation : disabled + Status: + Physical port : 1 + Operational status : DOWN + Link FSM state : SIGNAL_DETECT + FEC type : RS + Cable type : Copper + Number of lanes : 4 + speed : 400G + Auto negotiation : disabled + MAC ID : 0 + MAC channel : 0 + MAC address : 04:90:81:4a:6c:40 + Transceiver type : QSFP_CMIS + Transceiver state : SPROM-READ + Transceiver PID : QSFP-400G-CR4 +------------------------------------------------------------------------------------- +""" + +NICCTL_SHOW_QOS_OUTPUT = """ +NIC : 1111111-4c32-3533-3330-12345000000 (0000:06:00.0) + +Port : 0490814a-6c40-4242-4242-000011010000 + + Classification type : DSCP + + DSCP-to-priority : + DSCP bitmap : 0xffffffffffffffff ==> priority : 0 + DSCP : 0-63 ==> priority : 0 + + + PFC : + PFC priority bitmap : 0x0 + PFC no-drop priorities : + + Scheduling : + -------------------------------------------- + Priority Scheduling Bandwidth Rate-limit + Type (in %age) (in Gbps) + -------------------------------------------- + 0 DWRR 0 N/A +""" + + +def test_parse_niccli_listdev_device(collector): + """Test parsing Broadcom NIC device from niccli --list_devices output""" + devices = collector._parse_niccli_listdev(NICCLI_LISTDEV_OUTPUT) + + assert len(devices) == 1 + + # Check device + device1 = devices[0] + assert device1.device_num == 1 + assert device1.model == "Broadcom BCM57608 1x400G QSFP-DD PCIe Ethernet NIC" + assert device1.adapter_port == "Adp#1 Port#1" + assert device1.interface_name == "abcd1p1" + assert device1.mac_address == "81:82:83:84:85:88" + assert device1.pci_address == "0000:22:00.0" + + +def test_parse_niccli_listdev_empty_output(collector): + """Test parsing empty niccli --list_devices output""" + devices = collector._parse_niccli_listdev("") + + assert len(devices) == 0 + + +def test_parse_niccli_listdev_malformed_output(collector): + """Test parsing malformed niccli --list_devices output gracefully""" + malformed = """some random text +not a valid device line +123 invalid format +""" + + devices = collector._parse_niccli_listdev(malformed) + + # Should handle gracefully, return empty list or skip invalid lines + assert isinstance(devices, list) + + +def test_parse_niccli_qos_complete(collector): + """Test parsing complete Broadcom NIC QoS output with all fields""" + qos = collector._parse_niccli_qos(1, NICCLI_QOS_OUTPUT) + + assert qos.device_num == 1 + assert qos.raw_output == NICCLI_QOS_OUTPUT + + # Check PRIO_MAP + assert len(qos.prio_map) == 8 + assert qos.prio_map[0] == 0 + assert qos.prio_map[1] == 0 + assert qos.prio_map[3] == 1 + assert qos.prio_map[7] == 2 + + # Check TC Bandwidth + assert len(qos.tc_bandwidth) == 3 + assert qos.tc_bandwidth[0] == 50 + assert qos.tc_bandwidth[1] == 50 + assert qos.tc_bandwidth[2] == 0 + + # Check TSA_MAP + assert len(qos.tsa_map) == 3 + assert qos.tsa_map[0] == "ets" + assert qos.tsa_map[1] == "ets" + assert qos.tsa_map[2] == "strict" + + # Check PFC enabled + assert qos.pfc_enabled == 3 + + # Check APP entries + assert len(qos.app_entries) == 3 + + # Check APP#0 + app0 = qos.app_entries[0] + assert app0.priority == 7 + assert app0.sel == 5 + assert app0.dscp == 48 + assert app0.protocol is None + assert app0.port is None + + # Check APP#1 + app1 = qos.app_entries[1] + assert app1.priority == 3 + assert app1.sel == 5 + assert app1.dscp == 26 + + # Check APP#2 (with protocol and port) + app2 = qos.app_entries[2] + assert app2.priority == 3 + assert app2.sel == 3 + assert app2.dscp is None + assert app2.protocol == "UDP or DCCP" + assert app2.port == 4791 + + # Check TC Rate Limit + assert len(qos.tc_rate_limit) == 8 + assert qos.tc_rate_limit[0] == 100 + assert qos.tc_rate_limit[1] == 100 + assert qos.tc_rate_limit[2] == 100 + assert qos.tc_rate_limit[3] == 0 + assert qos.tc_rate_limit[7] == 0 + + +def test_parse_niccli_qos_empty_output(collector): + """Test parsing empty QoS output""" + qos = collector._parse_niccli_qos(1, "") + + assert qos.device_num == 1 + assert qos.raw_output == "" + assert len(qos.prio_map) == 0 + assert len(qos.tc_bandwidth) == 0 + assert len(qos.tsa_map) == 0 + assert qos.pfc_enabled is None + assert len(qos.app_entries) == 0 + assert len(qos.tc_rate_limit) == 0 + + +def test_parse_niccli_qos_multiple_app_protocols(collector): + """Test parsing QoS with APP entries having different protocols""" + qos_multi_protocol = """IEEE 8021QAZ ETS Configuration TLV: + PRIO_MAP: 0:0 + TC Bandwidth: 100% + TSA_MAP: 0:ets +IEEE 8021QAZ PFC TLV: + PFC enabled: 0 +IEEE 8021QAZ APP TLV: + APP#0: + Priority: 5 + Sel: 3 + TCP: 8080 + + APP#1: + Priority: 6 + Sel: 3 + UDP: 9000 + +TC Rate Limit: 100% +""" + + qos = collector._parse_niccli_qos(3, qos_multi_protocol) + + assert len(qos.app_entries) == 2 + + # Check TCP entry + app0 = qos.app_entries[0] + assert app0.priority == 5 + assert app0.sel == 3 + assert app0.protocol == "TCP" + assert app0.port == 8080 + + # Check UDP entry + app1 = qos.app_entries[1] + assert app1.priority == 6 + assert app1.sel == 3 + assert app1.protocol == "UDP" + assert app1.port == 9000 + + +def test_parse_niccli_qos_malformed_values(collector): + """Test parsing QoS output with malformed values gracefully""" + malformed = """IEEE 8021QAZ ETS Configuration TLV: + PRIO_MAP: 0:invalid 1:1 bad:data + TC Bandwidth: 50% invalid 50% + TSA_MAP: 0:ets bad:value 1:strict +IEEE 8021QAZ PFC TLV: + PFC enabled: not_a_number +TC Rate Limit: 100% bad% 100% +""" + + qos = collector._parse_niccli_qos(1, malformed) + + # Should skip invalid entries but parse valid ones + assert qos.device_num == 1 + # Should have parsed valid prio_map entry (1:1) + assert 1 in qos.prio_map + assert qos.prio_map[1] == 1 + # Should have parsed valid bandwidth entries + assert 50 in qos.tc_bandwidth + # Should have parsed valid tsa_map entries + assert qos.tsa_map.get(0) == "ets" + assert qos.tsa_map.get(1) == "strict" + # PFC should be None due to invalid number + assert qos.pfc_enabled is None + + +def test_network_data_model_with_broadcom_nic(collector): + """Test creating NetworkDataModel with Broadcom NIC data""" + device = BroadcomNicDevice( + device_num=1, + model="Broadcom BCM57608 1x400G QSFP-DD PCIe Ethernet NIC", + adapter_port="Adp#1 Port#1", + interface_name="benic1p1", + mac_address="8C:84:74:37:C3:70", + pci_address="0000:06:00.0", + ) + + qos = BroadcomNicQos( + device_num=1, + raw_output="test output", + prio_map={0: 0, 1: 1}, + tc_bandwidth=[50, 50], + tsa_map={0: "ets", 1: "strict"}, + pfc_enabled=3, + tc_rate_limit=[100, 100], + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + broadcom_nic_devices=[device], + broadcom_nic_qos={1: qos}, + ) + + assert len(data.broadcom_nic_devices) == 1 + assert len(data.broadcom_nic_qos) == 1 + assert data.broadcom_nic_devices[0].device_num == 1 + assert data.broadcom_nic_devices[0].interface_name == "benic1p1" + assert data.broadcom_nic_qos[1].device_num == 1 + assert data.broadcom_nic_qos[1].pfc_enabled == 3 + + +def test_parse_nicctl_show_card_multiple_cards(collector): + """Test parsing multiple Pensando NIC cards from nicctl show card output""" + cards = collector._parse_nicctl_card(NICCTL_SHOW_CARD_OUTPUT) + + assert len(cards) == 2 + + # Check first card + card1 = cards[0] + assert card1.id == "1111111-4c32-3533-3330-12345000000" + assert card1.pcie_bdf == "0000:06:00.0" + assert card1.asic == "test1" + assert card1.fw_partition == "A" + assert card1.serial_number == "ABC1234" + + # Check second card + card2 = cards[1] + assert card2.id == "2222222-4c32-3533-3731-78901500000" + assert card2.pcie_bdf == "0000:16:00.0" + assert card2.asic == "test2" + assert card2.fw_partition == "A" + assert card2.serial_number == "DEF5678" + + +def test_parse_nicctl_show_card_empty_output(collector): + """Test parsing empty nicctl show card output""" + cards = collector._parse_nicctl_card("") + + assert len(cards) == 0 + + +def test_parse_nicctl_show_card_partial_fields(collector): + """Test parsing nicctl show card output with partial fields""" + partial_output = """ +--------------------------------------------------------------------------------------------- +Id PCIe BDF ASIC F/W partition Serial number +--------------------------------------------------------------------------------------------- +42424650-4c32-3533-3330-323934000000 0000:06:00.0 +42424650-4c32-3533-3731-304535000000 0000:16:00.0 salina +""" + + cards = collector._parse_nicctl_card(partial_output) + + assert len(cards) == 2 + + # First card with only ID and PCIe BDF + card1 = cards[0] + assert card1.id == "42424650-4c32-3533-3330-323934000000" + assert card1.pcie_bdf == "0000:06:00.0" + assert card1.asic is None + assert card1.fw_partition is None + assert card1.serial_number is None + + # Second card with ID, PCIe BDF, and ASIC + card2 = cards[1] + assert card2.id == "42424650-4c32-3533-3731-304535000000" + assert card2.pcie_bdf == "0000:16:00.0" + assert card2.asic == "salina" + assert card2.fw_partition is None + assert card2.serial_number is None + + +def test_parse_nicctl_show_card_malformed_output(collector): + """Test parsing malformed nicctl show card output gracefully""" + malformed = """some random text +not a valid card line +123 invalid format +""" + + cards = collector._parse_nicctl_card(malformed) + + # Should handle gracefully, return empty list or skip invalid lines + assert isinstance(cards, list) + # May parse some invalid entries, but should not crash + + +def test_network_data_model_with_pensando_nic(collector): + """Test creating NetworkDataModel with Pensando NIC data""" + card1 = PensandoNicCard( + id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + asic="salina", + fw_partition="A", + serial_number="FPL25330294", + ) + + card2 = PensandoNicCard( + id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + asic="salina", + fw_partition="A", + serial_number="FPL253710E5", + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_cards=[card1, card2], + ) + + assert len(data.pensando_nic_cards) == 2 + assert data.pensando_nic_cards[0].id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_cards[0].pcie_bdf == "0000:06:00.0" + assert data.pensando_nic_cards[0].asic == "salina" + assert data.pensando_nic_cards[1].serial_number == "FPL253710E5" + + +def test_collect_pensando_nic_success(collector, conn_mock): + """Test successful collection of Pensando NIC data""" + collector.system_info.os_family = OSFamily.LINUX + + # Mock successful nicctl command execution + def run_sut_cmd_side_effect(cmd, **kwargs): + if "nicctl show card" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_CARD_OUTPUT, command=cmd) + elif "nicctl show dcqcn" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_DCQCN_OUTPUT, command=cmd) + elif "nicctl show environment" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_ENVIRONMENT_OUTPUT, command=cmd) + elif "nicctl show pcie ats" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_PCIE_ATS_OUTPUT, command=cmd) + elif "nicctl show port" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_PORT_OUTPUT, command=cmd) + elif "nicctl show qos" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_QOS_OUTPUT, command=cmd) + elif "nicctl show rdma statistics" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_RDMA_STATISTICS_OUTPUT, command=cmd) + elif "nicctl show version host-software" in cmd: + return MagicMock( + exit_code=0, stdout=NICCTL_SHOW_VERSION_HOST_SOFTWARE_OUTPUT, command=cmd + ) + elif "nicctl show version firmware" in cmd: + return MagicMock(exit_code=0, stdout=NICCTL_SHOW_VERSION_FIRMWARE_OUTPUT, command=cmd) + elif "nicctl" in cmd: + # Other nicctl commands succeed but return empty + return MagicMock(exit_code=0, stdout="", command=cmd) + return MagicMock(exit_code=1, stdout="", command=cmd) + + collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) + + ( + cards, + dcqcn_entries, + environment_entries, + pcie_ats_entries, + port_entries, + qos_entries, + rdma_statistics_entries, + version_host_software, + version_firmware_entries, + ) = collector._collect_pensando_nic_info() + + assert len(cards) == 2 + assert cards[0].id == "1111111-4c32-3533-3330-12345000000" + assert cards[0].pcie_bdf == "0000:06:00.0" + assert cards[0].asic == "test1" + assert cards[0].serial_number == "ABC1234" + + assert len(dcqcn_entries) == 1 + assert dcqcn_entries[0].nic_id == "1111111-4c32-3533-3330-12345000000" + assert dcqcn_entries[0].pcie_bdf == "0000:06:00.0" + + assert len(environment_entries) == 1 + assert environment_entries[0].nic_id == "1111111-4c32-3533-3330-12345000000" + assert environment_entries[0].pcie_bdf == "0000:06:00.0" + + assert len(pcie_ats_entries) == 1 + assert pcie_ats_entries[0].nic_id == "1111111-4c32-3533-3330-12345000000" + assert pcie_ats_entries[0].pcie_bdf == "0000:06:00.0" + assert pcie_ats_entries[0].status == "Disabled" + + assert len(port_entries) == 1 + assert port_entries[0].nic_id == "1111111-4c32-3533-3330-12345000000" + assert port_entries[0].pcie_bdf == "0000:06:00.0" + assert port_entries[0].port_name == "eth1/1" + + assert len(qos_entries) == 1 + assert qos_entries[0].nic_id == "1111111-4c32-3533-3330-12345000000" + assert qos_entries[0].pcie_bdf == "0000:06:00.0" + assert qos_entries[0].port_id == "0490814a-6c40-4242-4242-000011010000" + + assert len(rdma_statistics_entries) == 2 + assert rdma_statistics_entries[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert rdma_statistics_entries[0].pcie_bdf == "0000:06:00.0" + assert len(rdma_statistics_entries[0].statistics) == 2 + + assert version_host_software is not None + assert version_host_software.nicctl == "1.117.1-a-63" + assert version_host_software.ipc_driver == "1.117.1.a.63" + assert version_host_software.ionic_driver == "25.08.4.004" + + assert len(version_firmware_entries) == 2 + assert version_firmware_entries[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert version_firmware_entries[0].pcie_bdf == "0000:06:00.0" + assert version_firmware_entries[0].cpld == "3.16 (primary)" + + +def test_parse_nicctl_show_dcqcn_multiple_entries(collector): + """Test parsing Pensando NIC DCQCN entry from nicctl show dcqcn output""" + dcqcn_entries = collector._parse_nicctl_dcqcn(NICCTL_SHOW_DCQCN_OUTPUT) + + assert len(dcqcn_entries) == 1 + + # Check entry + entry1 = dcqcn_entries[0] + assert entry1.nic_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.lif_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.roce_device == "sample" + assert entry1.dcqcn_profile_id == "1" + assert entry1.status == "Disabled" + + +def test_parse_nicctl_show_dcqcn_empty_output(collector): + """Test parsing empty nicctl show dcqcn output""" + dcqcn_entries = collector._parse_nicctl_dcqcn("") + + assert len(dcqcn_entries) == 0 + + +def test_parse_nicctl_show_dcqcn_partial_fields(collector): + """Test parsing nicctl show dcqcn output with partial fields""" + partial_output = """ +NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) +------------------------------------------------------------------------------------------ + +Lif id : 43000070-0100-0000-4242-0490814a6c40 +****************************************************************************************** +""" + + dcqcn_entries = collector._parse_nicctl_dcqcn(partial_output) + + assert len(dcqcn_entries) == 1 + + # Entry with only NIC ID, PCIe BDF, and Lif ID + entry1 = dcqcn_entries[0] + assert entry1.nic_id == "42424650-4c32-3533-3330-323934000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.lif_id == "43000070-0100-0000-4242-0490814a6c40" + assert entry1.roce_device is None + assert entry1.dcqcn_profile_id is None + assert entry1.status is None + + +def test_parse_nicctl_show_dcqcn_malformed_output(collector): + """Test parsing malformed nicctl show dcqcn output gracefully""" + malformed = """some random text +not a valid dcqcn line +123 invalid format +""" + + dcqcn_entries = collector._parse_nicctl_dcqcn(malformed) + + # Should handle gracefully, return empty list + assert isinstance(dcqcn_entries, list) + assert len(dcqcn_entries) == 0 + + +def test_network_data_model_with_pensando_nic_dcqcn(collector): + """Test creating NetworkDataModel with Pensando NIC DCQCN data""" + dcqcn1 = PensandoNicDcqcn( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + lif_id="43000070-0100-0000-4242-0490814a6c40", + roce_device="rocep9s0", + dcqcn_profile_id="1", + status="Disabled", + ) + + dcqcn2 = PensandoNicDcqcn( + nic_id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + lif_id="43000070-0100-0000-4242-0490815cce50", + roce_device="rocep25s0", + dcqcn_profile_id="1", + status="Disabled", + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_dcqcn=[dcqcn1, dcqcn2], + ) + + assert len(data.pensando_nic_dcqcn) == 2 + assert data.pensando_nic_dcqcn[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_dcqcn[0].pcie_bdf == "0000:06:00.0" + assert data.pensando_nic_dcqcn[0].roce_device == "rocep9s0" + assert data.pensando_nic_dcqcn[1].lif_id == "43000070-0100-0000-4242-0490815cce50" + + +def test_parse_nicctl_show_environment_multiple_entries(collector): + """Test parsing Pensando NIC environment entry from nicctl show environment output""" + environment_entries = collector._parse_nicctl_environment(NICCTL_SHOW_ENVIRONMENT_OUTPUT) + + assert len(environment_entries) == 1 + + # Check entry + entry1 = environment_entries[0] + assert entry1.nic_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.total_power_drawn == 29.437 + assert entry1.core_power == 12.375 + assert entry1.arm_power == 0.788 + assert entry1.local_board_temperature == 44.12 + assert entry1.die_temperature == 45.59 + assert entry1.input_voltage == 12078 + assert entry1.core_voltage == 725 + assert entry1.core_frequency == 1100 + assert entry1.cpu_frequency == 1500 + assert entry1.p4_stage_frequency == 1500 + + +def test_parse_nicctl_show_environment_empty_output(collector): + """Test parsing empty nicctl show environment output""" + environment_entries = collector._parse_nicctl_environment("") + + assert len(environment_entries) == 0 + + +def test_parse_nicctl_show_environment_partial_fields(collector): + """Test parsing nicctl show environment output with partial fields""" + partial_output = """ +NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) + + Power(W): + Total power drawn (pin) : 29.437 + Temperature(C): + Local board temperature : 44.12 +------------------------------------------------------------------------------------- +""" + + environment_entries = collector._parse_nicctl_environment(partial_output) + + assert len(environment_entries) == 1 + + # Entry with only some fields + entry1 = environment_entries[0] + assert entry1.nic_id == "42424650-4c32-3533-3330-323934000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.total_power_drawn == 29.437 + assert entry1.local_board_temperature == 44.12 + assert entry1.core_power is None + assert entry1.die_temperature is None + assert entry1.input_voltage is None + + +def test_parse_nicctl_show_environment_malformed_output(collector): + """Test parsing malformed nicctl show environment output gracefully""" + malformed = """some random text +not a valid environment line +123 invalid format +""" + + environment_entries = collector._parse_nicctl_environment(malformed) + + # Should handle gracefully, return empty list + assert isinstance(environment_entries, list) + assert len(environment_entries) == 0 + + +def test_network_data_model_with_pensando_nic_environment(collector): + """Test creating NetworkDataModel with Pensando NIC environment data""" + env1 = PensandoNicEnvironment( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + total_power_drawn=29.437, + core_power=12.375, + arm_power=0.788, + local_board_temperature=44.12, + die_temperature=45.59, + input_voltage=12078, + core_voltage=725, + core_frequency=1100, + cpu_frequency=1500, + p4_stage_frequency=1500, + ) + + env2 = PensandoNicEnvironment( + nic_id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + total_power_drawn=28.968, + core_power=12.031, + arm_power=0.292, + local_board_temperature=42.62, + die_temperature=42.28, + input_voltage=12078, + core_voltage=725, + core_frequency=1100, + cpu_frequency=1500, + p4_stage_frequency=1500, + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_environment=[env1, env2], + ) + + assert len(data.pensando_nic_environment) == 2 + assert data.pensando_nic_environment[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_environment[0].pcie_bdf == "0000:06:00.0" + assert data.pensando_nic_environment[0].total_power_drawn == 29.437 + assert data.pensando_nic_environment[0].die_temperature == 45.59 + assert data.pensando_nic_environment[1].core_frequency == 1100 + + +def test_parse_nicctl_show_pcie_ats_multiple_entries(collector): + """Test parsing Pensando NIC PCIe ATS entry from nicctl show pcie ats output""" + pcie_ats_entries = collector._parse_nicctl_pcie_ats(NICCTL_SHOW_PCIE_ATS_OUTPUT) + + assert len(pcie_ats_entries) == 1 + + # Check entry + entry1 = pcie_ats_entries[0] + assert entry1.nic_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.status == "Disabled" + + +def test_parse_nicctl_show_pcie_ats_empty_output(collector): + """Test parsing empty nicctl show pcie ats output""" + pcie_ats_entries = collector._parse_nicctl_pcie_ats("") + + assert len(pcie_ats_entries) == 0 + + +def test_parse_nicctl_show_pcie_ats_enabled(collector): + """Test parsing nicctl show pcie ats output with Enabled status""" + enabled_output = """ +NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) : Enabled +NIC : 42424650-4c32-3533-3731-304535000000 (0000:16:00.0) : Disabled +""" + + pcie_ats_entries = collector._parse_nicctl_pcie_ats(enabled_output) + + assert len(pcie_ats_entries) == 2 + assert pcie_ats_entries[0].status == "Enabled" + assert pcie_ats_entries[1].status == "Disabled" + + +def test_parse_nicctl_show_pcie_ats_malformed_output(collector): + """Test parsing malformed nicctl show pcie ats output gracefully""" + malformed = """some random text +not a valid pcie ats line +123 invalid format +""" + + pcie_ats_entries = collector._parse_nicctl_pcie_ats(malformed) + + # Should handle gracefully, return empty list + assert isinstance(pcie_ats_entries, list) + assert len(pcie_ats_entries) == 0 + + +def test_network_data_model_with_pensando_nic_pcie_ats(collector): + """Test creating NetworkDataModel with Pensando NIC PCIe ATS data""" + ats1 = PensandoNicPcieAts( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + status="Disabled", + ) + + ats2 = PensandoNicPcieAts( + nic_id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + status="Enabled", + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_pcie_ats=[ats1, ats2], + ) + + assert len(data.pensando_nic_pcie_ats) == 2 + assert data.pensando_nic_pcie_ats[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_pcie_ats[0].pcie_bdf == "0000:06:00.0" + assert data.pensando_nic_pcie_ats[0].status == "Disabled" + assert data.pensando_nic_pcie_ats[1].status == "Enabled" + + +def test_parse_nicctl_show_port_multiple_entries(collector): + """Test parsing Pensando NIC port entry from nicctl show port output""" + port_entries = collector._parse_nicctl_port(NICCTL_SHOW_PORT_OUTPUT) + + assert len(port_entries) == 1 + + # Check entry + entry1 = port_entries[0] + assert entry1.nic_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.port_id == "555555a-6c40-4242-4242-000011010000" + assert entry1.port_name == "eth1/1" + # Spec fields + assert entry1.spec_ifindex == "0x11010000" + assert entry1.spec_type == "ETH" + assert entry1.spec_speed == "400G" + assert entry1.spec_admin_state == "UP" + assert entry1.spec_fec_type == "RS" + assert entry1.spec_pause_type == "PFC" + assert entry1.spec_num_lanes == 4 + assert entry1.spec_mtu == 9216 + assert entry1.spec_tx_pause == "enabled" + assert entry1.spec_rx_pause == "enabled" + assert entry1.spec_auto_negotiation == "disabled" + # Status fields + assert entry1.status_physical_port == 1 + assert entry1.status_operational_status == "DOWN" + assert entry1.status_link_fsm_state == "SIGNAL_DETECT" + assert entry1.status_fec_type == "RS" + assert entry1.status_cable_type == "Copper" + assert entry1.status_num_lanes == 4 + assert entry1.status_speed == "400G" + assert entry1.status_auto_negotiation == "disabled" + assert entry1.status_mac_id == 0 + assert entry1.status_mac_channel == 0 + assert entry1.status_mac_address == "04:90:81:4a:6c:40" + assert entry1.status_transceiver_type == "QSFP_CMIS" + assert entry1.status_transceiver_state == "SPROM-READ" + assert entry1.status_transceiver_pid == "QSFP-400G-CR4" + + +def test_parse_nicctl_show_port_empty_output(collector): + """Test parsing empty nicctl show port output""" + port_entries = collector._parse_nicctl_port("") + + assert len(port_entries) == 0 + + +def test_parse_nicctl_show_port_partial_fields(collector): + """Test parsing nicctl show port output with partial fields""" + partial_output = """ +NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) + +Port : 0490814a-6c40-4242-4242-000011010000 (eth1/1) + Spec: + speed : 400G + Admin state : UP + Status: + Operational status : DOWN +------------------------------------------------------------------------------------- +""" + + port_entries = collector._parse_nicctl_port(partial_output) + + assert len(port_entries) == 1 + + # Entry with only some fields + entry1 = port_entries[0] + assert entry1.nic_id == "42424650-4c32-3533-3330-323934000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.port_name == "eth1/1" + assert entry1.spec_speed == "400G" + assert entry1.spec_admin_state == "UP" + assert entry1.status_operational_status == "DOWN" + assert entry1.spec_mtu is None + assert entry1.status_mac_address is None + + +def test_parse_nicctl_show_port_malformed_output(collector): + """Test parsing malformed nicctl show port output gracefully""" + malformed = """some random text +not a valid port line +123 invalid format +""" + + port_entries = collector._parse_nicctl_port(malformed) + + # Should handle gracefully, return empty list + assert isinstance(port_entries, list) + assert len(port_entries) == 0 + + +def test_network_data_model_with_pensando_nic_port(collector): + """Test creating NetworkDataModel with Pensando NIC port data""" + port1 = PensandoNicPort( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + port_id="0490814a-6c40-4242-4242-000011010000", + port_name="eth1/1", + spec_speed="400G", + spec_admin_state="UP", + spec_mtu=9216, + status_operational_status="DOWN", + status_mac_address="04:90:81:4a:6c:40", + ) + + port2 = PensandoNicPort( + nic_id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + port_id="0490815c-ce50-4242-4242-000011010000", + port_name="eth1/1", + spec_speed="400G", + spec_admin_state="UP", + spec_mtu=9216, + status_operational_status="UP", + status_mac_address="04:90:81:5c:ce:50", + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_ports=[port1, port2], + ) + + assert len(data.pensando_nic_ports) == 2 + assert data.pensando_nic_ports[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_ports[0].port_name == "eth1/1" + assert data.pensando_nic_ports[0].spec_speed == "400G" + assert data.pensando_nic_ports[0].status_mac_address == "04:90:81:4a:6c:40" + assert data.pensando_nic_ports[1].status_operational_status == "UP" + + +def test_parse_nicctl_show_qos_multiple_entries(collector): + """Test parsing Pensando NIC QoS entry from nicctl show qos output""" + qos_entries = collector._parse_nicctl_qos(NICCTL_SHOW_QOS_OUTPUT) + + assert len(qos_entries) == 1 + + # Check entry + entry1 = qos_entries[0] + assert entry1.nic_id == "1111111-4c32-3533-3330-12345000000" + assert entry1.pcie_bdf == "0000:06:00.0" + assert entry1.port_id == "0490814a-6c40-4242-4242-000011010000" + assert entry1.classification_type == "DSCP" + assert entry1.dscp_bitmap == "0xffffffffffffffff" + assert entry1.dscp_range == "0-63" + assert entry1.dscp_priority == 0 + assert entry1.pfc_priority_bitmap == "0x0" + assert entry1.pfc_no_drop_priorities == "" + assert len(entry1.scheduling) == 1 + assert entry1.scheduling[0].priority == 0 + assert entry1.scheduling[0].scheduling_type == "DWRR" + assert entry1.scheduling[0].bandwidth == 0 + assert entry1.scheduling[0].rate_limit == "N/A" + + +def test_parse_nicctl_show_qos_empty_output(collector): + """Test parsing empty nicctl show qos output""" + qos_entries = collector._parse_nicctl_qos("") + + assert len(qos_entries) == 0 + + +def test_parse_nicctl_show_qos_malformed_output(collector): + """Test parsing malformed nicctl show qos output gracefully""" + malformed = """some random text +not a valid qos line +123 invalid format +""" + + qos_entries = collector._parse_nicctl_qos(malformed) + + # Should handle gracefully, return empty list + assert isinstance(qos_entries, list) + assert len(qos_entries) == 0 + + +def test_network_data_model_with_pensando_nic_qos(collector): + """Test creating NetworkDataModel with Pensando NIC QoS data""" + sched1 = PensandoNicQosScheduling( + priority=0, + scheduling_type="DWRR", + bandwidth=0, + rate_limit="N/A", + ) + + qos1 = PensandoNicQos( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + port_id="0490814a-6c40-4242-4242-000011010000", + classification_type="DSCP", + dscp_bitmap="0xffffffffffffffff", + dscp_range="0-63", + dscp_priority=0, + pfc_priority_bitmap="0x0", + pfc_no_drop_priorities="", + scheduling=[sched1], + ) + + qos2 = PensandoNicQos( + nic_id="42424650-4c32-3533-3731-304535000000", + pcie_bdf="0000:16:00.0", + port_id="0490815c-ce50-4242-4242-000011010000", + classification_type="DSCP", + ) + + data = NetworkDataModel( + interfaces=[], + routes=[], + rules=[], + neighbors=[], + ethtool_info={}, + pensando_nic_qos=[qos1, qos2], + ) + + assert len(data.pensando_nic_qos) == 2 + assert data.pensando_nic_qos[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_qos[0].port_id == "0490814a-6c40-4242-4242-000011010000" + assert data.pensando_nic_qos[0].classification_type == "DSCP" + assert len(data.pensando_nic_qos[0].scheduling) == 1 + assert data.pensando_nic_qos[1].nic_id == "42424650-4c32-3533-3731-304535000000" + + +# Mock output for 'nicctl show rdma statistics' +NICCTL_SHOW_RDMA_STATISTICS_OUTPUT = """NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) + +------------------------------------------------------------ +Name Count +------------------------------------------------------------ +Queue pair create 1 +Completion queue create 2 + +NIC : 42424650-4c32-3533-3731-304535000000 (0000:16:00.0) + +------------------------------------------------------------ +Name Count +------------------------------------------------------------ +Queue pair create 1 +Completion queue create 2 +""" + + +def test_parse_nicctl_show_rdma_statistics_multiple_entries(collector): + """Test parsing multiple NIC RDMA statistics entries.""" + entries = collector._parse_nicctl_rdma_statistics(NICCTL_SHOW_RDMA_STATISTICS_OUTPUT) + + assert len(entries) == 2 + + # Check first entry + assert entries[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert entries[0].pcie_bdf == "0000:06:00.0" + assert len(entries[0].statistics) == 2 + assert entries[0].statistics[0].name == "Queue pair create" + assert entries[0].statistics[0].count == 1 + assert entries[0].statistics[1].name == "Completion queue create" + assert entries[0].statistics[1].count == 2 + + # Check second entry + assert entries[1].nic_id == "42424650-4c32-3533-3731-304535000000" + assert entries[1].pcie_bdf == "0000:16:00.0" + assert len(entries[1].statistics) == 2 + assert entries[1].statistics[0].name == "Queue pair create" + assert entries[1].statistics[0].count == 1 + assert entries[1].statistics[1].name == "Completion queue create" + assert entries[1].statistics[1].count == 2 + + +def test_parse_nicctl_show_rdma_statistics_empty_output(collector): + """Test parsing empty RDMA statistics output.""" + entries = collector._parse_nicctl_rdma_statistics("") + assert len(entries) == 0 + + +# Mock output for 'nicctl show version host-software' +NICCTL_SHOW_VERSION_HOST_SOFTWARE_OUTPUT = """nicctl : 1.117.1-a-63 +IPC driver : 1.117.1.a.63 +ionic driver : 25.08.4.004 +""" + + +def test_parse_nicctl_show_version_host_software(collector): + """Test parsing host software version.""" + version = collector._parse_nicctl_version_host_software( + NICCTL_SHOW_VERSION_HOST_SOFTWARE_OUTPUT + ) + + assert version is not None + assert version.nicctl == "1.117.1-a-63" + assert version.ipc_driver == "1.117.1.a.63" + assert version.ionic_driver == "25.08.4.004" + + +def test_parse_nicctl_show_version_host_software_empty_output(collector): + """Test parsing empty host software version output.""" + version = collector._parse_nicctl_version_host_software("") + assert version is None + + +# Mock output for 'nicctl show version firmware' +NICCTL_SHOW_VERSION_FIRMWARE_OUTPUT = """NIC : 42424650-4c32-3533-3330-323934000000 (0000:06:00.0) + +CPLD : 3.16 (primary) +Boot0 : 21 +Uboot-A : 1.117.1-a-63 +Firmware-A : 1.117.1-a-63 +Device config-A : device_config_rdma_1x400G/1.0.0 +------------------------------------------------------------------------------------- + +NIC : 42424650-4c32-3533-3731-304535000000 (0000:16:00.0) + +CPLD : 3.16 (primary) +Boot0 : 21 +Uboot-A : 1.117.1-a-63 +Firmware-A : 1.117.1-a-63 +Device config-A : device_config_rdma_1x400G/1.0.0 +------------------------------------------------------------------------------------- +""" + + +def test_parse_nicctl_show_version_firmware_multiple_entries(collector): + """Test parsing multiple NIC firmware version entries.""" + entries = collector._parse_nicctl_version_firmware(NICCTL_SHOW_VERSION_FIRMWARE_OUTPUT) + + assert len(entries) == 2 + + # Check first entry + assert entries[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert entries[0].pcie_bdf == "0000:06:00.0" + assert entries[0].cpld == "3.16 (primary)" + assert entries[0].boot0 == "21" + assert entries[0].uboot_a == "1.117.1-a-63" + assert entries[0].firmware_a == "1.117.1-a-63" + assert entries[0].device_config_a == "device_config_rdma_1x400G/1.0.0" + + # Check second entry + assert entries[1].nic_id == "42424650-4c32-3533-3731-304535000000" + assert entries[1].pcie_bdf == "0000:16:00.0" + assert entries[1].cpld == "3.16 (primary)" + assert entries[1].boot0 == "21" + assert entries[1].uboot_a == "1.117.1-a-63" + assert entries[1].firmware_a == "1.117.1-a-63" + assert entries[1].device_config_a == "device_config_rdma_1x400G/1.0.0" + + +def test_parse_nicctl_show_version_firmware_empty_output(collector): + """Test parsing empty firmware version output.""" + entries = collector._parse_nicctl_version_firmware("") + assert len(entries) == 0 + + +def test_network_data_model_with_pensando_nic_rdma_statistics(): + """Test NetworkDataModel with Pensando NIC RDMA statistics.""" + from nodescraper.plugins.inband.network.networkdata import ( + NetworkDataModel, + PensandoNicRdmaStatistic, + PensandoNicRdmaStatistics, + ) + + data = NetworkDataModel( + pensando_nic_rdma_statistics=[ + PensandoNicRdmaStatistics( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + statistics=[ + PensandoNicRdmaStatistic(name="Queue pair create", count=1), + PensandoNicRdmaStatistic(name="Completion queue create", count=2), + ], + ) + ] + ) + + assert len(data.pensando_nic_rdma_statistics) == 1 + assert data.pensando_nic_rdma_statistics[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert len(data.pensando_nic_rdma_statistics[0].statistics) == 2 + + +def test_network_data_model_with_pensando_nic_version_host_software(): + """Test NetworkDataModel with Pensando NIC host software version.""" + from nodescraper.plugins.inband.network.networkdata import ( + NetworkDataModel, + PensandoNicVersionHostSoftware, + ) + + data = NetworkDataModel( + pensando_nic_version_host_software=PensandoNicVersionHostSoftware( + nicctl="1.117.1-a-63", + ipc_driver="1.117.1.a.63", + ionic_driver="25.08.4.004", + ) + ) + + assert data.pensando_nic_version_host_software is not None + assert data.pensando_nic_version_host_software.nicctl == "1.117.1-a-63" + assert data.pensando_nic_version_host_software.ipc_driver == "1.117.1.a.63" + assert data.pensando_nic_version_host_software.ionic_driver == "25.08.4.004" + + +def test_network_data_model_with_pensando_nic_version_firmware(): + """Test NetworkDataModel with Pensando NIC firmware versions.""" + from nodescraper.plugins.inband.network.networkdata import ( + NetworkDataModel, + PensandoNicVersionFirmware, + ) + + data = NetworkDataModel( + pensando_nic_version_firmware=[ + PensandoNicVersionFirmware( + nic_id="42424650-4c32-3533-3330-323934000000", + pcie_bdf="0000:06:00.0", + cpld="3.16 (primary)", + boot0="21", + uboot_a="1.117.1-a-63", + firmware_a="1.117.1-a-63", + device_config_a="device_config_rdma_1x400G/1.0.0", + ) + ] + ) + + assert len(data.pensando_nic_version_firmware) == 1 + assert data.pensando_nic_version_firmware[0].nic_id == "42424650-4c32-3533-3330-323934000000" + assert data.pensando_nic_version_firmware[0].cpld == "3.16 (primary)"