Assignment 4

Simple database and socket programs in Python

Assignment 4: Database

Program file structure

Output

database$ python report.py
# ID| FirstName|LastName |Rentals |Address
# ==============================================================================
# 1 MARY SMITH 3 1913 Hanoi Way
# 2 PATRICIA JOHNSON 5 1121 Loja Avenue
# 3 LINDA WILLIAMS 2 692 Joliet Street
# 4 BARBARA JONES 2 1566 Inegl Manor
# 5 ELIZABETH BROWN 0 53 Idfu Parkway
# 6 JENNIFER DAVIS 5 1795 Santiago de Compostela Way
# 7 MARIA MILLER 4 900 Santiago de Compostela Parkway
# 8 SUSAN WILSON 2 478 Joliet Way
# 9 MARGARET MOORE 2 613 Korolev Drive
# 10 DOROTHY TAYLOR 3 1531 Sal Drive
# ... 400 more lines

report.py

# report.py

from db import db_access, customer_api

# Make a database query
customers = customer_api.get_all_customers_and_rentals()

# Attempt to nicely format the output as a table
print(f"{'ID':>3}|{'FirstName':>10}|{'LastName':14}|{'Rentals':3}\t|{'Address'}")
print('=' * 80)
for c in customers:
print(f"{c['customer_id']:>3} {c['first_name']:>10} {c['last_name']:14} {c['rental_count']:3}\t{c['address']}")

db/customer_api.py

# db/customer.api.py

from .db import do_command

def get_all_customers_and_rentals():
"""Queries the database for all customers, addresses, and their rental counts
:returns a list of dictionaries like {customer_id, last_name, first_name, rental_count, address}
"""

customers = get_all_customers()
for c in customers:
c['rental_count'] = get_rental_count_for_customer(c['customer_id'])
return customers

def get_all_customers():
"""Queries the database for all customers and addresses
:returns a list of dicts like {customer_id, last_name, first_name, address}
"""

return do_command("""SELECT customer_id, last_name, first_name, address
from customer INNER JOIN address ON customer.address_id = address.address_id"""
)

def get_customer_by_id(customer_id):
return do_command("select customer_id, last_name, first_name from customer where customer_id = ?", [customer_id])

def get_rental_count_for_customer(customer_id):
return do_command("select count(all) as rental_count from rental where inventory_id = ?", [customer_id])[0]['rental_count']

db/db.py

# db/db.py

import sqlite3
from os.path import join, split

# this file is external to the code
SQLITE_DATABASE_NAME = 'sqlite-sakila.sq'

def dictionary_factory(cursor, row):
"""
Create a dictionary from rows in a cursor result.
The keys will be the column names
:param cursor a cursor from which a query row has just been fetched
:param row the row that was fetched
:return A dictionary associating column names to values
"""

col_names = [d[0].lower() for d in cursor.description]
return dict(zip(col_names, row))

def get_connection():
fname = SQLITE_DATABASE_NAME
conn = sqlite3.connect(fname)
conn.row_factory = dictionary_factory
return conn

def do_command(cmd, args=[]):
"""
Takes a SQL command and returns a list of results.
Allows for arguments, providing a default value of []
"""

try:
conn = get_connection()
crs = conn.cursor()
crs.execute(cmd, args)
return crs.fetchall()
finally:
conn.close()

Assignment 4: Sockets

Output

sockets$ python polynomial_server.py &> /dev/null &
# (server running in background on port 12345)
sockets$ python polynomial_client.py
#📤 Sent: `E1.0 -945 1689 -950 230 -25 1`
#📩 Recv: `E0.0`
#✅ Matched expected response
#
#📤 Sent: `S0 2 -945 1689 -950 230 -25 1 1e-15`
#📩 Recv: `S1.0000000000000004`
#✅ Matched expected response
#
#📤 Sent: `G4.1 0 0`
#📩 Recv: `XIncorrect command type`
#✅ Matched expected response
#
#📤 Sent: `4 1 0`
#📩 Recv: `Xcould not convert string to float: ''`
#✅ Matched expected response
#
#📤 Sent: `E1.0`
#📩 Recv: `XToo few arguments`
#✅ Matched expected response
#
#📤 Sent: `S1.0`
#📩 Recv: `XToo few arguments`
#✅ Matched expected response
#
#📤 Sent: `S0 2 -945 1689 -950 230 -25 1 -1e-15`
#📩 Recv: `XInvalid tolerance`
#✅ Matched expected response
#
#📤 Sent: `Not a number`
#📩 Recv: `Xcould not convert string to float: 'ot'`
#✅ Matched expected response
#
#📤 Sent: `S0 2 -945 1689 -950 230 -25 1 0`
#📩 Recv: `XInvalid tolerance`
#✅ Matched expected response
#
#📤 Sent: `S0 2 -945 1689 -950 230 G 1 1e-15`
#📩 Recv: `Xcould not convert string to float: 'G'`
#✅ Matched expected response

polynomial_client.py

# polynomial_client.py

import socket
from time import sleep
import logging

SERVER_IP = "localhost"
SERVER_PORT = 12345


def create_connection(ip: str, port: int):
sock = socket.socket()
while True:
try:
sock.connect((ip, port))
logging.info(f"Connected to {ip}:{port}")
return sock
except OSError as err:
logging.error(err)
logging.info(
f"Failed while connecting to {ip}:{port}. Try again in 3 seconds...")
sleep(3)


def send_message(sock: socket.socket, message: str):
message_byte = message.encode()
sock.sendall(message_byte)
logging.info(f"Sent message: `{message}`")
return sock


def receive_message(sock: socket.socket) -> str:
msg: str = ""
while True:
bytes = sock.recv(2048)
if len(bytes) == 0:
break
msg += bytes.decode() # UTF-
if (len(msg) > 0):
logging.info(f"Received message: \"{msg}\"")
return msg


def make_request(msg: str):
with create_connection(SERVER_IP, SERVER_PORT) as sock:
send_message(sock, msg)
sock.shutdown(1)
return receive_message(sock)


testing_strings = (["E1.0 -945 1689 -950 230 -25 1", "E0.0"],
["S0 2 -945 1689 -950 230 -25 1 1e-15",
"S1.0000000000000004"],
["G4.1 0 0", "XIncorrect command type"],
["4 1 0", "Xcould not convert string to float: ''"],
["E1.0", "XToo few arguments"],
["S1.0", "XToo few arguments"],
["S0 2 -945 1689 -950 230 -25 1 -1e-15",
"XInvalid tolerance"],
["Not a number",
"Xcould not convert string to float: 'ot'"],
["S0 2 -945 1689 -950 230 -25 1 0",
"XInvalid tolerance"],
["S0 2 -945 1689 -950 230 G 1 1e-15", "Xcould not convert string to float: 'G'"])

# logging.basicConfig(level=logging.INFO)

for input, expected in testing_strings:
print(f"📤 Sent: `{input}`")
response = make_request(input)
print(f"📩 Recv: `{response}`")
if (response != expected):
print(f"❌ Expected: `{expected}`")
else:
print(f"✅ Matched expected response")
print("")

polynomial_server.py

import logging

from simple_server import server
from polynomials import polynomials

logging.basicConfig(level=logging.INFO)

SERVER_PORT = 12345

# Define message handler command


def handle_command(client_msg: str):
"""
Parse command as input to the functions in the polynomial library
:param a client input string e.g 'E1.0 3.0 1.0'
:returns an API response message e.g 'XInvalid input'
"""

if len(client_msg) == 0:
return 'X' + 'Client message was empty'
# Parse string message
command = client_msg[0] # first character is an instruction
params = client_msg[1:].split(" ")
# Attempt to parse the input
for i in range(0, len(params)):
try:
params[i] = float(params[i])
except ValueError:
return 'X' + f"could not convert string to float: '{params[i]}'"
# Check that the command is valid
if (command not in ('E', 'S')):
return 'X' + f"Incorrect command type"
# Verify the number of parameters
if ((command == 'E' and len(params) < 2) or (command == 'S' and len(params) < 4)):
return 'X' + 'Too few arguments'
# Execute commands
if (command == 'E'):
logging.debug(
f"Calling polynomials.evaluate({params[0]}, {params[1:]})")
result = polynomials.evaluate(params[0], params[1:])
elif (command == 'S'):
if (params[-1] <= 0):
return 'X' + f"Invalid tolerance"
logging.debug(
f"Calling polynomials.bisection({params[0]}, {params[1]}, {params[2:-1]}, {params[-1]})")
result = polynomials.bisection(
params[0], params[1], params[2:-1], params[-1])
return command + str(result)


with server.create_listener(SERVER_PORT) as listener:
# Loop until program is killed
while True:
server.accept_client(listener, handle_command)