Show code cell source
%load_ext watermark
import pandas as pd
import numpy as np
from typing import Type, Optional, Callable
from typing import List, Dict, Union, Tuple
from myst_nb import glue
# from review_methods_tests import collect_vitals, find_missing, find_missing_loc_dates
# from review_methods_tests import make_a_summary
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.colors
from matplotlib.colors import LinearSegmentedColormap, ListedColormap
import setvariables as conf_
import reportclass as r_class
Reporting#
The Report
class is used to generate descriptive statistics and identify objects of interest from a query defined by geographic, administrative and/or temporal bounds. These results are considered in the context of topographic data using the LandUse
class.
Requests#
A request has five components, including the data: a dictionary that contains four key-value pairs — a column name and a value for that column, the start and end dates, and the language choice. The report_data
method uses the request to slice the data and load detailed information for survey locations and objects. The results of report_data
are passed on to the ReportClass
.
# example one
# a report for the canton of Bern, in French, between the specified dates
boundaries = dict(canton='Bern', language='fr', start_date='2015-01-01', end_date="2021-12-31")
# example two
# a report for all lakes (feature_type='l'); note this assignment replaces example one
boundaries = dict(feature_type='l', language='fr', start_date='2019-01-01', end_date='2022-01-01')
# the report_data method takes the boundaries and returns the top level of the report,
# the language and two data frames from the same date range. w_df includes only the surveys
# that meet the criteria in boundaries, w_di includes all the data from the date range.
top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)
# the language map is included with the module
a_report = r_class.ReportClass(w_df,boundaries, top_label, language, language_map)
Report boundaries#
Once the boundaries have been applied to the survey data the basic report contents can be summarized. Calling a_report.available_features
lists the identified geographic/administrative boundaries within the report data. If the boundaries are set to canton='Bern'
there are four reporting categories available
# a summary of the different features and boundaries in a report
a_report.available_features
# the number of each type of feature within the selected data
a_report.the_number_of_attributes_in_a_feature('feature_type')
Show code cell source
# starting data, can be MySQL or NoSQL calls
# the three methods accept Callables, as long
# as the output is a pd.DataFrame
c_l = r_class.language_maps()
surveys = r_class.collect_survey_data_for_report()
codes, beaches, land_cover, land_use, streets, river_intersect_lakes = r_class.collect_env_data_for_report()
# attach the canton of each survey location to the survey records
# (beaches is indexed by the location slug)
survey_data = surveys.merge(beaches['canton'], left_on='slug', right_index=True, validate='many_to_one')
# temporal and geographic boundaries
# user defined input
boundaries = dict(canton='Bern', language='fr', start_date="2015-11-01", end_date="2021-12-31")
# the report_data method takes the boundaries and returns the top level of the report,
# the language and two data frames from the same date range. w_df includes only the surveys
# that meet the criteria in boundaries, w_di includes all the data from the date range.
top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)
# the language map is included with the module
a_report = r_class.ReportClass(w_df,boundaries, top_label, 'fr', c_l)
# a summary of the different features and boundaries in a report
a_report.available_features
['parent_boundary', 'feature_type', 'feature_name', 'city']
Note
The available features are column names of the survey data. They represent the different geographic or administrative boundaries in the selected report data.
parent_boundary
is a geographic boundary such as a river basin or a category such as mountainsfeature_type
designates whether the location is at a river, lake or parkfeature_name
is the name of the river, lake or park
Report contents#
Within the canton of Bern there are samples from one park, four lakes and six rivers. The lakes have the most samples, followed by rivers and parks.
# the number and category of samples from the different features in the report
a_report.the_number_of_attributes_in_a_feature('feature_type')
city | feature_name | samples | |
---|---|---|---|
r | 14 | 6 | 96 |
l | 14 | 4 | 99 |
p | 1 | 1 | 1 |
Report labels#
The surveys are categorized by parks, lakes or rivers. The name of each one and the municipalities where surveys were conducted can be accessed with Report.feature_labels()
.
my_labels = a_report.feature_labels()
The subject of the report can be identified by calling a_report.top_label
. The first element is a column name in the dataframe and the second element is the value of interest.
a_report.top_label
['canton', 'Bern']
To identify the municipalities associated with a particular feature in the report simply use the feature labels as a key to my_labels
.
# collect the labels
my_labels = a_report.feature_labels()
# the lakes and the cities on those lakes
my_labels['l']
{'feature_name': array(['aare', 'bielersee', 'brienzersee', 'thunersee'], dtype=object),
'city': array(['Kallnach', 'Vinelz', 'Erlach', 'Gals', 'Ligerz', 'Lüscherz',
'Biel/Bienne', 'Nidau', 'Bönigen', 'Brienz (BE)', 'Spiez', 'Thun',
'Beatenberg', 'Unterseen'], dtype=object)}
# in the same way the name of the parks and the cities in those parks can be identified
my_labels['p']
{'feature_name': array(['alpes-bernoises'], dtype=object),
'city': array(['Grindelwald'], dtype=object)}
# the same for rivers
my_labels['r']
{'feature_name': array(['aare', 'aarenidau-buren-kanal', 'emme', 'langeten', 'schuss',
'zulg'], dtype=object),
'city': array(['Muri bei Bern', 'Bern', 'Belp', 'Köniz', 'Walperswil', 'Kallnach',
'Rubigen', 'Port', 'Brügg', 'Utzenstorf', 'Burgdorf', 'Langenthal',
'Biel/Bienne', 'Steffisburg'], dtype=object)}
Generating a report for a specific feature#
A detailed report can be generated for any element in the feature labels. The method ReportClass.a_subreport(feature_of_interest='my_feature')
accepts the name of the feature of interest and uses the data from the established report to create a report that contains only the data from the feature of interest.
bielersee = a_report.a_subreport(feature_of_interest='bielersee')
bielersee.feature_labels()
{'l': {'feature_name': array(['bielersee'], dtype=object),
'city': array(['Vinelz', 'Erlach', 'Gals', 'Ligerz', 'Lüscherz', 'Biel/Bienne',
'Nidau'], dtype=object)}}
Comparing survey totals between features#
The survey totals are the sum of all the objects found at a survey divided by the length of the survey. Comparing the survey results between features in a report is done by calling ReportClass.summarize_feature_labels(feature='feature of interest')
for example to compare the survey totals between cities in Bern call a_report.summarize_feature_labels(feature='city')
.
a_report.summarize_feature_labels(feature='city')
pcs_m | |||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
label | Beatenberg | Belp | Bern | Biel/Bienne | Brienz (BE) | Brügg | Burgdorf | Bönigen | Erlach | Gals | ... | Nidau | Port | Rubigen | Spiez | Steffisburg | Thun | Unterseen | Utzenstorf | Vinelz | Walperswil |
25% | 1.9875 | 0.05 | 0.19 | 2.69 | 4.14 | 1.02 | 0.87 | 3.0925 | 1.83 | 1.215 | ... | 2.52 | 0.8125 | 3.2 | 0.3725 | 0.18 | 1.24 | 1.095 | 0.4125 | 2.14 | 0.22 |
50% | 2.425 | 0.1 | 0.525 | 5.06 | 5.02 | 1.02 | 0.87 | 3.175 | 1.83 | 1.28 | ... | 2.52 | 1.375 | 3.2 | 0.595 | 0.24 | 1.39 | 1.8 | 0.495 | 3.38 | 0.22 |
75% | 2.8625 | 0.17 | 1.2675 | 7.5 | 5.1 | 1.02 | 0.87 | 3.2575 | 1.83 | 1.345 | ... | 2.52 | 1.9375 | 3.2 | 0.9825 | 0.545 | 1.555 | 2.3025 | 0.66 | 5.215 | 0.22 |
count | 2 | 11 | 32 | 17 | 3 | 1 | 1 | 2 | 1 | 2 | ... | 1 | 2 | 1 | 26 | 11 | 3 | 12 | 8 | 23 | 1 |
max | 3.3 | 0.19 | 5.42 | 9.68 | 5.18 | 1.02 | 0.87 | 3.34 | 1.83 | 1.41 | ... | 2.52 | 2.5 | 3.2 | 2.84 | 3.58 | 1.72 | 3.94 | 1.22 | 10.47 | 0.22 |
mean | 2.425 | 0.111818 | 1.022187 | 5.074706 | 4.486667 | 1.02 | 0.87 | 3.175 | 1.83 | 1.28 | ... | 2.52 | 1.375 | 3.2 | 0.786923 | 0.634545 | 1.4 | 1.888333 | 0.565 | 3.770435 | 0.22 |
min | 1.55 | 0.02 | 0.02 | 0.91 | 3.26 | 1.02 | 0.87 | 3.01 | 1.83 | 1.15 | ... | 2.52 | 0.25 | 3.2 | 0.16 | 0.06 | 1.09 | 0.54 | 0.22 | 1.05 | 0.22 |
std | 1.237437 | 0.065087 | 1.266312 | 2.911849 | 1.065332 | NaN | NaN | 0.233345 | NaN | 0.183848 | ... | NaN | 1.59099 | NaN | 0.612306 | 1.010202 | 0.315119 | 1.044595 | 0.30538 | 2.24379 | NaN |
total | 104 | 60 | 1892 | 3307 | 696 | 36 | 41 | 277 | 101 | 48 | ... | 63 | 118 | 57 | 838 | 114 | 276 | 1879 | 41 | 2033 | 14 |
9 rows × 27 columns
Any available feature or category of the report can be called
a_report.summarize_feature_labels(feature='feature_type')
The distribution of survey totals in parks, rivers and lakes for the canton:
pcs_m | |||
---|---|---|---|
label | l | p | r |
25% | 0.89 | 2.81 | 0.1975 |
50% | 1.83 | 2.81 | 0.73 |
75% | 3.62 | 2.81 | 1.5925 |
count | 99 | 1 | 96 |
max | 14.8 | 2.81 | 7.92 |
mean | 2.796061 | 2.81 | 1.209167 |
min | 0.16 | 2.81 | 0.02 |
std | 2.637667 | NaN | 1.464487 |
total | 9902 | 169 | 3688 |
The subreport for a specific feature works exactly the same way:
bielersee.summarize_feature_labels(feature='city')
The distribution of survey totals for the cities on bielersee:
pcs_m | |||||||
---|---|---|---|---|---|---|---|
label | Biel/Bienne | Erlach | Gals | Ligerz | Lüscherz | Nidau | Vinelz |
25% | 3.725 | 1.83 | 1.215 | 3.7 | 0.43 | 2.52 | 2.14 |
50% | 5.42 | 1.83 | 1.28 | 4.0 | 0.64 | 2.52 | 3.38 |
75% | 7.76 | 1.83 | 1.345 | 9.4 | 0.84 | 2.52 | 5.215 |
count | 15 | 1 | 2 | 3 | 5 | 1 | 23 |
max | 9.68 | 1.83 | 1.41 | 14.8 | 1.42 | 2.52 | 10.47 |
mean | 5.613333 | 1.83 | 1.28 | 7.4 | 0.746 | 2.52 | 3.770435 |
min | 0.91 | 1.83 | 1.15 | 3.4 | 0.4 | 2.52 | 1.05 |
std | 2.654795 | NaN | 0.183848 | 6.415606 | 0.416509 | NaN | 2.24379 |
total | 3209 | 101 | 48 | 163 | 202 | 63 | 2033 |
Most common objects#
The most common objects in a report are selected using two criteria:
The ranking by quantity ie. top-ten, top-twenty
The fail rate: i.e. 0.5, 0.6
The defaults are set in the ReportClass
, however they can be changed by providing values for the variables mc_criteria_one
and mc_criteria_two
when the report is created. Once the variables are set the most common objects are displayed with the quantity, percent of total, the median pcs/m and the fail rate for each object in the most common. The weight of the most common objects with respect to the all the objects is also included.
most_common, weight = a_report.most_common
quantity | % | pcs_m | fail rate | |
---|---|---|---|---|
G27 | 2410 | 0.175158 | 0.07 | 0.739796 |
Gfrags | 1870 | 0.135911 | 0.01 | 0.698980 |
G67 | 1172 | 0.085181 | 0.03 | 0.586735 |
G30 | 750 | 0.054510 | 0.04 | 0.637755 |
Gfoams | 542 | 0.039392 | 0.00 | 0.423469 |
G145 | 461 | 0.033505 | 0.00 | 0.153061 |
G200 | 362 | 0.026310 | 0.00 | 0.331633 |
G208 | 342 | 0.024856 | 0.00 | 0.316327 |
Gcaps | 285 | 0.020714 | 0.00 | 0.408163 |
G941 | 261 | 0.018969 | 0.00 | 0.224490 |
G74 | 242 | 0.017588 | 0.00 | 0.239796 |
weight
{'quantity': 8697.0, '%': 0.6320953557671344}
Works the same for a subreport#
The criteria for selecting the most common in a subreport are by default the same as the original report.
b_most_common, b_weight = bielersee.most_common
quantity | % | pcs_m | fail rate | |
---|---|---|---|---|
Gfrags | 1089 | 0.187146 | 0.12 | 0.98 |
G27 | 841 | 0.144527 | 0.06 | 0.80 |
G67 | 697 | 0.119780 | 0.30 | 0.92 |
G30 | 363 | 0.062382 | 0.16 | 0.84 |
Gfoams | 208 | 0.035745 | 0.00 | 0.64 |
G200 | 179 | 0.030761 | 0.02 | 0.56 |
G941 | 166 | 0.028527 | 0.00 | 0.46 |
Gcaps | 140 | 0.024059 | 0.00 | 0.66 |
G25 | 98 | 0.016841 | 0.03 | 0.66 |
G89 | 95 | 0.016326 | 0.02 | 0.56 |
G940 | 92 | 0.015810 | 0.00 | 0.24 |
G177 | 77 | 0.013233 | 0.03 | 0.62 |
G904 | 75 | 0.012889 | 0.02 | 0.52 |
Changing the criteria of the most common#
Specify the new values for the criteria and call a new report. In the example below the criteria are changed to the top-five objects or those objects that were found in at least 60% of the surveys.
# default arguments that define the most common objects
# this assumes that the columns quantity and fail rate exist
# default arguments that define the most common objects
# this assumes that the columns quantity and fail rate exist
# criteria one: keep the objects ranked in the top five by quantity
mc_criteria_one = {
    'column': 'quantity',
    'val': 5
}
# criteria two: keep the objects recorded in at least 60% of the surveys
mc_criteria_two = {
    'column': 'fail rate',
    'val': 0.6
}
# a new report with the changed selection criteria; all other arguments are unchanged
a_report_changed = r_class.ReportClass(w_df,boundaries, top_label, 'fr', c_l, mc_criteria_one=mc_criteria_one, mc_criteria_two=mc_criteria_two)
c_mc, weight_cmc = a_report_changed.most_common
c_mc
Note the only variables that changed are the selection criteria. The new most common objects list for the canton is shorter and accounts for less of the total.
quantity | % | pcs_m | fail rate | |
---|---|---|---|---|
G27 | 2410 | 0.175158 | 0.07 | 0.739796 |
Gfrags | 1870 | 0.135911 | 0.01 | 0.698980 |
G67 | 1172 | 0.085181 | 0.03 | 0.586735 |
G30 | 750 | 0.054510 | 0.04 | 0.637755 |
Gfoams | 542 | 0.039392 | 0.00 | 0.423469 |
G145 | 461 | 0.033505 | 0.00 | 0.153061 |
weight_cmc
{'quantity': 7205.0, '%': 0.5236572425321608}
Retrieving properties#
There are 318’478 rows in the survey data. We can test the sorting and grouping functions by running a report class on all possible combinations of the features of interest. The test should produce the set of arguments that define the survey locations and surveys that define the boundaries of a report.
some_features = ['feature_type', 'parent_boundary', 'feature_name', 'canton', 'city']

def produce_reports_for_testing(survey_data, some_features):
    """Build a report for every unique label of every feature of interest.

    For each feature column in ``some_features``, a set of boundaries is made
    for each unique label of that column, spanning the full date range of the
    label's surveys. The resulting ``ReportClass.features`` properties are
    collected so the sorting/grouping functions can be checked over all
    possible combinations.

    Relies on the module-level ``beaches``, ``codes`` and ``c_l`` objects
    loaded earlier in this document.

    Returns:
        dict: {feature_column: {label: report.features}}
    """
    reports = {}
    for a_feature in some_features:
        label_reports = {}
        for label in survey_data[a_feature].unique():
            # the date range of the surveys for this label; compute the mask once
            dates = survey_data.loc[survey_data[a_feature] == label, 'date']
            boundaries = {a_feature: label, 'language': 'fr', 'start_date': dates.min(), 'end_date': dates.max()}
            # consistent with every other call in this document: report_data lives
            # in the reportclass module (imported as r_class) and also requires
            # the beaches and codes frames
            top_label, language, w_df, w_di = r_class.report_data(boundaries, survey_data.copy(), beaches, codes)
            # ReportClass takes the sliced surveys only; w_di is not a parameter
            a_report = r_class.ReportClass(w_df, boundaries, top_label, 'fr', c_l)
            label_reports[label] = a_report.features
        reports[a_feature] = label_reports
    return reports

