1
0
mirror of https://github.com/pathes/fakedns.git synced 2026-01-22 17:55:58 +01:00
Files
fakedns/fakedns.py
2014-08-24 00:14:13 +02:00

170 lines
6.1 KiB
Python

#!/usr/bin/env python3
# (c) 2014 Patryk Hes
import socketserver
import sys
DNS_HEADER_LENGTH = 12
# TODO make some DNS database with IPs connected to regexs
IP = '192.168.1.1'
class DNSHandler(socketserver.BaseRequestHandler):
def handle(self):
socket = self.request[1]
data = self.request[0].strip()
# If request doesn't even contain full header, don't respond.
if len(data) < DNS_HEADER_LENGTH:
return
# Try to read questions - if they're invalid, don't respond.
try:
all_questions = self.dns_extract_questions(data)
except IndexError:
return
# Filter only those questions, which have QTYPE=A and QCLASS=IN
# TODO this is very limiting, remove QTYPE filter in future, handle different QTYPEs
accepted_questions = []
for question in all_questions:
name = str(b'.'.join(question['name']), encoding='UTF-8')
if question['qtype'] == b'\x00\x01' and question['qclass'] == b'\x00\x01':
accepted_questions.append(question)
print('\033[32m{}\033[39m'.format(name))
else:
print('\033[31m{}\033[39m'.format(name))
response = (
self.dns_response_header(data) +
self.dns_response_questions(accepted_questions) +
self.dns_response_answers(accepted_questions)
)
socket.sendto(response, self.client_address)
def dns_extract_questions(self, data):
"""
Extracts question section from DNS request data.
See http://tools.ietf.org/html/rfc1035 4.1.2. Question section format.
"""
questions = []
# Get number of questions from header's QDCOUNT
n = (data[4] << 8) + data[5]
# Where we actually read in data? Start at beginning of question sections.
pointer = DNS_HEADER_LENGTH
# Read each question section
for i in range(n):
question = {
'name': [],
'qtype': '',
'qclass': '',
}
length = data[pointer]
# Read each label from QNAME part
while length != 0:
start = pointer + 1
end = pointer + length + 1
question['name'].append(data[start:end])
pointer += length + 1
length = data[pointer]
# Read QTYPE
question['qtype'] = data[pointer+1:pointer+3]
# Read QCLASS
question['qclass'] = data[pointer+3:pointer+5]
# Move pointer 5 octets further (zero length octet, QTYPE, QNAME)
pointer += 5
questions.append(question)
return questions
def dns_response_header(self, data):
"""
Generates DNS response header.
See http://tools.ietf.org/html/rfc1035 4.1.1. Header section format.
"""
header = b''
# ID - copy it from request
header += data[:2]
# QR 1 response
# OPCODE 0000 standard query
# AA 0 not authoritative
# TC 0 not truncated
# RD 0 recursion not desired
# RA 0 recursion not available
# Z 000 unused
# RCODE 0000 no error condition
header += b'\x80\x00'
# QDCOUNT - question entries count, set to QDCOUNT from request
header += data[4:6]
# ANCOUNT - answer records count, set to QDCOUNT from request
header += data[4:6]
# NSCOUNT - authority records count, set to 0
header += b'\x00\x00'
# ARCOUNT - additional records count, set to 0
header += b'\x00\x00'
return header
def dns_response_questions(self, questions):
"""
Generates DNS response questions.
See http://tools.ietf.org/html/rfc1035 4.1.2. Question section format.
"""
sections = b''
for question in questions:
section = b''
for label in question['name']:
# Length octet
section += bytes([len(label)])
section += label
# Zero length octet
section += b'\x00'
section += question['qtype']
section += question['qclass']
sections += section
return sections
def dns_response_answers(self, questions):
"""
Generates DNS response answers.
See http://tools.ietf.org/html/rfc1035 4.1.3. Resource record format.
"""
records = b''
for question in questions:
record = b''
for label in question['name']:
# Length octet
record += bytes([len(label)])
record += label
# Zero length octet
record += b'\x00'
# TYPE - just copy QTYPE
# TODO QTYPE values set is superset of TYPE values set, handle different QTYPEs, see RFC 1035 3.2.3.
record += question['qtype']
# CLASS - just copy QCLASS
# TODO QCLASS values set is superset of CLASS values set, handle at least * QCLASS, see RFC 1035 3.2.5.
record += question['qclass']
# TTL - 32 bit unsigned integer. Set to 0 to inform, that response
# should not be cached.
record += b'\x00\x00\x00\x00'
# RDLENGTH - 16 bit unsigned integer, length of RDATA field.
# In case of QTYPE=A and QCLASS=IN, RDLENGTH=4.
record += b'\x00\x04'
# RDATA - in case of QTYPE=A and QCLASS=IN, it's IPv4 address.
record += b''.join(map(
lambda x: bytes([int(x)]),
IP.split('.')
))
records += record
return records
if __name__ == '__main__':
# Minimal configuration - allow to pass IP in configuration
if len(sys.argv) > 1:
IP = sys.argv[1]
host, port = '', 53
server = socketserver.ThreadingUDPServer((host, port), DNSHandler)
print('\033[36mStarted DNS server.\033[39m')
try:
server.serve_forever()
except KeyboardInterrupt:
server.shutdown()
sys.exit(0)