Talking to the game

Zandronum’s RCON listens on UDP. The protocol has one wrinkle and one gift:

  • The wrinkle: every datagram is Huffman-coded with a fixed, protocol-defined tree.
  • The gift: a datagram whose first byte is 0xFF is passed through unencoded. So your adapter can always send 0xFF + raw bytes and only needs the Huffman decoder for server replies — no encoder.

The handshake takes four datagrams:

  1. Send CLRC_BEGINCONNECTION (52) with protocol version 4.
  2. Server replies SVRC_SALT (34) with a 32-character salt.
  3. Send CLRC_PASSWORD (53) with the lowercase hex md5(salt + password).
  4. Server replies SVRC_LOGGEDIN (35).
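
To make the client half of the handshake concrete, here is a minimal sketch of the two datagrams it sends. The helper names `begin_packet` and `password_packet`, the sample salt, and the sample password are illustrative assumptions; the message numbers, the `0xFF` prefix, and the trailing NUL follow the steps and the listing in this section.

```python
import hashlib

CLRC_BEGINCONNECTION, CLRC_PASSWORD = 52, 53
RCON_PROTOCOL_VERSION = 4

def begin_packet():
    # Step 1: 0xFF escape, message id, protocol version.
    return b"\xff" + bytes([CLRC_BEGINCONNECTION, RCON_PROTOCOL_VERSION])

def password_packet(salt, password):
    # Step 3: lowercase hex md5 of salt + password, NUL-terminated.
    digest = hashlib.md5(salt + password.encode()).hexdigest()
    return b"\xff" + bytes([CLRC_PASSWORD]) + digest.encode() + b"\0"

# Made-up 32-character salt and password, for illustration only.
example_salt = b"0123456789abcdef0123456789abcdef"
print(begin_packet().hex())  # ff3404
print(password_packet(example_salt, "hunter2").hex())
```

Both packets are plain bytes behind the `0xFF` escape, so nothing here needs the Huffman table.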

After that, each command is one datagram: CLRC_COMMAND (54) + the console command + a NUL byte. Send CLRC_PONG (55) every few seconds; the server drops clients that stay silent for 40 seconds.
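
As a quick illustration of that framing, the sketch below builds one command datagram and one keepalive. The helper names `command_packet` and `pong_packet` and the sample command are assumptions made for the example.

```python
CLRC_COMMAND, CLRC_PONG = 54, 55

def command_packet(cmd):
    # 0xFF escape, message id, console command text, NUL terminator.
    return b"\xff" + bytes([CLRC_COMMAND]) + cmd.encode() + b"\0"

def pong_packet():
    # The keepalive carries no payload, just the message id.
    return b"\xff" + bytes([CLRC_PONG])

print(command_packet("say hi"))  # b'\xff6say hi\x00' (0x36 prints as '6')
print(pong_packet())             # b'\xff7' (0x37 prints as '7')
```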

Here it is, minus the Huffman table (grab HUFFMAN_TREE and _parse_tree from the reference Pack’s adapter.py — it’s a 511-byte constant, not something to retype):

```python
import hashlib, socket, threading, time

# Message ids: CLRC_* are sent by the client, SVRC_* by the server.
CLRC_BEGINCONNECTION, CLRC_PASSWORD, CLRC_COMMAND, CLRC_PONG = 52, 53, 54, 55
SVRC_OLDPROTOCOL, SVRC_BANNED, SVRC_SALT = 32, 33, 34
SVRC_LOGGEDIN, SVRC_INVALIDPASSWORD = 35, 36
RCON_PROTOCOL_VERSION = 4

# _HUFF_ROOT is the root of the tree built from HUFFMAN_TREE by _parse_tree
# (both come from the reference adapter.py). Internal nodes are indexed by
# bit (0 or 1); leaves are plain ints holding the decoded byte value.


def huffman_decode(packet):
    if not packet:
        return b""
    if packet[0] == 0xFF:  # unencoded passthrough
        return packet[1:]
    # Otherwise the first byte is the number of padding bits in the last byte.
    bits_available = (len(packet) - 1) * 8 - packet[0]
    out, node = bytearray(), _HUFF_ROOT
    for byte in packet[1:]:
        for bitpos in range(8):  # wire bytes carry bits in reversed order
            if bits_available <= 0:
                return bytes(out)
            node = node[(byte >> bitpos) & 1]
            if isinstance(node, int):  # reached a leaf: emit it, restart at root
                out.append(node)
                node = _HUFF_ROOT
            bits_available -= 1
    return bytes(out)


def raw(payload):  # the 0xFF escape: never encode outbound
    return b"\xff" + payload


class RconError(Exception):
    pass


class RconClient:
    def __init__(self, host, port, password):
        self.addr, self.password = (host, port), password
        self.sock, self.lock = None, threading.Lock()
        threading.Thread(target=self._pong_loop, daemon=True).start()

    def _read_packet(self, sock, want, timeout=3.0):
        # Skip unrelated messages until `want` arrives or the deadline passes.
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise RconError(f"timed out waiting for message {want}")
            sock.settimeout(remaining)
            data = huffman_decode(sock.recv(4096))
            if data and data[0] == want:
                return data[1:]
            if data and data[0] in (SVRC_OLDPROTOCOL, SVRC_BANNED,
                                    SVRC_INVALIDPASSWORD):
                raise RconError(f"server rejected connection: {data[0]}")

    def _connect(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.connect(self.addr)
        try:
            sock.send(raw(bytes([CLRC_BEGINCONNECTION, RCON_PROTOCOL_VERSION])))
            salt = self._read_packet(sock, SVRC_SALT).split(b"\0", 1)[0]
            digest = hashlib.md5(salt + self.password.encode()).hexdigest()
            sock.send(raw(bytes([CLRC_PASSWORD]) + digest.encode() + b"\0"))
            self._read_packet(sock, SVRC_LOGGEDIN)
        except Exception:
            sock.close()
            raise
        self.sock = sock  # publish the socket only after a successful login

    def command(self, cmd):
        with self.lock:
            try:
                if self.sock is None:  # lazy connect
                    self._connect()
                self.sock.send(raw(bytes([CLRC_COMMAND]) + cmd.encode() + b"\0"))
            except (OSError, RconError) as e:
                # Tear down so the next command starts from a fresh connection.
                if self.sock is not None:
                    self.sock.close()
                    self.sock = None
                raise RconError(f"rcon command failed: {e}") from e

    def _pong_loop(self):
        while True:
            time.sleep(5)
            with self.lock:
                if self.sock is None:
                    continue
                try:
                    self.sock.send(raw(bytes([CLRC_PONG])))
                except OSError:
                    self.sock.close()
                    self.sock = None
```
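
Since the real table is left out of the listing, the decoder's bit-walking is easier to see on a toy tree. The sketch below is only an illustration: `TOY_ROOT`, `TOY_CODES`, `toy_encode`, and `toy_decode` are hypothetical names, the three-symbol tree is not the Zandronum table, and the decoder takes its root as a parameter instead of using a module-level `_HUFF_ROOT`. It does assume the same packet layout as `huffman_decode`: a leading padding-bit count, then bits consumed from the least significant end of each byte.

```python
# Toy tree, NOT the real table: internal nodes are (zero_branch, one_branch)
# pairs, leaves are byte values. Codes: a = 0, b = 10, c = 11.
TOY_ROOT = (ord("a"), (ord("b"), ord("c")))
TOY_CODES = {ord("a"): [0], ord("b"): [1, 0], ord("c"): [1, 1]}

def toy_encode(data):
    # Concatenate code bits, pad to a whole byte, pack least significant bit first.
    bits = [bit for ch in data for bit in TOY_CODES[ch]]
    padding = -len(bits) % 8
    bits += [0] * padding
    body = bytes(
        sum(bit << i for i, bit in enumerate(bits[k:k + 8]))
        for k in range(0, len(bits), 8)
    )
    return bytes([padding]) + body

def toy_decode(packet, root):
    if not packet:
        return b""
    if packet[0] == 0xFF:  # unencoded passthrough
        return packet[1:]
    bits_available = (len(packet) - 1) * 8 - packet[0]
    out, node = bytearray(), root
    for byte in packet[1:]:
        for bitpos in range(8):
            if bits_available <= 0:
                return bytes(out)
            node = node[(byte >> bitpos) & 1]
            if isinstance(node, int):  # leaf: emit and restart at the root
                out.append(node)
                node = root
            bits_available -= 1
    return bytes(out)

packet = toy_encode(b"abca")          # 6 code bits + 2 padding bits
print(packet)                         # b'\x02\x1a'
print(toy_decode(packet, TOY_ROOT))   # b'abca'
print(toy_decode(b"\xffhello", TOY_ROOT))  # b'hello'
```

The padding count is always below 8, so an encoded packet can never be mistaken for a `0xFF` passthrough.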

Note the shape of command(): it connects lazily and tears down on any failure. If the game isn’t running, the adapter doesn’t crash — the command raises RconError, and the next command tries a fresh connection. This matters for the Pack contract: the game dying mid-stream must degrade into failed Invocations, not a dead adapter.
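
The same connect-lazily, tear-down-on-failure shape can be exercised without a game server. This is a sketch with the network swapped for a fake: `LazyLink`, `LinkError`, and `FlakyGame` are hypothetical stand-ins rather than part of the adapter, and the lock is omitted because the example is single-threaded.

```python
class LinkError(Exception):
    pass

class LazyLink:
    """Stand-in for RconClient.command(): connect on demand, reset on failure."""
    def __init__(self, connect):
        self._connect = connect  # callable returning an object with send() and close()
        self._conn = None

    def send(self, payload):
        try:
            if self._conn is None:
                self._conn = self._connect()
            self._conn.send(payload)
        except OSError as e:
            # Drop the broken connection so the next call starts fresh.
            if self._conn is not None:
                self._conn.close()
                self._conn = None
            raise LinkError(f"send failed: {e}") from e

class FlakyGame:
    """Fake game process that only accepts traffic while `running` is True."""
    def __init__(self):
        self.running = False
        self.received = []

    def connect(self):
        if not self.running:
            raise ConnectionRefusedError("game is not running")
        return self

    def send(self, payload):
        if not self.running:
            raise ConnectionResetError("game went away")
        self.received.append(payload)

    def close(self):
        pass

game = FlakyGame()
link = LazyLink(game.connect)
results = []
for running in (False, True, False, True):  # down, up, dies mid-stream, back up
    game.running = running
    try:
        link.send(b"say hi")
        results.append("ok")
    except LinkError:
        results.append("failed")
print(results)  # ['failed', 'ok', 'failed', 'ok']
```

Each failure surfaces as one exception to the caller, and the object stays usable for the next attempt.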

Next: wire an Invocation to this client.