t = produce_reports_for_testing(survey_data, some_features)
t['canton']['Valais']
The properties should contain the arguments for cities in the example report
# t['city']['Saint-Gingolph']
Unit tests#
The reportclass module#
The ReportClass
takes the following parameters:#
w_df (pd.DataFrame, optional): The survey data DataFrame for report generation.
boundaries (dict, optional): A dictionary defining the reporting boundaries, including 'start_date', 'end_date', and 'language'.
top_label (List, optional): A list containing two elements - [label_column, label_value].
language (str, optional): The language in which the report is generated.
lang_maps (pd.DataFrame, optional): A DataFrame containing language mapping data.
mc_criteria_one (dict, optional): The first criteria for identifying objects of interest.
mc_criteria_two (dict, optional): The second criteria for identifying objects of interest.
ooi (str, optional): The name of the object of interest column.
And has the following methods:#
features: Get a list of available features for report generation.
available_features: Get a list of available features based on predefined criteria.
inventory: Get an inventory of objects with summary statistics.
most_common: Find the most common objects based on criteria.
summarize_feature_labels: Summarize data for a specific feature.
the_number_of_attributes_in_a_feature: Count attributes in a feature.
__repr__
: Return a string representation of the ReportClass instance.
Other methods in the reportclass
module.#
Note: the methods in bold have been unit tested
from inspect import getmembers, isfunction
# list the public functions of the reportclass module; the module is imported
# as r_class at the top of this document, the bare name `reportclass` is not
# bound here and would raise a NameError
functions_list = getmembers(r_class, isfunction)
[x[0] for x in functions_list]
AGGREGATING, COUNTING, REPORTING:
‘a_cumulative_report’,
‘a_summary_of_one_vector’,
‘aggregate_boundaries’,
‘aggregate_dataframe’,
‘calculate_rate_per_unit’,
‘use_gfrags_gfoams_gcaps’,
‘use_parent_groups_or_gfrags’
‘categorize_work_data’,
‘count_objects_with_positive_quantity’,
‘display_tabular_data_by_column_values’,
‘get_top_x_records_with_max_quantity’,
COLLECTING DATA
‘collect_env_data_for_report’,
‘collect_survey_data_for_report’,
‘combine_survey_files’,
‘report_data’,
‘slice_data_by_date’,
PROCESSING DATA WITH REQUEST
‘check_for_top_label’,
‘merge_dataframes_on_column_and_index’,
‘add_column_to_work_data’,
‘add_columns_to_work_data’,
DISPLAY
‘language_maps’,
‘translate_for_display’,
‘capitalize_index’,
‘translate_word’,
‘translated_and_style_for_display’,
‘color_gradient’,
Combining codes using parent groups#
Specific to beach litter data#
reportclass.use_parent_groups_or_gfrags
reportclass.use_gfrags_gfoams_gcaps
config setting: setvariables.code_result_columns
Defines the set of columns to use when aggregating to the object level
import unittest

class TestUseGfragsGfoamsGcaps(unittest.TestCase):
    """Check that use_gfrags_gfoams_gcaps replaces each code with its parent group."""

    def test_use_gfrags_gfoams_gcaps(self):
        # survey records keyed by object code
        records = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                                'sample_id': [1, 2, 1, 2, 1, 1],
                                'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                                'quantity': [2, 1, 2, 1, 2, 1]})
        # code -> parent-group mapping, indexed by the code itself
        code_map = pd.DataFrame({'parent_code': ['Gfoams', 'Gfrags', 'Gcaps','Gfoams', 'Gcaps', 'F'],
                                 'code': ['A', 'B', 'C', 'D', 'E', 'F']}).set_index('code')
        # each code is swapped for its parent group; all other columns are untouched
        expected = pd.DataFrame(
            {'code': {0: 'Gfoams', 1: 'Gfrags', 2: 'Gcaps', 3: 'Gfoams', 4: 'Gcaps', 5: 'F'},
             'sample_id': {0: 1, 1: 2, 2: 1, 3: 2, 4: 1, 5: 1},
             'density': {0: 1.5, 1: 0.5, 2: 1.5, 3: 0.5, 4: 1.5, 5: 1.0},
             'quantity': {0: 2, 1: 1, 2: 2, 3: 1, 4: 2, 5: 1}}
        )
        pd.testing.assert_frame_equal(r_class.use_gfrags_gfoams_gcaps(records, code_map), expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestUseGfragsGfoamsGcaps)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Show code cell source
import unittest

class TestUseGfragsGfoamsGcaps(unittest.TestCase):
    """Check that use_gfrags_gfoams_gcaps replaces each code with its parent group."""

    def test_use_gfrags_gfoams_gcaps(self):
        # survey records keyed by object code
        records = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                                'sample_id': [1, 2, 1, 2, 1, 1],
                                'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                                'quantity': [2, 1, 2, 1, 2, 1]})
        # code -> parent-group mapping, indexed by the code itself
        code_map = pd.DataFrame({'parent_code': ['Gfoams', 'Gfrags', 'Gcaps','Gfoams', 'Gcaps', 'F'],
                                 'code': ['A', 'B', 'C', 'D', 'E', 'F']}).set_index('code')
        # each code is swapped for its parent group; all other columns are untouched
        expected = pd.DataFrame(
            {'code': {0: 'Gfoams', 1: 'Gfrags', 2: 'Gcaps', 3: 'Gfoams', 4: 'Gcaps', 5: 'F'},
             'sample_id': {0: 1, 1: 2, 2: 1, 3: 2, 4: 1, 5: 1},
             'density': {0: 1.5, 1: 0.5, 2: 1.5, 3: 0.5, 4: 1.5, 5: 1.0},
             'quantity': {0: 2, 1: 1, 2: 2, 3: 1, 4: 2, 5: 1}}
        )
        pd.testing.assert_frame_equal(r_class.use_gfrags_gfoams_gcaps(records, code_map), expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestUseGfragsGfoamsGcaps)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_use_gfrags_gfoams_gcaps (__main__.TestUseGfragsGfoamsGcaps) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK
Aggregate a dataframe#
reportclass.aggregate_dataframe
config settings#
Accepts the following arguments for the methods given the defaults code_result_columns
and work_columns
:
setvariables.agg_groups
setvariables.unit_agg
class TestAggregateDataFrame(unittest.TestCase):
    """Check that aggregate_dataframe groups rows and applies the given aggregations."""

    def test_aggregate_dataframe(self):
        # six records over two samples and three 'prop a' labels
        records = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                                'sample_id': [1, 2, 1, 2, 1, 1],
                                'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                                'quantity': [2, 1, 2, 1, 2, 1],
                                'prop a': ['s1', 's2','s1','s1','s2', 's3'],
                                'prop b': ['x' ,'x', 'z','z','z', 'q']})
        # per (sample_id, prop a) group: quantities are summed, densities take the median
        expected = pd.DataFrame({
            'sample_id': {0: 1, 1: 1, 2: 1, 3: 2, 4: 2},
            'prop a': {0: 's1', 1: 's2', 2: 's3', 3: 's1', 4: 's2'},
            'quantity': {0: 4, 1: 2, 2: 1, 3: 1, 4: 1},
            'density': {0: 1.5, 1: 1.5, 2: 1.0, 3: 0.5, 4: 0.5}})
        observed = r_class.aggregate_dataframe(
            records,
            groupby_columns=['sample_id','prop a'],
            aggregation_functions={'quantity': 'sum','density': 'median'})
        pd.testing.assert_frame_equal(observed, expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateDataFrame)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Show code cell source
class TestAggregateDataFrame(unittest.TestCase):
    """Check that aggregate_dataframe groups rows and applies the given aggregations."""

    def test_aggregate_dataframe(self):
        # six records over two samples and three 'prop a' labels
        records = pd.DataFrame({'code': ['A', 'B', 'C', 'D', 'E', 'F'],
                                'sample_id': [1, 2, 1, 2, 1, 1],
                                'density': [1.5, 0.5, 1.5, 0.5, 1.5, 1],
                                'quantity': [2, 1, 2, 1, 2, 1],
                                'prop a': ['s1', 's2','s1','s1','s2', 's3'],
                                'prop b': ['x' ,'x', 'z','z','z', 'q']})
        # per (sample_id, prop a) group: quantities are summed, densities take the median
        expected = pd.DataFrame({
            'sample_id': {0: 1, 1: 1, 2: 1, 3: 2, 4: 2},
            'prop a': {0: 's1', 1: 's2', 2: 's3', 3: 's1', 4: 's2'},
            'quantity': {0: 4, 1: 2, 2: 1, 3: 1, 4: 1},
            'density': {0: 1.5, 1: 1.5, 2: 1.0, 3: 0.5, 4: 0.5}})
        observed = r_class.aggregate_dataframe(
            records,
            groupby_columns=['sample_id','prop a'],
            aggregation_functions={'quantity': 'sum','density': 'median'})
        pd.testing.assert_frame_equal(observed, expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateDataFrame)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_aggregate_dataframe (__main__.TestAggregateDataFrame) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.007s
OK
Calculating rate per unit#
reportclass.calculate_rate_per_unit
config settings#
Accepts the following arguments for the methods, the default is setvariables.unit_agg
:
setvariables.agg_groups
setvariables.unit_agg
class TestCalculateRatePerUnit(unittest.TestCase):
    """Check calculate_rate_per_unit: total quantity and median pcs_m per object."""

    def test_calculate_rate_per_unit(self):
        # Sample data: six samples of two object types
        data = pd.DataFrame({
            'sample': [1, 2, 3, 4, 5, 6],
            'object': ['A', 'B', 'A', 'A', 'B', 'B'],
            'quantity': [10, 20, 30, 40, 50, 60],
            'pcs_m': [1,1,2,1, 2, 2],
        })
        # Objects to check
        column_of_interest = 'object'
        objects_to_check = ['A', 'B']
        groupby_columns = ['object']
        # NOTE(review): no aggregation methods are passed to the call, so the
        # function falls back to its default (setvariables.unit_agg, per the
        # config note above). The unused local dict that previously restated
        # {'quantity': 'sum', 'pcs_m': 'median'} has been removed.
        # Expected result: quantity summed and pcs_m median per object, one 'all' label
        expected_result = pd.DataFrame({
            'pcs_m': {'A': 1.0, 'B': 2.0},
            'quantity': {'A': 80, 'B': 130},
            'label': {'A': 'all', 'B': 'all'}})
        # Call the function
        result = r_class.calculate_rate_per_unit(data, objects_to_check,column_of_interest=column_of_interest, groupby_columns=groupby_columns)
        # drop the index name so the comparison checks values and columns only
        result.index.name = None
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCalculateRatePerUnit)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Show code cell source
class TestCalculateRatePerUnit(unittest.TestCase):
    """Check calculate_rate_per_unit: total quantity and median pcs_m per object."""

    def test_calculate_rate_per_unit(self):
        # Sample data: six samples of two object types
        data = pd.DataFrame({
            'sample': [1, 2, 3, 4, 5, 6],
            'object': ['A', 'B', 'A', 'A', 'B', 'B'],
            'quantity': [10, 20, 30, 40, 50, 60],
            'pcs_m': [1,1,2,1, 2, 2],
        })
        # Objects to check
        column_of_interest = 'object'
        objects_to_check = ['A', 'B']
        groupby_columns = ['object']
        # NOTE(review): no aggregation methods are passed to the call, so the
        # function falls back to its default (setvariables.unit_agg, per the
        # config note above). The unused local dict that previously restated
        # {'quantity': 'sum', 'pcs_m': 'median'} has been removed.
        # Expected result: quantity summed and pcs_m median per object, one 'all' label
        expected_result = pd.DataFrame({
            'pcs_m': {'A': 1.0, 'B': 2.0},
            'quantity': {'A': 80, 'B': 130},
            'label': {'A': 'all', 'B': 'all'}})
        # Call the function
        result = r_class.calculate_rate_per_unit(data, objects_to_check,column_of_interest=column_of_interest, groupby_columns=groupby_columns)
        # drop the index name so the comparison checks values and columns only
        result.index.name = None
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected_result)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCalculateRatePerUnit)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_calculate_rate_per_unit (__main__.TestCalculateRatePerUnit) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK
Aggregate boundaries#
reportclass.aggregate_boundaries
Is called by reportclass.a_cumulative_report
and calls reportclass.aggregate_dataframe
.
class TestAggregateBoundaries(unittest.TestCase):
    """Tests for reportclass.aggregate_boundaries with and without boundary labels."""

    def setUp(self):
        # Create a sample DataFrame for testing
        data = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id' : [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # test data
        self.df = pd.DataFrame(data)
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'
        # from default or user input: per-unit then per-group aggregation methods
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum','Density': 'median'}
        # the feature name, sample id and object columns make up the groupby columns
        # the feature name is used to mask the different child boundaries
        self.groupby_columns=[self.feature_name, self.sample_id, self.object_column]
        # the labels of the child boundaries are collected
        # using the feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()
        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_aggregate_boundaries_without_labels(self):
        # with boundary_labels=None all child boundaries collapse into one 'all' label
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': None,
            'boundary_columns': self.boundary_columns}
        expected = pd.DataFrame(
            {'Objects': {0: 'X', 1: 'Y'},
             'Quantity': {0: 3, 1: 3},
             'Density': {0: 30.0, 1: 40.0},
             'label': {0: 'all', 1: 'all'}
            })
        result = r_class.aggregate_boundaries(self.df, **args)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

    def test_aggregate_boundaries_with_labels(self):
        # with labels provided, each child boundary (Year) keeps its own label row
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': self.boundary_labels,
            'boundary_columns': self.boundary_columns}
        expected = pd.DataFrame(
            {'Objects': ['X', 'Y', 'X', 'Y', 'X', 'Y'],
             'Quantity': [1, 1, 1, 1, 1, 1],
             'Density': [10., 20., 30., 40., 60., 50.],
             'label': [2019, 2019, 2020, 2020, 2021, 2021]
            })
        result = r_class.aggregate_boundaries(self.df, **args)
        result.reset_index(inplace=True, drop=True)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)
Show code cell source
class TestAggregateBoundaries(unittest.TestCase):
    """Tests for reportclass.aggregate_boundaries with and without boundary labels."""

    def setUp(self):
        # Create a sample DataFrame for testing
        data = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id' : [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # test data
        self.df = pd.DataFrame(data)
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'
        # from default or user input: per-unit then per-group aggregation methods
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum','Density': 'median'}
        # the feature name, sample id and object columns make up the groupby columns
        # the feature name is used to mask the different child boundaries
        self.groupby_columns=[self.feature_name, self.sample_id, self.object_column]
        # the labels of the child boundaries are collected
        # using the feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()
        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_aggregate_boundaries_without_labels(self):
        # with boundary_labels=None all child boundaries collapse into one 'all' label
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': None,
            'boundary_columns': self.boundary_columns}
        expected = pd.DataFrame(
            {'Objects': {0: 'X', 1: 'Y'},
             'Quantity': {0: 3, 1: 3},
             'Density': {0: 30.0, 1: 40.0},
             'label': {0: 'all', 1: 'all'}
            })
        result = r_class.aggregate_boundaries(self.df, **args)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

    def test_aggregate_boundaries_with_labels(self):
        # with labels provided, each child boundary (Year) keeps its own label row
        args = {
            'groupby_columns':self.groupby_columns,
            'unit_agg':self.unit_methods,
            'group_agg': self.group_methods,
            'boundary_labels': self.boundary_labels,
            'boundary_columns': self.boundary_columns}
        expected = pd.DataFrame(
            {'Objects': ['X', 'Y', 'X', 'Y', 'X', 'Y'],
             'Quantity': [1, 1, 1, 1, 1, 1],
             'Density': [10., 20., 30., 40., 60., 50.],
             'label': [2019, 2019, 2020, 2020, 2021, 2021]
            })
        result = r_class.aggregate_boundaries(self.df, **args)
        result.reset_index(inplace=True, drop=True)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAggregateBoundaries)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_aggregate_boundaries_with_labels (__main__.TestAggregateBoundaries) ... ok
test_aggregate_boundaries_without_labels (__main__.TestAggregateBoundaries) ... ok
----------------------------------------------------------------------
Ran 2 tests in 0.018s
OK
Cumulative reports#
reportclass.a_cumulative_report
Calls reportclass.aggregate_boundaries
class TestAcumulativeReport(unittest.TestCase):
    """Test reportclass.a_cumulative_report: Density pivoted by object and boundary label."""

    def setUp(self):
        # Create a sample DataFrame for testing
        data = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id' : [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # test data
        self.df = pd.DataFrame(data)
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'
        # from default or user input: per-unit then per-group aggregation methods
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum','Density': 'median'}
        # the feature name, sample id and object columns make up the groupby columns
        # the feature name is used to mask the different child boundaries
        self.groupby_columns=[self.feature_name, self.sample_id, self.object_column]
        # the labels of the child boundaries are collected
        # using the feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()
        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_acumulative_df(self):
        args = {
            'feature_name': self.feature_name,
            'object_column': self.object_column,
            'sample_id': self.sample_id,
            'unit_agg': self.unit_methods,
            'group_agg':self.group_methods,
            'pivot_values': 'Density'
        }
        # expected pivot: one column per Year label plus a cumulative 'all' column
        expected = pd.DataFrame(
            {2019: {'X': 10.0, 'Y': 20.0},
             2020: {'X': 30.0, 'Y': 40.0},
             2021: {'X': 60.0, 'Y': 50.0},
             'all': {'X': 30.0, 'Y': 40.0}}
        )
        expected.index.name = self.object_column
        expected.columns.name = 'label'
        result = r_class.a_cumulative_report(self.df, **args)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)

test_suite = unittest.TestLoader().loadTestsFromTestCase(TestAcumulativeReport)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
Show code cell source
class TestAcumulativeReport(unittest.TestCase):
    """Tests reportclass.a_cumulative_report.

    The cumulative report pivots the aggregated density of each child
    boundary (here each Year) into a column and appends an 'all' column
    holding the aggregate over the complete data set.
    """

    def setUp(self):
        # A minimal survey data set: two regions, three years, four samples.
        # Assigned directly — the original wrapped the dict in pd.DataFrame
        # twice, which created a redundant second frame.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021],
            'city': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2'],
            'sample id': [1, 1, 2, 2, 3, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X'],
            'Density': [10, 20, 30, 40, 50, 60],
            'Quantity': [1, 1, 1, 1, 1, 1]
        })
        # from user input
        self.feature_name = 'Year'
        self.object_column = 'Objects'
        self.sample_id = 'sample id'
        # from default or user input: per-sample and per-group aggregation rules
        self.unit_methods = {'Density': 'sum', 'Quantity': 'count'}
        self.group_methods = {'Quantity': 'sum', 'Density': 'median'}
        # the feature name, sample id and object columns make up the groupby
        # columns; the feature name is used to mask the different child boundaries
        self.groupby_columns = [self.feature_name, self.sample_id, self.object_column]
        # the labels of the child boundaries are collected using the
        # feature name variable
        self.boundary_labels = self.df[self.feature_name].unique()
        # the boundary columns are used when aggregating the child boundaries
        self.boundary_columns = [self.object_column]

    def test_acumulative_df(self):
        # The keyword arguments mirror those assembled by the ReportClass.
        args = {
            'feature_name': self.feature_name,
            'object_column': self.object_column,
            'sample_id': self.sample_id,
            'unit_agg': self.unit_methods,
            'group_agg': self.group_methods,
            'pivot_values': 'Density'
        }
        # One column per year plus the 'all' column for the whole range.
        expected = pd.DataFrame(
            {2019: {'X': 10.0, 'Y': 20.0},
             2020: {'X': 30.0, 'Y': 40.0},
             2021: {'X': 60.0, 'Y': 50.0},
             'all': {'X': 30.0, 'Y': 40.0}}
        )
        expected.index.name = self.object_column
        expected.columns.name = 'label'
        result = r_class.a_cumulative_report(self.df, **args)
        # Check if the result matches the expected result
        pd.testing.assert_frame_equal(result, expected)
# Assemble the TestAcumulativeReport cases into a suite and run them
# with verbose output; the result object is kept for later inspection.
_loader = unittest.TestLoader()
test_suite = _loader.loadTestsFromTestCase(TestAcumulativeReport)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_acumulative_df (__main__.TestAcumulativeReport) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.016s
OK
Categorize work data#
reportclass.categorize_work_data
Called by reportclass.ReportClass
Generates
ReportClass.features
class TestCategorizeWorkData(unittest.TestCase):
    """Tests reportclass.categorize_work_data.

    categorize_work_data collects, for a given label of a feature column
    (here Use == 'l'), the unique values of each column of interest and
    the sample ids in which the label occurs.
    """

    def setUp(self):
        # Survey records over three regions; the 'l' label of the Use
        # column occurs in samples 1 and 4.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'r', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'sample id': [1, 1, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [10, 20, 30, 40, 50, 60, 15],
            'Quantity': [1, 1, 1, 1, 1, 1, 2]
        })
        # these values are set in the conf file. They describe the order of
        # aggregation. The lowest reporting unit is last, in this case City.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        # the feature column and the label being categorized
        self.labels = ['Use', 'l']

    def test_categorize_work_data(self):
        # Expected output: the unique values of each column of interest
        # restricted to the rows where Use == 'l'.
        expected = {
            'l': {
                'Region': np.array(['A', 'B', 'C'], dtype='object'),
                'State': np.array(['V', 'B', 'C'], dtype='object'),
                'City': np.array(['a1', 'a2', 'b2'], dtype='object'),
                'Use': np.array(['l'], dtype='object'),
                'Year': np.array([2019, 2021]),
                'samples': np.array([1, 4])
            }}
        # Call the function
        result = r_class.categorize_work_data(self.df, self.labels, self.columns_of_interest, self.sample_id)
        for akey in self.columns_of_interest:
            # compare as sets: the order of the unique values is not part
            # of the contract
            done = set(expected[self.labels[1]][akey])
            dtwo = set(result[self.labels[1]][akey])
            # idiomatic bound-method call replaces the original
            # unittest.TestCase.assertEqual(self, first=..., second=...) form
            self.assertEqual(done, dtwo)
Show code cell source
class TestCategorizeWorkData(unittest.TestCase):
    """Tests reportclass.categorize_work_data.

    categorize_work_data collects, for a given label of a feature column
    (here Use == 'l'), the unique values of each column of interest and
    the sample ids in which the label occurs.
    """

    def setUp(self):
        # Survey records over three regions; the 'l' label of the Use
        # column occurs in samples 1 and 4.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'r', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'sample id': [1, 1, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [10, 20, 30, 40, 50, 60, 15],
            'Quantity': [1, 1, 1, 1, 1, 1, 2]
        })
        # these values are set in the conf file. They describe the order of
        # aggregation. The lowest reporting unit is last, in this case City.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        # the feature column and the label being categorized
        self.labels = ['Use', 'l']

    def test_categorize_work_data(self):
        # Expected output: the unique values of each column of interest
        # restricted to the rows where Use == 'l'.
        expected = {
            'l': {
                'Region': np.array(['A', 'B', 'C'], dtype='object'),
                'State': np.array(['V', 'B', 'C'], dtype='object'),
                'City': np.array(['a1', 'a2', 'b2'], dtype='object'),
                'Use': np.array(['l'], dtype='object'),
                'Year': np.array([2019, 2021]),
                'samples': np.array([1, 4])
            }}
        # Call the function
        result = r_class.categorize_work_data(self.df, self.labels, self.columns_of_interest, self.sample_id)
        for akey in self.columns_of_interest:
            # compare as sets: the order of the unique values is not part
            # of the contract
            done = set(expected[self.labels[1]][akey])
            dtwo = set(result[self.labels[1]][akey])
            # idiomatic bound-method call replaces the original
            # unittest.TestCase.assertEqual(self, first=..., second=...) form
            self.assertEqual(done, dtwo)
# Assemble the TestCategorizeWorkData cases into a suite and run them
# with verbose output; the result object is kept for later inspection.
_loader = unittest.TestLoader()
test_suite = _loader.loadTestsFromTestCase(TestCategorizeWorkData)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_categorize_work_data (__main__.TestCategorizeWorkData) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.004s
OK
Summarize a vector#
a_summary_of_one_vector
Called by reportclass.ReportClass.summarize_feature_labels
and calls aggregate_dataframe
class TestASummaryOfOneVector(unittest.TestCase):
    """Tests reportclass.a_summary_of_one_vector and the module-level
    summarize_feature_labels_test helper.

    a_summary_of_one_vector produces the describe() statistics of one
    column, aggregated to the sample level, plus a 'total' row taken
    from the total_column.
    """

    def setUp(self):
        # Survey records: seven rows, five samples, constant density of 2.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'p', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'Slug': ['l1', 'l5', 'l2', 'l2', 'l3', 'l4', 'l4'],
            'sample id': [1, 5, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [2, 2, 2, 2, 2, 2, 2],
            'Quantity': [1, 1, 1, 1, 1, 1, 1]
        })
        # these values are set in the conf file. They describe the order of
        # aggregation. The lowest reporting unit is last, in this case City.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.feature = 'City'
        self.labels = self.df[self.feature].unique()
        # NOTE(review): 'location' is not a column of self.df; this
        # attribute is not used by the tests below, which build their own
        # unit columns — confirm whether it can be removed.
        self.unit_columns = [self.sample_id, 'location', self.feature]

    def test_a_summary_of_one_vector(self):
        # Summarize only the lake ('l') rows: three samples (1, 4, 5) with
        # a median density of 2 each and a total quantity of 4 pieces.
        test_df = self.df[self.df.Use == 'l']
        unit_columns = ['sample id', 'Slug', 'Use']
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}
        # the unused local `label = 'Lake'` from the original was removed
        result = r_class.a_summary_of_one_vector(test_df, unit_columns, unit_agg, describe='Density', total_column='Quantity')
        expected = pd.DataFrame({
            'Density': {
                'count': 3,
                'mean': 2.0,
                'std': 0.0,
                'min': 2.0,
                '25%': 2.0,
                '50%': 2.0,
                '75%': 2.0,
                'max': 2.0,
                'total': 4}})
        np.testing.assert_array_equal(expected.to_dict(), result.to_dict())
        self.assertEqual(set(expected.index), set(result.index))

    def test_summarize_feature_variables_test(self):
        # this can be set in config or passed as a keyword argument
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}
        # summarize_feature_labels_test mirrors the ReportClass method that
        # calls a_summary_of_one_vector on collections of vectors
        result = summarize_feature_labels_test(self.df, sample_id=self.sample_id, location='Slug', feature=self.feature, unit_agg=unit_agg, describe_column='Density', total_column='Quantity')
        # one (Density, <city>) column per label; std is the string 'NaN'
        # for single-sample cities because the helper fills NA values
        expected = pd.DataFrame({('Density', 'a1'): {
            '25%': 2.0,
            '50%': 2.0,
            '75%': 2.0,
            'count': 1,
            'max': 2.0,
            'mean': 2.0,
            'min': 2.0,
            'std': 'NaN',
            'total': 1},
            ('Density', 'a2'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 3,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 0.0,
                                'total': 3},
            ('Density', 'b1'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 1,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 'NaN',
                                'total': 2},
            ('Density', 'b2'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 1,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 'NaN',
                                'total': 1}})
        np.testing.assert_array_equal(expected.values, result.values)
        self.assertEqual(set(expected.index), set(result.index))
Show code cell source
def summarize_feature_labels_test(df,
                                  feature: str = None,
                                  sample_id: str = 'loc_date',
                                  location: str = 'slug',
                                  describe_column: str = 'pcs_m',
                                  unit_agg: dict = conf_.unit_agg,
                                  **kwargs):
    """Summarize describe_column for every label of <feature>.

    For each unique label of the feature column, the matching rows are
    passed to reportclass.a_summary_of_one_vector; the per-label summaries
    are concatenated, NA values are replaced with the string 'NaN' and the
    result is pivoted so each label becomes a column.

    The original sliced with .copy() and then passed a second copy to
    a_summary_of_one_vector; the redundant second copy was removed.
    """
    unit_columns = [sample_id, location, feature]
    summaries = [
        r_class.a_summary_of_one_vector(df[df[feature] == a_label].copy(),
                                        unit_columns=unit_columns,
                                        unit_agg=unit_agg,
                                        describe=describe_column,
                                        label=a_label, **kwargs)
        for a_label in df[feature].unique()
    ]
    combined = pd.concat(summaries)
    # fill NA with a string so single-sample std values compare predictably
    combined = combined.fillna('NaN')
    return combined.pivot(columns='label')
class TestASummaryOfOneVector(unittest.TestCase):
    """Tests reportclass.a_summary_of_one_vector and the module-level
    summarize_feature_labels_test helper.

    a_summary_of_one_vector produces the describe() statistics of one
    column, aggregated to the sample level, plus a 'total' row taken
    from the total_column.
    """

    def setUp(self):
        # Survey records: seven rows, five samples, constant density of 2.
        self.df = pd.DataFrame({
            'Region': ['A', 'A', 'B', 'B', 'A', 'B', 'C'],
            'Use': ['l', 'l', 'p', 'p', 'r', 'l', 'l'],
            'Year': [2019, 2019, 2020, 2020, 2021, 2021, 2021],
            'State': ['V', 'B', 'V', 'V', 'B', 'C', 'B'],
            'City': ['a1', 'a2', 'b1', 'b1', 'a2', 'b2', 'a2'],
            'Slug': ['l1', 'l5', 'l2', 'l2', 'l3', 'l4', 'l4'],
            'sample id': [1, 5, 2, 2, 3, 4, 4],
            'Objects': ['X', 'Y', 'X', 'Y', 'Y', 'X', 'X'],
            'Density': [2, 2, 2, 2, 2, 2, 2],
            'Quantity': [1, 1, 1, 1, 1, 1, 1]
        })
        # these values are set in the conf file. They describe the order of
        # aggregation. The lowest reporting unit is last, in this case City.
        self.columns_of_interest = ['Region', 'Use', 'Year', 'State', 'City']
        self.sample_id = 'sample id'
        self.feature = 'City'
        self.labels = self.df[self.feature].unique()
        # NOTE(review): 'location' is not a column of self.df; this
        # attribute is not used by the tests below, which build their own
        # unit columns — confirm whether it can be removed.
        self.unit_columns = [self.sample_id, 'location', self.feature]

    def test_a_summary_of_one_vector(self):
        # Summarize only the lake ('l') rows: three samples (1, 4, 5) with
        # a median density of 2 each and a total quantity of 4 pieces.
        test_df = self.df[self.df.Use == 'l']
        unit_columns = ['sample id', 'Slug', 'Use']
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}
        # the unused local `label = 'Lake'` from the original was removed
        result = r_class.a_summary_of_one_vector(test_df, unit_columns, unit_agg, describe='Density', total_column='Quantity')
        expected = pd.DataFrame({
            'Density': {
                'count': 3,
                'mean': 2.0,
                'std': 0.0,
                'min': 2.0,
                '25%': 2.0,
                '50%': 2.0,
                '75%': 2.0,
                'max': 2.0,
                'total': 4}})
        np.testing.assert_array_equal(expected.to_dict(), result.to_dict())
        self.assertEqual(set(expected.index), set(result.index))

    def test_summarize_feature_variables_test(self):
        # this can be set in config or passed as a keyword argument
        unit_agg = {'Density': 'median', 'Quantity': 'sum'}
        # summarize_feature_labels_test mirrors the ReportClass method that
        # calls a_summary_of_one_vector on collections of vectors
        result = summarize_feature_labels_test(self.df, sample_id=self.sample_id, location='Slug', feature=self.feature, unit_agg=unit_agg, describe_column='Density', total_column='Quantity')
        # one (Density, <city>) column per label; std is the string 'NaN'
        # for single-sample cities because the helper fills NA values
        expected = pd.DataFrame({('Density', 'a1'): {
            '25%': 2.0,
            '50%': 2.0,
            '75%': 2.0,
            'count': 1,
            'max': 2.0,
            'mean': 2.0,
            'min': 2.0,
            'std': 'NaN',
            'total': 1},
            ('Density', 'a2'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 3,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 0.0,
                                'total': 3},
            ('Density', 'b1'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 1,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 'NaN',
                                'total': 2},
            ('Density', 'b2'): {'25%': 2.0,
                                '50%': 2.0,
                                '75%': 2.0,
                                'count': 1,
                                'max': 2.0,
                                'mean': 2.0,
                                'min': 2.0,
                                'std': 'NaN',
                                'total': 1}})
        np.testing.assert_array_equal(expected.values, result.values)
        self.assertEqual(set(expected.index), set(result.index))
# Assemble the TestASummaryOfOneVector cases into a suite and run them
# with verbose output; the result object is kept for later inspection.
_loader = unittest.TestLoader()
test_suite = _loader.loadTestsFromTestCase(TestASummaryOfOneVector)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_a_summary_of_one_vector (__main__.TestASummaryOfOneVector) ... ok
test_summarize_feature_variables_test (__main__.TestASummaryOfOneVector) ... ok
----------------------------------------------------------------------
Ran 2 tests in 0.087s
OK
Collecting records by value#
reportclass.get_top_x_records_with_max_quantity
Is called by reportclass.ReportClass.inventory
class TestGetTopXRecordsWithMaxQuantity(unittest.TestCase):
    """Tests reportclass.get_top_x_records_with_max_quantity.

    The function selects the x records with the largest values in the
    quantity column and adds a '%' column: each selected record's share
    of the selected records' combined quantity.
    """

    def test_get_top_x_records_with_max_quantity(self):
        # five object ids with their recorded quantities
        sample = pd.DataFrame({
            'ID': ['A', 'B', 'C', 'D', 'E'],
            'Quantity': [10, 20, 15, 5, 30]
        })
        # request the three largest records, measured on Quantity,
        # identified by ID
        top_three = r_class.get_top_x_records_with_max_quantity(sample, 'Quantity', 'ID', 3)
        # E, B and C are the three largest; their shares are computed
        # against the selected total of 65 units
        expected = pd.DataFrame({
            'ID': ['E', 'B', 'C'],
            'Quantity': [30, 20, 15],
            '%': [30 / 65, 20 / 65, 15 / 65]
        })
        pd.testing.assert_frame_equal(top_three.reset_index(drop=True), expected.reset_index(drop=True))
