+
+ def parse_option_43(self, raw_bytes):
+ """Parses vendor-encapsulated Option 43 data."""
+ index = 0
+
+ while index < len(raw_bytes):
+ if index + 2 > len(raw_bytes):
+ print("[!] Malformed Option 43 data (truncated)")
+ break
+
+ opt_type = raw_bytes[index] # First byte is the sub-option type
+ opt_len = raw_bytes[index + 1] # Second byte is the length
+
+ if index + 2 + opt_len > len(raw_bytes):
+ print(f"[!] Skipping invalid Option 43 sub-option {opt_type:#04x} (bad length)")
+ break
+
+ opt_value = raw_bytes[index + 2: index + 2 + opt_len] # Value field
+
+ if opt_type == 0x81: # IP Address (4 bytes)
+ if opt_len == 4:
+ parsed_value = ".".join(str(b) for b in opt_value)
+ self.dhcp_sdnr_controller_ip = parsed_value
+ logger.debug(f"Received Controller IP {self.dhcp_sdnr_controller_ip} via DHCP.")
+ else:
+ logger.error(f"Length of IP address is not 4, but {opt_len}. Failed to extract Controller IP address from DHCP.")
+ self.dhcp_sdnr_controller_ip = None
+
+ elif opt_type == 0x83: # IP Address (4 bytes)
+ if opt_len == 4:
+ parsed_value = ".".join(str(b) for b in opt_value)
+ self.dhcp_ves_ip = parsed_value
+ logger.debug(f"Received VES Collector IP {self.dhcp_ves_ip} via DHCP.")
+ else:
+ logger.error(f"Length of IP address is not 4, but {opt_len}. Failed to extract VES Collector IP address from DHCP.")
+ self.dhcp_ves_ip = None
+
+ elif opt_type == 0x82: # FQDN (ASCII string)
+ try:
+ self.dhcp_sdnr_fqdn = opt_value.decode("ascii")
+ logger.debug(f"Received Controller FQDN {self.dhcp_sdnr_fqdn} via DHCP.")
+ except UnicodeDecodeError:
+ logger.error(f"Could not decode Controller FQDN in ASCII. Received hex value: {opt_value.hex()}")
+ self.dhcp_sdnr_fqdn = None
+
+ elif opt_type == 0x84: # FQDN (ASCII string)
+ try:
+ self.dhcp_ves_fqdn = opt_value.decode("ascii")
+ logger.debug(f"Received VES Collector FQDN {self.dhcp_ves_fqdn} via DHCP.")
+ except UnicodeDecodeError:
+ logger.error(f"Could not decode Controller FQDN in ASCII. Received hex value: {opt_value.hex()}")
+ self.dhcp_ves_fqdn = None
+
+ elif opt_type == 0x86: # Single-byte flag
+ if opt_len == 1:
+ parsed_value = int(opt_value[0])
+ if parsed_value == 0:
+ self.dhcp_sdnr_callhome_tls = False
+ logger.debug(f"Received CallHome over SSH via DHCP.")
+ elif parsed_value == 1:
+ logger.debug(f"Received CallHome over TLS via DHCP.")
+ self.dhcp_sdnr_callhome_tls = True
+ else:
+ logger.error(f"Could not get correct NETCONF Call Home information. Received hex value: {opt_value.hex()}")
+ index += 2 + opt_len # Move to the next sub-option
+
+ def print_dhcp_options(self, dhcp_options):
+ """Print all DHCP options from a list of (option, value) tuples."""
+ for opt in dhcp_options:
+ if isinstance(opt, tuple):
+ if opt[0] == "vendor_specific": # Fix: Use the correct Scapy name for Option 43
+ logger.debug(" Option 43 (Vendor-Specific Information):")
+ self.parse_option_43(opt[1]) # Convert raw bytes
+ else:
+ logger.debug(f" Option {opt[0]}: {opt[1]}")
+
+ def handle_packet(self, pkt):
+ """Callback to process incoming DHCP packets and detect DHCPOFFER."""
+ if DHCP in pkt:
+ dhcp_opts = pkt[DHCP].options
+ for opt in dhcp_opts:
+ if isinstance(opt, tuple) and opt[0] == "message-type":
+ if opt[1] in [2, "offer"]: # DHCPOFFER detected
+ server_ip = pkt[IP].src
+ offered_ip = pkt[BOOTP].yiaddr
+ logger.debug(f"[+] Received DHCPOFFER from {server_ip}")
+ logger.debug(f" Offered IP: {offered_ip}")
+ logger.debug(" Full DHCP options:")
+ self.print_dhcp_options(dhcp_opts)
+ return True
+ return False
+
+ def dhcp_get_config(self):
+ # Start sniffing in *async* mode so we don't block.
+ for iface in get_if_list():
+ if iface == 'lo':
+ logger.debug(f"Skipping sending DHCPDISCOVER on {iface}...")
+ continue
+ logger.debug(f"Sending DHCPDISCOVER on {iface}")
+ # sendp(discover, iface=iface, verbose=False)
+ sniff_thread = AsyncSniffer(
+ iface=iface,
+ filter="udp and (port 67 or port 68)",
+ prn=self.handle_packet,
+ store=True # Ensure packets are stored in results
+ )
+ sniff_thread.start()
+
+ time.sleep(1) # Give it a moment to get ready
+
+ mac_addr = get_container_mac_address()
+ logger.debug(f"Got back MAC address {mac_addr}")
+
+ # Now send DHCPDISCOVER
+ discover = (
+ Ether(src=mac_addr, dst="ff:ff:ff:ff:ff:ff")
+ / IP(src="0.0.0.0", dst="255.255.255.255")
+ / UDP(sport=68, dport=67)
+ / BOOTP(chaddr=b'\x02\x50\x02\x99\x00\x01', xid=0x99999999, flags=0x8000)
+ / DHCP(options=[
+ ("message-type", "discover"),
+ ("parameter-request-list", [43]), # Explicitly request Option 43
+ ("vendor_class_id", "o-ran-ru2/pynts"), # Option 60
+ "end"
+ ])
+ )
+ logger.debug("[*] Sending DHCPDISCOVER...")
+ sendp(discover, iface=iface, verbose=False)
+
+ logger.debug("[*] Sniffing for 5 seconds...")
+ time.sleep(5)
+
+ # sniff_thread.running = False
+ sniff_thread.stop()
+ results = sniff_thread.results
+ logger.debug(f"[+] Captured {len(results)} packets in total")
+
+ # for pkt in results:
+ # if self.handle_packet(pkt):
+ # logger.debug("[+] Test SUCCESS - Received a valid DHCPOFFER.")
+ # return True
+
+ # logger.debug("[-] Test FAILED - No DHCPOFFER received.")
+ # return False