source: flex_extract.git/source/python/classes/EcFlexpart.py @ 5bad6ec

ctbtodev
Last change on this file since 5bad6ec was 5bad6ec, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

added possibility to extract public datasets via an logical public parameter

  • Property mode set to 100644
File size: 53.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83from gribapi import grib_set, grib_index_select, grib_new_from_index, grib_get,\
84                    grib_write, grib_get_values, grib_set_values, grib_release,\
85                    grib_index_release, grib_index_get
86
87# software specific classes and modules from flex_extract
88sys.path.append('../')
89import _config
90from GribTools import GribTools
91from mods.tools import (init128, to_param_id, silent_remove, product,
92                        my_error, make_dir)
93from MarsRetrieval import MarsRetrieval
94import mods.disaggregation as disaggregation
95
96# ------------------------------------------------------------------------------
97# CLASS
98# ------------------------------------------------------------------------------
99class EcFlexpart(object):
100    '''
101    Class to retrieve FLEXPART specific ECMWF data.
102    '''
103    # --------------------------------------------------------------------------
104    # CLASS FUNCTIONS
105    # --------------------------------------------------------------------------
106    def __init__(self, c, fluxes=False):
107        '''
108        @Description:
109            Creates an object/instance of EcFlexpart with the
110            associated settings of its attributes for the retrieval.
111
112        @Input:
113            self: instance of EcFlexpart
114                The current object of the class.
115
116            c: instance of class ControlFile
117                Contains all the parameters of CONTROL file and
118                command line.
119                For more information about format and content of the parameter
120                see documentation.
121
122            fluxes: boolean, optional
123                Decides if the flux parameter settings are stored or
124                the rest of the parameter list.
125                Default value is False.
126
127        @Return:
128            <nothing>
129        '''
130        # set a counter for the number of mars requests generated
131        self.mreq_count = 0
132
133        # different mars types for retrieving data for flexpart
134        self.types = dict()
135
136        if c.maxstep > len(c.type):    # Pure forecast mode
137            c.type = [c.type[1]]
138            c.step = ['{:0>3}'.format(int(c.step[0]))]
139            c.time = [c.time[0]]
140            for i in range(1, c.maxstep + 1):
141                c.type.append(c.type[0])
142                c.step.append('{:0>3}'.format(i))
143                c.time.append(c.time[0])
144
145        self.inputdir = c.inputdir
146        self.dataset = c.dataset
147        self.basetime = c.basetime
148        self.dtime = c.dtime
149        i = 0
150        if fluxes and c.maxstep <= 24:
151            # no forecast beyond one day is needed!
152            # Thus, prepare flux data manually as usual
153            # with only forecast fields with start times at 00/12
154            # (but without 00/12 fields since these are
155            # the initialisation times of the flux fields
156            # and therefore are zero all the time)
157            self.types[c.type[1]] = {'times': '00/12', 'steps':
158                                     '{}/to/12/by/{}'.format(c.dtime, c.dtime)}
159        else:
160            for ty, st, ti in zip(c.type, c.step, c.time):
161                btlist = range(24)
162                if c.basetime == '12':
163                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
164                if c.basetime == '00':
165                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
166
167                if i % int(c.dtime) == 0 and (i in btlist or c.maxstep > 24):
168
169                    if ty not in self.types.keys():
170                        self.types[ty] = {'times': '', 'steps': ''}
171
172                    if ti not in self.types[ty]['times']:
173                        if self.types[ty]['times']:
174                            self.types[ty]['times'] += '/'
175                        self.types[ty]['times'] += ti
176
177                    if st not in self.types[ty]['steps']:
178                        if self.types[ty]['steps']:
179                            self.types[ty]['steps'] += '/'
180                        self.types[ty]['steps'] += st
181                i += 1
182
183        self.marsclass = c.marsclass
184        self.stream = c.stream
185        self.number = c.number
186        self.resol = c.resol
187        self.accuracy = c.accuracy
188        self.level = c.level
189        self.expver = c.expver
190        self.levelist = c.levelist
191        # for gaussian grid retrieval
192        self.glevelist = '1/to/' + c.level
193
194        if hasattr(c, 'gaussian') and c.gaussian:
195            self.gaussian = c.gaussian
196        else:
197            self.gaussian = ''
198
199        if 'N' in c.grid:  # Gaussian output grid
200            self.grid = c.grid
201            self.area = 'G'
202        else:
203            self.grid = '{}/{}'.format(int(c.grid) / 1000., int(c.grid) / 1000.)
204            self.area = '{}/{}/{}/{}'.format(int(c.upper) / 1000.,
205                                             int(c.left) / 1000.,
206                                             int(c.lower) / 1000.,
207                                             int(c.right) / 1000.)
208
209        self.outputfilelist = []
210
211
212        # Now comes the nasty part that deals with the different
213        # scenarios we have:
214        # 1) Calculation of etadot on
215        #    a) Gaussian grid
216        #    b) Output grid
217        #    c) Output grid using parameter 77 retrieved from MARS
218        # 3) Calculation/Retrieval of omega
219        # 4) Download also data for WRF
220
221        # Different grids need different retrievals
222        # SH = Spherical Harmonics, GG = Gaussian Grid,
223        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
224        self.params = {'SH__ML': '', 'SH__SL': '',
225                       'GG__ML': '', 'GG__SL': '',
226                       'OG__ML': '', 'OG__SL': '',
227                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
228        # the self.params dictionary stores a list of
229        # [param, levtype, levelist, grid] per key
230
231        if fluxes is False:
232            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
233            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
234            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
235                                     'SFC', '1', self.grid]
236            if c.addpar:
237                if c.addpar[0] == '/':
238                    c.addpar = c.addpar[1:]
239                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
240
241            self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
242                                            'SFC', '1', self.grid]
243
244            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
245
246            #if c.gauss == '0' and c.eta == '1':
247            if not c.gauss and c.eta:
248                # the simplest case
249                self.params['OG__ML'][0] += '/U/V/77'
250            #elif c.gauss == '0' and c.eta == '0':
251            elif not c.gauss and not c.eta:
252            # this is not recommended (inaccurate)
253                self.params['OG__ML'][0] += '/U/V'
254            #elif c.gauss == '1' and c.eta == '0':
255            elif c.gauss and not c.eta:
256                # this is needed for data before 2008, or for reanalysis data
257                self.params['GG__SL'] = ['Q', 'ML', '1', \
258                                         '{}'.format((int(self.resol) + 1) / 2)]
259                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
260            else:
261                print('Warning: This is a very costly parameter combination, \
262                      use only for debugging!')
263                self.params['GG__SL'] = ['Q', 'ML', '1', \
264                                         '{}'.format((int(self.resol) + 1) / 2)]
265                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
266                                         '{}'.format((int(self.resol) + 1) / 2)]
267
268            if c.omega:
269                self.params['OG__ML'][0] += '/W'
270
271            # add cloud water content if necessary
272            if c.cwc:
273                self.params['OG__ML'][0] += '/CLWC/CIWC'
274
275            # add vorticity and geopotential height for WRF if necessary
276            if c.wrf:
277                self.params['OG__ML'][0] += '/Z/VO'
278                if '/D' not in self.params['OG__ML'][0]:
279                    self.params['OG__ML'][0] += '/D'
280                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
281                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
282                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
283                           139/170/183/236/39/40/41/42'
284                lwrt_sfc = wrf_sfc.split('/')
285                for par in lwrt_sfc:
286                    if par not in self.params['OG__SL'][0]:
287                        self.params['OG__SL'][0] += '/' + par
288
289        else:
290            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
291                                        'SFC', '1', self.grid]
292
293        return
294
295
296    def _mk_targetname(self, ftype, param, date):
297        '''
298        @Description:
299            Creates the filename for the requested grib data to be stored in.
300            This name is passed as the "target" parameter in the request.
301
302        @Input:
303            ftype: string
304                Shortcut name of the type of the field. E.g. AN, FC, PF, ...
305
306            param: string
307                Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
308                GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
309
310            date: string
311                The date period of the grib data to be stored in this file.
312
313        @Return:
314            targetname: string
315                The target filename for the grib data.
316        '''
317        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
318                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
319
320        return targetname
321
322
323    def _start_retrievement(self, request, par_dict):
324        '''
325        @Description:
326            Creates the Mars Retrieval and prints or submits the request
327            depending on the status of the request variable.
328
329        @Input:
330            self: instance of EcFlexpart
331                The current object of the class.
332
333            request: integer
334                Selects the mode of retrieval.
335                0: Retrieves the data from ECMWF.
336                1: Prints the mars requests to an output file.
337                2: Retrieves the data and prints the mars request.
338
339            par_dict: dictionary
340                Contains all parameter which have to be set for creating the
341                Mars Retrievals. The parameter are:
342                marsclass, dataset, stream, type, levtype, levelist, resol,
343                gaussian, accuracy, grid, target, area, date, time, number,
344                step, expver, param
345
346        @Return:
347            <nothing>
348        '''
349        # increase number of mars requests
350        self.mreq_count += 1
351
352        MR = MarsRetrieval(self.server,
353                           self.public,
354                           marsclass=par_dict['marsclass'],
355                           dataset=par_dict['dataset'],
356                           stream=par_dict['stream'],
357                           type=par_dict['type'],
358                           levtype=par_dict['levtype'],
359                           levelist=par_dict['levelist'],
360                           resol=par_dict['resol'],
361                           gaussian=par_dict['gaussian'],
362                           accuracy=par_dict['accuracy'],
363                           grid=par_dict['grid'],
364                           target=par_dict['target'],
365                           area=par_dict['area'],
366                           date=par_dict['date'],
367                           time=par_dict['time'],
368                           number=par_dict['number'],
369                           step=par_dict['step'],
370                           expver=par_dict['expver'],
371                           param=par_dict['param'])
372
373        if request == 0:
374            MR.display_info()
375            MR.data_retrieve()
376        elif request == 1:
377            MR.print_infodata_csv(self.inputdir, self.mreq_count)
378        elif request == 2:
379            MR.print_infodata_csv(self.inputdir, self.mreq_count)
380            MR.display_info()
381            MR.data_retrieve()
382        else:
383            print('Failure')
384
385        return
386
387
388    def _mk_index_values(self, inputdir, inputfiles, keys):
389        '''
390        @Description:
391            Creates an index file for a set of grib parameter keys.
392            The values from the index keys are returned in a list.
393
394        @Input:
395            keys: dictionary
396                List of parameter names which serves as index.
397
398            inputfiles: instance of UioFiles
399                Contains a list of files.
400
401        @Return:
402            iid: grib_index
403                This is a grib specific index structure to access
404                messages in a file.
405
406            index_vals: list
407                Contains the values from the keys used for a distinct selection
408                of grib messages in processing  the grib files.
409                Content looks like e.g.:
410                index_vals[0]: ('20171106', '20171107', '20171108') ; date
411                index_vals[1]: ('0', '1200', '1800', '600') ; time
412                index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
413        '''
414        iid = None
415        index_keys = keys
416
417        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
418        silent_remove(indexfile)
419        grib = GribTools(inputfiles.files)
420        # creates new index file
421        iid = grib.index(index_keys=index_keys, index_file=indexfile)
422
423        # read the values of index keys
424        index_vals = []
425        for key in index_keys:
426            #index_vals.append(grib_index_get(iid, key))
427            #print(index_vals[-1])
428            key_vals = grib_index_get(iid, key)
429            print(key_vals)
430            # have to sort the steps for disaggregation,
431            # therefore convert to int first
432            if key == 'step':
433                key_vals = [int(k) for k in key_vals]
434                key_vals.sort()
435                key_vals = [str(k) for k in key_vals]
436            index_vals.append(key_vals)
437            # index_vals looks for example like:
438            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
439            # index_vals[1]: ('0', '1200') ; time
440            # index_vals[2]: (3', '6', '9', '12') ; stepRange
441
442        return iid, index_vals
443
444
445    def retrieve(self, server, dates, public, request, inputdir='.'):
446        '''
447        @Description:
448            Finalizing the retrieval information by setting final details
449            depending on grid type.
450            Prepares MARS retrievals per grid type and submits them.
451
452        @Input:
453            self: instance of EcFlexpart
454                The current object of the class.
455
456            server: instance of ECMWFService or ECMWFDataServer
457                The connection to the ECMWF server. This is different
458                for member state users which have full access and non
459                member state users which have only access to the public
460                data sets. The decision is made from command line argument
461                "public"; for public access its True (ECMWFDataServer)
462                for member state users its False (ECMWFService)
463
464            dates: string
465                Contains start and end date of the retrieval in the format
466                "YYYYMMDD/to/YYYYMMDD"
467
468            request: integer
469                Selects the mode of retrieval.
470                0: Retrieves the data from ECMWF.
471                1: Prints the mars requests to an output file.
472                2: Retrieves the data and prints the mars request.
473
474            inputdir: string, optional
475                Path to the directory where the retrieved data is about
476                to be stored. The default is the current directory ('.').
477
478        @Return:
479            <nothing>
480        '''
481        self.dates = dates
482        self.server = server
483        self.public = public
484        self.inputdir = inputdir
485        oro = False
486
487        # define times with datetime module
488        t12h = timedelta(hours=12)
489        t24h = timedelta(hours=24)
490
491        # dictionary which contains all parameter for the mars request,
492        # entries with a "None" will change in different requests and will
493        # therefore be set in each request seperately
494        retr_param_dict = {'marsclass':self.marsclass,
495                           'dataset':self.dataset,
496                           'stream':None,
497                           'type':None,
498                           'levtype':None,
499                           'levelist':None,
500                           'resol':self.resol,
501                           'gaussian':None,
502                           'accuracy':self.accuracy,
503                           'grid':None,
504                           'target':None,
505                           'area':None,
506                           'date':None,
507                           'time':None,
508                           'number':self.number,
509                           'step':None,
510                           'expver':self.expver,
511                           'param':None}
512
513        for ftype in self.types:
514            # fk contains field types such as
515            #     [AN, FC, PF, CV]
516            # fv contains all of the items of the belonging key
517            #     [times, steps]
518            for pk, pv in self.params.iteritems():
519                # pk contains one of these keys of params
520                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
521                #      OG_OROLSM_SL, OG_acc_SL]
522                # pv contains all of the items of the belonging key
523                #     [param, levtype, levelist, grid]
524                if isinstance(pv, str):
525                    continue
526                retr_param_dict['type'] = '' + ftype
527                retr_param_dict['time'] = self.types[ftype]['times']
528                retr_param_dict['step'] = self.types[ftype]['steps']
529                retr_param_dict['date'] = self.dates
530                retr_param_dict['stream'] = self.stream
531                retr_param_dict['target'] = \
532                    self._mk_targetname(ftype, pk,
533                                        retr_param_dict['date'].split('/')[0])
534                retr_param_dict['param'] = pv[0]
535                retr_param_dict['levtype'] = pv[1]
536                retr_param_dict['levelist'] = pv[2]
537                retr_param_dict['grid'] = pv[3]
538                retr_param_dict['area'] = self.area
539                retr_param_dict['gaussian'] = self.gaussian
540
541                if pk == 'OG__SL':
542                    pass
543                if pk == 'OG_OROLSM__SL' and not oro:
544                    oro = True
545                    retr_param_dict['stream'] = 'OPER'
546                    retr_param_dict['type'] = 'AN'
547                    retr_param_dict['time'] = '00'
548                    retr_param_dict['step'] = '000'
549                    retr_param_dict['date'] = self.dates.split('/')[0]
550                    retr_param_dict['target'] = self._mk_targetname('',
551                                            pk, retr_param_dict['date'])
552                elif pk == 'OG_OROLSM__SL' and oro:
553                    continue
554                if pk == 'GG__SL' and pv[0] == 'Q':
555                    retr_param_dict['area'] = ""
556                    retr_param_dict['gaussian'] = 'reduced'
557
558    # ------  on demand path  --------------------------------------------------
559                if not self.basetime:
560                    # ******* start retrievement
561                    self._start_retrievement(request, retr_param_dict)
562    # ------  operational path  ------------------------------------------------
563                else:
564                    # check if mars job requests fields beyond basetime.
565                    # if yes eliminate those fields since they may not
566                    # be accessible with user's credentials
567
568                    enddate = retr_param_dict['date'].split('/')[-1]
569                    elimit = datetime.strptime(enddate + self.basetime,
570                                               '%Y%m%d%H')
571
572                    if self.basetime == '12':
573                        # --------------  flux data ----------------------------
574                        if 'acc' in pk:
575
576                        # Strategy:
577                        # if maxtime-elimit >= 24h reduce date by 1,
578                        # if 12h <= maxtime-elimit<12h reduce time for last date
579                        # if maxtime-elimit<12h reduce step for last time
580                        # A split of the MARS job into 2 is likely necessary.
581
582
583                            startdate = retr_param_dict['date'].split('/')[0]
584                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
585                            retr_param_dict['date'] = '/'.join([startdate,
586                                                                'to',
587                                                                enddate])
588
589                            # ******* start retrievement
590                            self._start_retrievement(request, retr_param_dict)
591
592                            retr_param_dict['date'] = \
593                                datetime.strftime(elimit - t12h, '%Y%m%d')
594                            retr_param_dict['time'] = '00'
595                            retr_param_dict['target'] = \
596                                self._mk_targetname(ftype, pk,
597                                                    retr_param_dict['date'])
598
599                            # ******* start retrievement
600                            self._start_retrievement(request, retr_param_dict)
601
602                        # --------------  non flux data ------------------------
603                        else:
604                            # ******* start retrievement
605                            self._start_retrievement(request, retr_param_dict)
606
607                    else: # basetime = 0
608                        retr_param_dict['date'] = \
609                            datetime.strftime(elimit - t24h, '%Y%m%d')
610
611                        timesave = ''.join(retr_param_dict['time'])
612
613                        if '/' in retr_param_dict['time']:
614                            times = retr_param_dict['time'].split('/')
615                            steps = retr_param_dict['step'].split('/')
616                            while (pk != 'OG_OROLSM__SL' and
617                                   'acc' not in pk and
618                                   (int(times[0]) + int(steps[0])) <= 12):
619                                times = times[1:]
620
621                            if len(times) > 1:
622                                retr_param_dict['time'] = '/'.join(times)
623                            else:
624                                retr_param_dict['time'] = times[0]
625
626                        # ******* start retrievement
627                        self._start_retrievement(request, retr_param_dict)
628
629                        if (pk != 'OG_OROLSM__SL' and
630                            int(retr_param_dict['step'].split('/')[0]) == 0 and
631                            int(timesave.split('/')[0]) == 0):
632
633                            retr_param_dict['date'] = \
634                                datetime.strftime(elimit, '%Y%m%d')
635                            retr_param_dict['time'] = '00'
636                            retr_param_dict['step'] = '000'
637                            retr_param_dict['target'] = \
638                                self._mk_targetname(ftype, pk,
639                                                    retr_param_dict['date'])
640
641                            # ******* start retrievement
642                            self._start_retrievement(request, retr_param_dict)
643
644        if request == 0 or request == 2:
645            print('MARS retrieve done ... ')
646        elif request == 1:
647            print('MARS request printed ...')
648
649        return
650
651
652    def write_namelist(self, c, filename):
653        '''
654        @Description:
655            Creates a namelist file in the temporary directory and writes
656            the following values to it: maxl, maxb, mlevel,
657            mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
658            momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
659
660        @Input:
661            self: instance of EcFlexpart
662                The current object of the class.
663
664            c: instance of class ControlFile
665                Contains all the parameters of CONTROL file and
666                command line.
667                For more information about format and content of the parameter
668                see documentation.
669
670            filename: string
671                Name of the namelist file.
672
673        @Return:
674            <nothing>
675        '''
676
677        self.inputdir = c.inputdir
678        area = np.asarray(self.area.split('/')).astype(float)
679        grid = np.asarray(self.grid.split('/')).astype(float)
680
681        if area[1] > area[3]:
682            area[1] -= 360
683        maxl = int((area[3] - area[1]) / grid[1]) + 1
684        maxb = int((area[0] - area[2]) / grid[0]) + 1
685
686        with open(self.inputdir + '/' + filename, 'w') as f:
687            f.write('&NAMGEN\n')
688            f.write(',\n  '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb),
689                                  'mlevel = ' + str(self.level),
690                                  'mlevelist = ' + '"' + str(self.levelist)
691                                                 + '"',
692                                  'mnauf = ' + str(self.resol),
693                                  'metapar = ' + '77',
694                                  'rlo0 = ' + str(area[1]),
695                                  'rlo1 = ' + str(area[3]),
696                                  'rla0 = ' + str(area[2]),
697                                  'rla1 = ' + str(area[0]),
698                                  'momega = ' + str(c.omega),
699                                  'momegadiff = ' + str(c.omegadiff),
700                                  'mgauss = ' + str(c.gauss),
701                                  'msmooth = ' + str(c.smooth),
702                                  'meta = ' + str(c.eta),
703                                  'metadiff = ' + str(c.etadiff),
704                                  'mdpdeta = ' + str(c.dpdeta)]))
705
706            f.write('\n/\n')
707
708        return
709
710
711    def deacc_fluxes(self, inputfiles, c):
712        '''
713        @Description:
714            Goes through all flux fields in ordered time and de-accumulate
715            the fields. Afterwards the fields are disaggregated in time.
716            Different versions of disaggregation is provided for rainfall
717            data (darain, modified linear) and the surface fluxes and
718            stress data (dapoly, cubic polynomial).
719
720        @Input:
721            self: instance of EcFlexpart
722                The current object of the class.
723
724            inputfiles: instance of UioFiles
725                Contains a list of files.
726
727            c: instance of class ControlFile
728                Contains all the parameters of CONTROL file and
729                command line.
730                For more information about format and content of the parameter
731                see documentation.
732
733        @Return:
734            <nothing>
735        '''
736
737        table128 = init128(_config.PATH_GRIBTABLE)
738        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
739
740        iid = None
741        index_vals = None
742
743        # get the values of the keys which are used for distinct access
744        # of grib messages via product
745        index_keys = ["date", "time", "step"]
746        iid, index_vals = self._mk_index_values(c.inputdir,
747                                                inputfiles,
748                                                index_keys)
749        # index_vals looks like e.g.:
750        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
751        # index_vals[1]: ('0', '1200', '1800', '600') ; time
752        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
753
754        valsdict = {}
755        svalsdict = {}
756#        stepsdict = {}
757        for p in pars:
758            valsdict[str(p)] = []
759            svalsdict[str(p)] = []
760#            stepsdict[str(p)] = []
761
762        print('maxstep: ', c.maxstep)
763
764        # "product" genereates each possible combination between the
765        # values of the index keys
766        for prod in product(*index_vals):
767            # e.g. prod = ('20170505', '0', '12')
768            #             (  date    ,time, step)
769
770            print('current product: ', prod)
771
772            for i in range(len(index_keys)):
773                grib_index_select(iid, index_keys[i], prod[i])
774
775            # get first id from current product
776            gid = grib_new_from_index(iid)
777
778            # if there is no data for this specific time combination / product
779            # skip the rest of the for loop and start with next timestep/product
780            if not gid:
781                continue
782
783            # create correct timestamp from the three time informations
784            cdate = str(grib_get(gid, 'date'))
785            ctime = '{:0>2}'.format(grib_get(gid, 'time')/100)
786            cstep = '{:0>3}'.format(grib_get(gid, 'step'))
787            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
788            t_dt = t_date + timedelta(hours=int(cstep))
789            t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime))
790            t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime))
791            t_enddate = None
792
793            if c.maxstep > 12:
794                fnout = os.path.join(c.inputdir, 'flux' +
795                                     t_date.strftime('%Y%m%d.%H') +
796                                     '.{:0>3}'.format(step-2*int(c.dtime)))
797                gnout = os.path.join(c.inputdir, 'flux' +
798                                     t_date.strftime('%Y%m%d.%H') +
799                                     '.{:0>3}'.format(step-int(c.dtime)))
800                hnout = os.path.join(c.inputdir, 'flux' +
801                                     t_date.strftime('%Y%m%d.%H') +
802                                     '.{:0>3}'.format(step))
803            else:
804                fnout = os.path.join(c.inputdir, 'flux' +
805                                     t_m2dt.strftime('%Y%m%d%H'))
806                gnout = os.path.join(c.inputdir, 'flux' +
807                                     t_m1dt.strftime('%Y%m%d%H'))
808                hnout = os.path.join(c.inputdir, 'flux' +
809                                     t_dt.strftime('%Y%m%d%H'))
810
811            print("outputfile = " + fnout)
812
813            # read message for message and store relevant data fields
814            # data keywords are stored in pars
815            while 1:
816                if not gid:
817                    break
818                cparamId = str(grib_get(gid, 'paramId'))
819                step = grib_get(gid, 'step')
820                time = grib_get(gid, 'time')
821                ni = grib_get(gid, 'Ni')
822                nj = grib_get(gid, 'Nj')
823                if cparamId in valsdict.keys():
824                    values = grib_get_values(gid)
825                    vdp = valsdict[cparamId]
826                    svdp = svalsdict[cparamId]
827 #                   sd = stepsdict[cparamId]
828
829                    if cparamId == '142' or cparamId == '143':
830                        fak = 1. / 1000.
831                    else:
832                        fak = 3600.
833
834                    values = (np.reshape(values, (nj, ni))).flatten() / fak
835                    vdp.append(values[:])  # save the accumulated values
836                    if step <= int(c.dtime):
837                        svdp.append(values[:] / int(c.dtime))
838                    else:  # deaccumulate values
839                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
840
841                    print(cparamId, time, step, len(values),
842                          values[0], np.std(values))
843
844                    # len(svdp) correspond to the time
845                    if len(svdp) >= 3:
846                        if len(svdp) > 3:
847                            if cparamId == '142' or cparamId == '143':
848                                values = disaggregation.darain(svdp)
849                            else:
850                                values = disaggregation.dapoly(svdp)
851
852                            if not (step == c.maxstep and c.maxstep > 12 \
853                                    or t_dt == t_enddate):
854                                vdp.pop(0)
855                                svdp.pop(0)
856                        else:
857                            if c.maxstep > 12:
858                                values = svdp[1]
859                            else:
860                                values = svdp[0]
861
862                        grib_set_values(gid, values)
863
864                        if c.maxstep > 12:
865                            grib_set(gid, 'step', max(0, step-2*int(c.dtime)))
866                        else:
867                            grib_set(gid, 'step', 0)
868                            grib_set(gid, 'time', t_m2dt.hour*100)
869                            grib_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
870
871                        with open(fnout, 'w') as f_handle:
872                            grib_write(gid, f_handle)
873
874                        if c.basetime:
875                            t_enddate = datetime.strptime(c.end_date +
876                                                          c.basetime,
877                                                          '%Y%m%d%H')
878                        else:
879                            t_enddate = t_date + timedelta(2*int(c.dtime))
880
881                        # squeeze out information of last two steps contained
882                        # in svdp
883                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
884                        # or t_dt+timedelta(hours = int(c.dtime))
885                        # >= t_enddate:
886                        # Note that svdp[0] has not been popped in this case
887
888                        if step == c.maxstep and c.maxstep > 12 or \
889                           t_dt == t_enddate:
890
891                            values = svdp[3]
892                            grib_set_values(gid, values)
893                            grib_set(gid, 'step', 0)
894                            truedatetime = t_m2dt + timedelta(hours=
895                                                             2*int(c.dtime))
896                            grib_set(gid, 'time', truedatetime.hour * 100)
897                            grib_set(gid, 'date', truedatetime.year * 10000 +
898                                     truedatetime.month * 100 +
899                                     truedatetime.day)
900                            with open(hnout, 'w') as h_handle:
901                                grib_write(gid, h_handle)
902
903                            #values = (svdp[1]+svdp[2])/2.
904                            if cparamId == '142' or cparamId == '143':
905                                values = disaggregation.darain(list(reversed(svdp)))
906                            else:
907                                values = disaggregation.dapoly(list(reversed(svdp)))
908
909                            grib_set(gid, 'step', 0)
910                            truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
911                            grib_set(gid, 'time', truedatetime.hour * 100)
912                            grib_set(gid, 'date', truedatetime.year * 10000 +
913                                     truedatetime.month * 100 +
914                                     truedatetime.day)
915                            grib_set_values(gid, values)
916                            with open(gnout, 'w') as g_handle:
917                                grib_write(gid, g_handle)
918
919                grib_release(gid)
920
921                gid = grib_new_from_index(iid)
922
923        grib_index_release(iid)
924
925        return
926
927
928    def create(self, inputfiles, c):
929        '''
930        @Description:
931            This method is based on the ECMWF example index.py
932            https://software.ecmwf.int/wiki/display/GRIB/index.py
933
934            An index file will be created which depends on the combination
935            of "date", "time" and "stepRange" values. This is used to iterate
936            over all messages in each grib file which were passed through the
937            parameter "inputfiles" to seperate specific parameters into fort.*
938            files. Afterwards the FORTRAN program is called to convert
939            the data fields all to the same grid and put them in one file
940            per unique time step (combination of "date", "time" and
941            "stepRange").
942
943        @Input:
944            self: instance of EcFlexpart
945                The current object of the class.
946
947            inputfiles: instance of UioFiles
948                Contains a list of files.
949
950            c: instance of class ControlFile
951                Contains all the parameters of CONTROL file and
952                command line.
953                For more information about format and content of the parameter
954                see documentation.
955
956        @Return:
957            <nothing>
958        '''
959
960        if c.wrf:
961            table128 = init128(_config.PATH_GRIBTABLE)
962            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
963                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
964                                  table128)
965
966        # these numbers are indices for the temporary files "fort.xx"
967        # which are used to seperate the grib fields to,
968        # for the Fortran program input
969        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
970        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
971        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
972                 '17':None, '18':None, '19':None, '21':None, '22':None}
973
974        iid = None
975        index_vals = None
976
977        # get the values of the keys which are used for distinct access
978        # of grib messages via product
979        index_keys = ["date", "time", "step"]
980        iid, index_vals = self._mk_index_values(c.inputdir,
981                                                inputfiles,
982                                                index_keys)
983        # index_vals looks like e.g.:
984        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
985        # index_vals[1]: ('0', '1200', '1800', '600') ; time
986        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
987
988        # "product" genereates each possible combination between the
989        # values of the index keys
990        for prod in product(*index_vals):
991            # e.g. prod = ('20170505', '0', '12')
992            #             (  date    ,time, step)
993
994            print('current product: ', prod)
995
996            for i in range(len(index_keys)):
997                grib_index_select(iid, index_keys[i], prod[i])
998
999            # get first id from current product
1000            gid = grib_new_from_index(iid)
1001
1002            # if there is no data for this specific time combination / product
1003            # skip the rest of the for loop and start with next timestep/product
1004            if not gid:
1005                continue
1006
1007            # remove old fort.* files and open new ones
1008            # they are just valid for a single product
1009            for k, f in fdict.iteritems():
1010                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1011                silent_remove(fortfile)
1012                fdict[k] = open(fortfile, 'w')
1013
1014            # create correct timestamp from the three time informations
1015            cdate = str(grib_get(gid, 'date'))
1016            ctime = '{:0>2}'.format(grib_get(gid, 'time')/100)
1017            cstep = '{:0>3}'.format(grib_get(gid, 'step'))
1018            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1019            timestamp += timedelta(hours=int(cstep))
1020            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1021
1022            # if the timestamp is out of basetime start/end date period,
1023            # skip this specific product
1024            if c.basetime:
1025                start_time = datetime.strptime(c.end_date + c.basetime,
1026                                                '%Y%m%d%H') - time_delta
1027                end_time = datetime.strptime(c.end_date + c.basetime,
1028                                             '%Y%m%d%H')
1029                if timestamp < start_time or timestamp > end_time:
1030                    continue
1031
1032            if c.wrf:
1033                if 'olddate' not in locals() or cdate != olddate:
1034                    fwrf = open(os.path.join(c.outputdir,
1035                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1036                    olddate = cdate[:]
1037
1038            # savedfields remembers which fields were already used.
1039            savedfields = []
1040            # sum of cloud liquid and ice water content
1041            scwc = None
1042            while 1:
1043                if not gid:
1044                    break
1045                paramId = grib_get(gid, 'paramId')
1046                gridtype = grib_get(gid, 'gridType')
1047                levtype = grib_get(gid, 'typeOfLevel')
1048                if paramId == 77: # ETADOT
1049                    grib_write(gid, fdict['21'])
1050                elif paramId == 130: # T
1051                    grib_write(gid, fdict['11'])
1052                elif paramId == 131 or paramId == 132: # U, V wind component
1053                    grib_write(gid, fdict['10'])
1054                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1055                    grib_write(gid, fdict['17'])
1056                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1057                    grib_write(gid, fdict['18'])
1058                elif paramId == 135: # W
1059                    grib_write(gid, fdict['19'])
1060                elif paramId == 152: # LNSP
1061                    grib_write(gid, fdict['12'])
1062                elif paramId == 155 and gridtype == 'sh': # D
1063                    grib_write(gid, fdict['13'])
1064                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1065                    # sum cloud liquid water and ice
1066                    if not scwc:
1067                        scwc = grib_get_values(gid)
1068                    else:
1069                        scwc += grib_get_values(gid)
1070                        grib_set_values(gid, scwc)
1071                        grib_set(gid, 'paramId', 201031)
1072                        grib_write(gid, fdict['22'])
1073                elif c.wrf and paramId in [129, 138, 155] and \
1074                      levtype == 'hybrid': # Z, VO, D
1075                    # do not do anything right now
1076                    # these are specific parameter for WRF
1077                    pass
1078                else:
1079                    if paramId not in savedfields:
1080                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1081                        # and all ADDPAR parameter
1082                        grib_write(gid, fdict['16'])
1083                        savedfields.append(paramId)
1084                    else:
1085                        print('duplicate ' + str(paramId) + ' not written')
1086
1087                try:
1088                    if c.wrf:
1089                        # model layer
1090                        if levtype == 'hybrid' and \
1091                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1092                            grib_write(gid, fwrf)
1093                        # sfc layer
1094                        elif paramId in wrfpars:
1095                            grib_write(gid, fwrf)
1096                except AttributeError:
1097                    pass
1098
1099                grib_release(gid)
1100                gid = grib_new_from_index(iid)
1101
1102            for f in fdict.values():
1103                f.close()
1104
1105            # call for Fortran program to convert e.g. reduced_gg grids to
1106            # regular_ll and calculate detadot/dp
1107            pwd = os.getcwd()
1108            os.chdir(c.inputdir)
1109            if os.stat('fort.21').st_size == 0 and c.eta:
1110                print('Parameter 77 (etadot) is missing, most likely it is \
1111                       not available for this type or date/time\n')
1112                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1113                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1114                         is set to 1 in CONTROL file')
1115
1116            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1117            p = subprocess.check_call([os.path.join(
1118                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1119            os.chdir(pwd)
1120
1121            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1122            if c.maxstep > 12:
1123                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1124            else:
1125                suffix = cdate_hour[2:10]
1126            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1127            print("outputfile = " + fnout)
1128            # collect for final processing
1129            self.outputfilelist.append(os.path.basename(fnout))
1130
1131            # create outputfile and copy all data from intermediate files
1132            # to the outputfile (final GRIB input files for FLEXPART)
1133            orolsm = os.path.basename(glob.glob(c.inputdir +
1134                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1135            fluxfile = 'flux' + cdate[0:2] + suffix
1136            if not c.cwc:
1137                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1138            else:
1139                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1140
1141            with open(fnout, 'wb') as fout:
1142                for f in flist:
1143                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1144                                       fout)
1145
1146            if c.omega:
1147                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1148                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1149                                            'rb'), fout)
1150
1151        if c.wrf:
1152            fwrf.close()
1153
1154        grib_index_release(iid)
1155
1156        return
1157
1158
1159    def process_output(self, c):
1160        '''
1161        @Description:
1162            The grib files are postprocessed depending on the selection in
1163            CONTROL file. The resulting files are moved to the output
1164            directory if its not equal to the input directory.
1165            The following modifications might be done if
1166            properly switched in CONTROL file:
1167            GRIB2 - Conversion to GRIB2
1168            ECTRANS - Transfer of files to gateway server
1169            ECSTORAGE - Storage at ECMWF server
1170
1171        @Input:
1172            self: instance of EcFlexpart
1173                The current object of the class.
1174
1175            c: instance of class ControlFile
1176                Contains all the parameters of CONTROL file and
1177                command line.
1178                For more information about format and content of the parameter
1179                see documentation.
1180
1181        @Return:
1182            <nothing>
1183
1184        '''
1185
1186        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1187
1188        if not c.ecapi:
1189            print('ecstorage: {}\n ecfsdir: {}\n'.
1190                  format(c.ecstorage, c.ecfsdir))
1191            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1192                  .format(c.ectrans, c.gateway, c.destination))
1193
1194        print('Output filelist: ')
1195        print(self.outputfilelist)
1196
1197        if c.format.lower() == 'grib2':
1198            for ofile in self.outputfilelist:
1199                p = subprocess.check_call(['grib_set', '-s', 'edition=2, \
1200                                           productDefinitionTemplateNumber=8',
1201                                           ofile, ofile + '_2'])
1202                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1203
1204        if c.ectrans and not c.ecapi:
1205            for ofile in self.outputfilelist:
1206                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1207                                           c.gateway, '-remote', c.destination,
1208                                           '-source', ofile])
1209
1210        if c.ecstorage and not c.ecapi:
1211            for ofile in self.outputfilelist:
1212                p = subprocess.check_call(['ecp', '-o', ofile,
1213                                           os.path.expandvars(c.ecfsdir)])
1214
1215        if c.outputdir != c.inputdir:
1216            for ofile in self.outputfilelist:
1217                p = subprocess.check_call(['mv',
1218                                           os.path.join(c.inputdir, ofile),
1219                                           c.outputdir])
1220
1221        return
1222
1223
1224    def prepare_fp_files(self, c):
1225        '''
1226        @Description:
1227            Conversion of GRIB files to FLEXPART binary format.
1228
1229        @Input:
1230            c: instance of class ControlFile
1231                Contains all the parameters of CONTROL file and
1232                command line.
1233                For more information about format and content of the parameter
1234                see documentation.
1235
1236
1237        @Return:
1238            <nothing>
1239        '''
1240        # generate AVAILABLE file
1241        # Example of AVAILABLE file data:
1242        # 20131107 000000      EN13110700              ON DISC
1243        clist = []
1244        for ofile in self.outputfilelist:
1245            fname = ofile.split('/')
1246            if '.' in fname[-1]:
1247                l = fname[-1].split('.')
1248                timestamp = datetime.strptime(l[0][-6:] + l[1],
1249                                              '%y%m%d%H')
1250                timestamp += timedelta(hours=int(l[2]))
1251                cdate = datetime.strftime(timestamp, '%Y%m%d')
1252                chms = datetime.strftime(timestamp, '%H%M%S')
1253            else:
1254                cdate = '20' + fname[-1][-8:-2]
1255                chms = fname[-1][-2:] + '0000'
1256            clist.append(cdate + ' ' + chms + ' '*6 +
1257                         fname[-1] + ' '*14 + 'ON DISC')
1258        clist.sort()
1259        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1260            f.write('\n'.join(clist) + '\n')
1261
1262        # generate pathnames file
1263        pwd = os.path.abspath(c.outputdir)
1264        with open(pwd + '/pathnames', 'w') as f:
1265            f.write(pwd + '/Options/\n')
1266            f.write(pwd + '/\n')
1267            f.write(pwd + '/\n')
1268            f.write(pwd + '/AVAILABLE\n')
1269            f.write(' = == = == = == = == = == ==  = \n')
1270
1271        # create Options dir if necessary
1272        if not os.path.exists(pwd + '/Options'):
1273            make_dir(pwd+'/Options')
1274
1275        # read template COMMAND file
1276        with open(os.path.expandvars(os.path.expanduser(
1277            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1278            lflist = f.read().split('\n')
1279
1280        # find index of list where to put in the
1281        # date and time information
1282        # usually after the LDIRECT parameter
1283        i = 0
1284        for l in lflist:
1285            if 'LDIRECT' in l.upper():
1286                break
1287            i += 1
1288
1289        # insert the date and time information of run start and end
1290        # into the list of lines of COMMAND file
1291        lflist = lflist[:i+1] + \
1292                 [clist[0][:16], clist[-1][:16]] + \
1293                 lflist[i+3:]
1294
1295        # write the new COMMAND file
1296        with open(pwd + '/Options/COMMAND', 'w') as g:
1297            g.write('\n'.join(lflist) + '\n')
1298
1299        # change to outputdir and start the grib2flexpart run
1300        # afterwards switch back to the working dir
1301        os.chdir(c.outputdir)
1302        p = subprocess.check_call([
1303            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1304            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1305        os.chdir(pwd)
1306
1307        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG