For programmers of all skill levels, TARPN offers some sample programs to get you started with accessing the node programmatically (with software). Typically, the Python programming language is used on Raspberry Pi, although the Pi OS can deal with virtually ANY programming language. Python has some advantages, but its greatest disadvantage is that the syntax (formatting) not only depends on typing all the commands properlyk but each line must be indented exactly right or the program won't run. This can get frustrating, but today most AI assistants (such as ChatGPT) can check your code and fix syntax errors very reliably.
Don N2IRZ, inspired by Aaron KN4ORB, has rewritten the Python demo program to use the more modern telnetlib3 library instead of the deprecated telnetlib. Simply said, the previous example would no longer run as written. These new samples run under both Bullseye and Bookworm, and should run under Trixie (but this has not been tested).
What These Samples Do
These programs log in to the Node's telnet server, and interact with it in two ways:
Both sample programs start out the same way, by logging in to the telnet server. This is what you do (behind the scenes) when you access the node using TARPN Home or, essentially, when you use QTterm. That means this code can be used to access the node and do things, like any user might. It also shows two examples of interacting with the node, along with storing text in a file on the Pi.
These files are heavily commented to help you better understand the structure and function of the code blocks. Using the internet and perhaps an AI assistant, you can use these as a starting point for your own python node interactive project.
How you would call (run) this on the node is explained in each file's comments. Also note that the telnetlib library might not have been installed on your Pi yet, so included at the bottom of this page are instructions for doing just that.
# -*- coding: utf-8 -*-
import argparse
import asyncio
from functools import partial
import logging
import re
import telnetlib3
import os
# **** nodetelnet_bbs.py by Don Rotolo N2IRZ 18DEC2025 ****
#
# This example demonstrates connecting to a neighbor node and interacting
# with its BBS using LinBPQ's telnet interface.
#
# Usage:
# python3 /path/to/file/nodetelnet_bbs.py
# Example: python3 /home/pi/bpq-extensions/nodetelnet_bbs.py n2irz 1
#
# NOTES:
# * Callsign is entered WITHOUT SSID (this code assumes "-2")
# * Port number must be between 1 and 12
# * All network reads use timeouts to avoid hanging forever
# * BBS output is written to /tmp/tarpn/read_bbs_msgs.txt (overwritten each run)
# ---------------------------------------------------------------------
# Hard-coded IP address of BPQ telnet server
bpq_telnet = "127.0.1.1"
# Timeout (seconds) for all waits on node/BBS responses
READ_TIMEOUT = 20
# Output file location
OUTPUT_DIR = "/tmp/tarpn"
OUTPUT_FILE = f"{OUTPUT_DIR}/read_bbs_msgs.txt"
async def empty_reader(reader):
"""
Drain any pending input so the node is in a known, idle state.
This helps avoid reacting to stale data.
"""
await asyncio.sleep(0.5)
while reader.at_eof() is False and reader._buffer:
await reader.read(1024)
async def read_until(reader, text, timeout=READ_TIMEOUT):
"""
Read data until a specific text string is seen.
Raises TimeoutError if the text is not seen within the timeout.
"""
buf = ""
try:
while text not in buf:
buf += await asyncio.wait_for(reader.read(64), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Timed out waiting for '{text}'")
async def read_until_ci(reader, text, timeout=READ_TIMEOUT):
"""
Read from the telnet session until *text* is seen,
ignoring differences in upper/lower case.
"""
buf = ""
target = text.lower()
try:
while target not in buf.lower():
buf += await asyncio.wait_for(reader.read(64), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(
f"Timed out waiting for '{text}' (case-insensitive)"
)
return buf
async def port_shell(port, neighbor, reader, writer):
"""
This function is called after the telnet connection is established.
It handles login, neighbor connection, BBS access, message capture,
and clean shutdown.
"""
logger = logging.getLogger(f"BBS-{neighbor}")
user_sent = False
pwd_sent = False
connected = False
finished_reading = False
# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Open output file (overwrite each run)
outfile = open(OUTPUT_FILE, "w")
logger.info("Logging into Telnet")
while True:
try:
line = await asyncio.wait_for(
reader.read(64), timeout=READ_TIMEOUT
)
except asyncio.TimeoutError:
logger.error("Timeout waiting for data from node")
writer.write("BYE\r\n")
break
# Username prompt (node callsign prompt ending with ':')
if not user_sent:
if ":" in line:
username = line.split(":")[0]
writer.write(f"{username}\r\n")
user_sent = True
# Password prompt
elif not pwd_sent:
if "p" in line or "type p" in line:
writer.write("p\r\n")
pwd_sent = True
# Connect to neighbor node
if user_sent and pwd_sent and not connected:
await empty_reader(reader)
logger.info(
f"Connecting to neighbor {neighbor}-2 on port {port}"
)
writer.write(f"C {port} {neighbor}-2\r\n") # NOTE the SSID of neighbor is hard-coded as "-2"
try:
await read_until(reader, "Connected")
except TimeoutError as e:
logger.error(e)
writer.write("BYE\r\n")
break
connected = True
logger.info("Connection established")
# Enter BBS
writer.write("BBS\r\n")
try:
# BBS prompt case can be inconsistent, so match case-insensitively
await read_until_ci(reader, f"de {neighbor}>")
except TimeoutError as e:
logger.error(e)
writer.write("BYE\r\n")
break
logger.info("BBS open, requesting message list")
writer.write("L\r\n")
# Read BBS output
elif connected and not finished_reading:
text = line.strip()
# End of BBS list (case-insensitive match)
if text.lower().endswith(
f"de {neighbor}>".lower()
):
finished_reading = True
logger.info("Message list complete")
await asyncio.sleep(1)
logger.info("Sending BYE")
writer.write("BYE\r\n")
break
elif "Invalid command" in text:
logger.info("Invalid command received, aborting")
writer.write("BYE\r\n")
break
else:
# Normal BBS output:
# - Echoed commands (like "L")
# - Message list lines
logger.info(text)
outfile.write(text + "\n")
outfile.close()
async def connect_to_port(port, neighbor):
"""
Establishes the telnet connection and hands control to port_shell().
"""
factory = partial(port_shell, port, neighbor)
reader, writer = await telnetlib3.open_connection(
bpq_telnet, 8010, shell=factory
)
await writer.protocol.waiter_closed
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s -- %(message)s"
)
parser = argparse.ArgumentParser(
description="Example code showing how to connect to a neighbor BBS",
usage="python3 /path/to/nodetelnet_bbs.py "
" "
)
parser.add_argument(
"neighbor",
help="Neighbor callsign (alphanumeric, max 6 characters)"
)
parser.add_argument(
"port",
type=int,
help="Port number (1-12)"
)
args = parser.parse_args()
# Validate neighbor callsign
if not re.fullmatch(r"[A-Za-z0-9]{1,6}", args.neighbor):
parser.error(
"Neighbor callsign must be alphanumeric and no more than 6 characters"
)
# Validate port number
if not (1 <= args.port <= 12):
parser.error("Port must be an integer from 1 to 12")
asyncio.run(connect_to_port(args.port, args.neighbor))
if __name__ == "__main__":
main()
# -*- coding: utf-8 -*-
import argparse
import asyncio
from functools import partial
import logging
import telnetlib3
# **** nodetelnet_unproto.py by Don Rotolo N2IRZ 18DEC2025 ****
#
# This example demonstrates how to:
# * Connect to the LinBPQ node's telnet server
# * Enter UNPROTO (UI frame) mode
# * Send one or more UI text frames out a specific port
# * Use "CQ" as the destination for the UI frame
# * Exit UNPROTO mode cleanly
# * Log off the telnet session
#
# This is intended as *teaching code*, so it is deliberately verbose
# and heavily commented.
#
# Usage: python3 /home/pi/bpq-extensions/unproto_example.py 3
# Where is a BPQ port number from 1 to 12. (The example above is for port 3)
#
# ---------------------------------------------------------------------------
# Hard-coded IP address of the local BPQ node's telnet server
# (127.0.1.1 is standard for LinBPQ on the same machine)
# ---------------------------------------------------------------------------
bpq_telnet = "127.0.1.1"
# Timeout (seconds) used whenever we wait for a response
READ_TIMEOUT = 20
# Define the destination address explicitly
dest = "CQ"
# ---------------------------------------------------------------------------
# Utility function: drain any pending input from the telnet reader
# This helps ensure the node is in a known, quiet state before we send commands
# ---------------------------------------------------------------------------
async def empty_reader(reader):
await asyncio.sleep(0.5)
while not reader.at_eof() and reader._buffer:
await reader.read(1024)
# ---------------------------------------------------------------------------
# Utility function: read until a specific string is seen, with timeout
# ---------------------------------------------------------------------------
async def read_until(reader, text, timeout=READ_TIMEOUT):
buf = ""
try:
while text not in buf:
chunk = await asyncio.wait_for(reader.read(64), timeout=timeout)
buf += chunk
except asyncio.TimeoutError:
raise TimeoutError(f"Timed out waiting for '{text}'")
return buf
# ---------------------------------------------------------------------------
# Main telnet "shell" function
# This function is called by telnetlib3 once the connection is established
# ---------------------------------------------------------------------------
async def port_shell(port, reader, writer):
logger = logging.getLogger(f"UNPROTO-{port}")
# State flags used to step through login and operation
user_sent = False
pwd_sent = False
unproto_active = False
logger.info("Logging into Telnet")
# This loop runs until we explicitly break out
while True:
try:
# Universal timeout applied to every read
line = await asyncio.wait_for(reader.read(64), timeout=READ_TIMEOUT)
except asyncio.TimeoutError:
logger.error("No response from node within 20 seconds — aborting session")
writer.write("b\r\n") # Cleanly log off
break
# ------------------------------------------------------------
# Step 1: Send username (node callsign)
# The node prompts with something like:
# N2IRZ:
# ------------------------------------------------------------
if not user_sent:
if ":" in line:
username = line.split(":")[0]
writer.write(f"{username}\r\n")
user_sent = True
# ------------------------------------------------------------
# Step 2: Send password
# Default LinBPQ telnet password is usually "p"
# ------------------------------------------------------------
elif not pwd_sent:
if "p" in line or "type p" in line:
writer.write("p\r\n")
pwd_sent = True
# ------------------------------------------------------------
# Step 3: Enter UNPROTO mode (Similarly, you can enter ANY command)
# ------------------------------------------------------------
if user_sent and pwd_sent and not unproto_active:
await empty_reader(reader)
logger.info(f"Entering UNPROTO mode on port {port}")
# Example UNPROTO command:
# UNPROTO 3 CQ
writer.write(f"UNPROTO {port} {dest}\r\n")
# Node indicates UNPROTO mode with text like:
# /ex to exit
try:
await read_until(reader, "/ex to exit")
except TimeoutError as e:
logger.error(str(e))
writer.write("b\r\n")
break
unproto_active = True
logger.info("UNPROTO active, sending UI frames")
# --------------------------------------------------------
# Example UI transmissions
# You could put a loop here if desired
# --------------------------------------------------------
writer.write("This is a test UI frame from Python example code\r\n")
await writer.drain()
await asyncio.sleep(1) # Non-blocking wait, for packet TX timing
writer.write("Second UI frame - learning how UNPROTO works\r\n")
await writer.drain()
# --------------------------------------------------------
# Exit UNPROTO mode (If not Unproto, be sure you exit properly)
# --------------------------------------------------------
logger.info("Exiting UNPROTO mode")
await asyncio.sleep(1)
writer.write("/ex\r\n")
# --------------------------------------------------------
# Log off the telnet session
# --------------------------------------------------------
logger.info("Disconnecting from Telnet")
await asyncio.sleep(1)
writer.write("b\r\n")
break
# ---------------------------------------------------------------------------
# Function that opens the telnet connection and attaches the shell
# ---------------------------------------------------------------------------
async def connect_to_port(port):
factory = partial(port_shell, port)
reader, writer = await telnetlib3.open_connection(
bpq_telnet,
8010,
shell=factory
)
# Wait until the connection is closed cleanly
await writer.protocol.waiter_closed
# ---------------------------------------------------------------------------
# Main program entry point
# ---------------------------------------------------------------------------
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s -- %(message)s"
)
parser = argparse.ArgumentParser(
description="Example code demonstrating UNPROTO operation via LinBPQ telnet",
usage="python3 %(prog)s "
)
parser.add_argument(
"port",
type=int,
help="BPQ port number (1-12)"
)
args = parser.parse_args()
# Validate port range
if not (1 <= args.port <= 12):
parser.error("Port must be an integer from 1 to 12")
asyncio.run(connect_to_port(args.port))
if __name__ == "__main__":
main()
For both programs, you will need to install the "telnetlib3" library as explained here:
In Bullseye, it's trivial to install telnetlib3:
sudo python3 -m pip install telnetlib3
Done and done.
But in Bookworm, it's not a system package, and you can't install it normally. Instead, run these two lines from a command prompt. Don't fear "—break-system-packages": Since telnetlib3 is not installed by Debian, it won't conflict with the packages that Debian does install. Using a virtual environment (one way to use a package without interfering with the rest of the operating system) then requires you to actually use that virtual environment, which is not really practical in TARPN.
sudo python3 -m pip install --break-system-packages telnetlib3
python3 -c "import telnetlib3; print('OK')"
If it says "OK", it should be, well, OK.
You can check it with
python3
import telnetlib3
print(telnetlib3.__file__)
You should see the directory in which it is installed (like /usr/local/lib/python3.11/dist-packages/telnetlib3/__init__.py)
So, some ideas for these samples:
Questions?Ask on the TARPN Discord. You can contact N2IRZ there, or at his address at QRZ.com