Hide code cell source
%load_ext watermark
import pandas as pd
import numpy as np
from typing import Type, Optional, Callable
from typing import List, Dict, Union, Tuple
from myst_nb import glue

# from review_methods_tests import collect_vitals, find_missing, find_missing_loc_dates
# from review_methods_tests import make_a_summary

import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.colors
from matplotlib.colors import LinearSegmentedColormap, ListedColormap

import setvariables as conf_
import reportclass as r_class

Reporting#

The Report class is used to generate descriptive statistics and identify objects of interest from a query defined by geographic, administrative and/or temporal bounds. These results are considered in the context of topographic data using the LandUse class.

Requests#

A request has five components including the data. A dictionary that contains four key-value pairs, one of which must be column name and value for that column, start and end dates and language choice. The report_data method uses the request to slice the data and load detailed information for survey locations and objects. The results of report_data are passed on to the ReportClass.

# example one
# this will give a report on the canton of bern in french between the specified dates
boundaries = dict(canton='Bern', language='fr', start_date='2015-01-01', end_date="2021-12-31")

# example two
# this will give a report on all lakes
boundaries = dict(feature_type='l', language='fr', start_date='2019-01-01', end_date='2022-01-01')

# the report_data method takes the boundaries and returns the top level of the report,
# the language and two data frames from the same date range. w_df includes only the surveys
# that meet the criteria in boundaries, w_di includes all the data from the date range.
top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)

# the language map is included with the module
# NOTE(review): this example passes `language_map`; the executed cell below uses
# `c_l = r_class.language_maps()` — presumably the same object, confirm the name
a_report = r_class.ReportClass(w_df,boundaries, top_label, language, language_map)

Report boundaries#

Once the boundaries have been applied to the survey data the basic report contents can be summarized. Calling a_report.available_features lists the identified geographic/administrative boundaries within the report data. If the boundaries are set to canton='Bern' there are four reporting categories available

# a summary of the different features and boundaries in a report
# (the column names of the geographic/administrative boundaries in the data)
a_report.available_features

# the number of each type of feature within the selected data
a_report.the_number_of_attributes_in_a_feature('feature_type')
Hide code cell source
# starting data, can be MySQL or NoSQL calls
# the three methods accept Callables, as long
# as the output is pd.DataFrame
c_l = r_class.language_maps()
surveys = r_class.collect_survey_data_for_report()
codes, beaches, land_cover, land_use, streets, river_intersect_lakes = r_class.collect_env_data_for_report()

# attach the canton of each survey location to the survey records;
# 'slug' identifies the location, beaches is indexed by location
survey_data = surveys.merge(beaches['canton'], left_on='slug', right_index=True, validate='many_to_one')

# temporal and geographic boundaries
# user defined input
boundaries = dict(canton='Bern', language='fr', start_date="2015-11-01", end_date="2021-12-31")

# the report_data method takes the boundaries and returns the top level of the report,
# the language and two data frames from the same date range. w_df includes only the surveys
# that meet the criteria in boundaries, w_di includes all the data from the date range.
top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)

# the language map is included with the module
a_report = r_class.ReportClass(w_df,boundaries, top_label, 'fr', c_l)

# a summary of the different features and boundaries in a report
a_report.available_features
['parent_boundary', 'feature_type', 'feature_name', 'city']

Note

The available features are column names of the survey data. They represent the different geographic or administrative boundaries in the selected report data.

  • parent_boundary is a geographic boundary such as a river basin or a category such as mountains

  • feature_type designates whether the location is at a river, lake or park

  • feature_name is the name of the river, lake or park

Report contents#

Within the canton of Bern there are samples from one park, four lakes and six rivers. The lakes have the most samples, followed by rivers and parks.

# the number and category of samples from the different features in the report
a_report.the_number_of_attributes_in_a_feature('feature_type')
city feature_name samples
r 14 6 96
l 14 4 99
p 1 1 1

Report labels#

The surveys are categorized by parks, lakes or rivers. The name of each one and the municipalities where surveys were conducted can be accessed with Report.feature_labels().

my_labels = a_report.feature_labels()

The subject of the report can be identified by calling a_report.top_label. The first element is a column name in the dataframe and the second element is the value of interest.

a_report.top_label
['canton', 'Bern']

To identify the municipalities associated with a particular feature in the report simply use the feature labels as a key to my_labels.

# collect the labels
my_labels = a_report.feature_labels()
# the lakes and the cities on those lakes
my_labels['l']
{'feature_name': array(['aare', 'bielersee', 'brienzersee', 'thunersee'], dtype=object),
 'city': array(['Kallnach', 'Vinelz', 'Erlach', 'Gals', 'Ligerz', 'Lüscherz',
        'Biel/Bienne', 'Nidau', 'Bönigen', 'Brienz (BE)', 'Spiez', 'Thun',
        'Beatenberg', 'Unterseen'], dtype=object)}
# in the same way the name of the parks and the cities in those parks can be identified
my_labels['p']
{'feature_name': array(['alpes-bernoises'], dtype=object),
 'city': array(['Grindelwald'], dtype=object)}
# the same for rivers
my_labels['r']
{'feature_name': array(['aare', 'aarenidau-buren-kanal', 'emme', 'langeten', 'schuss',
        'zulg'], dtype=object),
 'city': array(['Muri bei Bern', 'Bern', 'Belp', 'Köniz', 'Walperswil', 'Kallnach',
        'Rubigen', 'Port', 'Brügg', 'Utzenstorf', 'Burgdorf', 'Langenthal',
        'Biel/Bienne', 'Steffisburg'], dtype=object)}

Generating a report for a specific feature#

A detailed report can be generated for any element in the feature labels. The method ReportClass.a_subreport(feature_of_interest='my_feature') accepts the name of the feature of interest and uses the data from the established report to create a report that contains only the data from the feature of interest.

bielersee = a_report.a_subreport(feature_of_interest='bielersee')
bielersee.feature_labels()
{'l': {'feature_name': array(['bielersee'], dtype=object),
  'city': array(['Vinelz', 'Erlach', 'Gals', 'Ligerz', 'Lüscherz', 'Biel/Bienne',
         'Nidau'], dtype=object)}}

Comparing survey totals between features#

The survey totals are the sum of all the objects found at a survey divided by the length of the survey. Comparing the survey results between features in a report is done by calling ReportClass.summarize_feature_labels(feature='feature of interest') for example to compare the survey totals between cities in Bern call a_report.summarize_feature_labels(feature='city').

a_report.summarize_feature_labels(feature='city')
pcs_m
label Beatenberg Belp Bern Biel/Bienne Brienz (BE) Brügg Burgdorf Bönigen Erlach Gals ... Nidau Port Rubigen Spiez Steffisburg Thun Unterseen Utzenstorf Vinelz Walperswil
25% 1.9875 0.05 0.19 2.69 4.14 1.02 0.87 3.0925 1.83 1.215 ... 2.52 0.8125 3.2 0.3725 0.18 1.24 1.095 0.4125 2.14 0.22
50% 2.425 0.1 0.525 5.06 5.02 1.02 0.87 3.175 1.83 1.28 ... 2.52 1.375 3.2 0.595 0.24 1.39 1.8 0.495 3.38 0.22
75% 2.8625 0.17 1.2675 7.5 5.1 1.02 0.87 3.2575 1.83 1.345 ... 2.52 1.9375 3.2 0.9825 0.545 1.555 2.3025 0.66 5.215 0.22
count 2 11 32 17 3 1 1 2 1 2 ... 1 2 1 26 11 3 12 8 23 1
max 3.3 0.19 5.42 9.68 5.18 1.02 0.87 3.34 1.83 1.41 ... 2.52 2.5 3.2 2.84 3.58 1.72 3.94 1.22 10.47 0.22
mean 2.425 0.111818 1.022187 5.074706 4.486667 1.02 0.87 3.175 1.83 1.28 ... 2.52 1.375 3.2 0.786923 0.634545 1.4 1.888333 0.565 3.770435 0.22
min 1.55 0.02 0.02 0.91 3.26 1.02 0.87 3.01 1.83 1.15 ... 2.52 0.25 3.2 0.16 0.06 1.09 0.54 0.22 1.05 0.22
std 1.237437 0.065087 1.266312 2.911849 1.065332 NaN NaN 0.233345 NaN 0.183848 ... NaN 1.59099 NaN 0.612306 1.010202 0.315119 1.044595 0.30538 2.24379 NaN
total 104 60 1892 3307 696 36 41 277 101 48 ... 63 118 57 838 114 276 1879 41 2033 14

9 rows × 27 columns

Any available feature or category of the report can be called

a_report.summarize_feature_labels(feature='feature_type')

The distribution of survey totals in parks, rivers and lakes for the canton:

pcs_m
label l p r
25% 0.89 2.81 0.1975
50% 1.83 2.81 0.73
75% 3.62 2.81 1.5925
count 99 1 96
max 14.8 2.81 7.92
mean 2.796061 2.81 1.209167
min 0.16 2.81 0.02
std 2.637667 NaN 1.464487
total 9902 169 3688

The subreport for a specific feature works exactly the same way:

bielersee.summarize_feature_labels(feature='city')

The distribution of survey totals for the cities on bielersee:

pcs_m
label Biel/Bienne Erlach Gals Ligerz Lüscherz Nidau Vinelz
25% 3.725 1.83 1.215 3.7 0.43 2.52 2.14
50% 5.42 1.83 1.28 4.0 0.64 2.52 3.38
75% 7.76 1.83 1.345 9.4 0.84 2.52 5.215
count 15 1 2 3 5 1 23
max 9.68 1.83 1.41 14.8 1.42 2.52 10.47
mean 5.613333 1.83 1.28 7.4 0.746 2.52 3.770435
min 0.91 1.83 1.15 3.4 0.4 2.52 1.05
std 2.654795 NaN 0.183848 6.415606 0.416509 NaN 2.24379
total 3209 101 48 163 202 63 2033

Most common objects#

The most common objects in a report are selected using two criteria:

  • The ranking by quantity ie. top-ten, top-twenty

  • The fail rate: i.e. 0.5, 0.6

The defaults are set in the ReportClass, however they can be changed by providing values for the variables mc_criteria_one and mc_criteria_two when the report is created. Once the variables are set the most common objects are displayed with the quantity, percent of total, the median pcs/m and the fail rate for each object in the most common. The weight of the most common objects with respect to the all the objects is also included.

most_common, weight = a_report.most_common
quantity % pcs_m fail rate
G27 2410 0.175158 0.07 0.739796
Gfrags 1870 0.135911 0.01 0.698980
G67 1172 0.085181 0.03 0.586735
G30 750 0.054510 0.04 0.637755
Gfoams 542 0.039392 0.00 0.423469
G145 461 0.033505 0.00 0.153061
G200 362 0.026310 0.00 0.331633
G208 342 0.024856 0.00 0.316327
Gcaps 285 0.020714 0.00 0.408163
G941 261 0.018969 0.00 0.224490
G74 242 0.017588 0.00 0.239796
weight
{'quantity': 8697.0, '%': 0.6320953557671344}

Works the same for a subreport#

The criteria for selecting the most common in a subreport are by default the same as the original report.

b_most_common, b_weight = bielersee.most_common
quantity % pcs_m fail rate
Gfrags 1089 0.187146 0.12 0.98
G27 841 0.144527 0.06 0.80
G67 697 0.119780 0.30 0.92
G30 363 0.062382 0.16 0.84
Gfoams 208 0.035745 0.00 0.64
G200 179 0.030761 0.02 0.56
G941 166 0.028527 0.00 0.46
Gcaps 140 0.024059 0.00 0.66
G25 98 0.016841 0.03 0.66
G89 95 0.016326 0.02 0.56
G940 92 0.015810 0.00 0.24
G177 77 0.013233 0.03 0.62
G904 75 0.012889 0.02 0.52

Changing the criteria of the most common#

Specify the new values for the criteria and call a new report. In the example below the criteria are changed to the top-five objects or those objects that were found in at least 60% of the surveys.

# default arguments that define the most common objects
# this assumes that the columns quantity and fail rate exist
# criteria one: keep the top five objects ranked by quantity
mc_criteria_one = {
        
        'column': 'quantity',
        'val': 5
    }

# criteria two: keep objects with a fail rate of at least 0.6
mc_criteria_two = {
        'column': 'fail rate',
        'val': 0.6
    }
# the criteria are passed as keyword arguments when the report is created
a_report_changed = r_class.ReportClass(w_df,boundaries, top_label, 'fr', c_l, mc_criteria_one=mc_criteria_one, mc_criteria_two=mc_criteria_two)
c_mc, weight_cmc = a_report_changed.most_common
c_mc

Note the only variables that changed are the selection criteria. The new most common objects list for the canton is shorter and accounts for less of the total.

quantity % pcs_m fail rate
G27 2410 0.175158 0.07 0.739796
Gfrags 1870 0.135911 0.01 0.698980
G67 1172 0.085181 0.03 0.586735
G30 750 0.054510 0.04 0.637755
Gfoams 542 0.039392 0.00 0.423469
G145 461 0.033505 0.00 0.153061
weight_cmc
{'quantity': 7205.0, '%': 0.5236572425321608}

Retrieving properties#

There are 318’478 rows in the survey data. We can test the sorting and grouping functions by running a report class on all possible combinations of the features of interest. The test should produce the set of arguments that define the survey locations and surveys that define the boundaries of a report.

some_features = ['feature_type', 'parent_boundary', 'feature_name', 'canton', 'city']

def produce_reports_for_testing(survey_data, some_features):
    """Build a ReportClass for every label of every feature of interest.

    For each feature column in ``some_features`` a report is generated for
    each unique label in that column, using the full date range of the
    surveys available for that label. Returns a nested dict
    ``{feature: {label: report.features}}`` so the sorting and grouping
    logic can be checked for every possible boundary combination.

    NOTE(review): relies on the module-level ``beaches``, ``codes`` and
    ``c_l`` objects loaded earlier in this document.
    """
    reports = {}
    for a_feature in some_features:
        label_reports = {}
        for label in survey_data[a_feature].unique():
            # the date range is bounded by the surveys available for this label
            mask = survey_data[a_feature] == label
            start_date = survey_data.loc[mask, 'date'].min()
            end_date = survey_data.loc[mask, 'date'].max()

            boundaries = {a_feature: label, 'language': 'fr', 'start_date': start_date, 'end_date': end_date}
            # call report_data and ReportClass exactly as at the other call
            # sites in this document: the module is imported as r_class,
            # report_data takes (boundaries, data, beaches, codes), and
            # ReportClass does not take the w_di frame (see the documented
            # parameter list in the Unit tests section)
            top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)
            a_report = r_class.ReportClass(w_df, boundaries, top_label, 'fr', c_l)
            label_reports[label] = a_report.features
        reports[a_feature] = label_reports
    return reports

t = produce_reports_for_testing(survey_data, some_features)

t['canton']['Valais']

The properties should contain the arguments for cities in the example report

# t['city']['Saint-Gingolph']

Unit tests#

The reportclass module#

The ReportClass takes the following parameters:#

  • w_df (pd.DataFrame, optional): The survey data DataFrame for report generation.

  • boundaries (dict, optional): A dictionary defining the reporting boundaries, including ‘start_date’, end_date’, and ‘language’.

  • top_label (List, optional): A list containing two elements - [label_column, label_value].

  • language (str, optional): The language in which the report is generated.

  • lang_maps (pd.DataFrame, optional): A DataFrame containing language mapping data.

  • mc_criteria_one (dict, optional): The first criteria for identifying objects of interest.

  • mc_criteria_two (dict, optional): The second criteria for identifying objects of interest.

  • ooi (str, optional): The name of the object of interest column.

And has the following methods:#

  • features: Get a list of available features for report generation.

  • available_features: Get a list of available features based on predefined criteria.

  • inventory: Get an inventory of objects with summary statistics.

  • most_common: Find the most common objects based on criteria.

  • summarize_feature_labels: Summarize data for a specific feature.

  • the_number_of_attributes_in_a_feature: Count attributes in a feature.

  • __repr__ : Return a string representation of the ReportClass instance.

Other methods in the reportclass module.#

Note: the methods in bold have been unit tested

from inspect import getmembers, isfunction

# list the names of all functions defined in the reportclass module
# NOTE: the module is imported as r_class (see `import reportclass as r_class`);
# the original cell referenced the bare name `reportclass`, which is not bound
functions_list = getmembers(r_class, isfunction)
[x[0] for x in functions_list]

AGGREGATING, COUNTING, REPORTING:

  • ‘a_cumulative_report’,

  • ‘a_summary_of_one_vector’,

  • ‘aggregate_boundaries’,

  • ‘aggregate_dataframe’,

  • ‘calculate_rate_per_unit’,

  • ‘use_gfrags_gfoams_gcaps’,

  • ‘use_parent_groups_or_gfrags

  • ‘categorize_work_data’,

  • ‘count_objects_with_positive_quantity’,

  • ‘display_tabular_data_by_column_values’,

  • ‘get_top_x_records_with_max_quantity’,

COLLECTING DATA

  • ‘collect_env_data_for_report’,

  • ‘collect_survey_data_for_report’,

  • ‘combine_survey_files’,

  • ‘report_data’,

  • ‘slice_data_by_date’,

PROCESSING DATA WITH REQUEST

  • ‘check_for_top_label’,

  • ‘merge_dataframes_on_column_and_index’,

  • ‘add_column_to_work_data’,

  • ‘add_columns_to_work_data’,

DISPLAY

  • ‘language_maps’,

  • ‘translate_for_display’,

  • ‘capitalize_index’,

  • ‘translate_word’,

  • ‘translated_and_style_for_display’,

  • ‘color_gradient’,

Combining codes using parent groups#

Specific to beach litter data#

  • reportclass.use_parent_groups_or_gfrags

  • reportclass.use_gfrags_gfoams_gcaps

config setting: setvariables.code_result_columns

Defines the set of columns to use when aggregating to the object level

import unittest
class TestUseGfragsGfoamsGcaps(unittest.TestCase):
    """use_gfrags_gfoams_gcaps should swap each code for its parent code."""

    def test_use_gfrags_gfoams_gcaps(self):
        # one survey record per code; codes A-F map onto parent groups
        data = pd.DataFrame({
            'code': ['A', 'B', 'C', 'D', 'E', 'F'],
            'sample_id': [1, 2, 1, 2, 1, 1],
            'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
            'quantity': [2, 1, 2, 1, 2, 1],
        })

        # the code -> parent_code lookup, indexed by code
        codes = pd.DataFrame({
            'parent_code': ['Gfoams', 'Gfrags', 'Gcaps', 'Gfoams', 'Gcaps', 'F'],
            'code': ['A', 'B', 'C', 'D', 'E', 'F'],
        }).set_index('code')

        # only the code column should change: each value becomes its parent code
        expected_result = pd.DataFrame({
            'code': ['Gfoams', 'Gfrags', 'Gcaps', 'Gfoams', 'Gcaps', 'F'],
            'sample_id': [1, 2, 1, 2, 1, 1],
            'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1.0],
            'quantity': [2, 1, 2, 1, 2, 1],
        })

        updated_data = r_class.use_gfrags_gfoams_gcaps(data, codes)

        pd.testing.assert_frame_equal(updated_data, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestUseGfragsGfoamsGcaps)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Hide code cell source
import unittest
class TestUseGfragsGfoamsGcaps(unittest.TestCase):
    """use_gfrags_gfoams_gcaps should swap each code for its parent code."""

    def test_use_gfrags_gfoams_gcaps(self):
        # Sample data and code mappings: one record per code,
        # codes A-F map onto parent groups via the codes frame
        data = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                             'sample_id': [1, 2, 1, 2, 1, 1],
                             'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                             'quantity': [2, 1, 2, 1, 2, 1]})
        codes = pd.DataFrame({'parent_code': ['Gfoams', 'Gfrags', 'Gcaps','Gfoams', 'Gcaps', 'F'],
                              'code': ['A', 'B', 'C', 'D', 'E', 'F']})
        codes.set_index('code', inplace=True)
        # Expected result: only the code column changes — each value is
        # replaced by its parent code; all other columns are unchanged
        expected_result = pd.DataFrame(
            {'code': {0: 'Gfoams', 1: 'Gfrags', 2: 'Gcaps', 3: 'Gfoams',  4: 'Gcaps',  5: 'F'},
             'sample_id': {0: 1, 1: 2, 2: 1, 3: 2, 4: 1, 5: 1},
             'density': {0: 1.5, 1: 0.5, 2: 1.5, 3: 0.5, 4: 1.5, 5: 1.0},
             'quantity': {0: 2, 1: 1, 2: 2, 3: 1, 4: 2, 5: 1}}
        )

        # Call the function
        updated_data = r_class.use_gfrags_gfoams_gcaps(data, codes)

        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(updated_data, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestUseGfragsGfoamsGcaps)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_use_gfrags_gfoams_gcaps (__main__.TestUseGfragsGfoamsGcaps) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.006s

OK

Aggregate a dataframe#

  • reportclass.aggregate_dataframe

config settings#

Accepts the following arguments for the methods given the defaults code_result_columns and work_columns:

  • setvariables.agg_groups

  • setvariables.unit_agg

class TestAggregateDataFrame(unittest.TestCase):
    """aggregate_dataframe: group by the given columns, apply named aggregations."""

    def test_aggregate_dataframe(self):
        # six records across two samples and three 'prop a' groups
        data = pd.DataFrame({
            'code': ['A', 'B', 'C', 'D', 'E', 'F'],
            'sample_id': [1, 2, 1, 2, 1, 1],
            'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
            'quantity': [2, 1, 2, 1, 2, 1],
            'prop a': ['s1', 's2', 's1', 's1', 's2', 's3'],
            'prop b': ['x', 'x', 'z', 'z', 'z', 'q'],
        })
        keys = ['sample_id', 'prop a']
        methods = {'quantity': 'sum', 'density': 'median'}

        # within each (sample_id, prop a) group the quantity is summed
        # and the density is the median
        expected_result = pd.DataFrame({
            'sample_id': [1, 1, 1, 2, 2],
            'prop a': ['s1', 's2', 's3', 's1', 's2'],
            'quantity': [4, 2, 1, 1, 1],
            'density': [1.5, 1.5, 1.0, 0.5, 0.5],
        })

        result = r_class.aggregate_dataframe(data, groupby_columns=keys, aggregation_functions=methods)

        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateDataFrame)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Hide code cell source
class TestAggregateDataFrame(unittest.TestCase):
    """aggregate_dataframe: group by the given columns, apply named aggregations."""

    def test_aggregate_dataframe(self):
        # Sample data: six records across two samples and three 'prop a' groups
        data = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                             'sample_id': [1, 2, 1, 2, 1, 1],
                             'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                             'quantity': [2, 1, 2, 1, 2, 1],
                             'prop a': ['s1', 's2','s1','s1','s2', 's3'],
                             'prop b': ['x' ,'x', 'z','z','z', 'q']})
        group_by_columns = ['sample_id','prop a']
        aggregation_functions = {'quantity': 'sum','density': 'median'}

        # Expected result: within each (sample_id, prop a) group the
        # quantity is summed and the density is the median
        expected_result = pd.DataFrame({
            'sample_id': {0: 1, 1: 1, 2: 1, 3: 2, 4: 2},
            'prop a': {0: 's1', 1: 's2', 2: 's3', 3: 's1', 4: 's2'},
            'quantity': {0: 4, 1: 2, 2: 1, 3: 1, 4: 1},
            'density': {0: 1.5, 1: 1.5, 2: 1.0, 3: 0.5, 4: 0.5}})

        # Call the function
        result = r_class.aggregate_dataframe(data, groupby_columns=group_by_columns, aggregation_functions=aggregation_functions)

        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateDataFrame)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_aggregate_dataframe (__main__.TestAggregateDataFrame) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.007s

OK

Calculating rate per unit#

  • reportclass.calculate_rate_per_unit

config settings#

Accepts the following arguments for the methods, the default is setvariables.unit_agg:

  • setvariables.agg_groups

  • setvariables.unit_agg

class TestCalculateRatePerUnit(unittest.TestCase):
    """calculate_rate_per_unit: per object, sum the quantity and take the
    median pcs_m, labeling the aggregate rows 'all'."""

    def test_calculate_rate_per_unit(self):
        # Sample data: six samples of two objects
        data = pd.DataFrame({
            'sample': [1, 2, 3, 4, 5, 6],
            'object': ['A', 'B', 'A', 'A', 'B', 'B'],
            'quantity': [10, 20, 30, 40, 50, 60],
            'pcs_m': [1, 1, 2, 1, 2, 2],
        })

        # Objects to check and grouping
        column_of_interest = 'object'
        objects_to_check = ['A', 'B']
        groupby_columns = ['object']

        # NOTE(review): the original test also built an `aggregation_methods`
        # dict ({'quantity': 'sum', 'pcs_m': 'median'}) but never passed it to
        # the function; the function's defaults already aggregate this way
        # (the test passes without it), so the unused local was removed.

        # Expected result: quantity summed, pcs_m median, label 'all'
        expected_result = pd.DataFrame({

            'pcs_m': {'A': 1.0, 'B': 2.0},
            'quantity': {'A': 80, 'B': 130},
            'label': {'A': 'all', 'B': 'all'}})

        # Call the function
        result = r_class.calculate_rate_per_unit(data, objects_to_check,column_of_interest=column_of_interest, groupby_columns=groupby_columns)
        # the function names the index after the grouping column; clear it
        # so the comparison with the unnamed expected index succeeds
        result.index.name = None

        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCalculateRatePerUnit)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)

Hide code cell source
class TestCalculateRatePerUnit(unittest.TestCase):
    """calculate_rate_per_unit: per object, sum the quantity and take the
    median pcs_m, labeling the aggregate rows 'all'."""

    def test_calculate_rate_per_unit(self):
        # Sample data: six samples of two objects
        data = pd.DataFrame({
            'sample': [1, 2, 3, 4, 5, 6],
            'object': ['A', 'B', 'A', 'A', 'B', 'B'],
            'quantity': [10, 20, 30, 40, 50, 60],
            'pcs_m': [1,1,2,1, 2, 2],
        })

        # Objects to check
        column_of_interest = 'object'
        objects_to_check = ['A', 'B']
        groupby_columns = ['object']
        
        # Aggregation functions
        # NOTE(review): this dict is never passed to the call below — the
        # function defaults appear to aggregate the same way; confirm and
        # either pass it explicitly or remove it
        aggregation_methods = {
            'quantity': 'sum',
            'pcs_m': 'median'
        }
        
        # Expected result: quantity summed, pcs_m median, label 'all'
        expected_result = pd.DataFrame({
            
            'pcs_m': {'A': 1.0, 'B': 2.0},
            'quantity': {'A': 80, 'B': 130},
            'label': {'A': 'all', 'B': 'all'}})
        
        # Call the function
        result = r_class.calculate_rate_per_unit(data, objects_to_check,column_of_interest=column_of_interest, groupby_columns=groupby_columns)
        # clear the index name so it matches the unnamed expected index
        result.index.name = None

        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCalculateRatePerUnit)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_calculate_rate_per_unit (__main__.TestCalculateRatePerUnit) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.006s

OK

Aggregate boundaries#

  • reportclass.aggregate_boundaries

Is called by reportclass.a_cumulative_report and calls reportclass.aggregate_dataframe.


class TestAggregateBoundaries(unittest.TestCase):
    """aggregate_boundaries: aggregate survey data to the object level,
    either over the whole frame (label 'all') or per child boundary label."""

    def setUp(self):
        # Create a sample DataFrame for testing
        data = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id' : [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # test data
        self.df = pd.DataFrame(data)
        
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'

        # from default or user input
        # unit_methods: per-sample aggregation; group_methods: across samples
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum','Density': 'median'}

        # the feature name, sample id and object columns make up the groupby columns
        # the feature name is used to mask the different child boundaries
        self.groupby_columns=[self.feature_name, self.sample_id, self.object_column]

        # the labels of the child boundaries are collected
        # using the feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()

        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]


    def test_aggregate_boundaries_without_labels(self):
        # with boundary_labels=None the whole frame is aggregated
        # and every result row carries the label 'all'
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': None,
            'boundary_columns': self.boundary_columns}

        expected = pd.DataFrame(
            {'Objects': {0: 'X', 1: 'Y'},
             'Quantity': {0: 3, 1: 3},
             'Density': {0: 30.0, 1: 40.0},
             'label': {0: 'all', 1: 'all'}
            })
        
        result = r_class.aggregate_boundaries(self.df, **args)
        
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

    def test_aggregate_boundaries_with_labels(self):
        # with boundary_labels supplied, each label (here each Year)
        # is aggregated separately and becomes the row label
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': self.boundary_labels,
            'boundary_columns': self.boundary_columns}

        expected = pd.DataFrame(
            {'Objects': ['X', 'Y', 'X', 'Y', 'X', 'Y'],
             'Quantity': [1, 1, 1, 1, 1, 1],
             'Density': [10., 20., 30., 40., 60., 50.],
             'label': [2019, 2019, 2020, 2020, 2021, 2021]
            })
        
        # the index is reset before comparing with the expected frame
        result = r_class.aggregate_boundaries(self.df, **args)
        result.reset_index(inplace=True, drop=True)
        
        
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

Hide code cell source
class TestAggregateBoundaries(unittest.TestCase):
    """aggregate_boundaries: aggregate survey data to the object level,
    either over the whole frame (label 'all') or per child boundary label."""

    def setUp(self):
        # Create a sample DataFrame for testing
        data = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id' : [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # test data
        self.df = pd.DataFrame(data)
        
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'

        # from default or user input
        # unit_methods: per-sample aggregation; group_methods: across samples
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum','Density': 'median'}

        # the feature name, sample id and object columns make up the groupby columns
        # the feature name is used to mask the different child boundaries
        self.groupby_columns=[self.feature_name, self.sample_id, self.object_column]

        # the labels of the child boundaries are collected
        # using the feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()

        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]


    def test_aggregate_boundaries_without_labels(self):
        # with boundary_labels=None the whole frame is aggregated
        # and every result row carries the label 'all'
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': None,
            'boundary_columns': self.boundary_columns}

        expected = pd.DataFrame(
            {'Objects': {0: 'X', 1: 'Y'},
             'Quantity': {0: 3, 1: 3},
             'Density': {0: 30.0, 1: 40.0},
             'label': {0: 'all', 1: 'all'}
            })
        
        result = r_class.aggregate_boundaries(self.df, **args)
        
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

    def test_aggregate_boundaries_with_labels(self):
        # with boundary_labels supplied, each label (here each Year)
        # is aggregated separately and becomes the row label
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': self.boundary_labels,
            'boundary_columns': self.boundary_columns}

        expected = pd.DataFrame(
            {'Objects': ['X', 'Y', 'X', 'Y', 'X', 'Y'],
             'Quantity': [1, 1, 1, 1, 1, 1],
             'Density': [10., 20., 30., 40., 60., 50.],
             'label': [2019, 2019, 2020, 2020, 2021, 2021]
            })
        
        # the index is reset before comparing with the expected frame
        result = r_class.aggregate_boundaries(self.df, **args)
        result.reset_index(inplace=True, drop=True)
        
        
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateBoundaries)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_aggregate_boundaries_with_labels (__main__.TestAggregateBoundaries) ... ok
test_aggregate_boundaries_without_labels (__main__.TestAggregateBoundaries) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.018s

OK

Cumulative reports#

  • reportclass.a_cumulative_report

Calls reportclass.aggregate_boundaries


class TestAcumulativeReport(unittest.TestCase):
    """Tests for reportclass.a_cumulative_report."""

    def setUp(self):
        # Fixture frame: two objects sampled across three years.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id': [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })

        # values supplied by user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'

        # aggregation rules, from defaults or user input
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum', 'Density': 'median'}

        # the feature name, sample id and object column form the group-by key;
        # the feature name is used to mask the different child boundaries
        self.groupby_columns = [self.feature_name, self.sample_id, self.object_column]

        # the child boundary labels are collected from the feature column
        self.boundary_labels = self.df[self.feature_name].unique()

        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_acumulative_df(self):
        kwargs = dict(
            feature_name=self.feature_name,
            object_column=self.object_column,
            sample_id=self.sample_id,
            unit_agg=self.unit_methods,
            group_agg=self.group_methods,
            pivot_values='Density',
        )

        # one column per year plus the cumulative 'all' column
        expected = pd.DataFrame({
            2019: {'X': 10.0, 'Y': 20.0},
            2020: {'X': 30.0, 'Y': 40.0},
            2021: {'X': 60.0, 'Y': 50.0},
            'all': {'X': 30.0, 'Y': 40.0},
        })
        expected.index.name = self.object_column
        expected.columns.name = 'label'

        result = r_class.a_cumulative_report(self.df, **kwargs)

        # the cumulative report must match the expected pivot exactly
        pd.testing.assert_frame_equal(result, expected)

# Build and run the suite for the cumulative-report tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestAcumulativeReport)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Hide code cell source
class TestAcumulativeReport(unittest.TestCase):
    """Tests for reportclass.a_cumulative_report."""

    def setUp(self):
        # Fixture frame: two objects sampled across three years.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id': [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })

        # values supplied by user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'

        # aggregation rules, from defaults or user input
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum', 'Density': 'median'}

        # the feature name, sample id and object column form the group-by key;
        # the feature name is used to mask the different child boundaries
        self.groupby_columns = [self.feature_name, self.sample_id, self.object_column]

        # the child boundary labels are collected from the feature column
        self.boundary_labels = self.df[self.feature_name].unique()

        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_acumulative_df(self):
        kwargs = dict(
            feature_name=self.feature_name,
            object_column=self.object_column,
            sample_id=self.sample_id,
            unit_agg=self.unit_methods,
            group_agg=self.group_methods,
            pivot_values='Density',
        )

        # one column per year plus the cumulative 'all' column
        expected = pd.DataFrame({
            2019: {'X': 10.0, 'Y': 20.0},
            2020: {'X': 30.0, 'Y': 40.0},
            2021: {'X': 60.0, 'Y': 50.0},
            'all': {'X': 30.0, 'Y': 40.0},
        })
        expected.index.name = self.object_column
        expected.columns.name = 'label'

        result = r_class.a_cumulative_report(self.df, **kwargs)

        # the cumulative report must match the expected pivot exactly
        pd.testing.assert_frame_equal(result, expected)

# Build and run the suite for the cumulative-report tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestAcumulativeReport)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_acumulative_df (__main__.TestAcumulativeReport) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.016s

OK

Categorize work data#

  • reportclass.categorize_work_data

Called by reportclass.ReportClass

Generates ReportClass.features

class TestCategorizeWorkData(unittest.TestCase):
    """Tests for reportclass.categorize_work_data."""

    def setUp(self):
        # Fixture frame covering three regions and three years.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'r', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'sample id': [1, 1, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [10, 20, 30, 40, 50, 60, 15],
            'Quantity': [1, 1, 1, 1, 1, 1, 2]
        })

        # These values are set in the conf file. They describe the order of
        # aggregation; the lowest reporting unit is last, in this case 'City'.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.labels = ['Use', 'l']

    def test_categorize_work_data(self):
        # Expected categories for the label 'l' of the 'Use' column.
        expected = {
            'l': {
                'Region': np.array(['A', 'B', 'C'], dtype='object'),
                'State': np.array(['V', 'B', 'C'], dtype='object'),
                'City': np.array(['a1', 'a2', 'b2'], dtype='object'),
                'Use': np.array(['l'], dtype='object'),
                'Year': np.array([2019, 2021]),
                'samples': np.array([1, 4])
            }}

        result = r_class.categorize_work_data(
            self.df, self.labels, self.columns_of_interest, self.sample_id)

        # compare as sets: the order of unique values is not part of the contract
        for column in self.columns_of_interest:
            self.assertEqual(set(expected[self.labels[1]][column]),
                             set(result[self.labels[1]][column]))
Hide code cell source
class TestCategorizeWorkData(unittest.TestCase):
    """Tests for reportclass.categorize_work_data."""

    def setUp(self):
        # Fixture frame covering three regions and three years.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'r', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'sample id': [1, 1, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [10, 20, 30, 40, 50, 60, 15],
            'Quantity': [1, 1, 1, 1, 1, 1, 2]
        })

        # These values are set in the conf file. They describe the order of
        # aggregation; the lowest reporting unit is last, in this case 'City'.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.labels = ['Use', 'l']

    def test_categorize_work_data(self):
        # Expected categories for the label 'l' of the 'Use' column.
        expected = {
            'l': {
                'Region': np.array(['A', 'B', 'C'], dtype='object'),
                'State': np.array(['V', 'B', 'C'], dtype='object'),
                'City': np.array(['a1', 'a2', 'b2'], dtype='object'),
                'Use': np.array(['l'], dtype='object'),
                'Year': np.array([2019, 2021]),
                'samples': np.array([1, 4])
            }}

        result = r_class.categorize_work_data(
            self.df, self.labels, self.columns_of_interest, self.sample_id)

        # compare as sets: the order of unique values is not part of the contract
        for column in self.columns_of_interest:
            self.assertEqual(set(expected[self.labels[1]][column]),
                             set(result[self.labels[1]][column]))


# Build and run the suite for the categorize-work-data tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestCategorizeWorkData)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_categorize_work_data (__main__.TestCategorizeWorkData) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.004s

OK

Summarize a vector#

  • a_summary_of_one_vector

Called by reportclass.ReportClass.summarize_feature_labels and calls aggregate_dataframe


class TestASummaryOfOneVector(unittest.TestCase):
    """Tests for reportclass.a_summary_of_one_vector."""

    def setUp(self):
        # Fixture frame: a constant density of 2 makes the descriptive
        # statistics easy to state in the expected results.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'p', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'Slug': ['l1', 'l5', 'l2', 'l2', 'l3', 'l4', 'l4'],
            'sample id': [1, 5, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [2, 2, 2, 2, 2, 2, 2],
            'Quantity': [1, 1, 1, 1, 1, 1, 1]
        })

        # These values are set in the conf file. They describe the order of
        # aggregation; the lowest reporting unit is last, in this case 'City'.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.feature = 'City'
        self.labels = self.df[self.feature].unique()
        self.unit_columns = [self.sample_id, 'location', self.feature]

    def test_a_summary_of_one_vector(self):
        # Summarize a single vector: only the 'l' (lake) records.
        lakes = self.df[self.df.Use == 'l']
        unit_columns = ['sample id', 'Slug', 'Use']
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}

        result = r_class.a_summary_of_one_vector(
            lakes, unit_columns, unit_agg, describe='Density', total_column='Quantity')

        expected = pd.DataFrame({
            'Density': {
                'count': 3, 'mean': 2.0, 'std': 0.0, 'min': 2.0,
                '25%': 2.0, '50%': 2.0, '75%': 2.0, 'max': 2.0,
                'total': 4}})

        np.testing.assert_array_equal(expected.to_dict(), result.to_dict())
        self.assertEqual(set(expected.index), set(result.index))

    def test_summarize_feature_variables_test(self):
        # aggregation rules: may be set in config or passed as a keyword
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}

        # summarize_feature_labels_test mirrors the ReportClass method that
        # applies a_summary_of_one_vector to each label of a feature
        result = summarize_feature_labels_test(
            self.df, sample_id=self.sample_id, location='Slug',
            feature=self.feature, unit_agg=unit_agg,
            describe_column='Density', total_column='Quantity')

        expected = pd.DataFrame({
            ('Density', 'a1'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 1},
            ('Density', 'a2'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 3,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 0.0, 'total': 3},
            ('Density', 'b1'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 2},
            ('Density', 'b2'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 1}})

        np.testing.assert_array_equal(expected.values, result.values)
        self.assertEqual(set(expected.index), set(result.index))

Hide code cell source
def summarize_feature_labels_test(df,
                             feature: str = None,
                             sample_id: str = 'loc_date',
                             location: str = 'slug',
                             describe_column: str = 'pcs_m',
                             unit_agg: dict = None,
                             **kwargs):
    """Summarize ``describe_column`` for each unique label of ``feature``.

    Calls reportclass.a_summary_of_one_vector once per label and pivots the
    concatenated summaries so that each label becomes a column. NaN values
    are replaced with the string 'NaN' for display/comparison.

    :param df: survey data containing the feature, sample id and location columns
    :param feature: column whose unique values define the summary groups
    :param sample_id: column identifying a single sample
    :param location: column identifying a survey location
    :param describe_column: numeric column that is described
    :param unit_agg: aggregation methods forwarded to a_summary_of_one_vector;
        defaults to conf_.unit_agg
    :param kwargs: forwarded to a_summary_of_one_vector (e.g. total_column)
    :return: DataFrame of descriptive statistics, one column per label
    """
    # Resolve the default at call time: binding conf_.unit_agg as a function
    # default would freeze the config at definition time and share one
    # mutable dict across all calls.
    if unit_agg is None:
        unit_agg = conf_.unit_agg

    unit_columns = [sample_id, location, feature]

    summaries = []
    for the_label in df[feature].unique():
        # copy once per label; the slice is then an independent frame
        d = df[df[feature] == the_label].copy()
        ds = r_class.a_summary_of_one_vector(d,
                                             unit_columns=unit_columns,
                                             unit_agg=unit_agg,
                                             describe=describe_column,
                                             label=the_label, **kwargs)
        summaries.append(ds)

    combined = pd.concat(summaries)
    combined = combined.fillna('NaN')

    return combined.pivot(columns='label')
    
class TestASummaryOfOneVector(unittest.TestCase):
    """Tests for reportclass.a_summary_of_one_vector."""

    def setUp(self):
        # Fixture frame: a constant density of 2 makes the descriptive
        # statistics easy to state in the expected results.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'p', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'Slug': ['l1', 'l5', 'l2', 'l2', 'l3', 'l4', 'l4'],
            'sample id': [1, 5, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [2, 2, 2, 2, 2, 2, 2],
            'Quantity': [1, 1, 1, 1, 1, 1, 1]
        })

        # These values are set in the conf file. They describe the order of
        # aggregation; the lowest reporting unit is last, in this case 'City'.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.feature = 'City'
        self.labels = self.df[self.feature].unique()
        self.unit_columns = [self.sample_id, 'location', self.feature]

    def test_a_summary_of_one_vector(self):
        # Summarize a single vector: only the 'l' (lake) records.
        lakes = self.df[self.df.Use == 'l']
        unit_columns = ['sample id', 'Slug', 'Use']
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}

        result = r_class.a_summary_of_one_vector(
            lakes, unit_columns, unit_agg, describe='Density', total_column='Quantity')

        expected = pd.DataFrame({
            'Density': {
                'count': 3, 'mean': 2.0, 'std': 0.0, 'min': 2.0,
                '25%': 2.0, '50%': 2.0, '75%': 2.0, 'max': 2.0,
                'total': 4}})

        np.testing.assert_array_equal(expected.to_dict(), result.to_dict())
        self.assertEqual(set(expected.index), set(result.index))

    def test_summarize_feature_variables_test(self):
        # aggregation rules: may be set in config or passed as a keyword
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}

        # summarize_feature_labels_test mirrors the ReportClass method that
        # applies a_summary_of_one_vector to each label of a feature
        result = summarize_feature_labels_test(
            self.df, sample_id=self.sample_id, location='Slug',
            feature=self.feature, unit_agg=unit_agg,
            describe_column='Density', total_column='Quantity')

        expected = pd.DataFrame({
            ('Density', 'a1'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 1},
            ('Density', 'a2'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 3,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 0.0, 'total': 3},
            ('Density', 'b1'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 2},
            ('Density', 'b2'): {'25%': 2.0, '50%': 2.0, '75%': 2.0, 'count': 1,
                                'max': 2.0, 'mean': 2.0, 'min': 2.0,
                                'std': 'NaN', 'total': 1}})

        np.testing.assert_array_equal(expected.values, result.values)
        self.assertEqual(set(expected.index), set(result.index))
        
        
      
        

# Build and run the suite for the vector-summary tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestASummaryOfOneVector)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_a_summary_of_one_vector (__main__.TestASummaryOfOneVector) ... ok
test_summarize_feature_variables_test (__main__.TestASummaryOfOneVector) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.087s

OK

Collecting records by value#

  • reportclass.get_top_x_records_with_max_quantity

Is called by reportclass.ReportClass.inventory

