source: flex_extract.git/source/python/classes/EcFlexpart.py @ 79729d5

dev
Last change on this file since 79729d5 was 79729d5, checked in by Anne Philipp <anne.philipp@…>, 3 months ago

switched from python2 to python3

  • Property mode set to 100644
File size: 73.5 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
71
72# software specific classes and modules from flex_extract
73sys.path.append('../')
74import _config
75from .GribUtil import GribUtil
76from mods.tools import (init128, to_param_id, silent_remove, product,
77                        my_error, make_dir, get_informations, get_dimensions,
78                        execute_subprocess, to_param_id_with_tablenumber)
79from .MarsRetrieval import MarsRetrieval
80from .UioFiles import UioFiles
81import mods.disaggregation as disaggregation
82
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = list(range(len(ftime)))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        self.types[str(self.acctype)] = {'times': str(self.acctime),
324                                         'steps': '{}/to/{}/by/{}'.format(
325                                             self.dtime,
326                                             self.accmaxstep,
327                                             self.dtime)}
328        return
329
330    def _create_params(self, gauss, eta, omega, cwc, wrf):
331        '''Define the specific parameter settings for retrievment.
332
333        The different parameters need specific grid types and level types
334        for retrievement. We might get following combination of types
335        (depending on selection and availability):
336        (These are short cuts for the grib file names (leading sequence)
337        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
338        where:
339            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
340            ML = Model Level, SL = Surface Level
341
342        For each of this combination there is a list of parameter names,
343        the level type, the level list and the grid resolution.
344
345        There are different scenarios for data extraction from MARS:
346        1) Retrieval of etadot
347           eta=1, gauss=0, omega=0
348        2) Calculation of etadot from divergence
349           eta=0, gauss=1, omega=0
350        3) Calculation of etadot from omega (for makes sense for debugging)
351           eta=0, gauss=0, omega=1
352        4) Retrieval and Calculation of etadot (only for debugging)
353           eta=1, gauss=1, omega=0
354        5) Download also specific model and surface level data for FLEXPART-WRF
355
356        Parameters:
357        -----------
358        gauss : int
359            Gaussian grid is retrieved.
360
361        eta : int
362            Etadot parameter will be directly retrieved.
363
364        omega : int
365            The omega paramterwill be retrieved.
366
367        cwc : int
368            The cloud liquid and ice water content will be retrieved.
369
370        wrf : int
371            Additional model level and surface level data will be retrieved for
372            WRF/FLEXPART-WRF simulations.
373
374        Return
375        ------
376
377        '''
378        # SURFACE FIELDS
379        #-----------------------------------------------------------------------
380        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
381        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
382                                 'SFC', '1', self.grid]
383        if self.addpar:
384            self.params['OG__SL'][0] += self.addpar
385
386        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
388                                            'SFC', '1', self.grid]
389        else:
390            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
391                                            'SFC', '1', self.grid]
392
393        # MODEL LEVEL FIELDS
394        #-----------------------------------------------------------------------
395        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
396
397        if not gauss and eta:
398            self.params['OG__ML'][0] += '/U/V/ETADOT'
399        elif gauss and not eta:
400            self.params['GG__SL'] = ['Q', 'ML', '1',
401                                     '{}'.format((int(self.resol) + 1) // 2)]
402            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
403        elif not gauss and not eta:
404            self.params['OG__ML'][0] += '/U/V'
405        else:  # GAUSS and ETA
406            print('Warning: Collecting etadot and parameters for gaussian grid '
407                           'is a very costly parameter combination, '
408                           'use this combination only for debugging!')
409            self.params['GG__SL'] = ['Q', 'ML', '1',
410                                     '{}'.format((int(self.resol) + 1) // 2)]
411            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
412                                     '{}'.format((int(self.resol) + 1) // 2)]
413
414        if omega:
415            self.params['OG__ML'][0] += '/W'
416
417        if cwc:
418            self.params['OG__ML'][0] += '/CLWC/CIWC'
419
420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
421        # -----------------------------------------------------------------------
422        if wrf:
423            self.params['OG__ML'][0] += '/Z/VO'
424            if '/D' not in self.params['OG__ML'][0]:
425                self.params['OG__ML'][0] += '/D'
426
427            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
428                       'SWVL1','SWVL2','SWVL3','SWVL4']
429            for par in wrf_sfc:
430                if par not in self.params['OG__SL'][0]:
431                    self.params['OG__SL'][0] += '/' + par
432
433        return
434
435
436    def _create_params_fluxes(self):
437        '''Define the parameter setting for flux data.
438
439        Flux data are accumulated fields in time and are stored on the
440        surface level. The leading short cut name for the grib files is:
441        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
442        acc for Accumulated Grid.
443        The params dictionary stores a list of parameter names, the level type,
444        the level list and the grid resolution.
445
446        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
447
448        Parameters:
449        -----------
450
451        Return
452        ------
453
454        '''
455        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
456                                    'SFC', '1', self.grid]
457        return
458
459
460    def _mk_targetname(self, ftype, param, date):
461        '''Creates the filename for the requested grib data to be stored in.
462        This name is passed as the "target" parameter in the request.
463
464        Parameters
465        ----------
466        ftype : str
467            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
468
469        param : str
470            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
471            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
472
473        date : str
474            The date period of the grib data to be stored in this file.
475
476        Return
477        ------
478        targetname : str
479            The target filename for the grib data.
480        '''
481        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
482                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
483
484        return targetname
485
486
487    def _start_retrievement(self, request, par_dict):
488        '''Creates the Mars Retrieval and prints or submits the request
489        depending on the status of the request variable.
490
491        Parameters
492        ----------
493        request : int
494            Selects the mode of retrieval.
495            0: Retrieves the data from ECMWF.
496            1: Prints the mars requests to an output file.
497            2: Retrieves the data and prints the mars request.
498
499        par_dict : dictionary
500            Contains all parameter which have to be set for creating the
501            Mars Retrievals. The parameter are:
502            marsclass, dataset, stream, type, levtype, levelist, resol,
503            gaussian, accuracy, grid, target, area, date, time, number,
504            step, expver, param
505
506        Return
507        ------
508
509        '''
510        # increase number of mars requests
511        self.mreq_count += 1
512
513        MR = MarsRetrieval(self.server,
514                           self.public,
515                           marsclass=par_dict['marsclass'],
516                           dataset=par_dict['dataset'],
517                           stream=par_dict['stream'],
518                           type=par_dict['type'],
519                           levtype=par_dict['levtype'],
520                           levelist=par_dict['levelist'],
521                           resol=par_dict['resol'],
522                           gaussian=par_dict['gaussian'],
523                           accuracy=par_dict['accuracy'],
524                           grid=par_dict['grid'],
525                           target=par_dict['target'],
526                           area=par_dict['area'],
527                           date=par_dict['date'],
528                           time=par_dict['time'],
529                           number=par_dict['number'],
530                           step=par_dict['step'],
531                           expver=par_dict['expver'],
532                           param=par_dict['param'])
533
534        if request == 0:
535            MR.display_info()
536            MR.data_retrieve()
537        elif request == 1:
538            MR.print_infodata_csv(self.inputdir, self.mreq_count)
539        elif request == 2:
540            MR.print_infodata_csv(self.inputdir, self.mreq_count)
541            MR.display_info()
542            MR.data_retrieve()
543        else:
544            print('Failure')
545
546        return
547
548
549    def _mk_index_values(self, inputdir, inputfiles, keys):
550        '''Creates an index file for a set of grib parameter keys.
551        The values from the index keys are returned in a list.
552
553        Parameters
554        ----------
555        keys : dictionary
556            List of parameter names which serves as index.
557
558        inputfiles : UioFiles
559            Contains a list of files.
560
561        Return
562        ------
563        iid : codes_index
564            This is a grib specific index structure to access
565            messages in a file.
566
567        index_vals : list of list  of str
568            Contains the values from the keys used for a distinct selection
569            of grib messages in processing  the grib files.
570            Content looks like e.g.:
571            index_vals[0]: ('20171106', '20171107', '20171108') ; date
572            index_vals[1]: ('0', '1200', '1800', '600') ; time
573            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
574        '''
575        iid = None
576        index_keys = keys
577
578        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
579        silent_remove(indexfile)
580        grib = GribUtil(inputfiles.files)
581        # creates new index file
582        iid = grib.index(index_keys=index_keys, index_file=indexfile)
583
584        # read the values of index keys
585        index_vals = []
586        for key in index_keys:
587            key_vals = codes_index_get(iid, key)
588            # have to sort the key values for correct disaggregation,
589            # therefore convert to int first
590            key_vals = [int(k) for k in key_vals]
591            key_vals.sort()
592            key_vals = [str(k) for k in key_vals]
593            index_vals.append(key_vals)
594            # index_vals looks for example like:
595            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
596            # index_vals[1]: ('0', '1200') ; time
597            # index_vals[2]: (3', '6', '9', '12') ; stepRange
598
599        return iid, index_vals
600
601
602    def retrieve(self, server, dates, public, request, inputdir='.'):
603        '''Finalizing the retrieval information by setting final details
604        depending on grid type.
605        Prepares MARS retrievals per grid type and submits them.
606
607        Parameters
608        ----------
609        server : ECMWFService or ECMWFDataServer
610            The connection to the ECMWF server. This is different
611            for member state users which have full access and non
612            member state users which have only access to the public
613            data sets. The decision is made from command line argument
614            "public"; for public access its True (ECMWFDataServer)
615            for member state users its False (ECMWFService)
616
617        dates : str
618            Contains start and end date of the retrieval in the format
619            "YYYYMMDD/to/YYYYMMDD"
620
621        request : int
622            Selects the mode of retrieval.
623            0: Retrieves the data from ECMWF.
624            1: Prints the mars requests to an output file.
625            2: Retrieves the data and prints the mars request.
626
627        inputdir : str, optional
628            Path to the directory where the retrieved data is about
629            to be stored. The default is the current directory ('.').
630
631        Return
632        ------
633
634        '''
635        self.dates = dates
636        self.server = server
637        self.public = public
638        self.inputdir = inputdir
639        oro = False
640
641        # define times with datetime module
642        t12h = timedelta(hours=12)
643        t24h = timedelta(hours=24)
644
645        # dictionary which contains all parameter for the mars request,
646        # entries with a "None" will change in different requests and will
647        # therefore be set in each request seperately
648        retr_param_dict = {'marsclass':self.marsclass,
649                           'dataset':self.dataset,
650                           'stream':None,
651                           'type':None,
652                           'levtype':None,
653                           'levelist':None,
654                           'resol':self.resol,
655                           'gaussian':None,
656                           'accuracy':self.accuracy,
657                           'grid':None,
658                           'target':None,
659                           'area':None,
660                           'date':None,
661                           'time':None,
662                           'number':self.number,
663                           'step':None,
664                           'expver':self.expver,
665                           'param':None}
666
667        for ftype in self.types:
668            # ftype contains field types such as
669            #     [AN, FC, PF, CV]
670            for pk, pv in self.params.items():
671                # pk contains one of these keys of params
672                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
673                #      OG_OROLSM_SL, OG_acc_SL]
674                # pv contains all of the items of the belonging key
675                #     [param, levtype, levelist, grid]
676                if isinstance(pv, str):
677                    continue
678                retr_param_dict['type'] = ftype
679                retr_param_dict['time'] = self.types[ftype]['times']
680                retr_param_dict['step'] = self.types[ftype]['steps']
681                retr_param_dict['date'] = self.dates
682                retr_param_dict['stream'] = self.stream
683                retr_param_dict['target'] = \
684                    self._mk_targetname(ftype,
685                                        pk,
686                                        retr_param_dict['date'].split('/')[0])
687                table128 = init128(_config.PATH_GRIBTABLE)
688                ids = to_param_id_with_tablenumber(pv[0], table128)
689                retr_param_dict['param'] = ids
690                retr_param_dict['levtype'] = pv[1]
691                retr_param_dict['levelist'] = pv[2]
692                retr_param_dict['grid'] = pv[3]
693                retr_param_dict['area'] = self.area
694                retr_param_dict['gaussian'] = self.gaussian
695
696                if pk == 'OG_OROLSM__SL' and not oro:
697                    oro = True
698                    # in CERA20C (class EP) there is no stream "OPER"!
699                    if self.marsclass.upper() != 'EP':
700                        retr_param_dict['stream'] = 'OPER'
701                    retr_param_dict['type'] = 'AN'
702                    retr_param_dict['time'] = '00'
703                    retr_param_dict['step'] = '000'
704                    retr_param_dict['date'] = self.dates.split('/')[0]
705                    retr_param_dict['target'] = self._mk_targetname('',
706                                            pk, retr_param_dict['date'])
707                elif pk == 'OG_OROLSM__SL' and oro:
708                    continue
709                if pk == 'GG__SL' and pv[0] == 'Q':
710                    retr_param_dict['area'] = ""
711                    retr_param_dict['gaussian'] = 'reduced'
712                if ftype.upper() == 'FC' and \
713                    'acc' not in retr_param_dict['target']:
714                    if (int(retr_param_dict['time'][0]) +
715                        int(retr_param_dict['step'][0])) > 23:
716                        dates = retr_param_dict['date'].split('/')
717                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
718                        sdate = sdate - timedelta(days=1)
719                        retr_param_dict['date'] = '/'.join(
720                            [sdate.strftime("%Y%m%d")] +
721                            retr_param_dict['date'][1:])
722
723                        print('CHANGED FC start date to ' +
724                              sdate.strftime("%Y%m%d") +
725                              ' to accomodate TIME=' +
726                              retr_param_dict['time'][0] +
727                              ', STEP=' +
728                              retr_param_dict['time'][0])
729
730    # ------  on demand path  --------------------------------------------------
731                if self.basetime is None:
732                    # ******* start retrievement
733                    self._start_retrievement(request, retr_param_dict)
734    # ------  operational path  ------------------------------------------------
735                else:
736                    # check if mars job requests fields beyond basetime.
737                    # if yes eliminate those fields since they may not
738                    # be accessible with user's credentials
739
740                    enddate = retr_param_dict['date'].split('/')[-1]
741                    elimit = datetime.strptime(enddate + str(self.basetime),
742                                               '%Y%m%d%H')
743
744                    if self.basetime == 12:
745                        # --------------  flux data ----------------------------
746                        if 'acc' in pk:
747                            startdate = retr_param_dict['date'].split('/')[0]
748                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
749                            retr_param_dict['date'] = '/'.join([startdate,
750                                                                'to',
751                                                                enddate])
752
753                            # ******* start retrievement
754                            self._start_retrievement(request, retr_param_dict)
755
756                            retr_param_dict['date'] = \
757                                datetime.strftime(elimit - t12h, '%Y%m%d')
758                            retr_param_dict['time'] = '00'
759                            retr_param_dict['target'] = \
760                                self._mk_targetname(ftype, pk,
761                                                    retr_param_dict['date'])
762
763                            # ******* start retrievement
764                            self._start_retrievement(request, retr_param_dict)
765
766                        # --------------  non flux data ------------------------
767                        else:
768                            # ******* start retrievement
769                            self._start_retrievement(request, retr_param_dict)
770
771                    elif self.basetime == 0:
772                        retr_param_dict['date'] = \
773                            datetime.strftime(elimit - t24h, '%Y%m%d')
774
775                        timesave = ''.join(retr_param_dict['time'])
776
777                        if ('/' in retr_param_dict['time'] and
778                            pk != 'OG_OROLSM__SL' and
779                            'acc' not in pk ) :
780                            times = retr_param_dict['time'].split('/')
781                            steps = retr_param_dict['step'].split('/')
782
783                            while int(times[0]) + int(steps[0]) <= 12:
784                                times = times[1:]
785                                if len(times) > 1:
786                                    retr_param_dict['time'] = '/'.join(times)
787                                else:
788                                    retr_param_dict['time'] = times[0]
789
790                        if (pk != 'OG_OROLSM__SL' and
791                            int(retr_param_dict['step'].split('/')[0]) == 0 and
792                            int(timesave.split('/')[0]) == 0):
793
794                            retr_param_dict['date'] = \
795                                datetime.strftime(elimit, '%Y%m%d')
796                            retr_param_dict['time'] = '00'
797                            retr_param_dict['step'] = '000'
798                            retr_param_dict['target'] = \
799                                self._mk_targetname(ftype, pk,
800                                                    retr_param_dict['date'])
801
802                        # ******* start retrievement
803                        self._start_retrievement(request, retr_param_dict)
804                    else:
805                        raise ValueError('ERROR: Basetime has an invalid value '
806                                                 '-> {}'.format(str(basetime)))
807
808        if request == 0 or request == 2:
809            print('MARS retrieve done ... ')
810        elif request == 1:
811            print('MARS request printed ...')
812
813        return
814
815
816    def write_namelist(self, c):
817        '''Creates a namelist file in the temporary directory and writes
818        the following values to it: maxl, maxb, mlevel,
819        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
820        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
821
822        Parameters
823        ----------
824        c : ControlFile
825            Contains all the parameters of CONTROL file and
826            command line.
827
828        filename : str
829                Name of the namelist file.
830
831        Return
832        ------
833
834        '''
835
836        from genshi.template.text import NewTextTemplate
837        from genshi.template import  TemplateLoader
838        from genshi.template.eval import  UndefinedError
839
840        try:
841            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
842            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
843                                            cls=NewTextTemplate)
844
845            self.inputdir = c.inputdir
846            area = np.asarray(self.area.split('/')).astype(float)
847            grid = np.asarray(self.grid.split('/')).astype(float)
848
849            if area[1] > area[3]:
850                area[1] -= 360
851            maxl = int((area[3] - area[1]) / grid[1]) + 1
852            maxb = int((area[0] - area[2]) / grid[0]) + 1
853
854            stream = namelist_template.generate(
855                maxl = str(maxl),
856                maxb = str(maxb),
857                mlevel = str(self.level),
858                mlevelist = str(self.levelist),
859                mnauf = str(self.resol),
860                metapar = '77',
861                rlo0 = str(area[1]),
862                rlo1 = str(area[3]),
863                rla0 = str(area[2]),
864                rla1 = str(area[0]),
865                momega = str(c.omega),
866                momegadiff = str(c.omegadiff),
867                mgauss = str(c.gauss),
868                msmooth = str(c.smooth),
869                meta = str(c.eta),
870                metadiff = str(c.etadiff),
871                mdpdeta = str(c.dpdeta)
872            )
873        except UndefinedError as e:
874            print('... ERROR ' + str(e))
875
876            sys.exit('\n... error occured while trying to generate namelist ' +
877                     _config.TEMPFILE_NAMELIST)
878        except OSError as e:
879            print('... ERROR CODE: ' + str(e.errno))
880            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
881
882            sys.exit('\n... error occured while trying to generate template ' +
883                     _config.TEMPFILE_NAMELIST)
884
885        try:
886            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
887
888            with open(namelistfile, 'w') as f:
889                f.write(stream.render('text'))
890        except OSError as e:
891            print('... ERROR CODE: ' + str(e.errno))
892            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
893
894            sys.exit('\n... error occured while trying to write ' +
895                     namelistfile)
896
897        return
898
899
900    def deacc_fluxes(self, inputfiles, c):
901        '''De-accumulate and disaggregate flux data.
902
903        Goes through all flux fields in ordered time and de-accumulate
904        the fields. Afterwards the fields are disaggregated in time.
905        Different versions of disaggregation is provided for rainfall
906        data (darain, modified linear) and the surface fluxes and
907        stress data (dapoly, cubic polynomial).
908
909        Parameters
910        ----------
911        inputfiles : UioFiles
912            Contains the list of files that contain flux data.
913
914        c : ControlFile
915            Contains all the parameters of CONTROL file and
916            command line.
917
918        Return
919        ------
920
921        '''
922
923        table128 = init128(_config.PATH_GRIBTABLE)
924        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
925
926        iid = None
927        index_vals = None
928
929        # get the values of the keys which are used for distinct access
930        # of grib messages via product
931        if '/' in self.number:
932            # more than one ensemble member is selected
933            index_keys = ["number", "date", "time", "step"]
934        else:
935            index_keys = ["date", "time", "step"]
936        iid, index_vals = self._mk_index_values(c.inputdir,
937                                                inputfiles,
938                                                index_keys)
939        # index_vals looks like e.g.:
940        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
941        # index_vals[1]: ('0', '600', '1200', '1800') ; time
942        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
943
944        if c.rrint:
945            if not c.purefc:
946                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
947                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
948            else:
949                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
950                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
951                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
952                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
953                end_date = end_date + timedelta(hours=c.maxstep)
954
955            info = get_informations(os.path.join(c.inputdir,
956                                                 inputfiles.files[0]))
957            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
958                                  start_date, end_date)
959            # create numpy array
960            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
961            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
962            it_lsp = 0
963            it_cp = 0
964            date_list = []
965            step_list = []
966
967        # initialize dictionaries to store values
968        orig_vals = {}
969        deac_vals = {}
970        for p in pars:
971            orig_vals[p] = []
972            deac_vals[p] = []
973
974        # "product" genereates each possible combination between the
975        # values of the index keys
976        for prod in product(*index_vals):
977            # e.g. prod = ('20170505', '0', '12')
978            #             (  date    ,time, step)
979
980            print('CURRENT PRODUCT: ', prod)
981
982            for i in range(len(index_keys)):
983                codes_index_select(iid, index_keys[i], prod[i])
984
985            # get first id from current product
986            gid = codes_new_from_index(iid)
987
988            # if there is no data for this specific time combination / product
989            # skip the rest of the for loop and start with next timestep/product
990            if not gid:
991                continue
992
993            # create correct timestamp from the three time informations
994            cdate = str(codes_get(gid, 'date'))
995            time = codes_get(gid, 'time') // 100  # integer
996            step = codes_get(gid, 'step') # integer
997            ctime = '{:0>2}'.format(time)
998            cstep = '{:0>3}'.format(step)
999
1000            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1001            t_dt = t_date + timedelta(hours=step)
1002            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1003            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1004            if c.basetime is not None:
1005                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1006                                              '%Y%m%d%H')
1007            else:
1008                t_enddate = t_date + timedelta(2*int(c.dtime))
1009
1010            # if necessary, add ensemble member number to filename suffix
1011            # otherwise, add empty string
1012            if 'number' in index_keys:
1013                index_number = index_keys.index('number')
1014                if len(index_vals[index_number]) > 1:
1015                    numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1016            else:
1017                numbersuffix = ''
1018
1019            if c.purefc:
1020                fnout = os.path.join(c.inputdir, 'flux' +
1021                                     t_date.strftime('%Y%m%d.%H') +
1022                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1023                                     numbersuffix)
1024                gnout = os.path.join(c.inputdir, 'flux' +
1025                                     t_date.strftime('%Y%m%d.%H') +
1026                                     '.{:0>3}'.format(step-int(c.dtime)) +
1027                                     numbersuffix)
1028                hnout = os.path.join(c.inputdir, 'flux' +
1029                                     t_date.strftime('%Y%m%d.%H') +
1030                                     '.{:0>3}'.format(step) +
1031                                     numbersuffix)
1032            else:
1033                fnout = os.path.join(c.inputdir, 'flux' +
1034                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1035                gnout = os.path.join(c.inputdir, 'flux' +
1036                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1037                hnout = os.path.join(c.inputdir, 'flux' +
1038                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1039
1040            print("outputfile = " + fnout)
1041            f_handle = open(fnout, 'w')
1042            h_handle = open(hnout, 'w')
1043            g_handle = open(gnout, 'w')
1044
1045            # read message for message and store relevant data fields, where
1046            # data keywords are stored in pars
1047            while True:
1048                if not gid:
1049                    break
1050                parId = codes_get(gid, 'paramId') # integer
1051                step = codes_get(gid, 'step') # integer
1052                time = codes_get(gid, 'time') # integer
1053                ni = codes_get(gid, 'Ni') # integer
1054                nj = codes_get(gid, 'Nj') # integer
1055                if parId not in orig_vals.keys():
1056                    # parameter is not a flux, find next one
1057                    continue
1058
1059                # define conversion factor
1060                if parId == 142 or parId == 143:
1061                    fak = 1. / 1000.
1062                else:
1063                    fak = 3600.
1064
1065                # get parameter values and reshape
1066                values = codes_get_values(gid)
1067                values = (np.reshape(values, (nj, ni))).flatten() / fak
1068
1069                # save the original and accumulated values
1070                orig_vals[parId].append(values[:])
1071
1072                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1073                    # no de-accumulation needed
1074                    deac_vals[parId].append(values[:] / int(c.dtime))
1075                else:
1076                    # do de-accumulation
1077                    deac_vals[parId].append(
1078                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1079                         int(c.dtime))
1080
1081                # store precipitation if new disaggregation method is selected
1082                # only the exact days are needed
1083                if c.rrint:
1084                    if start_date <= t_dt <= end_date:
1085                        if not c.purefc:
1086                            if t_dt not in date_list:
1087                                date_list.append(t_dt)
1088                                step_list = [0]
1089                        else:
1090                            if t_date not in date_list:
1091                                date_list.append(t_date)
1092                            if step not in step_list:
1093                                step_list.append(step)
1094                        if c.rrint and parId == 142:
1095                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1096                            it_lsp += 1
1097                        elif c.rrint and parId == 143:
1098                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1099                            it_cp += 1
1100
1101                # information printout
1102                print(parId, time, step, len(values), values[0], np.std(values))
1103
1104                # length of deac_vals[parId] corresponds to the
1105                # number of time steps, max. 4 are needed for disaggegration
1106                # with the old and original method
1107                # run over all grib messages and perform
1108                # shifting in time
1109                if len(deac_vals[parId]) >= 3:
1110                    if len(deac_vals[parId]) > 3:
1111                        if not c.rrint and (parId == 142 or parId == 143):
1112                            values = disaggregation.darain(deac_vals[parId])
1113                        else:
1114                            values = disaggregation.dapoly(deac_vals[parId])
1115
1116                        if not (step == c.maxstep and c.purefc \
1117                                or t_dt == t_enddate):
1118                            # remove first time step in list to shift
1119                            # time line
1120                            orig_vals[parId].pop(0)
1121                            deac_vals[parId].pop(0)
1122                    else:
1123                        # if the third time step is read (per parId),
1124                        # write out the first one as a boundary value
1125                        if c.purefc:
1126                            values = deac_vals[parId][1]
1127                        else:
1128                            values = deac_vals[parId][0]
1129
1130                    if not (c.rrint and (parId == 142 or parId == 143)):
1131                        codes_set_values(gid, values)
1132
1133                        if c.purefc:
1134                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1135                        else:
1136                            codes_set(gid, 'stepRange', 0)
1137                            codes_set(gid, 'time', t_m2dt.hour*100)
1138                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1139
1140                        codes_write(gid, f_handle)
1141
1142                        # squeeze out information of last two steps
1143                        # contained in deac_vals[parId]
1144                        # Note that deac_vals[parId][0] has not been popped
1145                        # in this case
1146
1147                        if step == c.maxstep and c.purefc or \
1148                           t_dt == t_enddate:
1149                            # last step
1150                            if c.purefc:
1151                                values = deac_vals[parId][3]
1152                                codes_set_values(gid, values)
1153                                codes_set(gid, 'stepRange', step)
1154                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1155                                codes_write(gid, h_handle)
1156                            else:
1157                                values = deac_vals[parId][3]
1158                                codes_set_values(gid, values)
1159                                codes_set(gid, 'stepRange', 0)
1160                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1161                                codes_set(gid, 'time', truedatetime.hour * 100)
1162                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1163                                codes_write(gid, h_handle)
1164
1165                            if parId == 142 or parId == 143:
1166                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1167                            else:
1168                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1169
1170                            # step before last step
1171                            if c.purefc:
1172                                codes_set(gid, 'stepRange', step-int(c.dtime))
1173                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1174                                codes_set_values(gid, values)
1175                                codes_write(gid, g_handle)
1176                            else:
1177                                codes_set(gid, 'stepRange', 0)
1178                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1179                                codes_set(gid, 'time', truedatetime.hour * 100)
1180                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1181                                codes_set_values(gid, values)
1182                                codes_write(gid, g_handle)
1183
1184                codes_release(gid)
1185
1186                gid = codes_new_from_index(iid)
1187
1188            f_handle.close()
1189            g_handle.close()
1190            h_handle.close()
1191
1192        codes_index_release(iid)
1193
1194        if c.rrint:
1195            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1196
1197            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1198                                 cp_np, date_list, step_list, c)
1199
1200        return
1201
1202    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1203        '''Calculates and writes out the disaggregated precipitation fields.
1204
1205        Disaggregation is done in time and original times are written to the
1206        flux files, while the additional subgrid times are written to
1207        extra files.
1208
1209        Parameters
1210        ----------
1211        ni : int
1212            Amount of zonal grid points.
1213
1214        nj : int
1215            Amount of meridional grid points.
1216
1217        nt : int
1218            Number of time steps.
1219
1220        lsp_np : numpy array of float
1221            The large scale precipitation fields for each time step.
1222            Shape (ni * nj, nt).
1223
1224        cp_np : numpy array of float
1225            The convective precipitation fields for each time step.
1226            Shape (ni * nj, nt).
1227
1228        date_list : list of datetime
1229            The list of dates for which the disaggregation is to be done.
1230
1231        step_list : list of int
1232            The list of steps for a single forecast time.
1233            Only necessary for pure forecasts.
1234
1235        c : ControlFile
1236            Contains all the parameters of CONTROL file and
1237            command line.
1238
1239        Return
1240        ------
1241
1242        '''
1243        print('... disaggregation of precipitation with new method.')
1244        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1245        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1246
1247        # do the disaggregation, but neglect the last value of the
1248        # time series. This one corresponds for example to 24 hour,
1249        # which we don't need.
1250        for ix in range(ni*nj):
1251            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1252            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1253
1254        # write to grib files (full/orig times to flux file and inbetween
1255        # times into seperate end files)
1256        print('... write disaggregated precipitation to files.')
1257        # index variable of disaggregated fields
1258        it = 0
1259        # loop over times and write original time step and the two newly
1260        # generated sub time steps for each loop
1261        for date in date_list:
1262            for step in step_list:
1263                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1264
1265                if c.purefc:
1266                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1267                                   '.{:0>3}'.format(step)
1268                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1269                                '.{:0>3}'.format(step) + '_1'
1270                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1271                                '.{:0>3}'.format(step) + '_2'
1272                else:
1273                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1274                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1275                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1276
1277                # write original time step to flux file as usual
1278                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1279                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1280                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1281                                  keynames=['date','time','stepRange','values'],
1282                                  keyvalues=[int(date.strftime('%Y%m%d')),
1283                                             date.hour*100, step, lsp_new_np[:,it]],
1284                                 )
1285                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1286                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1287                                  keynames=['date','time','stepRange','values'],
1288                                  keyvalues=[int(date.strftime('%Y%m%d')),
1289                                             date.hour*100, step, cp_new_np[:,it]]
1290                                 )
1291
1292                # write first subgrid time step
1293                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1294                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1295                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1296                                  keynames=['date','time','stepRange','values'],
1297                                  keyvalues=[int(date.strftime('%Y%m%d')),
1298                                             date.hour*100, step, lsp_new_np[:,it+1]]
1299                                  )
1300                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1301                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1302                                  keynames=['date','time','stepRange','values'],
1303                                  keyvalues=[int(date.strftime('%Y%m%d')),
1304                                             date.hour*100, step, cp_new_np[:,it+1]]
1305                                 )
1306
1307                # write second subgrid time step
1308                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1309                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1310                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1311                                  keynames=['date','time','stepRange','values'],
1312                                  keyvalues=[int(date.strftime('%Y%m%d')),
1313                                             date.hour*100, step, lsp_new_np[:,it+2]]
1314                                 )
1315                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1316                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1317                                  keynames=['date','time','stepRange','values'],
1318                                  keyvalues=[int(date.strftime('%Y%m%d')),
1319                                             date.hour*100, step, cp_new_np[:,it+2]]
1320                                 )
1321                it = it + 3 # jump to next original time step
1322        return
1323
1324    def _create_rr_grib_dummy(self, ifile, inputdir):
1325        '''Creates a grib file with a dummy message for the two precipitation
1326        types lsp and cp each.
1327
1328        Parameters
1329        ----------
1330        ifile : str
1331            Filename of the input file to read the grib messages from.
1332
1333        inputdir : str, optional
1334            Path to the directory where the retrieved data is stored.
1335
1336        Return
1337        ------
1338
1339        '''
1340
1341        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1342
1343        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1344                      keyvalues=[142], filemode='w')
1345
1346        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1347                      keyvalues=[143], filemode='a')
1348
1349        return
1350
1351    def create(self, inputfiles, c):
1352        '''An index file will be created which depends on the combination
1353        of "date", "time" and "stepRange" values. This is used to iterate
1354        over all messages in each grib file which were passed through the
1355        parameter "inputfiles" to seperate specific parameters into fort.*
1356        files. Afterwards the FORTRAN program is called to convert
1357        the data fields all to the same grid and put them in one file
1358        per unique time step (combination of "date", "time" and
1359        "stepRange").
1360
1361        Note
1362        ----
1363        This method is based on the ECMWF example index.py
1364        https://software.ecmwf.int/wiki/display/GRIB/index.py
1365
1366        Parameters
1367        ----------
1368        inputfiles : UioFiles
1369            Contains a list of files.
1370
1371        c : ControlFile
1372            Contains all the parameters of CONTROL file and
1373            command line.
1374
1375        Return
1376        ------
1377
1378        '''
1379
1380        # generate start and end timestamp of the retrieval period
1381        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1382        start_period = start_period + timedelta(hours=int(c.step[0]))
1383        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1384        end_period = end_period + timedelta(hours=int(c.step[-1]))
1385
1386        if c.wrf:
1387            table128 = init128(_config.PATH_GRIBTABLE)
1388            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1389                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1390                                  table128)
1391
1392        # these numbers are indices for the temporary files "fort.xx"
1393        # which are used to seperate the grib fields to,
1394        # for the Fortran program input
1395        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1396        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1397        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1398                 '17':None, '18':None, '19':None, '21':None, '22':None}
1399
1400        iid = None
1401        index_vals = None
1402
1403        # get the values of the keys which are used for distinct access
1404        # of grib messages via product
1405        if '/' in self.number:
1406            # more than one ensemble member is selected
1407            index_keys = ["number", "date", "time", "step"]
1408        else:
1409            index_keys = ["date", "time", "step"]
1410        iid, index_vals = self._mk_index_values(c.inputdir,
1411                                                inputfiles,
1412                                                index_keys)
1413        # index_vals looks like e.g.:
1414        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1415        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1416        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1417
1418        # "product" genereates each possible combination between the
1419        # values of the index keys
1420        for prod in product(*index_vals):
1421            # e.g. prod = ('20170505', '0', '12')
1422            #             (  date    ,time, step)
1423
1424            print('current product: ', prod)
1425
1426            for i in range(len(index_keys)):
1427                codes_index_select(iid, index_keys[i], prod[i])
1428
1429            # get first id from current product
1430            gid = codes_new_from_index(iid)
1431
1432            # if there is no data for this specific time combination / product
1433            # skip the rest of the for loop and start with next timestep/product
1434            if not gid:
1435                continue
1436#============================================================================================
1437            # remove old fort.* files and open new ones
1438            # they are just valid for a single product
1439            for k, f in fdict.items():
1440                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1441                silent_remove(fortfile)
1442                fdict[k] = open(fortfile, 'w')
1443#============================================================================================
1444            # create correct timestamp from the three time informations
1445            cdate = str(codes_get(gid, 'date'))
1446            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1447            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1448            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1449            timestamp += timedelta(hours=int(cstep))
1450            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1451
1452            # skip all temporary times
1453            # which are outside the retrieval period
1454            if timestamp < start_period or \
1455               timestamp > end_period:
1456                continue
1457
1458            # if the timestamp is out of basetime start/end date period,
1459            # skip this specific product
1460            if c.basetime is not None:
1461                time_delta = timedelta(hours=12-int(c.dtime))
1462                start_time = datetime.strptime(c.end_date + str(c.basetime),
1463                                                '%Y%m%d%H') - time_delta
1464                end_time = datetime.strptime(c.end_date + str(c.basetime),
1465                                             '%Y%m%d%H')
1466                if timestamp < start_time or timestamp > end_time:
1467                    continue
1468
1469            if c.wrf:
1470                if 'olddate' not in locals() or cdate != olddate:
1471                    fwrf = open(os.path.join(c.outputdir,
1472                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1473                    olddate = cdate[:]
1474#============================================================================================
1475            # savedfields remembers which fields were already used.
1476            savedfields = []
1477            # sum of cloud liquid and ice water content
1478            scwc = None
1479            while 1:
1480                if not gid:
1481                    break
1482                paramId = codes_get(gid, 'paramId')
1483                gridtype = codes_get(gid, 'gridType')
1484                levtype = codes_get(gid, 'typeOfLevel')
1485                if paramId == 77: # ETADOT
1486                    codes_write(gid, fdict['21'])
1487                elif paramId == 130: # T
1488                    codes_write(gid, fdict['11'])
1489                elif paramId == 131 or paramId == 132: # U, V wind component
1490                    codes_write(gid, fdict['10'])
1491                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1492                    codes_write(gid, fdict['17'])
1493                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1494                    codes_write(gid, fdict['18'])
1495                elif paramId == 135: # W
1496                    codes_write(gid, fdict['19'])
1497                elif paramId == 152: # LNSP
1498                    codes_write(gid, fdict['12'])
1499                elif paramId == 155 and gridtype == 'sh': # D
1500                    codes_write(gid, fdict['13'])
1501                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1502                    # sum cloud liquid water and ice
1503                    if scwc is None:
1504                        scwc = codes_get_values(gid)
1505                    else:
1506                        scwc += codes_get_values(gid)
1507                        codes_set_values(gid, scwc)
1508                        codes_set(gid, 'paramId', 201031)
1509                        codes_write(gid, fdict['22'])
1510                        scwc = None
1511                elif c.wrf and paramId in [129, 138, 155] and \
1512                      levtype == 'hybrid': # Z, VO, D
1513                    # do not do anything right now
1514                    # these are specific parameter for WRF
1515                    pass
1516                else:
1517                    if paramId not in savedfields:
1518                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1519                        # and all ADDPAR parameter
1520                        codes_write(gid, fdict['16'])
1521                        savedfields.append(paramId)
1522                    else:
1523                        print('duplicate ' + str(paramId) + ' not written')
1524
1525                try:
1526                    if c.wrf:
1527                        # model layer
1528                        if levtype == 'hybrid' and \
1529                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1530                            codes_write(gid, fwrf)
1531                        # sfc layer
1532                        elif paramId in wrfpars:
1533                            codes_write(gid, fwrf)
1534                except AttributeError:
1535                    pass
1536
1537                codes_release(gid)
1538                gid = codes_new_from_index(iid)
1539#============================================================================================
1540            for f in fdict.values():
1541                f.close()
1542#============================================================================================
1543            # call for Fortran program to convert e.g. reduced_gg grids to
1544            # regular_ll and calculate detadot/dp
1545            pwd = os.getcwd()
1546            os.chdir(c.inputdir)
1547            if os.stat('fort.21').st_size == 0 and c.eta:
1548                print('Parameter 77 (etadot) is missing, most likely it is '
1549                      'not available for this type or date / time\n')
1550                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1551                my_error('fort.21 is empty while parameter eta '
1552                         'is set to 1 in CONTROL file')
1553# ============================================================================================
1554            # write out all output to log file before starting fortran programm
1555            sys.stdout.flush()
1556
1557            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1558            execute_subprocess([os.path.join(c.exedir,
1559                                _config.FORTRAN_EXECUTABLE)],
1560                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1561
1562            os.chdir(pwd)
1563# ============================================================================================
1564            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1565            if c.purefc:
1566                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1567            else:
1568                suffix = cdate_hour[2:10]
1569
1570            # if necessary, add ensemble member number to filename suffix
1571            if 'number' in index_keys:
1572                index_number = index_keys.index('number')
1573                if len(index_vals[index_number]) > 1:
1574                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1575
1576            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1577            print("outputfile = " + fnout)
1578            # collect for final processing
1579            self.outputfilelist.append(os.path.basename(fnout))
1580            # get additional precipitation subgrid data if available
1581            if c.rrint:
1582                self.outputfilelist.append(os.path.basename(fnout + '_1'))
1583                self.outputfilelist.append(os.path.basename(fnout + '_2'))
1584# ============================================================================================
1585            # create outputfile and copy all data from intermediate files
1586            # to the outputfile (final GRIB input files for FLEXPART)
1587            orolsm = os.path.basename(glob.glob(c.inputdir +
1588                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1589            fluxfile = 'flux' + cdate[0:2] + suffix
1590            if not c.cwc:
1591                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1592            else:
1593                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1594
1595            with open(fnout, 'wb') as fout:
1596                for f in flist:
1597                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1598                                       fout)
1599
1600            if c.omega:
1601                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1602                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1603                                            'rb'), fout)
1604# ============================================================================================
1605        if c.wrf:
1606            fwrf.close()
1607
1608        codes_index_release(iid)
1609
1610        return
1611
1612
1613    def calc_extra_elda(self, path, prefix):
1614        ''' Calculates extra ensemble members for ELDA - Stream.
1615
1616        Parameters
1617        ----------
1618        path : str
1619            Path to the output files.
1620
1621        prefix : str
1622            The prefix of the output filenames as defined in Control file.
1623
1624        Return
1625        ------
1626
1627        '''
1628        # max number
1629        maxnum = int(self.number.split('/')[-1])
1630
1631        # get a list of all prepared output files with control forecast (CF)
1632        CF_filelist = UioFiles(path, prefix + '*.N000')
1633
1634        for cffile in CF_filelist.files:
1635            with open(cffile, 'rb') as f:
1636                cfvalues=[]
1637                while True:
1638                    fid = codes_grib_new_from_file(f)
1639                    if fid is None:
1640                        break
1641                    cfvalues.append(codes_get_array(fid, 'values'))
1642                    codes_release(fid)
1643
1644            filename = cffile.split('N000')[0]
1645            for i in range(1, maxnum+1): # max number nehmen
1646
1647                # read an ensemble member
1648                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1649                # create file for newly calculated ensemble member
1650                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1651                # number of message in grib file
1652                j = 0
1653                while True:
1654                    gid = codes_grib_new_from_file(g)
1655                    if gid is None:
1656                        break
1657                    values = codes_get_array(gid, 'values')
1658                    codes_set_array(gid, 'values',
1659                                    values-2*(values-cfvalues[j]))
1660                    codes_set(gid, 'number', i+maxnum)
1661                    codes_write(gid, h)
1662                    codes_release(gid)
1663                    j += 1
1664
1665                g.close()
1666                h.close()
1667                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1668                self.outputfilelist.append(
1669                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1670
1671        return
1672
1673
1674    def process_output(self, c):
1675        '''Postprocessing of FLEXPART input files.
1676
1677        The grib files are postprocessed depending on the selection in
1678        CONTROL file. The resulting files are moved to the output
1679        directory if its not equal to the input directory.
1680        The following modifications might be done if
1681        properly switched in CONTROL file:
1682        GRIB2 - Conversion to GRIB2
1683        ECTRANS - Transfer of files to gateway server
1684        ECSTORAGE - Storage at ECMWF server
1685
1686        Parameters
1687        ----------
1688        c : ControlFile
1689            Contains all the parameters of CONTROL file and
1690            command line.
1691
1692        Return
1693        ------
1694
1695        '''
1696
1697        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1698
1699        if _config.FLAG_ON_ECMWFSERVER:
1700            print('ecstorage: {}\n ecfsdir: {}\n'.
1701                  format(c.ecstorage, c.ecfsdir))
1702            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1703                  .format(c.ectrans, c.gateway, c.destination))
1704
1705        print('Output filelist: ')
1706        print(self.outputfilelist)
1707
1708        for ofile in self.outputfilelist:
1709            ofile = os.path.join(self.inputdir, ofile)
1710
1711            if c.format.lower() == 'grib2':
1712                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1713                                    'productDefinitionTemplateNumber=8',
1714                                    ofile, ofile + '_2'],
1715                                   error_msg='GRIB2 CONVERSION FAILED!')
1716
1717                execute_subprocess(['mv', ofile + '_2', ofile],
1718                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1719                                   'FILES FAILED!')
1720
1721            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1722                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1723                                    c.gateway, '-remote', c.destination,
1724                                    '-source', ofile],
1725                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1726
1727            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1728                execute_subprocess(['ecp', '-o', ofile,
1729                                    os.path.expandvars(c.ecfsdir)],
1730                                   error_msg='COPY OF FILES TO ECSTORAGE '
1731                                   'AREA FAILED!')
1732
1733            if c.outputdir != c.inputdir:
1734                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1735                                    c.outputdir],
1736                                   error_msg='RELOCATION OF OUTPUT FILES '
1737                                   'TO OUTPUTDIR FAILED!')
1738
1739        return
1740
1741
1742    def prepare_fp_files(self, c):
1743        '''Conversion of GRIB files to FLEXPART binary format.
1744
1745        Parameters
1746        ----------
1747        c : ControlFile
1748            Contains all the parameters of CONTROL file and
1749            command line.
1750
1751        Return
1752        ------
1753
1754        '''
1755        # generate AVAILABLE file
1756        # Example of AVAILABLE file data:
1757        # 20131107 000000      EN13110700              ON DISC
1758        clist = []
1759        for ofile in self.outputfilelist:
1760            fname = ofile.split('/')
1761            if '.' in fname[-1]:
1762                l = fname[-1].split('.')
1763                timestamp = datetime.strptime(l[0][-6:] + l[1],
1764                                              '%y%m%d%H')
1765                timestamp += timedelta(hours=int(l[2]))
1766                cdate = datetime.strftime(timestamp, '%Y%m%d')
1767                chms = datetime.strftime(timestamp, '%H%M%S')
1768            else:
1769                cdate = '20' + fname[-1][-8:-2]
1770                chms = fname[-1][-2:] + '0000'
1771            clist.append(cdate + ' ' + chms + ' '*6 +
1772                         fname[-1] + ' '*14 + 'ON DISC')
1773        clist.sort()
1774        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1775            f.write('\n'.join(clist) + '\n')
1776
1777        # generate pathnames file
1778        pwd = os.path.abspath(c.outputdir)
1779        with open(pwd + '/pathnames', 'w') as f:
1780            f.write(pwd + '/Options/\n')
1781            f.write(pwd + '/\n')
1782            f.write(pwd + '/\n')
1783            f.write(pwd + '/AVAILABLE\n')
1784            f.write(' = == = == = == = == = == ==  = \n')
1785
1786        # create Options dir if necessary
1787        if not os.path.exists(pwd + '/Options'):
1788            make_dir(pwd+'/Options')
1789
1790        # read template COMMAND file
1791        with open(os.path.expandvars(os.path.expanduser(
1792            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1793            lflist = f.read().split('\n')
1794
1795        # find index of list where to put in the
1796        # date and time information
1797        # usually after the LDIRECT parameter
1798        i = 0
1799        for l in lflist:
1800            if 'LDIRECT' in l.upper():
1801                break
1802            i += 1
1803
1804        # insert the date and time information of run start and end
1805        # into the list of lines of COMMAND file
1806        lflist = lflist[:i+1] + \
1807                 [clist[0][:16], clist[-1][:16]] + \
1808                 lflist[i+3:]
1809
1810        # write the new COMMAND file
1811        with open(pwd + '/Options/COMMAND', 'w') as g:
1812            g.write('\n'.join(lflist) + '\n')
1813
1814        # change to outputdir and start the grib2flexpart run
1815        # afterwards switch back to the working dir
1816        os.chdir(c.outputdir)
1817        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1818         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1819        execute_subprocess(cmd)
1820        os.chdir(pwd)
1821
1822        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG