-
Notifications
You must be signed in to change notification settings - Fork 0
/
sip_functions.py
3611 lines (3312 loc) · 157 KB
/
sip_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 27 11:35:45 2020
@author: nigo0024
"""
from ast import literal_eval
from copy import deepcopy
import fnmatch
import itertools as it
import math
import numpy as np
import os
import geopandas as gpd
import pandas as pd
import pathlib
import sys
import time
from hs_process import batch
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.cross_decomposition import PLSRegression
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import StratifiedKFold
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import PowerTransformer
from sklearn.compose import TransformedTargetRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_log_error
from sklearn.metrics import r2_score
from scipy.stats import rankdata
import warnings
from sklearn.utils._testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
# Plotting
from sklearn.linear_model import LinearRegression
from matplotlib import pyplot as plt
import seaborn as sns
import matplotlib.lines as mlines
import matplotlib.patches as mpatches
from matplotlib.ticker import MaxNLocator
import matplotlib.ticker
from matplotlib.ticker import FormatStrFormatter
from datetime import datetime
import subprocess
import globus_sdk
from extended_text_box import BoxStyle
from extended_text_box import ExtendedTextBox
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
# In[File management functions]
def hs_grid_search(hs_settings, msi_run_id, dir_out=None):
'''
Reads ``hs_settings`` and returns a dataframe with all the necessary
information to execute each specific processing scenario. This enables
searching over any number of image processsing scenarios.
Folder name will be the index of df_grid for each set of outputs, so
df_grid must be referenced to know which folder corresponds to which
scenario.
'''
df_grid = pd.DataFrame(columns=hs_settings.keys())
keys = hs_settings.keys()
values = (hs_settings[key] for key in keys)
combinations = [dict(zip(keys, combination)) for combination in it.product(*values)]
for i in combinations:
data = []
for col in df_grid.columns:
data.append(i[col])
df_temp = pd.DataFrame(data=[data], columns=df_grid.columns)
df_grid = df_grid.append(df_temp)
df_grid = df_grid.reset_index(drop=True)
# if csv is True:
if dir_out is not None and os.path.isdir(dir_out):
df_grid.to_csv(os.path.join(dir_out, 'msi_' + str(msi_run_id) + '_hs_settings.csv'), index=True)
return df_grid
def get_idx_grid(dir_results_msi, msi_run_id, idx_min=0):
'''
Finds the index of the processing scenario based on files written to disk
The problem I have, is that after 10 loops, I am running into a
memoryerror. I am not sure why this is, but one thing I can try is to
restart the Python instance and begin the script from the beginning after
every main loop. However, I must determine which processing scenario I
am currently on based on the files written to disk.
Parameters:
dir_results_msi: directory to search
msi_run_id:
start: The minimum idx_grid to return (e.g., if start=100, then
idx_grid will be forced to be at least 100; it will be higher if
other folders already exist and processing as been performed)
'''
# onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
folders_all = [f for f in os.listdir(dir_results_msi) if os.path.isdir(os.path.join(dir_results_msi, f))]
folders_out = []
idx_list = []
str_match = 'msi_' + str(msi_run_id) + '_' # eligible folders must have this in their name
for f in folders_all:
if str_match in f:
idx_grid1 = int(f.replace(str_match, ''))
if idx_grid1 >= idx_min:
idx_list.append(idx_grid1)
folders_out.append(f)
if len(idx_list) == 0:
return idx_min
for idx_grid2 in range(idx_min, max(idx_list)+2):
if idx_grid2 not in idx_list: break
# idx_dir = os.path.join(dir_results_msi, str_match + str(idx_grid2))
# if not os.path.isdir(idx_dir):
# os.mkdir(idx_dir)
return idx_grid2
def grid_n_levels(df_grid):
n_clip = len(df_grid['clip'].unique())
n_smooth = len(df_grid['smooth'].unique())
n_bin = len(df_grid['bin'].unique())
n_segment = len(df_grid['segment'].unique())
return n_clip, n_smooth, n_bin, n_segment
def clean_df_grid(df_grid):
# [(x, y) for x in [1,2,3] for y in [3,1,4] if x != y]
# for proc_step, in ['clip', 'smooth', 'bin', 'segment', 'feature'] and i in range(10):
# print(proc_step, i)
scenarios = [(idx, row_n, proc_step) for idx, row_n in df_grid.iterrows()
for proc_step in ['clip', 'smooth', 'bin', 'segment']]
for idx, row_n, proc_step in scenarios:
try:
df_grid.loc[idx][proc_step] = literal_eval(row_n[proc_step])
except ValueError:
pass
return df_grid
# try:
# df_grid.loc[idx]['smooth'] = literal_eval(row_n['smooth'])
# except ValueError:
# pass
# try:
# df_grid.loc[idx]['bin'] = literal_eval(row_n['smooth'])
# except ValueError:
# pass
# try:
# df_grid.loc[idx]['segment'] = literal_eval(row_n['segment'])
# except ValueError:
# pass
def recurs_dir(base_dir, search_ext='.bip', level=None):
'''
Searches all folders and subfolders recursively within <base_dir>
for filetypes of <search_exp>.
Returns sorted <outFiles>, a list of full path strings of each result.
Parameters:
base_dir: directory path that should include files to be returned
search_ext: file format/extension to search for in all directories
and subdirectories
level: how many levels to search; if None, searches all levels
Returns:
out_files: include the full pathname, filename, and ext of all
files that have ``search_exp`` in their name.
'''
if level is None:
level = 1
else:
level -= 1
d_str = os.listdir(base_dir)
out_files = []
for item in d_str:
full_path = os.path.join(base_dir, item)
if not os.path.isdir(full_path) and item.endswith(search_ext):
out_files.append(full_path)
elif os.path.isdir(full_path) and level >= 0:
new_dir = full_path # If dir, then search in that
out_files_temp = recurs_dir(new_dir, search_ext)
if out_files_temp: # if list is not empty
out_files.extend(out_files_temp) # add items
return sorted(out_files)
def retrieve_files(data_dir, panel_type, crop_type):
'''
Gets all the necessary files to be used in this scenario.
Returns:
df_crop (``pandas.DataFrame``): a dataframe containing all the cropping
instructions for all the input datacubes
'''
fname_crop_info = os.path.join(data_dir, crop_type + '.csv')
df_crop = pd.read_csv(fname_crop_info)
df_crop['date'] = pd.to_datetime(df_crop['date'])
df_crop['directory'] = df_crop.apply(lambda row : os.path.join(
row['directory'], panel_type), axis = 1)
gdf_wells_fname = os.path.join(data_dir, 'plot_bounds_wells.geojson')
gdf_aerf_fname = os.path.join(data_dir, 'aerf_whole_field_sample_locs.geojson')
gdf_wells = gpd.read_file(gdf_wells_fname)
gdf_aerf = gpd.read_file(gdf_aerf_fname)
return df_crop, gdf_wells, gdf_aerf
def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
def unique_datacubes(df_crop):
'''
Returns a list of unique datacubes from df_crop
'''
fname_list = []
for idx, row in df_crop.iterrows():
fname = os.path.join(row['directory'], row['name_short'] + row['name_long'] + row['ext'])
if fname not in fname_list:
fname_list.append(fname)
df_files = pd.DataFrame(columns=['filename', 'size'])
for fname in fname_list:
if os.path.isfile(fname):
data = [fname, os.path.getsize(fname)]
df_temp = pd.DataFrame(data=[data], columns=df_files.columns)
df_files = df_files.append(df_temp)
print('Total number of datacubes to crop: {0}'.format(len(df_files)))
print('Total file size: {0}\n'.format(convert_size(df_files['size'].sum())))
def check_processing(dir_out, ext='.bip', n_files=833):
'''
Checks directory; if it doesn't exist, it is created; if processing is
finished, True is returned to indicate this step can be skipped.
'''
if not os.path.isdir(dir_out):
pathlib.Path(dir_out).mkdir(parents=True, exist_ok=True)
# try:
# os.mkdir(dir_out)
# except FileNotFoundError:
# os.mkdir(os.path.split(dir_out)[0])
# os.mkdir(dir_out)
skip = False
elif len(fnmatch.filter(os.listdir(dir_out), '*' + ext)) >= n_files:
skip = True # directory exists and contains many files; clipping was already done
else:
skip = False # directory exists, but clipping was not completed
return skip
def get_msi_segment_dir(row, level='segment'):
'''
Gets the msi directory equivalent of the directory where the .spec and
other files are located following segmentation. These will be transfered
to 2nd tier storage after testing/plotting.
Parameters:
row (``pd.Series``):
'''
panel_type = row['dir_panels']
crop_type = row['crop']
clip_type, _ = get_clip_type(row)
smooth_type, _, _ = get_smooth_type(row)
bin_type, _, _, _ = get_bin_type(row)
segment_type, _, _, _, _, _, _ = get_segment_type(row)
if level == 'segment':
msi_seg_dir = '/'.join((panel_type, crop_type, clip_type, smooth_type, bin_type, segment_type))
elif level == 'bin':
msi_seg_dir = '/'.join((panel_type, crop_type, clip_type, smooth_type, bin_type))
elif level == 'smooth':
msi_seg_dir = '/'.join((panel_type, crop_type, clip_type, smooth_type))
elif level == 'clip':
msi_seg_dir = '/'.join((panel_type, crop_type, clip_type))
elif level == 'crop':
msi_seg_dir = '/'.join((panel_type, crop_type))
return msi_seg_dir
def tier2_data_transfer(dir_base, row):
'''
Actually transfers the data to MSI 2nd tier storage
I think this may take quite some time, so should always be used in a
parallelized way
Parameters:
row (``pd.Series``):
'''
# msi_dir = 'results/ref_closest_panel/crop_plot/clip_none/smooth_none/seg_mcari2_50_upper'
msi_seg_dir = get_msi_segment_dir(row)
tier2_dir = os.path.join('S3://', msi_seg_dir)
subprocess.call(['s3cmd', 'put', '-r', dir_base + mis_seg_dir2, tier2_dir])
subprocess.call(['rm', '-r', dir_base + mis_seg_dir2])
def tier2_results_transfer(msi_result_dir, globus_client_id='684eb60a-9c5e-48af-929d-0880fd829173'):
'''
Transfers results from msi_0_000 folder from high performance storage
to 2nd tier storage
'''
tier2_dir = os.path.join('S3://results', msi_result_dir)
subprocess.call(['s3cmd', 'put', '-r', dir_base + msi_result_dir, tier2_dir])
subprocess.call(['rm', '-r', dir_base + msi_result_dir])
def get_globus_data_dir(dir_base, msi_run_id, row,
msi_base='/home/yangc1/public',
level='segment'):
'''
Gets the data directory to transfer all files
Parameters:
level (``str``): The data directory level to transfer; must
be one of ['segment', 'bin', 'smooth', 'clip', 'crop'].
'''
msi_seg_dir = get_msi_segment_dir(row, level=level)
dest_base_dir = os.path.basename(dir_base) + '_msi_run_' + str(msi_run_id)
dir_source_data = '/'.join(
(msi_base, os.path.basename(dir_base), 'data', msi_seg_dir + '/'))
dir_dest_data = '/'.join(
('/' + dest_base_dir, 'data', msi_seg_dir + '/'))
return dir_source_data, dir_dest_data
def get_globus_results_dir(dir_base, msi_run_id, row, msi_base='/home/yangc1/public'):
label_base = 'msi_' + str(msi_run_id) + '_' + str(row.name).zfill(3)
dest_base_dir = os.path.basename(dir_base) + '_msi_run_' + str(msi_run_id)
dir_source_results = '/'.join(
(msi_base, os.path.basename(dir_base), 'results',
'msi_' + str(msi_run_id) + '_results', label_base + '/'))
dir_dest_results = '/'.join(
('/' + dest_base_dir, 'results',
'msi_' + str(msi_run_id) + '_results', label_base + '/'))
return dir_source_results, dir_dest_results
def globus_autoactivate(tc, endpoint, if_expires_in=7200):
r = tc.endpoint_autoactivate(endpoint, if_expires_in=if_expires_in)
if (r["code"] == "AutoActivationFailed"):
print("Endpoint requires manual activation, please open "
"the following URL in a browser to activate the "
"endpoint:")
print("https://app.globus.org/file-manager?origin_id=%s"
% endpoint)
def globus_transfer(dir_source_data, dir_dest_data, TRANSFER_REFRESH_TOKEN, client,
TRANSFER_TOKEN, delete_only=None, label=None,
source_endpoint='d865fc6a-2db3-11e6-8070-22000b1701d1',
dest_endpoint='fb6f1c6b-86b1-11e8-9571-0a6d4e044368'):
'''
Transfers results from msi_0_000 folder from high performance storage
to 2nd tier storage using the GLOBUS Python SDK API. This required some
setup (see "globus_token.py")
dir_source_data = '/home/yangc1/public/hs_process/results/msi_1_results/'
dir_dest_data = 'hs_process/results/msi_1_results'
'''
# if source_endpoint is None:
# source_endpoint = tc.endpoint_search(filter_fulltext='umnmsi#home')
# if dest_endpoint is None:
# # dest_endpoint = tc.endpoint_search(filter_fulltext='umnmsi#tier2')
# tier2_id = 'fb6f1c6b-86b1-11e8-9571-0a6d4e044368'
# dest_endpoint = tc.get_endpoint(tier2_id)
# First, get the transfer client using access tokens
authorizer = globus_sdk.RefreshTokenAuthorizer(
TRANSFER_REFRESH_TOKEN, client, access_token=TRANSFER_TOKEN)
tc = globus_sdk.TransferClient(authorizer=authorizer)
globus_autoactivate(tc, source_endpoint)
globus_autoactivate(tc, dest_endpoint)
submission_id = None
# see: https://globus-sdk-python.readthedocs.io/en/stable/clients/transfer/#helper-objects
tdata = globus_sdk.TransferData( # initialize data transfer
tc, source_endpoint=source_endpoint, destination_endpoint=dest_endpoint,
label=label, submission_id=submission_id,
sync_level=2, verify_checksum=False, preserve_timestamp=False,
encrypt_data=False, deadline=None, recursive_symlinks='ignore')
tdata.add_item(dir_source_data, dir_dest_data, recursive=True) # add directory
transfer_result, delete_result = None, None
if delete_only == False or delete_only is None: # transfer
transfer_result = tc.submit_transfer(tdata)
print("GLOBUS TRANSFER task_id:", transfer_result["task_id"])
print('Waiting for transfer {0} to complete...'
''.format(transfer_result['task_id']))
c = it.count(1)
while not tc.task_wait(transfer_result['task_id'], timeout=60):
count = next(c)
print('Transfer {0} has not yet finished; transfer submitted {1} '
'minute(s) ago'.format(transfer_result['task_id'], count))
if count >= 6:
print('Cancelling task after {0} minutes.'.format(count))
tc.cancel_task(transfer_result['task_id'])
if count >= 8:
print('Looks like I have to break out of the while loop.')
break
print('DONE.')
if delete_only == False or delete_only == True: # delete
ddata = globus_sdk.DeleteData(tc, source_endpoint, recursive=True)
ddata.add_item(dir_source_data)
delete_result = tc.submit_delete(ddata)
print("GLOBUS DELETE task_id:", delete_result['task_id'])
return transfer_result, delete_result
def restart_script():
print("argv was", sys.argv)
print("sys.executable was", sys.executable)
print("restart now")
# os.execv(sys.executable, ['python'] + sys.argv)
os.execv(sys.executable, ['python', __file__] + sys.argv[1:])
def save_n_obs(dir_results_meta, df_join, msi_run_id, grid_idx, y_label, feat):
fname = os.path.join(dir_results_meta, 'msi_{0}_n_observations.csv'
''.format(msi_run_id))
if not os.path.exists(fname):
with open(fname, 'w+') as f:
f.write('msi_run_id, grid_idx, y_label, feature, obs_n\n')
with open(fname, 'a+') as f:
f.write('{0}, {1}, {2}, {3}, {4}\n'
''.format(msi_run_id, grid_idx, y_label, feat, len(df_join)))
# In[Timing functions]
def time_setup_img(dir_out, msi_run_id):
'''
Set up times dictionary and save to file for every loop to append a new
row
'''
cols = ['msi_run_id', 'grid_idx', 'n_jobs', 'time_start', 'time_end', 'time_total',
'crop', 'clip', 'smooth', 'bin', 'segment']
pathlib.Path(dir_out).mkdir(parents=True, exist_ok=True)
fname_times = os.path.join(dir_out, 'msi_' + str(msi_run_id) + '_time_imgproc.csv')
if not os.path.isfile(fname_times):
df_times = pd.DataFrame(columns=cols)
df_times.to_csv(fname_times, index=False)
time_dict = {i:[None] for i in cols}
return time_dict
def time_setup_training(dir_out, msi_run_id):
'''
Set up times dictionary and save to file for every loop to append a new
row
'''
cols = ['n_jobs', 'msi_run_id', 'grid_idx', 'y_label', 'feats',
'time_start', 'time_end', 'time_total',
# 'init1', 'init2', 'init3',
'load_ground', 'load_spec', 'join_data', 'feat_sel', 'tune',
'test', 'plot']
pathlib.Path(dir_out).mkdir(parents=True, exist_ok=True)
fname_times = os.path.join(dir_out, 'msi_' + str(msi_run_id) + '_time_train.csv')
if not os.path.isfile(fname_times):
df_times = pd.DataFrame(columns=cols)
df_times.to_csv(fname_times, index=False)
time_dict = {i:[None] for i in cols}
return time_dict
# def time_setup_training(dir_out, y_label_list, extra_feats_names, msi_run_id):
# '''
# Set up times dictionary and save to file for every loop to append a new
# row
# '''
# cols = ['n_jobs', 'msi_run_id', 'grid_idx',
# 'time_start', 'time_end', 'time_total', 'sttp-init']
# for y_label in y_label_list:
# for feat_name in extra_feats_names:
# col_str = 'sttp-' + y_label + '-' + feat_name
# cols.append(col_str)
# if not os.path.isdir(dir_out):
# os.mkdir(dir_out)
# fname_times = os.path.join(dir_out, 'msi_' + str(msi_run_id) + '_train_runtime.csv')
# if not os.path.isfile(fname_times):
# df_times = pd.DataFrame(columns=cols)
# df_times.to_csv(fname_times, index=False)
# time_dict = {i:[None] for i in cols}
# return time_dict
def append_times(dir_out, time_dict, msi_run_id):
'''
Appends time info to the 'runtime.csv' in ``dir_out``
Parameters:
time_dict (``dict``): contains the time delta or datetime string to
be written to each .csv column
'''
time_end = datetime.now()
time_dict['time_end'] = [str(time_end)]
time_start = datetime.strptime(
time_dict['time_start'][0], '%Y-%m-%d %H:%M:%S.%f')
time_total = time_end - time_start
time_dict['time_total'] = [str(time_total)]
if 'segment' in time_dict.keys():
fname = 'msi_' + str(msi_run_id) + '_time_imgproc.csv'
else:
fname = 'msi_' + str(msi_run_id) + '_time_train.csv'
fname_times = os.path.join(dir_out, fname)
df_time = pd.DataFrame.from_dict(time_dict)
df_time.to_csv(fname_times, header=None, mode='a', index=False,
index_label=df_time.columns)
time_dict_null = dict.fromkeys(time_dict.keys(), [None])
return time_dict_null
def time_loop_init(time_dict, msi_run_id, grid_idx, n_jobs):
'''
Initialization function for keeping track of time. Returns ``time_dict``,
which will hold the time delta or datetime string for each step in the
script.
'''
time_start = datetime.now()
time_dict['n_jobs'] = [n_jobs]
time_dict['msi_run_id'] = [msi_run_id]
time_dict['grid_idx'] = [grid_idx]
time_dict['time_start'] = [str(time_start)]
return time_dict, time_start
def time_step(time_dict, key, time_last):
'''
Calculates the time since time_last and adds it to ``time_dict`` for the
appropriate key.
'''
time_new = datetime.now()
time_dif = time_new - time_last
time_dict[key] = [str(time_dif)]
return time_dict, time_new
# In[Count processed image functions]
def proc_files_count_setup(dir_out, msi_run_id):
'''
Set up processed files dict and save to file for every loop to append a new
row
'''
cols = ['n_jobs', 'msi_run_id', 'grid_idx', 'processed']
pathlib.Path(dir_out).mkdir(parents=True, exist_ok=True)
fname_n_files = os.path.join(dir_out, 'msi_' + str(msi_run_id) + '_imgproc_n_files.csv')
if not os.path.isfile(fname_n_files):
df_proc = pd.DataFrame(columns=cols)
df_proc.to_csv(fname_n_files, index=False)
proc_dict = {i:[None] for i in cols}
return proc_dict
def proc_files_count(proc_dict, n_jobs, msi_run_id, key, dir_data, row, ext='.spec'):
'''
Calculates the time since time_last and adds it to ``time_dict`` for the
appropriate key.
'''
proc_dict['n_jobs'] = [n_jobs]
proc_dict['msi_run_id'] = [msi_run_id]
# dir_data has to be explicit if we're going to do this for every single level..
# n_files_proc = len(recurs_dir(get_spec_data(dir_data, row, feat='reflectance'), search_ext=ext))
n_files_proc = len(fnmatch.filter(os.listdir(
get_spec_data(dir_data, row, feat='reflectance')), '*' + ext))
proc_dict[key] = [n_files_proc]
return proc_dict, n_files_proc
def proc_files_append(dir_out, proc_dict, msi_run_id):
'''
Appends time info to the '_imgproc_n_files.csv' in ``dir_out``
Parameters:
proc_dict (``dict``): contains the n processed files to be written to
each .csv column
'''
fname = 'msi_' + str(msi_run_id) + '_imgproc_n_files.csv'
fname_n_files = os.path.join(dir_out, fname)
df_proc = pd.DataFrame.from_dict(proc_dict)
df_proc.to_csv(fname_n_files, header=None, mode='a', index=False,
index_label=df_proc.columns)
proc_dict_null = dict.fromkeys(proc_dict.keys(), [None])
return proc_dict_null
# In[Image processing functions]
def print_details(row):
print('\nProcessing scenario ID: {0}'.format(row.name))
print('Panels type: {0}'.format(row['dir_panels']))
print('Crop type: {0}'.format(row['crop']))
print('Clip type: {0}'.format(row['clip']))
print('Smooth type: {0}'.format(row['smooth']))
print('Bin type: {0}'.format(row['bin']))
print('Segment type: {0}'.format(row['segment']))
def get_clip_type(row):
'''
Determines the clip type being used in this scenario (and updates
wl_bands accordingly)
Parameters:
row (``pd.Series``):
'''
if pd.isnull(row['clip']):
clip_type = 'clip_none'
wl_bands = row['clip']
elif len(row['clip']['wl_bands']) == 2:
clip_type = 'clip_ends'
wl_bands = row['clip']['wl_bands']
elif len(row['clip']['wl_bands']) == 4:
clip_type = 'clip_all'
wl_bands = row['clip']['wl_bands']
return clip_type, wl_bands
def get_smooth_type(row):
'''
Determines the smooth type being used in this scenario and returns window
size and order for use in file names, etc.
'''
if pd.isnull(row['smooth']):
smooth_type = 'smooth_none'
window_size = row['smooth']
order = row['smooth']
else:
window_size = row['smooth']['window_size']
order = row['smooth']['order']
smooth_type = 'smooth_window_{0}'.format(window_size)
return smooth_type, window_size, order
def get_bin_type(row):
'''
Determines the bin type being used in this scenario (and updates
accordingly)
Parameters:
row (``pd.Series``):
'''
if pd.isnull(row['bin']):
method_bin = None
else:
method_bin = row['bin']['method']
if method_bin == 'spectral_mimic':
sensor = row['bin']['sensor']
bandwidth = None
bin_type = 'bin_mimic_{0}'.format(sensor.replace('-', '_'))
elif method_bin == 'spectral_resample':
sensor = None
bandwidth = row['bin']['bandwidth']
bin_type = 'bin_resample_{0}nm'.format(bandwidth)
else:
sensor = None
bandwidth = None
bin_type = 'bin_none'
return bin_type, method_bin, sensor, bandwidth
def get_segment_type(row):
'''
Determines the segment type being used in this scenario and returns other relevant
information for use in file names, etc.
'''
if pd.isnull(row['segment']):
method = None
wl1 = None
wl2 = None
wl3 = None
mask_percentile = None
mask_side = None
segment_type = 'seg_none'
elif row['segment']['method'] == 'mcari2':
method = row['segment']['method']
wl1 = row['segment']['wl1']
wl2 = row['segment']['wl2']
wl3 = row['segment']['wl3']
mask_percentile = row['segment']['mask_percentile']
mask_side = row['segment']['mask_side']
if isinstance(mask_percentile, list):
mask_pctl_print = '_'.join([str(x) for x in mask_percentile])
segment_type = 'seg_{0}_{1}_{2}'.format(method, mask_pctl_print, get_side_inverse(mask_side))
else:
segment_type = 'seg_{0}_{1}_{2}'.format(method, mask_percentile, get_side_inverse(mask_side))
elif row['segment']['method'] == 'ndi':
method = row['segment']['method']
wl1 = row['segment']['wl1']
wl2 = row['segment']['wl2']
wl3 = None
mask_percentile = row['segment']['mask_percentile']
mask_side = row['segment']['mask_side']
if isinstance(mask_percentile, list):
mask_pctl_print = '_'.join([str(x) for x in mask_percentile])
segment_type = 'seg_{0}_{1}_{2}'.format(method, mask_pctl_print, get_side_inverse(mask_side))
else:
segment_type = 'seg_{0}_{1}_{2}'.format(method, mask_percentile, get_side_inverse(mask_side))
elif row['segment']['method'] == ['mcari2', 'mcari2']:
method = row['segment']['method']
wl1 = row['segment']['wl1']
wl2 = row['segment']['wl2']
wl3 = row['segment']['wl3']
mask_percentile = row['segment']['mask_percentile']
mask_side = row['segment']['mask_side']
segment_type = 'seg_{0}_between_{1}_{2}_pctl'.format(method[0], mask_percentile[0], mask_percentile[1])
elif isinstance(row['segment']['method'], list) and row['segment']['method'][1] != 'mcari2':
method = row['segment']['method']
wl1 = row['segment']['wl1']
wl2 = row['segment']['wl2']
wl3 = row['segment']['wl3']
mask_percentile = row['segment']['mask_percentile']
mask_side = row['segment']['mask_side']
if method == ['mcari2', [545, 565]]:
segment_type = 'seg_mcari2_{0}_{1}_green_{2}_{3}'.format(mask_percentile[0], get_side_inverse(mask_side[0]), mask_percentile[1], get_side_inverse(mask_side[1]))
elif method == ['mcari2', [800, 820]]:
segment_type = 'seg_mcari2_{0}_{1}_nir_{2}_{3}'.format(mask_percentile[0], get_side_inverse(mask_side[0]), mask_percentile[1], get_side_inverse(mask_side[1]))
return segment_type, method, wl1, wl2, wl3, mask_percentile, mask_side
def smooth_get_base_dir(dir_data, panel_type, crop_type, clip_type):
'''
Gets the base directory for the smoothed images
'''
if clip_type == 'clip_none':
base_dir = os.path.join(dir_data, panel_type, crop_type)
else:
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type)
return base_dir
def bin_get_base_dir(dir_data, panel_type, crop_type, clip_type, smooth_type):
'''
Gets the base directory for the binned images
'''
if smooth_type == 'smooth_none' and clip_type == 'clip_none':
base_dir = os.path.join(dir_data, panel_type, crop_type)
elif smooth_type == 'smooth_none' and clip_type != 'clip_none':
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type)
else: # smooth_type != 'smooth_none'
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type, smooth_type)
return base_dir
def seg_get_base_dir(dir_data, panel_type, crop_type, clip_type, smooth_type,
bin_type):
'''
Gets the base directory for the segmented images
'''
if bin_type == 'bin_none' and smooth_type == 'smooth_none' and clip_type == 'clip_none':
base_dir = os.path.join(dir_data, panel_type, crop_type)
elif bin_type == 'bin_none' and smooth_type == 'smooth_none' and clip_type != 'clip_none':
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type)
elif bin_type == 'bin_none' and smooth_type != 'smooth_none':
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type, smooth_type)
else: # bin_type != 'bin_none'
base_dir = os.path.join(dir_data, panel_type, crop_type, clip_type, smooth_type, bin_type)
return base_dir
def crop(df_crop, panel_type, dir_out_crop, out_force=True, n_files=854,
gdf_aerf=None, gdf_wells=None):
'''
Gets the cropping info for each site and crops all the datacubes
Parameters:
df_crop (``pandas.DataFrame``): a dataframe containing all the cropping
instructions for all the input datacubes
'''
if check_processing(dir_out_crop, ext='.bip', n_files=n_files):
return
folder_name = None
name_append = os.path.split(dir_out_crop)[-1].replace('_', '-')
if panel_type == 'ref_closest_panel': # data doesn't exist for 7/23 aerf small plot
df_crop_aerf_small = df_crop[(df_crop['study'] == 'aerfsmall') &
(df_crop['date'] != datetime(2019, 7, 23))]
else:
df_crop_aerf_small = df_crop[df_crop['study'] == 'aerfsmall']
df_crop_aerf_whole = df_crop[df_crop['study'] == 'aerffield']
df_crop_wells_18 = df_crop[(df_crop['study'] == 'wells') &
(df_crop['date'].dt.year == 2018)]
df_crop_wells_19 = df_crop[(df_crop['study'] == 'wells') &
(df_crop['date'].dt.year == 2019)]
hsbatch = batch()
hsbatch.io.set_io_defaults(force=out_force)
hsbatch.spatial_crop(fname_sheet=df_crop_wells_18, method='many_gdf',
gdf=gdf_wells, base_dir_out=dir_out_crop,
folder_name=folder_name, name_append=name_append)
hsbatch.spatial_crop(fname_sheet=df_crop_wells_19, method='many_gdf',
gdf=gdf_wells, base_dir_out=dir_out_crop,
folder_name=folder_name, name_append=name_append)
hsbatch.spatial_crop(fname_sheet=df_crop_aerf_small, method='single',
base_dir_out=dir_out_crop, folder_name=folder_name,
name_append=name_append)
hsbatch.spatial_crop(fname_sheet=df_crop_aerf_whole, method='many_gdf',
gdf=gdf_aerf, base_dir_out=dir_out_crop,
folder_name=folder_name, name_append=name_append)
def chunk_by_n(array, n):
np.random.shuffle(array) # studies have different size images, so shuffling makes each chunk more similar
arrays = np.array_split(array, n)
list_out = []
for l in arrays:
list_out.append(l.tolist())
return list_out
def check_missed_files(fname_list, base_dir_out, ext_out, f_pp, row, base_dir_f,
out_force, lock):
'''
Check if any files were not processed that were supposed to be processed.
Parameters:
f_pp (function): The parallel processing function to run to
complete the processing of the missing files.
**kwargs (dict): keyword arguments to pass to f_pp.
'''
# Goal: take out filepaths and end text
to_process = [os.path.splitext(os.path.basename(i))[0].rsplit('-')[0] for i in fname_list]
name_ex, ext = os.path.splitext(fname_list[0])
end_str = '-' + '-'.join(name_ex.split('-')[1:]) + ext
# Find processed files without filepaths and end text
# base_dir_spec = os.path.join(dir_out_mask, 'reflectance')
fname_list_complete = fnmatch.filter(os.listdir(base_dir_out), '*' + ext_out) # no filepath
processed = [os.path.splitext(i)[0].split('-')[0] for i in fname_list_complete]
missed = [f for f in to_process if f not in processed]
base_dir = os.path.dirname(fname_list[0])
fname_list_missed = [os.path.join(base_dir, f + end_str) for f in missed]
if len(missed) > 0:
print('There were {0} images that slipped through the cracks. '
'Processing them manually now...\n'.format(len(missed)))
print('Directory: {0}'.format(base_dir))
print('Here are the missed images:\n{0}\n'.format(missed))
f_pp(fname_list_missed, row, base_dir_f, out_force, lock)
def clip(dir_data, row, out_force=True, n_files=854):
'''
Clips each of the datacubes according to instructions in df_grid
'''
panel_type = row['dir_panels']
crop_type = row['crop']
clip_type, wl_bands = get_clip_type(row)
base_dir = os.path.join(dir_data, panel_type, crop_type)
dir_out_clip = os.path.join(dir_data, panel_type, crop_type, clip_type)
if check_processing(dir_out_clip, ext='.bip', n_files=n_files):
return
if wl_bands is not None:
folder_name = None
name_append = os.path.split(dir_out_clip)[-1].replace('_', '-')
hsbatch = batch()
hsbatch.io.set_io_defaults(force=out_force)
hsbatch.spectral_clip(base_dir=base_dir, folder_name=folder_name,
name_append=name_append,
base_dir_out=dir_out_clip,
wl_bands=wl_bands)
else:
print('Clip: ``clip_type`` is None, so there is nothing to process.')
def clip_f_pp(fname_list_clip, wl_bands, dir_out_clip, out_force, lock):
'''
Parallel processing: clips each of the datacubes according to instructions
in df_grid organized for multi-core processing.
These are the lines of code that have to be passed as a function to
ProcessPoolExecutor()
'''
assert wl_bands is not None, ('``wl_bands`` must not be ``None``')
hsbatch = batch(lock=lock)
hsbatch.io.set_io_defaults(force=out_force)
# clip2(fname_list_clip, wl_bands, dir_out_clip, hsbatch)
# if wl_bands is not None:
folder_name = None
name_append = os.path.split(dir_out_clip)[-1].replace('_', '-')
hsbatch.spectral_clip(fname_list=fname_list_clip, folder_name=folder_name,
name_append=name_append,
base_dir_out=dir_out_clip,
wl_bands=wl_bands)
def clip_pp(dir_data, row, n_jobs, out_force=True, n_files=854):
'''
Actual execution of band clipping via multi-core processing
'''
m = Manager()
lock = m.Lock()
panel_type = row['dir_panels']
crop_type = row['crop']
clip_type, wl_bands = get_clip_type(row)
base_dir = os.path.join(dir_data, panel_type, crop_type)
dir_out_clip = os.path.join(dir_data, panel_type, crop_type, clip_type)
already_processed = check_processing(dir_out_clip, ext='.bip', n_files=n_files)
if out_force is False and already_processed is True:
fname_list = []
elif clip_type == 'clip_none':
fname_list = []
else:
fname_list = fnmatch.filter(os.listdir(base_dir), '*.bip') # no filepath
fname_list = [os.path.join(base_dir, f) for f in fname_list]
# fname_list = recurs_dir(base_dir, search_ext='.bip', level=0)
chunk_size = int(len(fname_list) / (n_jobs*2))
if len(fname_list) == 0:
print('Clip: ``clip_type`` is either None and there is nothing to '
'process, or all the images are already processed.')
else:
chunks = chunk_by_n(fname_list, n_jobs*2)
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
executor.map(
clip_f_pp, chunks, it.repeat(wl_bands),
it.repeat(dir_out_clip), it.repeat(out_force), it.repeat(lock))
ext_out = '.bip'
check_missed_files(fname_list, dir_out_clip, ext_out, clip_f_pp, row,
dir_out_clip, out_force, lock)
def smooth(dir_data, row, out_force=True, n_files=854):
'''
Smoothes each of the datacubes according to instructions in df_grid
'''
panel_type = row['dir_panels']
crop_type = row['crop']
clip_type, wl_bands = get_clip_type(row)
smooth_type, window_size, order = get_smooth_type(row)
base_dir = smooth_get_base_dir(dir_data, panel_type, crop_type, clip_type)
dir_out_smooth = os.path.join(dir_data, panel_type, crop_type, clip_type,
smooth_type)
if check_processing(dir_out_smooth, ext='.bip', n_files=n_files):
return
if window_size is not None and order is not None:
folder_name = None
name_append = os.path.split(dir_out_smooth)[-1].replace('_', '-')
hsbatch = batch()
hsbatch.io.set_io_defaults(force=out_force)
hsbatch.spectral_smooth(base_dir=base_dir, folder_name=folder_name,
name_append=name_append,
base_dir_out=dir_out_smooth,
window_size=window_size, order=order)
else:
print('Smooth: ``smooth_type`` is None, so there is nothing to process.')
def smooth_f_pp(fname_list_smooth, window_size, order, dir_out_smooth, out_force, lock):
'''
Parallel processing: smoothes each of the datacubes according to
instructions in df_grid organized for multi-core processing.
'''
msg = ('``window_size`` must not be ``None``')
assert window_size is not None and order is not None, msg
hsbatch = batch(lock=lock)
hsbatch.io.set_io_defaults(force=out_force)
# clip2(fname_list_clip, wl_bands, dir_out_clip, hsbatch)
# if wl_bands is not None:
folder_name = None
name_append = os.path.split(dir_out_smooth)[-1].replace('_', '-')
hsbatch.spectral_smooth(
fname_list=fname_list_smooth, folder_name=folder_name,
name_append=name_append, base_dir_out=dir_out_smooth,
window_size=window_size, order=order)
def smooth_pp(dir_data, row, n_jobs, out_force=True, n_files=854):
'''
Actual execution of band smoothing via multi-core processing
'''
m = Manager()
lock = m.Lock()
panel_type = row['dir_panels']
crop_type = row['crop']
clip_type, wl_bands = get_clip_type(row)
smooth_type, window_size, order = get_smooth_type(row)
base_dir = smooth_get_base_dir(dir_data, panel_type, crop_type, clip_type)
dir_out_smooth = os.path.join(dir_data, panel_type, crop_type, clip_type,
smooth_type)
already_processed = check_processing(dir_out_smooth, ext='.bip', n_files=n_files)
if out_force is False and already_processed is True:
fname_list = []
elif smooth_type == 'smooth_none':
fname_list = []
else:
fname_list = fnmatch.filter(os.listdir(base_dir), '*.bip') # no filepath
fname_list = [os.path.join(base_dir, f) for f in fname_list]
# fname_list = recurs_dir(base_dir, search_ext='.bip', level=0)
if len(fname_list) == 0:
print('Smooth: ``smooth_type`` is either None and there is nothing to '
'process, or all the images are already processed.')
else:
np.random.shuffle(fname_list) # studies have different size images, so shuffling makes each chunk more similar
chunks = chunk_by_n(fname_list, n_jobs*2)
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
executor.map(
smooth_f_pp, chunks, it.repeat(window_size),
it.repeat(order), it.repeat(dir_out_smooth),
it.repeat(out_force), it.repeat(lock))
ext = '.bip'
check_missed_files(fname_list, dir_out_smooth, ext, smooth_f_pp, row,
dir_out_smooth, out_force, lock)
def bin_f_pp(fname_list_bin, row, dir_out_bin, out_force, lock):
'''
Parallel processing: spectral mimic/resampling for each of the datacubes
according to instructions in df_grid organized for multi-core processing.
'''
bin_type, method_bin, sensor, bandwidth = get_bin_type(row)
msg = ('``bin_type`` must not be ``None``')