Every developer has called socket.getaddrinfo() or requests.get() and trusted that DNS resolution "just works." But what actually happens between your application and port 53? What bytes go on the wire, and what comes back?
The best way I know to truly understand the DNS protocol is to implement it. In this tutorial, I walk through building a DNS resolver from scratch in Python using only the standard library — socket for sending UDP datagrams and struct for packing and unpacking binary data. No dnspython, no third-party packages. Just raw bytes and RFC 1035.
If you want a conceptual overview of DNS resolution before diving into code, I recommend reading How DNS Queries Work first. This article assumes you understand what a recursive resolver does and what record types like A, MX, and CNAME mean.
DNS Packet Structure Overview
Every DNS message — whether it is a query or a response — follows the same binary format defined in RFC 1035. The structure looks like this:
+---------------------+
|       Header        |  12 bytes (fixed)
+---------------------+
|      Question       |  Variable length
+---------------------+
|       Answer        |  Variable length (response only)
+---------------------+
|      Authority      |  Variable length (response only)
+---------------------+
|     Additional      |  Variable length (response only)
+---------------------+
The header is always 12 bytes and contains a transaction ID, flags, and four count fields (one for each of the four sections that follow). The question section specifies the domain and record type being queried. The remaining sections contain resource records; in a plain query like ours they are empty, and the server fills them in its response.
For our resolver, we need to build the header and question section (the query), send it over UDP to a DNS server on port 53, and then parse everything that comes back.
Encoding a Domain Name
DNS does not transmit domain names as plain text strings. Instead, RFC 1035 specifies a length-prefixed label encoding. Each label (the part between dots) is preceded by a single byte indicating its length, and the entire name is terminated with a null byte (0x00).
For example, example.com becomes:
\x07example\x03com\x00
^          ^      ^
|          |      end of name
|          3 bytes follow ("com")
7 bytes follow ("example")
Here is the encoding function:
def encode_domain_name(domain: str) -> bytes:
    parts = domain.strip(".").split(".")
    result = b""
    for part in parts:
        # One length byte, then the label itself
        result += bytes([len(part)]) + part.encode()
    result += b"\x00"  # Null byte terminates the name
    return result
Each label can be at most 63 bytes (the length field is 6 bits — the top 2 bits are reserved for compression, which I cover later). The total domain name including length bytes and terminator cannot exceed 255 bytes.
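Both limits are easy to enforce at encoding time. The following is a minimal sketch rather than part of the resolver above: the name `encode_domain_name_checked` is hypothetical, and it assumes plain ASCII hostnames (internationalized names would need IDNA conversion first).

```python
def encode_domain_name_checked(domain: str) -> bytes:
    """Encode a name as length-prefixed labels, enforcing the RFC 1035 size limits."""
    if domain in ("", "."):
        return b"\x00"  # the root name is just the terminator
    encoded = b""
    # A trailing dot marks a fully qualified name; drop it before splitting.
    for label in domain.rstrip(".").split("."):
        raw = label.encode("ascii")
        if not raw:
            raise ValueError(f"empty label in {domain!r}")
        if len(raw) > 63:
            raise ValueError(f"label too long ({len(raw)} bytes, limit 63)")
        encoded += bytes([len(raw)]) + raw
    encoded += b"\x00"
    if len(encoded) > 255:
        raise ValueError(f"encoded name too long ({len(encoded)} bytes, limit 255)")
    return encoded


print(encode_domain_name_checked("example.com"))  # b'\x07example\x03com\x00'
```

Rejecting an oversized label up front matters because a length of 64 or more would set one of the two high bits, and a receiver could then misread the length byte as something other than a plain label.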
Building the Query Packet
A DNS query packet consists of the 12-byte header followed by a single question entry. Here are the header fields we need to set:
| Field | Size | Value for our query |
|---|---|---|
| ID | 2 bytes | Random transaction ID |
| Flags | 2 bytes | 0x0100 (RD=1, recursion desired) |
| QDCOUNT | 2 bytes | 1 (one question) |
| ANCOUNT | 2 bytes | 0 |
| NSCOUNT | 2 bytes | 0 |
| ARCOUNT | 2 bytes | 0 |
The question entry appends the encoded domain name, the query type (2 bytes), and the query class (2 bytes, always 1 for Internet):
import struct
import os

# DNS record type constants
RECORD_TYPES = {
    "A": 1,
    "NS": 2,
    "CNAME": 5,
    "SOA": 6,
    "MX": 15,
    "TXT": 16,
    "AAAA": 28,
}


def build_query(domain: str, record_type: int = 1) -> tuple[bytes, int]:
    transaction_id = int.from_bytes(os.urandom(2), "big")
    flags = 0x0100  # RD=1 (recursion desired)
    # ID, flags, QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    header = struct.pack("!HHHHHH", transaction_id, flags, 1, 0, 0, 0)
    # Encoded name, then QTYPE and QCLASS (1 = IN)
    question = encode_domain_name(domain) + struct.pack("!HH", record_type, 1)
    return header + question, transaction_id
The ! in the format string tells struct.pack to use network byte order (big-endian), and H is an unsigned 16-bit integer. The six H fields map directly to the six header fields in order.
The random transaction ID is critical — it is how we match a response to the query we sent. In a production resolver, failing to verify this ID opens you up to cache poisoning attacks.
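To make the wire format concrete, here is a small self-contained sketch that packs a query for example.com by hand and prints the resulting bytes. The fixed transaction ID 0xABCD is an assumption made purely so the output is reproducible; a real query should use a random one as described above.

```python
import struct

# Fixed ID for a reproducible dump; real queries should randomize this.
transaction_id = 0xABCD

# ID, flags (RD=1), QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
header = struct.pack("!HHHHHH", transaction_id, 0x0100, 1, 0, 0, 0)

# Encoded name, then QTYPE=1 (A) and QCLASS=1 (IN)
question = b"\x07example\x03com\x00" + struct.pack("!HH", 1, 1)

packet = header + question
print(len(packet))     # 29
print(header.hex())    # abcd01000001000000000000
print(question.hex())  # 076578616d706c6503636f6d0000010001
```

The 29 bytes break down as 12 for the header, 13 for the encoded name, and 4 for the type and class fields.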
Sending the Query
DNS queries use UDP on port 53. We create a datagram socket, send the packet, and wait for a response:
import socket


def send_query(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(packet, (server, 53))
        response, _ = sock.recvfrom(4096)
        return response
    finally:
        sock.close()  # Always release the socket, even on timeout
A few things to note:
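- 4096 bytes is more than enough for this resolver. A UDP DNS message is limited to 512 bytes unless EDNS0 is negotiated, and a server with a larger answer truncates it and sets the TC flag instead of sending more. Because our query carries no EDNS0 OPT record, the extra buffer space is simply headroom.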
- Timeout is set to 5 seconds. DNS is usually fast — under 100ms for most queries. If you do not get a response within 5 seconds, something is wrong.
- We use AF_INET for IPv4. To query over IPv6, you would use AF_INET6.
Parsing the Response Header
The response starts with the same 12-byte header structure. We unpack it and extract the important flag bits:
def parse_header(response: bytes) -> dict:
    fields = struct.unpack("!HHHHHH", response[:12])
    return {
        "id": fields[0],
        "flags": fields[1],
        "qr": (fields[1] >> 15) & 1,  # 1 = response
        "aa": (fields[1] >> 10) & 1,  # authoritative answer
        "tc": (fields[1] >> 9) & 1,   # truncated
        "rcode": fields[1] & 0xF,     # response code
        "qdcount": fields[2],         # questions
        "ancount": fields[3],         # answers
        "nscount": fields[4],         # authority records
        "arcount": fields[5],         # additional records
    }
The flags field packs multiple values into 16 bits. The most important ones for us:
- QR (bit 15): 0 for query, 1 for response. If this is not 1, something went wrong.
- AA (bit 10): 1 if the responding server is authoritative for the queried domain.
- TC (bit 9): 1 if the response was truncated because it exceeded the UDP message size. A production resolver would retry over TCP when this happens.
- RCODE (bits 0-3): 0 means success (NOERROR), 3 means the domain does not exist (NXDOMAIN), 2 means server failure (SERVFAIL).
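As a worked example of the bit layout, the sketch below decodes 0x8180, the flags value a recursive resolver typically returns for a successful answer. The helper name `describe_flags` and the `RCODE_NAMES` table are illustrative additions, not part of the resolver; the RD and RA bits (8 and 7) and the RCODE names come from RFC 1035.

```python
RCODE_NAMES = {
    0: "NOERROR", 1: "FORMERR", 2: "SERVFAIL",
    3: "NXDOMAIN", 4: "NOTIMP", 5: "REFUSED",
}


def describe_flags(flags: int) -> dict:
    """Split the 16-bit flags field into its individual parts."""
    rcode = flags & 0xF
    return {
        "qr": (flags >> 15) & 1,        # 1 = response
        "opcode": (flags >> 11) & 0xF,  # 0 = standard query
        "aa": (flags >> 10) & 1,        # authoritative answer
        "tc": (flags >> 9) & 1,         # truncated
        "rd": (flags >> 8) & 1,         # recursion desired (copied from the query)
        "ra": (flags >> 7) & 1,         # recursion available
        "rcode": RCODE_NAMES.get(rcode, str(rcode)),
    }


print(describe_flags(0x8180))
# {'qr': 1, 'opcode': 0, 'aa': 0, 'tc': 0, 'rd': 1, 'ra': 1, 'rcode': 'NOERROR'}
print(describe_flags(0x8183)["rcode"])  # NXDOMAIN
```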
Decoding Domain Names (Name Compression)
This is the hardest part of implementing a DNS parser. DNS responses use name compression to avoid repeating the same domain name multiple times in a single packet. Instead of writing example.com again, the server inserts a pointer — a 2-byte value that references an earlier position in the packet where that name (or a suffix of it) already appears.
Here is how it works:
- If the top 2 bits of a length byte are 00, it is a normal label (the remaining 6 bits are the label length).
- If the top 2 bits are 11, it is a compression pointer. The remaining 14 bits form an offset into the packet where the name continues.
- A null byte (0x00) terminates the name.
The tricky part is that pointers can point to other pointers, and the offset we need to return to the caller (for parsing the next field) is different from where the pointer sends us. I track this with a jumped flag:
def decode_domain_name(response: bytes, offset: int) -> tuple[str, int]:
    labels = []
    jumped = False
    max_offset = offset
    jumps = 0  # Pointers followed so far; guards against pointer loops
    while True:
        if offset >= len(response):
            break
        length = response[offset]
        if length == 0:
            if not jumped:
                max_offset = offset + 1
            break
        if (length & 0xC0) == 0xC0:  # Compression pointer
            if not jumped:
                max_offset = offset + 2
            jumps += 1
            if jumps > 16:  # Arbitrary cap; a malicious packet can point at itself
                raise ValueError("Too many compression pointers (possible loop)")
            pointer = struct.unpack("!H", response[offset:offset + 2])[0] & 0x3FFF
            offset = pointer
            jumped = True
        else:
            offset += 1
            labels.append(response[offset:offset + length].decode())
            offset += length
    return ".".join(labels), max_offset
When we encounter a pointer, we save the current position plus 2 (the size of the pointer) as the "real" next offset, then jump to wherever the pointer says. Any labels we find at the pointer destination get appended to the same list. If we never jump, the next offset is simply one past the null terminator.
This compression scheme is why you cannot parse DNS packets by splitting on dots or using simple string operations. You have to work with the raw bytes and follow pointers.
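A hand-built packet makes the pointer mechanics easier to see. The sketch below is self-contained and uses its own compact decoder, `decode_name`, which is a hypothetical stand-in written for this example (with a hop limit of 16 chosen arbitrarily). The packet places example.com at offset 12, right after a dummy header, and then encodes www.example.com as the label www followed by a pointer back to offset 12.

```python
import struct


def decode_name(packet: bytes, offset: int) -> tuple[str, int]:
    """Return (name, offset of the first byte after the name at the call site)."""
    labels = []
    next_offset = None  # fixed the first time we hit a pointer or the terminator
    hops = 0
    while True:
        length = packet[offset]
        if (length & 0xC0) == 0xC0:  # compression pointer
            if next_offset is None:
                next_offset = offset + 2
            hops += 1
            if hops > 16:
                raise ValueError("too many compression pointers")
            offset = struct.unpack("!H", packet[offset:offset + 2])[0] & 0x3FFF
        elif length == 0:  # end of name
            if next_offset is None:
                next_offset = offset + 1
            return ".".join(labels), next_offset
        else:  # ordinary label
            labels.append(packet[offset + 1:offset + 1 + length].decode("ascii"))
            offset += 1 + length


packet = (
    b"\x00" * 12                 # offsets 0-11: placeholder header
    + b"\x07example\x03com\x00"  # offsets 12-24: the full name
    + b"\x03www\xc0\x0c"         # offsets 25-30: "www" + pointer to offset 12
)

print(decode_name(packet, 12))  # ('example.com', 25)
print(decode_name(packet, 25))  # ('www.example.com', 31)
```

The second call returns 31, not 25: the caller resumes right after the 2-byte pointer, even though decoding itself wandered back to offset 12.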
Parsing Resource Records
Each answer record (and authority/additional record) has this structure:
| Field | Size | Description |
|---|---|---|
| NAME | Variable | Domain name (possibly compressed) |
| TYPE | 2 bytes | Record type (A=1, AAAA=28, etc.) |
| CLASS | 2 bytes | Usually 1 (IN = Internet) |
| TTL | 4 bytes | Time to live in seconds |
| RDLENGTH | 2 bytes | Length of the RDATA field |
| RDATA | RDLENGTH | Record data (format depends on TYPE) |
The parsing function reads the name, unpacks the fixed fields, and then interprets RDATA based on the record type:
def parse_record(response: bytes, offset: int) -> tuple[dict, int]:
    name, offset = decode_domain_name(response, offset)
    # TYPE (2), CLASS (2), TTL (4), RDLENGTH (2) = 10 bytes
    rtype, rclass, ttl, rdlength = struct.unpack("!HHIH", response[offset:offset + 10])
    rdata_offset = offset + 10

    if rtype == 1:  # A record: 4 bytes, IPv4 address
        value = ".".join(str(b) for b in response[rdata_offset:rdata_offset + rdlength])
    elif rtype == 28:  # AAAA record: 16 bytes, IPv6 address
        raw = response[rdata_offset:rdata_offset + rdlength]
        value = ":".join(f"{raw[i]:02x}{raw[i+1]:02x}" for i in range(0, 16, 2))
    elif rtype in (2, 5):  # NS or CNAME: compressed domain name
        value, _ = decode_domain_name(response, rdata_offset)
    elif rtype == 15:  # MX: 2-byte priority + compressed domain name
        priority = struct.unpack("!H", response[rdata_offset:rdata_offset + 2])[0]
        exchange, _ = decode_domain_name(response, rdata_offset + 2)
        value = f"{priority} {exchange}"
    elif rtype == 16:  # TXT: one or more length-prefixed strings (first one only)
        txt_len = response[rdata_offset]
        value = response[rdata_offset + 1:rdata_offset + 1 + txt_len].decode(errors="replace")
    else:  # Unknown type: raw RDATA as hex
        value = response[rdata_offset:rdata_offset + rdlength].hex()

    return {
        "name": name,
        "type": rtype,
        "class": rclass,
        "ttl": ttl,
        "value": value,
    }, rdata_offset + rdlength
A few important details:
- A records are 4 raw bytes representing each octet of an IPv4 address.
- AAAA records are 16 raw bytes for the full IPv6 address.
- CNAME and NS records contain a compressed domain name, so we call decode_domain_name again pointing into the RDATA.
- MX records have a 2-byte preference value before the mail exchange domain name.
- TXT records use yet another encoding — each TXT string is preceded by a length byte (not null-terminated). A single TXT record can contain multiple strings, but for simplicity I parse only the first one here.
- For any record type we do not explicitly handle, we fall back to a hex representation of the raw data.
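Reading every string in a TXT record rather than only the first takes just a short loop. This is a sketch under the assumption that the caller has already sliced out exactly RDLENGTH bytes of RDATA; the function name `parse_txt_rdata` is hypothetical.

```python
def parse_txt_rdata(rdata: bytes) -> list[str]:
    """Split TXT RDATA into its length-prefixed character strings."""
    strings = []
    pos = 0
    while pos < len(rdata):
        length = rdata[pos]  # one length byte precedes each string
        chunk = rdata[pos + 1:pos + 1 + length]
        if len(chunk) != length:
            raise ValueError("TXT string runs past the end of RDATA")
        strings.append(chunk.decode(errors="replace"))
        pos += 1 + length
    return strings


rdata = b"\x0bv=spf1 -all\x05hello"
print(parse_txt_rdata(rdata))  # ['v=spf1 -all', 'hello']
```

Because each string is capped at 255 bytes, long values such as DKIM keys are commonly split across several strings, and consumers generally concatenate them.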
Note how the return offset is rdata_offset + rdlength — we skip past the entire RDATA section regardless of how we parsed it. This ensures we are always positioned correctly for the next record.
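The hand-rolled A and AAAA formatting keeps the byte layout visible, but the standard library's ipaddress module can do the same job and also produces the compressed IPv6 form that most tools print. A minimal sketch follows; the helper name `format_address` is an assumption for illustration.

```python
import ipaddress


def format_address(rtype: int, rdata: bytes) -> str:
    """Format A (type 1) or AAAA (type 28) RDATA as a printable address."""
    if rtype == 1:
        return str(ipaddress.IPv4Address(rdata))  # requires exactly 4 bytes
    if rtype == 28:
        return str(ipaddress.IPv6Address(rdata))  # requires exactly 16 bytes
    raise ValueError(f"not an address record type: {rtype}")


raw_v6 = bytes.fromhex("20010db8000000000000000000000001")
print(format_address(28, raw_v6))             # 2001:db8::1
print(format_address(1, b"\x5d\xb8\xd8\x22"))  # 93.184.216.34
```

Both constructors raise a ValueError subclass when the byte string has the wrong length, which doubles as a sanity check on RDLENGTH.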
Putting It All Together
Now I can combine everything into a single resolve() function that builds a query, sends it, and parses the response:
RECORD_TYPE_NAMES = {v: k for k, v in RECORD_TYPES.items()}


def resolve(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
    type_num = RECORD_TYPES.get(record_type.upper(), 1)
    packet, txn_id = build_query(domain, type_num)
    response = send_query(packet, server)
    header = parse_header(response)

    # Verify the response matches our query
    if header["id"] != txn_id:
        raise ValueError("Transaction ID mismatch: possible spoofing attempt")
    if header["rcode"] == 3:
        raise ValueError(f"NXDOMAIN: {domain} does not exist")
    if header["rcode"] != 0:
        raise ValueError(f"DNS error: RCODE {header['rcode']}")

    # Skip the question section (it is echoed back in the response)
    offset = 12
    for _ in range(header["qdcount"]):
        _, offset = decode_domain_name(response, offset)
        offset += 4  # Skip QTYPE (2 bytes) + QCLASS (2 bytes)

    # Parse all answer records
    records = []
    for _ in range(header["ancount"]):
        record, offset = parse_record(response, offset)
        record["type_name"] = RECORD_TYPE_NAMES.get(record["type"], str(record["type"]))
        records.append(record)
    return records
The flow is straightforward:
- Build the query packet with the domain and record type.
- Send it over UDP and get the response bytes.
- Parse the header and check for errors.
- Skip the question section (DNS responses echo the question back).
- Parse each answer record.
Testing It
Save everything in a single file (e.g., dns_resolver.py) and add a test block:
if __name__ == "__main__":
    print("--- A record for google.com ---")
    for record in resolve("google.com", "A"):
        print(f" {record['name']} {record['ttl']} {record['type_name']} {record['value']}")

    print("\n--- MX records for gmail.com ---")
    for record in resolve("gmail.com", "MX"):
        print(f" {record['name']} {record['ttl']} {record['type_name']} {record['value']}")

    print("\n--- NS records for example.com ---")
    for record in resolve("example.com", "NS"):
        print(f" {record['name']} {record['ttl']} {record['type_name']} {record['value']}")

    print("\n--- TXT records for google.com ---")
    for record in resolve("google.com", "TXT"):
        print(f" {record['name']} {record['ttl']} {record['type_name']} {record['value']}")
Running it produces output like this:
--- A record for google.com ---
google.com 236 A 142.250.80.46
--- MX records for gmail.com ---
gmail.com 3600 MX 5 gmail-smtp-in.l.google.com
gmail.com 3600 MX 10 alt1.gmail-smtp-in.l.google.com
gmail.com 3600 MX 20 alt2.gmail-smtp-in.l.google.com
gmail.com 3600 MX 30 alt3.gmail-smtp-in.l.google.com
gmail.com 3600 MX 40 alt4.gmail-smtp-in.l.google.com
--- NS records for example.com ---
example.com 7199 NS a.iana-servers.net
example.com 7199 NS b.iana-servers.net
--- TXT records for google.com ---
google.com 3600 TXT v=spf1 include:_spf.google.com ~all
You just resolved real DNS records using raw UDP and binary packet construction. No libraries, no abstractions — just the protocol as defined in RFC 1035.
Compare your results against the DNS Inspector to verify that your resolver is returning the correct records.
Error Handling Improvements
The basic resolver works, but a production-quality implementation needs more robust error handling. Here are the most important additions:
def resolve_safe(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
    try:
        return resolve(domain, record_type, server)
    except socket.timeout as e:
        # Must come before OSError, which socket.timeout subclasses
        raise TimeoutError(f"DNS query timed out after 5s (server: {server})") from e
    except OSError as e:
        # Covers unreachable networks as well as socket.gaierror
        raise ConnectionError(f"Cannot reach DNS server {server}: {e}") from e
    except struct.error as e:
        raise ValueError(f"Malformed DNS response: could not unpack binary data: {e}") from e
    except IndexError as e:
        raise ValueError("Malformed DNS response: packet shorter than expected") from e
In practice, you should also handle:
- Truncated responses (TC flag set): The response was too large for UDP. Retry the same query over TCP on port 53. TCP DNS prepends a 2-byte length field before the DNS message.
- Multiple questions: Some servers (rarely) include more than one question in the response. Our parser handles this via the qdcount loop.
- Authority and additional sections: We only parse the answer section. The authority section contains NS records for the zone, and the additional section often contains the IP addresses of those nameservers (glue records). Parsing these uses the exact same parse_record function.
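The TCP retry mentioned in the first item is mostly a matter of framing. The sketch below shows one way it could look; `frame_for_tcp`, `recv_exact`, and `send_query_tcp` are hypothetical names, and the default server address simply mirrors the one used elsewhere in this article. The important detail is that TCP is a byte stream, so a single recv() call may return only part of the message and the reader has to loop.

```python
import socket
import struct


def frame_for_tcp(message: bytes) -> bytes:
    """Prefix a DNS message with its length as a 2-byte big-endian integer."""
    return struct.pack("!H", len(message)) + message


def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly `count` bytes, looping over short reads."""
    buf = b""
    while len(buf) < count:
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise ConnectionError("connection closed mid-message")
        buf += chunk
    return buf


def send_query_tcp(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    """Send the same query bytes over TCP and return the unframed response."""
    with socket.create_connection((server, 53), timeout=timeout) as sock:
        sock.sendall(frame_for_tcp(packet))
        (length,) = struct.unpack("!H", recv_exact(sock, 2))
        return recv_exact(sock, length)
```

A caller would check the TC bit of the UDP response header and, if it is set, pass the original query packet to send_query_tcp and parse that response instead.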
What Is Missing from Production Use
My resolver demonstrates the core protocol, but it is missing several features that real resolvers implement:
- TCP fallback: When a UDP response is truncated (TC=1), a real resolver retries over TCP. DNS over TCP uses the same packet format but prepends a 2-byte message length.
- EDNS0 (RFC 6891): Modern resolvers include an OPT pseudo-record in the additional section to advertise a larger UDP buffer size and support for extensions like DNSSEC. 4096 bytes was long the common default; many implementations now advertise 1232 bytes to avoid IP fragmentation.
- DNSSEC validation: Our resolver does not validate DNSSEC signatures. A validating resolver would check RRSIG, DNSKEY, and DS records to verify the chain of trust from the root zone. See What Is DNSSEC? for the full explanation.
- Caching: A real resolver caches responses for the duration of the TTL value. Without caching, every call to resolve() makes a fresh network request.
- Concurrent queries: Production resolvers send A and AAAA queries in parallel and handle multiple in-flight requests. The ANY query type is not a reliable shortcut, since many servers now return only a minimal answer to it (RFC 8482).
- Iterative resolution: Our resolver sends recursive queries (RD=1) to a resolver like 8.8.8.8. A fully iterative resolver would start at the root servers and follow the delegation chain itself.
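Of the items above, EDNS0 is the smallest to add. The sketch below appends an OPT pseudo-record to an existing query and increments ARCOUNT. The function name `add_edns0` is hypothetical, the 1232-byte default is an assumption (a commonly recommended value, not something RFC 6891 mandates), and the code assumes the query has no other records after the question section.

```python
import struct


def add_edns0(query: bytes, udp_payload_size: int = 1232) -> bytes:
    """Append an EDNS0 OPT pseudo-record to a query and bump ARCOUNT by one."""
    header = list(struct.unpack("!HHHHHH", query[:12]))
    header[5] += 1  # ARCOUNT: OPT lives in the additional section
    opt = b"\x00" + struct.pack(  # NAME is the root (a single zero byte)
        "!HHIH",
        41,                # TYPE = OPT
        udp_payload_size,  # CLASS field is reused for the UDP payload size
        0,                 # TTL field: extended RCODE, version 0, no flags
        0,                 # RDLENGTH = 0 (no options)
    )
    return struct.pack("!HHHHHH", *header) + query[12:] + opt


# A 29-byte A query for example.com with a fixed ID, built inline for the demo
query = struct.pack("!HHHHHH", 0xABCD, 0x0100, 1, 0, 0, 0)
query += b"\x07example\x03com\x00" + struct.pack("!HH", 1, 1)

with_opt = add_edns0(query)
print(len(with_opt))          # 40
print(with_opt[-11:].hex())   # 00002904d0000000000000
```

With this record present, a server may send UDP responses larger than 512 bytes, so the receive buffer has to be at least as large as the advertised size.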
For production Python applications, I recommend using dnspython instead — it handles all of these concerns. See my guide on DNS Lookups in Python with dnspython for practical examples.
Verifying Your Results
Once your resolver is working, you will want to verify its output against established tools:
- Use the DNS Inspector to query any domain and compare the records your resolver returns against what authoritative servers report.
- Use the Propagation Checker to see how a domain resolves across 30+ global DNS servers — useful for understanding why your resolver might get different answers depending on which upstream server you query.
- Use dig from the command line for a quick comparison. My dig command guide covers the syntax in detail.
If your resolver returns different results from dig, the most common cause is that you are querying a different server. Google's 8.8.8.8 may return different cached values than Cloudflare's 1.1.1.1 because their caches are independent. This is normal and expected.
The Complete Code
Here is the entire resolver in a single copy-pasteable file:
"""
Minimal DNS resolver: no dependencies, just socket + struct.
Supports A, AAAA, NS, CNAME, MX, TXT, and SOA queries
(SOA and other unparsed types are returned as raw hex).
"""
import os
import socket
import struct

RECORD_TYPES = {
    "A": 1, "NS": 2, "CNAME": 5, "SOA": 6,
    "MX": 15, "TXT": 16, "AAAA": 28,
}
RECORD_TYPE_NAMES = {v: k for k, v in RECORD_TYPES.items()}


def encode_domain_name(domain: str) -> bytes:
    parts = domain.strip(".").split(".")
    result = b""
    for part in parts:
        result += bytes([len(part)]) + part.encode()
    result += b"\x00"
    return result


def build_query(domain: str, record_type: int = 1) -> tuple[bytes, int]:
    transaction_id = int.from_bytes(os.urandom(2), "big")
    flags = 0x0100  # RD=1
    header = struct.pack("!HHHHHH", transaction_id, flags, 1, 0, 0, 0)
    question = encode_domain_name(domain) + struct.pack("!HH", record_type, 1)
    return header + question, transaction_id


def send_query(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(packet, (server, 53))
        response, _ = sock.recvfrom(4096)
        return response
    finally:
        sock.close()


def parse_header(response: bytes) -> dict:
    fields = struct.unpack("!HHHHHH", response[:12])
    return {
        "id": fields[0],
        "flags": fields[1],
        "qr": (fields[1] >> 15) & 1,
        "aa": (fields[1] >> 10) & 1,
        "tc": (fields[1] >> 9) & 1,
        "rcode": fields[1] & 0xF,
        "qdcount": fields[2],
        "ancount": fields[3],
        "nscount": fields[4],
        "arcount": fields[5],
    }


def decode_domain_name(response: bytes, offset: int) -> tuple[str, int]:
    labels = []
    jumped = False
    max_offset = offset
    jumps = 0  # Pointers followed so far; guards against pointer loops
    while True:
        if offset >= len(response):
            break
        length = response[offset]
        if length == 0:
            if not jumped:
                max_offset = offset + 1
            break
        if (length & 0xC0) == 0xC0:
            if not jumped:
                max_offset = offset + 2
            jumps += 1
            if jumps > 16:  # Arbitrary cap; a malicious packet can point at itself
                raise ValueError("Too many compression pointers (possible loop)")
            pointer = struct.unpack("!H", response[offset:offset + 2])[0] & 0x3FFF
            offset = pointer
            jumped = True
        else:
            offset += 1
            labels.append(response[offset:offset + length].decode())
            offset += length
    return ".".join(labels), max_offset


def parse_record(response: bytes, offset: int) -> tuple[dict, int]:
    name, offset = decode_domain_name(response, offset)
    rtype, rclass, ttl, rdlength = struct.unpack("!HHIH", response[offset:offset + 10])
    rdata_offset = offset + 10

    if rtype == 1:  # A
        value = ".".join(str(b) for b in response[rdata_offset:rdata_offset + rdlength])
    elif rtype == 28:  # AAAA
        raw = response[rdata_offset:rdata_offset + rdlength]
        value = ":".join(f"{raw[i]:02x}{raw[i+1]:02x}" for i in range(0, 16, 2))
    elif rtype in (2, 5):  # NS, CNAME
        value, _ = decode_domain_name(response, rdata_offset)
    elif rtype == 15:  # MX
        priority = struct.unpack("!H", response[rdata_offset:rdata_offset + 2])[0]
        exchange, _ = decode_domain_name(response, rdata_offset + 2)
        value = f"{priority} {exchange}"
    elif rtype == 16:  # TXT (first string only)
        txt_len = response[rdata_offset]
        value = response[rdata_offset + 1:rdata_offset + 1 + txt_len].decode(errors="replace")
    else:
        value = response[rdata_offset:rdata_offset + rdlength].hex()

    return {
        "name": name,
        "type": rtype,
        "class": rclass,
        "ttl": ttl,
        "value": value,
    }, rdata_offset + rdlength


def resolve(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
    type_num = RECORD_TYPES.get(record_type.upper(), 1)
    packet, txn_id = build_query(domain, type_num)
    response = send_query(packet, server)
    header = parse_header(response)

    if header["id"] != txn_id:
        raise ValueError("Transaction ID mismatch")
    if header["rcode"] == 3:
        raise ValueError(f"NXDOMAIN: {domain} does not exist")
    if header["rcode"] != 0:
        raise ValueError(f"DNS error: RCODE {header['rcode']}")

    # Skip the echoed question section
    offset = 12
    for _ in range(header["qdcount"]):
        _, offset = decode_domain_name(response, offset)
        offset += 4

    records = []
    for _ in range(header["ancount"]):
        record, offset = parse_record(response, offset)
        record["type_name"] = RECORD_TYPE_NAMES.get(record["type"], str(record["type"]))
        records.append(record)
    return records


if __name__ == "__main__":
    print("--- A record for google.com ---")
    for r in resolve("google.com", "A"):
        print(f" {r['name']} {r['ttl']} {r['type_name']} {r['value']}")

    print("\n--- MX records for gmail.com ---")
    for r in resolve("gmail.com", "MX"):
        print(f" {r['name']} {r['ttl']} {r['type_name']} {r['value']}")

    print("\n--- NS records for example.com ---")
    for r in resolve("example.com", "NS"):
        print(f" {r['name']} {r['ttl']} {r['type_name']} {r['value']}")

    print("\n--- TXT records for google.com ---")
    for r in resolve("google.com", "TXT"):
        print(f" {r['name']} {r['ttl']} {r['type_name']} {r['value']}")
