-import dask.dataframe as dd
+"""Module providing functions for processing and wrangling data."""
+
 from datetime import datetime
+from pathlib import Path
+
+import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-from pathlib import Path

 from .config import Config

-def format_outname(prefix: str, se: bool, weekday:bool):
-    '''
+
+def format_outname(prefix: str, se: bool, weekday: bool):
+    """
+    Format the output file name.

     Parameters
     ----------
-    prefix
-    se
-    weekday
+    prefix: obfuscated prefix prepended to the output name when se is True
+    se: boolean to write out standard errors; if True, use an obfuscated name
+    weekday: boolean for weekday adjustments.
+             signals will be generated with weekday adjustments (True) or without
+             adjustments (False)

     Returns
     -------
-
-    '''
-    # write out results
+    out_name: str
+    """
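+    # e.g. weekday=True yields "smoothed_adj_cli"; se=True additionally prepends the obfuscated prefix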
     out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
     if se:
         assert prefix is not None, "template has no obfuscated prefix"
         out_name = prefix + "_" + out_name
     return out_name

+
 def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
-    '''
-    format dataframe and checks for anomalies to write results
+    """
+    Format dataframe and check for anomalies before writing results.
+
     Parameters
     ----------
     df: dataframe from output from update_sensor
@@ -39,9 +47,9 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     Returns
     -------
     filtered and formatted dataframe
-    '''
+    """
     # report in percentage
-    df['val'] = df['val'] * 100
+    df["val"] = df["val"] * 100
     df["se"] = df["se"] * 100

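+    # drop rows where the sensor value is missing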
     val_isnull = df["val"].isnull()
@@ -50,23 +58,23 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
         logger.info("sensor value is nan, check pipeline")
     df = df[~val_isnull]

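+    # drop rows whose standard error is suspiciously high (>= 5 after scaling to percent)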
-    se_too_high = df['se'] >= 5
+    se_too_high = df["se"] >= 5
     df_se_too_high = df[se_too_high]
     if len(df_se_too_high) > 0:
         logger.info(f"standard error suspiciously high! investigate {geo_id}")
     df = df[~se_too_high]

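+    # drop rows whose sensor value is suspiciously high (>= 90%)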
-    sensor_too_high = df['val'] >= 90
+    sensor_too_high = df["val"] >= 90
     df_sensor_too_high = df[sensor_too_high]
     if len(df_sensor_too_high) > 0:
         logger.info(f"standard error suspiciously high! investigate {geo_id}")
     df = df[~sensor_too_high]

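+    # when writing SEs, keep only rows with strictly positive val and se; otherwise se is blanked out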
     if se:
-        valid_cond = (df['se'] > 0) & (df['val'] > 0)
+        valid_cond = (df["se"] > 0) & (df["val"] > 0)
         invalid_df = df[~valid_cond]
         if len(invalid_df) > 0:
-            logger.info(f"p=0, std_err=0 invalid")
+            logger.info("p=0, std_err=0 invalid")
         df = df[valid_cond]
     else:
         df["se"] = np.NAN
@@ -75,8 +83,10 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     df["sample_size"] = np.NAN
     return df

-def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se:bool, logger, output_path="."):
-    """Write sensor values to csv.
+
+def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
+    """
+    Write sensor values to csv.

     Args:
       output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
@@ -91,24 +101,21 @@ def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: boo
     if se:
         logger.info(f"========= WARNING: WRITING SEs TO {out_name}")

-    dates = set(list(output_df['date']))
-    grouped = filtered_df.groupby('date')
+    dates = set(list(output_df["date"]))
+    grouped = filtered_df.groupby("date")
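+    # one csv per date: <output_path>/<date shifted by DAY_SHIFT, YYYYmmdd>_<geo_id>_<out_name>.csv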
     for d in dates:
-        filename = "%s/%s_%s_%s.csv" % (output_path,
-                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                                        geo_id,
-                                        out_name)
+        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_id, out_name)
         single_date_df = grouped.get_group(d)
-        single_date_df = single_date_df.drop(columns=['date'])
+        single_date_df = single_date_df.drop(columns=["date"])
         single_date_df.to_csv(filename, index=False, na_rep="NA")

         logger.debug(f"wrote {len(single_date_df)} {geo_id}")


 def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
-    '''
-    Reads csv using Dask and filters out based on date range and currently unused column,
-    then converts back into pandas dataframe.
+    """
+    Read csv using Dask, filter unneeded data, then convert back into a pandas dataframe.
+
     Parameters
     ----------
       filepath: path to the aggregated doctor-visits data
@@ -117,7 +124,7 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: d
       dropdate: data drop date (YYYY-mm-dd)

     -------
-    '''
+    """
     filepath = Path(filepath)
     logger.info(f"Processing {filepath}")

@@ -142,7 +149,7 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: d
     assert startdate < enddate, "Start date >= end date"
     assert enddate <= dropdate, "End date > drop date"

-    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))
+    date_filter = (ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate)

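+    # evaluate the lazy Dask filter and materialize the result as a pandas dataframe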
     df = ddata[date_filter].compute()
