Assignment 4: Database
Program file structure
  - module-4/database
      - db
      - report.py
      - sqlite-sakila.sq
Output
database$ python report.py
report.py
from db import db_access, customer_api
# Load every customer (with address and rental count) before printing anything.
customers = customer_api.get_all_customers_and_rentals()

# Column header followed by a horizontal rule.
header = f"{'ID':>3}|{'FirstName':>10}|{'LastName':14}|{'Rentals':3}\t|{'Address'}"
print(header)
print('=' * 80)

# One line per customer, using the same column widths as the header.
for customer in customers:
    customer_id = customer['customer_id']
    first_name = customer['first_name']
    last_name = customer['last_name']
    rental_count = customer['rental_count']
    address = customer['address']
    print(f"{customer_id:>3} {first_name:>10} {last_name:14} {rental_count:3}\t{address}")
db/customer_api.py
from .db import do_command
def get_all_customers_and_rentals():
  """Return every customer together with their address and rental count.

  The customer/address rows come from get_all_customers(); each row is then
  extended in place with a 'rental_count' entry obtained from
  get_rental_count_for_customer(), one lookup per customer.

  :returns a list of dictionaries like {customer_id, last_name, first_name, address, rental_count}
  """
  rows = get_all_customers()
  for row in rows:
    count = get_rental_count_for_customer(row['customer_id'])
    row.update(rental_count=count)
  return rows
def get_all_customers():
  """Fetch all customers joined with their address.

  :returns a list of dicts like {customer_id, last_name, first_name, address}
  """
  # Inner join: only customers whose address_id matches an address row are returned.
  query = (
    "SELECT customer_id, last_name, first_name, address\n"
    "  from customer INNER JOIN address ON customer.address_id = address.address_id"
  )
  rows = do_command(query)
  return rows
def get_customer_by_id(customer_id):
  """Look up a customer by id.

  :param customer_id  value matched against customer.customer_id
  :returns a list of dicts like {customer_id, last_name, first_name}
           (whatever rows the query fetches, so the list may be empty)
  """
  query = "select customer_id, last_name, first_name from customer where customer_id = ?"
  params = [customer_id]
  return do_command(query, params)
def get_rental_count_for_customer(customer_id):
  """Count the rows in the rental table that belong to one customer.

  :param customer_id  the customer whose rentals are counted
  :returns the number of matching rental rows as an int (0 when there are none)
  """
  # Match on rental.customer_id (the renter). Filtering on inventory_id would
  # count rentals of whichever inventory item happens to share this number.
  rows = do_command(
    "select count(*) as rental_count from rental where customer_id = ?",
    [customer_id])
  # An aggregate without GROUP BY always yields exactly one row.
  return rows[0]['rental_count']
db/db.py
import sqlite3
from os.path import join, split
SQLITE_DATABASE_NAME = 'sqlite-sakila.sq'
def dictionary_factory(cursor, row):
  """
  Row factory that converts a fetched row into a dictionary.
  Keys are the lower-cased column names taken from cursor.description;
  values are the corresponding entries of the row.
  :param cursor   a cursor from which a query row has just been fetched
  :param row      the row that was fetched
  :return         A dictionary associating column names to values
  """
  # The first item of each description entry is the column name.
  return {
    column[0].lower(): value
    for column, value in zip(cursor.description, row)
  }
def get_connection():
  """
  Open a new connection to the SQLite database named by SQLITE_DATABASE_NAME.
  Rows fetched through the connection are produced by dictionary_factory,
  i.e. they are dictionaries keyed by column name.
  :return   a new sqlite3 connection; the caller is expected to close it
  """
  connection = sqlite3.connect(SQLITE_DATABASE_NAME)
  connection.row_factory = dictionary_factory
  return connection
def do_command(cmd, args=()):
  """
  Takes a SQL command and returns a list of results.
  A fresh connection is opened for the call and always closed afterwards.
  :param cmd    the SQL statement, optionally containing `?` placeholders
  :param args   the values bound to the placeholders (defaults to none)
  :return       every row fetched by the statement, as returned by fetchall()
  """
  # Connect before entering the try block: if connecting fails there is no
  # connection to close, and the original error must propagate unchanged.
  conn = get_connection()
  try:
    crs = conn.cursor()
    crs.execute(cmd, args)
    return crs.fetchall()
  finally:
    conn.close()
Assignment 4: Sockets
  - module-4/sockets
      - simple_server
      - polynomials
      - polynomial_client.py
      - polynomial_server.py
Output
sockets$ python polynomial_server.py &> /dev/null &
sockets$ python polynomial_client.py
polynomial_client.py
import socket
from time import sleep
import logging
SERVER_IP = "localhost"
SERVER_PORT = 12345
def create_connection(ip: str, port: int):
    """
    Connect to a server, retrying every 3 seconds until it succeeds.

    :param ip the host name or address to connect to
    :param port the TCP port to connect to
    :returns a connected socket; the caller is responsible for closing it
    """
    while True:
        # Use a new socket for every attempt: after a failed connect() the
        # state of the socket is unspecified, and on some platforms every
        # further connect() on it fails.
        sock = socket.socket()
        try:
            sock.connect((ip, port))
        except OSError as err:
            sock.close()
            logging.error(err)
            logging.info(
                f"Failed while connecting to {ip}:{port}. Try again in 3 seconds...")
            sleep(3)
        else:
            logging.info(f"Connected to {ip}:{port}")
            return sock
def send_message(sock: socket.socket, message: str):
    """
    Encode a text message and send all of it over the socket.

    :param sock a connected socket
    :param message the text to send
    :returns the same socket that was passed in
    """
    sock.sendall(message.encode())
    logging.info(f"Sent message: `{message}`")
    return sock
def receive_message(sock: socket.socket) -> str:
    """
    Read from the socket until the peer closes its sending side, then decode.

    :param sock a connected socket
    :returns everything that was received, decoded as text ("" if nothing arrived)
    """
    chunks = []
    while True:
        chunk = sock.recv(2048)
        if not chunk:
            # recv() returns an empty result once the peer has finished sending.
            break
        chunks.append(chunk)
    # Decode once at the end: a multi-byte character may be split across two
    # recv() calls, so decoding chunk by chunk could fail.
    msg = b"".join(chunks).decode()
    if msg:
        logging.info(f"Received message: \"{msg}\"")
    return msg
def make_request(msg: str):
    """
    Send one request to the server and return its complete reply.

    Opens a connection to SERVER_IP:SERVER_PORT, sends the message, closes the
    sending side of the connection and then reads until the server is done.

    :param msg the request text
    :returns the reply text
    """
    sock = create_connection(SERVER_IP, SERVER_PORT)
    with sock:
        send_message(sock, msg)
        # Shut down the write side so the server sees the end of the request.
        sock.shutdown(socket.SHUT_WR)
        reply = receive_message(sock)
    return reply
# Each entry pairs a request with the exact response expected from the server.
testing_strings = (["E1.0 -945 1689 -950 230 -25 1", "E0.0"],
                   ["S0 2 -945 1689 -950 230 -25 1 1e-15",
                    "S1.0000000000000004"],
                   ["G4.1 0 0", "XIncorrect command type"],
                   ["4 1 0",  "Xcould not convert string to float: ''"],
                   ["E1.0", "XToo few arguments"],
                   ["S1.0", "XToo few arguments"],
                   ["S0 2 -945 1689 -950 230 -25 1 -1e-15",
                    "XInvalid tolerance"],
                   ["Not a number",
                    "Xcould not convert string to float: 'ot'"],
                   ["S0 2 -945 1689 -950 230 -25 1 0",
                    "XInvalid tolerance"],
                   ["S0 2 -945 1689 -950 230 G 1 1e-15", "Xcould not convert string to float: 'G'"])

# Send every request in turn and report whether the reply matched.
# The loop variable is called `request` so the builtin `input` is not shadowed.
for request, expected in testing_strings:
    print(f"📤  Sent: `{request}`")
    response = make_request(request)
    print(f"📩  Recv: `{response}`")
    if response == expected:
        print("✅  Matched expected response")
    else:
        print(f"❌  Expected: `{expected}`")
    print("")
polynomial_server.py
import logging
from simple_server import server
from polynomials import polynomials
logging.basicConfig(level=logging.INFO)
SERVER_PORT = 12345
def handle_command(client_msg: str):
    """
    Parse a client command and dispatch it to the polynomial library.

    The first character of the message selects the operation; the rest is a
    list of numbers separated by single spaces:
      - 'E' calls polynomials.evaluate(first number, remaining numbers)
      - 'S' calls polynomials.bisection(first, second, middle numbers, last
        number), where the last number is the tolerance and must be positive

    The numbers are validated before the command letter, so a message that is
    wrong on both counts reports the conversion error.

    :param client_msg a client input string e.g 'E1.0 3.0 1.0'
    :returns an API response message: the command letter followed by the
             result on success, or 'X' followed by a reason on failure,
             e.g 'XToo few arguments'
    """
    if not client_msg:
        return 'X' + 'Client message was empty'

    command = client_msg[0]

    # Convert every token, reporting the first one that is not a number.
    params = []
    for token in client_msg[1:].split(" "):
        try:
            params.append(float(token))
        except ValueError:
            return 'X' + f"could not convert string to float: '{token}'"

    if command not in ('E', 'S'):
        return 'X' + "Incorrect command type"

    # 'E' needs a point and at least one more number; 'S' needs two bounds,
    # at least one more number, and a tolerance.
    min_params = 2 if command == 'E' else 4
    if len(params) < min_params:
        return 'X' + 'Too few arguments'

    if command == 'E':
        logging.debug(
            f"Calling polynomials.evaluate({params[0]}, {params[1:]})")
        result = polynomials.evaluate(params[0], params[1:])
    else:
        # Written as `not (... > 0)` rather than `<= 0` so that NaN, which
        # compares False against everything, is rejected as well.
        if not params[-1] > 0:
            return 'X' + "Invalid tolerance"
        logging.debug(
            f"Calling polynomials.bisection({params[0]}, {params[1]}, {params[2:-1]}, {params[-1]})")
        result = polynomials.bisection(
            params[0], params[1], params[2:-1], params[-1])
    return command + str(result)
# Open a listener on SERVER_PORT; the `with` block releases it when the loop
# is left (e.g. through an exception or the process being interrupted).
with server.create_listener(SERVER_PORT) as listener:
    # Hand the listener and handle_command to accept_client over and over.
    # NOTE(review): simple_server is not visible here; presumably each call
    # serves one client and uses handle_command to build the reply - confirm.
    while True:
        server.accept_client(listener, handle_command)