Changeset d727af2 in flex_extract.git for source/python/classes/EcFlexpart.py
- Timestamp:
- Mar 10, 2019, 4:48:16 PM (5 years ago)
- Branches:
- master, ctbto, dev
- Children:
- 90a1ca0
- Parents:
- 79729d5
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/python/classes/EcFlexpart.py
r79729d5 rd727af2 73 73 sys.path.append('../') 74 74 import _config 75 from .GribUtil import GribUtil75 from classes.GribUtil import GribUtil 76 76 from mods.tools import (init128, to_param_id, silent_remove, product, 77 77 my_error, make_dir, get_informations, get_dimensions, 78 execute_subprocess, to_param_id_with_tablenumber) 79 from .MarsRetrieval import MarsRetrieval 80 from .UioFiles import UioFiles 78 execute_subprocess, to_param_id_with_tablenumber, 79 generate_retrieval_period_boundary) 80 from classes.MarsRetrieval import MarsRetrieval 81 from classes.UioFiles import UioFiles 81 82 import mods.disaggregation as disaggregation 82 83 … … 586 587 for key in index_keys: 587 588 key_vals = codes_index_get(iid, key) 588 # have to sort the key values for correct disaggregation,589 # have to sort the key values for correct order, 589 590 # therefore convert to int first 590 591 key_vals = [int(k) for k in key_vals] … … 922 923 923 924 table128 = init128(_config.PATH_GRIBTABLE) 925 # get ids from the flux parameter names 924 926 pars = to_param_id(self.params['OG_acc_SL'][0], table128) 925 927 … … 928 930 929 931 # get the values of the keys which are used for distinct access 930 # of grib messages via product 932 # of grib messages via product and save the maximum number of 933 # ensemble members if there is more than one 931 934 if '/' in self.number: 932 935 # more than one ensemble member is selected 933 936 index_keys = ["number", "date", "time", "step"] 937 # maximum ensemble number retrieved 938 # + 1 for the control run (ensemble number 0) 939 maxnum = int(self.number.split('/')[-1]) + 1 940 # remember the index of the number values 941 index_number = index_keys.index('number') 942 # empty set to save ensemble numbers which were already processed 943 ens_numbers = set() 944 # index for the ensemble number 945 inumb = 0 934 946 else: 935 947 index_keys = ["date", "time", "step"] 948 # maximum ensemble number 949 maxnum = None 950 951 # get sorted lists of the index values 952 # this is very important for disaggregating 953 # the flux data in correct order 936 954 iid, index_vals = self._mk_index_values(c.inputdir, 937 955 inputfiles, … … 943 961 944 962 if c.rrint: 963 # set start and end timestamps for retrieval period 945 964 if not c.purefc: 946 965 start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H') … … 953 972 end_date = end_date + timedelta(hours=c.maxstep) 954 973 974 # get necessary grid dimensions from grib files for storing the 975 # precipitation fields 955 976 info = get_informations(os.path.join(c.inputdir, 956 977 inputfiles.files[0])) 957 978 dims = get_dimensions(info, c.purefc, c.dtime, index_vals, 958 979 start_date, end_date) 959 # create numpy array 960 lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) 961 cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) 980 981 # create empty numpy arrays 982 if not maxnum: 983 lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) 984 cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64) 985 else: 986 lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64) 987 cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64) 988 989 # index counter for time line 962 990 it_lsp = 0 963 991 it_cp = 0 992 993 # store the order of date and step 964 994 date_list = [] 965 995 step_list = [] 966 996 967 # initialize dictionaries to store values997 # initialize dictionaries to store flux values per parameter 968 998 orig_vals = {} 969 999 deac_vals = {} … … 976 1006 for prod in product(*index_vals): 977 1007 # e.g. prod = ('20170505', '0', '12') 978 # ( date,time, step)1008 # ( date ,time, step) 979 1009 980 1010 print('CURRENT PRODUCT: ', prod) 1011 1012 # the whole process has to be done for each seperate ensemble member 1013 # therefore, for each new ensemble member we delete old flux values 1014 # and start collecting flux data from the beginning time step 1015 if maxnum and prod[index_number] not in ens_numbers: 1016 ens_numbers.add(prod[index_number]) 1017 inumb = len(ens_numbers) - 1 1018 # re-initialize dictionaries to store flux values per parameter 1019 # for the next ensemble member 1020 it_lsp = 0 1021 it_cp = 0 1022 orig_vals = {} 1023 deac_vals = {} 1024 for p in pars: 1025 orig_vals[p] = [] 1026 deac_vals[p] = [] 981 1027 982 1028 for i in range(len(index_keys)): … … 1010 1056 # if necessary, add ensemble member number to filename suffix 1011 1057 # otherwise, add empty string 1012 if 'number' in index_keys: 1013 index_number = index_keys.index('number') 1014 if len(index_vals[index_number]) > 1: 1015 numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) 1058 if maxnum: 1059 numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) 1016 1060 else: 1017 1061 numbersuffix = '' … … 1092 1136 if step not in step_list: 1093 1137 step_list.append(step) 1094 if c.rrint and parId == 142: 1095 lsp_np[:,it_lsp] = deac_vals[parId][-1][:] 1138 # store precipitation values 1139 if maxnum and parId == 142: 1140 lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:] 1096 1141 it_lsp += 1 1097 elif c.rrint and parId == 143: 1098 cp_np[:,it_cp] = deac_vals[parId][-1][:] 1142 elif not maxnum and parId == 142: 1143 lsp_np[:, it_lsp] = deac_vals[parId][-1][:] 1144 it_lsp += 1 1145 elif maxnum and parId == 143: 1146 cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:] 1147 it_cp += 1 1148 elif not maxnum and parId == 143: 1149 cp_np[:, it_cp] = deac_vals[parId][-1][:] 1099 1150 it_cp += 1 1100 1151 … … 1195 1246 self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir) 1196 1247 1197 self._prep_new_rrint( ni, nj, lsp_np.shape[1], lsp_np,1198 cp_np, date_list, step_list, c)1248 self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np, 1249 cp_np, maxnum, index_keys, index_vals, c) 1199 1250 1200 1251 return 1201 1252 1202 def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):1253 def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c): 1203 1254 '''Calculates and writes out the disaggregated precipitation fields. 1204 1255 1205 1256 Disaggregation is done in time and original times are written to the 1206 1257 flux files, while the additional subgrid times are written to 1207 extra files. 1258 extra files output files. They are named like the original files with 1259 suffixes "_1" and "_2" for the first and second subgrid point. 1208 1260 1209 1261 Parameters … … 1226 1278 Shape (ni * nj, nt). 1227 1279 1228 date_list : list of datetime 1229 The list of dates for which the disaggregation is to be done. 1230 1231 step_list : list of int 1232 The list of steps for a single forecast time. 1233 Only necessary for pure forecasts. 1280 maxnum : int 1281 The maximum number of ensemble members. It is None 1282 if there are no or just one ensemble. 1283 1284 index_keys : dictionary 1285 List of parameter names which serves as index. 1286 1287 index_vals : list of list of str 1288 Contains the values from the keys used for a distinct selection 1289 of grib messages in processing the grib files. 1290 Content looks like e.g.: 1291 index_vals[0]: ('20171106', '20171107', '20171108') ; date 1292 index_vals[1]: ('0', '1200', '1800', '600') ; time 1293 index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange 1234 1294 1235 1295 c : ControlFile … … 1242 1302 ''' 1243 1303 print('... disaggregation of precipitation with new method.') 1244 lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) 1245 cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) 1304 1305 tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb') 1306 1307 # initialize new numpy arrays for disaggregated fields 1308 if maxnum: 1309 lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64) 1310 cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64) 1311 else: 1312 lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64) 1313 cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64) 1246 1314 1247 1315 # do the disaggregation, but neglect the last value of the 1248 # time series. This one corresponds for example to 24 hour, 1249 # which we don't need. 1250 for ix in range(ni*nj): 1251 lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1] 1252 cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1] 1316 # original time series. This one corresponds for example to 1317 # 24 hour, which we don't need. we use 0 - 23 UTC for a day. 1318 if maxnum: 1319 for inum in range(maxnum): 1320 for ix in range(ni*nj): 1321 lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1] 1322 cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1] 1323 else: 1324 for ix in range(ni*nj): 1325 lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[0,ix,:])[:-1] 1326 cp_new_np[ix,:] = disaggregation.IA3(cp_np[0,ix,:])[:-1] 1253 1327 1254 1328 # write to grib files (full/orig times to flux file and inbetween 1255 1329 # times into seperate end files) 1256 1330 print('... write disaggregated precipitation to files.') 1331 1332 if maxnum: 1333 # remember the index of the number values 1334 index_number = index_keys.index('number') 1335 # empty set to save unique ensemble numbers which were already processed 1336 ens_numbers = set() 1337 # index for the ensemble number 1338 inumb = 0 1339 else: 1340 inumb = 0 1341 1257 1342 # index variable of disaggregated fields 1258 1343 it = 0 1259 # loop over times and write original time step and the two newly 1260 # generated sub time steps for each loop 1261 for date in date_list: 1262 for step in step_list: 1263 tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb') 1264 1265 if c.purefc: 1266 fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \ 1267 '.{:0>3}'.format(step) 1268 filename1 = c.prefix + date.strftime('%y%m%d.%H') + \ 1269 '.{:0>3}'.format(step) + '_1' 1270 filename2 = c.prefix + date.strftime('%y%m%d.%H') + \ 1271 '.{:0>3}'.format(step) + '_2' 1272 else: 1273 fluxfilename = 'flux' + date.strftime('%Y%m%d%H') 1274 filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1' 1275 filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2' 1276 1277 # write original time step to flux file as usual 1278 fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename)) 1279 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1280 wherekeynames=['paramId'], wherekeyvalues=[142], 1281 keynames=['date','time','stepRange','values'], 1282 keyvalues=[int(date.strftime('%Y%m%d')), 1283 date.hour*100, step, lsp_new_np[:,it]], 1284 ) 1285 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1286 wherekeynames=['paramId'], wherekeyvalues=[143], 1287 keynames=['date','time','stepRange','values'], 1288 keyvalues=[int(date.strftime('%Y%m%d')), 1289 date.hour*100, step, cp_new_np[:,it]] 1290 ) 1291 1292 # write first subgrid time step 1293 endfile1 = GribUtil(os.path.join(c.inputdir, filename1)) 1294 endfile1.set_keys(tmpfile, filemode='w', strict=True, 1295 wherekeynames=['paramId'], wherekeyvalues=[142], 1296 keynames=['date','time','stepRange','values'], 1297 keyvalues=[int(date.strftime('%Y%m%d')), 1298 date.hour*100, step, lsp_new_np[:,it+1]] 1299 ) 1300 endfile1.set_keys(tmpfile, filemode='a', strict=True, 1301 wherekeynames=['paramId'], wherekeyvalues=[143], 1302 keynames=['date','time','stepRange','values'], 1303 keyvalues=[int(date.strftime('%Y%m%d')), 1304 date.hour*100, step, cp_new_np[:,it+1]] 1305 ) 1306 1307 # write second subgrid time step 1308 endfile2 = GribUtil(os.path.join(c.inputdir, filename2)) 1309 endfile2.set_keys(tmpfile, filemode='w', strict=True, 1310 wherekeynames=['paramId'], wherekeyvalues=[142], 1311 keynames=['date','time','stepRange','values'], 1312 keyvalues=[int(date.strftime('%Y%m%d')), 1313 date.hour*100, step, lsp_new_np[:,it+2]] 1314 ) 1315 endfile2.set_keys(tmpfile, filemode='a', strict=True, 1316 wherekeynames=['paramId'], wherekeyvalues=[143], 1317 keynames=['date','time','stepRange','values'], 1318 keyvalues=[int(date.strftime('%Y%m%d')), 1319 date.hour*100, step, cp_new_np[:,it+2]] 1320 ) 1321 it = it + 3 # jump to next original time step 1344 1345 # "product" genereates each possible combination between the 1346 # values of the index keys 1347 for prod in product(*index_vals): 1348 # e.g. prod = ('20170505', '0', '12') 1349 # ( date ,time, step) 1350 # or prod = ('0' , '20170505', '0', '12') 1351 # (number, date ,time, step) 1352 1353 cdate = prod[index_keys.index('date')] 1354 ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100) 1355 cstep = '{:0>3}'.format(int(prod[index_keys.index('step')])) 1356 1357 date = datetime.strptime(cdate + ctime, '%Y%m%d%H') 1358 date += timedelta(hours=int(cstep)) 1359 1360 start_period, end_period = generate_retrieval_period_boundary(c) 1361 # skip all temporary times 1362 # which are outside the retrieval period 1363 if date < start_period or \ 1364 date > end_period: 1365 continue 1366 1367 # the whole process has to be done for each seperate ensemble member 1368 # therefore, for each new ensemble member we delete old flux values 1369 # and start collecting flux data from the beginning time step 1370 if maxnum and prod[index_number] not in ens_numbers: 1371 ens_numbers.add(prod[index_number]) 1372 inumb = int(prod[index_number]) 1373 it = 0 1374 1375 # if necessary, add ensemble member number to filename suffix 1376 # otherwise, add empty string 1377 if maxnum: 1378 numbersuffix = '.N{:0>3}'.format(int(prod[index_number])) 1379 else: 1380 numbersuffix = '' 1381 1382 # per original time stamp: write original time step and 1383 # the two newly generated sub time steps 1384 if c.purefc: 1385 fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep 1386 else: 1387 fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix 1388 1389 # write original time step to flux file as usual 1390 fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename)) 1391 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1392 wherekeynames=['paramId'], wherekeyvalues=[142], 1393 keynames=['perturbationNumber','date','time','stepRange','values'], 1394 keyvalues=[inumb, int(date.strftime('%Y%m%d')), 1395 date.hour*100, 0, lsp_new_np[inumb,:,it]], 1396 ) 1397 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1398 wherekeynames=['paramId'], wherekeyvalues=[143], 1399 keynames=['perturbationNumber','date','time','stepRange','values'], 1400 keyvalues=[inumb,int(date.strftime('%Y%m%d')), 1401 date.hour*100, 0, cp_new_np[inumb,:,it]] 1402 ) 1403 1404 # rr for first subgrid point is identified by step = 1 1405 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1406 wherekeynames=['paramId'], wherekeyvalues=[142], 1407 keynames=['perturbationNumber','date','time','stepRange','values'], 1408 keyvalues=[inumb,int(date.strftime('%Y%m%d')), 1409 date.hour*100, '1', lsp_new_np[inumb,:,it+1]] 1410 ) 1411 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1412 wherekeynames=['paramId'], wherekeyvalues=[143], 1413 keynames=['perturbationNumber','date','time','stepRange','values'], 1414 keyvalues=[inumb,int(date.strftime('%Y%m%d')), 1415 date.hour*100, '1', cp_new_np[inumb,:,it+1]] 1416 ) 1417 1418 # rr for second subgrid point is identified by step = 2 1419 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1420 wherekeynames=['paramId'], wherekeyvalues=[142], 1421 keynames=['perturbationNumber','date','time','stepRange','values'], 1422 keyvalues=[inumb,int(date.strftime('%Y%m%d')), 1423 date.hour*100, '2', lsp_new_np[inumb,:,it+2]] 1424 ) 1425 fluxfile.set_keys(tmpfile, filemode='a', strict=True, 1426 wherekeynames=['paramId'], wherekeyvalues=[143], 1427 keynames=['perturbationNumber','date','time','stepRange','values'], 1428 keyvalues=[inumb,int(date.strftime('%Y%m%d')), 1429 date.hour*100, '2', cp_new_np[inumb,:,it+2]] 1430 ) 1431 1432 it = it + 3 # jump to next original time step in rr fields 1322 1433 return 1323 1434 … … 1578 1689 # collect for final processing 1579 1690 self.outputfilelist.append(os.path.basename(fnout)) 1580 # get additional precipitation subgrid data if available1581 if c.rrint:1582 self.outputfilelist.append(os.path.basename(fnout + '_1'))1583 self.outputfilelist.append(os.path.basename(fnout + '_2'))1691 # # get additional precipitation subgrid data if available 1692 # if c.rrint: 1693 # self.outputfilelist.append(os.path.basename(fnout + '_1')) 1694 # self.outputfilelist.append(os.path.basename(fnout + '_2')) 1584 1695 # ============================================================================================ 1585 1696 # create outputfile and copy all data from intermediate files … … 1614 1725 ''' Calculates extra ensemble members for ELDA - Stream. 1615 1726 1727 This is a specific feature which doubles the number of ensemble members 1728 for the ELDA Stream. 1729 1616 1730 Parameters 1617 1731 ---------- … … 1643 1757 1644 1758 filename = cffile.split('N000')[0] 1645 for i in range(1, maxnum +1): # max number nehmen1759 for i in range(1, maxnum + 1): 1646 1760 1647 1761 # read an ensemble member … … 1656 1770 break 1657 1771 values = codes_get_array(gid, 'values') 1772 # generate a new ensemble member by subtracting 1773 # 2 * ( current time step value - last time step value ) 1658 1774 codes_set_array(gid, 'values', 1659 1775 values-2*(values-cfvalues[j])) … … 1704 1820 1705 1821 print('Output filelist: ') 1706 print(s elf.outputfilelist)1822 print(sorted(self.outputfilelist)) 1707 1823 1708 1824 for ofile in self.outputfilelist:
Note: See TracChangeset
for help on using the changeset viewer.