Add DHCP server container.
[oam.git] / solution / infra / dhcp-tester / test_dhcp.py
1 #!/usr/bin/env python3
2
3 ################################################################################
4 # Copyright 2025 highstreet technologies
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #     http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 """
19 Scapy-based DHCP test script with Option 43 parsing.
20
21 Sends DHCPDISCOVER, waits for DHCPOFFER, extracts vendor-specific Option 43 data,
22 and prints all relevant information.
23 """
24
25 from scapy.all import Ether, IP, UDP, BOOTP, DHCP, sendp, sniff, AsyncSniffer
26 import time
27 import sys
28
29 # Mapping of Option 43 sub-option types
30 OPTION_43_TYPES = {
31     0x81: "Controller IP Address",
32     0x82: "Controller FQDN",
33     0x83: "Event Collector IP Address",
34     0x84: "Event Collector FQDN",
35     0x85: "PNF Registration Format",
36     0x86: "NETCONF Call Home"
37 }
38
39 def parse_option_43(raw_bytes):
40     """Parses vendor-encapsulated Option 43 data."""
41     index = 0
42     parsed_data = {}
43
44     while index < len(raw_bytes):
45         if index + 2 > len(raw_bytes):
46             print("[!] Malformed Option 43 data (truncated)")
47             break
48
49         opt_type = raw_bytes[index]  # First byte is the sub-option type
50         opt_len = raw_bytes[index + 1]  # Second byte is the length
51
52         if index + 2 + opt_len > len(raw_bytes):
53             print(f"[!] Skipping invalid Option 43 sub-option {opt_type:#04x} (bad length)")
54             break
55
56         opt_value = raw_bytes[index + 2: index + 2 + opt_len]  # Value field
57
58         if opt_type in [0x81, 0x83]:  # IP Address (4 bytes)
59             if opt_len == 4:
60                 parsed_value = ".".join(str(b) for b in opt_value)
61             else:
62                 parsed_value = f"Invalid IP length ({opt_len})"
63         
64         elif opt_type in [0x82, 0x84]:  # FQDN (ASCII string)
65             try:
66                 parsed_value = opt_value.decode("ascii")
67             except UnicodeDecodeError:
68                 parsed_value = opt_value.hex()  # Fallback to hex if not ASCII
69
70         elif opt_type in [0x85, 0x86]:  # Single-byte flags
71             parsed_value = int(opt_value[0]) if opt_len == 1 else f"Invalid flag length ({opt_len})"
72         
73         else:
74             parsed_value = opt_value.hex()  # Unknown types are printed in hex
75
76         parsed_data[OPTION_43_TYPES.get(opt_type, f"Unknown Type {opt_type:#04x}")] = parsed_value
77
78         index += 2 + opt_len  # Move to the next sub-option
79
80     return parsed_data
81
82 def print_dhcp_options(dhcp_options):
83     """Print all DHCP options from a list of (option, value) tuples."""
84     for opt in dhcp_options:
85         if isinstance(opt, tuple):
86             if opt[0] == "vendor_specific":  # Fix: Use the correct Scapy name for Option 43
87                 print("    Option 43 (Vendor-Specific Information):")
88                 parsed_43 = parse_option_43(opt[1])  # Convert raw bytes
89                 for k, v in parsed_43.items():
90                     print(f"        {k}: {v}")
91             else:
92                 print(f"    Option {opt[0]}: {opt[1]}")
93
94 def handle_packet(pkt):
95     """Callback to process incoming DHCP packets and detect DHCPOFFER."""
96     if DHCP in pkt:
97         dhcp_opts = pkt[DHCP].options
98         for opt in dhcp_opts:
99             if isinstance(opt, tuple) and opt[0] == "message-type":
100                 if opt[1] in [2, "offer"]:  # DHCPOFFER detected
101                     server_ip = pkt[IP].src
102                     offered_ip = pkt[BOOTP].yiaddr
103                     print(f"[+] Received DHCPOFFER from {server_ip}")
104                     print(f"    Offered IP: {offered_ip}")
105                     print("    Full DHCP options:")
106                     print_dhcp_options(dhcp_opts)
107                     return True
108     return False
109
110 def test_dhcp():
111     # Start sniffing in *async* mode so we don't block.
112     sniff_thread = AsyncSniffer(
113         iface="eth0",
114         filter="udp and (port 67 or port 68)",
115         prn=handle_packet
116     )
117     sniff_thread.start()
118
119     time.sleep(1)  # Give it a moment to get ready
120
121     # Now send DHCPDISCOVER
122     discover = (
123         Ether(src="02:50:02:99:00:01", dst="ff:ff:ff:ff:ff:ff")
124         / IP(src="0.0.0.0", dst="255.255.255.255")
125         / UDP(sport=68, dport=67)
126         / BOOTP(chaddr=b'\x02\x50\x02\x99\x00\x01', xid=0x99999999, flags=0x8000)
127         / DHCP(options=[
128             ("message-type", "discover"),
129             ("parameter-request-list", [43]),  # Explicitly request Option 43
130             ("vendor_class_id", "o-ran-ru2/pynts"),  # Option 60
131             "end"
132         ])
133     )
134     print("[*] Sending DHCPDISCOVER...")
135     sendp(discover, iface="eth0", verbose=False)
136
137     print("[*] Sniffing for 5 seconds...")
138     time.sleep(5)
139
140     sniff_thread.stop()
141     results = sniff_thread.results
142     print(f"[+] Captured {len(results)} packets in total")
143
144     for pkt in results:
145         if handle_packet(pkt):
146             print("[+] Test SUCCESS - Received a valid DHCPOFFER.")
147             sys.exit(0)  # Exit with success code 0
148
149     print("[-] Test FAILED - No DHCPOFFER received.")
150     sys.exit(1)  # Exit with failure code 1
151
152 if __name__ == "__main__":
153     test_dhcp()