Python

Python programming language

Batch File Rename

import glob
import os

# Replace underscores with hyphens in the names of all "*.ext" files found
# under the current directory (searched recursively).
for filename in glob.glob("./**/*.ext", recursive=True):
    # Only the file's own name is rewritten. Replacing underscores in the
    # directory part of the path would make os.rename target a directory that
    # does not exist (or silently move the file somewhere else).
    directory, base_name = os.path.split(filename)
    new_name = os.path.join(directory, base_name.replace("_", "-"))
    if new_name != filename:
        os.rename(filename, new_name)

Logging Basics

import argparse
import logging
import os
from datetime import datetime

# Log file sits next to this script and is named after it, with a timestamp suffix.
log_file = "{}_{}.log".format(
    os.path.splitext(__file__)[0],
    datetime.now().strftime("%Y-%m-%d_%H%M%S"),
)

# One formatter shared by every handler attached to the logger.
formatter = logging.Formatter(
    fmt="%(asctime)s : %(msecs)04d : %(name)s : %(levelname)s : %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Console handler; it is only attached to the logger when requested.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

# Logger named after this module; call getLogger() without a name to work
# with the root logger (and its level) instead.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def clparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the logging example.

    Returns:
    -------
        argparse.ArgumentParser: Parser exposing the boolean switches
        ``-l/--logfile`` and ``-v/--verbose`` (both default to False).
    """
    parser = argparse.ArgumentParser(description="""My logging program.""")
    switches = (
        ("-l", "--logfile", "Write log messages to a file."),
        ("-v", "--verbose", "Control the amount of information to display."),
    )
    for short_opt, long_opt, help_text in switches:
        parser.add_argument(short_opt, long_opt, action="store_true", help=help_text)
    return parser


if __name__ == "__main__":
    # Attach only the handlers that were asked for on the command line.
    cli_args = clparser().parse_args()
    if cli_args.verbose:
        # -v: echo log records to the console.
        logger.addHandler(stream_handler)
    if cli_args.logfile:
        # -l: also write log records to the timestamped log file.
        log_to_file = logging.FileHandler(filename=log_file)
        log_to_file.setFormatter(formatter)
        logger.addHandler(log_to_file)
    logger.info("Begin my test module")

Sending HTTP Requests

import logging

import requests

logger = logging.getLogger(__name__)


def send_request(request_type: str, url: str, **kwargs) -> requests.Response:
    """Send an HTTP request and return the response if it succeeded.

    Args:
    ----
        request_type (str): Accepts "GET" or "POST" (case-insensitive).
        url (str): Request URL
        **kwargs: Passed through unchanged to ``requests.request`` (for
            example ``headers`` or ``timeout``). No timeout is added here.

    Returns:
    -------
        requests.Response: Request response.

    Raises:
    ------
        ValueError: If ``request_type`` is not one of the supported methods.
        requests.exceptions.RequestException: If the request fails or the
            response has an error status; the error is logged, then re-raised.
    """
    valid_methods = ("GET", "POST")
    method = request_type.upper()
    if method not in valid_methods:
        raise ValueError(f"Invalid request type. Supported types: {valid_methods}")
    try:
        response = requests.request(method, url, **kwargs)
        response.raise_for_status()  # Raises an exception if status code >= 400
    except requests.exceptions.RequestException as err:
        # NOTE(review): kwargs are logged verbatim and may include headers or
        # credentials -- confirm this is acceptable for the log destination.
        logger.error(
            "%s. Request type: %s. URL: %s. Args: %s", err, request_type, url, kwargs
        )
        # Bare raise keeps the original traceback intact.
        raise
    return response

# Example GET request. send_request re-raises on any request failure, so the
# logging line below is only reached when the call succeeded. The response
# body is decoded with response.json() and logged at INFO level.
response = send_request("GET", "https://api.publicapis.org/entries", timeout=5)
logger.info(response.json())

# Example POST request with empty headers and a 5 second timeout.
# NOTE(review): "https://someurl.com" is presumably a placeholder URL -- replace
# it with a real endpoint before running.
response = send_request("POST", "https://someurl.com", headers={}, timeout=5)
logger.info(response.json())

Regular Expressions

See the documentation for Python's `re` module for more information on regex syntax.

Code
import re

# Sample text containing three phone numbers (different separators) and
# three e-mail addresses.
text_string = """
Hello world

8001234567
800-321-7654
900.987.6543

some.email@email.com
mycompany@company.net
wierd-12-address-4@somedomain.blah
"""

# Ten digits grouped 3-3-4, each group optionally followed by "." or "-".
pattern = re.compile(r"[0-9]{3}[.-]?[0-9]{3}[.-]?[0-9]{4}")
matches = pattern.finditer(text_string)

# For every hit show the match object, its (start, end) span, and the
# matched slice of the source text.
for match in matches:
    start, end = match.span()
    print(match)
    print((start, end))
    print(text_string[start:end])
<re.Match object; span=(14, 24), match='8001234567'>
(14, 24)
8001234567
<re.Match object; span=(25, 37), match='800-321-7654'>
(25, 37)
800-321-7654
<re.Match object; span=(38, 50), match='900.987.6543'>
(38, 50)
900.987.6543

OOP Basics

Code
class Employee:
    """Create an employee object with relevant attributes.

    Class attributes:
        annual_raise_pct: Raise applied once per year employed (0.04 == 4%).
        employee_no: Number handed to the next employee created; it is
            incremented after every successful construction.
    """

    annual_raise_pct = 0.04
    employee_no = 1

    def __init__(self, first_name, last_name, position, years_employed=1):
        """Initialise the employee and derive number, e-mail and salary.

        Args:
            first_name: Given name, also used to build the e-mail address.
            last_name: Family name, also used to build the e-mail address.
            position: Job title; see starting_salary() for the known titles.
            years_employed: Number of annual raises to apply (default 1).
        """
        self.first_name = first_name
        self.last_name = last_name
        self.position = position
        self.years_employed = years_employed
        self.employee_number = Employee.employee_no
        self.email = self.first_name + "." + self.last_name + "@company.com"
        self.salary = self.calculate_salary()
        Employee.employee_no += 1

    def starting_salary(self):
        """Return the starting salary for the position, or None if unknown."""
        starting_salaries = {
            "HR": 25000,
            "Management": 50000,
            "Developer": 100000,
            "CEO": 200000,
        }
        return starting_salaries.get(self.position)

    def calculate_salary(self):
        """Return the current salary after one raise per year employed.

        Each raise is truncated to a whole number before the next one is
        applied. Returns None when the position has no known starting
        salary, regardless of years_employed (previously this raised a
        TypeError for any years_employed above zero).
        """
        salary = self.starting_salary()
        if salary is None:
            return None
        for _ in range(self.years_employed):
            salary = int(salary + (salary * Employee.annual_raise_pct))
        return salary

    def __repr__(self):
        """Create representational object string"""
        return f"""Employee(first_name = {self.first_name}, last_name = {self.last_name}, position = {self.position}, years_employed = {self.years_employed})"""


# Build the sample staff list; employee numbers are assigned in this order.
current_employees = [
    Employee(first, last, role, years)
    for first, last, role, years in (
        ("Geo", "Coug", "Developer", 6),
        ("Jane", "Doe", "HR", 4),
        ("John", "Doe", "Management", 15),
        ("Bob", "Loblaw", "CEO", 24),
    )
]

# Print a short report per employee, followed by a blank line.
for employee in current_employees:
    print(f"Employee Number: {employee.employee_number}")
    print(employee)
    print("Email:", employee.email)
    print(f"Salary: ${employee.salary:,}\n")
Employee Number: 1
Employee(first_name = Geo, last_name = Coug, position = Developer, years_employed = 6)
Email: Geo.Coug@company.com
Salary: $126,530

Employee Number: 2
Employee(first_name = Jane, last_name = Doe, position = HR, years_employed = 4)
Email: Jane.Doe@company.com
Salary: $29,245

Employee Number: 3
Employee(first_name = John, last_name = Doe, position = Management, years_employed = 15)
Email: John.Doe@company.com
Salary: $90,039

Employee Number: 4
Employee(first_name = Bob, last_name = Loblaw, position = CEO, years_employed = 24)
Email: Bob.Loblaw@company.com
Salary: $512,645

DBMS Data Types

Code
import pandas as pd

# Load the DBMS data-type comparison table. The path is relative, so it is
# resolved against the current working directory.
df = pd.read_csv("../../../static/development/data-types.csv")
# Render the frame as an HTML table without the index column. The result is
# not assigned here -- presumably this runs in a notebook cell that displays
# the value of its last expression; confirm before reusing it in a script.
df.to_html(index=False)
Data Type Postgres MariaDB SQL Server Firebird MS-Access SQLite
Timestamp with time zone 1184.0 NaN NaN NaN NaN NaN
Timestamp 1184.0 7 NaN type ‘datetime.datetime’ NaN NaN
Datetime NaN 12 class ‘datetime.datetime’ NaN class ‘datetime.datetime’ NaN
Date 1082.0 10 class ‘datetime.date’ type ‘datetime.date’ class ‘datetime.datetime’ NaN
Time 1083.0 11 class ‘datetime.time’ type ‘datetime.time’ class ‘datetime.datetime’ NaN
Boolean 16.0 16 class ‘bool’ NaN class ‘bool’ NaN
Small integer 21.0 1 class ‘int’ NaN class ‘int’ NaN
Integer 23.0 2 class ‘int’ type ‘int’ class ‘int’ NaN
Long integer 20.0 3 class ‘int’ type ‘long’ NaN NaN
Single 701.0 4 class ‘float’ type ‘float’ class ‘float’ NaN
Double precision 701.0 5 class ‘float’ type ‘float’ class ‘float’ NaN
Decimal 1700.0 0 class ‘decimal.Decimal’ class ‘decimal.Decimal’ NaN NaN
Currency 790.0 NaN NaN class ‘decimal.Decimal’ class ‘decimal.Decimal’ NaN
Character 1042.0 NaN class ‘str’ type ‘str’ class ‘str’ NaN
Character varying 1043.0 15 class ‘str’ type ‘str’ class ‘str’ NaN
Text 25.0 NaN class ‘str’ type ‘str’ class ‘str’ NaN
Binary / BLOB 17.0 249,250,251,252 class ‘bytearray’ type ‘str’ type ‘bytearray’ NaN

DBMS Libraries

  • Postgres: psycopg2
  • MariaDB: pymysql
  • SQL Server: pyodbc
  • Firebird: fdb
  • MS-Access: pyodbc
  • SQLite: sqlite3

DB Table Dependencies

def dependency_order(dep_list):
    """Return table names ordered so that every table follows its parents.

    Args:
        dep_list: Sequence of ``(child, parent)`` pairs, meaning ``child``
            depends on ``parent``. Table names must be hashable.

    Returns:
        list: Every table named in ``dep_list`` exactly once. Tables with no
        unmet dependencies come first, then the tables that depend only on
        those, and so on. Tables on the same level keep the order in which
        they first appear in ``dep_list`` (children before parents).

    Raises:
        ValueError: If the dependencies are circular, in which case no valid
            order exists (the previous implementation looped forever).
    """
    children = [dep[0] for dep in dep_list]
    parents = [dep[1] for dep in dep_list]
    # dict.fromkeys removes duplicates while keeping first-seen order, so the
    # result is reproducible from run to run.
    remaining_tables = list(dict.fromkeys(children + parents))
    remaining_deps = list(dep_list)
    ordered = []
    while remaining_deps:
        # A table is still blocked while it is the child of an unresolved pair.
        blocked = {dep[0] for dep in remaining_deps}
        ready = [tbl for tbl in remaining_tables if tbl not in blocked]
        if not ready:
            raise ValueError(
                "Circular dependency among tables: %s"
                % ", ".join(str(tbl) for tbl in remaining_tables)
            )
        ordered.extend(ready)
        released = set(ready)
        remaining_tables = [tbl for tbl in remaining_tables if tbl not in released]
        # Dependencies on a table that has just been emitted are now satisfied.
        remaining_deps = [dep for dep in remaining_deps if dep[1] not in released]
    # Whatever is left had all of its dependencies satisfied in the last pass.
    ordered.extend(remaining_tables)
    return ordered

CSV Sniffer

import re

class CsvDiagError(Exception):
    """Raised when the format of a delimited text stream cannot be diagnosed.

    The message is available as ``value`` and through ``str()``.
    """

    def __init__(self, msg):
        # Initialise the base class as well so that ``args`` and ``repr()``
        # carry the message like any other exception.
        super(CsvDiagError, self).__init__(msg)
        self.value = msg

    def __str__(self):
        return self.value

class CsvLine:
    """One line of delimited text plus the diagnostics gathered for it.

    Holds the raw text, the number of times each candidate delimiter occurs
    in it, and the format errors recorded by the most recent items() call.
    """

    # Character treated as the escape character for quotes inside quoted items.
    escchar = "\\"

    def __init__(self, line_text):
        """Store the line text; counts and errors start out empty."""
        self.text = line_text
        self.delim_counts = {}  # Maps delimiter -> count; filled by count_delim().
        self.item_errors = []  # A list of error messages.

    def __str__(self):
        """Return the text and the delimiter counts in a readable form."""
        return "; ".join(
            [
                "Text: <<%s>>" % self.text,
                "Delimiter counts: <<%s>>"
                % ", ".join(
                    [
                        "%s: %d" % (k, self.delim_counts[k])
                        for k in self.delim_counts.keys()
                    ]
                ),
            ]
        )

    def count_delim(self, delim):
        """Count occurrences of delim in the text and remember the result."""
        # If the delimiter is a space, consider multiple spaces to be equivalent
        # to a single delimiter, split on the space(s), and consider the delimiter
        # count to be one fewer than the items returned.
        if delim == " ":
            self.delim_counts[delim] = max(0, len(re.split(r" +", self.text)) - 1)
        else:
            self.delim_counts[delim] = self.text.count(delim)

    def delim_count(self, delim):
        """Return the count stored by count_delim(delim).

        Raises KeyError if count_delim() was never called for this delimiter.
        """
        return self.delim_counts[delim]

    def _well_quoted(self, element, qchar):
        """Check how the quote character qchar is used within one element."""
        # A well-quoted element has either no quotes, a quote on each end and none
        # in the middle, or quotes on both ends and every internal quote is either
        # doubled or escaped.
        # Returns a tuple of three booleans; the first indicates whether the element is
        # well-quoted, the second indicates whether the quote character is used
        # at all, and the third indicates whether the escape character is used.
        if qchar not in element:
            return (True, False, False)
        if len(element) == 0:
            return (True, False, False)
        if element[0] == qchar and element[-1] == qchar and qchar not in element[1:-1]:
            return (True, True, False)
        # The element has quotes; if it doesn't have one on each end, it is not well-quoted.
        if not (element[0] == qchar and element[-1] == qchar):
            return (False, True, False)
        e = element[1:-1]
        # If there are no quotes left after removing doubled quotes, this is well-quoted.
        if qchar not in e.replace(qchar + qchar, ""):
            return (True, True, False)
        # if there are no quotes left after removing escaped quotes, this is well-quoted.
        if qchar not in e.replace(self.escchar + qchar, ""):
            return (True, True, True)
        return (False, True, False)

    def record_format_error(self, pos_no, errmsg):
        """Append errmsg, tagged with the position pos_no, to item_errors."""
        self.item_errors.append("%s in position %d." % (errmsg, pos_no))

    def items(self, delim, qchar):
        """Split the line into items at delimiters outside quoted stretches.

        Resets item_errors, then records any format errors found while parsing.
        """
        # Parses the line into a list of items, breaking it at delimiters that are not
        # within quoted stretches.  (This is almost a CSV parser, for valid delim and qchar,
        # except that it does not eliminate quote characters or reduce escaped quotes.)
        self.item_errors = []
        if qchar is None:
            if delim is None:
                # NOTE(review): this returns the raw string, not a one-element
                # list; a caller iterating over the result gets single
                # characters.  Confirm whether [self.text] was intended.
                return self.text
            else:
                if delim == " ":
                    return re.split(r" +", self.text)
                else:
                    return self.text.split(delim)
        elements = []  # The list of items on the line that will be returned.
        eat_multiple_delims = delim == " "
        # States of the FSM:
        # _IN_QUOTED: An opening quote has been seen, but no closing quote encountered.
        #  Actions / transition:
        #   quote: save char in escape buffer / _QUOTE_IN_QUOTED
        #   esc_char : save char in escape buffer / _ESCAPED
        #   delimiter: save char in element buffer / _IN_QUOTED
        #   other: save char in element buffer / _IN_QUOTED
        # _ESCAPED: An escape character has been seen while _IN_QUOTED (and is in the escape buffer).
        #  Actions / transitions
        #   quote: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #   delimiter: save escape buffer in element buffer, empty escape buffer,
        #    save element buffer, empty element buffer / _BETWEEN
        #   other: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        # _QUOTE_IN_QUOTED: A quote has been seen while _IN_QUOTED (and is in the escape buffer).
        #  Actions / transitions
        #   quote: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #   delimiter: save escape buffer in element buffer, empty escape buffer,
        #    save element buffer, empty element buffer / _DELIMITED
        #   other: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #     (An 'other' character in this position represents a bad format:
        #     a quote not followed by another quote or a delimiter.)
        # _IN_UNQUOTED: A non-delimiter, non-quote has been seen.
        #  Actions / transitions
        #   quote: save char in element buffer / _IN_UNQUOTED
        #    (This represents a bad format.)
        #   delimiter: save element buffer, empty element buffer / _DELIMITED
        #   other: save char in element buffer / _IN_UNQUOTED
        # _BETWEEN: Not in an element, and a delimiter not seen.  This is the starting state,
        #   and the state following a closing quote but before a delimiter is seen.
        #  Actions / transition:
        #   quote: save char in element buffer / _IN_QUOTED
        #   delimiter: save element buffer, empty element buffer / _DELIMITED
        #    (The element buffer should be empty, representing a null data item.)
        #   other: save char in element buffer / _IN_UNQUOTED
        # _DELIMITED: A delimiter has been seen while not in a quoted item.
        #  Actions / transition:
        #   quote: save char in element buffer / _IN_QUOTED
        #   delimiter: if eat_multiple: no action / _DELIMITED
        #     if not eat_multiple: save element buffer, empty element buffer / _DELIMITED
        #   other: save char in element buffer / _IN_UNQUOTED
        # At end of line: save escape buffer in element buffer, save element buffer.  For a well-formed
        # line, these should be empty, but they may not be.  (An empty element buffer is not
        # saved, so a line that ends with a delimiter gets no trailing empty item.)
        #
        # Define the state constants, which will also be used as indexes into an execution vector.
        (
            _IN_QUOTED,
            _ESCAPED,
            _QUOTE_IN_QUOTED,
            _IN_UNQUOTED,
            _BETWEEN,
            _DELIMITED,
        ) = range(6)
        #
        # Because of Python 2.7's scoping rules:
        # * The escape buffer and current element are defined as mutable objects that will have their
        #  first elements modified, rather than as string variables.  (Python 2.x does not allow
        #  modification of a variable in an enclosing scope that is not the global scope, but
        #  mutable objects like lists can be altered.  Another approach would be to implement this
        #  as a class and use instance variables.)
        # * The action functions return the next state rather than assigning it directly to the 'state' variable.
        esc_buf = [""]
        current_element = [""]

        # Each action function below handles one state.  They read the current
        # character 'c' (and its index 'i') from the loop further down through
        # the enclosing scope, and return the next state.

        def in_quoted():
            # Inside a quoted item: hold back an escape or quote character in the
            # escape buffer until the following character shows what it means.
            if c == self.escchar:
                esc_buf[0] = c
                return _ESCAPED
            elif c == qchar:
                esc_buf[0] = c
                return _QUOTE_IN_QUOTED
            else:
                current_element[0] += c
                return _IN_QUOTED

        def escaped():
            # The previous character was the escape character inside a quoted item.
            if c == delim:
                current_element[0] += esc_buf[0]
                esc_buf[0] = ""
                elements.append(current_element[0])
                current_element[0] = ""
                return _BETWEEN
            else:
                current_element[0] += esc_buf[0]
                esc_buf[0] = ""
                current_element[0] += c
                return _IN_QUOTED

        def quote_in_quoted():
            # The previous character was a quote inside a quoted item: it is either
            # the first half of a doubled quote or the closing quote.
            if c == qchar:
                current_element[0] += esc_buf[0]
                esc_buf[0] = ""
                current_element[0] += c
                return _IN_QUOTED
            elif c == delim:
                current_element[0] += esc_buf[0]
                esc_buf[0] = ""
                elements.append(current_element[0])
                current_element[0] = ""
                return _DELIMITED
            else:
                current_element[0] += esc_buf[0]
                esc_buf[0] = ""
                current_element[0] += c
                # Positions are reported 1-based.
                self.record_format_error(
                    i + 1, "Unexpected character following a closing quote"
                )
                return _IN_QUOTED

        def in_unquoted():
            # Inside an unquoted item: only a delimiter ends it.
            if c == delim:
                elements.append(current_element[0])
                current_element[0] = ""
                return _DELIMITED
            else:
                current_element[0] += c
                return _IN_UNQUOTED

        def between():
            # Starting state, or just after an item was closed without a delimiter.
            if c == qchar:
                current_element[0] += c
                return _IN_QUOTED
            elif c == delim:
                elements.append(current_element[0])
                current_element[0] = ""
                return _DELIMITED
            else:
                current_element[0] += c
                return _IN_UNQUOTED

        def delimited():
            # Just after a delimiter outside a quoted item.
            if c == qchar:
                current_element[0] += c
                return _IN_QUOTED
            elif c == delim:
                # A repeated space delimiter is swallowed; any other repeated
                # delimiter yields an empty item.
                if not eat_multiple_delims:
                    elements.append(current_element[0])
                    current_element[0] = ""
                return _DELIMITED
            else:
                current_element[0] += c
                return _IN_UNQUOTED

        # Functions in the execution vector must be ordered identically to the
        # indexes represented by the state constants.
        exec_vector = [
            in_quoted,
            escaped,
            quote_in_quoted,
            in_unquoted,
            between,
            delimited,
        ]
        # Set the starting state.
        state = _BETWEEN
        # Process the line of text.
        for i, c in enumerate(self.text):
            state = exec_vector[state]()
        # Process the end-of-line condition.
        if len(esc_buf[0]) > 0:
            current_element[0] += esc_buf[0]
        if len(current_element[0]) > 0:
            elements.append(current_element[0])
        return elements

    def well_quoted_line(self, delim, qchar):
        """Summarise _well_quoted() over every item produced by items()."""
        # Returns a tuple of boolean, int, and boolean, indicating: 1) whether the line is
        # well-quoted, 2) the number of elements for which the quote character is used,
        # and 3) whether the escape character is used.
        wq = [self._well_quoted(el, qchar) for el in self.items(delim, qchar)]
        return (
            all([b[0] for b in wq]),
            sum([b[1] for b in wq]),
            any([b[2] for b in wq]),
        )

def diagnose_delim(linestream, possible_delimiters=None, possible_quotechars=None):
    """Guess the delimiter, quote character and escape character of a text stream.

    At most the first 100 lines are read from ``linestream``; blank lines are
    ignored.

    Args:
        linestream: Source of text lines. May be an iterator (for example an
            open text file), an object with a Python 2 style ``next()``
            method, or any other iterable of lines.
            NOTE(review): the helpers compare against str delimiters, so
            lines are assumed to be str under Python 3 -- confirm for
            callers that pass bytes.
        possible_delimiters: A list of single characters that might separate
            items on a line. Defaults to tab, comma, semicolon and vertical
            rule. If a space is included, runs of spaces count as a single
            delimiter, so space-delimited input should have few missing values.
        possible_quotechars: A list of single characters that might quote
            items on a line. Defaults to double and single quotes.

    Returns:
        tuple: ``(delimiter, quote character, escape character)``. Any of the
        three may be None. A non-None escape character is always
        ``CsvLine.escchar`` (a backslash).

    Raises:
        CsvDiagError: If no non-blank line could be read.
    """
    if not possible_delimiters:
        possible_delimiters = ["\t", ",", ";", "|"]
    if not possible_quotechars:
        possible_quotechars = ['"', "'"]

    # Python 3 iterators expose __next__, Python 2 style objects expose next();
    # anything else is treated as a plain iterable.
    if hasattr(linestream, "__next__"):
        read_line = linestream.__next__
    elif hasattr(linestream, "next"):
        read_line = linestream.next
    else:
        read_line = iter(linestream).__next__

    lines = []
    for _ in range(100):
        try:
            ln = read_line()
        except StopIteration:
            break
        ln = ln.rstrip("\r\n")
        if ln:
            lines.append(CsvLine(ln))
    if not lines:
        raise CsvDiagError("CSV diagnosis error: no lines read")
    for ln in lines:
        for d in possible_delimiters:
            ln.count_delim(d)

    # For each delimiter, find the minimum number of delimiters found on any
    # line, and the number of lines with that minimum number.  A delimiter
    # that is missing from at least one line (minimum of zero) is discarded.
    delim_stats = {}
    for d in possible_delimiters:
        dcounts = [ln.delim_count(d) for ln in lines]
        min_count = min(dcounts)
        if min_count > 0:
            delim_stats[d] = (min_count, dcounts.count(min_count))

    def all_well_quoted(delim, qchar):
        # Returns a tuple of boolean, int, and escape character indicating:
        # 1) whether every line is well-quoted, 2) the total number of
        # elements on all lines for which the quote character is used, and
        # 3) the escape character if any line uses it, otherwise None.
        wq = [line.well_quoted_line(delim, qchar) for line in lines]
        return (
            all(b[0] for b in wq),
            sum(b[1] for b in wq),
            CsvLine.escchar if any(b[2] for b in wq) else None,
        )

    def eval_quotes(delim):
        # Returns a tuple of the form to be returned by 'diagnose_delim()'.
        ok_quotes = {}
        for q in possible_quotechars:
            allwq = all_well_quoted(delim, q)
            if allwq[0]:
                ok_quotes[q] = (allwq[1], allwq[2])
        if not ok_quotes:
            return (delim, None, None)  # No quotes, no escapechar
        max_use = max(v[0] for v in ok_quotes.values())
        if max_use == 0:
            return (delim, None, None)
        # If multiple quote characters have the same usage, return the first
        # one in the order given by possible_quotechars.
        for q, (use_count, esc) in ok_quotes.items():
            if use_count == max_use:
                return (delim, q, esc)

    if not delim_stats:
        # None of the delimiters were found.  Some other delimiter may apply,
        # or the input may contain a single value on each line.
        # Identify possible quote characters.
        return eval_quotes(None)
    # If there are several candidates and one of them is a space, prefer the
    # non-space ones.
    if len(delim_stats) > 1 and " " in delim_stats:
        del delim_stats[" "]
    if len(delim_stats) == 1:
        return eval_quotes(next(iter(delim_stats)))
    # Assign weights to the delimiters.  The weight is the square of the
    # minimum number of delimiters on a line times the number of lines with
    # that minimum.
    delim_wts = {
        d: min_count ** 2 * n_lines for d, (min_count, n_lines) in delim_stats.items()
    }
    # Evaluate quote usage for each delimiter, from most heavily weighted to
    # least.  Return the first good pair where the quote character is used.
    delim_order = sorted(delim_wts, key=delim_wts.get, reverse=True)
    for d in delim_order:
        quote_check = eval_quotes(d)
        if quote_check[0] and quote_check[1]:
            return quote_check
    # There are no delimiters for which quotes are OK.
    return (delim_order[0], None, None)

Import CSV to a Database

class CsvFile(object):
    """A CSV file opened for reading, with its header row already consumed.

    Opening the file, creating the CSV reader and reading the first row all
    happen on construction. The header row is kept in ``headers`` so that it
    can be used to construct an INSERT statement.

    Attributes:
        fn: Name of the file, as passed in.
        f: The open file object, or None when closed.
        rdr: The csv.reader positioned after the header row, or None when closed.
        headers: List of column names read from the first row.
    """

    def __init__(self, filename):
        """Open filename and read its header row.

        Raises StopIteration if the file has no rows at all; the file is
        closed again before the exception propagates.
        """
        self.fn = filename
        self.f = None
        self.rdr = None
        self.open()
        self.rdr = csv.reader(self.f)
        try:
            self.headers = next(self.rdr)
        except StopIteration:
            # Do not leak the file handle when there is no header row.
            self.close()
            raise

    def open(self):
        """Open the file unless it is already open."""
        if self.f is None:
            if sys.version_info < (3,):
                self.f = open(self.fn, "rb")
            else:
                # The csv module needs newline="" under Python 3, otherwise
                # newlines embedded in quoted fields are translated on read.
                self.f = open(self.fn, "r", newline="")

    def reader(self):
        """Return the csv.reader (None once the file has been closed)."""
        return self.rdr

    def close(self):
        """Drop the reader and close the file; safe to call more than once."""
        self.rdr = None
        if self.f is not None:
            self.f.close()
            self.f = None


class Database(object):
    """Base class for a connection to one type of DBMS.

    This class and its subclasses provide the database connection and build
    INSERT statements for a CsvFile object using the DBMS's own parameter
    substitution string.  ``conn_info`` is a dictionary containing the host
    name, user name, and password; this base class itself does not connect.
    """

    def __init__(self, conn_info):
        self.conn = None
        self.paramstr = '%s'

    def insert_sql(self, tablename, csvfile):
        """Return an INSERT statement with one placeholder per CSV header."""
        column_list = ",".join(csvfile.headers)
        placeholders = ",".join(self.paramstr for _ in csvfile.headers)
        return "insert into %s (%s) values (%s);" % (
            tablename,
            column_list,
            placeholders,
        )


class PgDb(Database):
    """Postgres connection plus three ways of loading a CsvFile into it.

    The copy helpers take the CsvFile and the database object explicitly and
    write to the table ``copy_test``. They are static methods, so they can be
    called either on the class or on an instance with the same arguments.
    """

    def __init__(self, conn_info):
        """Connect to Postgres.

        Args:
            conn_info: Mapping with the keys ``server``, ``db``, ``user`` and
                ``pw``. A missing key raises KeyError.
        """
        super(PgDb, self).__init__(conn_info)
        self.db_type = "p"
        # Imported here so that the driver is only needed when Postgres is used.
        import psycopg2
        self.paramstr = "%s"
        # Keyword arguments instead of a hand-built DSN string, so that values
        # containing spaces or quotes (passwords in particular) are handled.
        self.conn = psycopg2.connect(
            host=conn_info["server"],
            dbname=conn_info["db"],
            user=conn_info["user"],
            password=conn_info["pw"],
        )

    @staticmethod
    def postgres_copy(csvfile, db):
        """Postgres COPY command. Fastest implementation"""
        curs = db.conn.cursor()
        # The with-block guarantees the file is closed even if COPY fails.
        with open(csvfile.fn, "rt") as rf:
            # Read and discard headers
            rf.readline()
            copy_cmd = "copy copy_test from stdin with (format csv)"
            curs.copy_expert(copy_cmd, rf)
        # Commit like the other copy helpers do, so the load is not lost.
        db.conn.commit()

    @staticmethod
    def simple_copy(csvfile, db):
        """Row-by-row reading and writing"""
        ins_sql = db.insert_sql("copy_test", csvfile)
        curs = db.conn.cursor()
        for line in csvfile.reader():
            curs.execute(ins_sql, clean_line(line))
        db.conn.commit()

    @staticmethod
    def buffer1_copy(csvfile, db, buflines):
        """Buffered reading and writing

        Rows are inserted in batches of up to ``buflines`` rows each.

        Raises:
            ValueError: If ``buflines`` is less than 1 (no row could ever be
                read, which previously caused an endless loop).
        """
        if buflines < 1:
            raise ValueError("buflines must be at least 1")
        ins_sql = db.insert_sql("copy_test", csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        exhausted = False
        while not exhausted:
            batch = []
            for _ in range(buflines):
                try:
                    line = next(rdr)
                except StopIteration:
                    exhausted = True
                    break
                batch.append(clean_line(line))
            if batch:
                curs.executemany(ins_sql, batch)
        db.conn.commit()