Build a DNS Resolver from Scratch in Python

Ishan Karunaratne

Software Architect & Infrastructure Engineer

Every developer has called socket.getaddrinfo() or requests.get() and trusted that DNS resolution "just works." But what actually happens between your application and port 53? What bytes go on the wire, and what comes back?

The best way I know to truly understand the DNS protocol is to implement it. In this tutorial, I walk through building a DNS resolver from scratch in Python using only the standard library — socket for sending UDP datagrams and struct for packing and unpacking binary data. No dnspython, no third-party packages. Just raw bytes and RFC 1035.

If you want a conceptual overview of DNS resolution before diving into code, I recommend reading How DNS Queries Work first. This article assumes you understand what a recursive resolver does and what record types like A, MX, and CNAME mean.

DNS Packet Structure Overview

Every DNS message — whether it is a query or a response — follows the same binary format defined in RFC 1035. The structure looks like this:

+---------------------+
|        Header       |  12 bytes (fixed)
+---------------------+
|       Question      |  Variable length
+---------------------+
|        Answer       |  Variable length (response only)
+---------------------+
|      Authority      |  Variable length (response only)
+---------------------+
|      Additional     |  Variable length (response only)
+---------------------+

The header is always 12 bytes and contains a transaction ID, flags, and four count fields (one for each section). The question section specifies the domain and record type being queried. The remaining sections contain resource records and only appear in responses.

For our resolver, we need to build the header and question section (the query), send it over UDP to a DNS server on port 53, and then parse everything that comes back.

Encoding a Domain Name

DNS does not transmit domain names as plain text strings. Instead, RFC 1035 specifies a length-prefixed label encoding. Each label (the part between dots) is preceded by a single byte indicating its length, and the entire name is terminated with a null byte (0x00).

For example, example.com becomes:

\x07example\x03com\x00
 ^          ^        ^
 7 bytes    3 bytes  end of name

Here is the encoding function:

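def encode_domain_name(domain: str) -> bytes:
    result = b""
    # A trailing dot marks the root; the root itself encodes as a lone 0x00.
    name = domain.rstrip(".")
    for part in name.split(".") if name else []:
        label = part.encode("ascii")  # IDNs must be converted to punycode first
        if not 1 <= len(label) <= 63:
            raise ValueError(f"Invalid label length in {domain!r}")
        result += bytes([len(label)]) + label
    return result + b"\x00"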

Each label can be at most 63 bytes (the length field is 6 bits — the top 2 bits are reserved for compression, which I cover later). The total domain name including length bytes and terminator cannot exceed 255 bytes.
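Both limits can be checked before any bytes are built. The sketch below is only an illustration: the helper name wire_length is hypothetical and not part of the resolver, and it assumes the name is already ASCII (internationalized names would need punycode conversion first).

```python
def wire_length(domain: str) -> int:
    """Return the encoded size of a name in bytes, enforcing RFC 1035 limits.

    Hypothetical helper for illustration; assumes an ASCII domain name.
    """
    name = domain.rstrip(".")
    labels = name.split(".") if name else []
    total = 1  # the terminating 0x00 byte
    for label in labels:
        size = len(label.encode("ascii"))
        if not 1 <= size <= 63:
            raise ValueError(f"label {label!r} must be 1 to 63 bytes, got {size}")
        total += 1 + size  # one length byte plus the label itself
    if total > 255:
        raise ValueError(f"encoded name is {total} bytes, limit is 255")
    return total


print(wire_length("example.com"))  # 13: (1 + 7) + (1 + 3) + 1
```

Counting the length bytes and the terminator matters here: a name made of four 63-character labels is only 255 characters long as text, but 257 bytes on the wire.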

Building the Query Packet

A DNS query packet consists of the 12-byte header followed by a single question entry. Here are the header fields we need to set:

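| Field   | Size    | Value for our query              |
|---------|---------|----------------------------------|
| ID      | 2 bytes | Random transaction ID            |
| Flags   | 2 bytes | 0x0100 (RD=1, recursion desired) |
| QDCOUNT | 2 bytes | 1 (one question)                 |
| ANCOUNT | 2 bytes | 0                                |
| NSCOUNT | 2 bytes | 0                                |
| ARCOUNT | 2 bytes | 0                                |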

The question entry appends the encoded domain name, the query type (2 bytes), and the query class (2 bytes, always 1 for Internet):

import struct
import os

# DNS record type constants
RECORD_TYPES = {
    "A": 1,
    "NS": 2,
    "CNAME": 5,
    "SOA": 6,
    "MX": 15,
    "TXT": 16,
    "AAAA": 28,
}

def build_query(domain: str, record_type: int = 1) -> tuple[bytes, int]:
    transaction_id = int.from_bytes(os.urandom(2), "big")

    flags = 0x0100  # RD=1 (recursion desired)
    header = struct.pack("!HHHHHH", transaction_id, flags, 1, 0, 0, 0)

    question = encode_domain_name(domain) + struct.pack("!HH", record_type, 1)

    return header + question, transaction_id

The ! in the format string tells struct.pack to use network byte order (big-endian), and H is an unsigned 16-bit integer. The six H fields map directly to the six header fields in order.
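To make the layout concrete, the sketch below packs the same fields with a fixed transaction ID (0x1234, chosen only so the output is reproducible) and prints the bytes of an A query for example.com. The name is hard-coded in wire format so the block runs on its own.

```python
import struct

# Fixed ID for a reproducible dump; real queries should use a random ID.
header = struct.pack("!HHHHHH", 0x1234, 0x0100, 1, 0, 0, 0)

# example.com in wire format, then QTYPE=1 (A) and QCLASS=1 (IN).
question = b"\x07example\x03com\x00" + struct.pack("!HH", 1, 1)

packet = header + question
print(header.hex(" "))  # 12 34 01 00 00 01 00 00 00 00 00 00
print(question.hex(" "))
print(len(packet))      # 29
```

Reading the header dump left to right gives the ID (12 34), the flags (01 00), QDCOUNT (00 01), and three zero counts, each occupying two bytes in big-endian order.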

The random transaction ID is critical — it is how we match a response to the query we sent. In a production resolver, failing to verify this ID opens you up to cache poisoning attacks.

Sending the Query

DNS queries use UDP on port 53. We create a datagram socket, send the packet, and wait for a response:

import socket

def send_query(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(packet, (server, 53))
        response, _ = sock.recvfrom(4096)
        return response
    finally:
        sock.close()

A few things to note:

  • 4096 bytes is more than enough for a typical DNS response. Without EDNS0, which our query does not advertise, UDP DNS messages are limited to 512 bytes; a server that has more data than that sets the TC (truncated) flag rather than sending a larger datagram.
  • Timeout is set to 5 seconds. DNS is usually fast — under 100ms for most queries. If you do not get a response within 5 seconds, something is wrong.
  • We use AF_INET for IPv4. To query over IPv6, you would use AF_INET6.
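One possible way to support both address families is to pick the family from the server address itself. This is a sketch rather than part of the resolver above: socket_family_for and send_query_any_family are hypothetical names, and the helper assumes the server is given as a literal IP address, not a hostname.

```python
import ipaddress
import socket


def socket_family_for(server: str) -> socket.AddressFamily:
    """Pick AF_INET or AF_INET6 from a literal IP address (hypothetical helper)."""
    address = ipaddress.ip_address(server)  # raises ValueError for non-IP input
    return socket.AF_INET6 if address.version == 6 else socket.AF_INET


def send_query_any_family(packet: bytes, server: str, timeout: float = 5.0) -> bytes:
    """Variant of send_query that also accepts an IPv6 server address."""
    with socket.socket(socket_family_for(server), socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(packet, (server, 53))
        response, _ = sock.recvfrom(4096)
        return response
```

With this variant, passing 2001:4860:4860::8888 (the IPv6 address of Google Public DNS) should behave like passing 8.8.8.8, provided the machine has IPv6 connectivity.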

Parsing the Response Header

The response starts with the same 12-byte header structure. We unpack it and extract the important flag bits:

def parse_header(response: bytes) -> dict:
    fields = struct.unpack("!HHHHHH", response[:12])
    return {
        "id": fields[0],
        "flags": fields[1],
        "qr": (fields[1] >> 15) & 1,       # 1 = response
        "aa": (fields[1] >> 10) & 1,        # authoritative answer
        "tc": (fields[1] >> 9) & 1,         # truncated
        "rcode": fields[1] & 0xF,           # response code
        "qdcount": fields[2],               # questions
        "ancount": fields[3],               # answers
        "nscount": fields[4],               # authority records
        "arcount": fields[5],               # additional records
    }

The flags field packs multiple values into 16 bits. The most important ones for us:

  • QR (bit 15): 0 for query, 1 for response. If this is not 1, something went wrong.
  • AA (bit 10): 1 if the responding server is authoritative for the queried domain.
  • TC (bit 9): 1 if the response was truncated because it exceeded the UDP message size. A production resolver would retry over TCP when this happens.
  • RCODE (bits 0-3): 0 means success (NOERROR), 3 means the domain does not exist (NXDOMAIN), 2 means server failure (SERVFAIL).
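As a worked example of those bit positions, the sketch below decodes 0x8180, the flags value a recursive resolver commonly returns for a successful answer. The describe_flags helper and the RCODE_NAMES table are illustrative names, not part of the resolver; the table lists only the basic response codes from RFC 1035.

```python
RCODE_NAMES = {
    0: "NOERROR", 1: "FORMERR", 2: "SERVFAIL",
    3: "NXDOMAIN", 4: "NOTIMP", 5: "REFUSED",
}


def describe_flags(flags: int) -> dict:
    """Break the 16-bit flags field into named parts (hypothetical helper)."""
    rcode = flags & 0xF
    return {
        "qr": (flags >> 15) & 1,        # 1 = response
        "opcode": (flags >> 11) & 0xF,  # 0 = standard query
        "aa": (flags >> 10) & 1,        # authoritative answer
        "tc": (flags >> 9) & 1,         # truncated
        "rd": (flags >> 8) & 1,         # recursion desired (copied from the query)
        "ra": (flags >> 7) & 1,         # recursion available
        "rcode": RCODE_NAMES.get(rcode, str(rcode)),
    }


print(describe_flags(0x8180))
# {'qr': 1, 'opcode': 0, 'aa': 0, 'tc': 0, 'rd': 1, 'ra': 1, 'rcode': 'NOERROR'}
```

The same query for a nonexistent name would typically come back as 0x8183, which differs only in the low four bits (RCODE 3, NXDOMAIN).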

Decoding Domain Names (Name Compression)

This is the hardest part of implementing a DNS parser. DNS responses use name compression to avoid repeating the same domain name multiple times in a single packet. Instead of writing example.com again, the server inserts a pointer — a 2-byte value that references an earlier position in the packet where that name (or a suffix of it) already appears.

Here is how it works:

  • If the top 2 bits of a length byte are 00, it is a normal label (the remaining 6 bits are the label length).
  • If the top 2 bits are 11, it is a compression pointer. The remaining 14 bits form an offset into the packet where the name continues.
  • A null byte (0x00) terminates the name.

The tricky part is that pointers can point to other pointers, and the offset we need to return to the caller (for parsing the next field) is different from where the pointer sends us. I track this with a jumped flag:

def decode_domain_name(response: bytes, offset: int) -> tuple[str, int]:
    labels = []
    jumped = False
    jumps = 0
    max_offset = offset

    while True:
        if offset >= len(response):
            raise ValueError("Malformed DNS response: name runs past end of packet")

        length = response[offset]

        if length == 0:  # End of name
            if not jumped:
                max_offset = offset + 1
            break

        if (length & 0xC0) == 0xC0:  # Compression pointer
            if not jumped:
                max_offset = offset + 2
            jumps += 1
            if jumps > 128:  # Safety cap: a pointer loop would otherwise never end
                raise ValueError("Malformed DNS response: compression pointer loop")
            pointer = struct.unpack("!H", response[offset:offset + 2])[0] & 0x3FFF
            offset = pointer
            jumped = True
        else:
            offset += 1
            labels.append(response[offset:offset + length].decode("ascii", errors="replace"))
            offset += length

    return ".".join(labels), max_offset

When we encounter a pointer, we save the current position plus 2 (the size of the pointer) as the "real" next offset, then jump to wherever the pointer says. Any labels we find at the pointer destination get appended to the same list. If we never jump, the next offset is simply one past the null terminator.

This compression scheme is why you cannot parse DNS packets by splitting on dots or using simple string operations. You have to work with the raw bytes and follow pointers.
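A hand-built packet makes the pointer mechanics easier to follow. The sketch below uses a compact stand-in decoder (read_name, a hypothetical name) so that it runs on its own. The packet is artificial: 12 placeholder header bytes, the name example.com at offset 12, then www followed by a pointer back to offset 12.

```python
import struct


def read_name(packet: bytes, offset: int) -> tuple[str, int]:
    """Compact decoder for this demo; returns (name, offset after the name)."""
    labels, next_offset, jumps = [], None, 0
    while True:
        length = packet[offset]
        if (length & 0xC0) == 0xC0:  # pointer: 14-bit offset into the packet
            if next_offset is None:
                next_offset = offset + 2  # resume after the first pointer
            offset = struct.unpack("!H", packet[offset:offset + 2])[0] & 0x3FFF
            jumps += 1
            if jumps > 16:  # arbitrary cap so a pointer loop cannot hang us
                raise ValueError("too many compression pointers")
        elif length == 0:  # end of name
            if next_offset is None:
                next_offset = offset + 1
            return ".".join(labels), next_offset
        else:  # ordinary label
            labels.append(packet[offset + 1:offset + 1 + length].decode("ascii"))
            offset += 1 + length


packet = bytes(12) + b"\x07example\x03com\x00" + b"\x03www\xc0\x0c"

print(read_name(packet, 12))  # ('example.com', 25)
print(read_name(packet, 25))  # ('www.example.com', 31)
```

The second call reads the www label, follows the pointer 0xC00C to offset 12, and still reports 31 as the next offset, because parsing must resume right after the 2-byte pointer rather than at the pointer's destination.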

Parsing Resource Records

Each answer record (and authority/additional record) has this structure:

| Field    | Size     | Description                          |
|----------|----------|--------------------------------------|
| NAME     | Variable | Domain name (possibly compressed)    |
| TYPE     | 2 bytes  | Record type (A=1, AAAA=28, etc.)     |
| CLASS    | 2 bytes  | Usually 1 (IN = Internet)            |
| TTL      | 4 bytes  | Time to live in seconds              |
| RDLENGTH | 2 bytes  | Length of the RDATA field            |
| RDATA    | RDLENGTH | Record data (format depends on TYPE) |

The parsing function reads the name, unpacks the fixed fields, and then interprets RDATA based on the record type:

def parse_record(response: bytes, offset: int) -> tuple[dict, int]:
    name, offset = decode_domain_name(response, offset)
    rtype, rclass, ttl, rdlength = struct.unpack("!HHIH", response[offset:offset + 10])
    rdata_offset = offset + 10

    if rtype == 1:  # A record — 4 bytes IPv4 address
        value = ".".join(str(b) for b in response[rdata_offset:rdata_offset + rdlength])

    elif rtype == 28:  # AAAA record — 16 bytes IPv6 address
        raw = response[rdata_offset:rdata_offset + rdlength]
        value = ":".join(f"{raw[i]:02x}{raw[i+1]:02x}" for i in range(0, 16, 2))

    elif rtype in (2, 5):  # NS or CNAME — compressed domain name
        value, _ = decode_domain_name(response, rdata_offset)

    elif rtype == 15:  # MX — 2-byte priority + compressed domain name
        priority = struct.unpack("!H", response[rdata_offset:rdata_offset + 2])[0]
        exchange, _ = decode_domain_name(response, rdata_offset + 2)
        value = f"{priority} {exchange}"

    elif rtype == 16:  # TXT — one or more length-prefixed strings
        txt_len = response[rdata_offset]
        value = response[rdata_offset + 1:rdata_offset + 1 + txt_len].decode(errors="replace")

    else:
        value = response[rdata_offset:rdata_offset + rdlength].hex()

    return {
        "name": name,
        "type": rtype,
        "class": rclass,
        "ttl": ttl,
        "value": value,
    }, rdata_offset + rdlength

A few important details:

  • A records are 4 raw bytes representing each octet of an IPv4 address.
  • AAAA records are 16 raw bytes for the full IPv6 address.
  • CNAME and NS records contain a compressed domain name, so we call decode_domain_name again pointing into the RDATA.
  • MX records have a 2-byte preference value before the mail exchange domain name.
  • TXT records use yet another encoding — each TXT string is preceded by a length byte (not null-terminated). A single TXT record can contain multiple strings, but for simplicity I parse only the first one here.
  • For any record type we do not explicitly handle, we fall back to a hex representation of the raw data.

Note how the return offset is rdata_offset + rdlength — we skip past the entire RDATA section regardless of how we parsed it. This ensures we are always positioned correctly for the next record.
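For TXT records that carry more than one string, the whole RDATA can be walked one length byte at a time. The following is a minimal sketch, assuming the caller passes exactly the RDATA slice (response[rdata_offset:rdata_offset + rdlength]); parse_txt_rdata is a hypothetical helper, not part of the resolver above.

```python
def parse_txt_rdata(rdata: bytes) -> list[str]:
    """Split TXT RDATA into its length-prefixed character strings."""
    strings = []
    pos = 0
    while pos < len(rdata):
        length = rdata[pos]
        chunk = rdata[pos + 1:pos + 1 + length]
        if len(chunk) != length:
            raise ValueError("TXT string runs past the end of RDATA")
        strings.append(chunk.decode(errors="replace"))
        pos += 1 + length
    return strings


print(parse_txt_rdata(b"\x05hello\x05world"))  # ['hello', 'world']
```

Each string is limited to 255 bytes by its one-byte length prefix, so long values are usually split across several strings. Whether to join them, and how, is left to the caller.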

Putting It All Together

Now I can combine everything into a single resolve() function that builds a query, sends it, and parses the response:

RECORD_TYPE_NAMES = {v: k for k, v in RECORD_TYPES.items()}

def resolve(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
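    # Reject unknown types instead of silently falling back to an A query
    if record_type.upper() not in RECORD_TYPES:
        raise ValueError(f"Unsupported record type: {record_type}")
    type_num = RECORD_TYPES[record_type.upper()]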
    packet, txn_id = build_query(domain, type_num)
    response = send_query(packet, server)

    header = parse_header(response)

    # Verify the response matches our query
    if header["id"] != txn_id:
        raise ValueError("Transaction ID mismatch — possible spoofing attempt")
    if header["rcode"] == 3:
        raise ValueError(f"NXDOMAIN: {domain} does not exist")
    if header["rcode"] != 0:
        raise ValueError(f"DNS error: RCODE {header['rcode']}")

    # Skip the question section (it is echoed back in the response)
    offset = 12
    for _ in range(header["qdcount"]):
        _, offset = decode_domain_name(response, offset)
        offset += 4  # Skip QTYPE (2 bytes) + QCLASS (2 bytes)

    # Parse all answer records
    records = []
    for _ in range(header["ancount"]):
        record, offset = parse_record(response, offset)
        record["type_name"] = RECORD_TYPE_NAMES.get(record["type"], str(record["type"]))
        records.append(record)

    return records

The flow is straightforward:

  1. Build the query packet with the domain and record type.
  2. Send it over UDP and get the response bytes.
  3. Parse the header and check for errors.
  4. Skip the question section (DNS responses echo the question back).
  5. Parse each answer record.

Testing It

Save everything in a single file (e.g., dns_resolver.py) and add a test block:

if __name__ == "__main__":
    print("--- A record for google.com ---")
    for record in resolve("google.com", "A"):
        print(f"  {record['name']}  {record['ttl']}  {record['type_name']}  {record['value']}")

    print("\n--- MX records for gmail.com ---")
    for record in resolve("gmail.com", "MX"):
        print(f"  {record['name']}  {record['ttl']}  {record['type_name']}  {record['value']}")

    print("\n--- NS records for example.com ---")
    for record in resolve("example.com", "NS"):
        print(f"  {record['name']}  {record['ttl']}  {record['type_name']}  {record['value']}")

    print("\n--- TXT records for google.com ---")
    for record in resolve("google.com", "TXT"):
        print(f"  {record['name']}  {record['ttl']}  {record['type_name']}  {record['value']}")

Running it produces output like this:

--- A record for google.com ---
  google.com  236  A  142.250.80.46

--- MX records for gmail.com ---
  gmail.com  3600  MX  5 gmail-smtp-in.l.google.com
  gmail.com  3600  MX  10 alt1.gmail-smtp-in.l.google.com
  gmail.com  3600  MX  20 alt2.gmail-smtp-in.l.google.com
  gmail.com  3600  MX  30 alt3.gmail-smtp-in.l.google.com
  gmail.com  3600  MX  40 alt4.gmail-smtp-in.l.google.com

--- NS records for example.com ---
  example.com  7199  NS  a.iana-servers.net
  example.com  7199  NS  b.iana-servers.net

--- TXT records for google.com ---
  google.com  3600  TXT  v=spf1 include:_spf.google.com ~all

You just resolved real DNS records using raw UDP and binary packet construction. No libraries, no abstractions — just the protocol as defined in RFC 1035.

Compare your results against the DNS Inspector to verify that your resolver is returning the correct records.

Error Handling Improvements

The basic resolver works, but a production-quality implementation needs more robust error handling. Here are the most important additions:

def resolve_safe(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
    try:
        return resolve(domain, record_type, server)
    except socket.timeout as e:
        raise TimeoutError(f"DNS query timed out after 5s (server: {server})") from e
    except OSError as e:  # Includes socket.gaierror and unreachable-network errors
        raise ConnectionError(f"Cannot reach DNS server {server}: {e}") from e
    except struct.error as e:
        raise ValueError(f"Malformed DNS response: could not unpack binary data: {e}") from e
    except IndexError as e:
        raise ValueError("Malformed DNS response: packet shorter than expected") from e

In practice, you should also handle:

  • Truncated responses (TC flag set): The response was too large for UDP. Retry the same query over TCP on port 53. TCP DNS prepends a 2-byte length field before the DNS message.
  • Multiple questions: In practice QDCOUNT is almost always 1, but the response header can carry a different count. Our parser handles this via the qdcount loop.
  • Authority and additional sections: We only parse the answer section. The authority section contains NS records for the zone, and the additional section often contains the IP addresses of those nameservers (glue records). Parsing these uses the exact same parse_record function.
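The TCP retry mentioned in the first point could look roughly like the sketch below. The function names (frame_for_tcp, recv_exact, send_query_tcp) are hypothetical. The only protocol detail taken from the text is the 2-byte length prefix; the rest is ordinary stream-socket handling.

```python
import socket
import struct


def frame_for_tcp(packet: bytes) -> bytes:
    """Prefix a DNS message with its 2-byte big-endian length."""
    return struct.pack("!H", len(packet)) + packet


def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly count bytes; TCP may deliver a message in pieces."""
    data = b""
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise ConnectionError("connection closed before full message arrived")
        data += chunk
    return data


def send_query_tcp(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    """Send one query over TCP port 53 and return the unframed response."""
    with socket.create_connection((server, 53), timeout=timeout) as sock:
        sock.sendall(frame_for_tcp(packet))
        (length,) = struct.unpack("!H", recv_exact(sock, 2))
        return recv_exact(sock, length)
```

A caller would check the tc value from parse_header after the UDP attempt and, if it is set, send the same packet through send_query_tcp and parse that response instead.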

What Is Missing from Production Use

My resolver demonstrates the core protocol, but it is missing several features that real resolvers implement:

  • TCP fallback: When a UDP response is truncated (TC=1), a real resolver retries over TCP. DNS over TCP uses the same packet format but prepends a 2-byte message length.
  • EDNS0 (RFC 6891): Modern resolvers include an OPT pseudo-record in the additional section to advertise a larger UDP buffer size (historically 4096 bytes; around 1232 bytes is now commonly recommended to avoid IP fragmentation) and support for extensions like DNSSEC.
  • DNSSEC validation: Our resolver does not validate DNSSEC signatures. A validating resolver would check RRSIG, DNSKEY, and DS records to verify the chain of trust from the root zone. See What Is DNSSEC? for the full explanation.
  • Caching: A real resolver caches responses for the duration of the TTL value. Without caching, every call to resolve() makes a fresh network request.
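  • Concurrent queries: Production resolvers typically send A and AAAA queries in parallel and track multiple in-flight requests by transaction ID. The ANY query type is not a reliable shortcut, because many servers return only a minimal answer to it (RFC 8482).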
  • Iterative resolution: Our resolver sends recursive queries (RD=1) to a resolver like 8.8.8.8. A fully iterative resolver would start at the root servers and follow the delegation chain itself.
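Of these, EDNS0 is probably the smallest to add, since the OPT pseudo-record is a fixed 11 bytes when it carries no options. The sketch below is an illustration under stated assumptions: build_opt_record and build_edns_query are hypothetical names, the payload size of 1232 is one commonly used value rather than a requirement, and the name is passed in already encoded so the block stands alone.

```python
import os
import struct


def build_opt_record(udp_payload_size: int = 1232) -> bytes:
    """Build a minimal EDNS0 OPT pseudo-record (RFC 6891) with no options."""
    return struct.pack(
        "!BHHIH",
        0,                 # NAME: root (a single zero byte)
        41,                # TYPE: OPT
        udp_payload_size,  # CLASS field carries the advertised UDP payload size
        0,                 # TTL field: extended RCODE 0, version 0, DO bit clear
        0,                 # RDLENGTH: no options
    )


def build_edns_query(qname_wire: bytes, record_type: int = 1) -> bytes:
    """Query with ARCOUNT=1 and an OPT record appended after the question."""
    transaction_id = int.from_bytes(os.urandom(2), "big")
    header = struct.pack("!HHHHHH", transaction_id, 0x0100, 1, 0, 0, 1)
    question = qname_wire + struct.pack("!HH", record_type, 1)
    return header + question + build_opt_record()


print(build_opt_record().hex(" "))  # 00 00 29 04 d0 00 00 00 00 00 00
```

A server that understands EDNS0 will normally echo an OPT record back in the additional section, so ARCOUNT in the response is at least 1. Our resolve() function never reads that section, so the answer parsing is unaffected.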

For production Python applications, I recommend using dnspython instead — it handles all of these concerns. See my guide on DNS Lookups in Python with dnspython for practical examples.

Verifying Your Results

Once your resolver is working, you will want to verify its output against established tools:

  • Use the DNS Inspector to query any domain and compare the records your resolver returns against what authoritative servers report.
  • Use the Propagation Checker to see how a domain resolves across 30+ global DNS servers — useful for understanding why your resolver might get different answers depending on which upstream server you query.
  • Use dig from the command line for a quick comparison. My dig command guide covers the syntax in detail.

If your resolver returns different results from dig, the most common cause is that you are querying a different server. Google's 8.8.8.8 may return different cached values than Cloudflare's 1.1.1.1 because their caches are independent. This is normal and expected.

The Complete Code

Here is the entire resolver in a single copy-pasteable file:

"""
Minimal DNS resolver — no dependencies, just socket + struct.
Parses A, AAAA, NS, CNAME, MX, and TXT answers; SOA can be queried,
but its RDATA (like any other unhandled type) is returned as raw hex.
"""

import os
import socket
import struct

RECORD_TYPES = {
    "A": 1, "NS": 2, "CNAME": 5, "SOA": 6,
    "MX": 15, "TXT": 16, "AAAA": 28,
}
RECORD_TYPE_NAMES = {v: k for k, v in RECORD_TYPES.items()}


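def encode_domain_name(domain: str) -> bytes:
    result = b""
    # A trailing dot marks the root; the root itself encodes as a lone 0x00.
    name = domain.rstrip(".")
    for part in name.split(".") if name else []:
        label = part.encode("ascii")  # IDNs must be converted to punycode first
        if not 1 <= len(label) <= 63:
            raise ValueError(f"Invalid label length in {domain!r}")
        result += bytes([len(label)]) + label
    return result + b"\x00"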


def build_query(domain: str, record_type: int = 1) -> tuple[bytes, int]:
    transaction_id = int.from_bytes(os.urandom(2), "big")
    flags = 0x0100  # RD=1
    header = struct.pack("!HHHHHH", transaction_id, flags, 1, 0, 0, 0)
    question = encode_domain_name(domain) + struct.pack("!HH", record_type, 1)
    return header + question, transaction_id


def send_query(packet: bytes, server: str = "8.8.8.8", timeout: float = 5.0) -> bytes:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(packet, (server, 53))
        response, _ = sock.recvfrom(4096)
        return response
    finally:
        sock.close()


def parse_header(response: bytes) -> dict:
    fields = struct.unpack("!HHHHHH", response[:12])
    return {
        "id": fields[0],
        "flags": fields[1],
        "qr": (fields[1] >> 15) & 1,
        "aa": (fields[1] >> 10) & 1,
        "tc": (fields[1] >> 9) & 1,
        "rcode": fields[1] & 0xF,
        "qdcount": fields[2],
        "ancount": fields[3],
        "nscount": fields[4],
        "arcount": fields[5],
    }


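def decode_domain_name(response: bytes, offset: int) -> tuple[str, int]:
    labels = []
    jumped = False
    jumps = 0
    max_offset = offset

    while True:
        if offset >= len(response):
            raise ValueError("Malformed DNS response: name runs past end of packet")

        length = response[offset]

        if length == 0:  # End of name
            if not jumped:
                max_offset = offset + 1
            break

        if (length & 0xC0) == 0xC0:  # Compression pointer
            if not jumped:
                max_offset = offset + 2
            jumps += 1
            if jumps > 128:  # Safety cap: a pointer loop would otherwise never end
                raise ValueError("Malformed DNS response: compression pointer loop")
            pointer = struct.unpack("!H", response[offset:offset + 2])[0] & 0x3FFF
            offset = pointer
            jumped = True
        else:
            offset += 1
            labels.append(response[offset:offset + length].decode("ascii", errors="replace"))
            offset += length

    return ".".join(labels), max_offset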


def parse_record(response: bytes, offset: int) -> tuple[dict, int]:
    name, offset = decode_domain_name(response, offset)
    rtype, rclass, ttl, rdlength = struct.unpack("!HHIH", response[offset:offset + 10])
    rdata_offset = offset + 10

    if rtype == 1:  # A
        value = ".".join(str(b) for b in response[rdata_offset:rdata_offset + rdlength])

    elif rtype == 28:  # AAAA
        raw = response[rdata_offset:rdata_offset + rdlength]
        value = ":".join(f"{raw[i]:02x}{raw[i+1]:02x}" for i in range(0, 16, 2))

    elif rtype in (2, 5):  # NS, CNAME
        value, _ = decode_domain_name(response, rdata_offset)

    elif rtype == 15:  # MX
        priority = struct.unpack("!H", response[rdata_offset:rdata_offset + 2])[0]
        exchange, _ = decode_domain_name(response, rdata_offset + 2)
        value = f"{priority} {exchange}"

    elif rtype == 16:  # TXT
        txt_len = response[rdata_offset]
        value = response[rdata_offset + 1:rdata_offset + 1 + txt_len].decode(errors="replace")

    else:
        value = response[rdata_offset:rdata_offset + rdlength].hex()

    return {
        "name": name,
        "type": rtype,
        "class": rclass,
        "ttl": ttl,
        "value": value,
    }, rdata_offset + rdlength


def resolve(domain: str, record_type: str = "A", server: str = "8.8.8.8") -> list[dict]:
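    # Reject unknown types instead of silently falling back to an A query
    if record_type.upper() not in RECORD_TYPES:
        raise ValueError(f"Unsupported record type: {record_type}")
    type_num = RECORD_TYPES[record_type.upper()]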
    packet, txn_id = build_query(domain, type_num)
    response = send_query(packet, server)

    header = parse_header(response)

    if header["id"] != txn_id:
        raise ValueError("Transaction ID mismatch")
    if header["rcode"] == 3:
        raise ValueError(f"NXDOMAIN: {domain} does not exist")
    if header["rcode"] != 0:
        raise ValueError(f"DNS error: RCODE {header['rcode']}")

    offset = 12
    for _ in range(header["qdcount"]):
        _, offset = decode_domain_name(response, offset)
        offset += 4

    records = []
    for _ in range(header["ancount"]):
        record, offset = parse_record(response, offset)
        record["type_name"] = RECORD_TYPE_NAMES.get(record["type"], str(record["type"]))
        records.append(record)

    return records


if __name__ == "__main__":
    print("--- A record for google.com ---")
    for r in resolve("google.com", "A"):
        print(f"  {r['name']}  {r['ttl']}  {r['type_name']}  {r['value']}")

    print("\n--- MX records for gmail.com ---")
    for r in resolve("gmail.com", "MX"):
        print(f"  {r['name']}  {r['ttl']}  {r['type_name']}  {r['value']}")

    print("\n--- NS records for example.com ---")
    for r in resolve("example.com", "NS"):
        print(f"  {r['name']}  {r['ttl']}  {r['type_name']}  {r['value']}")

    print("\n--- TXT records for google.com ---")
    for r in resolve("google.com", "TXT"):
        print(f"  {r['name']}  {r['ttl']}  {r['type_name']}  {r['value']}")

This guide was researched and structured by the author based on hands-on DNS implementation experience, with AI assistance for drafting and code examples.

About the Author

Ishan Karunaratne

Software Architect & Infrastructure Engineer

US Army veteran with a B.S. in Information Technology, CompTIA A+, Network+, and Security+ certified. 20+ years building and securing web infrastructure.

  • B.S. Information Technology: Online Systems
  • CompTIA A+ (2009)
  • CompTIA Network+ (2009)
  • CompTIA Security+ (2009)
  • US Army Veteran: Operation Iraqi Freedom
