source: flex_extract.git/source/python/classes/EcFlexpart.py @ 2f5ca80

ctbtodev
Last change on this file since 2f5ca80 was 2e62398, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

moved from grib_api to eccodes

  • Property mode set to 100644
File size: 52.1 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83
84from eccodes import (codes_index_select, codes_new_from_index, codes_get,
85                     codes_get_values, codes_set_values, codes_set,
86                     codes_write, codes_release, codes_new_from_index,
87                     codes_index_release, codes_index_get)
88
89# software specific classes and modules from flex_extract
90sys.path.append('../')
91import _config
92from GribTools import GribTools
93from mods.tools import (init128, to_param_id, silent_remove, product,
94                        my_error, make_dir)
95from MarsRetrieval import MarsRetrieval
96import mods.disaggregation as disaggregation
97
98# ------------------------------------------------------------------------------
99# CLASS
100# ------------------------------------------------------------------------------
101class EcFlexpart(object):
102    '''
103    Class to retrieve FLEXPART specific ECMWF data.
104    '''
105    # --------------------------------------------------------------------------
106    # CLASS FUNCTIONS
107    # --------------------------------------------------------------------------
108    def __init__(self, c, fluxes=False):
109        '''Creates an object/instance of EcFlexpart with the associated
110        settings of its attributes for the retrieval.
111
112        Parameters:
113        -----------
114        c : :obj:`ControlFile`
115            Contains all the parameters of CONTROL file and
116            command line.
117
118        fluxes : :obj:`boolean`, optional
119            Decides if the flux parameter settings are stored or
120            the rest of the parameter list.
121            Default value is False.
122
123        Return
124        ------
125
126        '''
127        # set a counter for the number of mars requests generated
128        self.mreq_count = 0
129
130        # different mars types for retrieving data for flexpart
131        self.types = dict()
132
133        # Pure forecast mode
134        if c.maxstep > len(c.type) and 'AN' not in c.type:
135            c.type = [c.type[0]]
136            c.step = ['{:0>3}'.format(int(c.step[0]))]
137            c.time = [c.time[0]]
138            for i in range(1, c.maxstep + 1):
139                c.type.append(c.type[0])
140                c.step.append('{:0>3}'.format(i))
141                c.time.append(c.time[0])
142
143        self.inputdir = c.inputdir
144        self.dataset = c.dataset
145        self.basetime = c.basetime
146        self.dtime = c.dtime
147        i = 0
148        if fluxes and c.maxstep <= 24:
149            # no forecast beyond one day is needed!
150            # Thus, prepare flux data manually as usual
151            # with only forecast fields with start times at 00/12
152            # (but without 00/12 fields since these are
153            # the initialisation times of the flux fields
154            # and therefore are zero all the time)
155            self.types[str(c.acctype)] = {'times': str(c.acctime),
156                                          'steps': '{}/to/{}/by/{}'.format(
157                                              c.dtime, c.accmaxstep, c.dtime)}
158        else:
159            for ty, st, ti in zip(c.type, c.step, c.time):
160                btlist = range(24)
161                if c.basetime == '12':
162                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
163                if c.basetime == '00':
164                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
165
166                if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
167                    (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
168                     (int(c.step[i]) % int(c.dtime) == 0)) ) and \
169                    (int(c.time[i]) in btlist or c.maxstep > 24):
170
171                    if ty not in self.types.keys():
172                        self.types[ty] = {'times': '', 'steps': ''}
173
174                    if ti not in self.types[ty]['times']:
175                        if self.types[ty]['times']:
176                            self.types[ty]['times'] += '/'
177                        self.types[ty]['times'] += ti
178
179                    if st not in self.types[ty]['steps']:
180                        if self.types[ty]['steps']:
181                            self.types[ty]['steps'] += '/'
182                        self.types[ty]['steps'] += st
183                i += 1
184
185        self.marsclass = c.marsclass
186        self.stream = c.stream
187        self.number = c.number
188        self.resol = c.resol
189        self.accuracy = c.accuracy
190        self.level = c.level
191        self.expver = c.expver
192        self.levelist = c.levelist
193        # for gaussian grid retrieval
194        self.glevelist = '1/to/' + c.level
195        self.gaussian = c.gaussian
196
197        if 'N' in c.grid:  # Gaussian output grid
198            self.grid = c.grid
199            self.area = 'G'
200        else:
201            self.grid = '{}/{}'.format(int(c.grid) / 1000., int(c.grid) / 1000.)
202            self.area = '{}/{}/{}/{}'.format(int(c.upper) / 1000.,
203                                             int(c.left) / 1000.,
204                                             int(c.lower) / 1000.,
205                                             int(c.right) / 1000.)
206
207        self.outputfilelist = []
208
209
210        # Now comes the nasty part that deals with the different
211        # scenarios we have:
212        # 1) Calculation of etadot on
213        #    a) Gaussian grid
214        #    b) Output grid
215        #    c) Output grid using parameter 77 retrieved from MARS
216        # 3) Calculation/Retrieval of omega
217        # 4) Download also data for WRF
218
219        # Different grids need different retrievals
220        # SH = Spherical Harmonics, GG = Gaussian Grid,
221        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
222        self.params = {'SH__ML': '', 'SH__SL': '',
223                       'GG__ML': '', 'GG__SL': '',
224                       'OG__ML': '', 'OG__SL': '',
225                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
226        # the self.params dictionary stores a list of
227        # [param, levtype, levelist, grid] per key
228
229        if fluxes is False:
230            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
231            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
232            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
233                                     'SFC', '1', self.grid]
234            if c.addpar:
235                if c.addpar[0] == '/':
236                    c.addpar = c.addpar[1:]
237                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
238
239            if c.marsclass.upper() == 'EA' or c.marsclass.upper() == 'EP':
240                self.params['OG_OROLSM__SL'] = ["160/27/28/244",
241                                                'SFC', '1', self.grid]
242            else:
243                self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
244                                                'SFC', '1', self.grid]
245
246            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
247
248            #if c.gauss == '0' and c.eta == '1':
249            if not c.gauss and c.eta:
250                # the simplest case
251                self.params['OG__ML'][0] += '/U/V/77'
252            #elif c.gauss == '0' and c.eta == '0':
253            elif not c.gauss and not c.eta:
254            # this is not recommended (inaccurate)
255                self.params['OG__ML'][0] += '/U/V'
256            #elif c.gauss == '1' and c.eta == '0':
257            elif c.gauss and not c.eta:
258                # this is needed for data before 2008, or for reanalysis data
259                self.params['GG__SL'] = ['Q', 'ML', '1', \
260                                         '{}'.format((int(self.resol) + 1) / 2)]
261                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
262            else:
263                print('Warning: This is a very costly parameter combination, \
264                      use only for debugging!')
265                self.params['GG__SL'] = ['Q', 'ML', '1', \
266                                         '{}'.format((int(self.resol) + 1) / 2)]
267                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
268                                         '{}'.format((int(self.resol) + 1) / 2)]
269
270            if c.omega:
271                self.params['OG__ML'][0] += '/W'
272
273            # add cloud water content if necessary
274            if c.cwc:
275                self.params['OG__ML'][0] += '/CLWC/CIWC'
276
277            # add vorticity and geopotential height for WRF if necessary
278            if c.wrf:
279                self.params['OG__ML'][0] += '/Z/VO'
280                if '/D' not in self.params['OG__ML'][0]:
281                    self.params['OG__ML'][0] += '/D'
282                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
283                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
284                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
285                           139/170/183/236/39/40/41/42'
286                lwrt_sfc = wrf_sfc.split('/')
287                for par in lwrt_sfc:
288                    if par not in self.params['OG__SL'][0]:
289                        self.params['OG__SL'][0] += '/' + par
290
291        else:
292            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
293                                        'SFC', '1', self.grid]
294
295        return
296
297
298    def _mk_targetname(self, ftype, param, date):
299        '''Creates the filename for the requested grib data to be stored in.
300        This name is passed as the "target" parameter in the request.
301
302        Parameters
303        ----------
304        ftype : :obj:`string`
305            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
306
307        param : :obj:`string`
308            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
309            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
310
311        date : :obj:`string`
312            The date period of the grib data to be stored in this file.
313
314        Return
315        ------
316        targetname : :obj:`string`
317            The target filename for the grib data.
318        '''
319        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
320                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
321
322        return targetname
323
324
325    def _start_retrievement(self, request, par_dict):
326        '''Creates the Mars Retrieval and prints or submits the request
327        depending on the status of the request variable.
328
329        Parameters
330        ----------
331        request : :obj:`integer`
332            Selects the mode of retrieval.
333            0: Retrieves the data from ECMWF.
334            1: Prints the mars requests to an output file.
335            2: Retrieves the data and prints the mars request.
336
337        par_dict : :obj:`dictionary`
338            Contains all parameter which have to be set for creating the
339            Mars Retrievals. The parameter are:
340            marsclass, dataset, stream, type, levtype, levelist, resol,
341            gaussian, accuracy, grid, target, area, date, time, number,
342            step, expver, param
343
344        Return
345        ------
346
347        '''
348        # increase number of mars requests
349        self.mreq_count += 1
350
351        MR = MarsRetrieval(self.server,
352                           self.public,
353                           marsclass=par_dict['marsclass'],
354                           dataset=par_dict['dataset'],
355                           stream=par_dict['stream'],
356                           type=par_dict['type'],
357                           levtype=par_dict['levtype'],
358                           levelist=par_dict['levelist'],
359                           resol=par_dict['resol'],
360                           gaussian=par_dict['gaussian'],
361                           accuracy=par_dict['accuracy'],
362                           grid=par_dict['grid'],
363                           target=par_dict['target'],
364                           area=par_dict['area'],
365                           date=par_dict['date'],
366                           time=par_dict['time'],
367                           number=par_dict['number'],
368                           step=par_dict['step'],
369                           expver=par_dict['expver'],
370                           param=par_dict['param'])
371
372        if request == 0:
373            MR.display_info()
374            MR.data_retrieve()
375        elif request == 1:
376            MR.print_infodata_csv(self.inputdir, self.mreq_count)
377        elif request == 2:
378            MR.print_infodata_csv(self.inputdir, self.mreq_count)
379            MR.display_info()
380            MR.data_retrieve()
381        else:
382            print('Failure')
383
384        return
385
386
387    def _mk_index_values(self, inputdir, inputfiles, keys):
388        '''Creates an index file for a set of grib parameter keys.
389        The values from the index keys are returned in a list.
390
391        Parameters
392        ----------
393        keys : :obj:`dictionary`
394            List of parameter names which serves as index.
395
396        inputfiles : :obj:`UioFiles`
397            Contains a list of files.
398
399        Return
400        ------
401        iid : :obj:`codes_index`
402            This is a grib specific index structure to access
403            messages in a file.
404
405        index_vals : :obj:`list`
406            Contains the values from the keys used for a distinct selection
407            of grib messages in processing  the grib files.
408            Content looks like e.g.:
409            index_vals[0]: ('20171106', '20171107', '20171108') ; date
410            index_vals[1]: ('0', '1200', '1800', '600') ; time
411            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
412        '''
413        iid = None
414        index_keys = keys
415
416        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
417        silent_remove(indexfile)
418        grib = GribTools(inputfiles.files)
419        # creates new index file
420        iid = grib.index(index_keys=index_keys, index_file=indexfile)
421
422        # read the values of index keys
423        index_vals = []
424        for key in index_keys:
425            key_vals = codes_index_get(iid, key)
426            print(key_vals)
427            # have to sort the steps for disaggregation,
428            # therefore convert to int first
429            if key == 'step':
430                key_vals = [int(k) for k in key_vals]
431                key_vals.sort()
432                key_vals = [str(k) for k in key_vals]
433            index_vals.append(key_vals)
434            # index_vals looks for example like:
435            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
436            # index_vals[1]: ('0', '1200') ; time
437            # index_vals[2]: (3', '6', '9', '12') ; stepRange
438
439        return iid, index_vals
440
441
442    def retrieve(self, server, dates, public, request, inputdir='.'):
443        '''Finalizing the retrieval information by setting final details
444        depending on grid type.
445        Prepares MARS retrievals per grid type and submits them.
446
447        Parameters
448        ----------
449        server : :obj:`ECMWFService` or :obj:`ECMWFDataServer`
450            The connection to the ECMWF server. This is different
451            for member state users which have full access and non
452            member state users which have only access to the public
453            data sets. The decision is made from command line argument
454            "public"; for public access its True (ECMWFDataServer)
455            for member state users its False (ECMWFService)
456
457        dates : :obj:`string`
458            Contains start and end date of the retrieval in the format
459            "YYYYMMDD/to/YYYYMMDD"
460
461        request : :obj:`integer`
462            Selects the mode of retrieval.
463            0: Retrieves the data from ECMWF.
464            1: Prints the mars requests to an output file.
465            2: Retrieves the data and prints the mars request.
466
467        inputdir : :obj:`string`, optional
468            Path to the directory where the retrieved data is about
469            to be stored. The default is the current directory ('.').
470
471        Return
472        ------
473
474        '''
475        self.dates = dates
476        self.server = server
477        self.public = public
478        self.inputdir = inputdir
479        oro = False
480
481        # define times with datetime module
482        t12h = timedelta(hours=12)
483        t24h = timedelta(hours=24)
484
485        # dictionary which contains all parameter for the mars request,
486        # entries with a "None" will change in different requests and will
487        # therefore be set in each request seperately
488        retr_param_dict = {'marsclass':self.marsclass,
489                           'dataset':self.dataset,
490                           'stream':None,
491                           'type':None,
492                           'levtype':None,
493                           'levelist':None,
494                           'resol':self.resol,
495                           'gaussian':None,
496                           'accuracy':self.accuracy,
497                           'grid':None,
498                           'target':None,
499                           'area':None,
500                           'date':None,
501                           'time':None,
502                           'number':self.number,
503                           'step':None,
504                           'expver':self.expver,
505                           'param':None}
506
507        for ftype in self.types:
508            # fk contains field types such as
509            #     [AN, FC, PF, CV]
510            # fv contains all of the items of the belonging key
511            #     [times, steps]
512            for pk, pv in self.params.iteritems():
513                # pk contains one of these keys of params
514                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
515                #      OG_OROLSM_SL, OG_acc_SL]
516                # pv contains all of the items of the belonging key
517                #     [param, levtype, levelist, grid]
518                if isinstance(pv, str):
519                    continue
520                retr_param_dict['type'] = ftype
521                retr_param_dict['time'] = self.types[ftype]['times']
522                retr_param_dict['step'] = self.types[ftype]['steps']
523                retr_param_dict['date'] = self.dates
524                retr_param_dict['stream'] = self.stream
525                retr_param_dict['target'] = \
526                    self._mk_targetname(ftype,
527                                        pk,
528                                        retr_param_dict['date'].split('/')[0])
529                retr_param_dict['param'] = pv[0]
530                retr_param_dict['levtype'] = pv[1]
531                retr_param_dict['levelist'] = pv[2]
532                retr_param_dict['grid'] = pv[3]
533                retr_param_dict['area'] = self.area
534                retr_param_dict['gaussian'] = self.gaussian
535
536                if pk == 'OG__SL':
537                    pass
538                if pk == 'OG_OROLSM__SL' and not oro:
539                    oro = True
540                    # in CERA20C (class EP) there is no stream "OPER"!
541                    if self.marsclass.upper() != 'EP':
542                        retr_param_dict['stream'] = 'OPER'
543                    retr_param_dict['type'] = 'AN'
544                    retr_param_dict['time'] = '00'
545                    retr_param_dict['step'] = '000'
546                    retr_param_dict['date'] = self.dates.split('/')[0]
547                    retr_param_dict['target'] = self._mk_targetname('',
548                                            pk, retr_param_dict['date'])
549                elif pk == 'OG_OROLSM__SL' and oro:
550                    continue
551                if pk == 'GG__SL' and pv[0] == 'Q':
552                    retr_param_dict['area'] = ""
553                    retr_param_dict['gaussian'] = 'reduced'
554
555    # ------  on demand path  --------------------------------------------------
556                if not self.basetime:
557                    # ******* start retrievement
558                    self._start_retrievement(request, retr_param_dict)
559    # ------  operational path  ------------------------------------------------
560                else:
561                    # check if mars job requests fields beyond basetime.
562                    # if yes eliminate those fields since they may not
563                    # be accessible with user's credentials
564
565                    enddate = retr_param_dict['date'].split('/')[-1]
566                    elimit = datetime.strptime(enddate + self.basetime,
567                                               '%Y%m%d%H')
568
569                    if self.basetime == '12':
570                        # --------------  flux data ----------------------------
571                        if 'acc' in pk:
572
573                        # Strategy:
574                        # if maxtime-elimit >= 24h reduce date by 1,
575                        # if 12h <= maxtime-elimit<12h reduce time for last date
576                        # if maxtime-elimit<12h reduce step for last time
577                        # A split of the MARS job into 2 is likely necessary.
578
579
580                            startdate = retr_param_dict['date'].split('/')[0]
581                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
582                            retr_param_dict['date'] = '/'.join([startdate,
583                                                                'to',
584                                                                enddate])
585
586                            # ******* start retrievement
587                            self._start_retrievement(request, retr_param_dict)
588
589                            retr_param_dict['date'] = \
590                                datetime.strftime(elimit - t12h, '%Y%m%d')
591                            retr_param_dict['time'] = '00'
592                            retr_param_dict['target'] = \
593                                self._mk_targetname(ftype, pk,
594                                                    retr_param_dict['date'])
595
596                            # ******* start retrievement
597                            self._start_retrievement(request, retr_param_dict)
598
599                        # --------------  non flux data ------------------------
600                        else:
601                            # ******* start retrievement
602                            self._start_retrievement(request, retr_param_dict)
603
604                    else: # basetime = 0
605                        retr_param_dict['date'] = \
606                            datetime.strftime(elimit - t24h, '%Y%m%d')
607
608                        timesave = ''.join(retr_param_dict['time'])
609
610                        if '/' in retr_param_dict['time']:
611                            times = retr_param_dict['time'].split('/')
612                            steps = retr_param_dict['step'].split('/')
613                            while (pk != 'OG_OROLSM__SL' and
614                                   'acc' not in pk and
615                                   (int(times[0]) + int(steps[0])) <= 12):
616                                times = times[1:]
617
618                            if len(times) > 1:
619                                retr_param_dict['time'] = '/'.join(times)
620                            else:
621                                retr_param_dict['time'] = times[0]
622
623                        # ******* start retrievement
624                        self._start_retrievement(request, retr_param_dict)
625
626                        if (pk != 'OG_OROLSM__SL' and
627                            int(retr_param_dict['step'].split('/')[0]) == 0 and
628                            int(timesave.split('/')[0]) == 0):
629
630                            retr_param_dict['date'] = \
631                                datetime.strftime(elimit, '%Y%m%d')
632                            retr_param_dict['time'] = '00'
633                            retr_param_dict['step'] = '000'
634                            retr_param_dict['target'] = \
635                                self._mk_targetname(ftype, pk,
636                                                    retr_param_dict['date'])
637
638                            # ******* start retrievement
639                            self._start_retrievement(request, retr_param_dict)
640
641        if request == 0 or request == 2:
642            print('MARS retrieve done ... ')
643        elif request == 1:
644            print('MARS request printed ...')
645
646        return
647
648
649    def write_namelist(self, c):
650        '''Creates a namelist file in the temporary directory and writes
651        the following values to it: maxl, maxb, mlevel,
652        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
653        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
654
655        Parameters
656        ----------
657        c : :obj:`ControlFile`
658            Contains all the parameters of CONTROL file and
659            command line.
660
661        filename : :obj:`string`
662                Name of the namelist file.
663
664        Return
665        ------
666
667        '''
668
669        from genshi.template.text import NewTextTemplate
670        from genshi.template import  TemplateLoader
671
672        loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
673        namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
674                                       cls=NewTextTemplate)
675
676        self.inputdir = c.inputdir
677        area = np.asarray(self.area.split('/')).astype(float)
678        grid = np.asarray(self.grid.split('/')).astype(float)
679
680        if area[1] > area[3]:
681            area[1] -= 360
682        maxl = int((area[3] - area[1]) / grid[1]) + 1
683        maxb = int((area[0] - area[2]) / grid[0]) + 1
684
685        stream = namelist_template.generate(
686            maxl = str(maxl),
687            maxb = str(maxb),
688            mlevel = str(self.level),
689            mlevelist = str(self.levelist),
690            mnauf = str(self.resol),
691            metapar = '77',
692            rlo0 = str(area[1]),
693            rlo1 = str(area[3]),
694            rla0 = str(area[2]),
695            rla1 = str(area[0]),
696            momega = str(c.omega),
697            momegadiff = str(c.omegadiff),
698            mgauss = str(c.gauss),
699            msmooth = str(c.smooth),
700            meta = str(c.eta),
701            metadiff = str(c.etadiff),
702            mdpdeta = str(c.dpdeta)
703        )
704
705        namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
706
707        with open(namelistfile, 'w') as f:
708            f.write(stream.render('text'))
709
710        return
711
712
713    def deacc_fluxes(self, inputfiles, c):
714        '''Goes through all flux fields in ordered time and de-accumulate
715        the fields. Afterwards the fields are disaggregated in time.
716        Different versions of disaggregation is provided for rainfall
717        data (darain, modified linear) and the surface fluxes and
718        stress data (dapoly, cubic polynomial).
719
720        Parameters
721        ----------
722        inputfiles : :obj:`UioFiles`
723            Contains a list of files.
724
725        c : :obj:`ControlFile`
726            Contains all the parameters of CONTROL file and
727            command line.
728
729        Return
730        ------
731
732        '''
733
734        table128 = init128(_config.PATH_GRIBTABLE)
735        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
736
737        iid = None
738        index_vals = None
739
740        # get the values of the keys which are used for distinct access
741        # of grib messages via product
742        index_keys = ["date", "time", "step"]
743        iid, index_vals = self._mk_index_values(c.inputdir,
744                                                inputfiles,
745                                                index_keys)
746        # index_vals looks like e.g.:
747        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
748        # index_vals[1]: ('0', '1200', '1800', '600') ; time
749        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
750
751        # initialize dictionaries
752        valsdict = {}
753        svalsdict = {}
754        for p in pars:
755            valsdict[str(p)] = []
756            svalsdict[str(p)] = []
757
758        # "product" genereates each possible combination between the
759        # values of the index keys
760        for prod in product(*index_vals):
761            # e.g. prod = ('20170505', '0', '12')
762            #             (  date    ,time, step)
763
764            print('current product: ', prod)
765
766            for i in range(len(index_keys)):
767                codes_index_select(iid, index_keys[i], prod[i])
768
769            # get first id from current product
770            gid = codes_new_from_index(iid)
771
772            # if there is no data for this specific time combination / product
773            # skip the rest of the for loop and start with next timestep/product
774            if not gid:
775                continue
776
777            # create correct timestamp from the three time informations
778            cdate = str(codes_get(gid, 'date'))
779            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
780            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
781            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
782            t_dt = t_date + timedelta(hours=int(cstep))
783            t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime))
784            t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime))
785            t_enddate = None
786
787            if c.maxstep > 12:
788                fnout = os.path.join(c.inputdir, 'flux' +
789                                     t_date.strftime('%Y%m%d.%H') +
790                                     '.{:0>3}'.format(step-2*int(c.dtime)))
791                gnout = os.path.join(c.inputdir, 'flux' +
792                                     t_date.strftime('%Y%m%d.%H') +
793                                     '.{:0>3}'.format(step-int(c.dtime)))
794                hnout = os.path.join(c.inputdir, 'flux' +
795                                     t_date.strftime('%Y%m%d.%H') +
796                                     '.{:0>3}'.format(step))
797            else:
798                fnout = os.path.join(c.inputdir, 'flux' +
799                                     t_m2dt.strftime('%Y%m%d%H'))
800                gnout = os.path.join(c.inputdir, 'flux' +
801                                     t_m1dt.strftime('%Y%m%d%H'))
802                hnout = os.path.join(c.inputdir, 'flux' +
803                                     t_dt.strftime('%Y%m%d%H'))
804
805            print("outputfile = " + fnout)
806            f_handle = open(fnout, 'w')
807            h_handle = open(hnout, 'w')
808            g_handle = open(gnout, 'w')
809
810            # read message for message and store relevant data fields
811            # data keywords are stored in pars
812            while 1:
813                if not gid:
814                    break
815                cparamId = str(codes_get(gid, 'paramId'))
816                step = codes_get(gid, 'step')
817                time = codes_get(gid, 'time')
818                ni = codes_get(gid, 'Ni')
819                nj = codes_get(gid, 'Nj')
820                if cparamId in valsdict.keys():
821                    values = codes_get_values(gid)
822                    vdp = valsdict[cparamId]
823                    svdp = svalsdict[cparamId]
824
825                    if cparamId == '142' or cparamId == '143':
826                        fak = 1. / 1000.
827                    else:
828                        fak = 3600.
829
830                    values = (np.reshape(values, (nj, ni))).flatten() / fak
831                    vdp.append(values[:])  # save the accumulated values
832                    if c.marsclass.upper() == 'EA' or \
833                       step <= int(c.dtime):
834                        svdp.append(values[:] / int(c.dtime))
835                    else:  # deaccumulate values
836                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
837
838                    print(cparamId, time, step, len(values),
839                          values[0], np.std(values))
840
841                    # len(svdp) corresponds to the time
842                    if len(svdp) >= 3:
843                        if len(svdp) > 3:
844                            if cparamId == '142' or cparamId == '143':
845                                values = disaggregation.darain(svdp)
846                            else:
847                                values = disaggregation.dapoly(svdp)
848
849                            if not (step == c.maxstep and c.maxstep > 12 \
850                                    or t_dt == t_enddate):
851                                vdp.pop(0)
852                                svdp.pop(0)
853                        else:
854                            if c.maxstep > 12:
855                                values = svdp[1]
856                            else:
857                                values = svdp[0]
858
859                        codes_set_values(gid, values)
860
861                        if c.maxstep > 12:
862                            codes_set(gid, 'step', max(0, step-2*int(c.dtime)))
863                        else:
864                            codes_set(gid, 'step', 0)
865                            codes_set(gid, 'time', t_m2dt.hour*100)
866                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
867
868                        codes_write(gid, f_handle)
869
870                        if c.basetime:
871                            t_enddate = datetime.strptime(c.end_date +
872                                                          c.basetime,
873                                                          '%Y%m%d%H')
874                        else:
875                            t_enddate = t_date + timedelta(2*int(c.dtime))
876
877                        # squeeze out information of last two steps contained
878                        # in svdp
879                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
880                        # or t_dt+timedelta(hours = int(c.dtime))
881                        # >= t_enddate:
882                        # Note that svdp[0] has not been popped in this case
883
884                        if step == c.maxstep and c.maxstep > 12 or \
885                           t_dt == t_enddate:
886
887                            values = svdp[3]
888                            codes_set_values(gid, values)
889                            codes_set(gid, 'step', 0)
890                            truedatetime = t_m2dt + timedelta(hours=
891                                                              2*int(c.dtime))
892                            codes_set(gid, 'time', truedatetime.hour * 100)
893                            codes_set(gid, 'date', truedatetime.year * 10000 +
894                                      truedatetime.month * 100 +
895                                      truedatetime.day)
896                            codes_write(gid, h_handle)
897
898                            #values = (svdp[1]+svdp[2])/2.
899                            if cparamId == '142' or cparamId == '143':
900                                values = disaggregation.darain(list(reversed(svdp)))
901                            else:
902                                values = disaggregation.dapoly(list(reversed(svdp)))
903
904                            codes_set(gid, 'step', 0)
905                            truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
906                            codes_set(gid, 'time', truedatetime.hour * 100)
907                            codes_set(gid, 'date', truedatetime.year * 10000 +
908                                      truedatetime.month * 100 +
909                                      truedatetime.day)
910                            codes_set_values(gid, values)
911                            codes_write(gid, g_handle)
912
913                codes_release(gid)
914
915                gid = codes_new_from_index(iid)
916
917            f_handle.close()
918            g_handle.close()
919            h_handle.close()
920
921        codes_index_release(iid)
922
923        return
924
925
926    def create(self, inputfiles, c):
927        '''An index file will be created which depends on the combination
928        of "date", "time" and "stepRange" values. This is used to iterate
929        over all messages in each grib file which were passed through the
930        parameter "inputfiles" to seperate specific parameters into fort.*
931        files. Afterwards the FORTRAN program is called to convert
932        the data fields all to the same grid and put them in one file
933        per unique time step (combination of "date", "time" and
934        "stepRange").
935
936        Note
937        ----
938        This method is based on the ECMWF example index.py
939        https://software.ecmwf.int/wiki/display/GRIB/index.py
940
941        Parameters
942        ----------
943        inputfiles : :obj:`UioFiles`
944            Contains a list of files.
945
946        c : :obj:`ControlFile`
947            Contains all the parameters of CONTROL file and
948            command line.
949
950        Return
951        ------
952
953        '''
954
955        if c.wrf:
956            table128 = init128(_config.PATH_GRIBTABLE)
957            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
958                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
959                                  table128)
960
961        # these numbers are indices for the temporary files "fort.xx"
962        # which are used to seperate the grib fields to,
963        # for the Fortran program input
964        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
965        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
966        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
967                 '17':None, '18':None, '19':None, '21':None, '22':None}
968
969        iid = None
970        index_vals = None
971
972        # get the values of the keys which are used for distinct access
973        # of grib messages via product
974        index_keys = ["date", "time", "step"]
975        iid, index_vals = self._mk_index_values(c.inputdir,
976                                                inputfiles,
977                                                index_keys)
978        # index_vals looks like e.g.:
979        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
980        # index_vals[1]: ('0', '1200', '1800', '600') ; time
981        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
982
983        # "product" genereates each possible combination between the
984        # values of the index keys
985        for prod in product(*index_vals):
986            # e.g. prod = ('20170505', '0', '12')
987            #             (  date    ,time, step)
988
989            print('current product: ', prod)
990
991            for i in range(len(index_keys)):
992                codes_index_select(iid, index_keys[i], prod[i])
993
994            # get first id from current product
995            gid = codes_new_from_index(iid)
996
997            # if there is no data for this specific time combination / product
998            # skip the rest of the for loop and start with next timestep/product
999            if not gid:
1000                continue
1001
1002            # remove old fort.* files and open new ones
1003            # they are just valid for a single product
1004            for k, f in fdict.iteritems():
1005                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1006                silent_remove(fortfile)
1007                fdict[k] = open(fortfile, 'w')
1008
1009            # create correct timestamp from the three time informations
1010            cdate = str(codes_get(gid, 'date'))
1011            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1012            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1013            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1014            timestamp += timedelta(hours=int(cstep))
1015            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1016
1017            # if the timestamp is out of basetime start/end date period,
1018            # skip this specific product
1019            if c.basetime:
1020                start_time = datetime.strptime(c.end_date + c.basetime,
1021                                                '%Y%m%d%H') - time_delta
1022                end_time = datetime.strptime(c.end_date + c.basetime,
1023                                             '%Y%m%d%H')
1024                if timestamp < start_time or timestamp > end_time:
1025                    continue
1026
1027            if c.wrf:
1028                if 'olddate' not in locals() or cdate != olddate:
1029                    fwrf = open(os.path.join(c.outputdir,
1030                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1031                    olddate = cdate[:]
1032
1033            # savedfields remembers which fields were already used.
1034            savedfields = []
1035            # sum of cloud liquid and ice water content
1036            scwc = None
1037            while 1:
1038                if not gid:
1039                    break
1040                paramId = codes_get(gid, 'paramId')
1041                gridtype = codes_get(gid, 'gridType')
1042                levtype = codes_get(gid, 'typeOfLevel')
1043                if paramId == 77: # ETADOT
1044                    codes_write(gid, fdict['21'])
1045                elif paramId == 130: # T
1046                    codes_write(gid, fdict['11'])
1047                elif paramId == 131 or paramId == 132: # U, V wind component
1048                    codes_write(gid, fdict['10'])
1049                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1050                    codes_write(gid, fdict['17'])
1051                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1052                    codes_write(gid, fdict['18'])
1053                elif paramId == 135: # W
1054                    codes_write(gid, fdict['19'])
1055                elif paramId == 152: # LNSP
1056                    codes_write(gid, fdict['12'])
1057                elif paramId == 155 and gridtype == 'sh': # D
1058                    codes_write(gid, fdict['13'])
1059                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1060                    # sum cloud liquid water and ice
1061                    if scwc is None:
1062                        scwc = codes_get_values(gid)
1063                    else:
1064                        scwc += codes_get_values(gid)
1065                        codes_set_values(gid, scwc)
1066                        codes_set(gid, 'paramId', 201031)
1067                        codes_write(gid, fdict['22'])
1068                elif c.wrf and paramId in [129, 138, 155] and \
1069                      levtype == 'hybrid': # Z, VO, D
1070                    # do not do anything right now
1071                    # these are specific parameter for WRF
1072                    pass
1073                else:
1074                    if paramId not in savedfields:
1075                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1076                        # and all ADDPAR parameter
1077                        codes_write(gid, fdict['16'])
1078                        savedfields.append(paramId)
1079                    else:
1080                        print('duplicate ' + str(paramId) + ' not written')
1081
1082                try:
1083                    if c.wrf:
1084                        # model layer
1085                        if levtype == 'hybrid' and \
1086                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1087                            codes_write(gid, fwrf)
1088                        # sfc layer
1089                        elif paramId in wrfpars:
1090                            codes_write(gid, fwrf)
1091                except AttributeError:
1092                    pass
1093
1094                codes_release(gid)
1095                gid = codes_new_from_index(iid)
1096
1097            for f in fdict.values():
1098                f.close()
1099
1100            # call for Fortran program to convert e.g. reduced_gg grids to
1101            # regular_ll and calculate detadot/dp
1102            pwd = os.getcwd()
1103            os.chdir(c.inputdir)
1104            if os.stat('fort.21').st_size == 0 and c.eta:
1105                print('Parameter 77 (etadot) is missing, most likely it is \
1106                       not available for this type or date/time\n')
1107                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1108                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1109                         is set to 1 in CONTROL file')
1110
1111            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1112            p = subprocess.check_call([os.path.join(
1113                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1114            os.chdir(pwd)
1115
1116            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1117            if c.maxstep > 12:
1118                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1119            else:
1120                suffix = cdate_hour[2:10]
1121            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1122            print("outputfile = " + fnout)
1123            # collect for final processing
1124            self.outputfilelist.append(os.path.basename(fnout))
1125
1126            # create outputfile and copy all data from intermediate files
1127            # to the outputfile (final GRIB input files for FLEXPART)
1128            orolsm = os.path.basename(glob.glob(c.inputdir +
1129                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1130            fluxfile = 'flux' + cdate[0:2] + suffix
1131            if not c.cwc:
1132                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1133            else:
1134                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1135
1136            with open(fnout, 'wb') as fout:
1137                for f in flist:
1138                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1139                                       fout)
1140
1141            if c.omega:
1142                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1143                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1144                                            'rb'), fout)
1145
1146        if c.wrf:
1147            fwrf.close()
1148
1149        codes_index_release(iid)
1150
1151        return
1152
1153
1154    def process_output(self, c):
1155        '''The grib files are postprocessed depending on the selection in
1156        CONTROL file. The resulting files are moved to the output
1157        directory if its not equal to the input directory.
1158        The following modifications might be done if
1159        properly switched in CONTROL file:
1160        GRIB2 - Conversion to GRIB2
1161        ECTRANS - Transfer of files to gateway server
1162        ECSTORAGE - Storage at ECMWF server
1163
1164        Parameters
1165        ----------
1166        c : :obj:`ControlFile`
1167            Contains all the parameters of CONTROL file and
1168            command line.
1169
1170        Return
1171        ------
1172
1173        '''
1174
1175        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1176
1177        if not c.ecapi:
1178            print('ecstorage: {}\n ecfsdir: {}\n'.
1179                  format(c.ecstorage, c.ecfsdir))
1180            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1181                  .format(c.ectrans, c.gateway, c.destination))
1182
1183        print('Output filelist: ')
1184        print(self.outputfilelist)
1185
1186        for ofile in self.outputfilelist:
1187            ofile = os.path.join(self.inputdir, ofile)
1188
1189            if c.format.lower() == 'grib2':
1190                p = subprocess.check_call(['codes_set', '-s', 'edition=2,',
1191                                           'productDefinitionTemplateNumber=8',
1192                                           ofile, ofile + '_2'])
1193                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1194
1195            if c.ectrans and not c.ecapi:
1196                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1197                                           c.gateway, '-remote', c.destination,
1198                                           '-source', ofile])
1199
1200            if c.ecstorage and not c.ecapi:
1201                p = subprocess.check_call(['ecp', '-o', ofile,
1202                                           os.path.expandvars(c.ecfsdir)])
1203
1204            if c.outputdir != c.inputdir:
1205                p = subprocess.check_call(['mv',
1206                                           os.path.join(c.inputdir, ofile),
1207                                           c.outputdir])
1208
1209        return
1210
1211
1212    def prepare_fp_files(self, c):
1213        '''Conversion of GRIB files to FLEXPART binary format.
1214
1215        Parameters
1216        ----------
1217        c : :obj:`ControlFile`
1218            Contains all the parameters of CONTROL file and
1219            command line.
1220
1221        Return
1222        ------
1223
1224        '''
1225        # generate AVAILABLE file
1226        # Example of AVAILABLE file data:
1227        # 20131107 000000      EN13110700              ON DISC
1228        clist = []
1229        for ofile in self.outputfilelist:
1230            fname = ofile.split('/')
1231            if '.' in fname[-1]:
1232                l = fname[-1].split('.')
1233                timestamp = datetime.strptime(l[0][-6:] + l[1],
1234                                              '%y%m%d%H')
1235                timestamp += timedelta(hours=int(l[2]))
1236                cdate = datetime.strftime(timestamp, '%Y%m%d')
1237                chms = datetime.strftime(timestamp, '%H%M%S')
1238            else:
1239                cdate = '20' + fname[-1][-8:-2]
1240                chms = fname[-1][-2:] + '0000'
1241            clist.append(cdate + ' ' + chms + ' '*6 +
1242                         fname[-1] + ' '*14 + 'ON DISC')
1243        clist.sort()
1244        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1245            f.write('\n'.join(clist) + '\n')
1246
1247        # generate pathnames file
1248        pwd = os.path.abspath(c.outputdir)
1249        with open(pwd + '/pathnames', 'w') as f:
1250            f.write(pwd + '/Options/\n')
1251            f.write(pwd + '/\n')
1252            f.write(pwd + '/\n')
1253            f.write(pwd + '/AVAILABLE\n')
1254            f.write(' = == = == = == = == = == ==  = \n')
1255
1256        # create Options dir if necessary
1257        if not os.path.exists(pwd + '/Options'):
1258            make_dir(pwd+'/Options')
1259
1260        # read template COMMAND file
1261        with open(os.path.expandvars(os.path.expanduser(
1262            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1263            lflist = f.read().split('\n')
1264
1265        # find index of list where to put in the
1266        # date and time information
1267        # usually after the LDIRECT parameter
1268        i = 0
1269        for l in lflist:
1270            if 'LDIRECT' in l.upper():
1271                break
1272            i += 1
1273
1274        # insert the date and time information of run start and end
1275        # into the list of lines of COMMAND file
1276        lflist = lflist[:i+1] + \
1277                 [clist[0][:16], clist[-1][:16]] + \
1278                 lflist[i+3:]
1279
1280        # write the new COMMAND file
1281        with open(pwd + '/Options/COMMAND', 'w') as g:
1282            g.write('\n'.join(lflist) + '\n')
1283
1284        # change to outputdir and start the grib2flexpart run
1285        # afterwards switch back to the working dir
1286        os.chdir(c.outputdir)
1287        p = subprocess.check_call([
1288            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1289            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1290        os.chdir(pwd)
1291
1292        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG