source: flex_extract.git/source/python/classes/EcFlexpart.py @ 6f951ca

ctbtodev
Last change on this file since 6f951ca was 6f951ca, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

new style of docstring params and updates in docstrings

  • Property mode set to 100644
File size: 68.8 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get)
70
71# software specific classes and modules from flex_extract
72sys.path.append('../')
73import _config
74from GribUtil import GribUtil
75from mods.tools import (init128, to_param_id, silent_remove, product,
76                        my_error, make_dir, get_informations, get_dimensions,
77                        execute_subprocess)
78from MarsRetrieval import MarsRetrieval
79import mods.disaggregation as disaggregation
80
81# ------------------------------------------------------------------------------
82# CLASS
83# ------------------------------------------------------------------------------
84class EcFlexpart(object):
85    '''
86    Class to represent FLEXPART specific ECMWF data.
87
88    FLEXPART needs grib files in a specifc format. All necessary data fields
89    for one time step are stored in a single file. The class represents an
90    instance with all the parameter and settings necessary for retrieving
91    MARS data and modifing them so they are fitting FLEXPART needs. The class
92    is able to disaggregate the fluxes and convert grid types to the one needed
93    by FLEXPART, therefore using the FORTRAN program.
94
95    Attributes
96    ----------
97    mreq_count : int
98        Counter for the number of generated mars requests.
99
100    inputdir : str
101        Path to the directory where the retrieved data is stored.
102
103    dataset : str
104        For public datasets there is the specific naming and parameter
105        dataset which has to be used to characterize the type of
106        data.
107
108    basetime : str
109        The time for a half day retrieval. The 12 hours upfront are to be
110        retrieved.
111
112    dtime : str
113        Time step in hours.
114
115    acctype : str
116        The field type for the accumulated forecast fields.
117
118    acctime : str
119        The starting time from the accumulated forecasts.
120
121    accmaxstep : str
122        The maximum forecast step for the accumulated forecast fields.
123
124    marsclass : str
125        Characterisation of dataset.
126
127    stream : str
128        Identifies the forecasting system used to generate the data.
129
130    number : str
131        Selects the member in ensemble forecast run.
132
133    resol : str
134        Specifies the desired triangular truncation of retrieved data,
135        before carrying out any other selected post-processing.
136
137    accuracy : str
138        Specifies the number of bits per value to be used in the
139        generated GRIB coded fields.
140
141    addpar : str
142        List of additional parameters to be retrieved.
143
144    level : str
145        Specifies the maximum level.
146
147    expver : str
148        The version of the dataset.
149
150    levelist : str
151        Specifies the required levels.
152
153    glevelist : str
154        Specifies the required levels for gaussian grids.
155
156    gaussian : str
157        This parameter is deprecated and should no longer be used.
158        Specifies the desired type of Gaussian grid for the output.
159
160    grid : str
161        Specifies the output grid which can be either a Gaussian grid
162        or a Latitude/Longitude grid.
163
164    area : str
165        Specifies the desired sub-area of data to be extracted.
166
167    purefc : int
168        Switch for definition of pure forecast mode or not.
169
170    outputfilelist : list of str
171        The final list of FLEXPART ready input files.
172
173    types : dictionary
174        Determines the combination of type of fields, time and forecast step
175        to be retrieved.
176
177    params : dictionary
178        Collection of grid types and their corresponding parameters,
179        levels, level types and the grid definition.
180
181    server : ECMWFService or ECMWFDataServer
182        This is the connection to the ECMWF data servers.
183
184    public : int
185        Decides which Web API Server version is used.
186
187    dates : str
188        Contains start and end date of the retrieval in the format
189        "YYYYMMDD/to/YYYYMMDD"
190    '''
191
192    # --------------------------------------------------------------------------
193    # CLASS FUNCTIONS
194    # --------------------------------------------------------------------------
195    def __init__(self, c, fluxes=False):
196        '''Creates an object/instance of EcFlexpart with the associated
197        settings of its attributes for the retrieval.
198
199        Parameters:
200        -----------
201        c : ControlFile
202            Contains all the parameters of CONTROL file and
203            command line.
204
205        fluxes : boolean, optional
206            Decides if the flux parameter settings are stored or
207            the rest of the parameter list.
208            Default value is False.
209
210        Return
211        ------
212
213        '''
214        # set a counter for the number of generated mars requests
215        self.mreq_count = 0
216
217        self.inputdir = c.inputdir
218        self.dataset = c.dataset
219        self.basetime = c.basetime
220        self.dtime = c.dtime
221        self.acctype = c.acctype
222        self.acctime = c.acctime
223        self.accmaxstep = c.accmaxstep
224        self.marsclass = c.marsclass
225        self.stream = c.stream
226        self.number = c.number
227        self.resol = c.resol
228        self.accuracy = c.accuracy
229        self.addpar = c.addpar
230        self.level = c.level
231        self.expver = c.expver
232        self.levelist = c.levelist
233        self.glevelist = '1/to/' + c.level # in case of gaussian grid
234        self.gaussian = c.gaussian
235        self.grid = c.grid
236        self.area = c.area
237        self.purefc = c.purefc
238        self.outputfilelist = []
239
240        # Define the different types of field combinations (type, time, step)
241        self.types = {}
242        # Define the parameters and their level types, level list and
243        # grid resolution for the retrieval job
244        self.params = {}
245
246        if fluxes:
247            self._create_params_fluxes()
248        else:
249            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
250
251        if fluxes and not c.purefc:
252            self._create_field_types_fluxes()
253        else:
254            self._create_field_types(c.type, c.time, c.step)
255        return
256
257    def _create_field_types(self, ftype, ftime, fstep):
258        '''Create the combination of field type, time and forecast step.
259
260        Parameters:
261        -----------
262        ftype : list of str
263            List of field types.
264
265        ftime : list of str
266            The time in hours of the field.
267
268        fstep : str
269            Specifies the forecast time step from forecast base time.
270            Valid values are hours (HH) from forecast base time.
271
272        Return
273        ------
274
275        '''
276        i = 0
277        for ty, st, ti in zip(ftype, fstep, ftime):
278            btlist = range(24)
279            if self.basetime == '12':
280                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
281            if self.basetime == '00':
282                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
283
284            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
285                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
286                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
287                 # (int(c.time[i]) in btlist or c.purefc):
288
289            if (int(ti) in btlist) or self.purefc:
290
291                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
292                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
293
294                    if ty not in self.types.keys():
295                        self.types[ty] = {'times': '', 'steps': ''}
296
297                    if ti not in self.types[ty]['times']:
298                        if self.types[ty]['times']:
299                            self.types[ty]['times'] += '/'
300                        self.types[ty]['times'] += ti
301
302                    if st not in self.types[ty]['steps']:
303                        if self.types[ty]['steps']:
304                            self.types[ty]['steps'] += '/'
305                        self.types[ty]['steps'] += st
306            i += 1
307        return
308
309    def _create_field_types_fluxes(self):
310        '''Create the combination of field type, time and forecast step
311        for the flux data.
312
313        Parameters:
314        -----------
315
316        Return
317        ------
318
319        '''
320        self.types[str(self.acctype)] = {'times': str(self.acctime),
321                                         'steps': '{}/to/{}/by/{}'.format(
322                                             self.dtime,
323                                             self.accmaxstep,
324                                             self.dtime)}
325        return
326
327    def _create_params(self, gauss, eta, omega, cwc, wrf):
328        '''Define the specific parameter settings for retrievment.
329
330        The different parameters need specific grid types and level types
331        for retrievement. We might get following combination of types
332        (depending on selection and availability):
333        (These are short cuts for the grib file names (leading sequence)
334        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
335        where:
336            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
337            ML = Model Level, SL = Surface Level
338
339        For each of this combination there is a list of parameter names,
340        the level type, the level list and the grid resolution.
341
342        There are different scenarios for data extraction from MARS:
343        1) Retrieval of etadot
344           eta=1, gauss=0, omega=0
345        2) Calculation of etadot from divergence
346           eta=0, gauss=1, omega=0
347        3) Calculation of etadot from omega (for makes sense for debugging)
348           eta=0, gauss=0, omega=1
349        4) Retrieval and Calculation of etadot (only for debugging)
350           eta=1, gauss=1, omega=0
351        5) Download also specific model and surface level data for FLEXPART-WRF
352
353        Parameters:
354        -----------
355        gauss : int
356            Gaussian grid is retrieved.
357
358        eta : int
359            Etadot parameter will be directly retrieved.
360
361        omega : int
362            The omega paramterwill be retrieved.
363
364        cwc : int
365            The cloud liquid and ice water content will be retrieved.
366
367        wrf : int
368            Additional model level and surface level data will be retrieved for
369            WRF/FLEXPART-WRF simulations.
370
371        Return
372        ------
373
374        '''
375        # SURFACE FIELDS
376        #-----------------------------------------------------------------------
377        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
378        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
379                                 'SFC', '1', self.grid]
380        if self.addpar:
381            self.params['OG__SL'][0] += self.addpar
382
383        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
384            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
385                                            'SFC', '1', self.grid]
386        else:
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
388                                            'SFC', '1', self.grid]
389
390        # MODEL LEVEL FIELDS
391        #-----------------------------------------------------------------------
392        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
393
394        if not gauss and eta:
395            self.params['OG__ML'][0] += '/U/V/ETADOT'
396        elif gauss and not eta:
397            self.params['GG__SL'] = ['Q', 'ML', '1', \
398                                     '{}'.format((int(self.resol) + 1) / 2)]
399            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
400        elif not gauss and not eta:
401            self.params['OG__ML'][0] += '/U/V'
402        else:
403            print('Warning: Collecting etadot and parameters for gaussian grid \
404                            is a very costly parameter combination, \
405                            use this combination only for debugging!')
406            self.params['GG__SL'] = ['Q', 'ML', '1', \
407                                     '{}'.format((int(self.resol) + 1) / 2)]
408            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
409                                     '{}'.format((int(self.resol) + 1) / 2)]
410
411        if omega:
412            self.params['OG__ML'][0] += '/W'
413
414        if cwc:
415            self.params['OG__ML'][0] += '/CLWC/CIWC'
416
417        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
418        #-----------------------------------------------------------------------
419        if wrf:
420            self.params['OG__ML'][0] += '/Z/VO'
421            if '/D' not in self.params['OG__ML'][0]:
422                self.params['OG__ML'][0] += '/D'
423            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
424                       'SWVL1','SWVL2','SWVL3','SWVL4']
425            for par in wrf_sfc:
426                if par not in self.params['OG__SL'][0]:
427                    self.params['OG__SL'][0] += '/' + par
428
429        return
430
431
432    def _create_params_fluxes(self):
433        '''Define the parameter setting for flux data.
434
435        Flux data are accumulated fields in time and are stored on the
436        surface level. The leading short cut name for the grib files is:
437        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
438        acc for Accumulated Grid.
439        The params dictionary stores a list of parameter names, the level type,
440        the level list and the grid resolution.
441
442        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
443
444        Parameters:
445        -----------
446
447        Return
448        ------
449
450        '''
451        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
452                                    'SFC', '1', self.grid]
453        return
454
455
456    def _mk_targetname(self, ftype, param, date):
457        '''Creates the filename for the requested grib data to be stored in.
458        This name is passed as the "target" parameter in the request.
459
460        Parameters
461        ----------
462        ftype : str
463            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
464
465        param : str
466            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
467            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
468
469        date : str
470            The date period of the grib data to be stored in this file.
471
472        Return
473        ------
474        targetname : str
475            The target filename for the grib data.
476        '''
477        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
478                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
479
480        return targetname
481
482
483    def _start_retrievement(self, request, par_dict):
484        '''Creates the Mars Retrieval and prints or submits the request
485        depending on the status of the request variable.
486
487        Parameters
488        ----------
489        request : int
490            Selects the mode of retrieval.
491            0: Retrieves the data from ECMWF.
492            1: Prints the mars requests to an output file.
493            2: Retrieves the data and prints the mars request.
494
495        par_dict : dictionary
496            Contains all parameter which have to be set for creating the
497            Mars Retrievals. The parameter are:
498            marsclass, dataset, stream, type, levtype, levelist, resol,
499            gaussian, accuracy, grid, target, area, date, time, number,
500            step, expver, param
501
502        Return
503        ------
504
505        '''
506        # increase number of mars requests
507        self.mreq_count += 1
508
509        MR = MarsRetrieval(self.server,
510                           self.public,
511                           marsclass=par_dict['marsclass'],
512                           dataset=par_dict['dataset'],
513                           stream=par_dict['stream'],
514                           type=par_dict['type'],
515                           levtype=par_dict['levtype'],
516                           levelist=par_dict['levelist'],
517                           resol=par_dict['resol'],
518                           gaussian=par_dict['gaussian'],
519                           accuracy=par_dict['accuracy'],
520                           grid=par_dict['grid'],
521                           target=par_dict['target'],
522                           area=par_dict['area'],
523                           date=par_dict['date'],
524                           time=par_dict['time'],
525                           number=par_dict['number'],
526                           step=par_dict['step'],
527                           expver=par_dict['expver'],
528                           param=par_dict['param'])
529
530        if request == 0:
531            MR.display_info()
532            MR.data_retrieve()
533        elif request == 1:
534            MR.print_infodata_csv(self.inputdir, self.mreq_count)
535        elif request == 2:
536            MR.print_infodata_csv(self.inputdir, self.mreq_count)
537            MR.display_info()
538            MR.data_retrieve()
539        else:
540            print('Failure')
541
542        return
543
544
545    def _mk_index_values(self, inputdir, inputfiles, keys):
546        '''Creates an index file for a set of grib parameter keys.
547        The values from the index keys are returned in a list.
548
549        Parameters
550        ----------
551        keys : dictionary
552            List of parameter names which serves as index.
553
554        inputfiles : UioFiles
555            Contains a list of files.
556
557        Return
558        ------
559        iid : codes_index
560            This is a grib specific index structure to access
561            messages in a file.
562
563        index_vals : list of list  of str
564            Contains the values from the keys used for a distinct selection
565            of grib messages in processing  the grib files.
566            Content looks like e.g.:
567            index_vals[0]: ('20171106', '20171107', '20171108') ; date
568            index_vals[1]: ('0', '1200', '1800', '600') ; time
569            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
570        '''
571        iid = None
572        index_keys = keys
573
574        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
575        silent_remove(indexfile)
576        grib = GribUtil(inputfiles.files)
577        # creates new index file
578        iid = grib.index(index_keys=index_keys, index_file=indexfile)
579
580        # read the values of index keys
581        index_vals = []
582        for key in index_keys:
583            key_vals = codes_index_get(iid, key)
584            # have to sort the key values for correct disaggregation,
585            # therefore convert to int first
586            key_vals = [int(k) for k in key_vals]
587            key_vals.sort()
588            key_vals = [str(k) for k in key_vals]
589            index_vals.append(key_vals)
590            # index_vals looks for example like:
591            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
592            # index_vals[1]: ('0', '1200') ; time
593            # index_vals[2]: (3', '6', '9', '12') ; stepRange
594
595        return iid, index_vals
596
597
598    def retrieve(self, server, dates, public, request, inputdir='.'):
599        '''Finalizing the retrieval information by setting final details
600        depending on grid type.
601        Prepares MARS retrievals per grid type and submits them.
602
603        Parameters
604        ----------
605        server : ECMWFService or ECMWFDataServer
606            The connection to the ECMWF server. This is different
607            for member state users which have full access and non
608            member state users which have only access to the public
609            data sets. The decision is made from command line argument
610            "public"; for public access its True (ECMWFDataServer)
611            for member state users its False (ECMWFService)
612
613        dates : str
614            Contains start and end date of the retrieval in the format
615            "YYYYMMDD/to/YYYYMMDD"
616
617        request : int
618            Selects the mode of retrieval.
619            0: Retrieves the data from ECMWF.
620            1: Prints the mars requests to an output file.
621            2: Retrieves the data and prints the mars request.
622
623        inputdir : str, optional
624            Path to the directory where the retrieved data is about
625            to be stored. The default is the current directory ('.').
626
627        Return
628        ------
629
630        '''
631        self.dates = dates
632        self.server = server
633        self.public = public
634        self.inputdir = inputdir
635        oro = False
636
637        # define times with datetime module
638        t12h = timedelta(hours=12)
639        t24h = timedelta(hours=24)
640
641        # dictionary which contains all parameter for the mars request,
642        # entries with a "None" will change in different requests and will
643        # therefore be set in each request seperately
644        retr_param_dict = {'marsclass':self.marsclass,
645                           'dataset':self.dataset,
646                           'stream':None,
647                           'type':None,
648                           'levtype':None,
649                           'levelist':None,
650                           'resol':self.resol,
651                           'gaussian':None,
652                           'accuracy':self.accuracy,
653                           'grid':None,
654                           'target':None,
655                           'area':None,
656                           'date':None,
657                           'time':None,
658                           'number':self.number,
659                           'step':None,
660                           'expver':self.expver,
661                           'param':None}
662
663        for ftype in self.types:
664            # fk contains field types such as
665            #     [AN, FC, PF, CV]
666            # fv contains all of the items of the belonging key
667            #     [times, steps]
668            for pk, pv in self.params.iteritems():
669                # pk contains one of these keys of params
670                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
671                #      OG_OROLSM_SL, OG_acc_SL]
672                # pv contains all of the items of the belonging key
673                #     [param, levtype, levelist, grid]
674                if isinstance(pv, str):
675                    continue
676                retr_param_dict['type'] = ftype
677                retr_param_dict['time'] = self.types[ftype]['times']
678                retr_param_dict['step'] = self.types[ftype]['steps']
679                retr_param_dict['date'] = self.dates
680                retr_param_dict['stream'] = self.stream
681                retr_param_dict['target'] = \
682                    self._mk_targetname(ftype,
683                                        pk,
684                                        retr_param_dict['date'].split('/')[0])
685                retr_param_dict['param'] = pv[0]
686                retr_param_dict['levtype'] = pv[1]
687                retr_param_dict['levelist'] = pv[2]
688                retr_param_dict['grid'] = pv[3]
689                retr_param_dict['area'] = self.area
690                retr_param_dict['gaussian'] = self.gaussian
691
692                if pk == 'OG_OROLSM__SL' and not oro:
693                    oro = True
694                    # in CERA20C (class EP) there is no stream "OPER"!
695                    if self.marsclass.upper() != 'EP':
696                        retr_param_dict['stream'] = 'OPER'
697                    retr_param_dict['type'] = 'AN'
698                    retr_param_dict['time'] = '00'
699                    retr_param_dict['step'] = '000'
700                    retr_param_dict['date'] = self.dates.split('/')[0]
701                    retr_param_dict['target'] = self._mk_targetname('',
702                                            pk, retr_param_dict['date'])
703                elif pk == 'OG_OROLSM__SL' and oro:
704                    continue
705                if pk == 'GG__SL' and pv[0] == 'Q':
706                    retr_param_dict['area'] = ""
707                    retr_param_dict['gaussian'] = 'reduced'
708
709    # ------  on demand path  --------------------------------------------------
710                if not self.basetime:
711                    # ******* start retrievement
712                    self._start_retrievement(request, retr_param_dict)
713    # ------  operational path  ------------------------------------------------
714                else:
715                    # check if mars job requests fields beyond basetime.
716                    # if yes eliminate those fields since they may not
717                    # be accessible with user's credentials
718
719                    enddate = retr_param_dict['date'].split('/')[-1]
720                    elimit = datetime.strptime(enddate + self.basetime,
721                                               '%Y%m%d%H')
722
723                    if self.basetime == '12':
724                        # --------------  flux data ----------------------------
725                        if 'acc' in pk:
726
727                        # Strategy:
728                        # if maxtime-elimit >= 24h reduce date by 1,
729                        # if 12h <= maxtime-elimit<12h reduce time for last date
730                        # if maxtime-elimit<12h reduce step for last time
731                        # A split of the MARS job into 2 is likely necessary.
732
733
734                            startdate = retr_param_dict['date'].split('/')[0]
735                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
736                            retr_param_dict['date'] = '/'.join([startdate,
737                                                                'to',
738                                                                enddate])
739
740                            # ******* start retrievement
741                            self._start_retrievement(request, retr_param_dict)
742
743                            retr_param_dict['date'] = \
744                                datetime.strftime(elimit - t12h, '%Y%m%d')
745                            retr_param_dict['time'] = '00'
746                            retr_param_dict['target'] = \
747                                self._mk_targetname(ftype, pk,
748                                                    retr_param_dict['date'])
749
750                            # ******* start retrievement
751                            self._start_retrievement(request, retr_param_dict)
752
753                        # --------------  non flux data ------------------------
754                        else:
755                            # ******* start retrievement
756                            self._start_retrievement(request, retr_param_dict)
757
758                    else: # basetime = 0
759                        retr_param_dict['date'] = \
760                            datetime.strftime(elimit - t24h, '%Y%m%d')
761
762                        timesave = ''.join(retr_param_dict['time'])
763
764                        if '/' in retr_param_dict['time']:
765                            times = retr_param_dict['time'].split('/')
766                            steps = retr_param_dict['step'].split('/')
767                            while (pk != 'OG_OROLSM__SL' and
768                                   'acc' not in pk and
769                                   (int(times[0]) + int(steps[0])) <= 12):
770                                times = times[1:]
771
772                            if len(times) > 1:
773                                retr_param_dict['time'] = '/'.join(times)
774                            else:
775                                retr_param_dict['time'] = times[0]
776
777                        # ******* start retrievement
778                        self._start_retrievement(request, retr_param_dict)
779
780                        if (pk != 'OG_OROLSM__SL' and
781                            int(retr_param_dict['step'].split('/')[0]) == 0 and
782                            int(timesave.split('/')[0]) == 0):
783
784                            retr_param_dict['date'] = \
785                                datetime.strftime(elimit, '%Y%m%d')
786                            retr_param_dict['time'] = '00'
787                            retr_param_dict['step'] = '000'
788                            retr_param_dict['target'] = \
789                                self._mk_targetname(ftype, pk,
790                                                    retr_param_dict['date'])
791
792                            # ******* start retrievement
793                            self._start_retrievement(request, retr_param_dict)
794
795        if request == 0 or request == 2:
796            print('MARS retrieve done ... ')
797        elif request == 1:
798            print('MARS request printed ...')
799
800        return
801
802
803    def write_namelist(self, c):
804        '''Creates a namelist file in the temporary directory and writes
805        the following values to it: maxl, maxb, mlevel,
806        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
807        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
808
809        Parameters
810        ----------
811        c : ControlFile
812            Contains all the parameters of CONTROL file and
813            command line.
814
815        filename : str
816                Name of the namelist file.
817
818        Return
819        ------
820
821        '''
822
823        from genshi.template.text import NewTextTemplate
824        from genshi.template import  TemplateLoader
825        from genshi.template.eval import  UndefinedError
826
827        try:
828            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
829            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
830                                            cls=NewTextTemplate)
831
832            self.inputdir = c.inputdir
833            area = np.asarray(self.area.split('/')).astype(float)
834            grid = np.asarray(self.grid.split('/')).astype(float)
835
836            if area[1] > area[3]:
837                area[1] -= 360
838            maxl = int((area[3] - area[1]) / grid[1]) + 1
839            maxb = int((area[0] - area[2]) / grid[0]) + 1
840
841            stream = namelist_template.generate(
842                maxl = str(maxl),
843                maxb = str(maxb),
844                mlevel = str(self.level),
845                mlevelist = str(self.levelist),
846                mnauf = str(self.resol),
847                metapar = '77',
848                rlo0 = str(area[1]),
849                rlo1 = str(area[3]),
850                rla0 = str(area[2]),
851                rla1 = str(area[0]),
852                momega = str(c.omega),
853                momegadiff = str(c.omegadiff),
854                mgauss = str(c.gauss),
855                msmooth = str(c.smooth),
856                meta = str(c.eta),
857                metadiff = str(c.etadiff),
858                mdpdeta = str(c.dpdeta)
859            )
860        except UndefinedError as e:
861            print('... ERROR ' + str(e))
862
863            sys.exit('\n... error occured while trying to generate namelist ' +
864                     _config.TEMPFILE_NAMELIST)
865        except OSError as e:
866            print('... ERROR CODE: ' + str(e.errno))
867            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
868
869            sys.exit('\n... error occured while trying to generate template ' +
870                     _config.TEMPFILE_NAMELIST)
871
872        try:
873            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
874
875            with open(namelistfile, 'w') as f:
876                f.write(stream.render('text'))
877        except OSError as e:
878            print('... ERROR CODE: ' + str(e.errno))
879            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
880
881            sys.exit('\n... error occured while trying to write ' +
882                     namelistfile)
883
884        return
885
886
887    def deacc_fluxes(self, inputfiles, c):
888        '''De-accumulate and disaggregate flux data.
889
890        Goes through all flux fields in ordered time and de-accumulate
891        the fields. Afterwards the fields are disaggregated in time.
892        Different versions of disaggregation is provided for rainfall
893        data (darain, modified linear) and the surface fluxes and
894        stress data (dapoly, cubic polynomial).
895
896        Parameters
897        ----------
898        inputfiles : UioFiles
899            Contains the list of files that contain flux data.
900
901        c : ControlFile
902            Contains all the parameters of CONTROL file and
903            command line.
904
905        Return
906        ------
907
908        '''
909
910        table128 = init128(_config.PATH_GRIBTABLE)
911        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
912
913        iid = None
914        index_vals = None
915
916        # get the values of the keys which are used for distinct access
917        # of grib messages via product
918        index_keys = ["date", "time", "step"]
919        iid, index_vals = self._mk_index_values(c.inputdir,
920                                                inputfiles,
921                                                index_keys)
922        # index_vals looks like e.g.:
923        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
924        # index_vals[1]: ('0', '600', '1200', '1800') ; time
925        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
926
927        if c.rrint:
928            if not c.purefc:
929                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
930                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
931            else:
932                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
933                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
934                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
935                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
936                end_date = end_date + timedelta(hours=c.maxstep)
937
938            info = get_informations(os.path.join(c.inputdir,
939                                                 inputfiles.files[0]))
940            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
941                                  start_date, end_date)
942            # create numpy array
943            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
944            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
945            it_lsp = 0
946            it_cp = 0
947            date_list = []
948            step_list = []
949
950        # initialize dictionaries to store values
951        orig_vals = {}
952        deac_vals = {}
953        for p in pars:
954            orig_vals[p] = []
955            deac_vals[p] = []
956
957        # "product" genereates each possible combination between the
958        # values of the index keys
959        for prod in product(*index_vals):
960            # e.g. prod = ('20170505', '0', '12')
961            #             (  date    ,time, step)
962
963            print('CURRENT PRODUCT: ', prod)
964
965            for i in range(len(index_keys)):
966                codes_index_select(iid, index_keys[i], prod[i])
967
968            # get first id from current product
969            gid = codes_new_from_index(iid)
970
971            # if there is no data for this specific time combination / product
972            # skip the rest of the for loop and start with next timestep/product
973            if not gid:
974                continue
975
976            # create correct timestamp from the three time informations
977            cdate = str(codes_get(gid, 'date'))
978            time = codes_get(gid, 'time')/100 # integer
979            step = codes_get(gid, 'step') # integer
980            ctime = '{:0>2}'.format(time)
981            cstep = '{:0>3}'.format(step)
982
983            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
984            t_dt = t_date + timedelta(hours=step)
985            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
986            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
987            t_enddate = None
988
989            if c.purefc:
990                fnout = os.path.join(c.inputdir, 'flux' +
991                                     t_date.strftime('%Y%m%d.%H') +
992                                     '.{:0>3}'.format(step-2*int(c.dtime)))
993                gnout = os.path.join(c.inputdir, 'flux' +
994                                     t_date.strftime('%Y%m%d.%H') +
995                                     '.{:0>3}'.format(step-int(c.dtime)))
996                hnout = os.path.join(c.inputdir, 'flux' +
997                                     t_date.strftime('%Y%m%d.%H') +
998                                     '.{:0>3}'.format(step))
999            else:
1000                fnout = os.path.join(c.inputdir, 'flux' +
1001                                     t_m2dt.strftime('%Y%m%d%H'))
1002                gnout = os.path.join(c.inputdir, 'flux' +
1003                                     t_m1dt.strftime('%Y%m%d%H'))
1004                hnout = os.path.join(c.inputdir, 'flux' +
1005                                     t_dt.strftime('%Y%m%d%H'))
1006
1007            print("outputfile = " + fnout)
1008            f_handle = open(fnout, 'w')
1009            h_handle = open(hnout, 'w')
1010            g_handle = open(gnout, 'w')
1011
1012            # read message for message and store relevant data fields, where
1013            # data keywords are stored in pars
1014            while True:
1015                if not gid:
1016                    break
1017                parId = codes_get(gid, 'paramId') # integer
1018                step = codes_get(gid, 'step') # integer
1019                time = codes_get(gid, 'time') # integer
1020                ni = codes_get(gid, 'Ni') # integer
1021                nj = codes_get(gid, 'Nj') # integer
1022                if parId not in orig_vals.keys():
1023                    # parameter is not a flux, find next one
1024                    continue
1025
1026                # define conversion factor
1027                if parId == 142 or parId == 143:
1028                    fak = 1. / 1000.
1029                else:
1030                    fak = 3600.
1031
1032                # get parameter values and reshape
1033                values = codes_get_values(gid)
1034                values = (np.reshape(values, (nj, ni))).flatten() / fak
1035
1036                # save the original and accumulated values
1037                orig_vals[parId].append(values[:])
1038
1039                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1040                    # no de-accumulation needed
1041                    deac_vals[parId].append(values[:] / int(c.dtime))
1042                else:
1043                    # do de-accumulation
1044                    deac_vals[parId].append(
1045                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1046                         int(c.dtime))
1047
1048                # store precipitation if new disaggregation method is selected
1049                # only the exact days are needed
1050                if c.rrint:
1051                    if start_date <= t_dt <= end_date:
1052                        if not c.purefc:
1053                            if t_dt not in date_list:
1054                                date_list.append(t_dt)
1055                                step_list = [0]
1056                        else:
1057                            if t_date not in date_list:
1058                                date_list.append(t_date)
1059                            if step not in step_list:
1060                                step_list.append(step)
1061                        if c.rrint and parId == 142:
1062                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1063                            it_lsp += 1
1064                        elif c.rrint and parId == 143:
1065                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1066                            it_cp += 1
1067
1068                # information printout
1069                print(parId, time, step, len(values), values[0], np.std(values))
1070
1071                # length of deac_vals[parId] corresponds to the
1072                # number of time steps, max. 4 are needed for disaggegration
1073                # with the old and original method
1074                # run over all grib messages and perform
1075                # shifting in time
1076                if len(deac_vals[parId]) >= 3:
1077                    if len(deac_vals[parId]) > 3:
1078                        if not c.rrint and (parId == 142 or parId == 143):
1079                            values = disaggregation.darain(deac_vals[parId])
1080                        else:
1081                            values = disaggregation.dapoly(deac_vals[parId])
1082
1083                        if not (step == c.maxstep and c.purefc \
1084                                or t_dt == t_enddate):
1085                            # remove first time step in list to shift
1086                            # time line
1087                            orig_vals[parId].pop(0)
1088                            deac_vals[parId].pop(0)
1089                    else:
1090                        # if the third time step is read (per parId),
1091                        # write out the first one as a boundary value
1092                        if c.purefc:
1093                            values = deac_vals[parId][1]
1094                        else:
1095                            values = deac_vals[parId][0]
1096
1097                    if not (c.rrint and (parId == 142 or parId == 143)):
1098                        codes_set_values(gid, values)
1099
1100                        if c.purefc:
1101                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1102                        else:
1103                            codes_set(gid, 'stepRange', 0)
1104                            codes_set(gid, 'time', t_m2dt.hour*100)
1105                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1106
1107                        codes_write(gid, f_handle)
1108
1109                        if c.basetime:
1110                            t_enddate = datetime.strptime(c.end_date + c.basetime,
1111                                                          '%Y%m%d%H')
1112                        else:
1113                            t_enddate = t_date + timedelta(2*int(c.dtime))
1114
1115                            # squeeze out information of last two steps
1116                            # contained in deac_vals[parId]
1117                            # Note that deac_vals[parId][0] has not been popped
1118                            # in this case
1119
1120                            if step == c.maxstep and c.purefc or \
1121                               t_dt == t_enddate:
1122                                # last step
1123                                if c.purefc:
1124                                    values = deac_vals[parId][3]
1125                                    codes_set_values(gid, values)
1126                                    codes_set(gid, 'stepRange', step)
1127                                    #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1128                                    codes_write(gid, h_handle)
1129                                else:
1130                                    values = deac_vals[parId][3]
1131                                    codes_set_values(gid, values)
1132                                    codes_set(gid, 'stepRange', 0)
1133                                    truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1134                                    codes_set(gid, 'time', truedatetime.hour * 100)
1135                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1136                                    codes_write(gid, h_handle)
1137
1138                                if parId == 142 or parId == 143:
1139                                    values = disaggregation.darain(list(reversed(deac_vals[parId])))
1140                                else:
1141                                    values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1142
1143                                # step before last step
1144                                if c.purefc:
1145                                    codes_set(gid, 'stepRange', step-int(c.dtime))
1146                                    #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1147                                    codes_set_values(gid, values)
1148                                    codes_write(gid, g_handle)
1149                                else:
1150                                    codes_set(gid, 'stepRange', 0)
1151                                    truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1152                                    codes_set(gid, 'time', truedatetime.hour * 100)
1153                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1154                                    codes_set_values(gid, values)
1155                                    codes_write(gid, g_handle)
1156
1157                codes_release(gid)
1158
1159                gid = codes_new_from_index(iid)
1160
1161            f_handle.close()
1162            g_handle.close()
1163            h_handle.close()
1164
1165        codes_index_release(iid)
1166
1167        if c.rrint:
1168            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1169
1170            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1171                                 cp_np, date_list, step_list, c)
1172
1173        return
1174
1175    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1176        '''Calculates and writes out the disaggregated precipitation fields.
1177
1178        Disaggregation is done in time and original times are written to the
1179        flux files, while the additional subgrid times are written to
1180        extra files.
1181
1182        Parameters
1183        ----------
1184        ni : int
1185            Amount of zonal grid points.
1186
1187        nj : int
1188            Amount of meridional grid points.
1189
1190        nt : int
1191            Number of time steps.
1192
1193        lsp_np : numpy array of float
1194            The large scale precipitation fields for each time step.
1195            Shape (ni * nj, nt).
1196
1197        cp_np : numpy array of float
1198            The convective precipitation fields for each time step.
1199            Shape (ni * nj, nt).
1200
1201        date_list : list of datetime
1202            The list of dates for which the disaggregation is to be done.
1203
1204        step_list : list of int
1205            The list of steps for a single forecast time.
1206            Only necessary for pure forecasts.
1207
1208        c : ControlFile
1209            Contains all the parameters of CONTROL file and
1210            command line.
1211
1212        Return
1213        ------
1214
1215        '''
1216        print('... disaggregation or precipitation with new method.')
1217        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1218        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1219
1220        # do the disaggregation, but neglect the last value of the
1221        # time series. This one corresponds for example to 24 hour,
1222        # which we don't need.
1223        for ix in range(ni*nj):
1224            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1225            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1226
1227        # write to grib files (full/orig times to flux file and inbetween
1228        # times into seperate end files)
1229        print('... write disaggregated precipitation to files.')
1230        it = 0
1231        for date in date_list:
1232            for step in step_list:
1233                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1234
1235                if c.purefc:
1236                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1237                                   '.{:0>3}'.format(step)
1238                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1239                                '.{:0>3}'.format(step) + '_1'
1240                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1241                                '.{:0>3}'.format(step) + '_2'
1242                else:
1243                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1244                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1245                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1246
1247                # collect for final processing
1248                self.outputfilelist.append(os.path.basename(fluxfilename))
1249                self.outputfilelist.append(os.path.basename(filename1))
1250                self.outputfilelist.append(os.path.basename(filename2))
1251
1252                # write original time step to flux file as usual
1253                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1254                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1255                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1256                                  keynames=['date','time','stepRange','values'],
1257                                  keyvalues=[int(date.strftime('%Y%m%d')),
1258                                             date.hour*100, step, lsp_new_np[:,it]],
1259                                 )
1260                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1261                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1262                                  keynames=['date','time','stepRange','values'],
1263                                  keyvalues=[int(date.strftime('%Y%m%d')),
1264                                             date.hour*100, step, cp_new_np[:,it]]
1265                                 )
1266
1267                # write first subgrid time step
1268                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1269                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1270                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1271                                  keynames=['date','time','stepRange','values'],
1272                                  keyvalues=[int(date.strftime('%Y%m%d')),
1273                                             date.hour*100, step, lsp_new_np[:,it+1]]
1274                                  )
1275                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1276                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1277                                  keynames=['date','time','stepRange','values'],
1278                                  keyvalues=[int(date.strftime('%Y%m%d')),
1279                                             date.hour*100, step, cp_new_np[:,it+1]]
1280                                 )
1281
1282                # write second subgrid time step
1283                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1284                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1285                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1286                                  keynames=['date','time','stepRange','values'],
1287                                  keyvalues=[int(date.strftime('%Y%m%d')),
1288                                             date.hour*100, step, lsp_new_np[:,it+2]]
1289                                 )
1290                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1291                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1292                                  keynames=['date','time','stepRange','values'],
1293                                  keyvalues=[int(date.strftime('%Y%m%d')),
1294                                             date.hour*100, step, cp_new_np[:,it+2]]
1295                                 )
1296                it = it + 3 # jump to next original time step
1297        return
1298
1299    def _create_rr_grib_dummy(self, ifile, inputdir):
1300        '''Creates a grib file with a dummy message for the two precipitation
1301        types lsp and cp each.
1302
1303        Parameters
1304        ----------
1305        ifile : str
1306            Filename of the input file to read the grib messages from.
1307
1308        inputdir : str, optional
1309            Path to the directory where the retrieved data is stored.
1310
1311        Return
1312        ------
1313
1314        '''
1315
1316        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1317
1318        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1319                      keyvalues=[142], filemode='w')
1320
1321        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1322                      keyvalues=[143], filemode='a')
1323
1324        return
1325
1326    def create(self, inputfiles, c):
1327        '''An index file will be created which depends on the combination
1328        of "date", "time" and "stepRange" values. This is used to iterate
1329        over all messages in each grib file which were passed through the
1330        parameter "inputfiles" to seperate specific parameters into fort.*
1331        files. Afterwards the FORTRAN program is called to convert
1332        the data fields all to the same grid and put them in one file
1333        per unique time step (combination of "date", "time" and
1334        "stepRange").
1335
1336        Note
1337        ----
1338        This method is based on the ECMWF example index.py
1339        https://software.ecmwf.int/wiki/display/GRIB/index.py
1340
1341        Parameters
1342        ----------
1343        inputfiles : UioFiles
1344            Contains a list of files.
1345
1346        c : ControlFile
1347            Contains all the parameters of CONTROL file and
1348            command line.
1349
1350        Return
1351        ------
1352
1353        '''
1354
1355        if c.wrf:
1356            table128 = init128(_config.PATH_GRIBTABLE)
1357            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1358                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1359                                  table128)
1360
1361        # these numbers are indices for the temporary files "fort.xx"
1362        # which are used to seperate the grib fields to,
1363        # for the Fortran program input
1364        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1365        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1366        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1367                 '17':None, '18':None, '19':None, '21':None, '22':None}
1368
1369        iid = None
1370        index_vals = None
1371
1372        # get the values of the keys which are used for distinct access
1373        # of grib messages via product
1374        index_keys = ["date", "time", "step"]
1375        iid, index_vals = self._mk_index_values(c.inputdir,
1376                                                inputfiles,
1377                                                index_keys)
1378        # index_vals looks like e.g.:
1379        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1380        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1381        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1382
1383        # "product" genereates each possible combination between the
1384        # values of the index keys
1385        for prod in product(*index_vals):
1386            # e.g. prod = ('20170505', '0', '12')
1387            #             (  date    ,time, step)
1388
1389            print('current product: ', prod)
1390
1391            for i in range(len(index_keys)):
1392                codes_index_select(iid, index_keys[i], prod[i])
1393
1394            # get first id from current product
1395            gid = codes_new_from_index(iid)
1396
1397            # if there is no data for this specific time combination / product
1398            # skip the rest of the for loop and start with next timestep/product
1399            if not gid:
1400                continue
1401#============================================================================================
1402            # remove old fort.* files and open new ones
1403            # they are just valid for a single product
1404            for k, f in fdict.iteritems():
1405                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1406                silent_remove(fortfile)
1407                fdict[k] = open(fortfile, 'w')
1408#============================================================================================
1409            # create correct timestamp from the three time informations
1410            cdate = str(codes_get(gid, 'date'))
1411            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1412            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1413            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1414            timestamp += timedelta(hours=int(cstep))
1415            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1416
1417            # if the timestamp is out of basetime start/end date period,
1418            # skip this specific product
1419            if c.basetime:
1420                start_time = datetime.strptime(c.end_date + c.basetime,
1421                                                '%Y%m%d%H') - time_delta
1422                end_time = datetime.strptime(c.end_date + c.basetime,
1423                                             '%Y%m%d%H')
1424                if timestamp < start_time or timestamp > end_time:
1425                    continue
1426
1427            if c.wrf:
1428                if 'olddate' not in locals() or cdate != olddate:
1429                    fwrf = open(os.path.join(c.outputdir,
1430                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1431                    olddate = cdate[:]
1432#============================================================================================
1433            # savedfields remembers which fields were already used.
1434            savedfields = []
1435            # sum of cloud liquid and ice water content
1436            scwc = None
1437            while 1:
1438                if not gid:
1439                    break
1440                paramId = codes_get(gid, 'paramId')
1441                gridtype = codes_get(gid, 'gridType')
1442                levtype = codes_get(gid, 'typeOfLevel')
1443                if paramId == 77: # ETADOT
1444                    codes_write(gid, fdict['21'])
1445                elif paramId == 130: # T
1446                    codes_write(gid, fdict['11'])
1447                elif paramId == 131 or paramId == 132: # U, V wind component
1448                    codes_write(gid, fdict['10'])
1449                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1450                    codes_write(gid, fdict['17'])
1451                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1452                    codes_write(gid, fdict['18'])
1453                elif paramId == 135: # W
1454                    codes_write(gid, fdict['19'])
1455                elif paramId == 152: # LNSP
1456                    codes_write(gid, fdict['12'])
1457                elif paramId == 155 and gridtype == 'sh': # D
1458                    codes_write(gid, fdict['13'])
1459                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1460                    # sum cloud liquid water and ice
1461                    if scwc is None:
1462                        scwc = codes_get_values(gid)
1463                    else:
1464                        scwc += codes_get_values(gid)
1465                        codes_set_values(gid, scwc)
1466                        codes_set(gid, 'paramId', 201031)
1467                        codes_write(gid, fdict['22'])
1468                        scwc = None
1469                elif c.wrf and paramId in [129, 138, 155] and \
1470                      levtype == 'hybrid': # Z, VO, D
1471                    # do not do anything right now
1472                    # these are specific parameter for WRF
1473                    pass
1474                else:
1475                    if paramId not in savedfields:
1476                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1477                        # and all ADDPAR parameter
1478                        codes_write(gid, fdict['16'])
1479                        savedfields.append(paramId)
1480                    else:
1481                        print('duplicate ' + str(paramId) + ' not written')
1482
1483                try:
1484                    if c.wrf:
1485                        # model layer
1486                        if levtype == 'hybrid' and \
1487                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1488                            codes_write(gid, fwrf)
1489                        # sfc layer
1490                        elif paramId in wrfpars:
1491                            codes_write(gid, fwrf)
1492                except AttributeError:
1493                    pass
1494
1495                codes_release(gid)
1496                gid = codes_new_from_index(iid)
1497#============================================================================================
1498            for f in fdict.values():
1499                f.close()
1500#============================================================================================
1501            # call for Fortran program to convert e.g. reduced_gg grids to
1502            # regular_ll and calculate detadot/dp
1503            pwd = os.getcwd()
1504            os.chdir(c.inputdir)
1505            if os.stat('fort.21').st_size == 0 and c.eta:
1506                print('Parameter 77 (etadot) is missing, most likely it is \
1507                       not available for this type or date/time\n')
1508                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1509                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1510                         is set to 1 in CONTROL file')
1511#============================================================================================
1512            # write out all output to log file before starting fortran programm
1513            sys.stdout.flush()
1514
1515            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1516            execute_subprocess([os.path.join(c.exedir,
1517                                _config.FORTRAN_EXECUTABLE)],
1518                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1519
1520            os.chdir(pwd)
1521#============================================================================================
1522            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1523            if c.purefc:
1524                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1525            else:
1526                suffix = cdate_hour[2:10]
1527            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1528            print("outputfile = " + fnout)
1529            # collect for final processing
1530            self.outputfilelist.append(os.path.basename(fnout))
1531#============================================================================================
1532            # create outputfile and copy all data from intermediate files
1533            # to the outputfile (final GRIB input files for FLEXPART)
1534            orolsm = os.path.basename(glob.glob(c.inputdir +
1535                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1536            fluxfile = 'flux' + cdate[0:2] + suffix
1537            if not c.cwc:
1538                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1539            else:
1540                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1541
1542            with open(fnout, 'wb') as fout:
1543                for f in flist:
1544                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1545                                       fout)
1546
1547            if c.omega:
1548                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1549                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1550                                            'rb'), fout)
1551#============================================================================================
1552        if c.wrf:
1553            fwrf.close()
1554
1555        codes_index_release(iid)
1556
1557        return
1558
1559
1560    def process_output(self, c):
1561        '''Postprocessing of FLEXPART input files.
1562
1563        The grib files are postprocessed depending on the selection in
1564        CONTROL file. The resulting files are moved to the output
1565        directory if its not equal to the input directory.
1566        The following modifications might be done if
1567        properly switched in CONTROL file:
1568        GRIB2 - Conversion to GRIB2
1569        ECTRANS - Transfer of files to gateway server
1570        ECSTORAGE - Storage at ECMWF server
1571
1572        Parameters
1573        ----------
1574        c : ControlFile
1575            Contains all the parameters of CONTROL file and
1576            command line.
1577
1578        Return
1579        ------
1580
1581        '''
1582
1583        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1584
1585        if not c.ecapi:
1586            print('ecstorage: {}\n ecfsdir: {}\n'.
1587                  format(c.ecstorage, c.ecfsdir))
1588            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1589                  .format(c.ectrans, c.gateway, c.destination))
1590
1591        print('Output filelist: ')
1592        print(self.outputfilelist)
1593
1594        for ofile in self.outputfilelist:
1595            ofile = os.path.join(self.inputdir, ofile)
1596
1597            if c.format.lower() == 'grib2':
1598                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1599                                    'productDefinitionTemplateNumber=8',
1600                                    ofile, ofile + '_2'],
1601                                   error_msg='GRIB2 CONVERSION FAILED!')
1602
1603                execute_subprocess(['mv', ofile + '_2', ofile],
1604                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1605                                   'FILES FAILED!')
1606
1607            if c.ectrans and not c.ecapi:
1608                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1609                                    c.gateway, '-remote', c.destination,
1610                                    '-source', ofile],
1611                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1612
1613            if c.ecstorage and not c.ecapi:
1614                execute_subprocess(['ecp', '-o', ofile,
1615                                    os.path.expandvars(c.ecfsdir)],
1616                                   error_msg='COPY OF FILES TO ECSTORAGE '
1617                                   'AREA FAILED!')
1618
1619            if c.outputdir != c.inputdir:
1620                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1621                                    c.outputdir],
1622                                   error_msg='RELOCATION OF OUTPUT FILES '
1623                                   'TO OUTPUTDIR FAILED!')
1624
1625        return
1626
1627
1628    def prepare_fp_files(self, c):
1629        '''Conversion of GRIB files to FLEXPART binary format.
1630
1631        Parameters
1632        ----------
1633        c : ControlFile
1634            Contains all the parameters of CONTROL file and
1635            command line.
1636
1637        Return
1638        ------
1639
1640        '''
1641        # generate AVAILABLE file
1642        # Example of AVAILABLE file data:
1643        # 20131107 000000      EN13110700              ON DISC
1644        clist = []
1645        for ofile in self.outputfilelist:
1646            fname = ofile.split('/')
1647            if '.' in fname[-1]:
1648                l = fname[-1].split('.')
1649                timestamp = datetime.strptime(l[0][-6:] + l[1],
1650                                              '%y%m%d%H')
1651                timestamp += timedelta(hours=int(l[2]))
1652                cdate = datetime.strftime(timestamp, '%Y%m%d')
1653                chms = datetime.strftime(timestamp, '%H%M%S')
1654            else:
1655                cdate = '20' + fname[-1][-8:-2]
1656                chms = fname[-1][-2:] + '0000'
1657            clist.append(cdate + ' ' + chms + ' '*6 +
1658                         fname[-1] + ' '*14 + 'ON DISC')
1659        clist.sort()
1660        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1661            f.write('\n'.join(clist) + '\n')
1662
1663        # generate pathnames file
1664        pwd = os.path.abspath(c.outputdir)
1665        with open(pwd + '/pathnames', 'w') as f:
1666            f.write(pwd + '/Options/\n')
1667            f.write(pwd + '/\n')
1668            f.write(pwd + '/\n')
1669            f.write(pwd + '/AVAILABLE\n')
1670            f.write(' = == = == = == = == = == ==  = \n')
1671
1672        # create Options dir if necessary
1673        if not os.path.exists(pwd + '/Options'):
1674            make_dir(pwd+'/Options')
1675
1676        # read template COMMAND file
1677        with open(os.path.expandvars(os.path.expanduser(
1678            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1679            lflist = f.read().split('\n')
1680
1681        # find index of list where to put in the
1682        # date and time information
1683        # usually after the LDIRECT parameter
1684        i = 0
1685        for l in lflist:
1686            if 'LDIRECT' in l.upper():
1687                break
1688            i += 1
1689
1690        # insert the date and time information of run start and end
1691        # into the list of lines of COMMAND file
1692        lflist = lflist[:i+1] + \
1693                 [clist[0][:16], clist[-1][:16]] + \
1694                 lflist[i+3:]
1695
1696        # write the new COMMAND file
1697        with open(pwd + '/Options/COMMAND', 'w') as g:
1698            g.write('\n'.join(lflist) + '\n')
1699
1700        # change to outputdir and start the grib2flexpart run
1701        # afterwards switch back to the working dir
1702        os.chdir(c.outputdir)
1703        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1704         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1705        execute_subprocess(cmd)
1706        os.chdir(pwd)
1707
1708        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG