source: flex_extract.git/source/python/classes/EcFlexpart.py @ 27fe969

ctbtodev
Last change on this file since 27fe969 was 27fe969, checked in by Anne Philipp <anne.philipp@…>, 6 years ago

put grib2flexpart part of code into its own function; changed function documentation of input controlfile parameter

  • Property mode set to 100644
File size: 52.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import subprocess
77import shutil
78import os
79import glob
80from datetime import datetime, timedelta
81import numpy as np
82from gribapi import grib_set, grib_index_select, grib_new_from_index, grib_get,\
83                    grib_write, grib_get_values, grib_set_values, grib_release,\
84                    grib_index_release, grib_index_get
85
86# software specific classes and modules from flex_extract
87import _config
88from GribTools import GribTools
89from mods.tools import init128, to_param_id, silent_remove, product, my_error
90from MarsRetrieval import MarsRetrieval
91import mods.disaggregation
92
93# ------------------------------------------------------------------------------
94# CLASS
95# ------------------------------------------------------------------------------
96class EcFlexpart(object):
97    '''
98    Class to retrieve FLEXPART specific ECMWF data.
99    '''
100    # --------------------------------------------------------------------------
101    # CLASS FUNCTIONS
102    # --------------------------------------------------------------------------
103    def __init__(self, c, fluxes=False):
104        '''
105        @Description:
106            Creates an object/instance of EcFlexpart with the
107            associated settings of its attributes for the retrieval.
108
109        @Input:
110            self: instance of EcFlexpart
111                The current object of the class.
112
113            c: instance of class ControlFile
114                Contains all the parameters of CONTROL file and
115                command line.
116                For more information about format and content of the parameter
117                see documentation.
118
119            fluxes: boolean, optional
120                Decides if the flux parameter settings are stored or
121                the rest of the parameter list.
122                Default value is False.
123
124        @Return:
125            <nothing>
126        '''
127        # set a counter for the number of mars requests generated
128        self.mreq_count = 0
129
130        # different mars types for retrieving data for flexpart
131        self.types = dict()
132
133        if c.maxstep > len(c.type):    # Pure forecast mode
134            c.type = [c.type[1]]
135            c.step = ['{:0>3}'.format(int(c.step[0]))]
136            c.time = [c.time[0]]
137            for i in range(1, c.maxstep + 1):
138                c.type.append(c.type[0])
139                c.step.append('{:0>3}'.format(i))
140                c.time.append(c.time[0])
141
142        self.inputdir = c.inputdir
143        self.basetime = c.basetime
144        self.dtime = c.dtime
145        i = 0
146        if fluxes and c.maxstep <= 24:
147            # no forecast beyond one day is needed!
148            # Thus, prepare flux data manually as usual
149            # with only forecast fields with start times at 00/12
150            # (but without 00/12 fields since these are
151            # the initialisation times of the flux fields
152            # and therefore are zero all the time)
153            self.types[c.type[1]] = {'times': '00/12', 'steps':
154                                     '{}/to/12/by/{}'.format(c.dtime, c.dtime)}
155        else:
156            for ty, st, ti in zip(c.type, c.step, c.time):
157                btlist = range(24)
158                if c.basetime == '12':
159                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
160                if c.basetime == '00':
161                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
162
163                if i % int(c.dtime) == 0 and (i in btlist or c.maxstep > 24):
164
165                    if ty not in self.types.keys():
166                        self.types[ty] = {'times': '', 'steps': ''}
167
168                    if ti not in self.types[ty]['times']:
169                        if self.types[ty]['times']:
170                            self.types[ty]['times'] += '/'
171                        self.types[ty]['times'] += ti
172
173                    if st not in self.types[ty]['steps']:
174                        if self.types[ty]['steps']:
175                            self.types[ty]['steps'] += '/'
176                        self.types[ty]['steps'] += st
177                i += 1
178
179        self.marsclass = c.marsclass
180        self.stream = c.stream
181        self.number = c.number
182        self.resol = c.resol
183        self.accuracy = c.accuracy
184        self.level = c.level
185        self.expver = c.expver
186        self.levelist = c.levelist
187        # for gaussian grid retrieval
188        self.glevelist = '1/to/' + c.level
189
190        if hasattr(c, 'gaussian') and c.gaussian:
191            self.gaussian = c.gaussian
192        else:
193            self.gaussian = ''
194
195        if 'N' in c.grid:  # Gaussian output grid
196            self.grid = c.grid
197            self.area = 'G'
198        else:
199            self.grid = '{}/{}'.format(int(c.grid) / 1000., int(c.grid) / 1000.)
200            self.area = '{}/{}/{}/{}'.format(int(c.upper) / 1000.,
201                                             int(c.left) / 1000.,
202                                             int(c.lower) / 1000.,
203                                             int(c.right) / 1000.)
204
205        self.outputfilelist = []
206
207
208        # Now comes the nasty part that deals with the different
209        # scenarios we have:
210        # 1) Calculation of etadot on
211        #    a) Gaussian grid
212        #    b) Output grid
213        #    c) Output grid using parameter 77 retrieved from MARS
214        # 3) Calculation/Retrieval of omega
215        # 4) Download also data for WRF
216
217        # Different grids need different retrievals
218        # SH = Spherical Harmonics, GG = Gaussian Grid,
219        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
220        self.params = {'SH__ML': '', 'SH__SL': '',
221                       'GG__ML': '', 'GG__SL': '',
222                       'OG__ML': '', 'OG__SL': '',
223                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
224        # the self.params dictionary stores a list of
225        # [param, levtype, levelist, grid] per key
226
227        if fluxes is False:
228            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
229            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
230            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
231                                     'SFC', '1', self.grid]
232            if c.addpar:
233                if c.addpar[0] == '/':
234                    c.addpar = c.addpar[1:]
235                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
236
237            self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
238                                            'SFC', '1', self.grid]
239
240            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
241
242            #if c.gauss == '0' and c.eta == '1':
243            if not c.gauss and c.eta:
244                # the simplest case
245                self.params['OG__ML'][0] += '/U/V/77'
246            #elif c.gauss == '0' and c.eta == '0':
247            elif not c.gauss and not c.eta:
248            # this is not recommended (inaccurate)
249                self.params['OG__ML'][0] += '/U/V'
250            #elif c.gauss == '1' and c.eta == '0':
251            elif c.gauss and not c.eta:
252                # this is needed for data before 2008, or for reanalysis data
253                self.params['GG__SL'] = ['Q', 'ML', '1', \
254                                         '{}'.format((int(self.resol) + 1) / 2)]
255                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
256            else:
257                print('Warning: This is a very costly parameter combination, \
258                      use only for debugging!')
259                self.params['GG__SL'] = ['Q', 'ML', '1', \
260                                         '{}'.format((int(self.resol) + 1) / 2)]
261                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
262                                         '{}'.format((int(self.resol) + 1) / 2)]
263
264            if c.omega:
265                self.params['OG__ML'][0] += '/W'
266
267            # add cloud water content if necessary
268            if c.cwc:
269                self.params['OG__ML'][0] += '/CLWC/CIWC'
270
271            # add vorticity and geopotential height for WRF if necessary
272            if c.wrf:
273                self.params['OG__ML'][0] += '/Z/VO'
274                if '/D' not in self.params['OG__ML'][0]:
275                    self.params['OG__ML'][0] += '/D'
276                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
277                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
278                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
279                           139/170/183/236/39/40/41/42'
280                lwrt_sfc = wrf_sfc.split('/')
281                for par in lwrt_sfc:
282                    if par not in self.params['OG__SL'][0]:
283                        self.params['OG__SL'][0] += '/' + par
284
285        else:
286            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
287                                        'SFC', '1', self.grid]
288
289        # if needed, add additional WRF specific parameters here
290
291        return
292
293
294    def _mk_targetname(self, ftype, param, date):
295        '''
296        @Description:
297            Creates the filename for the requested grib data to be stored in.
298            This name is passed as the "target" parameter in the request.
299
300        @Input:
301            ftype: string
302                Shortcut name of the type of the field. E.g. AN, FC, PF, ...
303
304            param: string
305                Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
306                GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
307
308            date: string
309                The date period of the grib data to be stored in this file.
310
311        @Return:
312            targetname: string
313                The target filename for the grib data.
314        '''
315        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
316                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
317
318        return targetname
319
320
321    def _start_retrievement(self, request, par_dict):
322        '''
323        @Description:
324            Creates the Mars Retrieval and prints or submits the request
325            depending on the status of the request variable.
326
327        @Input:
328            self: instance of EcFlexpart
329                The current object of the class.
330
331            request: integer
332                Selects the mode of retrieval.
333                0: Retrieves the data from ECMWF.
334                1: Prints the mars requests to an output file.
335                2: Retrieves the data and prints the mars request.
336
337            par_dict: dictionary
338                Contains all parameter which have to be set for creating the
339                Mars Retrievals. The parameter are:
340                marsclass, stream, type, levtype, levelist, resol, gaussian,
341                accuracy, grid, target, area, date, time, number, step, expver,
342                param
343
344        @Return:
345            <nothing>
346        '''
347        # increase number of mars requests
348        self.mreq_count += 1
349
350        MR = MarsRetrieval(self.server,
351                           marsclass=par_dict['marsclass'],
352                           stream=par_dict['stream'],
353                           type=par_dict['type'],
354                           levtype=par_dict['levtype'],
355                           levelist=par_dict['levelist'],
356                           resol=par_dict['resol'],
357                           gaussian=par_dict['gaussian'],
358                           accuracy=par_dict['accuracy'],
359                           grid=par_dict['grid'],
360                           target=par_dict['target'],
361                           area=par_dict['area'],
362                           date=par_dict['date'],
363                           time=par_dict['time'],
364                           number=par_dict['number'],
365                           step=par_dict['step'],
366                           expver=par_dict['expver'],
367                           param=par_dict['param'])
368
369        if request == 0:
370            MR.display_info()
371            MR.data_retrieve()
372        elif request == 1:
373            MR.print_infodata_csv(self.inputdir, self.mreq_count)
374        elif request == 2:
375            MR.print_infodata_csv(self.inputdir, self.mreq_count)
376            MR.display_info()
377            MR.data_retrieve()
378        else:
379            print('Failure')
380
381        return
382
383
384    def retrieve(self, server, dates, request, inputdir='.'):
385        '''
386        @Description:
387            Finalizing the retrieval information by setting final details
388            depending on grid type.
389            Prepares MARS retrievals per grid type and submits them.
390
391        @Input:
392            self: instance of EcFlexpart
393                The current object of the class.
394
395            server: instance of ECMWFService or ECMWFDataServer
396                The connection to the ECMWF server. This is different
397                for member state users which have full access and non
398                member state users which have only access to the public
399                data sets. The decision is made from command line argument
400                "public"; for public access its True (ECMWFDataServer)
401                for member state users its False (ECMWFService)
402
403            dates: string
404                Contains start and end date of the retrieval in the format
405                "YYYYMMDD/to/YYYYMMDD"
406
407            request: integer
408                Selects the mode of retrieval.
409                0: Retrieves the data from ECMWF.
410                1: Prints the mars requests to an output file.
411                2: Retrieves the data and prints the mars request.
412
413            inputdir: string, optional
414                Path to the directory where the retrieved data is about
415                to be stored. The default is the current directory ('.').
416
417        @Return:
418            <nothing>
419        '''
420        self.dates = dates
421        self.server = server
422        self.inputdir = inputdir
423        oro = False
424
425        # define times with datetime module
426        t12h = timedelta(hours=12)
427        t24h = timedelta(hours=24)
428
429        # dictionary which contains all parameter for the mars request
430        # entries with a "None" will change in different requests and will
431        # therefore be set in each request seperately
432        retr_param_dict = {'marsclass':self.marsclass,
433                           'stream':None,
434                           'type':None,
435                           'levtype':None,
436                           'levelist':None,
437                           'resol':self.resol,
438                           'gaussian':None,
439                           'accuracy':self.accuracy,
440                           'grid':None,
441                           'target':None,
442                           'area':None,
443                           'date':None,
444                           'time':None,
445                           'number':self.number,
446                           'step':None,
447                           'expver':self.expver,
448                           'param':None}
449
450        for ftype in self.types:
451            # fk contains field types such as
452            #     [AN, FC, PF, CV]
453            # fv contains all of the items of the belonging key
454            #     [times, steps]
455            for pk, pv in self.params.iteritems():
456                # pk contains one of these keys of params
457                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
458                #      OG_OROLSM_SL, OG_acc_SL]
459                # pv contains all of the items of the belonging key
460                #     [param, levtype, levelist, grid]
461                if isinstance(pv, str):
462                    continue
463                retr_param_dict['type'] = '' + ftype
464                retr_param_dict['time'] = self.types[ftype]['times']
465                retr_param_dict['step'] = self.types[ftype]['steps']
466                retr_param_dict['date'] = self.dates
467                retr_param_dict['stream'] = self.stream
468                retr_param_dict['target'] = \
469                    self._mk_targetname(ftype, pk,
470                                        retr_param_dict['date'].split('/')[0])
471                retr_param_dict['param'] = pv[0]
472                retr_param_dict['levtype'] = pv[1]
473                retr_param_dict['levelist'] = pv[2]
474                retr_param_dict['grid'] = pv[3]
475                retr_param_dict['area'] = self.area
476                retr_param_dict['gaussian'] = self.gaussian
477
478                if pk == 'OG__SL':
479                    pass
480                if pk == 'OG_OROLSM__SL' and not oro:
481                    oro = True
482                    retr_param_dict['stream'] = 'OPER'
483                    retr_param_dict['type'] = 'AN'
484                    retr_param_dict['time'] = '00'
485                    retr_param_dict['step'] = '000'
486                    retr_param_dict['date'] = self.dates.split('/')[0]
487                    retr_param_dict['target'] = self._mk_targetname('',
488                                            pk, retr_param_dict['date'])
489                elif pk == 'OG_OROLSM__SL' and oro:
490                    continue
491                if pk == 'GG__SL' and pv[0] == 'Q':
492                    retr_param_dict['area'] = ""
493                    retr_param_dict['gaussian'] = 'reduced'
494
495    # ------  on demand path  --------------------------------------------------
496                if not self.basetime:
497                    # ******* start retrievement
498                    self._start_retrievement(request, retr_param_dict)
499    # ------  operational path  ------------------------------------------------
500                else:
501                    # check if mars job requests fields beyond basetime.
502                    # if yes eliminate those fields since they may not
503                    # be accessible with user's credentials
504
505                    enddate = retr_param_dict['date'].split('/')[-1]
506                    elimit = datetime.strptime(enddate + self.basetime,
507                                               '%Y%m%d%H')
508
509                    if self.basetime == '12':
510                        # --------------  flux data ----------------------------
511                        if 'acc' in pk:
512
513                        # Strategy:
514                        # if maxtime-elimit >= 24h reduce date by 1,
515                        # if 12h <= maxtime-elimit<12h reduce time for last date
516                        # if maxtime-elimit<12h reduce step for last time
517                        # A split of the MARS job into 2 is likely necessary.
518
519
520                            startdate = retr_param_dict['date'].split('/')[0]
521                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
522                            retr_param_dict['date'] = '/'.join([startdate,
523                                                                'to',
524                                                                enddate])
525
526                            # ******* start retrievement
527                            self._start_retrievement(request, retr_param_dict)
528
529                            retr_param_dict['date'] = \
530                                datetime.strftime(elimit - t12h, '%Y%m%d')
531                            retr_param_dict['time'] = '00'
532                            retr_param_dict['target'] = \
533                                self._mk_targetname(ftype, pk,
534                                                    retr_param_dict['date'])
535
536                            # ******* start retrievement
537                            self._start_retrievement(request, retr_param_dict)
538
539                        # --------------  non flux data ------------------------
540                        else:
541                            # ******* start retrievement
542                            self._start_retrievement(request, retr_param_dict)
543
544                    else: # basetime = 0
545                        retr_param_dict['date'] = \
546                            datetime.strftime(elimit - t24h, '%Y%m%d')
547
548                        timesave = ''.join(retr_param_dict['time'])
549
550                        if '/' in retr_param_dict['time']:
551                            times = retr_param_dict['time'].split('/')
552                            steps = retr_param_dict['step'].split('/')
553                            while (pk != 'OG_OROLSM__SL' and
554                                   'acc' not in pk and
555                                   (int(times[0]) + int(steps[0])) <= 12):
556                                times = times[1:]
557
558                            if len(times) > 1:
559                                retr_param_dict['time'] = '/'.join(times)
560                            else:
561                                retr_param_dict['time'] = times[0]
562
563                        # ******* start retrievement
564                        self._start_retrievement(request, retr_param_dict)
565
566                        if (pk != 'OG_OROLSM__SL' and
567                            int(retr_param_dict['step'].split('/')[0]) == 0 and
568                            int(timesave.split('/')[0]) == 0):
569
570                            retr_param_dict['date'] = \
571                                datetime.strftime(elimit, '%Y%m%d')
572                            retr_param_dict['time'] = '00'
573                            retr_param_dict['step'] = '000'
574                            retr_param_dict['target'] = \
575                                self._mk_targetname(ftype, pk,
576                                                    retr_param_dict['date'])
577
578                            # ******* start retrievement
579                            self._start_retrievement(request, retr_param_dict)
580
581        if request == 0 or request == 2:
582            print('MARS retrieve done ... ')
583        elif request == 1:
584            print('MARS request printed ...')
585
586        return
587
588
589    def write_namelist(self, c, filename):
590        '''
591        @Description:
592            Creates a namelist file in the temporary directory and writes
593            the following values to it: maxl, maxb, mlevel,
594            mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
595            momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
596
597        @Input:
598            self: instance of EcFlexpart
599                The current object of the class.
600
601            c: instance of class ControlFile
602                Contains all the parameters of CONTROL file and
603                command line.
604                For more information about format and content of the parameter
605                see documentation.
606
607            filename: string
608                Name of the namelist file.
609
610        @Return:
611            <nothing>
612        '''
613
614        self.inputdir = c.inputdir
615        area = np.asarray(self.area.split('/')).astype(float)
616        grid = np.asarray(self.grid.split('/')).astype(float)
617
618        if area[1] > area[3]:
619            area[1] -= 360
620        maxl = int((area[3] - area[1]) / grid[1]) + 1
621        maxb = int((area[0] - area[2]) / grid[0]) + 1
622
623        with open(self.inputdir + '/' + filename, 'w') as f:
624            f.write('&NAMGEN\n')
625            f.write(',\n  '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb),
626                                  'mlevel = ' + str(self.level),
627                                  'mlevelist = ' + '"' + str(self.levelist)
628                                                 + '"',
629                                  'mnauf = ' + str(self.resol),
630                                  'metapar = ' + '77',
631                                  'rlo0 = ' + str(area[1]),
632                                  'rlo1 = ' + str(area[3]),
633                                  'rla0 = ' + str(area[2]),
634                                  'rla1 = ' + str(area[0]),
635                                  'momega = ' + str(c.omega),
636                                  'momegadiff = ' + str(c.omegadiff),
637                                  'mgauss = ' + str(c.gauss),
638                                  'msmooth = ' + str(c.smooth),
639                                  'meta = ' + str(c.eta),
640                                  'metadiff = ' + str(c.etadiff),
641                                  'mdpdeta = ' + str(c.dpdeta)]))
642
643            f.write('\n/\n')
644
645        return
646
647
648    def deacc_fluxes(self, inputfiles, c):
649        '''
650        @Description:
651            Goes through all flux fields in ordered time and de-accumulate
652            the fields. Afterwards the fields are disaggregated in time.
653            Different versions of disaggregation is provided for rainfall
654            data (darain, modified linear) and the surface fluxes and
655            stress data (dapoly, cubic polynomial).
656
657        @Input:
658            self: instance of EcFlexpart
659                The current object of the class.
660
661            inputfiles: instance of UioFiles
662                Contains a list of files.
663
664            c: instance of class ControlFile
665                Contains all the parameters of CONTROL file and
666                command line.
667                For more information about format and content of the parameter
668                see documentation.
669
670        @Return:
671            <nothing>
672        '''
673
674        table128 = init128(_config.PATH_GRIBTABLE)
675        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
676        index_keys = ["date", "time", "step"]
677        indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX)
678        silent_remove(indexfile)
679        grib = GribTools(inputfiles.files)
680        # creates new index file
681        iid = grib.index(index_keys=index_keys, index_file=indexfile)
682
683        # read values of index keys
684        index_vals = []
685        for key in index_keys:
686            key_vals = grib_index_get(iid, key)
687            print(key_vals)
688            # have to sort the steps for disaggregation,
689            # therefore convert to int first
690            if key == 'step':
691                key_vals = [int(k) for k in key_vals]
692                key_vals.sort()
693                key_vals = [str(k) for k in key_vals]
694            index_vals.append(key_vals)
695            # index_vals looks for example like:
696            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
697            # index_vals[1]: ('0', '1200') ; time
698            # index_vals[2]: (3', '6', '9', '12') ; stepRange
699
700        valsdict = {}
701        svalsdict = {}
702        stepsdict = {}
703        for p in pars:
704            valsdict[str(p)] = []
705            svalsdict[str(p)] = []
706            stepsdict[str(p)] = []
707
708        print('maxstep: ', c.maxstep)
709
710        for prod in product(*index_vals):
711            # e.g. prod = ('20170505', '0', '12')
712            #             (  date    ,time, step)
713            # per date e.g. time = 0, 1200
714            # per time e.g. step = 3, 6, 9, 12
715            print('current prod: ', prod)
716            for i in range(len(index_keys)):
717                grib_index_select(iid, index_keys[i], prod[i])
718
719            # get first id from current product
720            gid = grib_new_from_index(iid)
721
722            # if there is data for this product combination
723            # prepare some date and time parameter before reading the data
724            if gid is not None:
725                cdate = grib_get(gid, 'date')
726                time = grib_get(gid, 'time')
727                step = grib_get(gid, 'step')
728                # date+time+step-2*dtime
729                # (since interpolated value valid for step-2*dtime)
730                sdate = datetime(year=cdate/10000,
731                                 month=(cdate % 10000)/100,
732                                 day=(cdate % 100),
733                                 hour=time/100)
734                fdate = sdate + timedelta(hours=step-2*int(c.dtime))
735                sdates = sdate + timedelta(hours=step)
736                elimit = None
737            else:
738                break
739
740            if c.maxstep > 12:
741                fnout = os.path.join(c.inputdir, 'flux' +
742                                     sdate.strftime('%Y%m%d') +
743                                     '.{:0>2}'.format(time/100) +
744                                     '.{:0>3}'.format(step-2*int(c.dtime)))
745                gnout = os.path.join(c.inputdir, 'flux' +
746                                     sdate.strftime('%Y%m%d') +
747                                     '.{:0>2}'.format(time/100) +
748                                     '.{:0>3}'.format(step-int(c.dtime)))
749                hnout = os.path.join(c.inputdir, 'flux' +
750                                     sdate.strftime('%Y%m%d') +
751                                     '.{:0>2}'.format(time/100) +
752                                     '.{:0>3}'.format(step))
753            else:
754                fnout = os.path.join(c.inputdir, 'flux' +
755                                     fdate.strftime('%Y%m%d%H'))
756                gnout = os.path.join(c.inputdir, 'flux' +
757                                     (fdate + timedelta(hours=int(c.dtime))).
758                                     strftime('%Y%m%d%H'))
759                hnout = os.path.join(c.inputdir, 'flux' +
760                                     sdates.strftime('%Y%m%d%H'))
761
762            print("outputfile = " + fnout)
763            f_handle = open(fnout, 'w')
764            g_handle = open(gnout, 'w')
765            h_handle = open(hnout, 'w')
766
767            # read message for message and store relevant data fields
768            # data keywords are stored in pars
769            while 1:
770                if gid is None:
771                    break
772                cparamId = str(grib_get(gid, 'paramId'))
773                step = grib_get(gid, 'step')
774                atime = grib_get(gid, 'time')
775                ni = grib_get(gid, 'Ni')
776                nj = grib_get(gid, 'Nj')
777                if cparamId in valsdict.keys():
778                    values = grib_get_values(gid)
779                    vdp = valsdict[cparamId]
780                    svdp = svalsdict[cparamId]
781                    sd = stepsdict[cparamId]
782
783                    if cparamId == '142' or cparamId == '143':
784                        fak = 1. / 1000.
785                    else:
786                        fak = 3600.
787
788                    values = (np.reshape(values, (nj, ni))).flatten() / fak
789                    vdp.append(values[:])  # save the accumulated values
790                    if step <= int(c.dtime):
791                        svdp.append(values[:] / int(c.dtime))
792                    else:  # deaccumulate values
793                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
794
795                    print(cparamId, atime, step, len(values),
796                          values[0], np.std(values))
797                    # save the 1/3-hourly or specific values
798                    # svdp.append(values[:])
799                    sd.append(step)
800                    # len(svdp) correspond to the time
801                    if len(svdp) >= 3:
802                        if len(svdp) > 3:
803                            if cparamId == '142' or cparamId == '143':
804                                values = disaggregation.darain(svdp)
805                            else:
806                                values = disaggregation.dapoly(svdp)
807
808                            if not (step == c.maxstep and c.maxstep > 12 \
809                                    or sdates == elimit):
810                                vdp.pop(0)
811                                svdp.pop(0)
812                        else:
813                            if c.maxstep > 12:
814                                values = svdp[1]
815                            else:
816                                values = svdp[0]
817
818                        grib_set_values(gid, values)
819                        if c.maxstep > 12:
820                            grib_set(gid, 'step', max(0, step-2*int(c.dtime)))
821                        else:
822                            grib_set(gid, 'step', 0)
823                            grib_set(gid, 'time', fdate.hour*100)
824                            grib_set(gid, 'date', fdate.year*10000 +
825                                     fdate.month*100+fdate.day)
826                        grib_write(gid, f_handle)
827
828                        if c.basetime:
829                            elimit = datetime.strptime(c.end_date +
830                                                       c.basetime, '%Y%m%d%H')
831                        else:
832                            elimit = sdate + timedelta(2*int(c.dtime))
833
834                        # squeeze out information of last two steps contained
835                        # in svdp
836                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
837                        # or sdates+timedelta(hours = int(c.dtime))
838                        # >= elimit:
839                        # Note that svdp[0] has not been popped in this case
840
841                        if step == c.maxstep and c.maxstep > 12 or \
842                           sdates == elimit:
843
844                            values = svdp[3]
845                            grib_set_values(gid, values)
846                            grib_set(gid, 'step', 0)
847                            truedatetime = fdate + timedelta(hours=
848                                                             2*int(c.dtime))
849                            grib_set(gid, 'time', truedatetime.hour * 100)
850                            grib_set(gid, 'date', truedatetime.year * 10000 +
851                                     truedatetime.month * 100 +
852                                     truedatetime.day)
853                            grib_write(gid, h_handle)
854
855                            #values = (svdp[1]+svdp[2])/2.
856                            if cparamId == '142' or cparamId == '143':
857                                values = disaggregation.darain(list(reversed(svdp)))
858                            else:
859                                values = disaggregation.dapoly(list(reversed(svdp)))
860
861                            grib_set(gid, 'step', 0)
862                            truedatetime = fdate + timedelta(hours=int(c.dtime))
863                            grib_set(gid, 'time', truedatetime.hour * 100)
864                            grib_set(gid, 'date', truedatetime.year * 10000 +
865                                     truedatetime.month * 100 +
866                                     truedatetime.day)
867                            grib_set_values(gid, values)
868                            grib_write(gid, g_handle)
869
870                    grib_release(gid)
871
872                    gid = grib_new_from_index(iid)
873
874            f_handle.close()
875            g_handle.close()
876            h_handle.close()
877
878        grib_index_release(iid)
879
880        return
881
882
883    def create(self, inputfiles, c):
884        '''
885        @Description:
886            This method is based on the ECMWF example index.py
887            https://software.ecmwf.int/wiki/display/GRIB/index.py
888
889            An index file will be created which depends on the combination
890            of "date", "time" and "stepRange" values. This is used to iterate
891            over all messages in each grib file which were passed through the
892            parameter "inputfiles" to seperate specific parameters into fort.*
893            files. Afterwards the FORTRAN program is called to convert
894            the data fields all to the same grid and put them in one file
895            per unique time step (combination of "date", "time" and
896            "stepRange").
897
898        @Input:
899            self: instance of EcFlexpart
900                The current object of the class.
901
902            inputfiles: instance of UioFiles
903                Contains a list of files.
904
905            c: instance of class ControlFile
906                Contains all the parameters of CONTROL file and
907                command line.
908                For more information about format and content of the parameter
909                see documentation.
910
911        @Return:
912            <nothing>
913        '''
914
915        table128 = init128(_config.PATH_GRIBTABLE)
916        wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
917                            stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
918                            table128)
919
920        index_keys = ["date", "time", "step"]
921        indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX)
922        silent_remove(indexfile)
923        grib = GribTools(inputfiles.files)
924        # creates new index file
925        iid = grib.index(index_keys=index_keys, index_file=indexfile)
926
927        # read values of index keys
928        index_vals = []
929        for key in index_keys:
930            index_vals.append(grib_index_get(iid, key))
931            print(index_vals[-1])
932            # index_vals looks for example like:
933            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
934            # index_vals[1]: ('0', '1200', '1800', '600') ; time
935            # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
936
937        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
938                 '17':None, '19':None, '21':None, '22':None, '20':None}
939
940        for prod in product(*index_vals):
941            # e.g. prod = ('20170505', '0', '12')
942            #             (  date    ,time, step)
943            # per date e.g. time = 0, 1200
944            # per time e.g. step = 3, 6, 9, 12
945            # flag for Fortran program and file merging
946            convertFlag = False
947            print('current prod: ', prod)
948            # e.g. prod = ('20170505', '0', '12')
949            #             (  date    ,time, step)
950            # per date e.g. time = 0, 600, 1200, 1800
951            # per time e.g. step = 0, 3, 6, 9, 12
952            for i in range(len(index_keys)):
953                grib_index_select(iid, index_keys[i], prod[i])
954
955            # get first id from current product
956            gid = grib_new_from_index(iid)
957
958            # if there is data for this product combination
959            # prepare some date and time parameter before reading the data
960            if gid is not None:
961                # Combine all temporary data files into final grib file if
962                # gid is at least one time not None. Therefore set convertFlag
963                # to save information. The Fortran program is also
964                # only executed if convertFlag is True
965                convertFlag = True
966                # remove old fort.* files and open new ones
967                # they are just valid for a single product
968                for k, f in fdict.iteritems():
969                    fortfile = os.path.join(c.inputdir, 'fort.' + k)
970                    silent_remove(fortfile)
971                    fdict[k] = open(fortfile, 'w')
972
973                cdate = str(grib_get(gid, 'date'))
974                time = grib_get(gid, 'time')
975                step = grib_get(gid, 'step')
976                # create correct timestamp from the three time informations
977                # date, time, step
978                timestamp = datetime.strptime(cdate + '{:0>2}'.format(time/100),
979                                              '%Y%m%d%H')
980                timestamp += timedelta(hours=int(step))
981                cdateH = datetime.strftime(timestamp, '%Y%m%d%H')
982
983                if c.basetime:
984                    slimit = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
985                    bt = '23'
986                    if c.basetime == '00':
987                        bt = '00'
988                        slimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H')\
989                            - timedelta(hours=12-int(c.dtime))
990                    if c.basetime == '12':
991                        bt = '12'
992                        slimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H')\
993                            - timedelta(hours=12-int(c.dtime))
994
995                    elimit = datetime.strptime(c.end_date + bt, '%Y%m%d%H')
996
997                    if timestamp < slimit or timestamp > elimit:
998                        continue
999            else:
1000                pass
1001
1002            try:
1003                if c.wrf:
1004                    if 'olddate' not in locals() or cdate != olddate:
1005                        fwrf = open(os.path.join(c.outputdir,
1006                                    'WRF' + cdate + '.{:0>2}'.format(time) +
1007                                    '.000.grb2'), 'w')
1008                        olddate = cdate[:]
1009            except AttributeError:
1010                pass
1011
1012            # helper variable to remember which fields were already used.
1013            savedfields = []
1014            while 1:
1015                if gid is None:
1016                    break
1017                paramId = grib_get(gid, 'paramId')
1018                gridtype = grib_get(gid, 'gridType')
1019                levtype = grib_get(gid, 'typeOfLevel')
1020                if paramId == 133 and gridtype == 'reduced_gg':
1021                # Specific humidity (Q.grb) is used as a template only
1022                # so we need the first we "meet"
1023                    with open(os.path.join(c.inputdir, 'fort.18'), 'w') as fout:
1024                        grib_write(gid, fout)
1025                elif paramId == 131 or paramId == 132:
1026                    grib_write(gid, fdict['10'])
1027                elif paramId == 130:
1028                    grib_write(gid, fdict['11'])
1029                elif paramId == 133 and gridtype != 'reduced_gg':
1030                    grib_write(gid, fdict['17'])
1031                elif paramId == 152:
1032                    grib_write(gid, fdict['12'])
1033                elif paramId == 155 and gridtype == 'sh':
1034                    grib_write(gid, fdict['13'])
1035                elif  paramId in [129, 138, 155] and levtype == 'hybrid' \
1036                        and c.wrf:
1037                    pass
1038                elif paramId == 246 or paramId == 247:
1039                    # cloud liquid water and ice
1040                    if paramId == 246:
1041                        clwc = grib_get_values(gid)
1042                    else:
1043                        clwc += grib_get_values(gid)
1044                        grib_set_values(gid, clwc)
1045                        grib_set(gid, 'paramId', 201031)
1046                        grib_write(gid, fdict['22'])
1047                elif paramId == 135:
1048                    grib_write(gid, fdict['19'])
1049                elif paramId == 77:
1050                    grib_write(gid, fdict['21'])
1051                else:
1052                    if paramId not in savedfields:
1053                        grib_write(gid, fdict['16'])
1054                        savedfields.append(paramId)
1055                    else:
1056                        print('duplicate ' + str(paramId) + ' not written')
1057
1058                try:
1059                    if c.wrf:
1060                        # model layer
1061                        if levtype == 'hybrid' and \
1062                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1063                            grib_write(gid, fwrf)
1064                        # sfc layer
1065                        elif paramId in wrfpars:
1066                            grib_write(gid, fwrf)
1067                except AttributeError:
1068                    pass
1069
1070                grib_release(gid)
1071                gid = grib_new_from_index(iid)
1072
1073            for f in fdict.values():
1074                f.close()
1075
1076            # call for Fortran program if flag is True
1077            if convertFlag:
1078                pwd = os.getcwd()
1079                os.chdir(c.inputdir)
1080                if os.stat('fort.21').st_size == 0 and c.eta:
1081                    print('Parameter 77 (etadot) is missing, most likely it is \
1082                           not available for this type or date/time\n')
1083                    print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1084                    my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1085                             is set to 1 in CONTROL file')
1086
1087                # create the corresponding output file fort.15
1088                # (generated by Fortran program) + fort.16 (paramId 167 and 168)
1089                p = subprocess.check_call([os.path.join(
1090                    c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1091                os.chdir(pwd)
1092
1093                # create final output filename, e.g. EN13040500 (ENYYMMDDHH)
1094                fnout = os.path.join(c.inputdir, c.prefix)
1095                if c.maxstep > 12:
1096                    suffix = cdate[2:8] + '.{:0>2}'.format(time/100) + \
1097                             '.{:0>3}'.format(step)
1098                else:
1099                    suffix = cdateH[2:10]
1100                fnout += suffix
1101                print("outputfile = " + fnout)
1102                self.outputfilelist.append(fnout) # needed for final processing
1103
1104                # create outputfile and copy all data from intermediate files
1105                # to the outputfile (final GRIB files)
1106                orolsm = os.path.basename(glob.glob(
1107                    c.inputdir + '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1108                fluxfile = 'flux' + cdate[0:2] + suffix
1109                if not c.cwc:
1110                    flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1111                else:
1112                    flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1113
1114                with open(fnout, 'wb') as fout:
1115                    for f in flist:
1116                        shutil.copyfileobj(open(os.path.join(c.inputdir, f),
1117                                                'rb'), fout)
1118
1119                if c.omega:
1120                    with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1121                        shutil.copyfileobj(
1122                            open(os.path.join(c.inputdir, 'fort.25'),
1123                                 'rb'), fout)
1124            else:
1125                pass
1126
1127        if c.wrf:
1128            fwrf.close()
1129
1130        grib_index_release(iid)
1131
1132        return
1133
1134
1135    def process_output(self, c):
1136        '''
1137        @Description:
1138            The grib files are postprocessed depending on the selection in
1139            CONTROL file. The resulting files are moved to the output
1140            directory if its not equal to the input directory.
1141            The following modifications might be done if
1142            properly switched in CONTROL file:
1143            GRIB2 - Conversion to GRIB2
1144            ECTRANS - Transfer of files to gateway server
1145            ECSTORAGE - Storage at ECMWF server
1146
1147        @Input:
1148            self: instance of EcFlexpart
1149                The current object of the class.
1150
1151            c: instance of class ControlFile
1152                Contains all the parameters of CONTROL file and
1153                command line.
1154                For more information about format and content of the parameter
1155                see documentation.
1156
1157        @Return:
1158            <nothing>
1159
1160        '''
1161
1162        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1163
1164        if not c.ecapi:
1165            print('ecstorage: {}\n ecfsdir: {}\n'.
1166                  format(c.ecstorage, c.ecfsdir))
1167            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1168                  .format(c.ectrans, c.gateway, c.destination))
1169
1170        print('Output filelist: \n')
1171        print(self.outputfilelist)
1172
1173        if c.format.lower() == 'grib2':
1174            for ofile in self.outputfilelist:
1175                p = subprocess.check_call(['grib_set', '-s', 'edition=2, \
1176                                           productDefinitionTemplateNumber=8',
1177                                           ofile, ofile + '_2'])
1178                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1179
1180        if c.ectrans and not c.ecapi:
1181            for ofile in self.outputfilelist:
1182                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1183                                           c.gateway, '-remote', c.destination,
1184                                           '-source', ofile])
1185
1186        if c.ecstorage and not c.ecapi:
1187            for ofile in self.outputfilelist:
1188                p = subprocess.check_call(['ecp', '-o', ofile,
1189                                           os.path.expandvars(c.ecfsdir)])
1190
1191        if c.outputdir != c.inputdir:
1192            for ofile in self.outputfilelist:
1193                p = subprocess.check_call(['mv', ofile, c.outputdir])
1194
1195        return
1196
1197
1198    def prepare_fp_files(self, c):
1199        '''
1200        @Description:
1201            Conversion of GRIB files to FLEXPART binary format.
1202
1203        @Input:
1204            c: instance of class ControlFile
1205                Contains all the parameters of CONTROL file and
1206                command line.
1207                For more information about format and content of the parameter
1208                see documentation.
1209
1210
1211        @Return:
1212            <nothing>
1213        '''
1214        # generate AVAILABLE file
1215        # Example of AVAILABLE file data:
1216        # 20131107 000000      EN13110700              ON DISC
1217        clist = []
1218        for ofile in self.outputfilelist:
1219            fname = ofile.split('/')
1220            if '.' in fname[-1]:
1221                l = fname[-1].split('.')
1222                timestamp = datetime.strptime(l[0][-6:] + l[1],
1223                                              '%y%m%d%H')
1224                timestamp += timedelta(hours=int(l[2]))
1225                cdate = datetime.strftime(timestamp, '%Y%m%d')
1226                chms = datetime.strftime(timestamp, '%H%M%S')
1227            else:
1228                cdate = '20' + fname[-1][-8:-2]
1229                chms = fname[-1][-2:] + '0000'
1230            clist.append(cdate + ' ' + chms + ' '*6 +
1231                         fname[-1] + ' '*14 + 'ON DISC')
1232        clist.sort()
1233        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1234            f.write('\n'.join(clist) + '\n')
1235
1236        # generate pathnames file
1237        pwd = os.path.abspath(c.outputdir)
1238        with open(pwd + '/pathnames', 'w') as f:
1239            f.write(pwd + '/Options/\n')
1240            f.write(pwd + '/\n')
1241            f.write(pwd + '/\n')
1242            f.write(pwd + '/AVAILABLE\n')
1243            f.write(' = == = == = == = == = == ==  = \n')
1244
1245        # create Options dir if necessary
1246        if not os.path.exists(pwd + '/Options'):
1247            os.makedirs(pwd+'/Options')
1248
1249        # read template COMMAND file
1250        with open(os.path.expandvars(os.path.expanduser(
1251            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1252            lflist = f.read().split('\n')
1253
1254        # find index of list where to put in the
1255        # date and time information
1256        # usually after the LDIRECT parameter
1257        i = 0
1258        for l in lflist:
1259            if 'LDIRECT' in l.upper():
1260                break
1261            i += 1
1262
1263        # insert the date and time information of run start and end
1264        # into the list of lines of COMMAND file
1265        lflist = lflist[:i+1] + \
1266                 [clist[0][:16], clist[-1][:16]] + \
1267                 lflist[i+3:]
1268
1269        # write the new COMMAND file
1270        with open(pwd + '/Options/COMMAND', 'w') as g:
1271            g.write('\n'.join(lflist) + '\n')
1272
1273        # change to outputdir and start the grib2flexpart run
1274        # afterwards switch back to the working dir
1275        os.chdir(c.outputdir)
1276        p = subprocess.check_call([
1277            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1278            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1279        os.chdir(pwd)
1280
1281        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG