Assignment 4: Database
Program file structure
-
module-4/database
-
db
- report.py
- sqlite-sakila.sq
Output
database$ python report.py
report.py
from db import db_access, customer_api
# Load every customer together with its address and rental count, then print
# the result as a fixed-width text table.
customers = customer_api.get_all_customers_and_rentals()

# Column titles followed by an 80-character rule.
header = f"{'ID':>3}|{'FirstName':>10}|{'LastName':14}|{'Rentals':3}\t|{'Address'}"
print(header)
print('=' * 80)

# One output line per customer, using the same field widths as the header.
for c in customers:
    line = f"{c['customer_id']:>3} {c['first_name']:>10} {c['last_name']:14} {c['rental_count']:3}\t{c['address']}"
    print(line)
db/customer_api.py
from .db import do_command
def get_all_customers_and_rentals():
    """Fetches every customer with its address and adds a rental count to each one
    :returns a list of dictionaries like {customer_id, last_name, first_name, rental_count, address}
    """
    records = get_all_customers()
    for record in records:
        # One extra query per customer; the count is stored on the same dict.
        record.update(
            rental_count=get_rental_count_for_customer(record['customer_id']))
    return records
def get_all_customers():
    """Fetches every customer joined with the matching address row
    :returns a list of dicts like {customer_id, last_name, first_name, address}
    """
    # customer and address are linked through their shared address_id column.
    query = """SELECT customer_id, last_name, first_name, address
from customer INNER JOIN address ON customer.address_id = address.address_id"""
    return do_command(query)
def get_customer_by_id(customer_id):
    """Looks up the customer rows that have the given id
    :param customer_id the customer_id value to match
    :returns a list of dicts like {customer_id, last_name, first_name}
    """
    query = "select customer_id, last_name, first_name from customer where customer_id = ?"
    return do_command(query, [customer_id])
def get_rental_count_for_customer(customer_id):
    """Counts the rentals made by one customer
    :param customer_id the customer_id whose rentals are counted
    :returns the number of rental rows for that customer (0 when there are none)
    """
    # Filter on rental.customer_id. Comparing the customer id against
    # inventory_id would count the rentals of an unrelated inventory item.
    rows = do_command(
        "select count(*) as rental_count from rental where customer_id = ?",
        [customer_id])
    # An aggregate query without GROUP BY yields exactly one row.
    return rows[0]['rental_count']
db/db.py
import sqlite3
from os.path import join, split
SQLITE_DATABASE_NAME = 'sqlite-sakila.sq'
def dictionary_factory(cursor, row):
    """
    Row factory that turns a fetched row into a dictionary.
    Each key is a lower-cased column name taken from the cursor description
    :param cursor the cursor that produced the row
    :param row the column values of the fetched row
    :return A dictionary mapping lower-cased column names to the row's values
    """
    # The first item of every description entry is the column name.
    return {
        column[0].lower(): value
        for column, value in zip(cursor.description, row)
    }
def get_connection():
    """
    Open a connection to the SQLite database named by SQLITE_DATABASE_NAME.
    :return a sqlite3 connection whose rows are built by dictionary_factory
    """
    connection = sqlite3.connect(SQLITE_DATABASE_NAME)
    connection.row_factory = dictionary_factory
    return connection
def do_command(cmd, args=()):
    """
    Takes a SQL command and returns a list of results.
    Allows for arguments, providing an empty default
    :param cmd the SQL text to execute, with ? placeholders for arguments
    :param args the values bound to the placeholders (default: none)
    :return the list of all rows fetched from the cursor
    """
    # Open the connection before entering the try block: if opening fails there
    # is nothing to close, and the original error propagates instead of being
    # masked by a NameError raised from the finally clause.
    conn = get_connection()
    try:
        crs = conn.cursor()
        crs.execute(cmd, args)
        return crs.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
Assignment 4: Sockets
-
module-4/sockets
-
simple_server
-
polynomials
- polynomial_client.py
- polynomial_server.py
Output
sockets$ python polynomial_server.py &> /dev/null &
sockets$ python polynomial_client.py
polynomial_client.py
import socket
from time import sleep
import logging
SERVER_IP = "localhost"
SERVER_PORT = 12345
def create_connection(ip: str, port: int):
    """
    Connect to a server, retrying every 3 seconds until the connection succeeds.
    :param ip the host name or address to connect to
    :param port the port to connect to
    :returns the connected socket
    """
    while True:
        # Use a fresh socket for every attempt: after a failed connect() the
        # state of the socket is unspecified, so retrying on the same object
        # is not portable.
        sock = socket.socket()
        try:
            sock.connect((ip, port))
        except OSError as err:
            # Release the failed socket before waiting for the next attempt.
            sock.close()
            logging.error(err)
            logging.info(
                f"Failed while connecting to {ip}:{port}. Try again in 3 seconds...")
            sleep(3)
        else:
            logging.info(f"Connected to {ip}:{port}")
            return sock
def send_message(sock: socket.socket, message: str):
    """
    Encode a text message and send all of it over the socket.
    :param sock the socket to send on
    :param message the text to send
    :returns the same socket that was passed in
    """
    sock.sendall(message.encode())
    logging.info(f"Sent message: `{message}`")
    return sock
def receive_message(sock: socket.socket) -> str:
    """
    Read from the socket until the peer stops sending, then decode the data.
    :param sock the socket to read from
    :returns the decoded text of everything received ('' if nothing arrived)
    """
    chunks = []
    while True:
        chunk = sock.recv(2048)
        # recv() returns an empty bytes object once the peer has finished sending.
        if not chunk:
            break
        chunks.append(chunk)
    # Decode once, after all data has arrived: a multi-byte character may be
    # split across two recv() calls, and decoding chunk by chunk would fail.
    msg = b"".join(chunks).decode()
    if msg:
        logging.info(f"Received message: \"{msg}\"")
    return msg
def make_request(msg: str):
    """
    Send one request to the server and return its complete reply.
    :param msg the request text to send
    :returns the text received back from the server
    """
    with create_connection(SERVER_IP, SERVER_PORT) as connection:
        send_message(connection, msg)
        # Close the sending side only; the socket can still receive the reply.
        connection.shutdown(socket.SHUT_WR)
        reply = receive_message(connection)
    return reply
# Pairs of [request, expected response] used to exercise the server.
testing_strings = (
    ["E1.0 -945 1689 -950 230 -25 1", "E0.0"],
    ["S0 2 -945 1689 -950 230 -25 1 1e-15", "S1.0000000000000004"],
    ["G4.1 0 0", "XIncorrect command type"],
    ["4 1 0", "Xcould not convert string to float: ''"],
    ["E1.0", "XToo few arguments"],
    ["S1.0", "XToo few arguments"],
    ["S0 2 -945 1689 -950 230 -25 1 -1e-15", "XInvalid tolerance"],
    ["Not a number", "Xcould not convert string to float: 'ot'"],
    ["S0 2 -945 1689 -950 230 -25 1 0", "XInvalid tolerance"],
    ["S0 2 -945 1689 -950 230 G 1 1e-15",
     "Xcould not convert string to float: 'G'"],
)

# Send each request and report whether the reply matches the expectation.
# The loop variable is named `request` so the builtin `input` is not shadowed.
for request, expected in testing_strings:
    print(f"📤 Sent: `{request}`")
    response = make_request(request)
    print(f"📩 Recv: `{response}`")
    if response == expected:
        print("✅ Matched expected response")
    else:
        print(f"❌ Expected: `{expected}`")
    print("")
polynomial_server.py
import logging
from simple_server import server
from polynomials import polynomials
logging.basicConfig(level=logging.INFO)
SERVER_PORT = 12345
def handle_command(client_msg: str):
    """
    Translate one client request into a call to the polynomial library.
    The first character selects the command ('E' calls evaluate, 'S' calls
    bisection); the rest is a list of numbers separated by single spaces.
    :param client_msg the client input string e.g 'E1.0 3.0 1.0'
    :returns the command letter followed by the result, or 'X' followed by an
             error description e.g 'XToo few arguments'
    """
    if len(client_msg) == 0:
        return 'X' + 'Client message was empty'

    command, tokens = client_msg[0], client_msg[1:].split(" ")

    # Every argument must be numeric; this is checked before the command letter.
    params = []
    for token in tokens:
        try:
            params.append(float(token))
        except ValueError:
            return 'X' + f"could not convert string to float: '{token}'"

    # Minimum number of numeric arguments accepted by each known command.
    min_params = {'E': 2, 'S': 4}
    if command not in min_params:
        return 'X' + "Incorrect command type"
    if len(params) < min_params[command]:
        return 'X' + 'Too few arguments'

    if command == 'E':
        x, coefficients = params[0], params[1:]
        logging.debug(
            f"Calling polynomials.evaluate({x}, {coefficients})")
        result = polynomials.evaluate(x, coefficients)
    else:
        # 'S': the last number is the tolerance and must be strictly positive.
        tolerance = params[-1]
        if tolerance <= 0:
            return 'X' + "Invalid tolerance"
        first, second, coefficients = params[0], params[1], params[2:-1]
        logging.debug(
            f"Calling polynomials.bisection({first}, {second}, {coefficients}, {tolerance})")
        result = polynomials.bisection(first, second, coefficients, tolerance)

    return command + str(result)
# Create the listener on SERVER_PORT and keep accepting clients forever.
# handle_command is passed to accept_client, presumably to build the reply
# for each client request (confirm against simple_server).
with server.create_listener(SERVER_PORT) as listening_socket:
    while True:
        server.accept_client(listening_socket, handle_command)