source: flex_extract.git/Source/Python/Classes/EcFlexpart.py @ c77630a

ctbtodev
Last change on this file since c77630a was c77630a, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

BUGFIX: for python3-eccodes the file openings need to be in binary mode! added binary marker in filemode

  • Property mode set to 100644
File size: 77.2 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#
27#    February 2018 - Anne Philipp (University of Vienna):
28#        - applied PEP8 style guide
29#        - added documentation
30#        - removed function getFlexpartTime in class EcFlexpart
31#        - outsourced class ControlFile
32#        - outsourced class MarsRetrieval
33#        - changed class name from EIFlexpart to EcFlexpart
34#        - applied minor code changes (style)
35#        - removed "dead code" , e.g. retrieval of Q since it is not needed
36#        - removed "times" parameter from retrieve-method since it is not used
37#        - seperated function "retrieve" into smaller functions (less code
38#          duplication, easier testing)
39#
40# @License:
41#    (C) Copyright 2014-2019.
42#    Anne Philipp, Leopold Haimberger
43#
44#    SPDX-License-Identifier: CC-BY-4.0
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58from __future__ import print_function
59
60import os
61import sys
62import glob
63import shutil
64import subprocess
65from datetime import datetime, timedelta
66
67# software specific classes and modules from flex_extract
68sys.path.append('../')
69import _config
70from Classes.GribUtil import GribUtil
71from Mods.tools import (init128, to_param_id, silent_remove, product,
72                        my_error, make_dir, get_informations, get_dimensions,
73                        execute_subprocess, to_param_id_with_tablenumber,
74                        generate_retrieval_period_boundary)
75from Classes.MarsRetrieval import MarsRetrieval
76from Classes.UioFiles import UioFiles
77import Mods.disaggregation as disaggregation
78
79# ------------------------------------------------------------------------------
80# CLASS
81# ------------------------------------------------------------------------------
82class EcFlexpart(object):
83    '''
84    Class to represent FLEXPART specific ECMWF data.
85
86    FLEXPART needs grib files in a specifc format. All necessary data fields
87    for one time step are stored in a single file. The class represents an
88    instance with all the parameter and settings necessary for retrieving
89    MARS data and modifing them so they are fitting FLEXPART needs. The class
90    is able to disaggregate the fluxes and convert grid types to the one needed
91    by FLEXPART, therefore using the FORTRAN program.
92
93    Attributes
94    ----------
95    mreq_count : int
96        Counter for the number of generated mars requests.
97
98    inputdir : str
99        Path to the directory where the retrieved data is stored.
100
101    dataset : str
102        For public datasets there is the specific naming and parameter
103        dataset which has to be used to characterize the type of
104        data.
105
106    basetime : int
107        The time for a half day retrieval. The 12 hours upfront are to be
108        retrieved.
109
110    dtime : str
111        Time step in hours.
112
113    acctype : str
114        The field type for the accumulated forecast fields.
115
116    acctime : str
117        The starting time from the accumulated forecasts.
118
119    accmaxstep : str
120        The maximum forecast step for the accumulated forecast fields.
121
122    marsclass : str
123        Characterisation of dataset.
124
125    stream : str
126        Identifies the forecasting system used to generate the data.
127
128    number : str
129        Selects the member in ensemble forecast run.
130
131    resol : str
132        Specifies the desired triangular truncation of retrieved data,
133        before carrying out any other selected post-processing.
134
135    accuracy : str
136        Specifies the number of bits per value to be used in the
137        generated GRIB coded fields.
138
139    addpar : str
140        List of additional parameters to be retrieved.
141
142    level : str
143        Specifies the maximum level.
144
145    expver : str
146        The version of the dataset.
147
148    levelist : str
149        Specifies the required levels.
150
151    glevelist : str
152        Specifies the required levels for gaussian grids.
153
154    gaussian : str
155        This parameter is deprecated and should no longer be used.
156        Specifies the desired type of Gaussian grid for the output.
157
158    grid : str
159        Specifies the output grid which can be either a Gaussian grid
160        or a Latitude/Longitude grid.
161
162    area : str
163        Specifies the desired sub-area of data to be extracted.
164
165    purefc : int
166        Switch for definition of pure forecast mode or not.
167
168    outputfilelist : list of str
169        The final list of FLEXPART ready input files.
170
171    types : dictionary
172        Determines the combination of type of fields, time and forecast step
173        to be retrieved.
174
175    params : dictionary
176        Collection of grid types and their corresponding parameters,
177        levels, level types and the grid definition.
178
179    server : ECMWFService or ECMWFDataServer
180        This is the connection to the ECMWF data servers.
181
182    public : int
183        Decides which Web API Server version is used.
184
185    dates : str
186        Contains start and end date of the retrieval in the format
187        "YYYYMMDD/to/YYYYMMDD"
188    '''
189
190    # --------------------------------------------------------------------------
191    # CLASS FUNCTIONS
192    # --------------------------------------------------------------------------
193    def __init__(self, c, fluxes=False):
194        '''Creates an object/instance of EcFlexpart with the associated
195        settings of its attributes for the retrieval.
196
197        Parameters:
198        -----------
199        c : ControlFile
200            Contains all the parameters of CONTROL file and
201            command line.
202
203        fluxes : boolean, optional
204            Decides if the flux parameter settings are stored or
205            the rest of the parameter list.
206            Default value is False.
207
208        Return
209        ------
210
211        '''
212        # set a counter for the number of generated mars requests
213        self.mreq_count = 0
214
215        self.inputdir = c.inputdir
216        self.dataset = c.dataset
217        self.basetime = c.basetime
218        self.dtime = c.dtime
219        self.acctype = c.acctype
220        self.acctime = c.acctime
221        self.accmaxstep = c.accmaxstep
222        self.marsclass = c.marsclass
223        self.stream = c.stream
224        self.number = c.number
225        self.resol = c.resol
226        self.accuracy = c.accuracy
227        self.addpar = c.addpar
228        self.level = c.level
229        self.expver = c.expver
230        self.levelist = c.levelist
231        self.glevelist = '1/to/' + c.level # in case of gaussian grid
232        self.gaussian = c.gaussian
233        self.grid = c.grid
234        self.area = c.area
235        self.purefc = c.purefc
236        self.outputfilelist = []
237
238        # Define the different types of field combinations (type, time, step)
239        self.types = {}
240        # Define the parameters and their level types, level list and
241        # grid resolution for the retrieval job
242        self.params = {}
243
244        if fluxes:
245            self._create_params_fluxes()
246        else:
247            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
248
249        if fluxes:# and not c.purefc:
250            self._create_field_types_fluxes()
251        else:
252            self._create_field_types(c.type, c.time, c.step)
253        return
254
255    def _create_field_types(self, ftype, ftime, fstep):
256        '''Create the combination of field type, time and forecast step.
257
258        Parameters:
259        -----------
260        ftype : list of str
261            List of field types.
262
263        ftime : list of str
264            The time in hours of the field.
265
266        fstep : str
267            Specifies the forecast time step from forecast base time.
268            Valid values are hours (HH) from forecast base time.
269
270        Return
271        ------
272
273        '''
274        i = 0
275        for ty, st, ti in zip(ftype, fstep, ftime):
276            btlist = list(range(len(ftime)))
277            if self.basetime == 12:
278                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
279            if self.basetime == 0:
280                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
281
282            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
283                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
284                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
285                 # (int(c.time[i]) in btlist or c.purefc):
286
287            if (i in btlist) or self.purefc:
288
289                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
290                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
291
292                    if ty not in self.types.keys():
293                        self.types[ty] = {'times': '', 'steps': ''}
294
295                    if ti not in self.types[ty]['times']:
296                        if self.types[ty]['times']:
297                            self.types[ty]['times'] += '/'
298                        self.types[ty]['times'] += ti
299
300                    if st not in self.types[ty]['steps']:
301                        if self.types[ty]['steps']:
302                            self.types[ty]['steps'] += '/'
303                        self.types[ty]['steps'] += st
304            i += 1
305
306        return
307
308    def _create_field_types_fluxes(self):
309        '''Create the combination of field type, time and forecast step
310        for the flux data.
311
312        Parameters:
313        -----------
314
315        Return
316        ------
317
318        '''
319        self.types[str(self.acctype)] = {'times': str(self.acctime),
320                                         'steps': '{}/to/{}/by/{}'.format(
321                                             self.dtime,
322                                             self.accmaxstep,
323                                             self.dtime)}
324        return
325
326    def _create_params(self, gauss, eta, omega, cwc, wrf):
327        '''Define the specific parameter settings for retrievment.
328
329        The different parameters need specific grid types and level types
330        for retrievement. We might get following combination of types
331        (depending on selection and availability):
332        (These are short cuts for the grib file names (leading sequence)
333        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
334        where:
335            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
336            ML = Model Level, SL = Surface Level
337
338        For each of this combination there is a list of parameter names,
339        the level type, the level list and the grid resolution.
340
341        There are different scenarios for data extraction from MARS:
342        1) Retrieval of etadot
343           eta=1, gauss=0, omega=0
344        2) Calculation of etadot from divergence
345           eta=0, gauss=1, omega=0
346        3) Calculation of etadot from omega (for makes sense for debugging)
347           eta=0, gauss=0, omega=1
348        4) Retrieval and Calculation of etadot (only for debugging)
349           eta=1, gauss=1, omega=0
350        5) Download also specific model and surface level data for FLEXPART-WRF
351
352        Parameters:
353        -----------
354        gauss : int
355            Gaussian grid is retrieved.
356
357        eta : int
358            Etadot parameter will be directly retrieved.
359
360        omega : int
361            The omega paramterwill be retrieved.
362
363        cwc : int
364            The cloud liquid and ice water content will be retrieved.
365
366        wrf : int
367            Additional model level and surface level data will be retrieved for
368            WRF/FLEXPART-WRF simulations.
369
370        Return
371        ------
372
373        '''
374        # SURFACE FIELDS
375        #-----------------------------------------------------------------------
376        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
377        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
378                                 'SFC', '1', self.grid]
379        if self.addpar:
380            self.params['OG__SL'][0] += self.addpar
381
382        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
383            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
384                                            'SFC', '1', self.grid]
385        else:
386            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
387                                            'SFC', '1', self.grid]
388
389        # MODEL LEVEL FIELDS
390        #-----------------------------------------------------------------------
391        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
392
393        if not gauss and eta:
394            self.params['OG__ML'][0] += '/U/V/ETADOT'
395        elif gauss and not eta:
396            self.params['GG__SL'] = ['Q', 'ML', '1',
397                                     '{}'.format((int(self.resol) + 1) // 2)]
398            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
399        elif not gauss and not eta:
400            self.params['OG__ML'][0] += '/U/V'
401        else:  # GAUSS and ETA
402            print('Warning: Collecting etadot and parameters for gaussian grid '
403                           'is a very costly parameter combination, '
404                           'use this combination only for debugging!')
405            self.params['GG__SL'] = ['Q', 'ML', '1',
406                                     '{}'.format((int(self.resol) + 1) // 2)]
407            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
408                                     '{}'.format((int(self.resol) + 1) // 2)]
409
410        if omega:
411            self.params['OG__ML'][0] += '/W'
412
413        if cwc:
414            self.params['OG__ML'][0] += '/CLWC/CIWC'
415
416        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
417        # -----------------------------------------------------------------------
418        if wrf:
419            # @WRF
420            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
421            #
422            # UNDER CONSTRUCTION !!!
423            #
424
425            print('WRF VERSION IS UNDER CONSTRUCTION!') # dummy argument
426
427            #self.params['OG__ML'][0] += '/Z/VO'
428            #if '/D' not in self.params['OG__ML'][0]:
429            #    self.params['OG__ML'][0] += '/D'
430
431            #wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
432            #           'SWVL1','SWVL2','SWVL3','SWVL4']
433            #for par in wrf_sfc:
434            #    if par not in self.params['OG__SL'][0]:
435            #        self.params['OG__SL'][0] += '/' + par
436
437        return
438
439
440    def _create_params_fluxes(self):
441        '''Define the parameter setting for flux data.
442
443        Flux data are accumulated fields in time and are stored on the
444        surface level. The leading short cut name for the grib files is:
445        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
446        acc for Accumulated Grid.
447        The params dictionary stores a list of parameter names, the level type,
448        the level list and the grid resolution.
449
450        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
451
452        Parameters:
453        -----------
454
455        Return
456        ------
457
458        '''
459        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
460                                    'SFC', '1', self.grid]
461        return
462
463
464    def _mk_targetname(self, ftype, param, date):
465        '''Creates the filename for the requested grib data to be stored in.
466        This name is passed as the "target" parameter in the request.
467
468        Parameters
469        ----------
470        ftype : str
471            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
472
473        param : str
474            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
475            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
476
477        date : str
478            The date period of the grib data to be stored in this file.
479
480        Return
481        ------
482        targetname : str
483            The target filename for the grib data.
484        '''
485        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
486                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
487
488        return targetname
489
490
491    def _start_retrievement(self, request, par_dict):
492        '''Creates the Mars Retrieval and prints or submits the request
493        depending on the status of the request variable.
494
495        Parameters
496        ----------
497        request : int
498            Selects the mode of retrieval.
499            0: Retrieves the data from ECMWF.
500            1: Prints the mars requests to an output file.
501            2: Retrieves the data and prints the mars request.
502
503        par_dict : dictionary
504            Contains all parameter which have to be set for creating the
505            Mars Retrievals. The parameter are:
506            marsclass, dataset, stream, type, levtype, levelist, resol,
507            gaussian, accuracy, grid, target, area, date, time, number,
508            step, expver, param
509
510        Return
511        ------
512
513        '''
514        # increase number of mars requests
515        self.mreq_count += 1
516
517        MR = MarsRetrieval(self.server,
518                           self.public,
519                           marsclass=par_dict['marsclass'],
520                           dataset=par_dict['dataset'],
521                           stream=par_dict['stream'],
522                           type=par_dict['type'],
523                           levtype=par_dict['levtype'],
524                           levelist=par_dict['levelist'],
525                           resol=par_dict['resol'],
526                           gaussian=par_dict['gaussian'],
527                           accuracy=par_dict['accuracy'],
528                           grid=par_dict['grid'],
529                           target=par_dict['target'],
530                           area=par_dict['area'],
531                           date=par_dict['date'],
532                           time=par_dict['time'],
533                           number=par_dict['number'],
534                           step=par_dict['step'],
535                           expver=par_dict['expver'],
536                           param=par_dict['param'])
537
538        if request == 0:
539            MR.display_info()
540            MR.data_retrieve()
541        elif request == 1:
542            MR.print_infodata_csv(self.inputdir, self.mreq_count)
543        elif request == 2:
544            MR.print_infodata_csv(self.inputdir, self.mreq_count)
545            MR.display_info()
546            MR.data_retrieve()
547        else:
548            print('Failure')
549
550        return
551
552
553    def _mk_index_values(self, inputdir, inputfiles, keys):
554        '''Creates an index file for a set of grib parameter keys.
555        The values from the index keys are returned in a list.
556
557        Parameters
558        ----------
559        keys : dictionary
560            List of parameter names which serves as index.
561
562        inputfiles : UioFiles
563            Contains a list of files.
564
565        Return
566        ------
567        iid : codes_index
568            This is a grib specific index structure to access
569            messages in a file.
570
571        index_vals : list of list  of str
572            Contains the values from the keys used for a distinct selection
573            of grib messages in processing  the grib files.
574            Content looks like e.g.:
575            index_vals[0]: ('20171106', '20171107', '20171108') ; date
576            index_vals[1]: ('0', '1200', '1800', '600') ; time
577            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
578        '''
579        from eccodes import codes_index_get
580
581        iid = None
582        index_keys = keys
583
584        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
585        silent_remove(indexfile)
586        grib = GribUtil(inputfiles.files)
587        # creates new index file
588        iid = grib.index(index_keys=index_keys, index_file=indexfile)
589
590        # read the values of index keys
591        index_vals = []
592        for key in index_keys:
593            key_vals = codes_index_get(iid, key)
594            # have to sort the key values for correct order,
595            # therefore convert to int first
596            key_vals = [int(k) for k in key_vals]
597            key_vals.sort()
598            key_vals = [str(k) for k in key_vals]
599            index_vals.append(key_vals)
600            # index_vals looks for example like:
601            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
602            # index_vals[1]: ('0', '1200') ; time
603            # index_vals[2]: (3', '6', '9', '12') ; stepRange
604
605        return iid, index_vals
606
607
608    def retrieve(self, server, dates, public, request, inputdir='.'):
609        '''Finalizing the retrieval information by setting final details
610        depending on grid type.
611        Prepares MARS retrievals per grid type and submits them.
612
613        Parameters
614        ----------
615        server : ECMWFService or ECMWFDataServer
616            The connection to the ECMWF server. This is different
617            for member state users which have full access and non
618            member state users which have only access to the public
619            data sets. The decision is made from command line argument
620            "public"; for public access its True (ECMWFDataServer)
621            for member state users its False (ECMWFService)
622
623        dates : str
624            Contains start and end date of the retrieval in the format
625            "YYYYMMDD/to/YYYYMMDD"
626
627        request : int
628            Selects the mode of retrieval.
629            0: Retrieves the data from ECMWF.
630            1: Prints the mars requests to an output file.
631            2: Retrieves the data and prints the mars request.
632
633        inputdir : str, optional
634            Path to the directory where the retrieved data is about
635            to be stored. The default is the current directory ('.').
636
637        Return
638        ------
639
640        '''
641        self.dates = dates
642        self.server = server
643        self.public = public
644        self.inputdir = inputdir
645        oro = False
646
647        # define times with datetime module
648        t12h = timedelta(hours=12)
649        t24h = timedelta(hours=24)
650
651        # dictionary which contains all parameter for the mars request,
652        # entries with a "None" will change in different requests and will
653        # therefore be set in each request seperately
654        retr_param_dict = {'marsclass':self.marsclass,
655                           'dataset':self.dataset,
656                           'stream':None,
657                           'type':None,
658                           'levtype':None,
659                           'levelist':None,
660                           'resol':self.resol,
661                           'gaussian':None,
662                           'accuracy':self.accuracy,
663                           'grid':None,
664                           'target':None,
665                           'area':None,
666                           'date':None,
667                           'time':None,
668                           'number':self.number,
669                           'step':None,
670                           'expver':self.expver,
671                           'param':None}
672
673        for ftype in sorted(self.types):
674            # ftype contains field types such as
675            #     [AN, FC, PF, CV]
676            for pk, pv in sorted(self.params.items()):
677                # pk contains one of these keys of params
678                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
679                #      OG_OROLSM_SL, OG_acc_SL]
680                # pv contains all of the items of the belonging key
681                #     [param, levtype, levelist, grid]
682                if isinstance(pv, str):
683                    continue
684                retr_param_dict['type'] = ftype
685                retr_param_dict['time'] = self.types[ftype]['times']
686                retr_param_dict['step'] = self.types[ftype]['steps']
687                retr_param_dict['date'] = self.dates
688                retr_param_dict['stream'] = self.stream
689                retr_param_dict['target'] = \
690                    self._mk_targetname(ftype,
691                                        pk,
692                                        retr_param_dict['date'].split('/')[0])
693                table128 = init128(_config.PATH_GRIBTABLE)
694                ids = to_param_id_with_tablenumber(pv[0], table128)
695                retr_param_dict['param'] = ids
696                retr_param_dict['levtype'] = pv[1]
697                retr_param_dict['levelist'] = pv[2]
698                retr_param_dict['grid'] = pv[3]
699                retr_param_dict['area'] = self.area
700                retr_param_dict['gaussian'] = self.gaussian
701
702                if pk == 'OG_OROLSM__SL' and not oro:
703                    oro = True
704                    # in CERA20C (class EP) there is no stream "OPER"!
705                    if self.marsclass.upper() != 'EP':
706                        retr_param_dict['stream'] = 'OPER'
707                    retr_param_dict['type'] = 'AN'
708                    retr_param_dict['time'] = '00'
709                    retr_param_dict['step'] = '000'
710                    retr_param_dict['date'] = self.dates.split('/')[0]
711                    retr_param_dict['target'] = self._mk_targetname('',
712                                            pk, retr_param_dict['date'])
713                elif pk == 'OG_OROLSM__SL' and oro:
714                    continue
715                if pk == 'GG__SL' and pv[0] == 'Q':
716                    retr_param_dict['area'] = ""
717                    retr_param_dict['gaussian'] = 'reduced'
718                if ftype.upper() == 'FC' and \
719                    'acc' not in retr_param_dict['target']:
720                    if (int(retr_param_dict['time'][0]) +
721                        int(retr_param_dict['step'][0])) > 23:
722                        dates = retr_param_dict['date'].split('/')
723                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
724                        sdate = sdate - timedelta(days=1)
725                        retr_param_dict['date'] = '/'.join(
726                            [sdate.strftime("%Y%m%d")] +
727                            retr_param_dict['date'][1:])
728
729                        print('CHANGED FC start date to ' +
730                              sdate.strftime("%Y%m%d") +
731                              ' to accomodate TIME=' +
732                              retr_param_dict['time'][0] +
733                              ', STEP=' +
734                              retr_param_dict['time'][0])
735
736    # ------  on demand path  --------------------------------------------------
737                if self.basetime is None:
738                    # ******* start retrievement
739                    self._start_retrievement(request, retr_param_dict)
740    # ------  operational path  ------------------------------------------------
741                else:
742                    # check if mars job requests fields beyond basetime.
743                    # if yes eliminate those fields since they may not
744                    # be accessible with user's credentials
745
746                    enddate = retr_param_dict['date'].split('/')[-1]
747                    elimit = datetime.strptime(enddate + str(self.basetime),
748                                               '%Y%m%d%H')
749
750                    if self.basetime == 12:
751                        # --------------  flux data ----------------------------
752                        if 'acc' in pk:
753                            startdate = retr_param_dict['date'].split('/')[0]
754                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
755                            retr_param_dict['date'] = '/'.join([startdate,
756                                                                'to',
757                                                                enddate])
758
759                            # ******* start retrievement
760                            self._start_retrievement(request, retr_param_dict)
761
762                            retr_param_dict['date'] = \
763                                datetime.strftime(elimit - t12h, '%Y%m%d')
764                            retr_param_dict['time'] = '00'
765                            retr_param_dict['target'] = \
766                                self._mk_targetname(ftype, pk,
767                                                    retr_param_dict['date'])
768
769                            # ******* start retrievement
770                            self._start_retrievement(request, retr_param_dict)
771
772                        # --------------  non flux data ------------------------
773                        else:
774                            # ******* start retrievement
775                            self._start_retrievement(request, retr_param_dict)
776
777                    elif self.basetime == 0:
778                        retr_param_dict['date'] = \
779                            datetime.strftime(elimit - t24h, '%Y%m%d')
780
781                        timesave = ''.join(retr_param_dict['time'])
782
783                        if ('/' in retr_param_dict['time'] and
784                            pk != 'OG_OROLSM__SL' and
785                            'acc' not in pk ) :
786                            times = retr_param_dict['time'].split('/')
787                            steps = retr_param_dict['step'].split('/')
788
789                            while int(times[0]) + int(steps[0]) <= 12:
790                                times = times[1:]
791                                if len(times) > 1:
792                                    retr_param_dict['time'] = '/'.join(times)
793                                else:
794                                    retr_param_dict['time'] = times[0]
795
796                        if (pk != 'OG_OROLSM__SL' and
797                            int(retr_param_dict['step'].split('/')[0]) == 0 and
798                            int(timesave.split('/')[0]) == 0):
799
800                            retr_param_dict['date'] = \
801                                datetime.strftime(elimit, '%Y%m%d')
802                            retr_param_dict['time'] = '00'
803                            retr_param_dict['step'] = '000'
804                            retr_param_dict['target'] = \
805                                self._mk_targetname(ftype, pk,
806                                                    retr_param_dict['date'])
807
808                        # ******* start retrievement
809                        self._start_retrievement(request, retr_param_dict)
810                    else:
811                        raise ValueError('ERROR: Basetime has an invalid value '
812                                                 '-> {}'.format(str(basetime)))
813
814        if request == 0 or request == 2:
815            print('MARS retrieve done ... ')
816        elif request == 1:
817            print('MARS request printed ...')
818
819        return
820
821
822    def write_namelist(self, c):
823        '''Creates a namelist file in the temporary directory and writes
824        the following values to it: maxl, maxb, mlevel,
825        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
826        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
827
828        Parameters
829        ----------
830        c : ControlFile
831            Contains all the parameters of CONTROL file and
832            command line.
833
834        filename : str
835                Name of the namelist file.
836
837        Return
838        ------
839
840        '''
841
842        from genshi.template.text import NewTextTemplate
843        from genshi.template import  TemplateLoader
844        from genshi.template.eval import  UndefinedError
845        import numpy as np
846
847        try:
848            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
849            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
850                                            cls=NewTextTemplate)
851
852            self.inputdir = c.inputdir
853            area = np.asarray(self.area.split('/')).astype(float)
854            grid = np.asarray(self.grid.split('/')).astype(float)
855
856            if area[1] > area[3]:
857                area[1] -= 360
858            maxl = int(round((area[3] - area[1]) / grid[1])) + 1
859            maxb = int(round((area[0] - area[2]) / grid[0])) + 1
860
861            stream = namelist_template.generate(
862                maxl = str(maxl),
863                maxb = str(maxb),
864                mlevel = str(self.level),
865                mlevelist = str(self.levelist),
866                mnauf = str(self.resol),
867                metapar = '77',
868                rlo0 = str(area[1]),
869                rlo1 = str(area[3]),
870                rla0 = str(area[2]),
871                rla1 = str(area[0]),
872                momega = str(c.omega),
873                momegadiff = str(c.omegadiff),
874                mgauss = str(c.gauss),
875                msmooth = str(c.smooth),
876                meta = str(c.eta),
877                metadiff = str(c.etadiff),
878                mdpdeta = str(c.dpdeta)
879            )
880        except UndefinedError as e:
881            print('... ERROR ' + str(e))
882
883            sys.exit('\n... error occured while trying to generate namelist ' +
884                     _config.TEMPFILE_NAMELIST)
885        except OSError as e:
886            print('... ERROR CODE: ' + str(e.errno))
887            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
888
889            sys.exit('\n... error occured while trying to generate template ' +
890                     _config.TEMPFILE_NAMELIST)
891
892        try:
893            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
894
895            with open(namelistfile, 'w') as f:
896                f.write(stream.render('text'))
897        except OSError as e:
898            print('... ERROR CODE: ' + str(e.errno))
899            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
900
901            sys.exit('\n... error occured while trying to write ' +
902                     namelistfile)
903
904        return
905
906
907    def deacc_fluxes(self, inputfiles, c):
908        '''De-accumulate and disaggregate flux data.
909
910        Goes through all flux fields in ordered time and de-accumulate
911        the fields. Afterwards the fields are disaggregated in time.
912        Different versions of disaggregation is provided for rainfall
913        data (darain, modified linear) and the surface fluxes and
914        stress data (dapoly, cubic polynomial).
915
916        Parameters
917        ----------
918        inputfiles : UioFiles
919            Contains the list of files that contain flux data.
920
921        c : ControlFile
922            Contains all the parameters of CONTROL file and
923            command line.
924
925        Return
926        ------
927
928        '''
929        import numpy as np
930        from eccodes import (codes_index_select, codes_new_from_index, codes_get,
931                             codes_get_values, codes_set_values, codes_set,
932                             codes_write, codes_release, codes_new_from_index,
933                             codes_index_release)
934
935        table128 = init128(_config.PATH_GRIBTABLE)
936        # get ids from the flux parameter names
937        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
938
939        iid = None
940        index_vals = None
941
942        # get the values of the keys which are used for distinct access
943        # of grib messages via product and save the maximum number of
944        # ensemble members if there is more than one
945        if '/' in self.number:
946            # more than one ensemble member is selected
947            index_keys = ["number", "date", "time", "step"]
948            # maximum ensemble number retrieved
949            # + 1 for the control run (ensemble number 0)
950            maxnum = int(self.number.split('/')[-1]) + 1
951            # remember the index of the number values
952            index_number = index_keys.index('number')
953            # empty set to save ensemble numbers which were already processed
954            ens_numbers = set()
955            # index for the ensemble number
956            inumb = 0
957        else:
958            index_keys = ["date", "time", "step"]
959            # maximum ensemble number
960            maxnum = None
961
962        # get sorted lists of the index values
963        # this is very important for disaggregating
964        # the flux data in correct order
965        iid, index_vals = self._mk_index_values(c.inputdir,
966                                                inputfiles,
967                                                index_keys)
968        # index_vals looks like e.g.:
969        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
970        # index_vals[1]: ('0', '600', '1200', '1800') ; time
971        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
972
973        if c.rrint:
974            # set start and end timestamps for retrieval period
975            if not c.purefc:
976                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
977                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
978            else:
979                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
980                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
981                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
982                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
983                end_date = end_date + timedelta(hours=c.maxstep)
984
985            # get necessary grid dimensions from grib files for storing the
986            # precipitation fields
987            info = get_informations(os.path.join(c.inputdir,
988                                                 inputfiles.files[0]))
989            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
990                                  start_date, end_date)
991
992            # create empty numpy arrays
993            if not maxnum:
994                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
995                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
996            else:
997                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
998                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
999
1000            # index counter for time line
1001            it_lsp = 0
1002            it_cp = 0
1003
1004            # store the order of date and step
1005            date_list = []
1006            step_list = []
1007
1008        # initialize dictionaries to store flux values per parameter
1009        orig_vals = {}
1010        deac_vals = {}
1011        for p in pars:
1012            orig_vals[p] = []
1013            deac_vals[p] = []
1014
1015        # "product" genereates each possible combination between the
1016        # values of the index keys
1017        for prod in product(*index_vals):
1018            # e.g. prod = ('20170505', '0', '12')
1019            #             ( date     ,time, step)
1020
1021            print('CURRENT PRODUCT: ', prod)
1022
1023            # the whole process has to be done for each seperate ensemble member
1024            # therefore, for each new ensemble member we delete old flux values
1025            # and start collecting flux data from the beginning time step
1026            if maxnum and prod[index_number] not in ens_numbers:
1027                ens_numbers.add(prod[index_number])
1028                inumb = len(ens_numbers) - 1
1029                # re-initialize dictionaries to store flux values per parameter
1030                # for the next ensemble member
1031                it_lsp = 0
1032                it_cp = 0
1033                orig_vals = {}
1034                deac_vals = {}
1035                for p in pars:
1036                    orig_vals[p] = []
1037                    deac_vals[p] = []
1038
1039            for i in range(len(index_keys)):
1040                codes_index_select(iid, index_keys[i], prod[i])
1041
1042            # get first id from current product
1043            gid = codes_new_from_index(iid)
1044
1045            # if there is no data for this specific time combination / product
1046            # skip the rest of the for loop and start with next timestep/product
1047            if not gid:
1048                continue
1049
1050            # create correct timestamp from the three time informations
1051            cdate = str(codes_get(gid, 'date'))
1052            time = codes_get(gid, 'time') // 100  # integer
1053            step = codes_get(gid, 'step') # integer
1054            ctime = '{:0>2}'.format(time)
1055            cstep = '{:0>3}'.format(step)
1056
1057            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1058            t_dt = t_date + timedelta(hours=step)
1059            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1060            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1061            if c.basetime is not None:
1062                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1063                                              '%Y%m%d%H')
1064            else:
1065                t_enddate = t_date + timedelta(2*int(c.dtime))
1066
1067            # if necessary, add ensemble member number to filename suffix
1068            # otherwise, add empty string
1069            if maxnum:
1070                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1071            else:
1072                numbersuffix = ''
1073
1074            if c.purefc:
1075                fnout = os.path.join(c.inputdir, 'flux' +
1076                                     t_date.strftime('%Y%m%d.%H') +
1077                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1078                                     numbersuffix)
1079                gnout = os.path.join(c.inputdir, 'flux' +
1080                                     t_date.strftime('%Y%m%d.%H') +
1081                                     '.{:0>3}'.format(step-int(c.dtime)) +
1082                                     numbersuffix)
1083                hnout = os.path.join(c.inputdir, 'flux' +
1084                                     t_date.strftime('%Y%m%d.%H') +
1085                                     '.{:0>3}'.format(step) +
1086                                     numbersuffix)
1087            else:
1088                fnout = os.path.join(c.inputdir, 'flux' +
1089                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1090                gnout = os.path.join(c.inputdir, 'flux' +
1091                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1092                hnout = os.path.join(c.inputdir, 'flux' +
1093                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1094
1095            print("outputfile = " + fnout)
1096            f_handle = open(fnout, 'wb')
1097            h_handle = open(hnout, 'wb')
1098            g_handle = open(gnout, 'wb')
1099
1100            # read message for message and store relevant data fields, where
1101            # data keywords are stored in pars
1102            while True:
1103                if not gid:
1104                    break
1105                parId = codes_get(gid, 'paramId') # integer
1106                step = codes_get(gid, 'step') # integer
1107                time = codes_get(gid, 'time') # integer
1108                ni = codes_get(gid, 'Ni') # integer
1109                nj = codes_get(gid, 'Nj') # integer
1110                if parId not in orig_vals.keys():
1111                    # parameter is not a flux, find next one
1112                    continue
1113
1114                # define conversion factor
1115                if parId == 142 or parId == 143:
1116                    fak = 1. / 1000.
1117                else:
1118                    fak = 3600.
1119
1120                # get parameter values and reshape
1121                values = codes_get_values(gid)
1122                values = (np.reshape(values, (nj, ni))).flatten() / fak
1123
1124                # save the original and accumulated values
1125                orig_vals[parId].append(values[:])
1126
1127                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1128                    # no de-accumulation needed
1129                    deac_vals[parId].append(values[:] / int(c.dtime))
1130                else:
1131                    # do de-accumulation
1132                    deac_vals[parId].append(
1133                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1134                         int(c.dtime))
1135
1136                # store precipitation if new disaggregation method is selected
1137                # only the exact days are needed
1138                if c.rrint:
1139                    if start_date <= t_dt <= end_date:
1140                        if not c.purefc:
1141                            if t_dt not in date_list:
1142                                date_list.append(t_dt)
1143                                step_list = [0]
1144                        else:
1145                            if t_date not in date_list:
1146                                date_list.append(t_date)
1147                            if step not in step_list:
1148                                step_list.append(step)
1149                        # store precipitation values
1150                        if maxnum and parId == 142:
1151                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1152                            it_lsp += 1
1153                        elif not maxnum and parId == 142:
1154                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1155                            it_lsp += 1
1156                        elif maxnum and parId == 143:
1157                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1158                            it_cp += 1
1159                        elif not maxnum and parId == 143:
1160                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1161                            it_cp += 1
1162
1163                # information printout
1164                print(parId, time, step, len(values), values[0], np.std(values))
1165
1166                # length of deac_vals[parId] corresponds to the
1167                # number of time steps, max. 4 are needed for disaggegration
1168                # with the old and original method
1169                # run over all grib messages and perform
1170                # shifting in time
1171                if len(deac_vals[parId]) >= 3:
1172                    if len(deac_vals[parId]) > 3:
1173                        if not c.rrint and (parId == 142 or parId == 143):
1174                            values = disaggregation.darain(deac_vals[parId])
1175                        else:
1176                            values = disaggregation.dapoly(deac_vals[parId])
1177
1178                        if not (step == c.maxstep and c.purefc \
1179                                or t_dt == t_enddate):
1180                            # remove first time step in list to shift
1181                            # time line
1182                            orig_vals[parId].pop(0)
1183                            deac_vals[parId].pop(0)
1184                    else:
1185                        # if the third time step is read (per parId),
1186                        # write out the first one as a boundary value
1187                        if c.purefc:
1188                            values = deac_vals[parId][1]
1189                        else:
1190                            values = deac_vals[parId][0]
1191
1192                    if not (c.rrint and (parId == 142 or parId == 143)):
1193                        codes_set_values(gid, values)
1194
1195                        if c.purefc:
1196                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1197                        else:
1198                            codes_set(gid, 'stepRange', 0)
1199                            codes_set(gid, 'time', t_m2dt.hour*100)
1200                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1201
1202                        codes_write(gid, f_handle)
1203
1204                        # squeeze out information of last two steps
1205                        # contained in deac_vals[parId]
1206                        # Note that deac_vals[parId][0] has not been popped
1207                        # in this case
1208
1209                        if step == c.maxstep and c.purefc or \
1210                           t_dt == t_enddate:
1211                            # last step
1212                            if c.purefc:
1213                                values = deac_vals[parId][3]
1214                                codes_set_values(gid, values)
1215                                codes_set(gid, 'stepRange', step)
1216                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1217                                codes_write(gid, h_handle)
1218                            else:
1219                                values = deac_vals[parId][3]
1220                                codes_set_values(gid, values)
1221                                codes_set(gid, 'stepRange', 0)
1222                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1223                                codes_set(gid, 'time', truedatetime.hour * 100)
1224                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1225                                codes_write(gid, h_handle)
1226
1227                            if parId == 142 or parId == 143:
1228                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1229                            else:
1230                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1231
1232                            # step before last step
1233                            if c.purefc:
1234                                codes_set(gid, 'stepRange', step-int(c.dtime))
1235                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1236                                codes_set_values(gid, values)
1237                                codes_write(gid, g_handle)
1238                            else:
1239                                codes_set(gid, 'stepRange', 0)
1240                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1241                                codes_set(gid, 'time', truedatetime.hour * 100)
1242                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1243                                codes_set_values(gid, values)
1244                                codes_write(gid, g_handle)
1245
1246                codes_release(gid)
1247
1248                gid = codes_new_from_index(iid)
1249
1250            f_handle.close()
1251            g_handle.close()
1252            h_handle.close()
1253
1254        codes_index_release(iid)
1255
1256        if c.rrint:
1257            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1258
1259            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1260                                 cp_np, maxnum, index_keys, index_vals, c)
1261
1262        return
1263
1264    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1265        '''Calculates and writes out the disaggregated precipitation fields.
1266
1267        Disaggregation is done in time and original times are written to the
1268        flux files, while the additional subgrid times are written to
1269        extra files output files. They are named like the original files with
1270        suffixes "_1" and "_2" for the first and second subgrid point.
1271
1272        Parameters
1273        ----------
1274        ni : int
1275            Amount of zonal grid points.
1276
1277        nj : int
1278            Amount of meridional grid points.
1279
1280        nt : int
1281            Number of time steps.
1282
1283        lsp_np : numpy array of float
1284            The large scale precipitation fields for each time step.
1285            Shape (ni * nj, nt).
1286
1287        cp_np : numpy array of float
1288            The convective precipitation fields for each time step.
1289            Shape (ni * nj, nt).
1290
1291        maxnum : int
1292            The maximum number of ensemble members. It is None
1293            if there are no or just one ensemble.
1294
1295        index_keys : dictionary
1296            List of parameter names which serves as index.
1297
1298        index_vals : list of list  of str
1299            Contains the values from the keys used for a distinct selection
1300            of grib messages in processing  the grib files.
1301            Content looks like e.g.:
1302            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1303            index_vals[1]: ('0', '1200', '1800', '600') ; time
1304            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1305
1306        c : ControlFile
1307            Contains all the parameters of CONTROL file and
1308            command line.
1309
1310        Return
1311        ------
1312
1313        '''
1314        import numpy as np
1315
1316        print('... disaggregation of precipitation with new method.')
1317
1318        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1319
1320        # initialize new numpy arrays for disaggregated fields
1321        if maxnum:
1322            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1323            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1324        else:
1325            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1326            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1327
1328        # do the disaggregation, but neglect the last value of the
1329        # original time series. This one corresponds for example to
1330        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1331        if maxnum:
1332            for inum in range(maxnum):
1333                for ix in range(ni*nj):
1334                    lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1]
1335                    cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1]
1336        else:
1337            for ix in range(ni*nj):
1338                lsp_new_np[0,ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1339                cp_new_np[0,ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1340
1341        # write to grib files (full/orig times to flux file and inbetween
1342        # times with step 1 and 2, respectively)
1343        print('... write disaggregated precipitation to files.')
1344
1345        if maxnum:
1346            # remember the index of the number values
1347            index_number = index_keys.index('number')
1348            # empty set to save unique ensemble numbers which were already processed
1349            ens_numbers = set()
1350            # index for the ensemble number
1351            inumb = 0
1352        else:
1353            inumb = 0
1354
1355        # index variable of disaggregated fields
1356        it = 0
1357
1358        # "product" genereates each possible combination between the
1359        # values of the index keys
1360        for prod in product(*index_vals):
1361            # e.g. prod = ('20170505', '0', '12')
1362            #             ( date     ,time, step)
1363            # or   prod = ('0'   , '20170505', '0', '12')
1364            #             (number, date      ,time, step)
1365
1366            cdate = prod[index_keys.index('date')]
1367            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1368            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1369
1370            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1371            date += timedelta(hours=int(cstep))
1372
1373            start_period, end_period = generate_retrieval_period_boundary(c)
1374            # skip all temporary times
1375            # which are outside the retrieval period
1376            if date < start_period or \
1377               date > end_period:
1378                continue
1379
1380            # the whole process has to be done for each seperate ensemble member
1381            # therefore, for each new ensemble member we delete old flux values
1382            # and start collecting flux data from the beginning time step
1383            if maxnum and prod[index_number] not in ens_numbers:
1384                ens_numbers.add(prod[index_number])
1385                inumb = int(prod[index_number])
1386                it = 0
1387
1388            # if necessary, add ensemble member number to filename suffix
1389            # otherwise, add empty string
1390            if maxnum:
1391                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1392            else:
1393                numbersuffix = ''
1394
1395            # per original time stamp: write original time step and
1396            # the two newly generated sub time steps
1397            if c.purefc:
1398                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1399            else:
1400                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1401
1402            # write original time step to flux file as usual
1403            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1404            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1405                              wherekeynames=['paramId'], wherekeyvalues=[142],
1406                              keynames=['perturbationNumber','date','time','stepRange','values'],
1407                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1408                                         date.hour*100, 0, lsp_new_np[inumb,:,it]],
1409                             )
1410            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1411                              wherekeynames=['paramId'], wherekeyvalues=[143],
1412                              keynames=['perturbationNumber','date','time','stepRange','values'],
1413                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1414                                         date.hour*100, 0, cp_new_np[inumb,:,it]]
1415                             )
1416
1417            # rr for first subgrid point is identified by step = 1
1418            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1419                              wherekeynames=['paramId'], wherekeyvalues=[142],
1420                              keynames=['perturbationNumber','date','time','stepRange','values'],
1421                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1422                                         date.hour*100, '1', lsp_new_np[inumb,:,it+1]]
1423                              )
1424            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1425                              wherekeynames=['paramId'], wherekeyvalues=[143],
1426                              keynames=['perturbationNumber','date','time','stepRange','values'],
1427                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1428                                         date.hour*100, '1', cp_new_np[inumb,:,it+1]]
1429                              )
1430
1431            # rr for second subgrid point is identified by step = 2
1432            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1433                              wherekeynames=['paramId'], wherekeyvalues=[142],
1434                              keynames=['perturbationNumber','date','time','stepRange','values'],
1435                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1436                                         date.hour*100, '2', lsp_new_np[inumb,:,it+2]]
1437                              )
1438            fluxfile.set_keys(tmpfile, filemode='ab', strict=True,
1439                              wherekeynames=['paramId'], wherekeyvalues=[143],
1440                              keynames=['perturbationNumber','date','time','stepRange','values'],
1441                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1442                                         date.hour*100, '2', cp_new_np[inumb,:,it+2]]
1443                              )
1444
1445            it = it + 3 # jump to next original time step in rr fields
1446        return
1447
1448    def _create_rr_grib_dummy(self, ifile, inputdir):
1449        '''Creates a grib file with a dummy message for the two precipitation
1450        types lsp and cp each.
1451
1452        Parameters
1453        ----------
1454        ifile : str
1455            Filename of the input file to read the grib messages from.
1456
1457        inputdir : str, optional
1458            Path to the directory where the retrieved data is stored.
1459
1460        Return
1461        ------
1462
1463        '''
1464
1465        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1466
1467        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1468                      keyvalues=[142], filemode='wb')
1469
1470        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1471                      keyvalues=[143], filemode='ab')
1472
1473        return
1474
1475    def create(self, inputfiles, c):
1476        '''An index file will be created which depends on the combination
1477        of "date", "time" and "stepRange" values. This is used to iterate
1478        over all messages in each grib file which were passed through the
1479        parameter "inputfiles" to seperate specific parameters into fort.*
1480        files. Afterwards the FORTRAN program is called to convert
1481        the data fields all to the same grid and put them in one file
1482        per unique time step (combination of "date", "time" and
1483        "stepRange").
1484
1485        Note
1486        ----
1487        This method is based on the ECMWF example index.py
1488        https://software.ecmwf.int/wiki/display/GRIB/index.py
1489
1490        Parameters
1491        ----------
1492        inputfiles : UioFiles
1493            Contains a list of files.
1494
1495        c : ControlFile
1496            Contains all the parameters of CONTROL file and
1497            command line.
1498
1499        Return
1500        ------
1501
1502        '''
1503        from eccodes import (codes_index_select, codes_new_from_index, codes_get,
1504                             codes_get_values, codes_set_values, codes_set,
1505                             codes_write, codes_release, codes_new_from_index,
1506                             codes_index_release)
1507
1508        # generate start and end timestamp of the retrieval period
1509        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1510        start_period = start_period + timedelta(hours=int(c.step[0]))
1511        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1512        end_period = end_period + timedelta(hours=int(c.step[-1]))
1513
1514        # @WRF
1515        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1516        #
1517        # UNDER CONSTRUCTION !!!
1518        #
1519        #if c.wrf:
1520        #    table128 = init128(_config.PATH_GRIBTABLE)
1521        #    wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1522        #                           stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1523        #                          table128)
1524
1525        # these numbers are indices for the temporary files "fort.xx"
1526        # which are used to seperate the grib fields to,
1527        # for the Fortran program input
1528        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1529        # 17: Q | 18: Q, SL, GG| 19: omega | 21: etadot | 22: clwc+ciwc
1530        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1531                 '17':None, '18':None, '19':None, '21':None, '22':None}
1532
1533        iid = None
1534        index_vals = None
1535
1536        # get the values of the keys which are used for distinct access
1537        # of grib messages via product
1538        if '/' in self.number:
1539            # more than one ensemble member is selected
1540            index_keys = ["number", "date", "time", "step"]
1541        else:
1542            index_keys = ["date", "time", "step"]
1543        iid, index_vals = self._mk_index_values(c.inputdir,
1544                                                inputfiles,
1545                                                index_keys)
1546        # index_vals looks like e.g.:
1547        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1548        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1549        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1550
1551        # "product" genereates each possible combination between the
1552        # values of the index keys
1553        for prod in product(*index_vals):
1554            # e.g. prod = ('20170505', '0', '12')
1555            #             (  date    ,time, step)
1556
1557            print('current product: ', prod)
1558
1559            for i in range(len(index_keys)):
1560                codes_index_select(iid, index_keys[i], prod[i])
1561
1562            # get first id from current product
1563            gid = codes_new_from_index(iid)
1564
1565            # if there is no data for this specific time combination / product
1566            # skip the rest of the for loop and start with next timestep/product
1567            if not gid:
1568                continue
1569#============================================================================================
1570            # remove old fort.* files and open new ones
1571            # they are just valid for a single product
1572            for k, f in fdict.items():
1573                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1574                silent_remove(fortfile)
1575                fdict[k] = open(fortfile, 'wb')
1576#============================================================================================
1577            # create correct timestamp from the three time informations
1578            cdate = str(codes_get(gid, 'date'))
1579            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1580            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1581            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1582            timestamp += timedelta(hours=int(cstep))
1583            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1584
1585            # skip all temporary times
1586            # which are outside the retrieval period
1587            if timestamp < start_period or \
1588               timestamp > end_period:
1589                continue
1590
1591            # if the timestamp is out of basetime start/end date period,
1592            # skip this specific product
1593            if c.basetime is not None:
1594                time_delta = timedelta(hours=12-int(c.dtime))
1595                start_time = datetime.strptime(c.end_date + str(c.basetime),
1596                                                '%Y%m%d%H') - time_delta
1597                end_time = datetime.strptime(c.end_date + str(c.basetime),
1598                                             '%Y%m%d%H')
1599                if timestamp < start_time or timestamp > end_time:
1600                    continue
1601
1602            # @WRF
1603            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1604            #
1605            # UNDER CONSTRUCTION !!!
1606            #
1607            #if c.wrf:
1608            #    if 'olddate' not in locals() or cdate != olddate:
1609            #        fwrf = open(os.path.join(c.outputdir,
1610            #                    'WRF' + cdate + '.' + ctime + '.000.grb2'), 'wb')
1611            #        olddate = cdate[:]
1612#============================================================================================
1613            # savedfields remembers which fields were already used.
1614            savedfields = []
1615            # sum of cloud liquid and ice water content
1616            scwc = None
1617            while 1:
1618                if not gid:
1619                    break
1620                paramId = codes_get(gid, 'paramId')
1621                gridtype = codes_get(gid, 'gridType')
1622                levtype = codes_get(gid, 'typeOfLevel')
1623                if paramId == 77: # ETADOT
1624                    codes_write(gid, fdict['21'])
1625                elif paramId == 130: # T
1626                    codes_write(gid, fdict['11'])
1627                elif paramId == 131 or paramId == 132: # U, V wind component
1628                    codes_write(gid, fdict['10'])
1629                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1630                    codes_write(gid, fdict['17'])
1631                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1632                    codes_write(gid, fdict['18'])
1633                elif paramId == 135: # W
1634                    codes_write(gid, fdict['19'])
1635                elif paramId == 152: # LNSP
1636                    codes_write(gid, fdict['12'])
1637                elif paramId == 155 and gridtype == 'sh': # D
1638                    codes_write(gid, fdict['13'])
1639                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1640                    # sum cloud liquid water and ice
1641                    if scwc is None:
1642                        scwc = codes_get_values(gid)
1643                    else:
1644                        scwc += codes_get_values(gid)
1645                        codes_set_values(gid, scwc)
1646                        codes_set(gid, 'paramId', 201031)
1647                        codes_write(gid, fdict['22'])
1648                        scwc = None
1649                # @WRF
1650                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1651                #
1652                # UNDER CONSTRUCTION !!!
1653                #
1654                #elif c.wrf and paramId in [129, 138, 155] and \
1655                #      levtype == 'hybrid': # Z, VO, D
1656                #    # do not do anything right now
1657                #    # these are specific parameter for WRF
1658                #    pass
1659                else:
1660                    if paramId not in savedfields:
1661                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1662                        # and all ADDPAR parameter
1663                        codes_write(gid, fdict['16'])
1664                        savedfields.append(paramId)
1665                    else:
1666                        print('duplicate ' + str(paramId) + ' not written')
1667                # @WRF
1668                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1669                #
1670                # UNDER CONSTRUCTION !!!
1671                #
1672                #try:
1673                #    if c.wrf:
1674                #        # model layer
1675                #        if levtype == 'hybrid' and \
1676                #           paramId in [129, 130, 131, 132, 133, 138, 155]:
1677                #            codes_write(gid, fwrf)
1678                #        # sfc layer
1679                #        elif paramId in wrfpars:
1680                #            codes_write(gid, fwrf)
1681                #except AttributeError:
1682                #    pass
1683
1684                codes_release(gid)
1685                gid = codes_new_from_index(iid)
1686#============================================================================================
1687            for f in fdict.values():
1688                f.close()
1689#============================================================================================
1690            # call for Fortran program to convert e.g. reduced_gg grids to
1691            # regular_ll and calculate detadot/dp
1692            pwd = os.getcwd()
1693            os.chdir(c.inputdir)
1694            if os.stat('fort.21').st_size == 0 and c.eta:
1695                print('Parameter 77 (etadot) is missing, most likely it is '
1696                      'not available for this type or date / time\n')
1697                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1698                my_error('fort.21 is empty while parameter eta '
1699                         'is set to 1 in CONTROL file')
1700# ============================================================================================
1701            # write out all output to log file before starting fortran programm
1702            sys.stdout.flush()
1703
1704            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1705            execute_subprocess([os.path.join(c.exedir,
1706                                _config.FORTRAN_EXECUTABLE)],
1707                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1708
1709            os.chdir(pwd)
1710# ============================================================================================
1711            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1712            if c.purefc:
1713                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1714            else:
1715                suffix = cdate_hour[2:10]
1716
1717            # if necessary, add ensemble member number to filename suffix
1718            if 'number' in index_keys:
1719                index_number = index_keys.index('number')
1720                if len(index_vals[index_number]) > 1:
1721                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1722
1723            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1724            print("outputfile = " + fnout)
1725            # collect for final processing
1726            self.outputfilelist.append(os.path.basename(fnout))
1727            # # get additional precipitation subgrid data if available
1728            # if c.rrint:
1729                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1730                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1731# ============================================================================================
1732            # create outputfile and copy all data from intermediate files
1733            # to the outputfile (final GRIB input files for FLEXPART)
1734            orolsm = os.path.basename(glob.glob(c.inputdir +
1735                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1736            fluxfile = 'flux' + cdate[0:2] + suffix
1737            if not c.cwc:
1738                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1739            else:
1740                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1741
1742            with open(fnout, 'wb') as fout:
1743                for f in flist:
1744                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1745                                       fout)
1746
1747            if c.omega:
1748                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1749                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1750                                            'rb'), fout)
1751# ============================================================================================
1752
1753        # @WRF
1754        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1755        #
1756        # UNDER CONSTRUCTION !!!
1757        #
1758        #if c.wrf:
1759        #    fwrf.close()
1760
1761        codes_index_release(iid)
1762
1763        return
1764
1765
1766    def calc_extra_elda(self, path, prefix):
1767        ''' Calculates extra ensemble members for ELDA - Stream.
1768
1769        This is a specific feature which doubles the number of ensemble members
1770        for the ELDA Stream.
1771
1772        Parameters
1773        ----------
1774        path : str
1775            Path to the output files.
1776
1777        prefix : str
1778            The prefix of the output filenames as defined in Control file.
1779
1780        Return
1781        ------
1782
1783        '''
1784        from eccodes import (codes_grib_new_from_file, codes_get_array,
1785                             codes_set_array, codes_release, codes_set_values,
1786                             codes_set, codes_write, codes_release)
1787
1788        # max number
1789        maxnum = int(self.number.split('/')[-1])
1790
1791        # get a list of all prepared output files with control forecast (CF)
1792        CF_filelist = UioFiles(path, prefix + '*.N000')
1793        CF_filelist.files = sorted(CF_filelist.files)
1794
1795        for cffile in CF_filelist.files:
1796            with open(cffile, 'rb') as f:
1797                cfvalues=[]
1798                while True:
1799                    fid = codes_grib_new_from_file(f)
1800                    if fid is None:
1801                        break
1802                    cfvalues.append(codes_get_array(fid, 'values'))
1803                    codes_release(fid)
1804
1805            filename = cffile.split('N000')[0]
1806            for i in range(1, maxnum + 1):
1807                # read an ensemble member
1808                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1809                # create file for newly calculated ensemble member
1810                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1811                # number of message in grib file
1812                j = 0
1813                while True:
1814                    gid = codes_grib_new_from_file(g)
1815                    if gid is None:
1816                        break
1817                    values = codes_get_array(gid, 'values')
1818                    # generate a new ensemble member by subtracting
1819                    # 2 * ( current time step value - last time step value )
1820                    codes_set_array(gid, 'values',
1821                                    values-2*(values-cfvalues[j]))
1822                    codes_set(gid, 'number', i+maxnum)
1823                    codes_write(gid, h)
1824                    codes_release(gid)
1825                    j += 1
1826
1827                g.close()
1828                h.close()
1829                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1830                self.outputfilelist.append(
1831                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1832
1833        return
1834
1835
1836    def process_output(self, c):
1837        '''Postprocessing of FLEXPART input files.
1838
1839        The grib files are postprocessed depending on the selection in
1840        CONTROL file. The resulting files are moved to the output
1841        directory if its not equal to the input directory.
1842        The following modifications might be done if
1843        properly switched in CONTROL file:
1844        GRIB2 - Conversion to GRIB2
1845        ECTRANS - Transfer of files to gateway server
1846        ECSTORAGE - Storage at ECMWF server
1847
1848        Parameters
1849        ----------
1850        c : ControlFile
1851            Contains all the parameters of CONTROL file and
1852            command line.
1853
1854        Return
1855        ------
1856
1857        '''
1858
1859        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1860
1861        if _config.FLAG_ON_ECMWFSERVER:
1862            print('ecstorage: {}\n ecfsdir: {}\n'.
1863                  format(c.ecstorage, c.ecfsdir))
1864            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1865                  .format(c.ectrans, c.gateway, c.destination))
1866
1867        print('Output filelist: ')
1868        print(sorted(self.outputfilelist))
1869
1870        for ofile in self.outputfilelist:
1871            ofile = os.path.join(self.inputdir, ofile)
1872
1873            if c.format.lower() == 'grib2':
1874                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1875                                    'productDefinitionTemplateNumber=8',
1876                                    ofile, ofile + '_2'],
1877                                   error_msg='GRIB2 CONVERSION FAILED!')
1878
1879                execute_subprocess(['mv', ofile + '_2', ofile],
1880                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1881                                   'FILES FAILED!')
1882
1883            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1884                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1885                                    c.gateway, '-remote', c.destination,
1886                                    '-source', ofile],
1887                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1888
1889            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1890                execute_subprocess(['ecp', '-o', ofile,
1891                                    os.path.expandvars(c.ecfsdir)],
1892                                   error_msg='COPY OF FILES TO ECSTORAGE '
1893                                   'AREA FAILED!')
1894
1895            if c.outputdir != c.inputdir:
1896                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1897                                    c.outputdir],
1898                                   error_msg='RELOCATION OF OUTPUT FILES '
1899                                   'TO OUTPUTDIR FAILED!')
1900
1901        return
1902
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG