source: flex_extract.git/source/python/classes/EcFlexpart.py @ ca867de

dev
Last change on this file since ca867de was ca867de, checked in by Anne Philipp <anne.philipp@…>, 14 months ago

refactored functions in EcFlexpart? and did some minor changes

  • Property mode set to 100644
File size: 53.2 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83from gribapi import grib_set, grib_index_select, grib_new_from_index, grib_get,\
84                    grib_write, grib_get_values, grib_set_values, grib_release,\
85                    grib_index_release, grib_index_get
86
87# software specific classes and modules from flex_extract
88sys.path.append('../')
89import _config
90from GribTools import GribTools
91from mods.tools import init128, to_param_id, silent_remove, product, my_error
92from MarsRetrieval import MarsRetrieval
93import mods.disaggregation as disaggregation
94
95# ------------------------------------------------------------------------------
96# CLASS
97# ------------------------------------------------------------------------------
98class EcFlexpart(object):
99    '''
100    Class to retrieve FLEXPART specific ECMWF data.
101    '''
102    # --------------------------------------------------------------------------
103    # CLASS FUNCTIONS
104    # --------------------------------------------------------------------------
105    def __init__(self, c, fluxes=False):
106        '''
107        @Description:
108            Creates an object/instance of EcFlexpart with the
109            associated settings of its attributes for the retrieval.
110
111        @Input:
112            self: instance of EcFlexpart
113                The current object of the class.
114
115            c: instance of class ControlFile
116                Contains all the parameters of CONTROL file and
117                command line.
118                For more information about format and content of the parameter
119                see documentation.
120
121            fluxes: boolean, optional
122                Decides if the flux parameter settings are stored or
123                the rest of the parameter list.
124                Default value is False.
125
126        @Return:
127            <nothing>
128        '''
129        # set a counter for the number of mars requests generated
130        self.mreq_count = 0
131
132        # different mars types for retrieving data for flexpart
133        self.types = dict()
134
135        if c.maxstep > len(c.type):    # Pure forecast mode
136            c.type = [c.type[1]]
137            c.step = ['{:0>3}'.format(int(c.step[0]))]
138            c.time = [c.time[0]]
139            for i in range(1, c.maxstep + 1):
140                c.type.append(c.type[0])
141                c.step.append('{:0>3}'.format(i))
142                c.time.append(c.time[0])
143
144        self.inputdir = c.inputdir
145        self.basetime = c.basetime
146        self.dtime = c.dtime
147        i = 0
148        if fluxes and c.maxstep <= 24:
149            # no forecast beyond one day is needed!
150            # Thus, prepare flux data manually as usual
151            # with only forecast fields with start times at 00/12
152            # (but without 00/12 fields since these are
153            # the initialisation times of the flux fields
154            # and therefore are zero all the time)
155            self.types[c.type[1]] = {'times': '00/12', 'steps':
156                                     '{}/to/12/by/{}'.format(c.dtime, c.dtime)}
157        else:
158            for ty, st, ti in zip(c.type, c.step, c.time):
159                btlist = range(24)
160                if c.basetime == '12':
161                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
162                if c.basetime == '00':
163                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
164
165                if i % int(c.dtime) == 0 and (i in btlist or c.maxstep > 24):
166
167                    if ty not in self.types.keys():
168                        self.types[ty] = {'times': '', 'steps': ''}
169
170                    if ti not in self.types[ty]['times']:
171                        if self.types[ty]['times']:
172                            self.types[ty]['times'] += '/'
173                        self.types[ty]['times'] += ti
174
175                    if st not in self.types[ty]['steps']:
176                        if self.types[ty]['steps']:
177                            self.types[ty]['steps'] += '/'
178                        self.types[ty]['steps'] += st
179                i += 1
180
181        self.marsclass = c.marsclass
182        self.stream = c.stream
183        self.number = c.number
184        self.resol = c.resol
185        self.accuracy = c.accuracy
186        self.level = c.level
187        self.expver = c.expver
188        self.levelist = c.levelist
189        # for gaussian grid retrieval
190        self.glevelist = '1/to/' + c.level
191
192        if hasattr(c, 'gaussian') and c.gaussian:
193            self.gaussian = c.gaussian
194        else:
195            self.gaussian = ''
196
197        if 'N' in c.grid:  # Gaussian output grid
198            self.grid = c.grid
199            self.area = 'G'
200        else:
201            self.grid = '{}/{}'.format(int(c.grid) / 1000., int(c.grid) / 1000.)
202            self.area = '{}/{}/{}/{}'.format(int(c.upper) / 1000.,
203                                             int(c.left) / 1000.,
204                                             int(c.lower) / 1000.,
205                                             int(c.right) / 1000.)
206
207        self.outputfilelist = []
208
209
210        # Now comes the nasty part that deals with the different
211        # scenarios we have:
212        # 1) Calculation of etadot on
213        #    a) Gaussian grid
214        #    b) Output grid
215        #    c) Output grid using parameter 77 retrieved from MARS
216        # 3) Calculation/Retrieval of omega
217        # 4) Download also data for WRF
218
219        # Different grids need different retrievals
220        # SH = Spherical Harmonics, GG = Gaussian Grid,
221        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
222        self.params = {'SH__ML': '', 'SH__SL': '',
223                       'GG__ML': '', 'GG__SL': '',
224                       'OG__ML': '', 'OG__SL': '',
225                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
226        # the self.params dictionary stores a list of
227        # [param, levtype, levelist, grid] per key
228
229        if fluxes is False:
230            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
231            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
232            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
233                                     'SFC', '1', self.grid]
234            if c.addpar:
235                if c.addpar[0] == '/':
236                    c.addpar = c.addpar[1:]
237                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
238
239            self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
240                                            'SFC', '1', self.grid]
241
242            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
243
244            #if c.gauss == '0' and c.eta == '1':
245            if not c.gauss and c.eta:
246                # the simplest case
247                self.params['OG__ML'][0] += '/U/V/77'
248            #elif c.gauss == '0' and c.eta == '0':
249            elif not c.gauss and not c.eta:
250            # this is not recommended (inaccurate)
251                self.params['OG__ML'][0] += '/U/V'
252            #elif c.gauss == '1' and c.eta == '0':
253            elif c.gauss and not c.eta:
254                # this is needed for data before 2008, or for reanalysis data
255                self.params['GG__SL'] = ['Q', 'ML', '1', \
256                                         '{}'.format((int(self.resol) + 1) / 2)]
257                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
258            else:
259                print('Warning: This is a very costly parameter combination, \
260                      use only for debugging!')
261                self.params['GG__SL'] = ['Q', 'ML', '1', \
262                                         '{}'.format((int(self.resol) + 1) / 2)]
263                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
264                                         '{}'.format((int(self.resol) + 1) / 2)]
265
266            if c.omega:
267                self.params['OG__ML'][0] += '/W'
268
269            # add cloud water content if necessary
270            if c.cwc:
271                self.params['OG__ML'][0] += '/CLWC/CIWC'
272
273            # add vorticity and geopotential height for WRF if necessary
274            if c.wrf:
275                self.params['OG__ML'][0] += '/Z/VO'
276                if '/D' not in self.params['OG__ML'][0]:
277                    self.params['OG__ML'][0] += '/D'
278                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
279                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
280                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
281                           139/170/183/236/39/40/41/42'
282                lwrt_sfc = wrf_sfc.split('/')
283                for par in lwrt_sfc:
284                    if par not in self.params['OG__SL'][0]:
285                        self.params['OG__SL'][0] += '/' + par
286
287        else:
288            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
289                                        'SFC', '1', self.grid]
290
291        return
292
293
294    def _mk_targetname(self, ftype, param, date):
295        '''
296        @Description:
297            Creates the filename for the requested grib data to be stored in.
298            This name is passed as the "target" parameter in the request.
299
300        @Input:
301            ftype: string
302                Shortcut name of the type of the field. E.g. AN, FC, PF, ...
303
304            param: string
305                Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
306                GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
307
308            date: string
309                The date period of the grib data to be stored in this file.
310
311        @Return:
312            targetname: string
313                The target filename for the grib data.
314        '''
315        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
316                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
317
318        return targetname
319
320
321    def _start_retrievement(self, request, par_dict):
322        '''
323        @Description:
324            Creates the Mars Retrieval and prints or submits the request
325            depending on the status of the request variable.
326
327        @Input:
328            self: instance of EcFlexpart
329                The current object of the class.
330
331            request: integer
332                Selects the mode of retrieval.
333                0: Retrieves the data from ECMWF.
334                1: Prints the mars requests to an output file.
335                2: Retrieves the data and prints the mars request.
336
337            par_dict: dictionary
338                Contains all parameter which have to be set for creating the
339                Mars Retrievals. The parameter are:
340                marsclass, stream, type, levtype, levelist, resol, gaussian,
341                accuracy, grid, target, area, date, time, number, step, expver,
342                param
343
344        @Return:
345            <nothing>
346        '''
347        # increase number of mars requests
348        self.mreq_count += 1
349
350        MR = MarsRetrieval(self.server,
351                           marsclass=par_dict['marsclass'],
352                           stream=par_dict['stream'],
353                           type=par_dict['type'],
354                           levtype=par_dict['levtype'],
355                           levelist=par_dict['levelist'],
356                           resol=par_dict['resol'],
357                           gaussian=par_dict['gaussian'],
358                           accuracy=par_dict['accuracy'],
359                           grid=par_dict['grid'],
360                           target=par_dict['target'],
361                           area=par_dict['area'],
362                           date=par_dict['date'],
363                           time=par_dict['time'],
364                           number=par_dict['number'],
365                           step=par_dict['step'],
366                           expver=par_dict['expver'],
367                           param=par_dict['param'])
368
369        if request == 0:
370            MR.display_info()
371            MR.data_retrieve()
372        elif request == 1:
373            MR.print_infodata_csv(self.inputdir, self.mreq_count)
374        elif request == 2:
375            MR.print_infodata_csv(self.inputdir, self.mreq_count)
376            MR.display_info()
377            MR.data_retrieve()
378        else:
379            print('Failure')
380
381        return
382
383
384    def _mk_index_values(self, inputdir, inputfiles, keys):
385        '''
386        @Description:
387            Creates an index file for a set of grib parameter keys.
388            The values from the index keys are returned in a list.
389
390        @Input:
391            keys: dictionary
392                List of parameter names which serves as index.
393
394            inputfiles: instance of UioFiles
395                Contains a list of files.
396
397        @Return:
398            iid: grib_index
399                This is a grib specific index structure to access
400                messages in a file.
401
402            index_vals: list
403                Contains the values from the keys used for a distinct selection
404                of grib messages in processing  the grib files.
405                Content looks like e.g.:
406                index_vals[0]: ('20171106', '20171107', '20171108') ; date
407                index_vals[1]: ('0', '1200', '1800', '600') ; time
408                index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
409        '''
410        iid = None
411        index_keys = keys
412
413        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
414        silent_remove(indexfile)
415        grib = GribTools(inputfiles.files)
416        # creates new index file
417        iid = grib.index(index_keys=index_keys, index_file=indexfile)
418
419        # read the values of index keys
420        index_vals = []
421        for key in index_keys:
422            #index_vals.append(grib_index_get(iid, key))
423            #print(index_vals[-1])
424            key_vals = grib_index_get(iid, key)
425            print(key_vals)
426            # have to sort the steps for disaggregation,
427            # therefore convert to int first
428            if key == 'step':
429                key_vals = [int(k) for k in key_vals]
430                key_vals.sort()
431                key_vals = [str(k) for k in key_vals]
432            index_vals.append(key_vals)
433            # index_vals looks for example like:
434            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
435            # index_vals[1]: ('0', '1200') ; time
436            # index_vals[2]: (3', '6', '9', '12') ; stepRange
437
438        return iid, index_vals
439
440
441    def retrieve(self, server, dates, request, inputdir='.'):
442        '''
443        @Description:
444            Finalizing the retrieval information by setting final details
445            depending on grid type.
446            Prepares MARS retrievals per grid type and submits them.
447
448        @Input:
449            self: instance of EcFlexpart
450                The current object of the class.
451
452            server: instance of ECMWFService or ECMWFDataServer
453                The connection to the ECMWF server. This is different
454                for member state users which have full access and non
455                member state users which have only access to the public
456                data sets. The decision is made from command line argument
457                "public"; for public access its True (ECMWFDataServer)
458                for member state users its False (ECMWFService)
459
460            dates: string
461                Contains start and end date of the retrieval in the format
462                "YYYYMMDD/to/YYYYMMDD"
463
464            request: integer
465                Selects the mode of retrieval.
466                0: Retrieves the data from ECMWF.
467                1: Prints the mars requests to an output file.
468                2: Retrieves the data and prints the mars request.
469
470            inputdir: string, optional
471                Path to the directory where the retrieved data is about
472                to be stored. The default is the current directory ('.').
473
474        @Return:
475            <nothing>
476        '''
477        self.dates = dates
478        self.server = server
479        self.inputdir = inputdir
480        oro = False
481
482        # define times with datetime module
483        t12h = timedelta(hours=12)
484        t24h = timedelta(hours=24)
485
486        # dictionary which contains all parameter for the mars request,
487        # entries with a "None" will change in different requests and will
488        # therefore be set in each request seperately
489        retr_param_dict = {'marsclass':self.marsclass,
490                           'stream':None,
491                           'type':None,
492                           'levtype':None,
493                           'levelist':None,
494                           'resol':self.resol,
495                           'gaussian':None,
496                           'accuracy':self.accuracy,
497                           'grid':None,
498                           'target':None,
499                           'area':None,
500                           'date':None,
501                           'time':None,
502                           'number':self.number,
503                           'step':None,
504                           'expver':self.expver,
505                           'param':None}
506
507        for ftype in self.types:
508            # fk contains field types such as
509            #     [AN, FC, PF, CV]
510            # fv contains all of the items of the belonging key
511            #     [times, steps]
512            for pk, pv in self.params.iteritems():
513                # pk contains one of these keys of params
514                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
515                #      OG_OROLSM_SL, OG_acc_SL]
516                # pv contains all of the items of the belonging key
517                #     [param, levtype, levelist, grid]
518                if isinstance(pv, str):
519                    continue
520                retr_param_dict['type'] = '' + ftype
521                retr_param_dict['time'] = self.types[ftype]['times']
522                retr_param_dict['step'] = self.types[ftype]['steps']
523                retr_param_dict['date'] = self.dates
524                retr_param_dict['stream'] = self.stream
525                retr_param_dict['target'] = \
526                    self._mk_targetname(ftype, pk,
527                                        retr_param_dict['date'].split('/')[0])
528                retr_param_dict['param'] = pv[0]
529                retr_param_dict['levtype'] = pv[1]
530                retr_param_dict['levelist'] = pv[2]
531                retr_param_dict['grid'] = pv[3]
532                retr_param_dict['area'] = self.area
533                retr_param_dict['gaussian'] = self.gaussian
534
535                if pk == 'OG__SL':
536                    pass
537                if pk == 'OG_OROLSM__SL' and not oro:
538                    oro = True
539                    retr_param_dict['stream'] = 'OPER'
540                    retr_param_dict['type'] = 'AN'
541                    retr_param_dict['time'] = '00'
542                    retr_param_dict['step'] = '000'
543                    retr_param_dict['date'] = self.dates.split('/')[0]
544                    retr_param_dict['target'] = self._mk_targetname('',
545                                            pk, retr_param_dict['date'])
546                elif pk == 'OG_OROLSM__SL' and oro:
547                    continue
548                if pk == 'GG__SL' and pv[0] == 'Q':
549                    retr_param_dict['area'] = ""
550                    retr_param_dict['gaussian'] = 'reduced'
551
552    # ------  on demand path  --------------------------------------------------
553                if not self.basetime:
554                    # ******* start retrievement
555                    self._start_retrievement(request, retr_param_dict)
556    # ------  operational path  ------------------------------------------------
557                else:
558                    # check if mars job requests fields beyond basetime.
559                    # if yes eliminate those fields since they may not
560                    # be accessible with user's credentials
561
562                    enddate = retr_param_dict['date'].split('/')[-1]
563                    elimit = datetime.strptime(enddate + self.basetime,
564                                               '%Y%m%d%H')
565
566                    if self.basetime == '12':
567                        # --------------  flux data ----------------------------
568                        if 'acc' in pk:
569
570                        # Strategy:
571                        # if maxtime-elimit >= 24h reduce date by 1,
572                        # if 12h <= maxtime-elimit<12h reduce time for last date
573                        # if maxtime-elimit<12h reduce step for last time
574                        # A split of the MARS job into 2 is likely necessary.
575
576
577                            startdate = retr_param_dict['date'].split('/')[0]
578                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
579                            retr_param_dict['date'] = '/'.join([startdate,
580                                                                'to',
581                                                                enddate])
582
583                            # ******* start retrievement
584                            self._start_retrievement(request, retr_param_dict)
585
586                            retr_param_dict['date'] = \
587                                datetime.strftime(elimit - t12h, '%Y%m%d')
588                            retr_param_dict['time'] = '00'
589                            retr_param_dict['target'] = \
590                                self._mk_targetname(ftype, pk,
591                                                    retr_param_dict['date'])
592
593                            # ******* start retrievement
594                            self._start_retrievement(request, retr_param_dict)
595
596                        # --------------  non flux data ------------------------
597                        else:
598                            # ******* start retrievement
599                            self._start_retrievement(request, retr_param_dict)
600
601                    else: # basetime = 0
602                        retr_param_dict['date'] = \
603                            datetime.strftime(elimit - t24h, '%Y%m%d')
604
605                        timesave = ''.join(retr_param_dict['time'])
606
607                        if '/' in retr_param_dict['time']:
608                            times = retr_param_dict['time'].split('/')
609                            steps = retr_param_dict['step'].split('/')
610                            while (pk != 'OG_OROLSM__SL' and
611                                   'acc' not in pk and
612                                   (int(times[0]) + int(steps[0])) <= 12):
613                                times = times[1:]
614
615                            if len(times) > 1:
616                                retr_param_dict['time'] = '/'.join(times)
617                            else:
618                                retr_param_dict['time'] = times[0]
619
620                        # ******* start retrievement
621                        self._start_retrievement(request, retr_param_dict)
622
623                        if (pk != 'OG_OROLSM__SL' and
624                            int(retr_param_dict['step'].split('/')[0]) == 0 and
625                            int(timesave.split('/')[0]) == 0):
626
627                            retr_param_dict['date'] = \
628                                datetime.strftime(elimit, '%Y%m%d')
629                            retr_param_dict['time'] = '00'
630                            retr_param_dict['step'] = '000'
631                            retr_param_dict['target'] = \
632                                self._mk_targetname(ftype, pk,
633                                                    retr_param_dict['date'])
634
635                            # ******* start retrievement
636                            self._start_retrievement(request, retr_param_dict)
637
638        if request == 0 or request == 2:
639            print('MARS retrieve done ... ')
640        elif request == 1:
641            print('MARS request printed ...')
642
643        return
644
645
646    def write_namelist(self, c, filename):
647        '''
648        @Description:
649            Creates a namelist file in the temporary directory and writes
650            the following values to it: maxl, maxb, mlevel,
651            mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
652            momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
653
654        @Input:
655            self: instance of EcFlexpart
656                The current object of the class.
657
658            c: instance of class ControlFile
659                Contains all the parameters of CONTROL file and
660                command line.
661                For more information about format and content of the parameter
662                see documentation.
663
664            filename: string
665                Name of the namelist file.
666
667        @Return:
668            <nothing>
669        '''
670
671        self.inputdir = c.inputdir
672        area = np.asarray(self.area.split('/')).astype(float)
673        grid = np.asarray(self.grid.split('/')).astype(float)
674
675        if area[1] > area[3]:
676            area[1] -= 360
677        maxl = int((area[3] - area[1]) / grid[1]) + 1
678        maxb = int((area[0] - area[2]) / grid[0]) + 1
679
680        with open(self.inputdir + '/' + filename, 'w') as f:
681            f.write('&NAMGEN\n')
682            f.write(',\n  '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb),
683                                  'mlevel = ' + str(self.level),
684                                  'mlevelist = ' + '"' + str(self.levelist)
685                                                 + '"',
686                                  'mnauf = ' + str(self.resol),
687                                  'metapar = ' + '77',
688                                  'rlo0 = ' + str(area[1]),
689                                  'rlo1 = ' + str(area[3]),
690                                  'rla0 = ' + str(area[2]),
691                                  'rla1 = ' + str(area[0]),
692                                  'momega = ' + str(c.omega),
693                                  'momegadiff = ' + str(c.omegadiff),
694                                  'mgauss = ' + str(c.gauss),
695                                  'msmooth = ' + str(c.smooth),
696                                  'meta = ' + str(c.eta),
697                                  'metadiff = ' + str(c.etadiff),
698                                  'mdpdeta = ' + str(c.dpdeta)]))
699
700            f.write('\n/\n')
701
702        return
703
704
705    def deacc_fluxes(self, inputfiles, c):
706        '''
707        @Description:
708            Goes through all flux fields in ordered time and de-accumulate
709            the fields. Afterwards the fields are disaggregated in time.
710            Different versions of disaggregation is provided for rainfall
711            data (darain, modified linear) and the surface fluxes and
712            stress data (dapoly, cubic polynomial).
713
714        @Input:
715            self: instance of EcFlexpart
716                The current object of the class.
717
718            inputfiles: instance of UioFiles
719                Contains a list of files.
720
721            c: instance of class ControlFile
722                Contains all the parameters of CONTROL file and
723                command line.
724                For more information about format and content of the parameter
725                see documentation.
726
727        @Return:
728            <nothing>
729        '''
730
731        table128 = init128(_config.PATH_GRIBTABLE)
732        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
733
734        iid = None
735        index_vals = None
736
737        # get the values of the keys which are used for distinct access
738        # of grib messages via product
739        index_keys = ["date", "time", "step"]
740        iid, index_vals = self._mk_index_values(c.inputdir,
741                                                inputfiles,
742                                                index_keys)
743        # index_vals looks like e.g.:
744        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
745        # index_vals[1]: ('0', '1200', '1800', '600') ; time
746        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
747
748        valsdict = {}
749        svalsdict = {}
750#        stepsdict = {}
751        for p in pars:
752            valsdict[str(p)] = []
753            svalsdict[str(p)] = []
754#            stepsdict[str(p)] = []
755
756        print('maxstep: ', c.maxstep)
757
758        # "product" genereates each possible combination between the
759        # values of the index keys
760        for prod in product(*index_vals):
761            # e.g. prod = ('20170505', '0', '12')
762            #             (  date    ,time, step)
763
764            print('current product: ', prod)
765
766            for i in range(len(index_keys)):
767                grib_index_select(iid, index_keys[i], prod[i])
768
769            # get first id from current product
770            gid = grib_new_from_index(iid)
771
772            # if there is no data for this specific time combination / product
773            # skip the rest of the for loop and start with next timestep/product
774            if not gid:
775                continue
776
777            # create correct timestamp from the three time informations
778            cdate = str(grib_get(gid, 'date'))
779            ctime = '{:0>2}'.format(grib_get(gid, 'time')/100)
780            cstep = '{:0>3}'.format(grib_get(gid, 'step'))
781            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
782            t_dt = t_date + timedelta(hours=int(cstep))
783            t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime))
784            t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime))
785            t_enddate = None
786
787            if c.maxstep > 12:
788                fnout = os.path.join(c.inputdir, 'flux' +
789                                     t_date.strftime('%Y%m%d.%H') +
790                                     '.{:0>3}'.format(step-2*int(c.dtime)))
791                gnout = os.path.join(c.inputdir, 'flux' +
792                                     t_date.strftime('%Y%m%d.%H') +
793                                     '.{:0>3}'.format(step-int(c.dtime)))
794                hnout = os.path.join(c.inputdir, 'flux' +
795                                     t_date.strftime('%Y%m%d.%H') +
796                                     '.{:0>3}'.format(step))
797            else:
798                fnout = os.path.join(c.inputdir, 'flux' +
799                                     t_m2dt.strftime('%Y%m%d%H'))
800                gnout = os.path.join(c.inputdir, 'flux' +
801                                     t_m1dt.strftime('%Y%m%d%H'))
802                hnout = os.path.join(c.inputdir, 'flux' +
803                                     t_dt.strftime('%Y%m%d%H'))
804
805            print("outputfile = " + fnout)
806
807            # read message for message and store relevant data fields
808            # data keywords are stored in pars
809            while 1:
810                if not gid:
811                    break
812                cparamId = str(grib_get(gid, 'paramId'))
813                step = grib_get(gid, 'step')
814                time = grib_get(gid, 'time')
815                ni = grib_get(gid, 'Ni')
816                nj = grib_get(gid, 'Nj')
817                if cparamId in valsdict.keys():
818                    values = grib_get_values(gid)
819                    vdp = valsdict[cparamId]
820                    svdp = svalsdict[cparamId]
821 #                   sd = stepsdict[cparamId]
822
823                    if cparamId == '142' or cparamId == '143':
824                        fak = 1. / 1000.
825                    else:
826                        fak = 3600.
827
828                    values = (np.reshape(values, (nj, ni))).flatten() / fak
829                    vdp.append(values[:])  # save the accumulated values
830                    if step <= int(c.dtime):
831                        svdp.append(values[:] / int(c.dtime))
832                    else:  # deaccumulate values
833                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
834
835                    print(cparamId, time, step, len(values),
836                          values[0], np.std(values))
837
838                    # len(svdp) correspond to the time
839                    if len(svdp) >= 3:
840                        if len(svdp) > 3:
841                            if cparamId == '142' or cparamId == '143':
842                                values = disaggregation.darain(svdp)
843                            else:
844                                values = disaggregation.dapoly(svdp)
845
846                            if not (step == c.maxstep and c.maxstep > 12 \
847                                    or t_dt == t_enddate):
848                                vdp.pop(0)
849                                svdp.pop(0)
850                        else:
851                            if c.maxstep > 12:
852                                values = svdp[1]
853                            else:
854                                values = svdp[0]
855
856                        grib_set_values(gid, values)
857
858                        if c.maxstep > 12:
859                            grib_set(gid, 'step', max(0, step-2*int(c.dtime)))
860                        else:
861                            grib_set(gid, 'step', 0)
862                            grib_set(gid, 'time', t_m2dt.hour*100)
863                            grib_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
864
865                        with open(fnout, 'w') as f_handle:
866                            grib_write(gid, f_handle)
867
868                        if c.basetime:
869                            t_enddate = datetime.strptime(c.end_date +
870                                                          c.basetime,
871                                                          '%Y%m%d%H')
872                        else:
873                            t_enddate = t_date + timedelta(2*int(c.dtime))
874
875                        # squeeze out information of last two steps contained
876                        # in svdp
877                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
878                        # or t_dt+timedelta(hours = int(c.dtime))
879                        # >= t_enddate:
880                        # Note that svdp[0] has not been popped in this case
881
882                        if step == c.maxstep and c.maxstep > 12 or \
883                           t_dt == t_enddate:
884
885                            values = svdp[3]
886                            grib_set_values(gid, values)
887                            grib_set(gid, 'step', 0)
888                            truedatetime = t_m2dt + timedelta(hours=
889                                                             2*int(c.dtime))
890                            grib_set(gid, 'time', truedatetime.hour * 100)
891                            grib_set(gid, 'date', truedatetime.year * 10000 +
892                                     truedatetime.month * 100 +
893                                     truedatetime.day)
894                            with open(hnout, 'w') as h_handle:
895                                grib_write(gid, h_handle)
896
897                            #values = (svdp[1]+svdp[2])/2.
898                            if cparamId == '142' or cparamId == '143':
899                                values = disaggregation.darain(list(reversed(svdp)))
900                            else:
901                                values = disaggregation.dapoly(list(reversed(svdp)))
902
903                            grib_set(gid, 'step', 0)
904                            truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
905                            grib_set(gid, 'time', truedatetime.hour * 100)
906                            grib_set(gid, 'date', truedatetime.year * 10000 +
907                                     truedatetime.month * 100 +
908                                     truedatetime.day)
909                            grib_set_values(gid, values)
910                            with open(gnout, 'w') as g_handle:
911                                grib_write(gid, g_handle)
912
913                grib_release(gid)
914
915                gid = grib_new_from_index(iid)
916
917        grib_index_release(iid)
918
919        return
920
921
922    def create(self, inputfiles, c):
923        '''
924        @Description:
925            This method is based on the ECMWF example index.py
926            https://software.ecmwf.int/wiki/display/GRIB/index.py
927
928            An index file will be created which depends on the combination
929            of "date", "time" and "stepRange" values. This is used to iterate
930            over all messages in each grib file which were passed through the
931            parameter "inputfiles" to seperate specific parameters into fort.*
932            files. Afterwards the FORTRAN program is called to convert
933            the data fields all to the same grid and put them in one file
934            per unique time step (combination of "date", "time" and
935            "stepRange").
936
937        @Input:
938            self: instance of EcFlexpart
939                The current object of the class.
940
941            inputfiles: instance of UioFiles
942                Contains a list of files.
943
944            c: instance of class ControlFile
945                Contains all the parameters of CONTROL file and
946                command line.
947                For more information about format and content of the parameter
948                see documentation.
949
950        @Return:
951            <nothing>
952        '''
953
954        if c.wrf:
955            table128 = init128(_config.PATH_GRIBTABLE)
956            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
957                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
958                                  table128)
959
960        # these numbers are indices for the temporary files "fort.xx"
961        # which are used to seperate the grib fields to,
962        # for the Fortran program input
963        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
964        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
965        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
966                 '17':None, '18':None, '19':None, '21':None, '22':None}
967
968        iid = None
969        index_vals = None
970
971        # get the values of the keys which are used for distinct access
972        # of grib messages via product
973        index_keys = ["date", "time", "step"]
974        iid, index_vals = self._mk_index_values(c.inputdir,
975                                                inputfiles,
976                                                index_keys)
977        # index_vals looks like e.g.:
978        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
979        # index_vals[1]: ('0', '1200', '1800', '600') ; time
980        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
981
982        # "product" genereates each possible combination between the
983        # values of the index keys
984        for prod in product(*index_vals):
985            # e.g. prod = ('20170505', '0', '12')
986            #             (  date    ,time, step)
987
988            print('current product: ', prod)
989
990            for i in range(len(index_keys)):
991                grib_index_select(iid, index_keys[i], prod[i])
992
993            # get first id from current product
994            gid = grib_new_from_index(iid)
995
996            # if there is no data for this specific time combination / product
997            # skip the rest of the for loop and start with next timestep/product
998            if not gid:
999                continue
1000
1001            # remove old fort.* files and open new ones
1002            # they are just valid for a single product
1003            for k, f in fdict.iteritems():
1004                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1005                silent_remove(fortfile)
1006                fdict[k] = open(fortfile, 'w')
1007
1008            # create correct timestamp from the three time informations
1009            cdate = str(grib_get(gid, 'date'))
1010            ctime = '{:0>2}'.format(grib_get(gid, 'time')/100)
1011            cstep = '{:0>3}'.format(grib_get(gid, 'step'))
1012            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1013            timestamp += timedelta(hours=int(cstep))
1014            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1015
1016            # if the timestamp is out of basetime start/end date period,
1017            # skip this specific product
1018            if c.basetime:
1019                start_time = datetime.strptime(c.end_date + c.basetime,
1020                                                '%Y%m%d%H') - time_delta
1021                end_time = datetime.strptime(c.end_date + c.basetime,
1022                                             '%Y%m%d%H')
1023                if timestamp < start_time or timestamp > end_time:
1024                    continue
1025
1026            if c.wrf:
1027                if 'olddate' not in locals() or cdate != olddate:
1028                    fwrf = open(os.path.join(c.outputdir,
1029                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1030                    olddate = cdate[:]
1031
1032            # savedfields remembers which fields were already used.
1033            savedfields = []
1034            # sum of cloud liquid and ice water content
1035            scwc = None
1036            while 1:
1037                if not gid:
1038                    break
1039                paramId = grib_get(gid, 'paramId')
1040                gridtype = grib_get(gid, 'gridType')
1041                levtype = grib_get(gid, 'typeOfLevel')
1042                if paramId == 77: # ETADOT
1043                    grib_write(gid, fdict['21'])
1044                elif paramId == 130: # T
1045                    grib_write(gid, fdict['11'])
1046                elif paramId == 131 or paramId == 132: # U, V wind component
1047                    grib_write(gid, fdict['10'])
1048                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1049                    grib_write(gid, fdict['17'])
1050                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1051                    grib_write(gid, fdict['18'])
1052                elif paramId == 135: # W
1053                    grib_write(gid, fdict['19'])
1054                elif paramId == 152: # LNSP
1055                    grib_write(gid, fdict['12'])
1056                elif paramId == 155 and gridtype == 'sh': # D
1057                    grib_write(gid, fdict['13'])
1058                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1059                    # sum cloud liquid water and ice
1060                    if not scwc:
1061                        scwc = grib_get_values(gid)
1062                    else:
1063                        scwc += grib_get_values(gid)
1064                        grib_set_values(gid, scwc)
1065                        grib_set(gid, 'paramId', 201031)
1066                        grib_write(gid, fdict['22'])
1067                elif c.wrf and paramId in [129, 138, 155] and \
1068                      levtype == 'hybrid': # Z, VO, D
1069                    # do not do anything right now
1070                    # these are specific parameter for WRF
1071                    pass
1072                else:
1073                    if paramId not in savedfields:
1074                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1075                        # and all ADDPAR parameter
1076                        grib_write(gid, fdict['16'])
1077                        savedfields.append(paramId)
1078                    else:
1079                        print('duplicate ' + str(paramId) + ' not written')
1080
1081                try:
1082                    if c.wrf:
1083                        # model layer
1084                        if levtype == 'hybrid' and \
1085                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1086                            grib_write(gid, fwrf)
1087                        # sfc layer
1088                        elif paramId in wrfpars:
1089                            grib_write(gid, fwrf)
1090                except AttributeError:
1091                    pass
1092
1093                grib_release(gid)
1094                gid = grib_new_from_index(iid)
1095
1096            for f in fdict.values():
1097                f.close()
1098
1099            # call for Fortran program to convert e.g. reduced_gg grids to
1100            # regular_ll and calculate detadot/dp
1101            pwd = os.getcwd()
1102            os.chdir(c.inputdir)
1103            if os.stat('fort.21').st_size == 0 and c.eta:
1104                print('Parameter 77 (etadot) is missing, most likely it is \
1105                       not available for this type or date/time\n')
1106                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1107                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1108                         is set to 1 in CONTROL file')
1109
1110            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1111            p = subprocess.check_call([os.path.join(
1112                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1113            os.chdir(pwd)
1114
1115            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1116            if c.maxstep > 12:
1117                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1118            else:
1119                suffix = cdate_hour[2:10]
1120            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1121            print("outputfile = " + fnout)
1122            # collect for final processing
1123            self.outputfilelist.append(os.path.basename(fnout))
1124
1125            # create outputfile and copy all data from intermediate files
1126            # to the outputfile (final GRIB input files for FLEXPART)
1127            orolsm = os.path.basename(glob.glob(c.inputdir +
1128                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1129            fluxfile = 'flux' + cdate[0:2] + suffix
1130            if not c.cwc:
1131                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1132            else:
1133                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1134
1135            with open(fnout, 'wb') as fout:
1136                for f in flist:
1137                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1138                                       fout)
1139
1140            if c.omega:
1141                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1142                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1143                                            'rb'), fout)
1144
1145        if c.wrf:
1146            fwrf.close()
1147
1148        grib_index_release(iid)
1149
1150        return
1151
1152
1153    def process_output(self, c):
1154        '''
1155        @Description:
1156            The grib files are postprocessed depending on the selection in
1157            CONTROL file. The resulting files are moved to the output
1158            directory if its not equal to the input directory.
1159            The following modifications might be done if
1160            properly switched in CONTROL file:
1161            GRIB2 - Conversion to GRIB2
1162            ECTRANS - Transfer of files to gateway server
1163            ECSTORAGE - Storage at ECMWF server
1164
1165        @Input:
1166            self: instance of EcFlexpart
1167                The current object of the class.
1168
1169            c: instance of class ControlFile
1170                Contains all the parameters of CONTROL file and
1171                command line.
1172                For more information about format and content of the parameter
1173                see documentation.
1174
1175        @Return:
1176            <nothing>
1177
1178        '''
1179
1180        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1181
1182        if not c.ecapi:
1183            print('ecstorage: {}\n ecfsdir: {}\n'.
1184                  format(c.ecstorage, c.ecfsdir))
1185            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1186                  .format(c.ectrans, c.gateway, c.destination))
1187
1188        print('Output filelist: ')
1189        print(self.outputfilelist)
1190
1191        if c.format.lower() == 'grib2':
1192            for ofile in self.outputfilelist:
1193                p = subprocess.check_call(['grib_set', '-s', 'edition=2, \
1194                                           productDefinitionTemplateNumber=8',
1195                                           ofile, ofile + '_2'])
1196                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1197
1198        if c.ectrans and not c.ecapi:
1199            for ofile in self.outputfilelist:
1200                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1201                                           c.gateway, '-remote', c.destination,
1202                                           '-source', ofile])
1203
1204        if c.ecstorage and not c.ecapi:
1205            for ofile in self.outputfilelist:
1206                p = subprocess.check_call(['ecp', '-o', ofile,
1207                                           os.path.expandvars(c.ecfsdir)])
1208
1209        if c.outputdir != c.inputdir:
1210            for ofile in self.outputfilelist:
1211                p = subprocess.check_call(['mv',
1212                                           os.path.join(c.inputdir, ofile),
1213                                           c.outputdir])
1214
1215        return
1216
1217
1218    def prepare_fp_files(self, c):
1219        '''
1220        @Description:
1221            Conversion of GRIB files to FLEXPART binary format.
1222
1223        @Input:
1224            c: instance of class ControlFile
1225                Contains all the parameters of CONTROL file and
1226                command line.
1227                For more information about format and content of the parameter
1228                see documentation.
1229
1230
1231        @Return:
1232            <nothing>
1233        '''
1234        # generate AVAILABLE file
1235        # Example of AVAILABLE file data:
1236        # 20131107 000000      EN13110700              ON DISC
1237        clist = []
1238        for ofile in self.outputfilelist:
1239            fname = ofile.split('/')
1240            if '.' in fname[-1]:
1241                l = fname[-1].split('.')
1242                timestamp = datetime.strptime(l[0][-6:] + l[1],
1243                                              '%y%m%d%H')
1244                timestamp += timedelta(hours=int(l[2]))
1245                cdate = datetime.strftime(timestamp, '%Y%m%d')
1246                chms = datetime.strftime(timestamp, '%H%M%S')
1247            else:
1248                cdate = '20' + fname[-1][-8:-2]
1249                chms = fname[-1][-2:] + '0000'
1250            clist.append(cdate + ' ' + chms + ' '*6 +
1251                         fname[-1] + ' '*14 + 'ON DISC')
1252        clist.sort()
1253        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1254            f.write('\n'.join(clist) + '\n')
1255
1256        # generate pathnames file
1257        pwd = os.path.abspath(c.outputdir)
1258        with open(pwd + '/pathnames', 'w') as f:
1259            f.write(pwd + '/Options/\n')
1260            f.write(pwd + '/\n')
1261            f.write(pwd + '/\n')
1262            f.write(pwd + '/AVAILABLE\n')
1263            f.write(' = == = == = == = == = == ==  = \n')
1264
1265        # create Options dir if necessary
1266        if not os.path.exists(pwd + '/Options'):
1267            os.makedirs(pwd+'/Options')
1268
1269        # read template COMMAND file
1270        with open(os.path.expandvars(os.path.expanduser(
1271            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1272            lflist = f.read().split('\n')
1273
1274        # find index of list where to put in the
1275        # date and time information
1276        # usually after the LDIRECT parameter
1277        i = 0
1278        for l in lflist:
1279            if 'LDIRECT' in l.upper():
1280                break
1281            i += 1
1282
1283        # insert the date and time information of run start and end
1284        # into the list of lines of COMMAND file
1285        lflist = lflist[:i+1] + \
1286                 [clist[0][:16], clist[-1][:16]] + \
1287                 lflist[i+3:]
1288
1289        # write the new COMMAND file
1290        with open(pwd + '/Options/COMMAND', 'w') as g:
1291            g.write('\n'.join(lflist) + '\n')
1292
1293        # change to outputdir and start the grib2flexpart run
1294        # afterwards switch back to the working dir
1295        os.chdir(c.outputdir)
1296        p = subprocess.check_call([
1297            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1298            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1299        os.chdir(pwd)
1300
1301        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG