import tkinter as tk from tkinter import ttk, messagebox import socket import struct import time import uuid import re import random # 获取本机信息 def get_local_info(): """ 获取本机的主机名、MAC地址和IP地址列表。 Returns: tuple: 包含主机名、MAC地址和IP地址列表的元组。 """ hostname = socket.gethostname() # 获取主机名 local_ips = socket.gethostbyname_ex(hostname)[-1] # 获取本机的IP地址列表 mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())) # 获取MAC地址 return hostname, mac, local_ips # 验证 IP 地址或域名 def validate_ip_or_domain(value): """ 验证输入值是否为有效的IP地址或域名。 Args: value (str): 待验证的IP地址或域名。 Returns: bool: 如果输入值有效返回True,否则返回False。 """ ip_pattern = re.compile(r'^\d{1,3}(\.\d{1,3}){3}$') # 匹配IPv4地址的正则 domain_pattern = re.compile(r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') # 匹配域名的正则 return ip_pattern.match(value) or domain_pattern.match(value) # 验证端口号 def validate_port(port): """ 验证输入的端口号是否有效。 Args: port (str): 待验证的端口号。 Returns: bool: 如果端口号有效返回True,否则返回False。 """ return port.isdigit() and 1 <= int(port) <= 65535 # 构造 IP 头部 def construct_ip_header(src_ip, dest_ip, payload_length, protocol): """ 构造IP头部。 Args: src_ip (str): 源IP地址。 dest_ip (str): 目标IP地址。 payload_length (int): 负载长度。 protocol (int): 协议号。 Returns: bytes: 构造的IP头部。 """ # IP头部的版本号,IPv4为4 version = 4 # IP头部长度,IPv4头部长度为5个32位字节,即20字节 ihl = 5 # 服务类型,表示优先权及吞吐量、延迟等特性 tos = 0 # 总长度,即IP头部加数据部分的长度 total_length = 20 + payload_length # IP头部20字节 + 数据部分 # 标识符,用于唯一标识主机发送的每一份数据包 identification = 54321 # 标志位与片偏移,用于指示数据包的分割与重组 flags_offset = 0 # 生存时间,表示数据包可以经过的路由器数量 ttl = 64 # 首部校验和,用于错误检测 header_checksum = 0 # 将源IP转换为二进制格式 src_ip_bytes = socket.inet_aton(src_ip) # 将目标IP转换为二进制格式 dest_ip_bytes = socket.inet_aton(dest_ip) # 打包IP头部信息,转换为二进制格式 ip_header = struct.pack('!BBHHHBBH4s4s', (version << 4) + ihl, tos, total_length, identification, flags_offset, ttl, protocol, header_checksum, src_ip_bytes, dest_ip_bytes) # 返回构造的IP头部 return ip_header # 构造 ICMP 报文 def construct_icmp(): """ 构造ICMP报文。 Returns: bytes: 构造的ICMP报文。 """ # ICMP 报文头部: 类型(8), 代码(0), 校验和(0), 标识符(1), 序列号(1) # 一开始校验和设为0,因为之后需要先计算校验和 icmp_header = struct.pack('!BBHHH', 8, 0, 0, 1, 1) # ICMP 报文的数据部分,这里简单地使用 'Ping' 作为示例数据 data = b'Ping' # 计算校验和,确保报文的完整性 checksum = calculate_checksum(icmp_header + data) # 填充校验和字段,完成报文头部的构造 icmp_header = struct.pack('!BBHHH', 8, 0, checksum, 1, 1) # 返回完整的ICMP报文 return icmp_header + data # 构造 UDP 报文 def construct_udp(src_port, dest_port): """ 构造UDP报文。 Args: src_port (int): 源端口号。 dest_port (int): 目标端口号。 Returns: bytes: 构造的UDP报文。 """ # UDP头部格式: 源端口号 (2 字节), 目标端口号 (2 字节), 长度 (2 字节), 校验和 (2 字节) # 长度固定为8,因为UDP头部长度为8字节,校验和暂时为0,稍后由IP层计算并填充 udp_header = struct.pack('!HHHH', src_port, dest_port, 8, 0) return udp_header # 构造 TCP 报文(SYN) def construct_tcp(src_port, dest_port): """ 构造TCP报文(SYN)。 Args: src_port (int): 源端口号。 dest_port (int): 目标端口号。 Returns: bytes: 构造的TCP报文。 """ # 初始化TCP标头字段 seq = 0 # 初始序列号 ack_seq = 0 # 确认序列号 offset = 5 # TCP头部的长度,单位是32位字(5 * 4 = 20字节) reserved = 0 # 保留字段,默认为0 flags = 0b000010 # SYN标志位设置为1,其他标志为0 window = socket.htons(5840) # 窗口大小,使用network字节序 checksum = 0 # 校验和,暂时为0 urgent_ptr = 0 # 紧急指针,默认为0 # 打包TCP报文头部 tcp_header = struct.pack('!HHLLBBHHH', src_port, dest_port, seq, ack_seq, (offset << 4) + reserved, flags, window, checksum, urgent_ptr) return tcp_header # 计算校验和 def calculate_checksum(data): """ 计算校验和。 Args: data (bytes): 待计算校验和的数据。 Returns: int: 计算得到的校验和。 """ # 初始化校验和变量 checksum = 0 # 获取数据长度 n = len(data) # 按 16 位块划分 for i in range(0, n - 1, 2): # 将每块数据转换为 16 位整数 chunk = (data[i] << 8) + data[i + 1] # 累加每块数据到校验和 checksum += chunk # 如果长度为奇数,补零 if n % 2 == 1: # 只取最后一个字节,并将其置于 16 位整数的高字节位置 checksum += data[-1] << 8 # 将 32 位总和折叠为 16 位 checksum = (checksum >> 16) + (checksum & 0xFFFF) checksum += (checksum >> 16) # 对总和取反 return ~checksum & 0xFFFF # 构造 DNS 查询报文 def build_dns_query(domain): """ 构建DNS查询报文。 Args: domain (str): 需要查询的域名。 Returns: bytes: 构建的DNS查询报文。 """ # DNS请求报文头部 transaction_id = random.randint(0, 65535) # 随机生成事务ID flags = 0x0100 # 标志字段(标准查询请求) questions = 1 # 查询问题数 answer_rrs = 0 # 回答资源记录数 authority_rrs = 0 # 权威记录数 additional_rrs = 0 # 附加记录数 # 将DNS报文头部各个字段打包成二进制数据 header = struct.pack('>HHHHHH', transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs) # 域名部分(查询问题) domain_parts = domain.split('.') query = b'' for part in domain_parts: # 每个域名部分前添加其长度,并将域名部分转换为字节串 query += struct.pack('B', len(part)) + part.encode() query += b'\0' # 域名结束符 # 查询类型 (A记录: 1) query_type = 1 # 查询类 (IN: 1) query_class = 1 # 将查询类型和查询类打包到查询报文中 # 完整的查询报文由头部、域名部分和查询类型、查询类组成 query_packet = header + query + struct.pack('>HH', query_type, query_class) return query_packet # 解析 DNS 响应报文 def parse_dns_response(response): """ 解析DNS响应报文,提取IP地址。 Args: response: response -- DNS响应报文(字节流) Returns: String: IP地址字符串,如果未找到A记录则返回None。 """ # 解析DNS响应报文的头部信息 # 响应头部:包含事务ID、标志、问题数、答案数等 transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs = struct.unpack('>HHHHHH', response[:12]) # 初始化偏移量以解析响应报文的其余部分 offset = 12 # 跳过问题部分,找到第一个答案记录的起始位置 while response[offset] != 0: length = response[offset] offset += length + 1 offset += 5 # 跳过结束符和查询类型、查询类部分 # 解析回答部分,寻找A记录 # 答案数量 for _ in range(answer_rrs): # 跳过名字部分(不需要,因为已经知道是A记录) offset += 2 # 跳过指针 # 解析答案的类型、类、TTL和数据长度 answer_type, answer_class, ttl, data_len = struct.unpack('>HHIH', response[offset:offset+10]) offset += 10 if answer_type == 1: # A记录 # 获取并返回IP地址 ip = socket.inet_ntoa(response[offset:offset+4]) return ip offset += data_len # 跳过数据部分 # 如果没有找到A记录,返回None return None # 发送报文 def send_packet(): """ 发送报文。 """ try: # 获取源IP、目标主机和目标端口 src_ip = ip_combobox.get() dest_host = dest_entry.get() dest_port = port_entry.get() packet_type = var.get() local_port = local_port_entry.get() # 获取本地发送端口 # 验证目标地址 if not validate_ip_or_domain(dest_host): raise ValueError("目标地址无效,必须为有效的 IP 或域名") # 验证端口号 if not validate_port(dest_port): raise ValueError("端口号无效,必须是有效的正整数(1-65535)") # 验证本地发送端口号 if local_port and not validate_port(local_port): raise ValueError("本地发送端口号无效,必须是有效的正整数(1-65535)") # 解析目标IP地址 dest_ip = socket.gethostbyname(dest_host) src_port = int(local_port) if local_port else 12345 # 使用用户输入的本地端口或默认端口 dest_port = int(dest_port) # ICMP报文需要添加 RTT 计算 if packet_type == "ICMP": sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) packet = construct_icmp() # 记录发送时间 start_time = time.time() # 发送ICMP报文 sock.sendto(packet, (dest_ip, 0)) # 接收响应 sock.settimeout(2) # 设置超时为2秒 try: sock.recvfrom(1024) end_time = time.time() rtt = (end_time - start_time) * 1000 # 毫秒 result.set(f"ICMP请求成功,RTT: {rtt:.2f} 毫秒") except socket.timeout: result.set("ICMP请求超时") finally: sock.close() return # 结束函数 # 对于其他报文类型(UDP, TCP, DNS, IP) if packet_type == "UDP": sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('', src_port)) # 绑定本地发送端口 packet = construct_udp(src_port, dest_port) elif packet_type == "TCP": sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('', src_port)) # 绑定本地发送端口 sock.connect((dest_ip, dest_port)) packet = b"Hello, TCP!" # TCP 是面向连接的,这里发送简单的字符串 sock.sendall(packet) sock.close() result.set(f"TCP 连接成功,数据已发送!") return # 结束函数 elif packet_type == "DNS": dns_server = '8.8.8.8' # Google Public DNS port = 53 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('', src_port)) # 绑定本地发送端口 query_packet = build_dns_query(dest_host) sock.sendto(query_packet, (dns_server, port)) response, _ = sock.recvfrom(512) ip = parse_dns_response(response) if ip: result.set(f"DNS解析成功,IP地址: {ip}") else: result.set("DNS解析失败") sock.close() return # 结束函数 elif packet_type == "IP": # 添加IP报文类型处理 # 构造IP头部 payload = b"Hello, IP!" # 示例负载数据 ip_header = construct_ip_header(src_ip, dest_ip, len(payload), socket.IPPROTO_RAW) packet = ip_header + payload # 创建原始套接字 sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 告诉内核IP头部已包含在数据包中 sock.bind(('', src_port)) # 绑定本地发送端口 # 发送IP报文 sock.sendto(packet, (dest_ip, 0)) result.set(f"IP报文发送成功!") sock.close() return # 结束函数 # 发送报文 sock.sendto(packet, (dest_ip, dest_port)) result.set(f"报文发送成功!类型: {packet_type}") except socket.gaierror: messagebox.showerror("网络错误", "无法解析目标地址,请检查输入的 IP 或域名是否正确。") except socket.error as e: messagebox.showerror("网络错误", f"网络错误: {e.strerror}") except ValueError as e: messagebox.showerror("输入错误", f"输入无效: {e}") except Exception as e: messagebox.showerror("未知错误", f"发送报文时发生未知错误: {e}") # GUI 界面设计 window = tk.Tk() window.title("网络通信软件") # 获取本机信息 hostname, mac, local_ips = get_local_info() tk.Label(window, text="本机信息:").grid(row=0, column=0) info_text = tk.Text(window, height=4, width=40) info_text.grid(row=0, column=1, columnspan=2) info_text.insert(tk.END, f"主机名: {hostname}\nMAC 地址: {mac}\n可用 IP: {', '.join(local_ips)}") info_text.config(state=tk.DISABLED) # 配置IP地址选择 tk.Label(window, text="本机 IP 地址:").grid(row=1, column=0) ip_combobox = ttk.Combobox(window, values=local_ips) ip_combobox.grid(row=1, column=1) ip_combobox.current(0) # 配置本地发送端口输入框 tk.Label(window, text="本地发送端口:").grid(row=1, column=2) local_port_entry = tk.Entry(window) local_port_entry.grid(row=1, column=3) # 配置目标地址输入框 tk.Label(window, text="目标地址:").grid(row=2, column=0) dest_entry = tk.Entry(window) dest_entry.grid(row=2, column=1) # 配置目标端口输入框 tk.Label(window, text="目标端口:").grid(row=3, column=0) port_entry = tk.Entry(window) port_entry.grid(row=3, column=1) # 配置报文类型选择 tk.Label(window, text="报文类型:").grid(row=4, column=0) var = tk.StringVar(value="ICMP") tk.Radiobutton(window, text="ICMP", variable=var, value="ICMP").grid(row=4, column=1) tk.Radiobutton(window, text="UDP", variable=var, value="UDP").grid(row=4, column=2) tk.Radiobutton(window, text="TCP", variable=var, value="TCP").grid(row=4, column=3) tk.Radiobutton(window, text="DNS", variable=var, value="DNS").grid(row=4, column=4) tk.Radiobutton(window, text="IP", variable=var, value="IP").grid(row=4, column=5) # 添加IP选项 # 配置结果显示 result = tk.StringVar() tk.Label(window, textvariable=result).grid(row=6, column=0, columnspan=3) # 发送报文按钮 tk.Button(window, text="发送报文", command=send_packet).grid(row=5, column=1) # 启动GUI主循环 window.mainloop()