feat(main): 添加 DNS 查询功能- 实现了构建 DNS 查询报文和解析响应报文的逻辑
- 在发送报文函数中添加了对 DNS 查询的支持 - 在界面上增加了 DNS 选项供用户选择
This commit is contained in:
parent
8d385e54e2
commit
bb61cd787c
75
main.py
75
main.py
@ -5,6 +5,7 @@ import struct
|
||||
import time
|
||||
import uuid
|
||||
import re
|
||||
import random
|
||||
|
||||
# 获取本机信息
|
||||
def get_local_info():
|
||||
@ -171,6 +172,63 @@ def calculate_checksum(data):
|
||||
# 对总和取反
|
||||
return ~checksum & 0xFFFF
|
||||
|
||||
# 构造 DNS 查询报文
|
||||
def build_dns_query(domain):
|
||||
# DNS请求报文头部
|
||||
transaction_id = random.randint(0, 65535) # 随机生成事务ID
|
||||
flags = 0x0100 # 标志字段(标准查询请求)
|
||||
questions = 1 # 查询问题数
|
||||
answer_rrs = 0 # 回答资源记录数
|
||||
authority_rrs = 0 # 权威记录数
|
||||
additional_rrs = 0 # 附加记录数
|
||||
|
||||
# DNS报文头部
|
||||
header = struct.pack('>HHHHHH', transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs)
|
||||
|
||||
# 域名部分(查询问题)
|
||||
domain_parts = domain.split('.')
|
||||
query = b''
|
||||
for part in domain_parts:
|
||||
query += struct.pack('B', len(part)) + part.encode()
|
||||
query += b'\0' # 域名结束符
|
||||
|
||||
# 查询类型 (A记录: 1)
|
||||
query_type = 1
|
||||
# 查询类 (IN: 1)
|
||||
query_class = 1
|
||||
|
||||
# 完整的查询报文
|
||||
query_packet = header + query + struct.pack('>HH', query_type, query_class)
|
||||
|
||||
return query_packet
|
||||
|
||||
# 解析 DNS 响应报文
|
||||
def parse_dns_response(response):
|
||||
# 解析DNS响应报文,提取IP地址
|
||||
# 响应头部:包含事务ID、标志、问题数、答案数等
|
||||
transaction_id, flags, questions, answer_rrs, authority_rrs, additional_rrs = struct.unpack('>HHHHHH', response[:12])
|
||||
|
||||
# 跳过问题部分
|
||||
offset = 12
|
||||
while response[offset] != 0:
|
||||
length = response[offset]
|
||||
offset += length + 1
|
||||
offset += 5 # 跳过结束符和查询类型、查询类部分
|
||||
|
||||
# 解析回答部分
|
||||
# 答案数量
|
||||
for _ in range(answer_rrs):
|
||||
# 跳过名字部分(不需要,因为已经知道是A记录)
|
||||
offset += 2 # 跳过指针
|
||||
answer_type, answer_class, ttl, data_len = struct.unpack('>HHIH', response[offset:offset+10])
|
||||
offset += 10
|
||||
if answer_type == 1: # A记录
|
||||
ip = socket.inet_ntoa(response[offset:offset+4]) # 获取IP地址
|
||||
return ip
|
||||
offset += data_len # 跳过数据部分
|
||||
|
||||
return None
|
||||
|
||||
# 发送报文
|
||||
def send_packet():
|
||||
"""
|
||||
@ -219,7 +277,7 @@ def send_packet():
|
||||
|
||||
return # 结束函数
|
||||
|
||||
# 对于其他报文类型(UDP, TCP)
|
||||
# 对于其他报文类型(UDP, TCP, DNS)
|
||||
if packet_type == "UDP":
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
packet = construct_udp(src_port, dest_port)
|
||||
@ -231,6 +289,20 @@ def send_packet():
|
||||
sock.close()
|
||||
result.set(f"TCP 连接成功,数据已发送!")
|
||||
return # 结束函数
|
||||
elif packet_type == "DNS":
|
||||
dns_server = '8.8.8.8' # Google Public DNS
|
||||
port = 53
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
query_packet = build_dns_query(dest_host)
|
||||
sock.sendto(query_packet, (dns_server, port))
|
||||
response, _ = sock.recvfrom(512)
|
||||
ip = parse_dns_response(response)
|
||||
if ip:
|
||||
result.set(f"DNS解析成功,IP地址: {ip}")
|
||||
else:
|
||||
result.set("DNS解析失败")
|
||||
sock.close()
|
||||
return # 结束函数
|
||||
|
||||
# 发送报文
|
||||
sock.sendto(packet, (dest_ip, dest_port))
|
||||
@ -279,6 +351,7 @@ var = tk.StringVar(value="ICMP")
|
||||
tk.Radiobutton(window, text="ICMP", variable=var, value="ICMP").grid(row=4, column=1)
|
||||
tk.Radiobutton(window, text="UDP", variable=var, value="UDP").grid(row=4, column=2)
|
||||
tk.Radiobutton(window, text="TCP", variable=var, value="TCP").grid(row=4, column=3)
|
||||
tk.Radiobutton(window, text="DNS", variable=var, value="DNS").grid(row=4, column=4) # 添加DNS选项
|
||||
|
||||
# 配置结果显示
|
||||
result = tk.StringVar()
|
||||
|
Loading…
Reference in New Issue
Block a user