Python-web-communicate/main.py

443 lines
15 KiB
Python
Raw Normal View History

2024-12-08 15:15:32 +00:00
import tkinter as tk
2025-01-06 06:44:58 +00:00
from tkinter import ttk, messagebox
2024-12-08 15:15:32 +00:00
import socket
import struct
import time
2025-01-06 06:44:58 +00:00
import uuid
import re
import random
2024-12-08 15:15:32 +00:00
2025-01-06 06:44:58 +00:00
# 获取本机信息
def get_local_info():
"""
获取本机的主机名MAC地址和IP地址列表
Returns:
tuple: 包含主机名MAC地址和IP地址列表的元组
"""
hostname = socket.gethostname() # 获取主机名
local_ips = socket.gethostbyname_ex(hostname)[-1] # 获取本机的IP地址列表
mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())) # 获取MAC地址
2025-01-06 06:44:58 +00:00
return hostname, mac, local_ips
# 验证 IP 地址或域名
def validate_ip_or_domain(value):
"""
验证输入值是否为有效的IP地址或域名
Args:
value (str): 待验证的IP地址或域名
Returns:
bool: 如果输入值有效返回True否则返回False
"""
ip_pattern = re.compile(r'^\d{1,3}(\.\d{1,3}){3}$') # 匹配IPv4地址的正则
domain_pattern = re.compile(r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') # 匹配域名的正则
2025-01-06 06:44:58 +00:00
return ip_pattern.match(value) or domain_pattern.match(value)
# 验证端口号
def validate_port(port):
"""
验证输入的端口号是否有效
Args:
port (str): 待验证的端口号
Returns:
bool: 如果端口号有效返回True否则返回False
"""
2025-01-06 06:44:58 +00:00
return port.isdigit() and 1 <= int(port) <= 65535
# 构造 IP 头部
def construct_ip_header(src_ip, dest_ip, payload_length, protocol):
"""
构造IP头部
Args:
src_ip (str): 源IP地址
dest_ip (str): 目标IP地址
payload_length (int): 负载长度
protocol (int): 协议号
Returns:
bytes: 构造的IP头部
"""
# IP头部的版本号IPv4为4
2025-01-06 06:44:58 +00:00
version = 4
# IP头部长度IPv4头部长度为5个32位字节即20字节
2025-01-06 06:44:58 +00:00
ihl = 5
# 服务类型,表示优先权及吞吐量、延迟等特性
2025-01-06 06:44:58 +00:00
tos = 0
# 总长度即IP头部加数据部分的长度
total_length = 20 + payload_length # IP头部20字节 + 数据部分
# 标识符,用于唯一标识主机发送的每一份数据包
2025-01-06 06:44:58 +00:00
identification = 54321
# 标志位与片偏移,用于指示数据包的分割与重组
2025-01-06 06:44:58 +00:00
flags_offset = 0
# 生存时间,表示数据包可以经过的路由器数量
2025-01-06 06:44:58 +00:00
ttl = 64
# 首部校验和,用于错误检测
2025-01-06 06:44:58 +00:00
header_checksum = 0
# 将源IP转换为二进制格式
src_ip_bytes = socket.inet_aton(src_ip)
# 将目标IP转换为二进制格式
dest_ip_bytes = socket.inet_aton(dest_ip)
# 打包IP头部信息转换为二进制格式
2025-01-06 06:44:58 +00:00
ip_header = struct.pack('!BBHHHBBH4s4s',
(version << 4) + ihl, tos, total_length,
identification, flags_offset, ttl, protocol,
header_checksum, src_ip_bytes, dest_ip_bytes)
# 返回构造的IP头部
2025-01-06 06:44:58 +00:00
return ip_header
2025-01-06 06:28:59 +00:00
2024-12-08 15:15:32 +00:00
# 构造 ICMP 报文
def construct_icmp():
"""
构造ICMP报文
Returns:
bytes: 构造的ICMP报文
"""
# ICMP 报文头部: 类型(8), 代码(0), 校验和(0), 标识符(1), 序列号(1)
# 一开始校验和设为0因为之后需要先计算校验和
2024-12-08 15:15:32 +00:00
icmp_header = struct.pack('!BBHHH', 8, 0, 0, 1, 1)
# ICMP 报文的数据部分,这里简单地使用 'Ping' 作为示例数据
2025-01-06 06:28:59 +00:00
data = b'Ping'
# 计算校验和,确保报文的完整性
2025-01-06 06:28:59 +00:00
checksum = calculate_checksum(icmp_header + data)
# 填充校验和字段,完成报文头部的构造
2024-12-08 15:15:32 +00:00
icmp_header = struct.pack('!BBHHH', 8, 0, checksum, 1, 1)
# 返回完整的ICMP报文
2025-01-06 06:28:59 +00:00
return icmp_header + data
2024-12-08 15:15:32 +00:00
# 构造 UDP 报文
def construct_udp(src_port, dest_port):
"""
构造UDP报文
Args:
src_port (int): 源端口号
dest_port (int): 目标端口号
Returns:
bytes: 构造的UDP报文
"""
# UDP头部格式: 源端口号 (2 字节), 目标端口号 (2 字节), 长度 (2 字节), 校验和 (2 字节)
# 长度固定为8因为UDP头部长度为8字节校验和暂时为0稍后由IP层计算并填充
udp_header = struct.pack('!HHHH', src_port, dest_port, 8, 0)
2024-12-08 15:15:32 +00:00
return udp_header
2025-01-06 06:44:58 +00:00
# 构造 TCP 报文SYN
def construct_tcp(src_port, dest_port):
"""
构造TCP报文SYN
Args:
src_port (int): 源端口号
dest_port (int): 目标端口号
Returns:
bytes: 构造的TCP报文
"""
# 初始化TCP标头字段
seq = 0 # 初始序列号
ack_seq = 0 # 确认序列号
offset = 5 # TCP头部的长度单位是32位字5 * 4 = 20字节
reserved = 0 # 保留字段默认为0
flags = 0b000010 # SYN标志位设置为1其他标志为0
window = socket.htons(5840) # 窗口大小使用network字节序
checksum = 0 # 校验和暂时为0
urgent_ptr = 0 # 紧急指针默认为0
# 打包TCP报文头部
2025-01-06 06:44:58 +00:00
tcp_header = struct.pack('!HHLLBBHHH',
src_port, dest_port, seq, ack_seq,
(offset << 4) + reserved, flags, window,
checksum, urgent_ptr)
return tcp_header
2024-12-08 15:15:32 +00:00
2025-01-06 06:44:58 +00:00
# 计算校验和
def calculate_checksum(data):
"""
计算校验和
Args:
data (bytes): 待计算校验和的数据
Returns:
int: 计算得到的校验和
"""
# 初始化校验和变量
2025-01-06 06:44:58 +00:00
checksum = 0
# 获取数据长度
2025-01-06 06:44:58 +00:00
n = len(data)
# 按 16 位块划分
2025-01-06 06:44:58 +00:00
for i in range(0, n - 1, 2):
# 将每块数据转换为 16 位整数
2025-01-06 06:44:58 +00:00
chunk = (data[i] << 8) + data[i + 1]
# 累加每块数据到校验和
2025-01-06 06:44:58 +00:00
checksum += chunk
# 如果长度为奇数,补零
2025-01-06 06:44:58 +00:00
if n % 2 == 1:
# 只取最后一个字节,并将其置于 16 位整数的高字节位置
2025-01-06 06:44:58 +00:00
checksum += data[-1] << 8
# 将 32 位总和折叠为 16 位
2025-01-06 06:44:58 +00:00
checksum = (checksum >> 16) + (checksum & 0xFFFF)
checksum += (checksum >> 16)
# 对总和取反
2025-01-06 06:44:58 +00:00
return ~checksum & 0xFFFF
2024-12-08 15:15:32 +00:00
# 构造 DNS 查询报文
def build_dns_query(domain):
"""
构建DNS查询报文
Args:
domain (str): 需要查询的域名
Returns:
bytes: 构建的DNS查询报文
"""
# DNS请求报文头部
transaction_id = random.randint(0, 65535) # 随机生成事务ID
flags = 0x0100 # 标志字段(标准查询请求)
questions = 1 # 查询问题数
answer_rrs = 0 # 回答资源记录数
authority_rrs = 0 # 权威记录数
additional_rrs = 0 # 附加记录数
# 将DNS报文头部各个字段打包成二进制数据
header = struct.pack('>HHHHHH', transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs)
# 域名部分(查询问题)
domain_parts = domain.split('.')
query = b''
for part in domain_parts:
# 每个域名部分前添加其长度,并将域名部分转换为字节串
query += struct.pack('B', len(part)) + part.encode()
query += b'\0' # 域名结束符
# 查询类型 (A记录: 1)
query_type = 1
# 查询类 (IN: 1)
query_class = 1
# 将查询类型和查询类打包到查询报文中
# 完整的查询报文由头部、域名部分和查询类型、查询类组成
query_packet = header + query + struct.pack('>HH', query_type, query_class)
return query_packet
# 解析 DNS 响应报文
def parse_dns_response(response):
"""
解析DNS响应报文提取IP地址
Args:
response: response -- DNS响应报文字节流
Returns:
String: IP地址字符串如果未找到A记录则返回None
"""
# 解析DNS响应报文的头部信息
# 响应头部包含事务ID、标志、问题数、答案数等
transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs = struct.unpack('>HHHHHH', response[:12])
# 初始化偏移量以解析响应报文的其余部分
offset = 12
# 跳过问题部分,找到第一个答案记录的起始位置
while response[offset] != 0:
length = response[offset]
offset += length + 1
offset += 5 # 跳过结束符和查询类型、查询类部分
# 解析回答部分寻找A记录
# 答案数量
for _ in range(answer_rrs):
# 跳过名字部分不需要因为已经知道是A记录
offset += 2 # 跳过指针
# 解析答案的类型、类、TTL和数据长度
answer_type, answer_class, ttl, data_len = struct.unpack('>HHIH', response[offset:offset+10])
offset += 10
if answer_type == 1: # A记录
# 获取并返回IP地址
ip = socket.inet_ntoa(response[offset:offset+4])
return ip
offset += data_len # 跳过数据部分
# 如果没有找到A记录返回None
return None
2024-12-08 15:15:32 +00:00
# 发送报文
def send_packet():
"""
发送报文
"""
2024-12-08 15:15:32 +00:00
try:
# 获取源IP、目标主机和目标端口
2025-01-06 06:44:58 +00:00
src_ip = ip_combobox.get()
dest_host = dest_entry.get()
dest_port = port_entry.get()
2024-12-08 15:15:32 +00:00
packet_type = var.get()
local_port = local_port_entry.get() # 获取本地发送端口
2024-12-08 15:15:32 +00:00
2025-01-06 06:44:58 +00:00
# 验证目标地址
if not validate_ip_or_domain(dest_host):
raise ValueError("目标地址无效,必须为有效的 IP 或域名")
# 验证端口号
if not validate_port(dest_port):
raise ValueError("端口号无效必须是有效的正整数1-65535")
2024-12-08 15:15:32 +00:00
# 验证本地发送端口号
if local_port and not validate_port(local_port):
raise ValueError("本地发送端口号无效必须是有效的正整数1-65535")
# 解析目标IP地址
2025-01-06 06:44:58 +00:00
dest_ip = socket.gethostbyname(dest_host)
src_port = int(local_port) if local_port else 12345 # 使用用户输入的本地端口或默认端口
2025-01-06 06:44:58 +00:00
dest_port = int(dest_port)
# ICMP报文需要添加 RTT 计算
2024-12-08 15:15:32 +00:00
if packet_type == "ICMP":
2025-01-06 06:44:58 +00:00
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
2024-12-08 15:15:32 +00:00
packet = construct_icmp()
# 记录发送时间
start_time = time.time()
# 发送ICMP报文
sock.sendto(packet, (dest_ip, 0))
# 接收响应
sock.settimeout(2) # 设置超时为2秒
try:
sock.recvfrom(1024)
end_time = time.time()
rtt = (end_time - start_time) * 1000 # 毫秒
result.set(f"ICMP请求成功RTT: {rtt:.2f} 毫秒")
except socket.timeout:
result.set("ICMP请求超时")
finally:
sock.close()
return # 结束函数
# 对于其他报文类型UDP, TCP, DNS, IP
if packet_type == "UDP":
2025-01-06 06:44:58 +00:00
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', src_port)) # 绑定本地发送端口
2025-01-06 06:44:58 +00:00
packet = construct_udp(src_port, dest_port)
elif packet_type == "TCP":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', src_port)) # 绑定本地发送端口
2025-01-06 06:44:58 +00:00
sock.connect((dest_ip, dest_port))
packet = b"Hello, TCP!" # TCP 是面向连接的,这里发送简单的字符串
sock.sendall(packet)
sock.close()
result.set(f"TCP 连接成功,数据已发送!")
return # 结束函数
elif packet_type == "DNS":
dns_server = '8.8.8.8' # Google Public DNS
port = 53
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', src_port)) # 绑定本地发送端口
query_packet = build_dns_query(dest_host)
sock.sendto(query_packet, (dns_server, port))
response, _ = sock.recvfrom(512)
ip = parse_dns_response(response)
if ip:
result.set(f"DNS解析成功IP地址: {ip}")
else:
result.set("DNS解析失败")
sock.close()
return # 结束函数
elif packet_type == "IP": # 添加IP报文类型处理
# 构造IP头部
payload = b"Hello, IP!" # 示例负载数据
ip_header = construct_ip_header(src_ip, dest_ip, len(payload), socket.IPPROTO_RAW)
packet = ip_header + payload
# 创建原始套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 告诉内核IP头部已包含在数据包中
sock.bind(('', src_port)) # 绑定本地发送端口
# 发送IP报文
sock.sendto(packet, (dest_ip, 0))
result.set(f"IP报文发送成功")
sock.close()
return # 结束函数
2025-01-06 06:44:58 +00:00
# 发送报文
sock.sendto(packet, (dest_ip, dest_port))
result.set(f"报文发送成功!类型: {packet_type}")
except socket.gaierror:
messagebox.showerror("网络错误", "无法解析目标地址,请检查输入的 IP 或域名是否正确。")
except socket.error as e:
messagebox.showerror("网络错误", f"网络错误: {e.strerror}")
except ValueError as e:
messagebox.showerror("输入错误", f"输入无效: {e}")
2024-12-08 15:15:32 +00:00
except Exception as e:
2025-01-06 06:44:58 +00:00
messagebox.showerror("未知错误", f"发送报文时发生未知错误: {e}")
2024-12-08 15:15:32 +00:00
# GUI 界面设计
window = tk.Tk()
window.title("网络通信软件")
# 获取本机信息
2025-01-06 06:44:58 +00:00
hostname, mac, local_ips = get_local_info()
tk.Label(window, text="本机信息:").grid(row=0, column=0)
info_text = tk.Text(window, height=4, width=40)
info_text.grid(row=0, column=1, columnspan=2)
info_text.insert(tk.END, f"主机名: {hostname}\nMAC 地址: {mac}\n可用 IP: {', '.join(local_ips)}")
info_text.config(state=tk.DISABLED)
# 配置IP地址选择
2025-01-06 06:44:58 +00:00
tk.Label(window, text="本机 IP 地址:").grid(row=1, column=0)
ip_combobox = ttk.Combobox(window, values=local_ips)
ip_combobox.grid(row=1, column=1)
ip_combobox.current(0)
# 配置本地发送端口输入框
tk.Label(window, text="本地发送端口:").grid(row=1, column=2)
local_port_entry = tk.Entry(window)
local_port_entry.grid(row=1, column=3)
# 配置目标地址输入框
2025-01-06 06:44:58 +00:00
tk.Label(window, text="目标地址:").grid(row=2, column=0)
dest_entry = tk.Entry(window)
dest_entry.grid(row=2, column=1)
2024-12-08 15:15:32 +00:00
# 配置目标端口输入框
2025-01-06 06:44:58 +00:00
tk.Label(window, text="目标端口:").grid(row=3, column=0)
2024-12-08 15:15:32 +00:00
port_entry = tk.Entry(window)
2025-01-06 06:44:58 +00:00
port_entry.grid(row=3, column=1)
2024-12-08 15:15:32 +00:00
# 配置报文类型选择
2025-01-06 06:44:58 +00:00
tk.Label(window, text="报文类型:").grid(row=4, column=0)
2024-12-08 15:15:32 +00:00
var = tk.StringVar(value="ICMP")
2025-01-06 06:44:58 +00:00
tk.Radiobutton(window, text="ICMP", variable=var, value="ICMP").grid(row=4, column=1)
tk.Radiobutton(window, text="UDP", variable=var, value="UDP").grid(row=4, column=2)
tk.Radiobutton(window, text="TCP", variable=var, value="TCP").grid(row=4, column=3)
tk.Radiobutton(window, text="DNS", variable=var, value="DNS").grid(row=4, column=4)
tk.Radiobutton(window, text="IP", variable=var, value="IP").grid(row=4, column=5) # 添加IP选项
2024-12-08 15:15:32 +00:00
# 配置结果显示
2024-12-08 15:15:32 +00:00
result = tk.StringVar()
2025-01-06 06:44:58 +00:00
tk.Label(window, textvariable=result).grid(row=6, column=0, columnspan=3)
2024-12-08 15:15:32 +00:00
# 发送报文按钮
2025-01-06 06:44:58 +00:00
tk.Button(window, text="发送报文", command=send_packet).grid(row=5, column=1)
# 启动GUI主循环
2024-12-08 15:15:32 +00:00
window.mainloop()