Python-web-communicate/main.py
fly6516 2fb3a08fbf refactor(main): 优化 DNS响应解析函数
- 为 parse_dns_response 函数添加文档字符串,说明函数功能、参数和返回值
- 优化函数内部逻辑,明确各部分的解析目的和过程
-调整代码格式,提高可维护性
2025-01-08 14:54:12 +08:00

443 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox
import socket
import struct
import time
import uuid
import re
import random
# 获取本机信息
def get_local_info():
"""
获取本机的主机名、MAC地址和IP地址列表。
Returns:
tuple: 包含主机名、MAC地址和IP地址列表的元组。
"""
hostname = socket.gethostname() # 获取主机名
local_ips = socket.gethostbyname_ex(hostname)[-1] # 获取本机的IP地址列表
mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())) # 获取MAC地址
return hostname, mac, local_ips
# 验证 IP 地址或域名
def validate_ip_or_domain(value):
"""
验证输入值是否为有效的IP地址或域名。
Args:
value (str): 待验证的IP地址或域名。
Returns:
bool: 如果输入值有效返回True否则返回False。
"""
ip_pattern = re.compile(r'^\d{1,3}(\.\d{1,3}){3}$') # 匹配IPv4地址的正则
domain_pattern = re.compile(r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') # 匹配域名的正则
return ip_pattern.match(value) or domain_pattern.match(value)
# 验证端口号
def validate_port(port):
"""
验证输入的端口号是否有效。
Args:
port (str): 待验证的端口号。
Returns:
bool: 如果端口号有效返回True否则返回False。
"""
return port.isdigit() and 1 <= int(port) <= 65535
# 构造 IP 头部
def construct_ip_header(src_ip, dest_ip, payload_length, protocol):
"""
构造IP头部。
Args:
src_ip (str): 源IP地址。
dest_ip (str): 目标IP地址。
payload_length (int): 负载长度。
protocol (int): 协议号。
Returns:
bytes: 构造的IP头部。
"""
# IP头部的版本号IPv4为4
version = 4
# IP头部长度IPv4头部长度为5个32位字节即20字节
ihl = 5
# 服务类型,表示优先权及吞吐量、延迟等特性
tos = 0
# 总长度即IP头部加数据部分的长度
total_length = 20 + payload_length # IP头部20字节 + 数据部分
# 标识符,用于唯一标识主机发送的每一份数据包
identification = 54321
# 标志位与片偏移,用于指示数据包的分割与重组
flags_offset = 0
# 生存时间,表示数据包可以经过的路由器数量
ttl = 64
# 首部校验和,用于错误检测
header_checksum = 0
# 将源IP转换为二进制格式
src_ip_bytes = socket.inet_aton(src_ip)
# 将目标IP转换为二进制格式
dest_ip_bytes = socket.inet_aton(dest_ip)
# 打包IP头部信息转换为二进制格式
ip_header = struct.pack('!BBHHHBBH4s4s',
(version << 4) + ihl, tos, total_length,
identification, flags_offset, ttl, protocol,
header_checksum, src_ip_bytes, dest_ip_bytes)
# 返回构造的IP头部
return ip_header
# 构造 ICMP 报文
def construct_icmp():
"""
构造ICMP报文。
Returns:
bytes: 构造的ICMP报文。
"""
# ICMP 报文头部: 类型(8), 代码(0), 校验和(0), 标识符(1), 序列号(1)
# 一开始校验和设为0因为之后需要先计算校验和
icmp_header = struct.pack('!BBHHH', 8, 0, 0, 1, 1)
# ICMP 报文的数据部分,这里简单地使用 'Ping' 作为示例数据
data = b'Ping'
# 计算校验和,确保报文的完整性
checksum = calculate_checksum(icmp_header + data)
# 填充校验和字段,完成报文头部的构造
icmp_header = struct.pack('!BBHHH', 8, 0, checksum, 1, 1)
# 返回完整的ICMP报文
return icmp_header + data
# 构造 UDP 报文
def construct_udp(src_port, dest_port):
"""
构造UDP报文。
Args:
src_port (int): 源端口号。
dest_port (int): 目标端口号。
Returns:
bytes: 构造的UDP报文。
"""
# UDP头部格式: 源端口号 (2 字节), 目标端口号 (2 字节), 长度 (2 字节), 校验和 (2 字节)
# 长度固定为8因为UDP头部长度为8字节校验和暂时为0稍后由IP层计算并填充
udp_header = struct.pack('!HHHH', src_port, dest_port, 8, 0)
return udp_header
# 构造 TCP 报文SYN
def construct_tcp(src_port, dest_port):
"""
构造TCP报文SYN
Args:
src_port (int): 源端口号。
dest_port (int): 目标端口号。
Returns:
bytes: 构造的TCP报文。
"""
# 初始化TCP标头字段
seq = 0 # 初始序列号
ack_seq = 0 # 确认序列号
offset = 5 # TCP头部的长度单位是32位字5 * 4 = 20字节
reserved = 0 # 保留字段默认为0
flags = 0b000010 # SYN标志位设置为1其他标志为0
window = socket.htons(5840) # 窗口大小使用network字节序
checksum = 0 # 校验和暂时为0
urgent_ptr = 0 # 紧急指针默认为0
# 打包TCP报文头部
tcp_header = struct.pack('!HHLLBBHHH',
src_port, dest_port, seq, ack_seq,
(offset << 4) + reserved, flags, window,
checksum, urgent_ptr)
return tcp_header
# 计算校验和
def calculate_checksum(data):
"""
计算校验和。
Args:
data (bytes): 待计算校验和的数据。
Returns:
int: 计算得到的校验和。
"""
# 初始化校验和变量
checksum = 0
# 获取数据长度
n = len(data)
# 按 16 位块划分
for i in range(0, n - 1, 2):
# 将每块数据转换为 16 位整数
chunk = (data[i] << 8) + data[i + 1]
# 累加每块数据到校验和
checksum += chunk
# 如果长度为奇数,补零
if n % 2 == 1:
# 只取最后一个字节,并将其置于 16 位整数的高字节位置
checksum += data[-1] << 8
# 将 32 位总和折叠为 16 位
checksum = (checksum >> 16) + (checksum & 0xFFFF)
checksum += (checksum >> 16)
# 对总和取反
return ~checksum & 0xFFFF
# 构造 DNS 查询报文
def build_dns_query(domain):
"""
构建DNS查询报文。
Args:
domain (str): 需要查询的域名。
Returns:
bytes: 构建的DNS查询报文。
"""
# DNS请求报文头部
transaction_id = random.randint(0, 65535) # 随机生成事务ID
flags = 0x0100 # 标志字段(标准查询请求)
questions = 1 # 查询问题数
answer_rrs = 0 # 回答资源记录数
authority_rrs = 0 # 权威记录数
additional_rrs = 0 # 附加记录数
# 将DNS报文头部各个字段打包成二进制数据
header = struct.pack('>HHHHHH', transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs)
# 域名部分(查询问题)
domain_parts = domain.split('.')
query = b''
for part in domain_parts:
# 每个域名部分前添加其长度,并将域名部分转换为字节串
query += struct.pack('B', len(part)) + part.encode()
query += b'\0' # 域名结束符
# 查询类型 (A记录: 1)
query_type = 1
# 查询类 (IN: 1)
query_class = 1
# 将查询类型和查询类打包到查询报文中
# 完整的查询报文由头部、域名部分和查询类型、查询类组成
query_packet = header + query + struct.pack('>HH', query_type, query_class)
return query_packet
# 解析 DNS 响应报文
def parse_dns_response(response):
"""
解析DNS响应报文提取IP地址。
Args:
response: response -- DNS响应报文字节流
Returns:
String: IP地址字符串如果未找到A记录则返回None。
"""
# 解析DNS响应报文的头部信息
# 响应头部包含事务ID、标志、问题数、答案数等
transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs = struct.unpack('>HHHHHH', response[:12])
# 初始化偏移量以解析响应报文的其余部分
offset = 12
# 跳过问题部分,找到第一个答案记录的起始位置
while response[offset] != 0:
length = response[offset]
offset += length + 1
offset += 5 # 跳过结束符和查询类型、查询类部分
# 解析回答部分寻找A记录
# 答案数量
for _ in range(answer_rrs):
# 跳过名字部分不需要因为已经知道是A记录
offset += 2 # 跳过指针
# 解析答案的类型、类、TTL和数据长度
answer_type, answer_class, ttl, data_len = struct.unpack('>HHIH', response[offset:offset+10])
offset += 10
if answer_type == 1: # A记录
# 获取并返回IP地址
ip = socket.inet_ntoa(response[offset:offset+4])
return ip
offset += data_len # 跳过数据部分
# 如果没有找到A记录返回None
return None
# 发送报文
def send_packet():
"""
发送报文。
"""
try:
# 获取源IP、目标主机和目标端口
src_ip = ip_combobox.get()
dest_host = dest_entry.get()
dest_port = port_entry.get()
packet_type = var.get()
local_port = local_port_entry.get() # 获取本地发送端口
# 验证目标地址
if not validate_ip_or_domain(dest_host):
raise ValueError("目标地址无效,必须为有效的 IP 或域名")
# 验证端口号
if not validate_port(dest_port):
raise ValueError("端口号无效必须是有效的正整数1-65535")
# 验证本地发送端口号
if local_port and not validate_port(local_port):
raise ValueError("本地发送端口号无效必须是有效的正整数1-65535")
# 解析目标IP地址
dest_ip = socket.gethostbyname(dest_host)
src_port = int(local_port) if local_port else 12345 # 使用用户输入的本地端口或默认端口
dest_port = int(dest_port)
# ICMP报文需要添加 RTT 计算
if packet_type == "ICMP":
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
packet = construct_icmp()
# 记录发送时间
start_time = time.time()
# 发送ICMP报文
sock.sendto(packet, (dest_ip, 0))
# 接收响应
sock.settimeout(2) # 设置超时为2秒
try:
sock.recvfrom(1024)
end_time = time.time()
rtt = (end_time - start_time) * 1000 # 毫秒
result.set(f"ICMP请求成功RTT: {rtt:.2f} 毫秒")
except socket.timeout:
result.set("ICMP请求超时")
finally:
sock.close()
return # 结束函数
# 对于其他报文类型UDP, TCP, DNS, IP
if packet_type == "UDP":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', src_port)) # 绑定本地发送端口
packet = construct_udp(src_port, dest_port)
elif packet_type == "TCP":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', src_port)) # 绑定本地发送端口
sock.connect((dest_ip, dest_port))
packet = b"Hello, TCP!" # TCP 是面向连接的,这里发送简单的字符串
sock.sendall(packet)
sock.close()
result.set(f"TCP 连接成功,数据已发送!")
return # 结束函数
elif packet_type == "DNS":
dns_server = '8.8.8.8' # Google Public DNS
port = 53
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', src_port)) # 绑定本地发送端口
query_packet = build_dns_query(dest_host)
sock.sendto(query_packet, (dns_server, port))
response, _ = sock.recvfrom(512)
ip = parse_dns_response(response)
if ip:
result.set(f"DNS解析成功IP地址: {ip}")
else:
result.set("DNS解析失败")
sock.close()
return # 结束函数
elif packet_type == "IP": # 添加IP报文类型处理
# 构造IP头部
payload = b"Hello, IP!" # 示例负载数据
ip_header = construct_ip_header(src_ip, dest_ip, len(payload), socket.IPPROTO_RAW)
packet = ip_header + payload
# 创建原始套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 告诉内核IP头部已包含在数据包中
sock.bind(('', src_port)) # 绑定本地发送端口
# 发送IP报文
sock.sendto(packet, (dest_ip, 0))
result.set(f"IP报文发送成功")
sock.close()
return # 结束函数
# 发送报文
sock.sendto(packet, (dest_ip, dest_port))
result.set(f"报文发送成功!类型: {packet_type}")
except socket.gaierror:
messagebox.showerror("网络错误", "无法解析目标地址,请检查输入的 IP 或域名是否正确。")
except socket.error as e:
messagebox.showerror("网络错误", f"网络错误: {e.strerror}")
except ValueError as e:
messagebox.showerror("输入错误", f"输入无效: {e}")
except Exception as e:
messagebox.showerror("未知错误", f"发送报文时发生未知错误: {e}")
# GUI 界面设计
window = tk.Tk()
window.title("网络通信软件")
# 获取本机信息
hostname, mac, local_ips = get_local_info()
tk.Label(window, text="本机信息:").grid(row=0, column=0)
info_text = tk.Text(window, height=4, width=40)
info_text.grid(row=0, column=1, columnspan=2)
info_text.insert(tk.END, f"主机名: {hostname}\nMAC 地址: {mac}\n可用 IP: {', '.join(local_ips)}")
info_text.config(state=tk.DISABLED)
# 配置IP地址选择
tk.Label(window, text="本机 IP 地址:").grid(row=1, column=0)
ip_combobox = ttk.Combobox(window, values=local_ips)
ip_combobox.grid(row=1, column=1)
ip_combobox.current(0)
# 配置本地发送端口输入框
tk.Label(window, text="本地发送端口:").grid(row=1, column=2)
local_port_entry = tk.Entry(window)
local_port_entry.grid(row=1, column=3)
# 配置目标地址输入框
tk.Label(window, text="目标地址:").grid(row=2, column=0)
dest_entry = tk.Entry(window)
dest_entry.grid(row=2, column=1)
# 配置目标端口输入框
tk.Label(window, text="目标端口:").grid(row=3, column=0)
port_entry = tk.Entry(window)
port_entry.grid(row=3, column=1)
# 配置报文类型选择
tk.Label(window, text="报文类型:").grid(row=4, column=0)
var = tk.StringVar(value="ICMP")
tk.Radiobutton(window, text="ICMP", variable=var, value="ICMP").grid(row=4, column=1)
tk.Radiobutton(window, text="UDP", variable=var, value="UDP").grid(row=4, column=2)
tk.Radiobutton(window, text="TCP", variable=var, value="TCP").grid(row=4, column=3)
tk.Radiobutton(window, text="DNS", variable=var, value="DNS").grid(row=4, column=4)
tk.Radiobutton(window, text="IP", variable=var, value="IP").grid(row=4, column=5) # 添加IP选项
# 配置结果显示
result = tk.StringVar()
tk.Label(window, textvariable=result).grid(row=6, column=0, columnspan=3)
# 发送报文按钮
tk.Button(window, text="发送报文", command=send_packet).grid(row=5, column=1)
# 启动GUI主循环
window.mainloop()