Skip to content
This repository has been archived by the owner on Nov 16, 2024. It is now read-only.

Core #40

Merged
merged 11 commits into from
May 31, 2022
Merged

Core #40

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-build-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
- run: pytest
env:
LD_LIBRARY_PATH: /usr/local/lib
NPBC_DATABASE_DIR: data

# build executable for linux
build-linux:
Expand Down
93 changes: 53 additions & 40 deletions npbc_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"""


import sqlite3
from sqlite3 import DatabaseError, connect, Connection
from argparse import ArgumentParser
from argparse import Namespace as ArgNamespace
from collections.abc import Generator
from datetime import datetime
from sys import argv
from typing import Generator

from colorama import Fore, Style

Expand Down Expand Up @@ -176,7 +176,7 @@ def status_print(status: bool, message: str) -> None:
print(f"{Style.BRIGHT}{message}{Style.RESET_ALL}\n")


def calculate(parsed_arguments: ArgNamespace) -> None:
def calculate(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""calculate the cost for a given month and year
- default to the previous month if no month and no year is given
- default to the current month if no month is given and year is given
Expand Down Expand Up @@ -210,12 +210,12 @@ def calculate(parsed_arguments: ArgNamespace) -> None:
# prepare a dictionary for undelivered strings
undelivered_strings = {
int(paper_id): []
for paper_id, _, _, _, _ in npbc_core.get_papers()
for paper_id, _, _, _, _ in npbc_core.get_papers(connection)
}

# get the undelivered strings from the database
try:
raw_undelivered_strings = npbc_core.get_undelivered_strings(month=month, year=year)
raw_undelivered_strings = npbc_core.get_undelivered_strings(connection, month=month, year=year)

# add them to the dictionary
for _, paper_id, _, _, string in raw_undelivered_strings:
Expand All @@ -226,33 +226,34 @@ def calculate(parsed_arguments: ArgNamespace) -> None:
pass

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

try:
# calculate the cost for each paper
costs, total, undelivered_dates = npbc_core.calculate_cost_of_all_papers(
connection,
undelivered_strings,
month,
year
)

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

# format the results
formatted = '\n'.join(npbc_core.format_output(costs, total, month, year))
formatted = '\n'.join(npbc_core.format_output(connection, costs, total, month, year))

# unless the user specifies so, log the results to the database
if not parsed_arguments.nolog:
try:
npbc_core.save_results(costs, undelivered_dates, month, year)
npbc_core.save_results(connection, costs, undelivered_dates, month, year)

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

Expand All @@ -263,7 +264,7 @@ def calculate(parsed_arguments: ArgNamespace) -> None:
print(f"SUMMARY:\n\n{formatted}")


def addudl(parsed_arguments: ArgNamespace) -> None:
def addudl(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""add undelivered strings to the database
- default to the current month if no month and/or no year is given"""

Expand All @@ -286,7 +287,7 @@ def addudl(parsed_arguments: ArgNamespace) -> None:

# attempt to add the strings to the database
try:
npbc_core.add_undelivered_string(month, year, parsed_arguments.paperid, *parsed_arguments.strings)
npbc_core.add_undelivered_string(connection, month, year, parsed_arguments.paperid, *parsed_arguments.strings)

# if the paper doesn't exist, print an error message
except npbc_exceptions.PaperNotExists:
Expand All @@ -299,7 +300,7 @@ def addudl(parsed_arguments: ArgNamespace) -> None:
return

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

Expand All @@ -311,7 +312,7 @@ def addudl(parsed_arguments: ArgNamespace) -> None:
status_print(True, "Success!")


def deludl(parsed_arguments: ArgNamespace) -> None:
def deludl(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""delete undelivered strings from the database"""

# validate the month and year
Expand All @@ -326,6 +327,7 @@ def deludl(parsed_arguments: ArgNamespace) -> None:
# attempt to delete the strings from the database
try:
npbc_core.delete_undelivered_string(
connection,
month=parsed_arguments.month,
year=parsed_arguments.year,
paper_id=parsed_arguments.paperid,
Expand All @@ -344,14 +346,14 @@ def deludl(parsed_arguments: ArgNamespace) -> None:
return

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

status_print(True, "Success!")


def getudl(parsed_arguments: ArgNamespace) -> None:
def getudl(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""get undelivered strings from the database
filter by whichever parameter the user provides. they as many as they want.
available parameters: month, year, paper_id, string_id, string"""
Expand All @@ -368,6 +370,7 @@ def getudl(parsed_arguments: ArgNamespace) -> None:
# attempt to get the strings from the database
try:
undelivered_strings = npbc_core.get_undelivered_strings(
connection,
month=parsed_arguments.month,
year=parsed_arguments.year,
paper_id=parsed_arguments.paperid,
Expand Down Expand Up @@ -401,7 +404,7 @@ def extract_delivery_from_user_input(input_delivery: str) -> list[bool]:
return list(map(lambda x: x == 'Y', input_delivery))


def extract_costs_from_user_input(paper_id: int | None, delivery_data: list[bool] | None, *input_costs: float) -> Generator[float, None, None]:
def extract_costs_from_user_input(connection: Connection, paper_id: int | None, delivery_data: list[bool] | None, *input_costs: float) -> Generator[float, None, None]:
"""convert the user input to a float list"""

# filter the data to remove zeros
Expand Down Expand Up @@ -430,10 +433,10 @@ def extract_costs_from_user_input(paper_id: int | None, delivery_data: list[bool

# get the delivery data from the database, and filter for the paper ID
try:
raw_data = [paper for paper in npbc_core.get_papers() if paper[0] == int(paper_id)]
raw_data = [paper for paper in npbc_core.get_papers(connection) if paper[0] == int(paper_id)]

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

Expand All @@ -457,7 +460,7 @@ def extract_costs_from_user_input(paper_id: int | None, delivery_data: list[bool
raise npbc_exceptions.InvalidInput("Neither delivery data nor paper ID given.")


def editpaper(parsed_arguments: ArgNamespace) -> None:
def editpaper(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""edit a paper's information"""


Expand All @@ -468,10 +471,11 @@ def editpaper(parsed_arguments: ArgNamespace) -> None:

# attempt to edit the paper. if costs are given, use them, else use None
npbc_core.edit_existing_paper(
connection,
paper_id=parsed_arguments.paperid,
name=parsed_arguments.name,
days_delivered=delivery_data,
days_cost=list(extract_costs_from_user_input(parsed_arguments.paperid, delivery_data, *parsed_arguments.costs)) if parsed_arguments.costs else None
days_cost=list(extract_costs_from_user_input(connection, parsed_arguments.paperid, delivery_data, *parsed_arguments.costs)) if parsed_arguments.costs else None
)

# if the paper doesn't exist, print an error message
Expand All @@ -485,14 +489,14 @@ def editpaper(parsed_arguments: ArgNamespace) -> None:
return

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

status_print(True, "Success!")


def addpaper(parsed_arguments: ArgNamespace) -> None:
def addpaper(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""add a new paper to the database"""

try:
Expand All @@ -501,9 +505,10 @@ def addpaper(parsed_arguments: ArgNamespace) -> None:

# attempt to add the paper.
npbc_core.add_new_paper(
connection,
name=parsed_arguments.name,
days_delivered=delivery_data,
days_cost=list(extract_costs_from_user_input(None, delivery_data, *parsed_arguments.costs))
days_cost=list(extract_costs_from_user_input(connection, None, delivery_data, *parsed_arguments.costs))
)

# if the paper already exists, print an error message
Expand All @@ -517,45 +522,45 @@ def addpaper(parsed_arguments: ArgNamespace) -> None:
return

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

status_print(True, "Success!")


def delpaper(parsed_arguments: ArgNamespace) -> None:
def delpaper(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""delete a paper from the database"""

# attempt to delete the paper
try:
npbc_core.delete_existing_paper(parsed_arguments.paperid)
npbc_core.delete_existing_paper(connection, parsed_arguments.paperid)

# if the paper doesn't exist, print an error message
except npbc_exceptions.PaperNotExists:
status_print(False, "Paper does not exist.")
return

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

status_print(True, "Success!")


def getpapers(parsed_arguments: ArgNamespace) -> None:
def getpapers(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""get a list of all papers in the database
- filter by whichever parameter the user provides. they may use as many as they want (but keys are always printed)
- available parameters: name, days, costs
- the output is provided as a formatted table, printed to the standard output"""

# get the papers from the database
try:
raw_data = npbc_core.get_papers()
raw_data = npbc_core.get_papers(connection)

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

Expand Down Expand Up @@ -611,7 +616,7 @@ def getpapers(parsed_arguments: ArgNamespace) -> None:
delivery = [
''.join([
'Y' if days[paper_id][day_id]['delivery'] else 'N'
for day_id, _ in enumerate(npbc_core.WEEKDAY_NAMES)
for day_id in range(len(npbc_core.WEEKDAY_NAMES))
])
for paper_id in ids
]
Expand All @@ -624,7 +629,7 @@ def getpapers(parsed_arguments: ArgNamespace) -> None:
costs = [
';'.join([
str(days[paper_id][day_id]['cost'])
for day_id, _ in enumerate(npbc_core.WEEKDAY_NAMES)
for day_id in range(len(npbc_core.WEEKDAY_NAMES))
if days[paper_id][day_id]['cost'] != 0
])
for paper_id in ids
Expand Down Expand Up @@ -652,7 +657,7 @@ def getpapers(parsed_arguments: ArgNamespace) -> None:
print()


def getlogs(parsed_arguments: ArgNamespace) -> None:
def getlogs(parsed_arguments: ArgNamespace, connection: Connection) -> None:
"""get a list of all logs in the database
- filter by whichever parameter the user provides. they may use as many as they want (but log IDs are always printed)
- available parameters: log_id, paper_id, month, year, timestamp
Expand All @@ -661,6 +666,7 @@ def getlogs(parsed_arguments: ArgNamespace) -> None:
# attempt to get the logs from the database
try:
data = npbc_core.get_logged_data(
connection,
query_log_id = parsed_arguments.logid,
query_paper_id=parsed_arguments.paperid,
query_month=parsed_arguments.month,
Expand All @@ -669,7 +675,7 @@ def getlogs(parsed_arguments: ArgNamespace) -> None:
)

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error. Please report this to the developer.\n{e}")
return

Expand Down Expand Up @@ -705,18 +711,25 @@ def main(arguments: list[str]) -> None:

# attempt to initialize the database
try:
npbc_core.setup_and_connect_DB()
database_path = npbc_core.create_and_setup_DB()

# if there is a database error, print an error message
except sqlite3.DatabaseError as e:
except DatabaseError as e:
status_print(False, f"Database error: {e}\nPlease report this to the developer.")
return

# parse the command line arguments
parsed_namespace = define_and_read_args(arguments)

# execute the appropriate function
parsed_namespace.func(parsed_namespace)
try:

with connect(database_path) as connection:
# execute the appropriate function
parsed_namespace.func(parsed_namespace, connection)

# close the database connection
finally:
connection.close() # type: ignore


if __name__ == "__main__":
Expand Down
Loading