class TestGetTopXRecordsWithMaxQuantity(unittest.TestCase):
    """Tests for reportclass.get_top_x_records_with_max_quantity."""

    def test_get_top_x_records_with_max_quantity(self):
        df = pd.DataFrame({
            'ID': ['A', 'B', 'C', 'D', 'E'],
            'Quantity': [10, 20, 15, 5, 30]
        })

        # measure column, object id column, and the number of records to keep
        result = r_class.get_top_x_records_with_max_quantity(
            df, 'Quantity', 'ID', 3)

        # the three largest quantities, each with its share of the grand total
        expected = pd.DataFrame({
            'ID': ['E', 'B', 'C'],
            'Quantity': [30, 20, 15],
            '%': [0.46153846153846156, 0.3076923076923077, 0.23076923076923078]
        })

        # index values are not part of the contract, compare positionally
        pd.testing.assert_frame_equal(
            result.reset_index(drop=True), expected.reset_index(drop=True))
Hide code cell source
class TestGetTopXRecordsWithMaxQuantity(unittest.TestCase):
    """Tests for reportclass.get_top_x_records_with_max_quantity."""

    def test_get_top_x_records_with_max_quantity(self):
        df = pd.DataFrame({
            'ID': ['A', 'B', 'C', 'D', 'E'],
            'Quantity': [10, 20, 15, 5, 30]
        })

        # measure column, object id column, and the number of records to keep
        result = r_class.get_top_x_records_with_max_quantity(
            df, 'Quantity', 'ID', 3)

        # the three largest quantities, each with its share of the grand total
        expected = pd.DataFrame({
            'ID': ['E', 'B', 'C'],
            'Quantity': [30, 20, 15],
            '%': [0.46153846153846156, 0.3076923076923077, 0.23076923076923078]
        })

        # index values are not part of the contract, compare positionally
        pd.testing.assert_frame_equal(
            result.reset_index(drop=True), expected.reset_index(drop=True))

# Build and run the suite for the top-x-records tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestGetTopXRecordsWithMaxQuantity)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_get_top_x_records_with_max_quantity (__main__.TestGetTopXRecordsWithMaxQuantity) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.006s

OK

Counting occurrences#

  • reportclass.count_objects_with_positive_quantity

Is called by reportclass.ReportClass.inventory

Hide code cell source
class TestCountObjectsWithPositiveQuantity(unittest.TestCase):
    """Tests for reportclass.count_objects_with_positive_quantity."""

    def test_count_objects_with_positive_quantity(self):
        # Three samples; codes A and B each have one zero-quantity record.
        df = pd.DataFrame({
            'sample': ['s1', 's1', 's2', 's2', 's3'],
            'code': ['A', 'B', 'A', 'C', 'B'],
            'quantity': [1, 2, 0, 3, 0]
        })

        # value column and object column for the function under test
        result = r_class.count_objects_with_positive_quantity(
            df, 'quantity', 'code')

        # expected fraction of records per code with a positive quantity
        expected = pd.Series({'A': 0.5, 'B': 0.5, 'C': 1.0})

        # sort both sides so index order does not affect the comparison
        pd.testing.assert_series_equal(result.sort_index(), expected.sort_index())

# Build and run the suite for the count-objects tests.
# Fix: this previously loaded TestGetTopXRecordsWithMaxQuantity (a copy-paste
# error), so TestCountObjectsWithPositiveQuantity never actually executed.
test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCountObjectsWithPositiveQuantity)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_get_top_x_records_with_max_quantity (__main__.TestGetTopXRecordsWithMaxQuantity) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.006s

OK

Selecting by two criteria#

  • reportclass.display_tabular_data_by_column_values

Is called by reportclass.ReportClass.most_common

class TestDisplayTabularDataByColumnValues(unittest.TestCase):
    """Tests for reportclass.display_tabular_data_by_column_values."""

    def test_display_tabular_data_by_column_values(self):
        df = pd.DataFrame({
            'index_col': ['i1', 'i2', 'i3', 'i4', 'i5'],
            'column1': [10, 20, 30, 40, 50],
            'column2': [5, 15, 25, 35, 45],
            'quantity': [3, 6, 9, 12, 15]
        })

        # two selection criteria, each a column name and a threshold value
        selector_one = {'column': 'quantity', 'val': 2}
        selector_two = {'column': 'column2', 'val': 20}

        result = r_class.display_tabular_data_by_column_values(
            df, selector_one, selector_two, 'index_col')

        # rows i3-i5 satisfy both criteria, returned in descending order
        expected = pd.DataFrame(
            {'column1': [50, 40, 30],
             'column2': [45, 35, 25],
             'quantity': [15, 12, 9]},
            index=['i5', 'i4', 'i3'])

        pd.testing.assert_frame_equal(result, expected)
Hide code cell source
class TestDisplayTabularDataByColumnValues(unittest.TestCase):
    """Tests for reportclass.display_tabular_data_by_column_values."""

    def test_display_tabular_data_by_column_values(self):
        df = pd.DataFrame({
            'index_col': ['i1', 'i2', 'i3', 'i4', 'i5'],
            'column1': [10, 20, 30, 40, 50],
            'column2': [5, 15, 25, 35, 45],
            'quantity': [3, 6, 9, 12, 15]
        })

        # two selection criteria, each a column name and a threshold value
        selector_one = {'column': 'quantity', 'val': 2}
        selector_two = {'column': 'column2', 'val': 20}

        result = r_class.display_tabular_data_by_column_values(
            df, selector_one, selector_two, 'index_col')

        # rows i3-i5 satisfy both criteria, returned in descending order
        expected = pd.DataFrame(
            {'column1': [50, 40, 30],
             'column2': [45, 35, 25],
             'quantity': [15, 12, 9]},
            index=['i5', 'i4', 'i3'])

        pd.testing.assert_frame_equal(result, expected)

# Build and run the suite for the tabular-display tests.
test_suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDisplayTabularDataByColumnValues)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_display_tabular_data_by_column_values (__main__.TestDisplayTabularDataByColumnValues) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.004s

OK
data
Region Use Year State City Slug sample id Objects Density Quantity
0 A l 2019 V a1 l1 1 X 2 1
1 A l 2019 B a2 l5 5 Y 2 1
2 B p 2020 V b1 l2 2 X 2 1
3 B p 2020 V b1 l2 2 Y 2 1
4 A r 2021 B a2 l3 3 Y 2 1
5 B l 2021 C b2 l4 4 X 2 1
6 C l 2021 B a2 l4 4 X 2 1
Author: hammerdirt-analyst

conda environment: cantonal_report

pandas    : 2.0.3
matplotlib: 3.7.1
numpy     : 1.25.2