source: flex_extract.git/source/python/classes/EcFlexpart.py @ d727af2

dev
Last change on this file since d727af2 was d727af2, checked in by Anne Philipp <anne.philipp@…>, 3 months ago

updated rrint processes (added missing rr for ensembles and wrote all precip fields into the output files with different steps to distingush)

  • Property mode set to 100644
File size: 78.5 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
71
72# software specific classes and modules from flex_extract
73sys.path.append('../')
74import _config
75from classes.GribUtil import GribUtil
76from mods.tools import (init128, to_param_id, silent_remove, product,
77                        my_error, make_dir, get_informations, get_dimensions,
78                        execute_subprocess, to_param_id_with_tablenumber,
79                        generate_retrieval_period_boundary)
80from classes.MarsRetrieval import MarsRetrieval
81from classes.UioFiles import UioFiles
82import mods.disaggregation as disaggregation
83
84# ------------------------------------------------------------------------------
85# CLASS
86# ------------------------------------------------------------------------------
87class EcFlexpart(object):
88    '''
89    Class to represent FLEXPART specific ECMWF data.
90
91    FLEXPART needs grib files in a specifc format. All necessary data fields
92    for one time step are stored in a single file. The class represents an
93    instance with all the parameter and settings necessary for retrieving
94    MARS data and modifing them so they are fitting FLEXPART needs. The class
95    is able to disaggregate the fluxes and convert grid types to the one needed
96    by FLEXPART, therefore using the FORTRAN program.
97
98    Attributes
99    ----------
100    mreq_count : int
101        Counter for the number of generated mars requests.
102
103    inputdir : str
104        Path to the directory where the retrieved data is stored.
105
106    dataset : str
107        For public datasets there is the specific naming and parameter
108        dataset which has to be used to characterize the type of
109        data.
110
111    basetime : int
112        The time for a half day retrieval. The 12 hours upfront are to be
113        retrieved.
114
115    dtime : str
116        Time step in hours.
117
118    acctype : str
119        The field type for the accumulated forecast fields.
120
121    acctime : str
122        The starting time from the accumulated forecasts.
123
124    accmaxstep : str
125        The maximum forecast step for the accumulated forecast fields.
126
127    marsclass : str
128        Characterisation of dataset.
129
130    stream : str
131        Identifies the forecasting system used to generate the data.
132
133    number : str
134        Selects the member in ensemble forecast run.
135
136    resol : str
137        Specifies the desired triangular truncation of retrieved data,
138        before carrying out any other selected post-processing.
139
140    accuracy : str
141        Specifies the number of bits per value to be used in the
142        generated GRIB coded fields.
143
144    addpar : str
145        List of additional parameters to be retrieved.
146
147    level : str
148        Specifies the maximum level.
149
150    expver : str
151        The version of the dataset.
152
153    levelist : str
154        Specifies the required levels.
155
156    glevelist : str
157        Specifies the required levels for gaussian grids.
158
159    gaussian : str
160        This parameter is deprecated and should no longer be used.
161        Specifies the desired type of Gaussian grid for the output.
162
163    grid : str
164        Specifies the output grid which can be either a Gaussian grid
165        or a Latitude/Longitude grid.
166
167    area : str
168        Specifies the desired sub-area of data to be extracted.
169
170    purefc : int
171        Switch for definition of pure forecast mode or not.
172
173    outputfilelist : list of str
174        The final list of FLEXPART ready input files.
175
176    types : dictionary
177        Determines the combination of type of fields, time and forecast step
178        to be retrieved.
179
180    params : dictionary
181        Collection of grid types and their corresponding parameters,
182        levels, level types and the grid definition.
183
184    server : ECMWFService or ECMWFDataServer
185        This is the connection to the ECMWF data servers.
186
187    public : int
188        Decides which Web API Server version is used.
189
190    dates : str
191        Contains start and end date of the retrieval in the format
192        "YYYYMMDD/to/YYYYMMDD"
193    '''
194
195    # --------------------------------------------------------------------------
196    # CLASS FUNCTIONS
197    # --------------------------------------------------------------------------
198    def __init__(self, c, fluxes=False):
199        '''Creates an object/instance of EcFlexpart with the associated
200        settings of its attributes for the retrieval.
201
202        Parameters:
203        -----------
204        c : ControlFile
205            Contains all the parameters of CONTROL file and
206            command line.
207
208        fluxes : boolean, optional
209            Decides if the flux parameter settings are stored or
210            the rest of the parameter list.
211            Default value is False.
212
213        Return
214        ------
215
216        '''
217        # set a counter for the number of generated mars requests
218        self.mreq_count = 0
219
220        self.inputdir = c.inputdir
221        self.dataset = c.dataset
222        self.basetime = c.basetime
223        self.dtime = c.dtime
224        self.acctype = c.acctype
225        self.acctime = c.acctime
226        self.accmaxstep = c.accmaxstep
227        self.marsclass = c.marsclass
228        self.stream = c.stream
229        self.number = c.number
230        self.resol = c.resol
231        self.accuracy = c.accuracy
232        self.addpar = c.addpar
233        self.level = c.level
234        self.expver = c.expver
235        self.levelist = c.levelist
236        self.glevelist = '1/to/' + c.level # in case of gaussian grid
237        self.gaussian = c.gaussian
238        self.grid = c.grid
239        self.area = c.area
240        self.purefc = c.purefc
241        self.outputfilelist = []
242
243        # Define the different types of field combinations (type, time, step)
244        self.types = {}
245        # Define the parameters and their level types, level list and
246        # grid resolution for the retrieval job
247        self.params = {}
248
249        if fluxes:
250            self._create_params_fluxes()
251        else:
252            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
253
254        if fluxes and not c.purefc:
255            self._create_field_types_fluxes()
256        else:
257            self._create_field_types(c.type, c.time, c.step)
258        return
259
260    def _create_field_types(self, ftype, ftime, fstep):
261        '''Create the combination of field type, time and forecast step.
262
263        Parameters:
264        -----------
265        ftype : list of str
266            List of field types.
267
268        ftime : list of str
269            The time in hours of the field.
270
271        fstep : str
272            Specifies the forecast time step from forecast base time.
273            Valid values are hours (HH) from forecast base time.
274
275        Return
276        ------
277
278        '''
279        i = 0
280        for ty, st, ti in zip(ftype, fstep, ftime):
281            btlist = list(range(len(ftime)))
282            if self.basetime == 12:
283                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
284            if self.basetime == 0:
285                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
286
287            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
288                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
289                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
290                 # (int(c.time[i]) in btlist or c.purefc):
291
292            if (i in btlist) or self.purefc:
293
294                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
295                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
296
297                    if ty not in self.types.keys():
298                        self.types[ty] = {'times': '', 'steps': ''}
299
300                    if ti not in self.types[ty]['times']:
301                        if self.types[ty]['times']:
302                            self.types[ty]['times'] += '/'
303                        self.types[ty]['times'] += ti
304
305                    if st not in self.types[ty]['steps']:
306                        if self.types[ty]['steps']:
307                            self.types[ty]['steps'] += '/'
308                        self.types[ty]['steps'] += st
309            i += 1
310
311        return
312
313    def _create_field_types_fluxes(self):
314        '''Create the combination of field type, time and forecast step
315        for the flux data.
316
317        Parameters:
318        -----------
319
320        Return
321        ------
322
323        '''
324        self.types[str(self.acctype)] = {'times': str(self.acctime),
325                                         'steps': '{}/to/{}/by/{}'.format(
326                                             self.dtime,
327                                             self.accmaxstep,
328                                             self.dtime)}
329        return
330
331    def _create_params(self, gauss, eta, omega, cwc, wrf):
332        '''Define the specific parameter settings for retrievment.
333
334        The different parameters need specific grid types and level types
335        for retrievement. We might get following combination of types
336        (depending on selection and availability):
337        (These are short cuts for the grib file names (leading sequence)
338        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
339        where:
340            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
341            ML = Model Level, SL = Surface Level
342
343        For each of this combination there is a list of parameter names,
344        the level type, the level list and the grid resolution.
345
346        There are different scenarios for data extraction from MARS:
347        1) Retrieval of etadot
348           eta=1, gauss=0, omega=0
349        2) Calculation of etadot from divergence
350           eta=0, gauss=1, omega=0
351        3) Calculation of etadot from omega (for makes sense for debugging)
352           eta=0, gauss=0, omega=1
353        4) Retrieval and Calculation of etadot (only for debugging)
354           eta=1, gauss=1, omega=0
355        5) Download also specific model and surface level data for FLEXPART-WRF
356
357        Parameters:
358        -----------
359        gauss : int
360            Gaussian grid is retrieved.
361
362        eta : int
363            Etadot parameter will be directly retrieved.
364
365        omega : int
366            The omega paramterwill be retrieved.
367
368        cwc : int
369            The cloud liquid and ice water content will be retrieved.
370
371        wrf : int
372            Additional model level and surface level data will be retrieved for
373            WRF/FLEXPART-WRF simulations.
374
375        Return
376        ------
377
378        '''
379        # SURFACE FIELDS
380        #-----------------------------------------------------------------------
381        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
382        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
383                                 'SFC', '1', self.grid]
384        if self.addpar:
385            self.params['OG__SL'][0] += self.addpar
386
387        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
388            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
389                                            'SFC', '1', self.grid]
390        else:
391            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
392                                            'SFC', '1', self.grid]
393
394        # MODEL LEVEL FIELDS
395        #-----------------------------------------------------------------------
396        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
397
398        if not gauss and eta:
399            self.params['OG__ML'][0] += '/U/V/ETADOT'
400        elif gauss and not eta:
401            self.params['GG__SL'] = ['Q', 'ML', '1',
402                                     '{}'.format((int(self.resol) + 1) // 2)]
403            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
404        elif not gauss and not eta:
405            self.params['OG__ML'][0] += '/U/V'
406        else:  # GAUSS and ETA
407            print('Warning: Collecting etadot and parameters for gaussian grid '
408                           'is a very costly parameter combination, '
409                           'use this combination only for debugging!')
410            self.params['GG__SL'] = ['Q', 'ML', '1',
411                                     '{}'.format((int(self.resol) + 1) // 2)]
412            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
413                                     '{}'.format((int(self.resol) + 1) // 2)]
414
415        if omega:
416            self.params['OG__ML'][0] += '/W'
417
418        if cwc:
419            self.params['OG__ML'][0] += '/CLWC/CIWC'
420
421        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
422        # -----------------------------------------------------------------------
423        if wrf:
424            self.params['OG__ML'][0] += '/Z/VO'
425            if '/D' not in self.params['OG__ML'][0]:
426                self.params['OG__ML'][0] += '/D'
427
428            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
429                       'SWVL1','SWVL2','SWVL3','SWVL4']
430            for par in wrf_sfc:
431                if par not in self.params['OG__SL'][0]:
432                    self.params['OG__SL'][0] += '/' + par
433
434        return
435
436
437    def _create_params_fluxes(self):
438        '''Define the parameter setting for flux data.
439
440        Flux data are accumulated fields in time and are stored on the
441        surface level. The leading short cut name for the grib files is:
442        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
443        acc for Accumulated Grid.
444        The params dictionary stores a list of parameter names, the level type,
445        the level list and the grid resolution.
446
447        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
448
449        Parameters:
450        -----------
451
452        Return
453        ------
454
455        '''
456        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
457                                    'SFC', '1', self.grid]
458        return
459
460
461    def _mk_targetname(self, ftype, param, date):
462        '''Creates the filename for the requested grib data to be stored in.
463        This name is passed as the "target" parameter in the request.
464
465        Parameters
466        ----------
467        ftype : str
468            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
469
470        param : str
471            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
472            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
473
474        date : str
475            The date period of the grib data to be stored in this file.
476
477        Return
478        ------
479        targetname : str
480            The target filename for the grib data.
481        '''
482        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
483                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
484
485        return targetname
486
487
488    def _start_retrievement(self, request, par_dict):
489        '''Creates the Mars Retrieval and prints or submits the request
490        depending on the status of the request variable.
491
492        Parameters
493        ----------
494        request : int
495            Selects the mode of retrieval.
496            0: Retrieves the data from ECMWF.
497            1: Prints the mars requests to an output file.
498            2: Retrieves the data and prints the mars request.
499
500        par_dict : dictionary
501            Contains all parameter which have to be set for creating the
502            Mars Retrievals. The parameter are:
503            marsclass, dataset, stream, type, levtype, levelist, resol,
504            gaussian, accuracy, grid, target, area, date, time, number,
505            step, expver, param
506
507        Return
508        ------
509
510        '''
511        # increase number of mars requests
512        self.mreq_count += 1
513
514        MR = MarsRetrieval(self.server,
515                           self.public,
516                           marsclass=par_dict['marsclass'],
517                           dataset=par_dict['dataset'],
518                           stream=par_dict['stream'],
519                           type=par_dict['type'],
520                           levtype=par_dict['levtype'],
521                           levelist=par_dict['levelist'],
522                           resol=par_dict['resol'],
523                           gaussian=par_dict['gaussian'],
524                           accuracy=par_dict['accuracy'],
525                           grid=par_dict['grid'],
526                           target=par_dict['target'],
527                           area=par_dict['area'],
528                           date=par_dict['date'],
529                           time=par_dict['time'],
530                           number=par_dict['number'],
531                           step=par_dict['step'],
532                           expver=par_dict['expver'],
533                           param=par_dict['param'])
534
535        if request == 0:
536            MR.display_info()
537            MR.data_retrieve()
538        elif request == 1:
539            MR.print_infodata_csv(self.inputdir, self.mreq_count)
540        elif request == 2:
541            MR.print_infodata_csv(self.inputdir, self.mreq_count)
542            MR.display_info()
543            MR.data_retrieve()
544        else:
545            print('Failure')
546
547        return
548
549
550    def _mk_index_values(self, inputdir, inputfiles, keys):
551        '''Creates an index file for a set of grib parameter keys.
552        The values from the index keys are returned in a list.
553
554        Parameters
555        ----------
556        keys : dictionary
557            List of parameter names which serves as index.
558
559        inputfiles : UioFiles
560            Contains a list of files.
561
562        Return
563        ------
564        iid : codes_index
565            This is a grib specific index structure to access
566            messages in a file.
567
568        index_vals : list of list  of str
569            Contains the values from the keys used for a distinct selection
570            of grib messages in processing  the grib files.
571            Content looks like e.g.:
572            index_vals[0]: ('20171106', '20171107', '20171108') ; date
573            index_vals[1]: ('0', '1200', '1800', '600') ; time
574            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
575        '''
576        iid = None
577        index_keys = keys
578
579        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
580        silent_remove(indexfile)
581        grib = GribUtil(inputfiles.files)
582        # creates new index file
583        iid = grib.index(index_keys=index_keys, index_file=indexfile)
584
585        # read the values of index keys
586        index_vals = []
587        for key in index_keys:
588            key_vals = codes_index_get(iid, key)
589            # have to sort the key values for correct order,
590            # therefore convert to int first
591            key_vals = [int(k) for k in key_vals]
592            key_vals.sort()
593            key_vals = [str(k) for k in key_vals]
594            index_vals.append(key_vals)
595            # index_vals looks for example like:
596            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
597            # index_vals[1]: ('0', '1200') ; time
598            # index_vals[2]: (3', '6', '9', '12') ; stepRange
599
600        return iid, index_vals
601
602
603    def retrieve(self, server, dates, public, request, inputdir='.'):
604        '''Finalizing the retrieval information by setting final details
605        depending on grid type.
606        Prepares MARS retrievals per grid type and submits them.
607
608        Parameters
609        ----------
610        server : ECMWFService or ECMWFDataServer
611            The connection to the ECMWF server. This is different
612            for member state users which have full access and non
613            member state users which have only access to the public
614            data sets. The decision is made from command line argument
615            "public"; for public access its True (ECMWFDataServer)
616            for member state users its False (ECMWFService)
617
618        dates : str
619            Contains start and end date of the retrieval in the format
620            "YYYYMMDD/to/YYYYMMDD"
621
622        request : int
623            Selects the mode of retrieval.
624            0: Retrieves the data from ECMWF.
625            1: Prints the mars requests to an output file.
626            2: Retrieves the data and prints the mars request.
627
628        inputdir : str, optional
629            Path to the directory where the retrieved data is about
630            to be stored. The default is the current directory ('.').
631
632        Return
633        ------
634
635        '''
636        self.dates = dates
637        self.server = server
638        self.public = public
639        self.inputdir = inputdir
640        oro = False
641
642        # define times with datetime module
643        t12h = timedelta(hours=12)
644        t24h = timedelta(hours=24)
645
646        # dictionary which contains all parameter for the mars request,
647        # entries with a "None" will change in different requests and will
648        # therefore be set in each request seperately
649        retr_param_dict = {'marsclass':self.marsclass,
650                           'dataset':self.dataset,
651                           'stream':None,
652                           'type':None,
653                           'levtype':None,
654                           'levelist':None,
655                           'resol':self.resol,
656                           'gaussian':None,
657                           'accuracy':self.accuracy,
658                           'grid':None,
659                           'target':None,
660                           'area':None,
661                           'date':None,
662                           'time':None,
663                           'number':self.number,
664                           'step':None,
665                           'expver':self.expver,
666                           'param':None}
667
668        for ftype in self.types:
669            # ftype contains field types such as
670            #     [AN, FC, PF, CV]
671            for pk, pv in self.params.items():
672                # pk contains one of these keys of params
673                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
674                #      OG_OROLSM_SL, OG_acc_SL]
675                # pv contains all of the items of the belonging key
676                #     [param, levtype, levelist, grid]
677                if isinstance(pv, str):
678                    continue
679                retr_param_dict['type'] = ftype
680                retr_param_dict['time'] = self.types[ftype]['times']
681                retr_param_dict['step'] = self.types[ftype]['steps']
682                retr_param_dict['date'] = self.dates
683                retr_param_dict['stream'] = self.stream
684                retr_param_dict['target'] = \
685                    self._mk_targetname(ftype,
686                                        pk,
687                                        retr_param_dict['date'].split('/')[0])
688                table128 = init128(_config.PATH_GRIBTABLE)
689                ids = to_param_id_with_tablenumber(pv[0], table128)
690                retr_param_dict['param'] = ids
691                retr_param_dict['levtype'] = pv[1]
692                retr_param_dict['levelist'] = pv[2]
693                retr_param_dict['grid'] = pv[3]
694                retr_param_dict['area'] = self.area
695                retr_param_dict['gaussian'] = self.gaussian
696
697                if pk == 'OG_OROLSM__SL' and not oro:
698                    oro = True
699                    # in CERA20C (class EP) there is no stream "OPER"!
700                    if self.marsclass.upper() != 'EP':
701                        retr_param_dict['stream'] = 'OPER'
702                    retr_param_dict['type'] = 'AN'
703                    retr_param_dict['time'] = '00'
704                    retr_param_dict['step'] = '000'
705                    retr_param_dict['date'] = self.dates.split('/')[0]
706                    retr_param_dict['target'] = self._mk_targetname('',
707                                            pk, retr_param_dict['date'])
708                elif pk == 'OG_OROLSM__SL' and oro:
709                    continue
710                if pk == 'GG__SL' and pv[0] == 'Q':
711                    retr_param_dict['area'] = ""
712                    retr_param_dict['gaussian'] = 'reduced'
713                if ftype.upper() == 'FC' and \
714                    'acc' not in retr_param_dict['target']:
715                    if (int(retr_param_dict['time'][0]) +
716                        int(retr_param_dict['step'][0])) > 23:
717                        dates = retr_param_dict['date'].split('/')
718                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
719                        sdate = sdate - timedelta(days=1)
720                        retr_param_dict['date'] = '/'.join(
721                            [sdate.strftime("%Y%m%d")] +
722                            retr_param_dict['date'][1:])
723
724                        print('CHANGED FC start date to ' +
725                              sdate.strftime("%Y%m%d") +
726                              ' to accomodate TIME=' +
727                              retr_param_dict['time'][0] +
728                              ', STEP=' +
729                              retr_param_dict['time'][0])
730
731    # ------  on demand path  --------------------------------------------------
732                if self.basetime is None:
733                    # ******* start retrievement
734                    self._start_retrievement(request, retr_param_dict)
735    # ------  operational path  ------------------------------------------------
736                else:
737                    # check if mars job requests fields beyond basetime.
738                    # if yes eliminate those fields since they may not
739                    # be accessible with user's credentials
740
741                    enddate = retr_param_dict['date'].split('/')[-1]
742                    elimit = datetime.strptime(enddate + str(self.basetime),
743                                               '%Y%m%d%H')
744
745                    if self.basetime == 12:
746                        # --------------  flux data ----------------------------
747                        if 'acc' in pk:
748                            startdate = retr_param_dict['date'].split('/')[0]
749                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
750                            retr_param_dict['date'] = '/'.join([startdate,
751                                                                'to',
752                                                                enddate])
753
754                            # ******* start retrievement
755                            self._start_retrievement(request, retr_param_dict)
756
757                            retr_param_dict['date'] = \
758                                datetime.strftime(elimit - t12h, '%Y%m%d')
759                            retr_param_dict['time'] = '00'
760                            retr_param_dict['target'] = \
761                                self._mk_targetname(ftype, pk,
762                                                    retr_param_dict['date'])
763
764                            # ******* start retrievement
765                            self._start_retrievement(request, retr_param_dict)
766
767                        # --------------  non flux data ------------------------
768                        else:
769                            # ******* start retrievement
770                            self._start_retrievement(request, retr_param_dict)
771
772                    elif self.basetime == 0:
773                        retr_param_dict['date'] = \
774                            datetime.strftime(elimit - t24h, '%Y%m%d')
775
776                        timesave = ''.join(retr_param_dict['time'])
777
778                        if ('/' in retr_param_dict['time'] and
779                            pk != 'OG_OROLSM__SL' and
780                            'acc' not in pk ) :
781                            times = retr_param_dict['time'].split('/')
782                            steps = retr_param_dict['step'].split('/')
783
784                            while int(times[0]) + int(steps[0]) <= 12:
785                                times = times[1:]
786                                if len(times) > 1:
787                                    retr_param_dict['time'] = '/'.join(times)
788                                else:
789                                    retr_param_dict['time'] = times[0]
790
791                        if (pk != 'OG_OROLSM__SL' and
792                            int(retr_param_dict['step'].split('/')[0]) == 0 and
793                            int(timesave.split('/')[0]) == 0):
794
795                            retr_param_dict['date'] = \
796                                datetime.strftime(elimit, '%Y%m%d')
797                            retr_param_dict['time'] = '00'
798                            retr_param_dict['step'] = '000'
799                            retr_param_dict['target'] = \
800                                self._mk_targetname(ftype, pk,
801                                                    retr_param_dict['date'])
802
803                        # ******* start retrievement
804                        self._start_retrievement(request, retr_param_dict)
805                    else:
806                        raise ValueError('ERROR: Basetime has an invalid value '
807                                                 '-> {}'.format(str(basetime)))
808
809        if request == 0 or request == 2:
810            print('MARS retrieve done ... ')
811        elif request == 1:
812            print('MARS request printed ...')
813
814        return
815
816
817    def write_namelist(self, c):
818        '''Creates a namelist file in the temporary directory and writes
819        the following values to it: maxl, maxb, mlevel,
820        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
821        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
822
823        Parameters
824        ----------
825        c : ControlFile
826            Contains all the parameters of CONTROL file and
827            command line.
828
829        filename : str
830                Name of the namelist file.
831
832        Return
833        ------
834
835        '''
836
837        from genshi.template.text import NewTextTemplate
838        from genshi.template import  TemplateLoader
839        from genshi.template.eval import  UndefinedError
840
841        try:
842            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
843            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
844                                            cls=NewTextTemplate)
845
846            self.inputdir = c.inputdir
847            area = np.asarray(self.area.split('/')).astype(float)
848            grid = np.asarray(self.grid.split('/')).astype(float)
849
850            if area[1] > area[3]:
851                area[1] -= 360
852            maxl = int((area[3] - area[1]) / grid[1]) + 1
853            maxb = int((area[0] - area[2]) / grid[0]) + 1
854
855            stream = namelist_template.generate(
856                maxl = str(maxl),
857                maxb = str(maxb),
858                mlevel = str(self.level),
859                mlevelist = str(self.levelist),
860                mnauf = str(self.resol),
861                metapar = '77',
862                rlo0 = str(area[1]),
863                rlo1 = str(area[3]),
864                rla0 = str(area[2]),
865                rla1 = str(area[0]),
866                momega = str(c.omega),
867                momegadiff = str(c.omegadiff),
868                mgauss = str(c.gauss),
869                msmooth = str(c.smooth),
870                meta = str(c.eta),
871                metadiff = str(c.etadiff),
872                mdpdeta = str(c.dpdeta)
873            )
874        except UndefinedError as e:
875            print('... ERROR ' + str(e))
876
877            sys.exit('\n... error occured while trying to generate namelist ' +
878                     _config.TEMPFILE_NAMELIST)
879        except OSError as e:
880            print('... ERROR CODE: ' + str(e.errno))
881            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
882
883            sys.exit('\n... error occured while trying to generate template ' +
884                     _config.TEMPFILE_NAMELIST)
885
886        try:
887            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
888
889            with open(namelistfile, 'w') as f:
890                f.write(stream.render('text'))
891        except OSError as e:
892            print('... ERROR CODE: ' + str(e.errno))
893            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
894
895            sys.exit('\n... error occured while trying to write ' +
896                     namelistfile)
897
898        return
899
900
901    def deacc_fluxes(self, inputfiles, c):
902        '''De-accumulate and disaggregate flux data.
903
904        Goes through all flux fields in ordered time and de-accumulate
905        the fields. Afterwards the fields are disaggregated in time.
906        Different versions of disaggregation is provided for rainfall
907        data (darain, modified linear) and the surface fluxes and
908        stress data (dapoly, cubic polynomial).
909
910        Parameters
911        ----------
912        inputfiles : UioFiles
913            Contains the list of files that contain flux data.
914
915        c : ControlFile
916            Contains all the parameters of CONTROL file and
917            command line.
918
919        Return
920        ------
921
922        '''
923
924        table128 = init128(_config.PATH_GRIBTABLE)
925        # get ids from the flux parameter names
926        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
927
928        iid = None
929        index_vals = None
930
931        # get the values of the keys which are used for distinct access
932        # of grib messages via product and save the maximum number of
933        # ensemble members if there is more than one
934        if '/' in self.number:
935            # more than one ensemble member is selected
936            index_keys = ["number", "date", "time", "step"]
937            # maximum ensemble number retrieved
938            # + 1 for the control run (ensemble number 0)
939            maxnum = int(self.number.split('/')[-1]) + 1
940            # remember the index of the number values
941            index_number = index_keys.index('number')
942            # empty set to save ensemble numbers which were already processed
943            ens_numbers = set()
944            # index for the ensemble number
945            inumb = 0
946        else:
947            index_keys = ["date", "time", "step"]
948            # maximum ensemble number
949            maxnum = None
950
951        # get sorted lists of the index values
952        # this is very important for disaggregating
953        # the flux data in correct order
954        iid, index_vals = self._mk_index_values(c.inputdir,
955                                                inputfiles,
956                                                index_keys)
957        # index_vals looks like e.g.:
958        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
959        # index_vals[1]: ('0', '600', '1200', '1800') ; time
960        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
961
962        if c.rrint:
963            # set start and end timestamps for retrieval period
964            if not c.purefc:
965                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
966                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
967            else:
968                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
969                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
970                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
971                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
972                end_date = end_date + timedelta(hours=c.maxstep)
973
974            # get necessary grid dimensions from grib files for storing the
975            # precipitation fields
976            info = get_informations(os.path.join(c.inputdir,
977                                                 inputfiles.files[0]))
978            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
979                                  start_date, end_date)
980
981            # create empty numpy arrays
982            if not maxnum:
983                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
984                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
985            else:
986                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
987                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
988
989            # index counter for time line
990            it_lsp = 0
991            it_cp = 0
992
993            # store the order of date and step
994            date_list = []
995            step_list = []
996
997        # initialize dictionaries to store flux values per parameter
998        orig_vals = {}
999        deac_vals = {}
1000        for p in pars:
1001            orig_vals[p] = []
1002            deac_vals[p] = []
1003
1004        # "product" genereates each possible combination between the
1005        # values of the index keys
1006        for prod in product(*index_vals):
1007            # e.g. prod = ('20170505', '0', '12')
1008            #             ( date     ,time, step)
1009
1010            print('CURRENT PRODUCT: ', prod)
1011
1012            # the whole process has to be done for each seperate ensemble member
1013            # therefore, for each new ensemble member we delete old flux values
1014            # and start collecting flux data from the beginning time step
1015            if maxnum and prod[index_number] not in ens_numbers:
1016                ens_numbers.add(prod[index_number])
1017                inumb = len(ens_numbers) - 1
1018                # re-initialize dictionaries to store flux values per parameter
1019                # for the next ensemble member
1020                it_lsp = 0
1021                it_cp = 0
1022                orig_vals = {}
1023                deac_vals = {}
1024                for p in pars:
1025                    orig_vals[p] = []
1026                    deac_vals[p] = []
1027
1028            for i in range(len(index_keys)):
1029                codes_index_select(iid, index_keys[i], prod[i])
1030
1031            # get first id from current product
1032            gid = codes_new_from_index(iid)
1033
1034            # if there is no data for this specific time combination / product
1035            # skip the rest of the for loop and start with next timestep/product
1036            if not gid:
1037                continue
1038
1039            # create correct timestamp from the three time informations
1040            cdate = str(codes_get(gid, 'date'))
1041            time = codes_get(gid, 'time') // 100  # integer
1042            step = codes_get(gid, 'step') # integer
1043            ctime = '{:0>2}'.format(time)
1044            cstep = '{:0>3}'.format(step)
1045
1046            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1047            t_dt = t_date + timedelta(hours=step)
1048            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1049            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1050            if c.basetime is not None:
1051                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1052                                              '%Y%m%d%H')
1053            else:
1054                t_enddate = t_date + timedelta(2*int(c.dtime))
1055
1056            # if necessary, add ensemble member number to filename suffix
1057            # otherwise, add empty string
1058            if maxnum:
1059                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1060            else:
1061                numbersuffix = ''
1062
1063            if c.purefc:
1064                fnout = os.path.join(c.inputdir, 'flux' +
1065                                     t_date.strftime('%Y%m%d.%H') +
1066                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1067                                     numbersuffix)
1068                gnout = os.path.join(c.inputdir, 'flux' +
1069                                     t_date.strftime('%Y%m%d.%H') +
1070                                     '.{:0>3}'.format(step-int(c.dtime)) +
1071                                     numbersuffix)
1072                hnout = os.path.join(c.inputdir, 'flux' +
1073                                     t_date.strftime('%Y%m%d.%H') +
1074                                     '.{:0>3}'.format(step) +
1075                                     numbersuffix)
1076            else:
1077                fnout = os.path.join(c.inputdir, 'flux' +
1078                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1079                gnout = os.path.join(c.inputdir, 'flux' +
1080                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1081                hnout = os.path.join(c.inputdir, 'flux' +
1082                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1083
1084            print("outputfile = " + fnout)
1085            f_handle = open(fnout, 'w')
1086            h_handle = open(hnout, 'w')
1087            g_handle = open(gnout, 'w')
1088
1089            # read message for message and store relevant data fields, where
1090            # data keywords are stored in pars
1091            while True:
1092                if not gid:
1093                    break
1094                parId = codes_get(gid, 'paramId') # integer
1095                step = codes_get(gid, 'step') # integer
1096                time = codes_get(gid, 'time') # integer
1097                ni = codes_get(gid, 'Ni') # integer
1098                nj = codes_get(gid, 'Nj') # integer
1099                if parId not in orig_vals.keys():
1100                    # parameter is not a flux, find next one
1101                    continue
1102
1103                # define conversion factor
1104                if parId == 142 or parId == 143:
1105                    fak = 1. / 1000.
1106                else:
1107                    fak = 3600.
1108
1109                # get parameter values and reshape
1110                values = codes_get_values(gid)
1111                values = (np.reshape(values, (nj, ni))).flatten() / fak
1112
1113                # save the original and accumulated values
1114                orig_vals[parId].append(values[:])
1115
1116                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1117                    # no de-accumulation needed
1118                    deac_vals[parId].append(values[:] / int(c.dtime))
1119                else:
1120                    # do de-accumulation
1121                    deac_vals[parId].append(
1122                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1123                         int(c.dtime))
1124
1125                # store precipitation if new disaggregation method is selected
1126                # only the exact days are needed
1127                if c.rrint:
1128                    if start_date <= t_dt <= end_date:
1129                        if not c.purefc:
1130                            if t_dt not in date_list:
1131                                date_list.append(t_dt)
1132                                step_list = [0]
1133                        else:
1134                            if t_date not in date_list:
1135                                date_list.append(t_date)
1136                            if step not in step_list:
1137                                step_list.append(step)
1138                        # store precipitation values
1139                        if maxnum and parId == 142:
1140                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1141                            it_lsp += 1
1142                        elif not maxnum and parId == 142:
1143                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1144                            it_lsp += 1
1145                        elif maxnum and parId == 143:
1146                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1147                            it_cp += 1
1148                        elif not maxnum and parId == 143:
1149                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1150                            it_cp += 1
1151
1152                # information printout
1153                print(parId, time, step, len(values), values[0], np.std(values))
1154
1155                # length of deac_vals[parId] corresponds to the
1156                # number of time steps, max. 4 are needed for disaggegration
1157                # with the old and original method
1158                # run over all grib messages and perform
1159                # shifting in time
1160                if len(deac_vals[parId]) >= 3:
1161                    if len(deac_vals[parId]) > 3:
1162                        if not c.rrint and (parId == 142 or parId == 143):
1163                            values = disaggregation.darain(deac_vals[parId])
1164                        else:
1165                            values = disaggregation.dapoly(deac_vals[parId])
1166
1167                        if not (step == c.maxstep and c.purefc \
1168                                or t_dt == t_enddate):
1169                            # remove first time step in list to shift
1170                            # time line
1171                            orig_vals[parId].pop(0)
1172                            deac_vals[parId].pop(0)
1173                    else:
1174                        # if the third time step is read (per parId),
1175                        # write out the first one as a boundary value
1176                        if c.purefc:
1177                            values = deac_vals[parId][1]
1178                        else:
1179                            values = deac_vals[parId][0]
1180
1181                    if not (c.rrint and (parId == 142 or parId == 143)):
1182                        codes_set_values(gid, values)
1183
1184                        if c.purefc:
1185                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1186                        else:
1187                            codes_set(gid, 'stepRange', 0)
1188                            codes_set(gid, 'time', t_m2dt.hour*100)
1189                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1190
1191                        codes_write(gid, f_handle)
1192
1193                        # squeeze out information of last two steps
1194                        # contained in deac_vals[parId]
1195                        # Note that deac_vals[parId][0] has not been popped
1196                        # in this case
1197
1198                        if step == c.maxstep and c.purefc or \
1199                           t_dt == t_enddate:
1200                            # last step
1201                            if c.purefc:
1202                                values = deac_vals[parId][3]
1203                                codes_set_values(gid, values)
1204                                codes_set(gid, 'stepRange', step)
1205                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1206                                codes_write(gid, h_handle)
1207                            else:
1208                                values = deac_vals[parId][3]
1209                                codes_set_values(gid, values)
1210                                codes_set(gid, 'stepRange', 0)
1211                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1212                                codes_set(gid, 'time', truedatetime.hour * 100)
1213                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1214                                codes_write(gid, h_handle)
1215
1216                            if parId == 142 or parId == 143:
1217                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1218                            else:
1219                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1220
1221                            # step before last step
1222                            if c.purefc:
1223                                codes_set(gid, 'stepRange', step-int(c.dtime))
1224                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1225                                codes_set_values(gid, values)
1226                                codes_write(gid, g_handle)
1227                            else:
1228                                codes_set(gid, 'stepRange', 0)
1229                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1230                                codes_set(gid, 'time', truedatetime.hour * 100)
1231                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1232                                codes_set_values(gid, values)
1233                                codes_write(gid, g_handle)
1234
1235                codes_release(gid)
1236
1237                gid = codes_new_from_index(iid)
1238
1239            f_handle.close()
1240            g_handle.close()
1241            h_handle.close()
1242
1243        codes_index_release(iid)
1244
1245        if c.rrint:
1246            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1247
1248            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1249                                 cp_np, maxnum, index_keys, index_vals, c)
1250
1251        return
1252
1253    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1254        '''Calculates and writes out the disaggregated precipitation fields.
1255
1256        Disaggregation is done in time and original times are written to the
1257        flux files, while the additional subgrid times are written to
1258        extra files output files. They are named like the original files with
1259        suffixes "_1" and "_2" for the first and second subgrid point.
1260
1261        Parameters
1262        ----------
1263        ni : int
1264            Amount of zonal grid points.
1265
1266        nj : int
1267            Amount of meridional grid points.
1268
1269        nt : int
1270            Number of time steps.
1271
1272        lsp_np : numpy array of float
1273            The large scale precipitation fields for each time step.
1274            Shape (ni * nj, nt).
1275
1276        cp_np : numpy array of float
1277            The convective precipitation fields for each time step.
1278            Shape (ni * nj, nt).
1279
1280        maxnum : int
1281            The maximum number of ensemble members. It is None
1282            if there are no or just one ensemble.
1283
1284        index_keys : dictionary
1285            List of parameter names which serves as index.
1286
1287        index_vals : list of list  of str
1288            Contains the values from the keys used for a distinct selection
1289            of grib messages in processing  the grib files.
1290            Content looks like e.g.:
1291            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1292            index_vals[1]: ('0', '1200', '1800', '600') ; time
1293            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1294
1295        c : ControlFile
1296            Contains all the parameters of CONTROL file and
1297            command line.
1298
1299        Return
1300        ------
1301
1302        '''
1303        print('... disaggregation of precipitation with new method.')
1304
1305        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1306
1307        # initialize new numpy arrays for disaggregated fields
1308        if maxnum:
1309            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1310            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1311        else:
1312            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1313            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1314
1315        # do the disaggregation, but neglect the last value of the
1316        # original time series. This one corresponds for example to
1317        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1318        if maxnum:
1319            for inum in range(maxnum):
1320                for ix in range(ni*nj):
1321                    lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1]
1322                    cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1]
1323        else:
1324            for ix in range(ni*nj):
1325                lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[0,ix,:])[:-1]
1326                cp_new_np[ix,:] = disaggregation.IA3(cp_np[0,ix,:])[:-1]
1327
1328        # write to grib files (full/orig times to flux file and inbetween
1329        # times into seperate end files)
1330        print('... write disaggregated precipitation to files.')
1331
1332        if maxnum:
1333            # remember the index of the number values
1334            index_number = index_keys.index('number')
1335            # empty set to save unique ensemble numbers which were already processed
1336            ens_numbers = set()
1337            # index for the ensemble number
1338            inumb = 0
1339        else:
1340            inumb = 0
1341
1342        # index variable of disaggregated fields
1343        it = 0
1344
1345        # "product" genereates each possible combination between the
1346        # values of the index keys
1347        for prod in product(*index_vals):
1348            # e.g. prod = ('20170505', '0', '12')
1349            #             ( date     ,time, step)
1350            # or   prod = ('0'   , '20170505', '0', '12')
1351            #             (number, date      ,time, step)
1352
1353            cdate = prod[index_keys.index('date')]
1354            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1355            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1356
1357            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1358            date += timedelta(hours=int(cstep))
1359
1360            start_period, end_period = generate_retrieval_period_boundary(c)
1361            # skip all temporary times
1362            # which are outside the retrieval period
1363            if date < start_period or \
1364               date > end_period:
1365                continue
1366
1367            # the whole process has to be done for each seperate ensemble member
1368            # therefore, for each new ensemble member we delete old flux values
1369            # and start collecting flux data from the beginning time step
1370            if maxnum and prod[index_number] not in ens_numbers:
1371                ens_numbers.add(prod[index_number])
1372                inumb = int(prod[index_number])
1373                it = 0
1374
1375            # if necessary, add ensemble member number to filename suffix
1376            # otherwise, add empty string
1377            if maxnum:
1378                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1379            else:
1380                numbersuffix = ''
1381
1382            # per original time stamp: write original time step and
1383            # the two newly generated sub time steps
1384            if c.purefc:
1385                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1386            else:
1387                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1388
1389            # write original time step to flux file as usual
1390            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1391            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1392                              wherekeynames=['paramId'], wherekeyvalues=[142],
1393                              keynames=['perturbationNumber','date','time','stepRange','values'],
1394                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1395                                         date.hour*100, 0, lsp_new_np[inumb,:,it]],
1396                             )
1397            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1398                              wherekeynames=['paramId'], wherekeyvalues=[143],
1399                              keynames=['perturbationNumber','date','time','stepRange','values'],
1400                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1401                                         date.hour*100, 0, cp_new_np[inumb,:,it]]
1402                             )
1403
1404            # rr for first subgrid point is identified by step = 1
1405            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1406                              wherekeynames=['paramId'], wherekeyvalues=[142],
1407                              keynames=['perturbationNumber','date','time','stepRange','values'],
1408                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1409                                         date.hour*100, '1', lsp_new_np[inumb,:,it+1]]
1410                              )
1411            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1412                              wherekeynames=['paramId'], wherekeyvalues=[143],
1413                              keynames=['perturbationNumber','date','time','stepRange','values'],
1414                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1415                                         date.hour*100, '1', cp_new_np[inumb,:,it+1]]
1416                              )
1417
1418            # rr for second subgrid point is identified by step = 2
1419            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1420                              wherekeynames=['paramId'], wherekeyvalues=[142],
1421                              keynames=['perturbationNumber','date','time','stepRange','values'],
1422                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1423                                         date.hour*100, '2', lsp_new_np[inumb,:,it+2]]
1424                              )
1425            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1426                              wherekeynames=['paramId'], wherekeyvalues=[143],
1427                              keynames=['perturbationNumber','date','time','stepRange','values'],
1428                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1429                                         date.hour*100, '2', cp_new_np[inumb,:,it+2]]
1430                              )
1431
1432            it = it + 3 # jump to next original time step in rr fields
1433        return
1434
1435    def _create_rr_grib_dummy(self, ifile, inputdir):
1436        '''Creates a grib file with a dummy message for the two precipitation
1437        types lsp and cp each.
1438
1439        Parameters
1440        ----------
1441        ifile : str
1442            Filename of the input file to read the grib messages from.
1443
1444        inputdir : str, optional
1445            Path to the directory where the retrieved data is stored.
1446
1447        Return
1448        ------
1449
1450        '''
1451
1452        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1453
1454        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1455                      keyvalues=[142], filemode='w')
1456
1457        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1458                      keyvalues=[143], filemode='a')
1459
1460        return
1461
1462    def create(self, inputfiles, c):
1463        '''An index file will be created which depends on the combination
1464        of "date", "time" and "stepRange" values. This is used to iterate
1465        over all messages in each grib file which were passed through the
1466        parameter "inputfiles" to seperate specific parameters into fort.*
1467        files. Afterwards the FORTRAN program is called to convert
1468        the data fields all to the same grid and put them in one file
1469        per unique time step (combination of "date", "time" and
1470        "stepRange").
1471
1472        Note
1473        ----
1474        This method is based on the ECMWF example index.py
1475        https://software.ecmwf.int/wiki/display/GRIB/index.py
1476
1477        Parameters
1478        ----------
1479        inputfiles : UioFiles
1480            Contains a list of files.
1481
1482        c : ControlFile
1483            Contains all the parameters of CONTROL file and
1484            command line.
1485
1486        Return
1487        ------
1488
1489        '''
1490
1491        # generate start and end timestamp of the retrieval period
1492        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1493        start_period = start_period + timedelta(hours=int(c.step[0]))
1494        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1495        end_period = end_period + timedelta(hours=int(c.step[-1]))
1496
1497        if c.wrf:
1498            table128 = init128(_config.PATH_GRIBTABLE)
1499            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1500                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1501                                  table128)
1502
1503        # these numbers are indices for the temporary files "fort.xx"
1504        # which are used to seperate the grib fields to,
1505        # for the Fortran program input
1506        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1507        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1508        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1509                 '17':None, '18':None, '19':None, '21':None, '22':None}
1510
1511        iid = None
1512        index_vals = None
1513
1514        # get the values of the keys which are used for distinct access
1515        # of grib messages via product
1516        if '/' in self.number:
1517            # more than one ensemble member is selected
1518            index_keys = ["number", "date", "time", "step"]
1519        else:
1520            index_keys = ["date", "time", "step"]
1521        iid, index_vals = self._mk_index_values(c.inputdir,
1522                                                inputfiles,
1523                                                index_keys)
1524        # index_vals looks like e.g.:
1525        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1526        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1527        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1528
1529        # "product" genereates each possible combination between the
1530        # values of the index keys
1531        for prod in product(*index_vals):
1532            # e.g. prod = ('20170505', '0', '12')
1533            #             (  date    ,time, step)
1534
1535            print('current product: ', prod)
1536
1537            for i in range(len(index_keys)):
1538                codes_index_select(iid, index_keys[i], prod[i])
1539
1540            # get first id from current product
1541            gid = codes_new_from_index(iid)
1542
1543            # if there is no data for this specific time combination / product
1544            # skip the rest of the for loop and start with next timestep/product
1545            if not gid:
1546                continue
1547#============================================================================================
1548            # remove old fort.* files and open new ones
1549            # they are just valid for a single product
1550            for k, f in fdict.items():
1551                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1552                silent_remove(fortfile)
1553                fdict[k] = open(fortfile, 'w')
1554#============================================================================================
1555            # create correct timestamp from the three time informations
1556            cdate = str(codes_get(gid, 'date'))
1557            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1558            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1559            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1560            timestamp += timedelta(hours=int(cstep))
1561            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1562
1563            # skip all temporary times
1564            # which are outside the retrieval period
1565            if timestamp < start_period or \
1566               timestamp > end_period:
1567                continue
1568
1569            # if the timestamp is out of basetime start/end date period,
1570            # skip this specific product
1571            if c.basetime is not None:
1572                time_delta = timedelta(hours=12-int(c.dtime))
1573                start_time = datetime.strptime(c.end_date + str(c.basetime),
1574                                                '%Y%m%d%H') - time_delta
1575                end_time = datetime.strptime(c.end_date + str(c.basetime),
1576                                             '%Y%m%d%H')
1577                if timestamp < start_time or timestamp > end_time:
1578                    continue
1579
1580            if c.wrf:
1581                if 'olddate' not in locals() or cdate != olddate:
1582                    fwrf = open(os.path.join(c.outputdir,
1583                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1584                    olddate = cdate[:]
1585#============================================================================================
1586            # savedfields remembers which fields were already used.
1587            savedfields = []
1588            # sum of cloud liquid and ice water content
1589            scwc = None
1590            while 1:
1591                if not gid:
1592                    break
1593                paramId = codes_get(gid, 'paramId')
1594                gridtype = codes_get(gid, 'gridType')
1595                levtype = codes_get(gid, 'typeOfLevel')
1596                if paramId == 77: # ETADOT
1597                    codes_write(gid, fdict['21'])
1598                elif paramId == 130: # T
1599                    codes_write(gid, fdict['11'])
1600                elif paramId == 131 or paramId == 132: # U, V wind component
1601                    codes_write(gid, fdict['10'])
1602                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1603                    codes_write(gid, fdict['17'])
1604                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1605                    codes_write(gid, fdict['18'])
1606                elif paramId == 135: # W
1607                    codes_write(gid, fdict['19'])
1608                elif paramId == 152: # LNSP
1609                    codes_write(gid, fdict['12'])
1610                elif paramId == 155 and gridtype == 'sh': # D
1611                    codes_write(gid, fdict['13'])
1612                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1613                    # sum cloud liquid water and ice
1614                    if scwc is None:
1615                        scwc = codes_get_values(gid)
1616                    else:
1617                        scwc += codes_get_values(gid)
1618                        codes_set_values(gid, scwc)
1619                        codes_set(gid, 'paramId', 201031)
1620                        codes_write(gid, fdict['22'])
1621                        scwc = None
1622                elif c.wrf and paramId in [129, 138, 155] and \
1623                      levtype == 'hybrid': # Z, VO, D
1624                    # do not do anything right now
1625                    # these are specific parameter for WRF
1626                    pass
1627                else:
1628                    if paramId not in savedfields:
1629                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1630                        # and all ADDPAR parameter
1631                        codes_write(gid, fdict['16'])
1632                        savedfields.append(paramId)
1633                    else:
1634                        print('duplicate ' + str(paramId) + ' not written')
1635
1636                try:
1637                    if c.wrf:
1638                        # model layer
1639                        if levtype == 'hybrid' and \
1640                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1641                            codes_write(gid, fwrf)
1642                        # sfc layer
1643                        elif paramId in wrfpars:
1644                            codes_write(gid, fwrf)
1645                except AttributeError:
1646                    pass
1647
1648                codes_release(gid)
1649                gid = codes_new_from_index(iid)
1650#============================================================================================
1651            for f in fdict.values():
1652                f.close()
1653#============================================================================================
1654            # call for Fortran program to convert e.g. reduced_gg grids to
1655            # regular_ll and calculate detadot/dp
1656            pwd = os.getcwd()
1657            os.chdir(c.inputdir)
1658            if os.stat('fort.21').st_size == 0 and c.eta:
1659                print('Parameter 77 (etadot) is missing, most likely it is '
1660                      'not available for this type or date / time\n')
1661                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1662                my_error('fort.21 is empty while parameter eta '
1663                         'is set to 1 in CONTROL file')
1664# ============================================================================================
1665            # write out all output to log file before starting fortran programm
1666            sys.stdout.flush()
1667
1668            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1669            execute_subprocess([os.path.join(c.exedir,
1670                                _config.FORTRAN_EXECUTABLE)],
1671                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1672
1673            os.chdir(pwd)
1674# ============================================================================================
1675            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1676            if c.purefc:
1677                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1678            else:
1679                suffix = cdate_hour[2:10]
1680
1681            # if necessary, add ensemble member number to filename suffix
1682            if 'number' in index_keys:
1683                index_number = index_keys.index('number')
1684                if len(index_vals[index_number]) > 1:
1685                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1686
1687            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1688            print("outputfile = " + fnout)
1689            # collect for final processing
1690            self.outputfilelist.append(os.path.basename(fnout))
1691            # # get additional precipitation subgrid data if available
1692            # if c.rrint:
1693                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1694                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1695# ============================================================================================
1696            # create outputfile and copy all data from intermediate files
1697            # to the outputfile (final GRIB input files for FLEXPART)
1698            orolsm = os.path.basename(glob.glob(c.inputdir +
1699                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1700            fluxfile = 'flux' + cdate[0:2] + suffix
1701            if not c.cwc:
1702                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1703            else:
1704                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1705
1706            with open(fnout, 'wb') as fout:
1707                for f in flist:
1708                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1709                                       fout)
1710
1711            if c.omega:
1712                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1713                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1714                                            'rb'), fout)
1715# ============================================================================================
1716        if c.wrf:
1717            fwrf.close()
1718
1719        codes_index_release(iid)
1720
1721        return
1722
1723
1724    def calc_extra_elda(self, path, prefix):
1725        ''' Calculates extra ensemble members for ELDA - Stream.
1726
1727        This is a specific feature which doubles the number of ensemble members
1728        for the ELDA Stream.
1729
1730        Parameters
1731        ----------
1732        path : str
1733            Path to the output files.
1734
1735        prefix : str
1736            The prefix of the output filenames as defined in Control file.
1737
1738        Return
1739        ------
1740
1741        '''
1742        # max number
1743        maxnum = int(self.number.split('/')[-1])
1744
1745        # get a list of all prepared output files with control forecast (CF)
1746        CF_filelist = UioFiles(path, prefix + '*.N000')
1747
1748        for cffile in CF_filelist.files:
1749            with open(cffile, 'rb') as f:
1750                cfvalues=[]
1751                while True:
1752                    fid = codes_grib_new_from_file(f)
1753                    if fid is None:
1754                        break
1755                    cfvalues.append(codes_get_array(fid, 'values'))
1756                    codes_release(fid)
1757
1758            filename = cffile.split('N000')[0]
1759            for i in range(1, maxnum + 1):
1760
1761                # read an ensemble member
1762                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1763                # create file for newly calculated ensemble member
1764                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1765                # number of message in grib file
1766                j = 0
1767                while True:
1768                    gid = codes_grib_new_from_file(g)
1769                    if gid is None:
1770                        break
1771                    values = codes_get_array(gid, 'values')
1772                    # generate a new ensemble member by subtracting
1773                    # 2 * ( current time step value - last time step value )
1774                    codes_set_array(gid, 'values',
1775                                    values-2*(values-cfvalues[j]))
1776                    codes_set(gid, 'number', i+maxnum)
1777                    codes_write(gid, h)
1778                    codes_release(gid)
1779                    j += 1
1780
1781                g.close()
1782                h.close()
1783                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1784                self.outputfilelist.append(
1785                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1786
1787        return
1788
1789
1790    def process_output(self, c):
1791        '''Postprocessing of FLEXPART input files.
1792
1793        The grib files are postprocessed depending on the selection in
1794        CONTROL file. The resulting files are moved to the output
1795        directory if its not equal to the input directory.
1796        The following modifications might be done if
1797        properly switched in CONTROL file:
1798        GRIB2 - Conversion to GRIB2
1799        ECTRANS - Transfer of files to gateway server
1800        ECSTORAGE - Storage at ECMWF server
1801
1802        Parameters
1803        ----------
1804        c : ControlFile
1805            Contains all the parameters of CONTROL file and
1806            command line.
1807
1808        Return
1809        ------
1810
1811        '''
1812
1813        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1814
1815        if _config.FLAG_ON_ECMWFSERVER:
1816            print('ecstorage: {}\n ecfsdir: {}\n'.
1817                  format(c.ecstorage, c.ecfsdir))
1818            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1819                  .format(c.ectrans, c.gateway, c.destination))
1820
1821        print('Output filelist: ')
1822        print(sorted(self.outputfilelist))
1823
1824        for ofile in self.outputfilelist:
1825            ofile = os.path.join(self.inputdir, ofile)
1826
1827            if c.format.lower() == 'grib2':
1828                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1829                                    'productDefinitionTemplateNumber=8',
1830                                    ofile, ofile + '_2'],
1831                                   error_msg='GRIB2 CONVERSION FAILED!')
1832
1833                execute_subprocess(['mv', ofile + '_2', ofile],
1834                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1835                                   'FILES FAILED!')
1836
1837            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1838                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1839                                    c.gateway, '-remote', c.destination,
1840                                    '-source', ofile],
1841                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1842
1843            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1844                execute_subprocess(['ecp', '-o', ofile,
1845                                    os.path.expandvars(c.ecfsdir)],
1846                                   error_msg='COPY OF FILES TO ECSTORAGE '
1847                                   'AREA FAILED!')
1848
1849            if c.outputdir != c.inputdir:
1850                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1851                                    c.outputdir],
1852                                   error_msg='RELOCATION OF OUTPUT FILES '
1853                                   'TO OUTPUTDIR FAILED!')
1854
1855        return
1856
1857
1858    def prepare_fp_files(self, c):
1859        '''Conversion of GRIB files to FLEXPART binary format.
1860
1861        Parameters
1862        ----------
1863        c : ControlFile
1864            Contains all the parameters of CONTROL file and
1865            command line.
1866
1867        Return
1868        ------
1869
1870        '''
1871        # generate AVAILABLE file
1872        # Example of AVAILABLE file data:
1873        # 20131107 000000      EN13110700              ON DISC
1874        clist = []
1875        for ofile in self.outputfilelist:
1876            fname = ofile.split('/')
1877            if '.' in fname[-1]:
1878                l = fname[-1].split('.')
1879                timestamp = datetime.strptime(l[0][-6:] + l[1],
1880                                              '%y%m%d%H')
1881                timestamp += timedelta(hours=int(l[2]))
1882                cdate = datetime.strftime(timestamp, '%Y%m%d')
1883                chms = datetime.strftime(timestamp, '%H%M%S')
1884            else:
1885                cdate = '20' + fname[-1][-8:-2]
1886                chms = fname[-1][-2:] + '0000'
1887            clist.append(cdate + ' ' + chms + ' '*6 +
1888                         fname[-1] + ' '*14 + 'ON DISC')
1889        clist.sort()
1890        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1891            f.write('\n'.join(clist) + '\n')
1892
1893        # generate pathnames file
1894        pwd = os.path.abspath(c.outputdir)
1895        with open(pwd + '/pathnames', 'w') as f:
1896            f.write(pwd + '/Options/\n')
1897            f.write(pwd + '/\n')
1898            f.write(pwd + '/\n')
1899            f.write(pwd + '/AVAILABLE\n')
1900            f.write(' = == = == = == = == = == ==  = \n')
1901
1902        # create Options dir if necessary
1903        if not os.path.exists(pwd + '/Options'):
1904            make_dir(pwd+'/Options')
1905
1906        # read template COMMAND file
1907        with open(os.path.expandvars(os.path.expanduser(
1908            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1909            lflist = f.read().split('\n')
1910
1911        # find index of list where to put in the
1912        # date and time information
1913        # usually after the LDIRECT parameter
1914        i = 0
1915        for l in lflist:
1916            if 'LDIRECT' in l.upper():
1917                break
1918            i += 1
1919
1920        # insert the date and time information of run start and end
1921        # into the list of lines of COMMAND file
1922        lflist = lflist[:i+1] + \
1923                 [clist[0][:16], clist[-1][:16]] + \
1924                 lflist[i+3:]
1925
1926        # write the new COMMAND file
1927        with open(pwd + '/Options/COMMAND', 'w') as g:
1928            g.write('\n'.join(lflist) + '\n')
1929
1930        # change to outputdir and start the grib2flexpart run
1931        # afterwards switch back to the working dir
1932        os.chdir(c.outputdir)
1933        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1934         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1935        execute_subprocess(cmd)
1936        os.chdir(pwd)
1937
1938        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG