source: flex_extract.git/source/python/classes/EcFlexpart.py @ d43706b

ctbtodev
Last change on this file since d43706b was d43706b, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

BugFix?: sorted time for product generation

  • Property mode set to 100644
File size: 52.0 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83
84from eccodes import (codes_index_select, codes_new_from_index, codes_get,
85                     codes_get_values, codes_set_values, codes_set,
86                     codes_write, codes_release, codes_new_from_index,
87                     codes_index_release, codes_index_get)
88
89# software specific classes and modules from flex_extract
90sys.path.append('../')
91import _config
92from GribTools import GribTools
93from mods.tools import (init128, to_param_id, silent_remove, product,
94                        my_error, make_dir)
95from MarsRetrieval import MarsRetrieval
96import mods.disaggregation as disaggregation
97
98# ------------------------------------------------------------------------------
99# CLASS
100# ------------------------------------------------------------------------------
101class EcFlexpart(object):
102    '''
103    Class to retrieve FLEXPART specific ECMWF data.
104    '''
105    # --------------------------------------------------------------------------
106    # CLASS FUNCTIONS
107    # --------------------------------------------------------------------------
108    def __init__(self, c, fluxes=False):
109        '''Creates an object/instance of EcFlexpart with the associated
110        settings of its attributes for the retrieval.
111
112        Parameters:
113        -----------
114        c : :obj:`ControlFile`
115            Contains all the parameters of CONTROL file and
116            command line.
117
118        fluxes : :obj:`boolean`, optional
119            Decides if the flux parameter settings are stored or
120            the rest of the parameter list.
121            Default value is False.
122
123        Return
124        ------
125
126        '''
127        # set a counter for the number of mars requests generated
128        self.mreq_count = 0
129
130        # different mars types for retrieving data for flexpart
131        self.types = dict()
132
133        # Pure forecast mode
134        if c.maxstep > len(c.type) and 'AN' not in c.type:
135            c.type = [c.type[0]]
136            c.step = ['{:0>3}'.format(int(c.step[0]))]
137            c.time = [c.time[0]]
138            for i in range(1, c.maxstep + 1):
139                c.type.append(c.type[0])
140                c.step.append('{:0>3}'.format(i))
141                c.time.append(c.time[0])
142
143        self.inputdir = c.inputdir
144        self.dataset = c.dataset
145        self.basetime = c.basetime
146        self.dtime = c.dtime
147        i = 0
148        if fluxes and c.maxstep <= 24:
149            # no forecast beyond one day is needed!
150            # Thus, prepare flux data manually as usual
151            # with only forecast fields with start times at 00/12
152            # (but without 00/12 fields since these are
153            # the initialisation times of the flux fields
154            # and therefore are zero all the time)
155            self.types[str(c.acctype)] = {'times': str(c.acctime),
156                                          'steps': '{}/to/{}/by/{}'.format(
157                                              c.dtime, c.accmaxstep, c.dtime)}
158        else:
159            for ty, st, ti in zip(c.type, c.step, c.time):
160                btlist = range(24)
161                if c.basetime == '12':
162                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
163                if c.basetime == '00':
164                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
165
166                if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
167                    (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
168                     (int(c.step[i]) % int(c.dtime) == 0)) ) and \
169                    (int(c.time[i]) in btlist or c.maxstep > 24):
170
171                    if ty not in self.types.keys():
172                        self.types[ty] = {'times': '', 'steps': ''}
173
174                    if ti not in self.types[ty]['times']:
175                        if self.types[ty]['times']:
176                            self.types[ty]['times'] += '/'
177                        self.types[ty]['times'] += ti
178
179                    if st not in self.types[ty]['steps']:
180                        if self.types[ty]['steps']:
181                            self.types[ty]['steps'] += '/'
182                        self.types[ty]['steps'] += st
183                i += 1
184
185        self.marsclass = c.marsclass
186        self.stream = c.stream
187        self.number = c.number
188        self.resol = c.resol
189        self.accuracy = c.accuracy
190        self.level = c.level
191        self.expver = c.expver
192        self.levelist = c.levelist
193        # for gaussian grid retrieval
194        self.glevelist = '1/to/' + c.level
195        self.gaussian = c.gaussian
196
197        if 'N' in c.grid:  # Gaussian output grid
198            self.grid = c.grid
199            self.area = 'G'
200        else:
201            self.grid = '{}/{}'.format(int(c.grid) / 1000., int(c.grid) / 1000.)
202            self.area = '{}/{}/{}/{}'.format(int(c.upper) / 1000.,
203                                             int(c.left) / 1000.,
204                                             int(c.lower) / 1000.,
205                                             int(c.right) / 1000.)
206
207        self.outputfilelist = []
208
209
210        # Now comes the nasty part that deals with the different
211        # scenarios we have:
212        # 1) Calculation of etadot on
213        #    a) Gaussian grid
214        #    b) Output grid
215        #    c) Output grid using parameter 77 retrieved from MARS
216        # 3) Calculation/Retrieval of omega
217        # 4) Download also data for WRF
218
219        # Different grids need different retrievals
220        # SH = Spherical Harmonics, GG = Gaussian Grid,
221        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
222        self.params = {'SH__ML': '', 'SH__SL': '',
223                       'GG__ML': '', 'GG__SL': '',
224                       'OG__ML': '', 'OG__SL': '',
225                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
226        # the self.params dictionary stores a list of
227        # [param, levtype, levelist, grid] per key
228
229        if fluxes is False:
230            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
231            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
232            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
233                                     'SFC', '1', self.grid]
234            if c.addpar:
235                if c.addpar[0] == '/':
236                    c.addpar = c.addpar[1:]
237                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
238
239            if c.marsclass.upper() == 'EA' or c.marsclass.upper() == 'EP':
240                self.params['OG_OROLSM__SL'] = ["160/27/28/244",
241                                                'SFC', '1', self.grid]
242            else:
243                self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
244                                                'SFC', '1', self.grid]
245
246            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
247
248            #if c.gauss == '0' and c.eta == '1':
249            if not c.gauss and c.eta:
250                # the simplest case
251                self.params['OG__ML'][0] += '/U/V/77'
252            #elif c.gauss == '0' and c.eta == '0':
253            elif not c.gauss and not c.eta:
254            # this is not recommended (inaccurate)
255                self.params['OG__ML'][0] += '/U/V'
256            #elif c.gauss == '1' and c.eta == '0':
257            elif c.gauss and not c.eta:
258                # this is needed for data before 2008, or for reanalysis data
259                self.params['GG__SL'] = ['Q', 'ML', '1', \
260                                         '{}'.format((int(self.resol) + 1) / 2)]
261                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
262            else:
263                print('Warning: This is a very costly parameter combination, \
264                      use only for debugging!')
265                self.params['GG__SL'] = ['Q', 'ML', '1', \
266                                         '{}'.format((int(self.resol) + 1) / 2)]
267                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
268                                         '{}'.format((int(self.resol) + 1) / 2)]
269
270            if c.omega:
271                self.params['OG__ML'][0] += '/W'
272
273            # add cloud water content if necessary
274            if c.cwc:
275                self.params['OG__ML'][0] += '/CLWC/CIWC'
276
277            # add vorticity and geopotential height for WRF if necessary
278            if c.wrf:
279                self.params['OG__ML'][0] += '/Z/VO'
280                if '/D' not in self.params['OG__ML'][0]:
281                    self.params['OG__ML'][0] += '/D'
282                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
283                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
284                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
285                           139/170/183/236/39/40/41/42'
286                lwrt_sfc = wrf_sfc.split('/')
287                for par in lwrt_sfc:
288                    if par not in self.params['OG__SL'][0]:
289                        self.params['OG__SL'][0] += '/' + par
290
291        else:
292            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
293                                        'SFC', '1', self.grid]
294
295        return
296
297
298    def _mk_targetname(self, ftype, param, date):
299        '''Creates the filename for the requested grib data to be stored in.
300        This name is passed as the "target" parameter in the request.
301
302        Parameters
303        ----------
304        ftype : :obj:`string`
305            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
306
307        param : :obj:`string`
308            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
309            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
310
311        date : :obj:`string`
312            The date period of the grib data to be stored in this file.
313
314        Return
315        ------
316        targetname : :obj:`string`
317            The target filename for the grib data.
318        '''
319        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
320                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
321
322        return targetname
323
324
325    def _start_retrievement(self, request, par_dict):
326        '''Creates the Mars Retrieval and prints or submits the request
327        depending on the status of the request variable.
328
329        Parameters
330        ----------
331        request : :obj:`integer`
332            Selects the mode of retrieval.
333            0: Retrieves the data from ECMWF.
334            1: Prints the mars requests to an output file.
335            2: Retrieves the data and prints the mars request.
336
337        par_dict : :obj:`dictionary`
338            Contains all parameter which have to be set for creating the
339            Mars Retrievals. The parameter are:
340            marsclass, dataset, stream, type, levtype, levelist, resol,
341            gaussian, accuracy, grid, target, area, date, time, number,
342            step, expver, param
343
344        Return
345        ------
346
347        '''
348        # increase number of mars requests
349        self.mreq_count += 1
350
351        MR = MarsRetrieval(self.server,
352                           self.public,
353                           marsclass=par_dict['marsclass'],
354                           dataset=par_dict['dataset'],
355                           stream=par_dict['stream'],
356                           type=par_dict['type'],
357                           levtype=par_dict['levtype'],
358                           levelist=par_dict['levelist'],
359                           resol=par_dict['resol'],
360                           gaussian=par_dict['gaussian'],
361                           accuracy=par_dict['accuracy'],
362                           grid=par_dict['grid'],
363                           target=par_dict['target'],
364                           area=par_dict['area'],
365                           date=par_dict['date'],
366                           time=par_dict['time'],
367                           number=par_dict['number'],
368                           step=par_dict['step'],
369                           expver=par_dict['expver'],
370                           param=par_dict['param'])
371
372        if request == 0:
373            MR.display_info()
374            MR.data_retrieve()
375        elif request == 1:
376            MR.print_infodata_csv(self.inputdir, self.mreq_count)
377        elif request == 2:
378            MR.print_infodata_csv(self.inputdir, self.mreq_count)
379            MR.display_info()
380            MR.data_retrieve()
381        else:
382            print('Failure')
383
384        return
385
386
387    def _mk_index_values(self, inputdir, inputfiles, keys):
388        '''Creates an index file for a set of grib parameter keys.
389        The values from the index keys are returned in a list.
390
391        Parameters
392        ----------
393        keys : :obj:`dictionary`
394            List of parameter names which serves as index.
395
396        inputfiles : :obj:`UioFiles`
397            Contains a list of files.
398
399        Return
400        ------
401        iid : :obj:`codes_index`
402            This is a grib specific index structure to access
403            messages in a file.
404
405        index_vals : :obj:`list`
406            Contains the values from the keys used for a distinct selection
407            of grib messages in processing  the grib files.
408            Content looks like e.g.:
409            index_vals[0]: ('20171106', '20171107', '20171108') ; date
410            index_vals[1]: ('0', '1200', '1800', '600') ; time
411            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
412        '''
413        iid = None
414        index_keys = keys
415
416        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
417        silent_remove(indexfile)
418        grib = GribTools(inputfiles.files)
419        # creates new index file
420        iid = grib.index(index_keys=index_keys, index_file=indexfile)
421
422        # read the values of index keys
423        index_vals = []
424        for key in index_keys:
425            key_vals = codes_index_get(iid, key)
426            # have to sort the key values for correct disaggregation,
427            # therefore convert to int first
428            key_vals = [int(k) for k in key_vals]
429            key_vals.sort()
430            key_vals = [str(k) for k in key_vals]
431            index_vals.append(key_vals)
432            # index_vals looks for example like:
433            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
434            # index_vals[1]: ('0', '1200') ; time
435            # index_vals[2]: (3', '6', '9', '12') ; stepRange
436
437        return iid, index_vals
438
439
440    def retrieve(self, server, dates, public, request, inputdir='.'):
441        '''Finalizing the retrieval information by setting final details
442        depending on grid type.
443        Prepares MARS retrievals per grid type and submits them.
444
445        Parameters
446        ----------
447        server : :obj:`ECMWFService` or :obj:`ECMWFDataServer`
448            The connection to the ECMWF server. This is different
449            for member state users which have full access and non
450            member state users which have only access to the public
451            data sets. The decision is made from command line argument
452            "public"; for public access its True (ECMWFDataServer)
453            for member state users its False (ECMWFService)
454
455        dates : :obj:`string`
456            Contains start and end date of the retrieval in the format
457            "YYYYMMDD/to/YYYYMMDD"
458
459        request : :obj:`integer`
460            Selects the mode of retrieval.
461            0: Retrieves the data from ECMWF.
462            1: Prints the mars requests to an output file.
463            2: Retrieves the data and prints the mars request.
464
465        inputdir : :obj:`string`, optional
466            Path to the directory where the retrieved data is about
467            to be stored. The default is the current directory ('.').
468
469        Return
470        ------
471
472        '''
473        self.dates = dates
474        self.server = server
475        self.public = public
476        self.inputdir = inputdir
477        oro = False
478
479        # define times with datetime module
480        t12h = timedelta(hours=12)
481        t24h = timedelta(hours=24)
482
483        # dictionary which contains all parameter for the mars request,
484        # entries with a "None" will change in different requests and will
485        # therefore be set in each request seperately
486        retr_param_dict = {'marsclass':self.marsclass,
487                           'dataset':self.dataset,
488                           'stream':None,
489                           'type':None,
490                           'levtype':None,
491                           'levelist':None,
492                           'resol':self.resol,
493                           'gaussian':None,
494                           'accuracy':self.accuracy,
495                           'grid':None,
496                           'target':None,
497                           'area':None,
498                           'date':None,
499                           'time':None,
500                           'number':self.number,
501                           'step':None,
502                           'expver':self.expver,
503                           'param':None}
504
505        for ftype in self.types:
506            # fk contains field types such as
507            #     [AN, FC, PF, CV]
508            # fv contains all of the items of the belonging key
509            #     [times, steps]
510            for pk, pv in self.params.iteritems():
511                # pk contains one of these keys of params
512                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
513                #      OG_OROLSM_SL, OG_acc_SL]
514                # pv contains all of the items of the belonging key
515                #     [param, levtype, levelist, grid]
516                if isinstance(pv, str):
517                    continue
518                retr_param_dict['type'] = ftype
519                retr_param_dict['time'] = self.types[ftype]['times']
520                retr_param_dict['step'] = self.types[ftype]['steps']
521                retr_param_dict['date'] = self.dates
522                retr_param_dict['stream'] = self.stream
523                retr_param_dict['target'] = \
524                    self._mk_targetname(ftype,
525                                        pk,
526                                        retr_param_dict['date'].split('/')[0])
527                retr_param_dict['param'] = pv[0]
528                retr_param_dict['levtype'] = pv[1]
529                retr_param_dict['levelist'] = pv[2]
530                retr_param_dict['grid'] = pv[3]
531                retr_param_dict['area'] = self.area
532                retr_param_dict['gaussian'] = self.gaussian
533
534                if pk == 'OG__SL':
535                    pass
536                if pk == 'OG_OROLSM__SL' and not oro:
537                    oro = True
538                    # in CERA20C (class EP) there is no stream "OPER"!
539                    if self.marsclass.upper() != 'EP':
540                        retr_param_dict['stream'] = 'OPER'
541                    retr_param_dict['type'] = 'AN'
542                    retr_param_dict['time'] = '00'
543                    retr_param_dict['step'] = '000'
544                    retr_param_dict['date'] = self.dates.split('/')[0]
545                    retr_param_dict['target'] = self._mk_targetname('',
546                                            pk, retr_param_dict['date'])
547                elif pk == 'OG_OROLSM__SL' and oro:
548                    continue
549                if pk == 'GG__SL' and pv[0] == 'Q':
550                    retr_param_dict['area'] = ""
551                    retr_param_dict['gaussian'] = 'reduced'
552
553    # ------  on demand path  --------------------------------------------------
554                if not self.basetime:
555                    # ******* start retrievement
556                    self._start_retrievement(request, retr_param_dict)
557    # ------  operational path  ------------------------------------------------
558                else:
559                    # check if mars job requests fields beyond basetime.
560                    # if yes eliminate those fields since they may not
561                    # be accessible with user's credentials
562
563                    enddate = retr_param_dict['date'].split('/')[-1]
564                    elimit = datetime.strptime(enddate + self.basetime,
565                                               '%Y%m%d%H')
566
567                    if self.basetime == '12':
568                        # --------------  flux data ----------------------------
569                        if 'acc' in pk:
570
571                        # Strategy:
572                        # if maxtime-elimit >= 24h reduce date by 1,
573                        # if 12h <= maxtime-elimit<12h reduce time for last date
574                        # if maxtime-elimit<12h reduce step for last time
575                        # A split of the MARS job into 2 is likely necessary.
576
577
578                            startdate = retr_param_dict['date'].split('/')[0]
579                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
580                            retr_param_dict['date'] = '/'.join([startdate,
581                                                                'to',
582                                                                enddate])
583
584                            # ******* start retrievement
585                            self._start_retrievement(request, retr_param_dict)
586
587                            retr_param_dict['date'] = \
588                                datetime.strftime(elimit - t12h, '%Y%m%d')
589                            retr_param_dict['time'] = '00'
590                            retr_param_dict['target'] = \
591                                self._mk_targetname(ftype, pk,
592                                                    retr_param_dict['date'])
593
594                            # ******* start retrievement
595                            self._start_retrievement(request, retr_param_dict)
596
597                        # --------------  non flux data ------------------------
598                        else:
599                            # ******* start retrievement
600                            self._start_retrievement(request, retr_param_dict)
601
602                    else: # basetime = 0
603                        retr_param_dict['date'] = \
604                            datetime.strftime(elimit - t24h, '%Y%m%d')
605
606                        timesave = ''.join(retr_param_dict['time'])
607
608                        if '/' in retr_param_dict['time']:
609                            times = retr_param_dict['time'].split('/')
610                            steps = retr_param_dict['step'].split('/')
611                            while (pk != 'OG_OROLSM__SL' and
612                                   'acc' not in pk and
613                                   (int(times[0]) + int(steps[0])) <= 12):
614                                times = times[1:]
615
616                            if len(times) > 1:
617                                retr_param_dict['time'] = '/'.join(times)
618                            else:
619                                retr_param_dict['time'] = times[0]
620
621                        # ******* start retrievement
622                        self._start_retrievement(request, retr_param_dict)
623
624                        if (pk != 'OG_OROLSM__SL' and
625                            int(retr_param_dict['step'].split('/')[0]) == 0 and
626                            int(timesave.split('/')[0]) == 0):
627
628                            retr_param_dict['date'] = \
629                                datetime.strftime(elimit, '%Y%m%d')
630                            retr_param_dict['time'] = '00'
631                            retr_param_dict['step'] = '000'
632                            retr_param_dict['target'] = \
633                                self._mk_targetname(ftype, pk,
634                                                    retr_param_dict['date'])
635
636                            # ******* start retrievement
637                            self._start_retrievement(request, retr_param_dict)
638
639        if request == 0 or request == 2:
640            print('MARS retrieve done ... ')
641        elif request == 1:
642            print('MARS request printed ...')
643
644        return
645
646
647    def write_namelist(self, c):
648        '''Creates a namelist file in the temporary directory and writes
649        the following values to it: maxl, maxb, mlevel,
650        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
651        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
652
653        Parameters
654        ----------
655        c : :obj:`ControlFile`
656            Contains all the parameters of CONTROL file and
657            command line.
658
659        filename : :obj:`string`
660                Name of the namelist file.
661
662        Return
663        ------
664
665        '''
666
667        from genshi.template.text import NewTextTemplate
668        from genshi.template import  TemplateLoader
669
670        loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
671        namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
672                                       cls=NewTextTemplate)
673
674        self.inputdir = c.inputdir
675        area = np.asarray(self.area.split('/')).astype(float)
676        grid = np.asarray(self.grid.split('/')).astype(float)
677
678        if area[1] > area[3]:
679            area[1] -= 360
680        maxl = int((area[3] - area[1]) / grid[1]) + 1
681        maxb = int((area[0] - area[2]) / grid[0]) + 1
682
683        stream = namelist_template.generate(
684            maxl = str(maxl),
685            maxb = str(maxb),
686            mlevel = str(self.level),
687            mlevelist = str(self.levelist),
688            mnauf = str(self.resol),
689            metapar = '77',
690            rlo0 = str(area[1]),
691            rlo1 = str(area[3]),
692            rla0 = str(area[2]),
693            rla1 = str(area[0]),
694            momega = str(c.omega),
695            momegadiff = str(c.omegadiff),
696            mgauss = str(c.gauss),
697            msmooth = str(c.smooth),
698            meta = str(c.eta),
699            metadiff = str(c.etadiff),
700            mdpdeta = str(c.dpdeta)
701        )
702
703        namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
704
705        with open(namelistfile, 'w') as f:
706            f.write(stream.render('text'))
707
708        return
709
710
711    def deacc_fluxes(self, inputfiles, c):
712        '''Goes through all flux fields in ordered time and de-accumulate
713        the fields. Afterwards the fields are disaggregated in time.
714        Different versions of disaggregation is provided for rainfall
715        data (darain, modified linear) and the surface fluxes and
716        stress data (dapoly, cubic polynomial).
717
718        Parameters
719        ----------
720        inputfiles : :obj:`UioFiles`
721            Contains a list of files.
722
723        c : :obj:`ControlFile`
724            Contains all the parameters of CONTROL file and
725            command line.
726
727        Return
728        ------
729
730        '''
731
732        table128 = init128(_config.PATH_GRIBTABLE)
733        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
734
735        iid = None
736        index_vals = None
737
738        # get the values of the keys which are used for distinct access
739        # of grib messages via product
740        index_keys = ["date", "time", "step"]
741        iid, index_vals = self._mk_index_values(c.inputdir,
742                                                inputfiles,
743                                                index_keys)
744        # index_vals looks like e.g.:
745        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
746        # index_vals[1]: ('0', '1200', '1800', '600') ; time
747        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
748
749        # initialize dictionaries
750        valsdict = {}
751        svalsdict = {}
752        for p in pars:
753            valsdict[str(p)] = []
754            svalsdict[str(p)] = []
755
756        # "product" genereates each possible combination between the
757        # values of the index keys
758        for prod in product(*index_vals):
759            # e.g. prod = ('20170505', '0', '12')
760            #             (  date    ,time, step)
761
762            print('CURRENT PRODUCT: ', prod)
763
764            for i in range(len(index_keys)):
765                codes_index_select(iid, index_keys[i], prod[i])
766
767            # get first id from current product
768            gid = codes_new_from_index(iid)
769
770            # if there is no data for this specific time combination / product
771            # skip the rest of the for loop and start with next timestep/product
772            if not gid:
773                continue
774
775            # create correct timestamp from the three time informations
776            cdate = str(codes_get(gid, 'date'))
777            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
778            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
779            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
780            t_dt = t_date + timedelta(hours=int(cstep))
781            t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime))
782            t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime))
783            t_enddate = None
784
785            if c.maxstep > 12:
786                fnout = os.path.join(c.inputdir, 'flux' +
787                                     t_date.strftime('%Y%m%d.%H') +
788                                     '.{:0>3}'.format(step-2*int(c.dtime)))
789                gnout = os.path.join(c.inputdir, 'flux' +
790                                     t_date.strftime('%Y%m%d.%H') +
791                                     '.{:0>3}'.format(step-int(c.dtime)))
792                hnout = os.path.join(c.inputdir, 'flux' +
793                                     t_date.strftime('%Y%m%d.%H') +
794                                     '.{:0>3}'.format(step))
795            else:
796                fnout = os.path.join(c.inputdir, 'flux' +
797                                     t_m2dt.strftime('%Y%m%d%H'))
798                gnout = os.path.join(c.inputdir, 'flux' +
799                                     t_m1dt.strftime('%Y%m%d%H'))
800                hnout = os.path.join(c.inputdir, 'flux' +
801                                     t_dt.strftime('%Y%m%d%H'))
802
803            print("outputfile = " + fnout)
804            f_handle = open(fnout, 'w')
805            h_handle = open(hnout, 'w')
806            g_handle = open(gnout, 'w')
807
808            # read message for message and store relevant data fields
809            # data keywords are stored in pars
810            while 1:
811                if not gid:
812                    break
813                cparamId = str(codes_get(gid, 'paramId'))
814                step = codes_get(gid, 'step')
815                time = codes_get(gid, 'time')
816                ni = codes_get(gid, 'Ni')
817                nj = codes_get(gid, 'Nj')
818                if cparamId in valsdict.keys():
819                    values = codes_get_values(gid)
820                    vdp = valsdict[cparamId]
821                    svdp = svalsdict[cparamId]
822
823                    if cparamId == '142' or cparamId == '143':
824                        fak = 1. / 1000.
825                    else:
826                        fak = 3600.
827
828                    values = (np.reshape(values, (nj, ni))).flatten() / fak
829                    vdp.append(values[:])  # save the accumulated values
830                    if c.marsclass.upper() == 'EA' or \
831                       step <= int(c.dtime):
832                        svdp.append(values[:] / int(c.dtime))
833                    else:  # deaccumulate values
834                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
835
836                    print(cparamId, time, step, len(values),
837                          values[0], np.std(values))
838
839                    # len(svdp) corresponds to the time
840                    if len(svdp) >= 3:
841                        if len(svdp) > 3:
842                            if cparamId == '142' or cparamId == '143':
843                                values = disaggregation.darain(svdp)
844                            else:
845                                values = disaggregation.dapoly(svdp)
846
847                            if not (step == c.maxstep and c.maxstep > 12 \
848                                    or t_dt == t_enddate):
849                                vdp.pop(0)
850                                svdp.pop(0)
851                        else:
852                            if c.maxstep > 12:
853                                values = svdp[1]
854                            else:
855                                values = svdp[0]
856
857                        codes_set_values(gid, values)
858
859                        if c.maxstep > 12:
860                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
861                        else:
862                            codes_set(gid, 'stepRange', 0)
863                            codes_set(gid, 'time', t_m2dt.hour*100)
864                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
865
866                        codes_write(gid, f_handle)
867
868                        if c.basetime:
869                            t_enddate = datetime.strptime(c.end_date +
870                                                          c.basetime,
871                                                          '%Y%m%d%H')
872                        else:
873                            t_enddate = t_date + timedelta(2*int(c.dtime))
874
875                        # squeeze out information of last two steps contained
876                        # in svdp
877                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
878                        # or t_dt+timedelta(hours = int(c.dtime))
879                        # >= t_enddate:
880                        # Note that svdp[0] has not been popped in this case
881
882                        if step == c.maxstep and c.maxstep > 12 or \
883                           t_dt == t_enddate:
884
885                            values = svdp[3]
886                            codes_set_values(gid, values)
887                            codes_set(gid, 'stepRange', 0)
888                            truedatetime = t_m2dt + timedelta(hours=
889                                                              2*int(c.dtime))
890                            codes_set(gid, 'time', truedatetime.hour * 100)
891                            codes_set(gid, 'date', truedatetime.year * 10000 +
892                                      truedatetime.month * 100 +
893                                      truedatetime.day)
894                            codes_write(gid, h_handle)
895
896                            #values = (svdp[1]+svdp[2])/2.
897                            if cparamId == '142' or cparamId == '143':
898                                values = disaggregation.darain(list(reversed(svdp)))
899                            else:
900                                values = disaggregation.dapoly(list(reversed(svdp)))
901
902                            codes_set(gid, 'stepRange', 0)
903                            truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
904                            codes_set(gid, 'time', truedatetime.hour * 100)
905                            codes_set(gid, 'date', truedatetime.year * 10000 +
906                                      truedatetime.month * 100 +
907                                      truedatetime.day)
908                            codes_set_values(gid, values)
909                            codes_write(gid, g_handle)
910
911                codes_release(gid)
912
913                gid = codes_new_from_index(iid)
914
915            f_handle.close()
916            g_handle.close()
917            h_handle.close()
918
919        codes_index_release(iid)
920
921        return
922
923
924    def create(self, inputfiles, c):
925        '''An index file will be created which depends on the combination
926        of "date", "time" and "stepRange" values. This is used to iterate
927        over all messages in each grib file which were passed through the
928        parameter "inputfiles" to seperate specific parameters into fort.*
929        files. Afterwards the FORTRAN program is called to convert
930        the data fields all to the same grid and put them in one file
931        per unique time step (combination of "date", "time" and
932        "stepRange").
933
934        Note
935        ----
936        This method is based on the ECMWF example index.py
937        https://software.ecmwf.int/wiki/display/GRIB/index.py
938
939        Parameters
940        ----------
941        inputfiles : :obj:`UioFiles`
942            Contains a list of files.
943
944        c : :obj:`ControlFile`
945            Contains all the parameters of CONTROL file and
946            command line.
947
948        Return
949        ------
950
951        '''
952
953        if c.wrf:
954            table128 = init128(_config.PATH_GRIBTABLE)
955            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
956                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
957                                  table128)
958
959        # these numbers are indices for the temporary files "fort.xx"
960        # which are used to seperate the grib fields to,
961        # for the Fortran program input
962        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
963        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
964        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
965                 '17':None, '18':None, '19':None, '21':None, '22':None}
966
967        iid = None
968        index_vals = None
969
970        # get the values of the keys which are used for distinct access
971        # of grib messages via product
972        index_keys = ["date", "time", "step"]
973        iid, index_vals = self._mk_index_values(c.inputdir,
974                                                inputfiles,
975                                                index_keys)
976        # index_vals looks like e.g.:
977        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
978        # index_vals[1]: ('0', '1200', '1800', '600') ; time
979        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
980
981        # "product" genereates each possible combination between the
982        # values of the index keys
983        for prod in product(*index_vals):
984            # e.g. prod = ('20170505', '0', '12')
985            #             (  date    ,time, step)
986
987            print('current product: ', prod)
988
989            for i in range(len(index_keys)):
990                codes_index_select(iid, index_keys[i], prod[i])
991
992            # get first id from current product
993            gid = codes_new_from_index(iid)
994
995            # if there is no data for this specific time combination / product
996            # skip the rest of the for loop and start with next timestep/product
997            if not gid:
998                continue
999
1000            # remove old fort.* files and open new ones
1001            # they are just valid for a single product
1002            for k, f in fdict.iteritems():
1003                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1004                silent_remove(fortfile)
1005                fdict[k] = open(fortfile, 'w')
1006
1007            # create correct timestamp from the three time informations
1008            cdate = str(codes_get(gid, 'date'))
1009            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1010            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1011            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1012            timestamp += timedelta(hours=int(cstep))
1013            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1014
1015            # if the timestamp is out of basetime start/end date period,
1016            # skip this specific product
1017            if c.basetime:
1018                start_time = datetime.strptime(c.end_date + c.basetime,
1019                                                '%Y%m%d%H') - time_delta
1020                end_time = datetime.strptime(c.end_date + c.basetime,
1021                                             '%Y%m%d%H')
1022                if timestamp < start_time or timestamp > end_time:
1023                    continue
1024
1025            if c.wrf:
1026                if 'olddate' not in locals() or cdate != olddate:
1027                    fwrf = open(os.path.join(c.outputdir,
1028                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1029                    olddate = cdate[:]
1030
1031            # savedfields remembers which fields were already used.
1032            savedfields = []
1033            # sum of cloud liquid and ice water content
1034            scwc = None
1035            while 1:
1036                if not gid:
1037                    break
1038                paramId = codes_get(gid, 'paramId')
1039                gridtype = codes_get(gid, 'gridType')
1040                levtype = codes_get(gid, 'typeOfLevel')
1041                if paramId == 77: # ETADOT
1042                    codes_write(gid, fdict['21'])
1043                elif paramId == 130: # T
1044                    codes_write(gid, fdict['11'])
1045                elif paramId == 131 or paramId == 132: # U, V wind component
1046                    codes_write(gid, fdict['10'])
1047                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1048                    codes_write(gid, fdict['17'])
1049                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1050                    codes_write(gid, fdict['18'])
1051                elif paramId == 135: # W
1052                    codes_write(gid, fdict['19'])
1053                elif paramId == 152: # LNSP
1054                    codes_write(gid, fdict['12'])
1055                elif paramId == 155 and gridtype == 'sh': # D
1056                    codes_write(gid, fdict['13'])
1057                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1058                    # sum cloud liquid water and ice
1059                    if scwc is None:
1060                        scwc = codes_get_values(gid)
1061                    else:
1062                        scwc += codes_get_values(gid)
1063                        codes_set_values(gid, scwc)
1064                        codes_set(gid, 'paramId', 201031)
1065                        codes_write(gid, fdict['22'])
1066                elif c.wrf and paramId in [129, 138, 155] and \
1067                      levtype == 'hybrid': # Z, VO, D
1068                    # do not do anything right now
1069                    # these are specific parameter for WRF
1070                    pass
1071                else:
1072                    if paramId not in savedfields:
1073                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1074                        # and all ADDPAR parameter
1075                        codes_write(gid, fdict['16'])
1076                        savedfields.append(paramId)
1077                    else:
1078                        print('duplicate ' + str(paramId) + ' not written')
1079
1080                try:
1081                    if c.wrf:
1082                        # model layer
1083                        if levtype == 'hybrid' and \
1084                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1085                            codes_write(gid, fwrf)
1086                        # sfc layer
1087                        elif paramId in wrfpars:
1088                            codes_write(gid, fwrf)
1089                except AttributeError:
1090                    pass
1091
1092                codes_release(gid)
1093                gid = codes_new_from_index(iid)
1094
1095            for f in fdict.values():
1096                f.close()
1097
1098            # call for Fortran program to convert e.g. reduced_gg grids to
1099            # regular_ll and calculate detadot/dp
1100            pwd = os.getcwd()
1101            os.chdir(c.inputdir)
1102            if os.stat('fort.21').st_size == 0 and c.eta:
1103                print('Parameter 77 (etadot) is missing, most likely it is \
1104                       not available for this type or date/time\n')
1105                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1106                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1107                         is set to 1 in CONTROL file')
1108
1109            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1110            p = subprocess.check_call([os.path.join(
1111                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1112            os.chdir(pwd)
1113
1114            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1115            if c.maxstep > 12:
1116                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1117            else:
1118                suffix = cdate_hour[2:10]
1119            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1120            print("outputfile = " + fnout)
1121            # collect for final processing
1122            self.outputfilelist.append(os.path.basename(fnout))
1123
1124            # create outputfile and copy all data from intermediate files
1125            # to the outputfile (final GRIB input files for FLEXPART)
1126            orolsm = os.path.basename(glob.glob(c.inputdir +
1127                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1128            fluxfile = 'flux' + cdate[0:2] + suffix
1129            if not c.cwc:
1130                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1131            else:
1132                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1133
1134            with open(fnout, 'wb') as fout:
1135                for f in flist:
1136                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1137                                       fout)
1138
1139            if c.omega:
1140                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1141                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1142                                            'rb'), fout)
1143
1144        if c.wrf:
1145            fwrf.close()
1146
1147        codes_index_release(iid)
1148
1149        return
1150
1151
1152    def process_output(self, c):
1153        '''The grib files are postprocessed depending on the selection in
1154        CONTROL file. The resulting files are moved to the output
1155        directory if its not equal to the input directory.
1156        The following modifications might be done if
1157        properly switched in CONTROL file:
1158        GRIB2 - Conversion to GRIB2
1159        ECTRANS - Transfer of files to gateway server
1160        ECSTORAGE - Storage at ECMWF server
1161
1162        Parameters
1163        ----------
1164        c : :obj:`ControlFile`
1165            Contains all the parameters of CONTROL file and
1166            command line.
1167
1168        Return
1169        ------
1170
1171        '''
1172
1173        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1174
1175        if not c.ecapi:
1176            print('ecstorage: {}\n ecfsdir: {}\n'.
1177                  format(c.ecstorage, c.ecfsdir))
1178            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1179                  .format(c.ectrans, c.gateway, c.destination))
1180
1181        print('Output filelist: ')
1182        print(self.outputfilelist)
1183
1184        for ofile in self.outputfilelist:
1185            ofile = os.path.join(self.inputdir, ofile)
1186
1187            if c.format.lower() == 'grib2':
1188                p = subprocess.check_call(['codes_set', '-s', 'edition=2,',
1189                                           'productDefinitionTemplateNumber=8',
1190                                           ofile, ofile + '_2'])
1191                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1192
1193            if c.ectrans and not c.ecapi:
1194                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1195                                           c.gateway, '-remote', c.destination,
1196                                           '-source', ofile])
1197
1198            if c.ecstorage and not c.ecapi:
1199                p = subprocess.check_call(['ecp', '-o', ofile,
1200                                           os.path.expandvars(c.ecfsdir)])
1201
1202            if c.outputdir != c.inputdir:
1203                p = subprocess.check_call(['mv',
1204                                           os.path.join(c.inputdir, ofile),
1205                                           c.outputdir])
1206
1207        return
1208
1209
1210    def prepare_fp_files(self, c):
1211        '''Conversion of GRIB files to FLEXPART binary format.
1212
1213        Parameters
1214        ----------
1215        c : :obj:`ControlFile`
1216            Contains all the parameters of CONTROL file and
1217            command line.
1218
1219        Return
1220        ------
1221
1222        '''
1223        # generate AVAILABLE file
1224        # Example of AVAILABLE file data:
1225        # 20131107 000000      EN13110700              ON DISC
1226        clist = []
1227        for ofile in self.outputfilelist:
1228            fname = ofile.split('/')
1229            if '.' in fname[-1]:
1230                l = fname[-1].split('.')
1231                timestamp = datetime.strptime(l[0][-6:] + l[1],
1232                                              '%y%m%d%H')
1233                timestamp += timedelta(hours=int(l[2]))
1234                cdate = datetime.strftime(timestamp, '%Y%m%d')
1235                chms = datetime.strftime(timestamp, '%H%M%S')
1236            else:
1237                cdate = '20' + fname[-1][-8:-2]
1238                chms = fname[-1][-2:] + '0000'
1239            clist.append(cdate + ' ' + chms + ' '*6 +
1240                         fname[-1] + ' '*14 + 'ON DISC')
1241        clist.sort()
1242        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1243            f.write('\n'.join(clist) + '\n')
1244
1245        # generate pathnames file
1246        pwd = os.path.abspath(c.outputdir)
1247        with open(pwd + '/pathnames', 'w') as f:
1248            f.write(pwd + '/Options/\n')
1249            f.write(pwd + '/\n')
1250            f.write(pwd + '/\n')
1251            f.write(pwd + '/AVAILABLE\n')
1252            f.write(' = == = == = == = == = == ==  = \n')
1253
1254        # create Options dir if necessary
1255        if not os.path.exists(pwd + '/Options'):
1256            make_dir(pwd+'/Options')
1257
1258        # read template COMMAND file
1259        with open(os.path.expandvars(os.path.expanduser(
1260            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1261            lflist = f.read().split('\n')
1262
1263        # find index of list where to put in the
1264        # date and time information
1265        # usually after the LDIRECT parameter
1266        i = 0
1267        for l in lflist:
1268            if 'LDIRECT' in l.upper():
1269                break
1270            i += 1
1271
1272        # insert the date and time information of run start and end
1273        # into the list of lines of COMMAND file
1274        lflist = lflist[:i+1] + \
1275                 [clist[0][:16], clist[-1][:16]] + \
1276                 lflist[i+3:]
1277
1278        # write the new COMMAND file
1279        with open(pwd + '/Options/COMMAND', 'w') as g:
1280            g.write('\n'.join(lflist) + '\n')
1281
1282        # change to outputdir and start the grib2flexpart run
1283        # afterwards switch back to the working dir
1284        os.chdir(c.outputdir)
1285        p = subprocess.check_call([
1286            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1287            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1288        os.chdir(pwd)
1289
1290        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG