- 为 parse_dns_response 函数添加文档字符串,说明函数功能、参数和返回值 - 优化函数内部逻辑,明确各部分的解析目的和过程 -调整代码格式,提高可维护性
443 lines
15 KiB
Python
443 lines
15 KiB
Python
import tkinter as tk
|
||
from tkinter import ttk, messagebox
|
||
import socket
|
||
import struct
|
||
import time
|
||
import uuid
|
||
import re
|
||
import random
|
||
|
||
# 获取本机信息
|
||
def get_local_info():
|
||
"""
|
||
获取本机的主机名、MAC地址和IP地址列表。
|
||
|
||
Returns:
|
||
tuple: 包含主机名、MAC地址和IP地址列表的元组。
|
||
"""
|
||
hostname = socket.gethostname() # 获取主机名
|
||
local_ips = socket.gethostbyname_ex(hostname)[-1] # 获取本机的IP地址列表
|
||
mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())) # 获取MAC地址
|
||
return hostname, mac, local_ips
|
||
|
||
# 验证 IP 地址或域名
|
||
def validate_ip_or_domain(value):
|
||
"""
|
||
验证输入值是否为有效的IP地址或域名。
|
||
|
||
Args:
|
||
value (str): 待验证的IP地址或域名。
|
||
|
||
Returns:
|
||
bool: 如果输入值有效返回True,否则返回False。
|
||
"""
|
||
ip_pattern = re.compile(r'^\d{1,3}(\.\d{1,3}){3}$') # 匹配IPv4地址的正则
|
||
domain_pattern = re.compile(r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') # 匹配域名的正则
|
||
return ip_pattern.match(value) or domain_pattern.match(value)
|
||
|
||
# 验证端口号
|
||
def validate_port(port):
|
||
"""
|
||
验证输入的端口号是否有效。
|
||
|
||
Args:
|
||
port (str): 待验证的端口号。
|
||
|
||
Returns:
|
||
bool: 如果端口号有效返回True,否则返回False。
|
||
"""
|
||
return port.isdigit() and 1 <= int(port) <= 65535
|
||
|
||
# 构造 IP 头部
|
||
def construct_ip_header(src_ip, dest_ip, payload_length, protocol):
|
||
"""
|
||
构造IP头部。
|
||
|
||
Args:
|
||
src_ip (str): 源IP地址。
|
||
dest_ip (str): 目标IP地址。
|
||
payload_length (int): 负载长度。
|
||
protocol (int): 协议号。
|
||
|
||
Returns:
|
||
bytes: 构造的IP头部。
|
||
"""
|
||
# IP头部的版本号,IPv4为4
|
||
version = 4
|
||
# IP头部长度,IPv4头部长度为5个32位字节,即20字节
|
||
ihl = 5
|
||
# 服务类型,表示优先权及吞吐量、延迟等特性
|
||
tos = 0
|
||
# 总长度,即IP头部加数据部分的长度
|
||
total_length = 20 + payload_length # IP头部20字节 + 数据部分
|
||
# 标识符,用于唯一标识主机发送的每一份数据包
|
||
identification = 54321
|
||
# 标志位与片偏移,用于指示数据包的分割与重组
|
||
flags_offset = 0
|
||
# 生存时间,表示数据包可以经过的路由器数量
|
||
ttl = 64
|
||
# 首部校验和,用于错误检测
|
||
header_checksum = 0
|
||
|
||
# 将源IP转换为二进制格式
|
||
src_ip_bytes = socket.inet_aton(src_ip)
|
||
# 将目标IP转换为二进制格式
|
||
dest_ip_bytes = socket.inet_aton(dest_ip)
|
||
# 打包IP头部信息,转换为二进制格式
|
||
ip_header = struct.pack('!BBHHHBBH4s4s',
|
||
(version << 4) + ihl, tos, total_length,
|
||
identification, flags_offset, ttl, protocol,
|
||
header_checksum, src_ip_bytes, dest_ip_bytes)
|
||
|
||
# 返回构造的IP头部
|
||
return ip_header
|
||
|
||
|
||
# 构造 ICMP 报文
|
||
def construct_icmp():
|
||
"""
|
||
构造ICMP报文。
|
||
|
||
Returns:
|
||
bytes: 构造的ICMP报文。
|
||
"""
|
||
# ICMP 报文头部: 类型(8), 代码(0), 校验和(0), 标识符(1), 序列号(1)
|
||
# 一开始校验和设为0,因为之后需要先计算校验和
|
||
icmp_header = struct.pack('!BBHHH', 8, 0, 0, 1, 1)
|
||
# ICMP 报文的数据部分,这里简单地使用 'Ping' 作为示例数据
|
||
data = b'Ping'
|
||
# 计算校验和,确保报文的完整性
|
||
checksum = calculate_checksum(icmp_header + data)
|
||
# 填充校验和字段,完成报文头部的构造
|
||
icmp_header = struct.pack('!BBHHH', 8, 0, checksum, 1, 1)
|
||
# 返回完整的ICMP报文
|
||
return icmp_header + data
|
||
|
||
# 构造 UDP 报文
|
||
def construct_udp(src_port, dest_port):
|
||
"""
|
||
构造UDP报文。
|
||
|
||
Args:
|
||
src_port (int): 源端口号。
|
||
dest_port (int): 目标端口号。
|
||
|
||
Returns:
|
||
bytes: 构造的UDP报文。
|
||
"""
|
||
# UDP头部格式: 源端口号 (2 字节), 目标端口号 (2 字节), 长度 (2 字节), 校验和 (2 字节)
|
||
# 长度固定为8,因为UDP头部长度为8字节,校验和暂时为0,稍后由IP层计算并填充
|
||
udp_header = struct.pack('!HHHH', src_port, dest_port, 8, 0)
|
||
return udp_header
|
||
|
||
# 构造 TCP 报文(SYN)
|
||
def construct_tcp(src_port, dest_port):
|
||
"""
|
||
构造TCP报文(SYN)。
|
||
|
||
Args:
|
||
src_port (int): 源端口号。
|
||
dest_port (int): 目标端口号。
|
||
|
||
Returns:
|
||
bytes: 构造的TCP报文。
|
||
"""
|
||
# 初始化TCP标头字段
|
||
seq = 0 # 初始序列号
|
||
ack_seq = 0 # 确认序列号
|
||
offset = 5 # TCP头部的长度,单位是32位字(5 * 4 = 20字节)
|
||
reserved = 0 # 保留字段,默认为0
|
||
flags = 0b000010 # SYN标志位设置为1,其他标志为0
|
||
window = socket.htons(5840) # 窗口大小,使用network字节序
|
||
checksum = 0 # 校验和,暂时为0
|
||
urgent_ptr = 0 # 紧急指针,默认为0
|
||
|
||
# 打包TCP报文头部
|
||
tcp_header = struct.pack('!HHLLBBHHH',
|
||
src_port, dest_port, seq, ack_seq,
|
||
(offset << 4) + reserved, flags, window,
|
||
checksum, urgent_ptr)
|
||
return tcp_header
|
||
|
||
# 计算校验和
|
||
def calculate_checksum(data):
|
||
"""
|
||
计算校验和。
|
||
|
||
Args:
|
||
data (bytes): 待计算校验和的数据。
|
||
|
||
Returns:
|
||
int: 计算得到的校验和。
|
||
"""
|
||
# 初始化校验和变量
|
||
checksum = 0
|
||
# 获取数据长度
|
||
n = len(data)
|
||
|
||
# 按 16 位块划分
|
||
for i in range(0, n - 1, 2):
|
||
# 将每块数据转换为 16 位整数
|
||
chunk = (data[i] << 8) + data[i + 1]
|
||
# 累加每块数据到校验和
|
||
checksum += chunk
|
||
|
||
# 如果长度为奇数,补零
|
||
if n % 2 == 1:
|
||
# 只取最后一个字节,并将其置于 16 位整数的高字节位置
|
||
checksum += data[-1] << 8
|
||
|
||
# 将 32 位总和折叠为 16 位
|
||
checksum = (checksum >> 16) + (checksum & 0xFFFF)
|
||
checksum += (checksum >> 16)
|
||
|
||
# 对总和取反
|
||
return ~checksum & 0xFFFF
|
||
|
||
# 构造 DNS 查询报文
|
||
def build_dns_query(domain):
|
||
"""
|
||
构建DNS查询报文。
|
||
|
||
Args:
|
||
domain (str): 需要查询的域名。
|
||
|
||
Returns:
|
||
bytes: 构建的DNS查询报文。
|
||
"""
|
||
# DNS请求报文头部
|
||
transaction_id = random.randint(0, 65535) # 随机生成事务ID
|
||
flags = 0x0100 # 标志字段(标准查询请求)
|
||
questions = 1 # 查询问题数
|
||
answer_rrs = 0 # 回答资源记录数
|
||
authority_rrs = 0 # 权威记录数
|
||
additional_rrs = 0 # 附加记录数
|
||
|
||
# 将DNS报文头部各个字段打包成二进制数据
|
||
header = struct.pack('>HHHHHH', transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs)
|
||
|
||
# 域名部分(查询问题)
|
||
domain_parts = domain.split('.')
|
||
query = b''
|
||
for part in domain_parts:
|
||
# 每个域名部分前添加其长度,并将域名部分转换为字节串
|
||
query += struct.pack('B', len(part)) + part.encode()
|
||
query += b'\0' # 域名结束符
|
||
|
||
# 查询类型 (A记录: 1)
|
||
query_type = 1
|
||
# 查询类 (IN: 1)
|
||
query_class = 1
|
||
|
||
# 将查询类型和查询类打包到查询报文中
|
||
# 完整的查询报文由头部、域名部分和查询类型、查询类组成
|
||
query_packet = header + query + struct.pack('>HH', query_type, query_class)
|
||
|
||
return query_packet
|
||
|
||
# 解析 DNS 响应报文
|
||
def parse_dns_response(response):
|
||
"""
|
||
解析DNS响应报文,提取IP地址。
|
||
|
||
Args:
|
||
response: response -- DNS响应报文(字节流)
|
||
|
||
Returns:
|
||
String: IP地址字符串,如果未找到A记录则返回None。
|
||
"""
|
||
# 解析DNS响应报文的头部信息
|
||
# 响应头部:包含事务ID、标志、问题数、答案数等
|
||
transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs = struct.unpack('>HHHHHH', response[:12])
|
||
|
||
# 初始化偏移量以解析响应报文的其余部分
|
||
offset = 12
|
||
# 跳过问题部分,找到第一个答案记录的起始位置
|
||
while response[offset] != 0:
|
||
length = response[offset]
|
||
offset += length + 1
|
||
offset += 5 # 跳过结束符和查询类型、查询类部分
|
||
|
||
# 解析回答部分,寻找A记录
|
||
# 答案数量
|
||
for _ in range(answer_rrs):
|
||
# 跳过名字部分(不需要,因为已经知道是A记录)
|
||
offset += 2 # 跳过指针
|
||
# 解析答案的类型、类、TTL和数据长度
|
||
answer_type, answer_class, ttl, data_len = struct.unpack('>HHIH', response[offset:offset+10])
|
||
offset += 10
|
||
if answer_type == 1: # A记录
|
||
# 获取并返回IP地址
|
||
ip = socket.inet_ntoa(response[offset:offset+4])
|
||
return ip
|
||
offset += data_len # 跳过数据部分
|
||
|
||
# 如果没有找到A记录,返回None
|
||
return None
|
||
|
||
# 发送报文
|
||
def send_packet():
|
||
"""
|
||
发送报文。
|
||
"""
|
||
try:
|
||
# 获取源IP、目标主机和目标端口
|
||
src_ip = ip_combobox.get()
|
||
dest_host = dest_entry.get()
|
||
dest_port = port_entry.get()
|
||
packet_type = var.get()
|
||
local_port = local_port_entry.get() # 获取本地发送端口
|
||
|
||
# 验证目标地址
|
||
if not validate_ip_or_domain(dest_host):
|
||
raise ValueError("目标地址无效,必须为有效的 IP 或域名")
|
||
|
||
# 验证端口号
|
||
if not validate_port(dest_port):
|
||
raise ValueError("端口号无效,必须是有效的正整数(1-65535)")
|
||
|
||
# 验证本地发送端口号
|
||
if local_port and not validate_port(local_port):
|
||
raise ValueError("本地发送端口号无效,必须是有效的正整数(1-65535)")
|
||
|
||
# 解析目标IP地址
|
||
dest_ip = socket.gethostbyname(dest_host)
|
||
src_port = int(local_port) if local_port else 12345 # 使用用户输入的本地端口或默认端口
|
||
dest_port = int(dest_port)
|
||
|
||
# ICMP报文需要添加 RTT 计算
|
||
if packet_type == "ICMP":
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
|
||
packet = construct_icmp()
|
||
|
||
# 记录发送时间
|
||
start_time = time.time()
|
||
|
||
# 发送ICMP报文
|
||
sock.sendto(packet, (dest_ip, 0))
|
||
|
||
# 接收响应
|
||
sock.settimeout(2) # 设置超时为2秒
|
||
try:
|
||
sock.recvfrom(1024)
|
||
end_time = time.time()
|
||
rtt = (end_time - start_time) * 1000 # 毫秒
|
||
result.set(f"ICMP请求成功,RTT: {rtt:.2f} 毫秒")
|
||
except socket.timeout:
|
||
result.set("ICMP请求超时")
|
||
finally:
|
||
sock.close()
|
||
|
||
return # 结束函数
|
||
|
||
# 对于其他报文类型(UDP, TCP, DNS, IP)
|
||
if packet_type == "UDP":
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.bind(('', src_port)) # 绑定本地发送端口
|
||
packet = construct_udp(src_port, dest_port)
|
||
elif packet_type == "TCP":
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.bind(('', src_port)) # 绑定本地发送端口
|
||
sock.connect((dest_ip, dest_port))
|
||
packet = b"Hello, TCP!" # TCP 是面向连接的,这里发送简单的字符串
|
||
sock.sendall(packet)
|
||
sock.close()
|
||
result.set(f"TCP 连接成功,数据已发送!")
|
||
return # 结束函数
|
||
elif packet_type == "DNS":
|
||
dns_server = '8.8.8.8' # Google Public DNS
|
||
port = 53
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.bind(('', src_port)) # 绑定本地发送端口
|
||
query_packet = build_dns_query(dest_host)
|
||
sock.sendto(query_packet, (dns_server, port))
|
||
response, _ = sock.recvfrom(512)
|
||
ip = parse_dns_response(response)
|
||
if ip:
|
||
result.set(f"DNS解析成功,IP地址: {ip}")
|
||
else:
|
||
result.set("DNS解析失败")
|
||
sock.close()
|
||
return # 结束函数
|
||
elif packet_type == "IP": # 添加IP报文类型处理
|
||
# 构造IP头部
|
||
payload = b"Hello, IP!" # 示例负载数据
|
||
ip_header = construct_ip_header(src_ip, dest_ip, len(payload), socket.IPPROTO_RAW)
|
||
packet = ip_header + payload
|
||
|
||
# 创建原始套接字
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
|
||
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 告诉内核IP头部已包含在数据包中
|
||
sock.bind(('', src_port)) # 绑定本地发送端口
|
||
|
||
# 发送IP报文
|
||
sock.sendto(packet, (dest_ip, 0))
|
||
result.set(f"IP报文发送成功!")
|
||
sock.close()
|
||
return # 结束函数
|
||
|
||
# 发送报文
|
||
sock.sendto(packet, (dest_ip, dest_port))
|
||
result.set(f"报文发送成功!类型: {packet_type}")
|
||
except socket.gaierror:
|
||
messagebox.showerror("网络错误", "无法解析目标地址,请检查输入的 IP 或域名是否正确。")
|
||
except socket.error as e:
|
||
messagebox.showerror("网络错误", f"网络错误: {e.strerror}")
|
||
except ValueError as e:
|
||
messagebox.showerror("输入错误", f"输入无效: {e}")
|
||
except Exception as e:
|
||
messagebox.showerror("未知错误", f"发送报文时发生未知错误: {e}")
|
||
|
||
# GUI 界面设计
|
||
window = tk.Tk()
|
||
window.title("网络通信软件")
|
||
|
||
# 获取本机信息
|
||
hostname, mac, local_ips = get_local_info()
|
||
|
||
tk.Label(window, text="本机信息:").grid(row=0, column=0)
|
||
info_text = tk.Text(window, height=4, width=40)
|
||
info_text.grid(row=0, column=1, columnspan=2)
|
||
info_text.insert(tk.END, f"主机名: {hostname}\nMAC 地址: {mac}\n可用 IP: {', '.join(local_ips)}")
|
||
info_text.config(state=tk.DISABLED)
|
||
|
||
# 配置IP地址选择
|
||
tk.Label(window, text="本机 IP 地址:").grid(row=1, column=0)
|
||
ip_combobox = ttk.Combobox(window, values=local_ips)
|
||
ip_combobox.grid(row=1, column=1)
|
||
ip_combobox.current(0)
|
||
|
||
# 配置本地发送端口输入框
|
||
tk.Label(window, text="本地发送端口:").grid(row=1, column=2)
|
||
local_port_entry = tk.Entry(window)
|
||
local_port_entry.grid(row=1, column=3)
|
||
|
||
# 配置目标地址输入框
|
||
tk.Label(window, text="目标地址:").grid(row=2, column=0)
|
||
dest_entry = tk.Entry(window)
|
||
dest_entry.grid(row=2, column=1)
|
||
|
||
# 配置目标端口输入框
|
||
tk.Label(window, text="目标端口:").grid(row=3, column=0)
|
||
port_entry = tk.Entry(window)
|
||
port_entry.grid(row=3, column=1)
|
||
|
||
# 配置报文类型选择
|
||
tk.Label(window, text="报文类型:").grid(row=4, column=0)
|
||
var = tk.StringVar(value="ICMP")
|
||
tk.Radiobutton(window, text="ICMP", variable=var, value="ICMP").grid(row=4, column=1)
|
||
tk.Radiobutton(window, text="UDP", variable=var, value="UDP").grid(row=4, column=2)
|
||
tk.Radiobutton(window, text="TCP", variable=var, value="TCP").grid(row=4, column=3)
|
||
tk.Radiobutton(window, text="DNS", variable=var, value="DNS").grid(row=4, column=4)
|
||
tk.Radiobutton(window, text="IP", variable=var, value="IP").grid(row=4, column=5) # 添加IP选项
|
||
|
||
# 配置结果显示
|
||
result = tk.StringVar()
|
||
tk.Label(window, textvariable=result).grid(row=6, column=0, columnspan=3)
|
||
|
||
# 发送报文按钮
|
||
tk.Button(window, text="发送报文", command=send_packet).grid(row=5, column=1)
|
||
|
||
# 启动GUI主循环
|
||
window.mainloop()
|