Show code cell source
class TestGetTopXRecordsWithMaxQuantity(unittest.TestCase):
    """Tests reportclass.get_top_x_records_with_max_quantity.

    The function selects the x records with the largest values in the
    quantity column and adds a '%' column: each selected record's share
    of the selected records' combined quantity.
    """

    def test_get_top_x_records_with_max_quantity(self):
        # five object ids with their recorded quantities
        sample = pd.DataFrame({
            'ID': ['A', 'B', 'C', 'D', 'E'],
            'Quantity': [10, 20, 15, 5, 30]
        })
        # request the three largest records, measured on Quantity,
        # identified by ID
        top_three = r_class.get_top_x_records_with_max_quantity(sample, 'Quantity', 'ID', 3)
        # E, B and C are the three largest; their shares are computed
        # against the selected total of 65 units
        expected = pd.DataFrame({
            'ID': ['E', 'B', 'C'],
            'Quantity': [30, 20, 15],
            '%': [30 / 65, 20 / 65, 15 / 65]
        })
        pd.testing.assert_frame_equal(top_three.reset_index(drop=True), expected.reset_index(drop=True))
# Assemble the TestGetTopXRecordsWithMaxQuantity cases into a suite and
# run them with verbose output; the result object is kept for inspection.
_loader = unittest.TestLoader()
test_suite = _loader.loadTestsFromTestCase(TestGetTopXRecordsWithMaxQuantity)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_get_top_x_records_with_max_quantity (__main__.TestGetTopXRecordsWithMaxQuantity) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK
Counting occurrences#
reportclass.count_objects_with_positive_quantity
Is called by reportclass.ReportClass.inventory
Show code cell source
class TestCountObjectsWithPositiveQuantity(unittest.TestCase):
    """Tests reportclass.count_objects_with_positive_quantity.

    For each object code the function reports the fraction of its records
    that have a positive quantity: A and B each appear twice with one
    positive record (0.5); C appears once, positive (1.0).
    """

    def test_count_objects_with_positive_quantity(self):
        # three samples with overlapping object codes; two zero quantities
        survey = pd.DataFrame({
            'sample': ['s1', 's1', 's2', 's2', 's3'],
            'code': ['A', 'B', 'A', 'C', 'B'],
            'quantity': [1, 2, 0, 3, 0]
        })
        # the column that is measured and the column that identifies objects
        result = r_class.count_objects_with_positive_quantity(survey, 'quantity', 'code')
        # expected positive-occurrence rate per object code
        expected = pd.Series({'A': 0.5, 'B': 0.5, 'C': 1.0})
        # sort both sides so the comparison is order-independent
        pd.testing.assert_series_equal(result.sort_index(), expected.sort_index())
# Run the suite for this section's test case. BUG FIX: the original
# loaded TestGetTopXRecordsWithMaxQuantity here (copy-paste error), so
# TestCountObjectsWithPositiveQuantity was never executed — the rendered
# output below still shows the top-x test being re-run.
test_suite = unittest.TestLoader().loadTestsFromTestCase(TestCountObjectsWithPositiveQuantity)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_get_top_x_records_with_max_quantity (__main__.TestGetTopXRecordsWithMaxQuantity) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK
Selecting by two criteria#
reportclass.display_tabular_data_by_column_values
Is called by reportclass.ReportClass.most_common
class TestDisplayTabularDataByColumnValues(unittest.TestCase):
    """Tests reportclass.display_tabular_data_by_column_values.

    The function selects rows using two {column, val} criteria, sets the
    requested column as the index and returns the matching records.
    """

    def test_display_tabular_data_by_column_values(self):
        # five records; only i3, i4 and i5 are expected back, ordered by
        # descending quantity
        records = pd.DataFrame({
            'index_col': ['i1', 'i2', 'i3', 'i4', 'i5'],
            'column1': [10, 20, 30, 40, 50],
            'column2': [5, 15, 25, 35, 45],
            'quantity': [3, 6, 9, 12, 15]
        })
        # each criterion is a {column, val} pair
        first_criterion = {'column': 'quantity', 'val': 2}
        second_criterion = {'column': 'column2', 'val': 20}
        result = r_class.display_tabular_data_by_column_values(
            records, first_criterion, second_criterion, 'index_col')
        expected = pd.DataFrame(
            {'column1': [50, 40, 30],
             'column2': [45, 35, 25],
             'quantity': [15, 12, 9]},
            index=['i5', 'i4', 'i3'])
        pd.testing.assert_frame_equal(result, expected)
Show code cell source
class TestDisplayTabularDataByColumnValues(unittest.TestCase):
    """Tests reportclass.display_tabular_data_by_column_values.

    The function selects rows using two {column, val} criteria, sets the
    requested column as the index and returns the matching records.
    """

    def test_display_tabular_data_by_column_values(self):
        # five records; only i3, i4 and i5 are expected back, ordered by
        # descending quantity
        records = pd.DataFrame({
            'index_col': ['i1', 'i2', 'i3', 'i4', 'i5'],
            'column1': [10, 20, 30, 40, 50],
            'column2': [5, 15, 25, 35, 45],
            'quantity': [3, 6, 9, 12, 15]
        })
        # each criterion is a {column, val} pair
        first_criterion = {'column': 'quantity', 'val': 2}
        second_criterion = {'column': 'column2', 'val': 20}
        result = r_class.display_tabular_data_by_column_values(
            records, first_criterion, second_criterion, 'index_col')
        expected = pd.DataFrame(
            {'column1': [50, 40, 30],
             'column2': [45, 35, 25],
             'quantity': [15, 12, 9]},
            index=['i5', 'i4', 'i3'])
        pd.testing.assert_frame_equal(result, expected)
# Assemble the TestDisplayTabularDataByColumnValues cases into a suite and
# run them with verbose output; the result object is kept for inspection.
_loader = unittest.TestLoader()
test_suite = _loader.loadTestsFromTestCase(TestDisplayTabularDataByColumnValues)
test_runner = unittest.TextTestRunner(verbosity=3)
test_result = test_runner.run(test_suite)
test_display_tabular_data_by_column_values (__main__.TestDisplayTabularDataByColumnValues) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.004s
OK
data
Region | Use | Year | State | City | Slug | sample id | Objects | Density | Quantity | |
---|---|---|---|---|---|---|---|---|---|---|
0 | A | l | 2019 | V | a1 | l1 | 1 | X | 2 | 1 |
1 | A | l | 2019 | B | a2 | l5 | 5 | Y | 2 | 1 |
2 | B | p | 2020 | V | b1 | l2 | 2 | X | 2 | 1 |
3 | B | p | 2020 | V | b1 | l2 | 2 | Y | 2 | 1 |
4 | A | r | 2021 | B | a2 | l3 | 3 | Y | 2 | 1 |
5 | B | l | 2021 | C | b2 | l4 | 4 | X | 2 | 1 |
6 | C | l | 2021 | B | a2 | l4 | 4 | X | 2 | 1 |
Author: hammerdirt-analyst
conda environment: cantonal_report
pandas : 2.0.3
matplotlib: 3.7.1
numpy : 1.25.2