source: flex_extract.git/Source/Python/Classes/EcFlexpart.py @ 0f89116

ctbtodev
Last change on this file since 0f89116 was 0f89116, checked in by Anne Philipp <anne.philipp@…>, 4 years ago

diverse changes due to PEP8 style guide and eliminating grib2flexpart; removed unused parameter

  • Property mode set to 100644
File size: 77.5 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#
27#    February 2018 - Anne Philipp (University of Vienna):
28#        - applied PEP8 style guide
29#        - added documentation
30#        - removed function getFlexpartTime in class EcFlexpart
31#        - outsourced class ControlFile
32#        - outsourced class MarsRetrieval
33#        - changed class name from EIFlexpart to EcFlexpart
34#        - applied minor code changes (style)
35#        - removed "dead code" , e.g. retrieval of Q since it is not needed
36#        - removed "times" parameter from retrieve-method since it is not used
37#        - seperated function "retrieve" into smaller functions (less code
38#          duplication, easier testing)
39#
40# @License:
41#    (C) Copyright 2014-2019.
42#    Anne Philipp, Leopold Haimberger
43#
44#    SPDX-License-Identifier: CC-BY-4.0
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55#pylint: disable=unsubscriptable-object
56# this error is a bug
57#pylint: disable=ungrouped-imports
58# not necessary that we group the imports
59# ------------------------------------------------------------------------------
60# MODULES
61# ------------------------------------------------------------------------------
62from __future__ import print_function
63
64import os
65import sys
66import glob
67import shutil
68from datetime import datetime, timedelta
69
70# software specific classes and modules from flex_extract
71#pylint: disable=wrong-import-position
72sys.path.append('../')
73import _config
74from Classes.GribUtil import GribUtil
75from Mods.tools import (init128, to_param_id, silent_remove, product,
76                        my_error, get_informations, get_dimensions,
77                        execute_subprocess, to_param_id_with_tablenumber,
78                        generate_retrieval_period_boundary)
79from Classes.MarsRetrieval import MarsRetrieval
80from Classes.UioFiles import UioFiles
81import Mods.disaggregation as disaggregation
82#pylint: enable=wrong-import-position
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes:# and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = list(range(len(ftime)))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        self.types[str(self.acctype)] = {'times': str(self.acctime),
324                                         'steps': '{}/to/{}/by/{}'.format(
325                                             self.dtime,
326                                             self.accmaxstep,
327                                             self.dtime)}
328        return
329
330    def _create_params(self, gauss, eta, omega, cwc, wrf):
331        '''Define the specific parameter settings for retrievment.
332
333        The different parameters need specific grid types and level types
334        for retrievement. We might get following combination of types
335        (depending on selection and availability):
336        (These are short cuts for the grib file names (leading sequence)
337        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
338        where:
339            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
340            ML = Model Level, SL = Surface Level
341
342        For each of this combination there is a list of parameter names,
343        the level type, the level list and the grid resolution.
344
345        There are different scenarios for data extraction from MARS:
346        1) Retrieval of etadot
347           eta=1, gauss=0, omega=0
348        2) Calculation of etadot from divergence
349           eta=0, gauss=1, omega=0
350        3) Calculation of etadot from omega (for makes sense for debugging)
351           eta=0, gauss=0, omega=1
352        4) Retrieval and Calculation of etadot (only for debugging)
353           eta=1, gauss=1, omega=0
354        5) Download also specific model and surface level data for FLEXPART-WRF
355
356        Parameters:
357        -----------
358        gauss : int
359            Gaussian grid is retrieved.
360
361        eta : int
362            Etadot parameter will be directly retrieved.
363
364        omega : int
365            The omega paramterwill be retrieved.
366
367        cwc : int
368            The cloud liquid and ice water content will be retrieved.
369
370        wrf : int
371            Additional model level and surface level data will be retrieved for
372            WRF/FLEXPART-WRF simulations.
373
374        Return
375        ------
376
377        '''
378        # SURFACE FIELDS
379        #-----------------------------------------------------------------------
380        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
381        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
382                                 'SFC', '1', self.grid]
383        if self.addpar:
384            self.params['OG__SL'][0] += self.addpar
385
386        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
388                                            'SFC', '1', self.grid]
389        else:
390            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
391                                            'SFC', '1', self.grid]
392
393        # MODEL LEVEL FIELDS
394        #-----------------------------------------------------------------------
395        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
396
397        if not gauss and eta:
398            self.params['OG__ML'][0] += '/U/V/ETADOT'
399        elif gauss and not eta:
400            self.params['GG__SL'] = ['Q', 'ML', '1',
401                                     '{}'.format((int(self.resol) + 1) // 2)]
402            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
403        elif not gauss and not eta:
404            self.params['OG__ML'][0] += '/U/V'
405        else:  # GAUSS and ETA
406            print('Warning: Collecting etadot and parameters for gaussian grid '
407                  'is a very costly parameter combination, '
408                  'use this combination only for debugging!')
409            self.params['GG__SL'] = ['Q', 'ML', '1',
410                                     '{}'.format((int(self.resol) + 1) // 2)]
411            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
412                                     '{}'.format((int(self.resol) + 1) // 2)]
413
414        if omega:
415            self.params['OG__ML'][0] += '/W'
416
417        if cwc:
418            self.params['OG__ML'][0] += '/CLWC/CIWC'
419
420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
421        # ----------------------------------------------------------------------
422        if wrf:
423            # @WRF
424            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
425            #
426            # UNDER CONSTRUCTION !!!
427            #
428
429            print('WRF VERSION IS UNDER CONSTRUCTION!') # dummy argument
430
431            #self.params['OG__ML'][0] += '/Z/VO'
432            #if '/D' not in self.params['OG__ML'][0]:
433            #    self.params['OG__ML'][0] += '/D'
434
435            #wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
436            #           'SWVL1','SWVL2','SWVL3','SWVL4']
437            #for par in wrf_sfc:
438            #    if par not in self.params['OG__SL'][0]:
439            #        self.params['OG__SL'][0] += '/' + par
440
441        return
442
443
444    def _create_params_fluxes(self):
445        '''Define the parameter setting for flux data.
446
447        Flux data are accumulated fields in time and are stored on the
448        surface level. The leading short cut name for the grib files is:
449        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
450        acc for Accumulated Grid.
451        The params dictionary stores a list of parameter names, the level type,
452        the level list and the grid resolution.
453
454        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
455
456        Parameters:
457        -----------
458
459        Return
460        ------
461
462        '''
463        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
464                                    'SFC', '1', self.grid]
465        return
466
467
468    def _mk_targetname(self, ftype, param, date):
469        '''Creates the filename for the requested grib data to be stored in.
470        This name is passed as the "target" parameter in the request.
471
472        Parameters
473        ----------
474        ftype : str
475            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
476
477        param : str
478            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
479            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
480
481        date : str
482            The date period of the grib data to be stored in this file.
483
484        Return
485        ------
486        targetname : str
487            The target filename for the grib data.
488        '''
489        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
490                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
491
492        return targetname
493
494
495    def _start_retrievement(self, request, par_dict):
496        '''Creates the Mars Retrieval and prints or submits the request
497        depending on the status of the request variable.
498
499        Parameters
500        ----------
501        request : int
502            Selects the mode of retrieval.
503            0: Retrieves the data from ECMWF.
504            1: Prints the mars requests to an output file.
505            2: Retrieves the data and prints the mars request.
506
507        par_dict : dictionary
508            Contains all parameter which have to be set for creating the
509            Mars Retrievals. The parameter are:
510            marsclass, dataset, stream, type, levtype, levelist, resol,
511            gaussian, accuracy, grid, target, area, date, time, number,
512            step, expver, param
513
514        Return
515        ------
516
517        '''
518        # increase number of mars requests
519        self.mreq_count += 1
520
521        MR = MarsRetrieval(self.server,
522                           self.public,
523                           marsclass=par_dict['marsclass'],
524                           dataset=par_dict['dataset'],
525                           stream=par_dict['stream'],
526                           type=par_dict['type'],
527                           levtype=par_dict['levtype'],
528                           levelist=par_dict['levelist'],
529                           resol=par_dict['resol'],
530                           gaussian=par_dict['gaussian'],
531                           accuracy=par_dict['accuracy'],
532                           grid=par_dict['grid'],
533                           target=par_dict['target'],
534                           area=par_dict['area'],
535                           date=par_dict['date'],
536                           time=par_dict['time'],
537                           number=par_dict['number'],
538                           step=par_dict['step'],
539                           expver=par_dict['expver'],
540                           param=par_dict['param'])
541
542        if request == 0:
543            MR.display_info()
544            MR.data_retrieve()
545        elif request == 1:
546            MR.print_infodata_csv(self.inputdir, self.mreq_count)
547        elif request == 2:
548            MR.print_infodata_csv(self.inputdir, self.mreq_count)
549            MR.display_info()
550            MR.data_retrieve()
551        else:
552            print('Failure')
553
554        return
555
556
557    def _mk_index_values(self, inputdir, inputfiles, keys):
558        '''Creates an index file for a set of grib parameter keys.
559        The values from the index keys are returned in a list.
560
561        Parameters
562        ----------
563        keys : dictionary
564            List of parameter names which serves as index.
565
566        inputfiles : UioFiles
567            Contains a list of files.
568
569        Return
570        ------
571        iid : codes_index
572            This is a grib specific index structure to access
573            messages in a file.
574
575        index_vals : list of list  of str
576            Contains the values from the keys used for a distinct selection
577            of grib messages in processing  the grib files.
578            Content looks like e.g.:
579            index_vals[0]: ('20171106', '20171107', '20171108') ; date
580            index_vals[1]: ('0', '1200', '1800', '600') ; time
581            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
582        '''
583        from eccodes import codes_index_get
584
585        iid = None
586        index_keys = keys
587
588        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
589        silent_remove(indexfile)
590        grib = GribUtil(inputfiles.files)
591        # creates new index file
592        iid = grib.index(index_keys=index_keys, index_file=indexfile)
593
594        # read the values of index keys
595        index_vals = []
596        for key in index_keys:
597            key_vals = codes_index_get(iid, key)
598            # have to sort the key values for correct order,
599            # therefore convert to int first
600            key_vals = [int(k) for k in key_vals]
601            key_vals.sort()
602            key_vals = [str(k) for k in key_vals]
603            index_vals.append(key_vals)
604            # index_vals looks for example like:
605            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
606            # index_vals[1]: ('0', '1200') ; time
607            # index_vals[2]: (3', '6', '9', '12') ; stepRange
608
609        return iid, index_vals
610
611
612    def retrieve(self, server, dates, public, request, inputdir='.'):
613        '''Finalizing the retrieval information by setting final details
614        depending on grid type.
615        Prepares MARS retrievals per grid type and submits them.
616
617        Parameters
618        ----------
619        server : ECMWFService or ECMWFDataServer
620            The connection to the ECMWF server. This is different
621            for member state users which have full access and non
622            member state users which have only access to the public
623            data sets. The decision is made from command line argument
624            "public"; for public access its True (ECMWFDataServer)
625            for member state users its False (ECMWFService)
626
627        dates : str
628            Contains start and end date of the retrieval in the format
629            "YYYYMMDD/to/YYYYMMDD"
630
631        request : int
632            Selects the mode of retrieval.
633            0: Retrieves the data from ECMWF.
634            1: Prints the mars requests to an output file.
635            2: Retrieves the data and prints the mars request.
636
637        inputdir : str, optional
638            Path to the directory where the retrieved data is about
639            to be stored. The default is the current directory ('.').
640
641        Return
642        ------
643
644        '''
645        self.dates = dates
646        self.server = server
647        self.public = public
648        self.inputdir = inputdir
649        oro = False
650
651        # define times with datetime module
652        t12h = timedelta(hours=12)
653        t24h = timedelta(hours=24)
654
655        # dictionary which contains all parameter for the mars request,
656        # entries with a "None" will change in different requests and will
657        # therefore be set in each request seperately
658        retr_param_dict = {'marsclass':self.marsclass,
659                           'dataset':self.dataset,
660                           'stream':None,
661                           'type':None,
662                           'levtype':None,
663                           'levelist':None,
664                           'resol':self.resol,
665                           'gaussian':None,
666                           'accuracy':self.accuracy,
667                           'grid':None,
668                           'target':None,
669                           'area':None,
670                           'date':None,
671                           'time':None,
672                           'number':self.number,
673                           'step':None,
674                           'expver':self.expver,
675                           'param':None}
676
677        for ftype in sorted(self.types):
678            # ftype contains field types such as
679            #     [AN, FC, PF, CV]
680            for pk, pv in sorted(self.params.items()):
681                # pk contains one of these keys of params
682                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
683                #      OG_OROLSM_SL, OG_acc_SL]
684                # pv contains all of the items of the belonging key
685                #     [param, levtype, levelist, grid]
686                if isinstance(pv, str):
687                    continue
688                retr_param_dict['type'] = ftype
689                retr_param_dict['time'] = self.types[ftype]['times']
690                retr_param_dict['step'] = self.types[ftype]['steps']
691                retr_param_dict['date'] = self.dates
692                retr_param_dict['stream'] = self.stream
693                retr_param_dict['target'] = \
694                    self._mk_targetname(ftype,
695                                        pk,
696                                        retr_param_dict['date'].split('/')[0])
697                table128 = init128(_config.PATH_GRIBTABLE)
698                ids = to_param_id_with_tablenumber(pv[0], table128)
699                retr_param_dict['param'] = ids
700                retr_param_dict['levtype'] = pv[1]
701                retr_param_dict['levelist'] = pv[2]
702                retr_param_dict['grid'] = pv[3]
703                retr_param_dict['area'] = self.area
704                retr_param_dict['gaussian'] = self.gaussian
705
706                if pk == 'OG_OROLSM__SL' and not oro:
707                    oro = True
708                    # in CERA20C (class EP) there is no stream "OPER"!
709                    if self.marsclass.upper() != 'EP':
710                        retr_param_dict['stream'] = 'OPER'
711                    retr_param_dict['type'] = 'AN'
712                    retr_param_dict['time'] = '00'
713                    retr_param_dict['step'] = '000'
714                    retr_param_dict['date'] = self.dates.split('/')[0]
715                    retr_param_dict['target'] = self._mk_targetname('',
716                                                                    pk,
717                                                                    retr_param_dict['date'])
718                elif pk == 'OG_OROLSM__SL' and oro:
719                    continue
720                if pk == 'GG__SL' and pv[0] == 'Q':
721                    retr_param_dict['area'] = ""
722                    retr_param_dict['gaussian'] = 'reduced'
723                if ftype.upper() == 'FC' and \
724                    'acc' not in retr_param_dict['target']:
725                    if (int(retr_param_dict['time'][0]) +
726                        int(retr_param_dict['step'][0])) > 23:
727                        dates = retr_param_dict['date'].split('/')
728                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
729                        sdate = sdate - timedelta(days=1)
730                        retr_param_dict['date'] = '/'.join(
731                            [sdate.strftime("%Y%m%d")] +
732                            retr_param_dict['date'][1:])
733
734                        print('CHANGED FC start date to ' +
735                              sdate.strftime("%Y%m%d") +
736                              ' to accomodate TIME=' +
737                              retr_param_dict['time'][0] +
738                              ', STEP=' +
739                              retr_param_dict['time'][0])
740
741    # ------  on demand path  --------------------------------------------------
742                if self.basetime is None:
743                    # ******* start retrievement
744                    self._start_retrievement(request, retr_param_dict)
745    # ------  operational path  ------------------------------------------------
746                else:
747                    # check if mars job requests fields beyond basetime.
748                    # if yes eliminate those fields since they may not
749                    # be accessible with user's credentials
750
751                    enddate = retr_param_dict['date'].split('/')[-1]
752                    elimit = datetime.strptime(enddate + str(self.basetime),
753                                               '%Y%m%d%H')
754
755                    if self.basetime == 12:
756                        # --------------  flux data ----------------------------
757                        if 'acc' in pk:
758                            startdate = retr_param_dict['date'].split('/')[0]
759                            enddate = datetime.strftime(elimit - t24h, '%Y%m%d')
760                            retr_param_dict['date'] = '/'.join([startdate,
761                                                                'to',
762                                                                enddate])
763
764                            # ******* start retrievement
765                            self._start_retrievement(request, retr_param_dict)
766
767                            retr_param_dict['date'] = \
768                                datetime.strftime(elimit - t12h, '%Y%m%d')
769                            retr_param_dict['time'] = '00'
770                            retr_param_dict['target'] = \
771                                self._mk_targetname(ftype, pk,
772                                                    retr_param_dict['date'])
773
774                            # ******* start retrievement
775                            self._start_retrievement(request, retr_param_dict)
776
777                        # --------------  non flux data ------------------------
778                        else:
779                            # ******* start retrievement
780                            self._start_retrievement(request, retr_param_dict)
781
782                    elif self.basetime == 0:
783                        retr_param_dict['date'] = \
784                            datetime.strftime(elimit - t24h, '%Y%m%d')
785
786                        timesave = ''.join(retr_param_dict['time'])
787
788                        if all(['/' in retr_param_dict['time'],
789                                pk != 'OG_OROLSM__SL',
790                                'acc' not in pk]):
791                            times = retr_param_dict['time'].split('/')
792                            steps = retr_param_dict['step'].split('/')
793
794                            while int(times[0]) + int(steps[0]) <= 12:
795                                times = times[1:]
796                                if len(times) > 1:
797                                    retr_param_dict['time'] = '/'.join(times)
798                                else:
799                                    retr_param_dict['time'] = times[0]
800
801                        if all([pk != 'OG_OROLSM__SL',
802                                int(retr_param_dict['step'].split('/')[0]) == 0,
803                                int(timesave.split('/')[0]) == 0]):
804
805                            retr_param_dict['date'] = \
806                                datetime.strftime(elimit, '%Y%m%d')
807                            retr_param_dict['time'] = '00'
808                            retr_param_dict['step'] = '000'
809                            retr_param_dict['target'] = \
810                                self._mk_targetname(ftype, pk,
811                                                    retr_param_dict['date'])
812
813                        # ******* start retrievement
814                        self._start_retrievement(request, retr_param_dict)
815                    else:
816                        raise ValueError('ERROR: Basetime has an invalid value '
817                                         '-> {}'.format(str(self.basetime)))
818
819        if request == 0 or request == 2:
820            print('MARS retrieve done ... ')
821        elif request == 1:
822            print('MARS request printed ...')
823
824        return
825
826
827    def write_namelist(self, c):
828        '''Creates a namelist file in the temporary directory and writes
829        the following values to it: maxl, maxb, mlevel,
830        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
831        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
832
833        Parameters
834        ----------
835        c : ControlFile
836            Contains all the parameters of CONTROL file and
837            command line.
838
839        filename : str
840                Name of the namelist file.
841
842        Return
843        ------
844
845        '''
846
847        from genshi.template.text import NewTextTemplate
848        from genshi.template import  TemplateLoader
849        from genshi.template.eval import  UndefinedError
850        import numpy as np
851
852        try:
853            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
854            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
855                                            cls=NewTextTemplate)
856
857            self.inputdir = c.inputdir
858            area = np.asarray(self.area.split('/')).astype(float)
859            grid = np.asarray(self.grid.split('/')).astype(float)
860
861            if area[1] > area[3]:
862                area[1] -= 360
863            maxl = int(round((area[3] - area[1]) / grid[1])) + 1
864            maxb = int(round((area[0] - area[2]) / grid[0])) + 1
865
866            stream = namelist_template.generate(
867                maxl=str(maxl),
868                maxb=str(maxb),
869                mlevel=str(self.level),
870                mlevelist=str(self.levelist),
871                mnauf=str(self.resol),
872                metapar='77',
873                rlo0=str(area[1]),
874                rlo1=str(area[3]),
875                rla0=str(area[2]),
876                rla1=str(area[0]),
877                momega=str(c.omega),
878                momegadiff=str(c.omegadiff),
879                mgauss=str(c.gauss),
880                msmooth=str(c.smooth),
881                meta=str(c.eta),
882                metadiff=str(c.etadiff),
883                mdpdeta=str(c.dpdeta)
884            )
885        except UndefinedError as e:
886            print('... ERROR ' + str(e))
887
888            sys.exit('\n... error occured while trying to generate namelist ' +
889                     _config.TEMPFILE_NAMELIST)
890        except OSError as e:
891            print('... ERROR CODE: ' + str(e.errno))
892            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
893
894            sys.exit('\n... error occured while trying to generate template ' +
895                     _config.TEMPFILE_NAMELIST)
896
897        try:
898            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
899
900            with open(namelistfile, 'w') as f:
901                f.write(stream.render('text'))
902        except OSError as e:
903            print('... ERROR CODE: ' + str(e.errno))
904            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
905
906            sys.exit('\n... error occured while trying to write ' +
907                     namelistfile)
908
909        return
910
911
912    def deacc_fluxes(self, inputfiles, c):
913        '''De-accumulate and disaggregate flux data.
914
915        Goes through all flux fields in ordered time and de-accumulate
916        the fields. Afterwards the fields are disaggregated in time.
917        Different versions of disaggregation is provided for rainfall
918        data (darain, modified linear) and the surface fluxes and
919        stress data (dapoly, cubic polynomial).
920
921        Parameters
922        ----------
923        inputfiles : UioFiles
924            Contains the list of files that contain flux data.
925
926        c : ControlFile
927            Contains all the parameters of CONTROL file and
928            command line.
929
930        Return
931        ------
932
933        '''
934        import numpy as np
935        from eccodes import (codes_index_select, codes_get,
936                             codes_get_values, codes_set_values, codes_set,
937                             codes_write, codes_release, codes_new_from_index,
938                             codes_index_release)
939
940        table128 = init128(_config.PATH_GRIBTABLE)
941        # get ids from the flux parameter names
942        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
943
944        iid = None
945        index_vals = None
946
947        # get the values of the keys which are used for distinct access
948        # of grib messages via product and save the maximum number of
949        # ensemble members if there is more than one
950        if '/' in self.number:
951            # more than one ensemble member is selected
952            index_keys = ["number", "date", "time", "step"]
953            # maximum ensemble number retrieved
954            # + 1 for the control run (ensemble number 0)
955            maxnum = int(self.number.split('/')[-1]) + 1
956            # remember the index of the number values
957            index_number = index_keys.index('number')
958            # empty set to save ensemble numbers which were already processed
959            ens_numbers = set()
960            # index for the ensemble number
961            inumb = 0
962        else:
963            index_keys = ["date", "time", "step"]
964            # maximum ensemble number
965            maxnum = None
966
967        # get sorted lists of the index values
968        # this is very important for disaggregating
969        # the flux data in correct order
970        iid, index_vals = self._mk_index_values(c.inputdir,
971                                                inputfiles,
972                                                index_keys)
973        # index_vals looks like e.g.:
974        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
975        # index_vals[1]: ('0', '600', '1200', '1800') ; time
976        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
977
978        if c.rrint:
979            # set start and end timestamps for retrieval period
980            if not c.purefc:
981                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
982                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
983            else:
984                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
985                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
986                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
987                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
988                end_date = end_date + timedelta(hours=c.maxstep)
989
990            # get necessary grid dimensions from grib files for storing the
991            # precipitation fields
992            info = get_informations(os.path.join(c.inputdir,
993                                                 inputfiles.files[0]))
994            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
995                                  start_date, end_date)
996
997            # create empty numpy arrays
998            if not maxnum:
999                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1000                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1001            else:
1002                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1003                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1004
1005            # index counter for time line
1006            it_lsp = 0
1007            it_cp = 0
1008
1009            # store the order of date and step
1010            date_list = []
1011            step_list = []
1012
1013        # initialize dictionaries to store flux values per parameter
1014        orig_vals = {}
1015        deac_vals = {}
1016        for p in pars:
1017            orig_vals[p] = []
1018            deac_vals[p] = []
1019
1020        # "product" genereates each possible combination between the
1021        # values of the index keys
1022        for prod in product(*index_vals):
1023            # e.g. prod = ('20170505', '0', '12')
1024            #             ( date     ,time, step)
1025
1026            print('CURRENT PRODUCT: ', prod)
1027
1028            # the whole process has to be done for each seperate ensemble member
1029            # therefore, for each new ensemble member we delete old flux values
1030            # and start collecting flux data from the beginning time step
1031            if maxnum and prod[index_number] not in ens_numbers:
1032                ens_numbers.add(prod[index_number])
1033                inumb = len(ens_numbers) - 1
1034                # re-initialize dictionaries to store flux values per parameter
1035                # for the next ensemble member
1036                it_lsp = 0
1037                it_cp = 0
1038                orig_vals = {}
1039                deac_vals = {}
1040                for p in pars:
1041                    orig_vals[p] = []
1042                    deac_vals[p] = []
1043
1044            for i in range(len(index_keys)):
1045                codes_index_select(iid, index_keys[i], prod[i])
1046
1047            # get first id from current product
1048            gid = codes_new_from_index(iid)
1049
1050            # if there is no data for this specific time combination / product
1051            # skip the rest of the for loop and start with next timestep/product
1052            if not gid:
1053                continue
1054
1055            # create correct timestamp from the three time informations
1056            cdate = str(codes_get(gid, 'date'))
1057            time = codes_get(gid, 'time') // 100  # integer
1058            step = codes_get(gid, 'step') # integer
1059            ctime = '{:0>2}'.format(time)
1060
1061            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1062            t_dt = t_date + timedelta(hours=step)
1063            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1064            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1065            if c.basetime is not None:
1066                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1067                                              '%Y%m%d%H')
1068            else:
1069                t_enddate = t_date + timedelta(2*int(c.dtime))
1070
1071            # if necessary, add ensemble member number to filename suffix
1072            # otherwise, add empty string
1073            if maxnum:
1074                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1075            else:
1076                numbersuffix = ''
1077
1078            if c.purefc:
1079                fnout = os.path.join(c.inputdir, 'flux' +
1080                                     t_date.strftime('%Y%m%d.%H') +
1081                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1082                                     numbersuffix)
1083                gnout = os.path.join(c.inputdir, 'flux' +
1084                                     t_date.strftime('%Y%m%d.%H') +
1085                                     '.{:0>3}'.format(step-int(c.dtime)) +
1086                                     numbersuffix)
1087                hnout = os.path.join(c.inputdir, 'flux' +
1088                                     t_date.strftime('%Y%m%d.%H') +
1089                                     '.{:0>3}'.format(step) +
1090                                     numbersuffix)
1091            else:
1092                fnout = os.path.join(c.inputdir, 'flux' +
1093                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1094                gnout = os.path.join(c.inputdir, 'flux' +
1095                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1096                hnout = os.path.join(c.inputdir, 'flux' +
1097                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1098
1099            print("outputfile = " + fnout)
1100            f_handle = open(fnout, 'wb')
1101            h_handle = open(hnout, 'wb')
1102            g_handle = open(gnout, 'wb')
1103
1104            # read message for message and store relevant data fields, where
1105            # data keywords are stored in pars
1106            while True:
1107                if not gid:
1108                    break
1109                parId = codes_get(gid, 'paramId') # integer
1110                step = codes_get(gid, 'step') # integer
1111                time = codes_get(gid, 'time') # integer
1112                ni = codes_get(gid, 'Ni') # integer
1113                nj = codes_get(gid, 'Nj') # integer
1114                if parId not in orig_vals.keys():
1115                    # parameter is not a flux, find next one
1116                    continue
1117
1118                # define conversion factor
1119                if parId == 142 or parId == 143:
1120                    fak = 1. / 1000.
1121                else:
1122                    fak = 3600.
1123
1124                # get parameter values and reshape
1125                values = codes_get_values(gid)
1126                values = (np.reshape(values, (nj, ni))).flatten() / fak
1127
1128                # save the original and accumulated values
1129                orig_vals[parId].append(values[:])
1130
1131                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1132                    # no de-accumulation needed
1133                    deac_vals[parId].append(values[:] / int(c.dtime))
1134                else:
1135                    # do de-accumulation
1136                    deac_vals[parId].append(
1137                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1138                        int(c.dtime))
1139
1140                # store precipitation if new disaggregation method is selected
1141                # only the exact days are needed
1142                if c.rrint:
1143                    if start_date <= t_dt <= end_date:
1144                        if not c.purefc:
1145                            if t_dt not in date_list:
1146                                date_list.append(t_dt)
1147                                step_list = [0]
1148                        else:
1149                            if t_date not in date_list:
1150                                date_list.append(t_date)
1151                            if step not in step_list:
1152                                step_list.append(step)
1153                        # store precipitation values
1154                        if maxnum and parId == 142:
1155                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1156                            it_lsp += 1
1157                        elif not maxnum and parId == 142:
1158                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1159                            it_lsp += 1
1160                        elif maxnum and parId == 143:
1161                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1162                            it_cp += 1
1163                        elif not maxnum and parId == 143:
1164                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1165                            it_cp += 1
1166
1167                # information printout
1168                print(parId, time, step, len(values), values[0], np.std(values))
1169
1170                # length of deac_vals[parId] corresponds to the
1171                # number of time steps, max. 4 are needed for disaggegration
1172                # with the old and original method
1173                # run over all grib messages and perform
1174                # shifting in time
1175                if len(deac_vals[parId]) >= 3:
1176                    if len(deac_vals[parId]) > 3:
1177                        if not c.rrint and (parId == 142 or parId == 143):
1178                            values = disaggregation.darain(deac_vals[parId])
1179                        else:
1180                            values = disaggregation.dapoly(deac_vals[parId])
1181
1182                        if not (step == c.maxstep and c.purefc \
1183                                or t_dt == t_enddate):
1184                            # remove first time step in list to shift
1185                            # time line
1186                            orig_vals[parId].pop(0)
1187                            deac_vals[parId].pop(0)
1188                    else:
1189                        # if the third time step is read (per parId),
1190                        # write out the first one as a boundary value
1191                        if c.purefc:
1192                            values = deac_vals[parId][1]
1193                        else:
1194                            values = deac_vals[parId][0]
1195
1196                    if not (c.rrint and (parId == 142 or parId == 143)):
1197                        codes_set_values(gid, values)
1198
1199                        if c.purefc:
1200                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1201                        else:
1202                            codes_set(gid, 'stepRange', 0)
1203                            codes_set(gid, 'time', t_m2dt.hour*100)
1204                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1205
1206                        codes_write(gid, f_handle)
1207
1208                        # squeeze out information of last two steps
1209                        # contained in deac_vals[parId]
1210                        # Note that deac_vals[parId][0] has not been popped
1211                        # in this case
1212
1213                        if step == c.maxstep and c.purefc or \
1214                           t_dt == t_enddate:
1215                            # last step
1216                            if c.purefc:
1217                                values = deac_vals[parId][3]
1218                                codes_set_values(gid, values)
1219                                codes_set(gid, 'stepRange', step)
1220                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1221                                codes_write(gid, h_handle)
1222                            else:
1223                                values = deac_vals[parId][3]
1224                                codes_set_values(gid, values)
1225                                codes_set(gid, 'stepRange', 0)
1226                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1227                                codes_set(gid, 'time', truedatetime.hour * 100)
1228                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1229                                codes_write(gid, h_handle)
1230
1231                            if parId == 142 or parId == 143:
1232                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1233                            else:
1234                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1235
1236                            # step before last step
1237                            if c.purefc:
1238                                codes_set(gid, 'stepRange', step-int(c.dtime))
1239                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1240                                codes_set_values(gid, values)
1241                                codes_write(gid, g_handle)
1242                            else:
1243                                codes_set(gid, 'stepRange', 0)
1244                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1245                                codes_set(gid, 'time', truedatetime.hour * 100)
1246                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1247                                codes_set_values(gid, values)
1248                                codes_write(gid, g_handle)
1249
1250                codes_release(gid)
1251
1252                gid = codes_new_from_index(iid)
1253
1254            f_handle.close()
1255            g_handle.close()
1256            h_handle.close()
1257
1258        codes_index_release(iid)
1259
1260        if c.rrint:
1261            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1262
1263            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1264                                 cp_np, maxnum, index_keys, index_vals, c)
1265
1266        return
1267
1268    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1269        '''Calculates and writes out the disaggregated precipitation fields.
1270
1271        Disaggregation is done in time and original times are written to the
1272        flux files, while the additional subgrid times are written to
1273        extra files output files. They are named like the original files with
1274        suffixes "_1" and "_2" for the first and second subgrid point.
1275
1276        Parameters
1277        ----------
1278        ni : int
1279            Amount of zonal grid points.
1280
1281        nj : int
1282            Amount of meridional grid points.
1283
1284        nt : int
1285            Number of time steps.
1286
1287        lsp_np : numpy array of float
1288            The large scale precipitation fields for each time step.
1289            Shape (ni * nj, nt).
1290
1291        cp_np : numpy array of float
1292            The convective precipitation fields for each time step.
1293            Shape (ni * nj, nt).
1294
1295        maxnum : int
1296            The maximum number of ensemble members. It is None
1297            if there are no or just one ensemble.
1298
1299        index_keys : dictionary
1300            List of parameter names which serves as index.
1301
1302        index_vals : list of list  of str
1303            Contains the values from the keys used for a distinct selection
1304            of grib messages in processing  the grib files.
1305            Content looks like e.g.:
1306            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1307            index_vals[1]: ('0', '1200', '1800', '600') ; time
1308            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1309
1310        c : ControlFile
1311            Contains all the parameters of CONTROL file and
1312            command line.
1313
1314        Return
1315        ------
1316
1317        '''
1318        import numpy as np
1319
1320        print('... disaggregation of precipitation with new method.')
1321
1322        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1323
1324        # initialize new numpy arrays for disaggregated fields
1325        if maxnum:
1326            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1327            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1328        else:
1329            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1330            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1331
1332        # do the disaggregation, but neglect the last value of the
1333        # original time series. This one corresponds for example to
1334        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1335        if maxnum:
1336            for inum in range(maxnum):
1337                for ix in range(ni*nj):
1338                    lsp_new_np[inum, ix, :] = disaggregation.IA3(lsp_np[inum, ix, :])[:-1]
1339                    cp_new_np[inum, ix, :] = disaggregation.IA3(cp_np[inum, ix, :])[:-1]
1340        else:
1341            for ix in range(ni*nj):
1342                lsp_new_np[0, ix, :] = disaggregation.IA3(lsp_np[ix, :])[:-1]
1343                cp_new_np[0, ix, :] = disaggregation.IA3(cp_np[ix, :])[:-1]
1344
1345        # write to grib files (full/orig times to flux file and inbetween
1346        # times with step 1 and 2, respectively)
1347        print('... write disaggregated precipitation to files.')
1348
1349        if maxnum:
1350            # remember the index of the number values
1351            index_number = index_keys.index('number')
1352            # empty set to save unique ensemble numbers which were already processed
1353            ens_numbers = set()
1354            # index for the ensemble number
1355            inumb = 0
1356        else:
1357            inumb = 0
1358
1359        # index variable of disaggregated fields
1360        it = 0
1361
1362        # "product" genereates each possible combination between the
1363        # values of the index keys
1364        for prod in product(*index_vals):
1365            # e.g. prod = ('20170505', '0', '12')
1366            #             ( date     ,time, step)
1367            # or   prod = ('0'   , '20170505', '0', '12')
1368            #             (number, date      ,time, step)
1369
1370            cdate = prod[index_keys.index('date')]
1371            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1372            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1373
1374            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1375            date += timedelta(hours=int(cstep))
1376
1377            start_period, end_period = generate_retrieval_period_boundary(c)
1378            # skip all temporary times
1379            # which are outside the retrieval period
1380            if date < start_period or \
1381               date > end_period:
1382                continue
1383
1384            # the whole process has to be done for each seperate ensemble member
1385            # therefore, for each new ensemble member we delete old flux values
1386            # and start collecting flux data from the beginning time step
1387            if maxnum and prod[index_number] not in ens_numbers:
1388                ens_numbers.add(prod[index_number])
1389                inumb = int(prod[index_number])
1390                it = 0
1391
1392            # if necessary, add ensemble member number to filename suffix
1393            # otherwise, add empty string
1394            if maxnum:
1395                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1396            else:
1397                numbersuffix = ''
1398
1399            # per original time stamp: write original time step and
1400            # the two newly generated sub time steps
1401            if c.purefc:
1402                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1403            else:
1404                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1405
1406            # write original time step to flux file as usual
1407            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1408            fluxfile.set_keys(tmpfile, filemode='ab',
1409                              wherekeynames=['paramId'], wherekeyvalues=[142],
1410                              keynames=['perturbationNumber', 'date', 'time',
1411                                        'stepRange', 'values'],
1412                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1413                                         date.hour*100, 0, lsp_new_np[inumb, :, it]],
1414                             )
1415            fluxfile.set_keys(tmpfile, filemode='ab',
1416                              wherekeynames=['paramId'], wherekeyvalues=[143],
1417                              keynames=['perturbationNumber', 'date', 'time',
1418                                        'stepRange', 'values'],
1419                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1420                                         date.hour*100, 0, cp_new_np[inumb, :, it]]
1421                             )
1422
1423            # rr for first subgrid point is identified by step = 1
1424            fluxfile.set_keys(tmpfile, filemode='ab',
1425                              wherekeynames=['paramId'], wherekeyvalues=[142],
1426                              keynames=['perturbationNumber', 'date', 'time',
1427                                        'stepRange', 'values'],
1428                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1429                                         date.hour*100, '1', lsp_new_np[inumb, :, it+1]]
1430                             )
1431            fluxfile.set_keys(tmpfile, filemode='ab',
1432                              wherekeynames=['paramId'], wherekeyvalues=[143],
1433                              keynames=['perturbationNumber', 'date', 'time',
1434                                        'stepRange', 'values'],
1435                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1436                                         date.hour*100, '1', cp_new_np[inumb, :, it+1]]
1437                             )
1438
1439            # rr for second subgrid point is identified by step = 2
1440            fluxfile.set_keys(tmpfile, filemode='ab',
1441                              wherekeynames=['paramId'], wherekeyvalues=[142],
1442                              keynames=['perturbationNumber', 'date', 'time',
1443                                        'stepRange', 'values'],
1444                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1445                                         date.hour*100, '2', lsp_new_np[inumb, :, it+2]]
1446                             )
1447            fluxfile.set_keys(tmpfile, filemode='ab',
1448                              wherekeynames=['paramId'], wherekeyvalues=[143],
1449                              keynames=['perturbationNumber', 'date', 'time',
1450                                        'stepRange', 'values'],
1451                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1452                                         date.hour*100, '2', cp_new_np[inumb, :, it+2]]
1453                             )
1454
1455            it = it + 3 # jump to next original time step in rr fields
1456        return
1457
1458    def _create_rr_grib_dummy(self, ifile, inputdir):
1459        '''Creates a grib file with a dummy message for the two precipitation
1460        types lsp and cp each.
1461
1462        Parameters
1463        ----------
1464        ifile : str
1465            Filename of the input file to read the grib messages from.
1466
1467        inputdir : str, optional
1468            Path to the directory where the retrieved data is stored.
1469
1470        Return
1471        ------
1472
1473        '''
1474
1475        gribfile = GribUtil(os.path.join(inputdir, 'rr_grib_dummy.grb'))
1476
1477        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1478                                keyvalues=[142], filemode='wb')
1479
1480        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1481                                keyvalues=[143], filemode='ab')
1482
1483        return
1484
1485    def create(self, inputfiles, c):
1486        '''An index file will be created which depends on the combination
1487        of "date", "time" and "stepRange" values. This is used to iterate
1488        over all messages in each grib file which were passed through the
1489        parameter "inputfiles" to seperate specific parameters into fort.*
1490        files. Afterwards the FORTRAN program is called to convert
1491        the data fields all to the same grid and put them in one file
1492        per unique time step (combination of "date", "time" and
1493        "stepRange").
1494
1495        Note
1496        ----
1497        This method is based on the ECMWF example index.py
1498        https://software.ecmwf.int/wiki/display/GRIB/index.py
1499
1500        Parameters
1501        ----------
1502        inputfiles : UioFiles
1503            Contains a list of files.
1504
1505        c : ControlFile
1506            Contains all the parameters of CONTROL file and
1507            command line.
1508
1509        Return
1510        ------
1511
1512        '''
1513        from eccodes import (codes_index_select, codes_get,
1514                             codes_get_values, codes_set_values, codes_set,
1515                             codes_write, codes_release, codes_new_from_index,
1516                             codes_index_release)
1517
1518        # generate start and end timestamp of the retrieval period
1519        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1520        start_period = start_period + timedelta(hours=int(c.step[0]))
1521        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1522        end_period = end_period + timedelta(hours=int(c.step[-1]))
1523
1524        # @WRF
1525        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1526        #
1527        # UNDER CONSTRUCTION !!!
1528        #
1529        #if c.wrf:
1530        #    table128 = init128(_config.PATH_GRIBTABLE)
1531        #    wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1532        #                           stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1533        #                          table128)
1534
1535        # these numbers are indices for the temporary files "fort.xx"
1536        # which are used to seperate the grib fields to,
1537        # for the Fortran program input
1538        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1539        # 17: Q | 18: Q, SL, GG| 19: omega | 21: etadot | 22: clwc+ciwc
1540        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1541                 '17':None, '18':None, '19':None, '21':None, '22':None}
1542
1543        iid = None
1544        index_vals = None
1545
1546        # get the values of the keys which are used for distinct access
1547        # of grib messages via product
1548        if '/' in self.number:
1549            # more than one ensemble member is selected
1550            index_keys = ["number", "date", "time", "step"]
1551        else:
1552            index_keys = ["date", "time", "step"]
1553        iid, index_vals = self._mk_index_values(c.inputdir,
1554                                                inputfiles,
1555                                                index_keys)
1556        # index_vals looks like e.g.:
1557        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1558        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1559        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1560
1561        # "product" genereates each possible combination between the
1562        # values of the index keys
1563        for prod in product(*index_vals):
1564            # e.g. prod = ('20170505', '0', '12')
1565            #             (  date    ,time, step)
1566
1567            print('current product: ', prod)
1568
1569            for i in range(len(index_keys)):
1570                codes_index_select(iid, index_keys[i], prod[i])
1571
1572            # get first id from current product
1573            gid = codes_new_from_index(iid)
1574
1575            # if there is no data for this specific time combination / product
1576            # skip the rest of the for loop and start with next timestep/product
1577            if not gid:
1578                continue
1579#============================================================================================
1580            # remove old fort.* files and open new ones
1581            # they are just valid for a single product
1582            for k, f in fdict.items():
1583                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1584                silent_remove(fortfile)
1585                fdict[k] = open(fortfile, 'wb')
1586#============================================================================================
1587            # create correct timestamp from the three time informations
1588            cdate = str(codes_get(gid, 'date'))
1589            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1590            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1591            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1592            timestamp += timedelta(hours=int(cstep))
1593            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1594
1595            # if basetime is used, adapt start/end date period
1596            if c.basetime is not None:
1597                time_delta = timedelta(hours=12-int(c.dtime))
1598                start_period = datetime.strptime(c.end_date + str(c.basetime),
1599                                               '%Y%m%d%H') - time_delta
1600                end_period = datetime.strptime(c.end_date + str(c.basetime),
1601                                             '%Y%m%d%H')
1602
1603            # skip all temporary times
1604            # which are outside the retrieval period               
1605            if timestamp < start_period or \
1606               timestamp > end_period:
1607                continue
1608               
1609
1610            # @WRF
1611            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1612            #
1613            # UNDER CONSTRUCTION !!!
1614            #
1615            #if c.wrf:
1616            #    if 'olddate' not in locals() or cdate != olddate:
1617            #        fwrf = open(os.path.join(c.outputdir,
1618            #                    'WRF' + cdate + '.' + ctime + '.000.grb2'), 'wb')
1619            #        olddate = cdate[:]
1620#============================================================================================
1621            # savedfields remembers which fields were already used.
1622            savedfields = []
1623            # sum of cloud liquid and ice water content
1624            scwc = None
1625            while 1:
1626                if not gid:
1627                    break
1628                paramId = codes_get(gid, 'paramId')
1629                gridtype = codes_get(gid, 'gridType')
1630                if paramId == 77: # ETADOT
1631                    codes_write(gid, fdict['21'])
1632                elif paramId == 130: # T
1633                    codes_write(gid, fdict['11'])
1634                elif paramId == 131 or paramId == 132: # U, V wind component
1635                    codes_write(gid, fdict['10'])
1636                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1637                    codes_write(gid, fdict['17'])
1638                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1639                    codes_write(gid, fdict['18'])
1640                elif paramId == 135: # W
1641                    codes_write(gid, fdict['19'])
1642                elif paramId == 152: # LNSP
1643                    codes_write(gid, fdict['12'])
1644                elif paramId == 155 and gridtype == 'sh': # D
1645                    codes_write(gid, fdict['13'])
1646                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1647                    # sum cloud liquid water and ice
1648                    if scwc is None:
1649                        scwc = codes_get_values(gid)
1650                    else:
1651                        scwc += codes_get_values(gid)
1652                        codes_set_values(gid, scwc)
1653                        codes_set(gid, 'paramId', 201031)
1654                        codes_write(gid, fdict['22'])
1655                        scwc = None
1656                # @WRF
1657                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1658                #
1659                # UNDER CONSTRUCTION !!!
1660                #
1661                #elif c.wrf and paramId in [129, 138, 155] and \
1662                #      levtype == 'hybrid': # Z, VO, D
1663                #    # do not do anything right now
1664                #    # these are specific parameter for WRF
1665                #    pass
1666                else:
1667                    if paramId not in savedfields:
1668                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1669                        # and all ADDPAR parameter
1670                        codes_write(gid, fdict['16'])
1671                        savedfields.append(paramId)
1672                    else:
1673                        print('duplicate ' + str(paramId) + ' not written')
1674                # @WRF
1675                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1676                #
1677                # UNDER CONSTRUCTION !!!
1678                #
1679                #try:
1680                #    if c.wrf:
1681                #        # model layer
1682                #        if levtype == 'hybrid' and \
1683                #           paramId in [129, 130, 131, 132, 133, 138, 155]:
1684                #            codes_write(gid, fwrf)
1685                #        # sfc layer
1686                #        elif paramId in wrfpars:
1687                #            codes_write(gid, fwrf)
1688                #except AttributeError:
1689                #    pass
1690
1691                codes_release(gid)
1692                gid = codes_new_from_index(iid)
1693#============================================================================================
1694            for f in fdict.values():
1695                f.close()
1696#============================================================================================
1697            # call for Fortran program to convert e.g. reduced_gg grids to
1698            # regular_ll and calculate detadot/dp
1699            pwd = os.getcwd()
1700            os.chdir(c.inputdir)
1701            if os.stat('fort.21').st_size == 0 and c.eta:
1702                print('Parameter 77 (etadot) is missing, most likely it is '
1703                      'not available for this type or date / time\n')
1704                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1705                my_error('fort.21 is empty while parameter eta '
1706                         'is set to 1 in CONTROL file')
1707# ============================================================================================
1708            # write out all output to log file before starting fortran programm
1709            sys.stdout.flush()
1710
1711            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1712            execute_subprocess([os.path.join(c.exedir,
1713                                             _config.FORTRAN_EXECUTABLE)],
1714                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1715
1716            os.chdir(pwd)
1717# ============================================================================================
1718            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1719            if c.purefc:
1720                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1721            else:
1722                suffix = cdate_hour[2:10]
1723
1724            # if necessary, add ensemble member number to filename suffix
1725            if 'number' in index_keys:
1726                index_number = index_keys.index('number')
1727                if len(index_vals[index_number]) > 1:
1728                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1729
1730            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1731            print("outputfile = " + fnout)
1732            # collect for final processing
1733            self.outputfilelist.append(os.path.basename(fnout))
1734            # # get additional precipitation subgrid data if available
1735            # if c.rrint:
1736                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1737                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1738# ============================================================================================
1739            # create outputfile and copy all data from intermediate files
1740            # to the outputfile (final GRIB input files for FLEXPART)
1741            orolsm = os.path.basename(glob.glob(c.inputdir +
1742                                                '/OG_OROLSM__SL.*.' +
1743                                                c.ppid +
1744                                                '*')[0])
1745            fluxfile = 'flux' + cdate[0:2] + suffix
1746            if not c.cwc:
1747                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1748            else:
1749                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1750
1751            with open(fnout, 'wb') as fout:
1752                for f in flist:
1753                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1754                                       fout)
1755
1756            if c.omega:
1757                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1758                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1759                                            'rb'), fout)
1760# ============================================================================================
1761
1762        # @WRF
1763        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1764        #
1765        # UNDER CONSTRUCTION !!!
1766        #
1767        #if c.wrf:
1768        #    fwrf.close()
1769
1770        codes_index_release(iid)
1771
1772        return
1773
1774
1775    def calc_extra_elda(self, path, prefix):
1776        ''' Calculates extra ensemble members for ELDA - Stream.
1777
1778        This is a specific feature which doubles the number of ensemble members
1779        for the ELDA Stream.
1780
1781        Parameters
1782        ----------
1783        path : str
1784            Path to the output files.
1785
1786        prefix : str
1787            The prefix of the output filenames as defined in Control file.
1788
1789        Return
1790        ------
1791
1792        '''
1793        from eccodes import (codes_grib_new_from_file, codes_get_array,
1794                             codes_set_array, codes_release,
1795                             codes_set, codes_write)
1796
1797        # max number
1798        maxnum = int(self.number.split('/')[-1])
1799
1800        # get a list of all prepared output files with control forecast (CF)
1801        cf_filelist = UioFiles(path, prefix + '*.N000')
1802        cf_filelist.files = sorted(cf_filelist.files)
1803
1804        for cffile in cf_filelist.files:
1805            with open(cffile, 'rb') as f:
1806                cfvalues = []
1807                while True:
1808                    fid = codes_grib_new_from_file(f)
1809                    if fid is None:
1810                        break
1811                    cfvalues.append(codes_get_array(fid, 'values'))
1812                    codes_release(fid)
1813
1814            filename = cffile.split('N000')[0]
1815            for i in range(1, maxnum + 1):
1816                # read an ensemble member
1817                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1818                # create file for newly calculated ensemble member
1819                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1820                # number of message in grib file
1821                j = 0
1822                while True:
1823                    gid = codes_grib_new_from_file(g)
1824                    if gid is None:
1825                        break
1826                    values = codes_get_array(gid, 'values')
1827                    # generate a new ensemble member by subtracting
1828                    # 2 * ( current time step value - last time step value )
1829                    codes_set_array(gid, 'values',
1830                                    values-2*(values-cfvalues[j]))
1831                    codes_set(gid, 'number', i+maxnum)
1832                    codes_write(gid, h)
1833                    codes_release(gid)
1834                    j += 1
1835
1836                g.close()
1837                h.close()
1838                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1839                self.outputfilelist.append(
1840                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1841
1842        return
1843
1844
1845    def process_output(self, c):
1846        '''Postprocessing of FLEXPART input files.
1847
1848        The grib files are postprocessed depending on the selection in
1849        CONTROL file. The resulting files are moved to the output
1850        directory if its not equal to the input directory.
1851        The following modifications might be done if
1852        properly switched in CONTROL file:
1853        GRIB2 - Conversion to GRIB2
1854        ECTRANS - Transfer of files to gateway server
1855        ECSTORAGE - Storage at ECMWF server
1856
1857        Parameters
1858        ----------
1859        c : ControlFile
1860            Contains all the parameters of CONTROL file and
1861            command line.
1862
1863        Return
1864        ------
1865
1866        '''
1867
1868        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1869
1870        if _config.FLAG_ON_ECMWFSERVER:
1871            print('ecstorage: {}\n ecfsdir: {}\n'.
1872                  format(c.ecstorage, c.ecfsdir))
1873            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1874                  .format(c.ectrans, c.gateway, c.destination))
1875
1876        print('Output filelist: ')
1877        print(sorted(self.outputfilelist))
1878
1879        for ofile in self.outputfilelist:
1880            ofile = os.path.join(self.inputdir, ofile)
1881
1882            if c.format.lower() == 'grib2':
1883                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1884                                    'productDefinitionTemplateNumber=8',
1885                                    ofile, ofile + '_2'],
1886                                   error_msg='GRIB2 CONVERSION FAILED!')
1887
1888                execute_subprocess(['mv', ofile + '_2', ofile],
1889                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1890                                   'FILES FAILED!')
1891
1892            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1893                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1894                                    c.gateway, '-remote', c.destination,
1895                                    '-source', ofile],
1896                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1897
1898            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1899                execute_subprocess(['ecp', '-o', ofile,
1900                                    os.path.expandvars(c.ecfsdir)],
1901                                   error_msg='COPY OF FILES TO ECSTORAGE '
1902                                   'AREA FAILED!')
1903
1904            if c.outputdir != c.inputdir:
1905                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1906                                    c.outputdir],
1907                                   error_msg='RELOCATION OF OUTPUT FILES '
1908                                   'TO OUTPUTDIR FAILED!')
1909
1910        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG