-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrootDnsServer.py
More file actions
88 lines (82 loc) · 3.32 KB
/
rootDnsServer.py
File metadata and controls
88 lines (82 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import socket
import dns
import dns.resolver
import dns.query
import dns.name
import dns.message
import warnings
from helpers import LOCAL_HOST, ROOT_SERVER_PORT, BUFFER_SIZE
from helpers import splitInput
from helpers import getInput
from helpers import customPrint
from helpers import displayMessages
def findOutTld(userInput, localNameServer):
returnMessage = []
defaultResolver = dns.resolver.get_default_resolver()
converToDomainName = dns.name.from_text(userInput)
numberOfWords = 1
rootInput = getInput(userInput, numberOfWords)
customPrint("Customized rootInput", rootInput)
message = f"I dont know the address of \"{userInput}\" but I know the address of \"{rootInput}\""
returnMessage.append(message)
returnMessage.append(
f"Looking up \"{rootInput}\" on \"{localNameServer}\"")
query = dns.message.make_query(rootInput, dns.rdatatype.NS)
response = dns.query.udp(query, localNameServer)
responseCode = response.rcode()
if responseCode != dns.rcode.NOERROR:
if responseCode == dns.rcode.NXDOMAIN:
returnMessage.append(f"\"{rootInput}\" does not exist.")
returnMessage.append("Please enter a legible domain")
return returnMessage
else:
returnMessage.append("Please enter a legible domain.")
return returnMessage
resourceRecordSet = None
if len(response.authority) > 0:
resourceRecordSet = response.authority[0]
else:
resourceRecordSet = response.answer[0]
resourceRecord = resourceRecordSet[0]
if resourceRecord.rdtype == dns.rdatatype.SOA:
returnMessage.append(
f"Same server is TLD Server for \"{rootInput}\"")
else:
tldServerName = resourceRecord.target
returnMessage.append(
f"\"{tldServerName}\" is TLD Server for \"{rootInput}\"")
warnings.filterwarnings("ignore", category=DeprecationWarning)
ipAddressofTld = defaultResolver.query(
tldServerName).rrset[0].to_text()
returnMessage.append(
f"IP Address of \"{tldServerName}\" is \"{ipAddressofTld}\"")
returnMessage.append(str(tldServerName))
returnMessage.append(ipAddressofTld)
return returnMessage
def rootDnsServer():
rootDnsServerSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
rootDnsServerSocket.bind((LOCAL_HOST, ROOT_SERVER_PORT))
print(f"Root Server is up and running at port:{ROOT_SERVER_PORT}")
while True:
localNameServer, _ = rootDnsServerSocket.recvfrom(BUFFER_SIZE)
localNameServer = localNameServer.decode()
clientMessage, clientAddress = rootDnsServerSocket.recvfrom(
BUFFER_SIZE)
userInput = clientMessage.decode()
print(f"Talking to Client at Address:{clientAddress}")
print(f"Client Message:{userInput}")
result = findOutTld(userInput, localNameServer)
displayMessages(result)
serverMessage = str(result).encode()
rootDnsServerSocket.sendto(serverMessage, clientAddress)
except KeyboardInterrupt:
print("\nStopping the server")
print("........")
print("........")
print("You Stopped the server")
exit()
except:
print("Something went wrong")
exit()
rootDnsServer()