Changeset d727af2 in flex_extract.git for source/python/classes/EcFlexpart.py


Ignore:
Timestamp:
Mar 10, 2019, 4:48:16 PM (5 years ago)
Author:
Anne Philipp <anne.philipp@…>
Branches:
master, ctbto, dev
Children:
90a1ca0
Parents:
79729d5
Message:

updated rrint processes (added missing rr for ensembles and wrote all precip fields into the output files with different steps to distingush)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • source/python/classes/EcFlexpart.py

    r79729d5 rd727af2  
    7373sys.path.append('../')
    7474import _config
    75 from .GribUtil import GribUtil
     75from classes.GribUtil import GribUtil
    7676from mods.tools import (init128, to_param_id, silent_remove, product,
    7777                        my_error, make_dir, get_informations, get_dimensions,
    78                         execute_subprocess, to_param_id_with_tablenumber)
    79 from .MarsRetrieval import MarsRetrieval
    80 from .UioFiles import UioFiles
     78                        execute_subprocess, to_param_id_with_tablenumber,
     79                        generate_retrieval_period_boundary)
     80from classes.MarsRetrieval import MarsRetrieval
     81from classes.UioFiles import UioFiles
    8182import mods.disaggregation as disaggregation
    8283
     
    586587        for key in index_keys:
    587588            key_vals = codes_index_get(iid, key)
    588             # have to sort the key values for correct disaggregation,
     589            # have to sort the key values for correct order,
    589590            # therefore convert to int first
    590591            key_vals = [int(k) for k in key_vals]
     
    922923
    923924        table128 = init128(_config.PATH_GRIBTABLE)
     925        # get ids from the flux parameter names
    924926        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
    925927
     
    928930
    929931        # get the values of the keys which are used for distinct access
    930         # of grib messages via product
     932        # of grib messages via product and save the maximum number of
     933        # ensemble members if there is more than one
    931934        if '/' in self.number:
    932935            # more than one ensemble member is selected
    933936            index_keys = ["number", "date", "time", "step"]
     937            # maximum ensemble number retrieved
     938            # + 1 for the control run (ensemble number 0)
     939            maxnum = int(self.number.split('/')[-1]) + 1
     940            # remember the index of the number values
     941            index_number = index_keys.index('number')
     942            # empty set to save ensemble numbers which were already processed
     943            ens_numbers = set()
     944            # index for the ensemble number
     945            inumb = 0
    934946        else:
    935947            index_keys = ["date", "time", "step"]
     948            # maximum ensemble number
     949            maxnum = None
     950
     951        # get sorted lists of the index values
     952        # this is very important for disaggregating
     953        # the flux data in correct order
    936954        iid, index_vals = self._mk_index_values(c.inputdir,
    937955                                                inputfiles,
     
    943961
    944962        if c.rrint:
     963            # set start and end timestamps for retrieval period
    945964            if not c.purefc:
    946965                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
     
    953972                end_date = end_date + timedelta(hours=c.maxstep)
    954973
     974            # get necessary grid dimensions from grib files for storing the
     975            # precipitation fields
    955976            info = get_informations(os.path.join(c.inputdir,
    956977                                                 inputfiles.files[0]))
    957978            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
    958979                                  start_date, end_date)
    959             # create numpy array
    960             lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
    961             cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
     980
     981            # create empty numpy arrays
     982            if not maxnum:
     983                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
     984                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
     985            else:
     986                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
     987                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
     988
     989            # index counter for time line
    962990            it_lsp = 0
    963991            it_cp = 0
     992
     993            # store the order of date and step
    964994            date_list = []
    965995            step_list = []
    966996
    967         # initialize dictionaries to store values
     997        # initialize dictionaries to store flux values per parameter
    968998        orig_vals = {}
    969999        deac_vals = {}
     
    9761006        for prod in product(*index_vals):
    9771007            # e.g. prod = ('20170505', '0', '12')
    978             #             (  date    ,time, step)
     1008            #             ( date     ,time, step)
    9791009
    9801010            print('CURRENT PRODUCT: ', prod)
     1011
     1012            # the whole process has to be done for each seperate ensemble member
     1013            # therefore, for each new ensemble member we delete old flux values
     1014            # and start collecting flux data from the beginning time step
     1015            if maxnum and prod[index_number] not in ens_numbers:
     1016                ens_numbers.add(prod[index_number])
     1017                inumb = len(ens_numbers) - 1
     1018                # re-initialize dictionaries to store flux values per parameter
     1019                # for the next ensemble member
     1020                it_lsp = 0
     1021                it_cp = 0
     1022                orig_vals = {}
     1023                deac_vals = {}
     1024                for p in pars:
     1025                    orig_vals[p] = []
     1026                    deac_vals[p] = []
    9811027
    9821028            for i in range(len(index_keys)):
     
    10101056            # if necessary, add ensemble member number to filename suffix
    10111057            # otherwise, add empty string
    1012             if 'number' in index_keys:
    1013                 index_number = index_keys.index('number')
    1014                 if len(index_vals[index_number]) > 1:
    1015                     numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
     1058            if maxnum:
     1059                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
    10161060            else:
    10171061                numbersuffix = ''
     
    10921136                            if step not in step_list:
    10931137                                step_list.append(step)
    1094                         if c.rrint and parId == 142:
    1095                             lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
     1138                        # store precipitation values
     1139                        if maxnum and parId == 142:
     1140                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
    10961141                            it_lsp += 1
    1097                         elif c.rrint and parId == 143:
    1098                             cp_np[:,it_cp] = deac_vals[parId][-1][:]
     1142                        elif not maxnum and parId == 142:
     1143                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
     1144                            it_lsp += 1
     1145                        elif maxnum and parId == 143:
     1146                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
     1147                            it_cp += 1
     1148                        elif not maxnum and parId == 143:
     1149                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
    10991150                            it_cp += 1
    11001151
     
    11951246            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
    11961247
    1197             self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
    1198                                  cp_np, date_list, step_list, c)
     1248            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
     1249                                 cp_np, maxnum, index_keys, index_vals, c)
    11991250
    12001251        return
    12011252
    1202     def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
     1253    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
    12031254        '''Calculates and writes out the disaggregated precipitation fields.
    12041255
    12051256        Disaggregation is done in time and original times are written to the
    12061257        flux files, while the additional subgrid times are written to
    1207         extra files.
     1258        extra files output files. They are named like the original files with
     1259        suffixes "_1" and "_2" for the first and second subgrid point.
    12081260
    12091261        Parameters
     
    12261278            Shape (ni * nj, nt).
    12271279
    1228         date_list : list of datetime
    1229             The list of dates for which the disaggregation is to be done.
    1230 
    1231         step_list : list of int
    1232             The list of steps for a single forecast time.
    1233             Only necessary for pure forecasts.
     1280        maxnum : int
     1281            The maximum number of ensemble members. It is None
     1282            if there are no or just one ensemble.
     1283
     1284        index_keys : dictionary
     1285            List of parameter names which serves as index.
     1286
     1287        index_vals : list of list  of str
     1288            Contains the values from the keys used for a distinct selection
     1289            of grib messages in processing  the grib files.
     1290            Content looks like e.g.:
     1291            index_vals[0]: ('20171106', '20171107', '20171108') ; date
     1292            index_vals[1]: ('0', '1200', '1800', '600') ; time
     1293            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
    12341294
    12351295        c : ControlFile
     
    12421302        '''
    12431303        print('... disaggregation of precipitation with new method.')
    1244         lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
    1245         cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
     1304
     1305        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
     1306
     1307        # initialize new numpy arrays for disaggregated fields
     1308        if maxnum:
     1309            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
     1310            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
     1311        else:
     1312            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
     1313            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
    12461314
    12471315        # do the disaggregation, but neglect the last value of the
    1248         # time series. This one corresponds for example to 24 hour,
    1249         # which we don't need.
    1250         for ix in range(ni*nj):
    1251             lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
    1252             cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
     1316        # original time series. This one corresponds for example to
     1317        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
     1318        if maxnum:
     1319            for inum in range(maxnum):
     1320                for ix in range(ni*nj):
     1321                    lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1]
     1322                    cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1]
     1323        else:
     1324            for ix in range(ni*nj):
     1325                lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[0,ix,:])[:-1]
     1326                cp_new_np[ix,:] = disaggregation.IA3(cp_np[0,ix,:])[:-1]
    12531327
    12541328        # write to grib files (full/orig times to flux file and inbetween
    12551329        # times into seperate end files)
    12561330        print('... write disaggregated precipitation to files.')
     1331
     1332        if maxnum:
     1333            # remember the index of the number values
     1334            index_number = index_keys.index('number')
     1335            # empty set to save unique ensemble numbers which were already processed
     1336            ens_numbers = set()
     1337            # index for the ensemble number
     1338            inumb = 0
     1339        else:
     1340            inumb = 0
     1341
    12571342        # index variable of disaggregated fields
    12581343        it = 0
    1259         # loop over times and write original time step and the two newly
    1260         # generated sub time steps for each loop
    1261         for date in date_list:
    1262             for step in step_list:
    1263                 tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
    1264 
    1265                 if c.purefc:
    1266                     fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
    1267                                    '.{:0>3}'.format(step)
    1268                     filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
    1269                                 '.{:0>3}'.format(step) + '_1'
    1270                     filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
    1271                                 '.{:0>3}'.format(step) + '_2'
    1272                 else:
    1273                     fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
    1274                     filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
    1275                     filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
    1276 
    1277                 # write original time step to flux file as usual
    1278                 fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
    1279                 fluxfile.set_keys(tmpfile, filemode='a', strict=True,
    1280                                   wherekeynames=['paramId'], wherekeyvalues=[142],
    1281                                   keynames=['date','time','stepRange','values'],
    1282                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1283                                              date.hour*100, step, lsp_new_np[:,it]],
    1284                                  )
    1285                 fluxfile.set_keys(tmpfile, filemode='a', strict=True,
    1286                                   wherekeynames=['paramId'], wherekeyvalues=[143],
    1287                                   keynames=['date','time','stepRange','values'],
    1288                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1289                                              date.hour*100, step, cp_new_np[:,it]]
    1290                                  )
    1291 
    1292                 # write first subgrid time step
    1293                 endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
    1294                 endfile1.set_keys(tmpfile, filemode='w', strict=True,
    1295                                   wherekeynames=['paramId'], wherekeyvalues=[142],
    1296                                   keynames=['date','time','stepRange','values'],
    1297                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1298                                              date.hour*100, step, lsp_new_np[:,it+1]]
    1299                                   )
    1300                 endfile1.set_keys(tmpfile, filemode='a', strict=True,
    1301                                   wherekeynames=['paramId'], wherekeyvalues=[143],
    1302                                   keynames=['date','time','stepRange','values'],
    1303                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1304                                              date.hour*100, step, cp_new_np[:,it+1]]
    1305                                  )
    1306 
    1307                 # write second subgrid time step
    1308                 endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
    1309                 endfile2.set_keys(tmpfile, filemode='w', strict=True,
    1310                                   wherekeynames=['paramId'], wherekeyvalues=[142],
    1311                                   keynames=['date','time','stepRange','values'],
    1312                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1313                                              date.hour*100, step, lsp_new_np[:,it+2]]
    1314                                  )
    1315                 endfile2.set_keys(tmpfile, filemode='a', strict=True,
    1316                                   wherekeynames=['paramId'], wherekeyvalues=[143],
    1317                                   keynames=['date','time','stepRange','values'],
    1318                                   keyvalues=[int(date.strftime('%Y%m%d')),
    1319                                              date.hour*100, step, cp_new_np[:,it+2]]
    1320                                  )
    1321                 it = it + 3 # jump to next original time step
     1344
     1345        # "product" genereates each possible combination between the
     1346        # values of the index keys
     1347        for prod in product(*index_vals):
     1348            # e.g. prod = ('20170505', '0', '12')
     1349            #             ( date     ,time, step)
     1350            # or   prod = ('0'   , '20170505', '0', '12')
     1351            #             (number, date      ,time, step)
     1352
     1353            cdate = prod[index_keys.index('date')]
     1354            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
     1355            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
     1356
     1357            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
     1358            date += timedelta(hours=int(cstep))
     1359
     1360            start_period, end_period = generate_retrieval_period_boundary(c)
     1361            # skip all temporary times
     1362            # which are outside the retrieval period
     1363            if date < start_period or \
     1364               date > end_period:
     1365                continue
     1366
     1367            # the whole process has to be done for each seperate ensemble member
     1368            # therefore, for each new ensemble member we delete old flux values
     1369            # and start collecting flux data from the beginning time step
     1370            if maxnum and prod[index_number] not in ens_numbers:
     1371                ens_numbers.add(prod[index_number])
     1372                inumb = int(prod[index_number])
     1373                it = 0
     1374
     1375            # if necessary, add ensemble member number to filename suffix
     1376            # otherwise, add empty string
     1377            if maxnum:
     1378                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
     1379            else:
     1380                numbersuffix = ''
     1381
     1382            # per original time stamp: write original time step and
     1383            # the two newly generated sub time steps
     1384            if c.purefc:
     1385                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
     1386            else:
     1387                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
     1388
     1389            # write original time step to flux file as usual
     1390            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
     1391            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1392                              wherekeynames=['paramId'], wherekeyvalues=[142],
     1393                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1394                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
     1395                                         date.hour*100, 0, lsp_new_np[inumb,:,it]],
     1396                             )
     1397            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1398                              wherekeynames=['paramId'], wherekeyvalues=[143],
     1399                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1400                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
     1401                                         date.hour*100, 0, cp_new_np[inumb,:,it]]
     1402                             )
     1403
     1404            # rr for first subgrid point is identified by step = 1
     1405            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1406                              wherekeynames=['paramId'], wherekeyvalues=[142],
     1407                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1408                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
     1409                                         date.hour*100, '1', lsp_new_np[inumb,:,it+1]]
     1410                              )
     1411            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1412                              wherekeynames=['paramId'], wherekeyvalues=[143],
     1413                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1414                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
     1415                                         date.hour*100, '1', cp_new_np[inumb,:,it+1]]
     1416                              )
     1417
     1418            # rr for second subgrid point is identified by step = 2
     1419            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1420                              wherekeynames=['paramId'], wherekeyvalues=[142],
     1421                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1422                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
     1423                                         date.hour*100, '2', lsp_new_np[inumb,:,it+2]]
     1424                              )
     1425            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
     1426                              wherekeynames=['paramId'], wherekeyvalues=[143],
     1427                              keynames=['perturbationNumber','date','time','stepRange','values'],
     1428                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
     1429                                         date.hour*100, '2', cp_new_np[inumb,:,it+2]]
     1430                              )
     1431
     1432            it = it + 3 # jump to next original time step in rr fields
    13221433        return
    13231434
     
    15781689            # collect for final processing
    15791690            self.outputfilelist.append(os.path.basename(fnout))
    1580             # get additional precipitation subgrid data if available
    1581             if c.rrint:
    1582                 self.outputfilelist.append(os.path.basename(fnout + '_1'))
    1583                 self.outputfilelist.append(os.path.basename(fnout + '_2'))
     1691            # # get additional precipitation subgrid data if available
     1692            # if c.rrint:
     1693                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
     1694                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
    15841695# ============================================================================================
    15851696            # create outputfile and copy all data from intermediate files
     
    16141725        ''' Calculates extra ensemble members for ELDA - Stream.
    16151726
     1727        This is a specific feature which doubles the number of ensemble members
     1728        for the ELDA Stream.
     1729
    16161730        Parameters
    16171731        ----------
     
    16431757
    16441758            filename = cffile.split('N000')[0]
    1645             for i in range(1, maxnum+1): # max number nehmen
     1759            for i in range(1, maxnum + 1):
    16461760
    16471761                # read an ensemble member
     
    16561770                        break
    16571771                    values = codes_get_array(gid, 'values')
     1772                    # generate a new ensemble member by subtracting
     1773                    # 2 * ( current time step value - last time step value )
    16581774                    codes_set_array(gid, 'values',
    16591775                                    values-2*(values-cfvalues[j]))
     
    17041820
    17051821        print('Output filelist: ')
    1706         print(self.outputfilelist)
     1822        print(sorted(self.outputfilelist))
    17071823
    17081824        for ofile in self.outputfilelist:
Note: See TracChangeset for help on using the changeset viewer.
hosted by ZAMG