source: flex_extract.git/source/python/classes/EcFlexpart.py @ e446e85

dev
Last change on this file since e446e85 was e446e85, checked in by Anne Philipp <anne.philipp@…>, 3 months ago

added functionality to eliminate unnecessary data/times

  • Property mode set to 100644
File size: 73.1 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
71
72# software specific classes and modules from flex_extract
73sys.path.append('../')
74import _config
75from GribUtil import GribUtil
76from mods.tools import (init128, to_param_id, silent_remove, product,
77                        my_error, make_dir, get_informations, get_dimensions,
78                        execute_subprocess)
79from MarsRetrieval import MarsRetrieval
80from UioFiles import UioFiles
81import mods.disaggregation as disaggregation
82
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = range(len(ftime))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        self.types[str(self.acctype)] = {'times': str(self.acctime),
324                                         'steps': '{}/to/{}/by/{}'.format(
325                                             self.dtime,
326                                             self.accmaxstep,
327                                             self.dtime)}
328        return
329
330    def _create_params(self, gauss, eta, omega, cwc, wrf):
331        '''Define the specific parameter settings for retrievment.
332
333        The different parameters need specific grid types and level types
334        for retrievement. We might get following combination of types
335        (depending on selection and availability):
336        (These are short cuts for the grib file names (leading sequence)
337        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
338        where:
339            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
340            ML = Model Level, SL = Surface Level
341
342        For each of this combination there is a list of parameter names,
343        the level type, the level list and the grid resolution.
344
345        There are different scenarios for data extraction from MARS:
346        1) Retrieval of etadot
347           eta=1, gauss=0, omega=0
348        2) Calculation of etadot from divergence
349           eta=0, gauss=1, omega=0
350        3) Calculation of etadot from omega (for makes sense for debugging)
351           eta=0, gauss=0, omega=1
352        4) Retrieval and Calculation of etadot (only for debugging)
353           eta=1, gauss=1, omega=0
354        5) Download also specific model and surface level data for FLEXPART-WRF
355
356        Parameters:
357        -----------
358        gauss : int
359            Gaussian grid is retrieved.
360
361        eta : int
362            Etadot parameter will be directly retrieved.
363
364        omega : int
365            The omega paramterwill be retrieved.
366
367        cwc : int
368            The cloud liquid and ice water content will be retrieved.
369
370        wrf : int
371            Additional model level and surface level data will be retrieved for
372            WRF/FLEXPART-WRF simulations.
373
374        Return
375        ------
376
377        '''
378        # SURFACE FIELDS
379        #-----------------------------------------------------------------------
380        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
381        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
382                                 'SFC', '1', self.grid]
383        if self.addpar:
384            self.params['OG__SL'][0] += self.addpar
385
386        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
388                                            'SFC', '1', self.grid]
389        else:
390            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
391                                            'SFC', '1', self.grid]
392
393        # MODEL LEVEL FIELDS
394        #-----------------------------------------------------------------------
395        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
396
397        if not gauss and eta:
398            self.params['OG__ML'][0] += '/U/V/ETADOT'
399        elif gauss and not eta:
400            self.params['GG__SL'] = ['Q', 'ML', '1', \
401                                     '{}'.format((int(self.resol) + 1) / 2)]
402            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
403        elif not gauss and not eta:
404            self.params['OG__ML'][0] += '/U/V'
405        else:
406            print('Warning: Collecting etadot and parameters for gaussian grid \
407                            is a very costly parameter combination, \
408                            use this combination only for debugging!')
409            self.params['GG__SL'] = ['Q', 'ML', '1', \
410                                     '{}'.format((int(self.resol) + 1) / 2)]
411            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
412                                     '{}'.format((int(self.resol) + 1) / 2)]
413
414        if omega:
415            self.params['OG__ML'][0] += '/W'
416
417        if cwc:
418            self.params['OG__ML'][0] += '/CLWC/CIWC'
419
420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
421        #-----------------------------------------------------------------------
422        if wrf:
423            self.params['OG__ML'][0] += '/Z/VO'
424            if '/D' not in self.params['OG__ML'][0]:
425                self.params['OG__ML'][0] += '/D'
426            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
427                       'SWVL1','SWVL2','SWVL3','SWVL4']
428            for par in wrf_sfc:
429                if par not in self.params['OG__SL'][0]:
430                    self.params['OG__SL'][0] += '/' + par
431
432        return
433
434
435    def _create_params_fluxes(self):
436        '''Define the parameter setting for flux data.
437
438        Flux data are accumulated fields in time and are stored on the
439        surface level. The leading short cut name for the grib files is:
440        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
441        acc for Accumulated Grid.
442        The params dictionary stores a list of parameter names, the level type,
443        the level list and the grid resolution.
444
445        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
446
447        Parameters:
448        -----------
449
450        Return
451        ------
452
453        '''
454        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
455                                    'SFC', '1', self.grid]
456        return
457
458
459    def _mk_targetname(self, ftype, param, date):
460        '''Creates the filename for the requested grib data to be stored in.
461        This name is passed as the "target" parameter in the request.
462
463        Parameters
464        ----------
465        ftype : str
466            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
467
468        param : str
469            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
470            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
471
472        date : str
473            The date period of the grib data to be stored in this file.
474
475        Return
476        ------
477        targetname : str
478            The target filename for the grib data.
479        '''
480        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
481                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
482
483        return targetname
484
485
486    def _start_retrievement(self, request, par_dict):
487        '''Creates the Mars Retrieval and prints or submits the request
488        depending on the status of the request variable.
489
490        Parameters
491        ----------
492        request : int
493            Selects the mode of retrieval.
494            0: Retrieves the data from ECMWF.
495            1: Prints the mars requests to an output file.
496            2: Retrieves the data and prints the mars request.
497
498        par_dict : dictionary
499            Contains all parameter which have to be set for creating the
500            Mars Retrievals. The parameter are:
501            marsclass, dataset, stream, type, levtype, levelist, resol,
502            gaussian, accuracy, grid, target, area, date, time, number,
503            step, expver, param
504
505        Return
506        ------
507
508        '''
509        # increase number of mars requests
510        self.mreq_count += 1
511
512        MR = MarsRetrieval(self.server,
513                           self.public,
514                           marsclass=par_dict['marsclass'],
515                           dataset=par_dict['dataset'],
516                           stream=par_dict['stream'],
517                           type=par_dict['type'],
518                           levtype=par_dict['levtype'],
519                           levelist=par_dict['levelist'],
520                           resol=par_dict['resol'],
521                           gaussian=par_dict['gaussian'],
522                           accuracy=par_dict['accuracy'],
523                           grid=par_dict['grid'],
524                           target=par_dict['target'],
525                           area=par_dict['area'],
526                           date=par_dict['date'],
527                           time=par_dict['time'],
528                           number=par_dict['number'],
529                           step=par_dict['step'],
530                           expver=par_dict['expver'],
531                           param=par_dict['param'])
532
533        if request == 0:
534            MR.display_info()
535            MR.data_retrieve()
536        elif request == 1:
537            MR.print_infodata_csv(self.inputdir, self.mreq_count)
538        elif request == 2:
539            MR.print_infodata_csv(self.inputdir, self.mreq_count)
540            MR.display_info()
541            MR.data_retrieve()
542        else:
543            print('Failure')
544
545        return
546
547
548    def _mk_index_values(self, inputdir, inputfiles, keys):
549        '''Creates an index file for a set of grib parameter keys.
550        The values from the index keys are returned in a list.
551
552        Parameters
553        ----------
554        keys : dictionary
555            List of parameter names which serves as index.
556
557        inputfiles : UioFiles
558            Contains a list of files.
559
560        Return
561        ------
562        iid : codes_index
563            This is a grib specific index structure to access
564            messages in a file.
565
566        index_vals : list of list  of str
567            Contains the values from the keys used for a distinct selection
568            of grib messages in processing  the grib files.
569            Content looks like e.g.:
570            index_vals[0]: ('20171106', '20171107', '20171108') ; date
571            index_vals[1]: ('0', '1200', '1800', '600') ; time
572            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
573        '''
574        iid = None
575        index_keys = keys
576
577        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
578        silent_remove(indexfile)
579        grib = GribUtil(inputfiles.files)
580        # creates new index file
581        iid = grib.index(index_keys=index_keys, index_file=indexfile)
582
583        # read the values of index keys
584        index_vals = []
585        for key in index_keys:
586            key_vals = codes_index_get(iid, key)
587            # have to sort the key values for correct disaggregation,
588            # therefore convert to int first
589            key_vals = [int(k) for k in key_vals]
590            key_vals.sort()
591            key_vals = [str(k) for k in key_vals]
592            index_vals.append(key_vals)
593            # index_vals looks for example like:
594            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
595            # index_vals[1]: ('0', '1200') ; time
596            # index_vals[2]: (3', '6', '9', '12') ; stepRange
597
598        return iid, index_vals
599
600
601    def retrieve(self, server, dates, public, request, inputdir='.'):
602        '''Finalizing the retrieval information by setting final details
603        depending on grid type.
604        Prepares MARS retrievals per grid type and submits them.
605
606        Parameters
607        ----------
608        server : ECMWFService or ECMWFDataServer
609            The connection to the ECMWF server. This is different
610            for member state users which have full access and non
611            member state users which have only access to the public
612            data sets. The decision is made from command line argument
613            "public"; for public access its True (ECMWFDataServer)
614            for member state users its False (ECMWFService)
615
616        dates : str
617            Contains start and end date of the retrieval in the format
618            "YYYYMMDD/to/YYYYMMDD"
619
620        request : int
621            Selects the mode of retrieval.
622            0: Retrieves the data from ECMWF.
623            1: Prints the mars requests to an output file.
624            2: Retrieves the data and prints the mars request.
625
626        inputdir : str, optional
627            Path to the directory where the retrieved data is about
628            to be stored. The default is the current directory ('.').
629
630        Return
631        ------
632
633        '''
634        self.dates = dates
635        self.server = server
636        self.public = public
637        self.inputdir = inputdir
638        oro = False
639
640        # define times with datetime module
641        t12h = timedelta(hours=12)
642        t24h = timedelta(hours=24)
643
644        # dictionary which contains all parameter for the mars request,
645        # entries with a "None" will change in different requests and will
646        # therefore be set in each request seperately
647        retr_param_dict = {'marsclass':self.marsclass,
648                           'dataset':self.dataset,
649                           'stream':None,
650                           'type':None,
651                           'levtype':None,
652                           'levelist':None,
653                           'resol':self.resol,
654                           'gaussian':None,
655                           'accuracy':self.accuracy,
656                           'grid':None,
657                           'target':None,
658                           'area':None,
659                           'date':None,
660                           'time':None,
661                           'number':self.number,
662                           'step':None,
663                           'expver':self.expver,
664                           'param':None}
665
666        for ftype in self.types:
667            # ftype contains field types such as
668            #     [AN, FC, PF, CV]
669            for pk, pv in self.params.iteritems():
670                # pk contains one of these keys of params
671                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
672                #      OG_OROLSM_SL, OG_acc_SL]
673                # pv contains all of the items of the belonging key
674                #     [param, levtype, levelist, grid]
675                if isinstance(pv, str):
676                    continue
677                retr_param_dict['type'] = ftype
678                retr_param_dict['time'] = self.types[ftype]['times']
679                retr_param_dict['step'] = self.types[ftype]['steps']
680                retr_param_dict['date'] = self.dates
681                retr_param_dict['stream'] = self.stream
682                retr_param_dict['target'] = \
683                    self._mk_targetname(ftype,
684                                        pk,
685                                        retr_param_dict['date'].split('/')[0])
686                retr_param_dict['param'] = pv[0]
687                retr_param_dict['levtype'] = pv[1]
688                retr_param_dict['levelist'] = pv[2]
689                retr_param_dict['grid'] = pv[3]
690                retr_param_dict['area'] = self.area
691                retr_param_dict['gaussian'] = self.gaussian
692
693                if pk == 'OG_OROLSM__SL' and not oro:
694                    oro = True
695                    # in CERA20C (class EP) there is no stream "OPER"!
696                    if self.marsclass.upper() != 'EP':
697                        retr_param_dict['stream'] = 'OPER'
698                    retr_param_dict['type'] = 'AN'
699                    retr_param_dict['time'] = '00'
700                    retr_param_dict['step'] = '000'
701                    retr_param_dict['date'] = self.dates.split('/')[0]
702                    retr_param_dict['target'] = self._mk_targetname('',
703                                            pk, retr_param_dict['date'])
704                elif pk == 'OG_OROLSM__SL' and oro:
705                    continue
706                if pk == 'GG__SL' and pv[0] == 'Q':
707                    retr_param_dict['area'] = ""
708                    retr_param_dict['gaussian'] = 'reduced'
709                if ftype.upper() == 'FC' and \
710                    'acc' not in retr_param_dict['target']:
711                    if (int(retr_param_dict['time'][0]) +
712                        int(retr_param_dict['step'][0])) > 23:
713                        dates = retr_param_dict['date'].split('/')
714                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
715                        sdate = sdate - timedelta(days=1)
716                        retr_param_dict['date'] = '/'.join(
717                            [sdate.strftime("%Y%m%d")] +
718                            retr_param_dict['date'][1:])
719
720                        print('CHANGED FC start date to ' +
721                              sdate.strftime("%Y%m%d") +
722                              ' to accomodate TIME=' +
723                              retr_param_dict['time'][0] +
724                              ', STEP=' +
725                              retr_param_dict['time'][0])
726
727    # ------  on demand path  --------------------------------------------------
728                if self.basetime is None:
729                    # ******* start retrievement
730                    self._start_retrievement(request, retr_param_dict)
731    # ------  operational path  ------------------------------------------------
732                else:
733                    # check if mars job requests fields beyond basetime.
734                    # if yes eliminate those fields since they may not
735                    # be accessible with user's credentials
736
737                    enddate = retr_param_dict['date'].split('/')[-1]
738                    elimit = datetime.strptime(enddate + str(self.basetime),
739                                               '%Y%m%d%H')
740
741                    if self.basetime == 12:
742                        # --------------  flux data ----------------------------
743                        if 'acc' in pk:
744                            startdate = retr_param_dict['date'].split('/')[0]
745                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
746                            retr_param_dict['date'] = '/'.join([startdate,
747                                                                'to',
748                                                                enddate])
749
750                            # ******* start retrievement
751                            self._start_retrievement(request, retr_param_dict)
752
753                            retr_param_dict['date'] = \
754                                datetime.strftime(elimit - t12h, '%Y%m%d')
755                            retr_param_dict['time'] = '00'
756                            retr_param_dict['target'] = \
757                                self._mk_targetname(ftype, pk,
758                                                    retr_param_dict['date'])
759
760                            # ******* start retrievement
761                            self._start_retrievement(request, retr_param_dict)
762
763                        # --------------  non flux data ------------------------
764                        else:
765                            # ******* start retrievement
766                            self._start_retrievement(request, retr_param_dict)
767
768                    elif self.basetime == 0:
769                        retr_param_dict['date'] = \
770                            datetime.strftime(elimit - t24h, '%Y%m%d')
771
772                        timesave = ''.join(retr_param_dict['time'])
773
774                        if ('/' in retr_param_dict['time'] and
775                            pk != 'OG_OROLSM__SL' and
776                            'acc' not in pk ) :
777                            times = retr_param_dict['time'].split('/')
778                            steps = retr_param_dict['step'].split('/')
779
780                            while int(times[0]) + int(steps[0]) <= 12:
781                                times = times[1:]
782                                if len(times) > 1:
783                                    retr_param_dict['time'] = '/'.join(times)
784                                else:
785                                    retr_param_dict['time'] = times[0]
786
787                        if (pk != 'OG_OROLSM__SL' and
788                            int(retr_param_dict['step'].split('/')[0]) == 0 and
789                            int(timesave.split('/')[0]) == 0):
790
791                            retr_param_dict['date'] = \
792                                datetime.strftime(elimit, '%Y%m%d')
793                            retr_param_dict['time'] = '00'
794                            retr_param_dict['step'] = '000'
795                            retr_param_dict['target'] = \
796                                self._mk_targetname(ftype, pk,
797                                                    retr_param_dict['date'])
798
799                        # ******* start retrievement
800                        self._start_retrievement(request, retr_param_dict)
801                    else:
802                        raise ValueError('ERROR: Basetime has an invalid value '
803                                                 '-> {}'.format(str(basetime)))
804
805        if request == 0 or request == 2:
806            print('MARS retrieve done ... ')
807        elif request == 1:
808            print('MARS request printed ...')
809
810        return
811
812
813    def write_namelist(self, c):
814        '''Creates a namelist file in the temporary directory and writes
815        the following values to it: maxl, maxb, mlevel,
816        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
817        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
818
819        Parameters
820        ----------
821        c : ControlFile
822            Contains all the parameters of CONTROL file and
823            command line.
824
825        filename : str
826                Name of the namelist file.
827
828        Return
829        ------
830
831        '''
832
833        from genshi.template.text import NewTextTemplate
834        from genshi.template import  TemplateLoader
835        from genshi.template.eval import  UndefinedError
836
837        try:
838            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
839            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
840                                            cls=NewTextTemplate)
841
842            self.inputdir = c.inputdir
843            area = np.asarray(self.area.split('/')).astype(float)
844            grid = np.asarray(self.grid.split('/')).astype(float)
845
846            if area[1] > area[3]:
847                area[1] -= 360
848            maxl = int((area[3] - area[1]) / grid[1]) + 1
849            maxb = int((area[0] - area[2]) / grid[0]) + 1
850
851            stream = namelist_template.generate(
852                maxl = str(maxl),
853                maxb = str(maxb),
854                mlevel = str(self.level),
855                mlevelist = str(self.levelist),
856                mnauf = str(self.resol),
857                metapar = '77',
858                rlo0 = str(area[1]),
859                rlo1 = str(area[3]),
860                rla0 = str(area[2]),
861                rla1 = str(area[0]),
862                momega = str(c.omega),
863                momegadiff = str(c.omegadiff),
864                mgauss = str(c.gauss),
865                msmooth = str(c.smooth),
866                meta = str(c.eta),
867                metadiff = str(c.etadiff),
868                mdpdeta = str(c.dpdeta)
869            )
870        except UndefinedError as e:
871            print('... ERROR ' + str(e))
872
873            sys.exit('\n... error occured while trying to generate namelist ' +
874                     _config.TEMPFILE_NAMELIST)
875        except OSError as e:
876            print('... ERROR CODE: ' + str(e.errno))
877            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
878
879            sys.exit('\n... error occured while trying to generate template ' +
880                     _config.TEMPFILE_NAMELIST)
881
882        try:
883            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
884
885            with open(namelistfile, 'w') as f:
886                f.write(stream.render('text'))
887        except OSError as e:
888            print('... ERROR CODE: ' + str(e.errno))
889            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
890
891            sys.exit('\n... error occured while trying to write ' +
892                     namelistfile)
893
894        return
895
896
897    def deacc_fluxes(self, inputfiles, c):
898        '''De-accumulate and disaggregate flux data.
899
900        Goes through all flux fields in ordered time and de-accumulate
901        the fields. Afterwards the fields are disaggregated in time.
902        Different versions of disaggregation is provided for rainfall
903        data (darain, modified linear) and the surface fluxes and
904        stress data (dapoly, cubic polynomial).
905
906        Parameters
907        ----------
908        inputfiles : UioFiles
909            Contains the list of files that contain flux data.
910
911        c : ControlFile
912            Contains all the parameters of CONTROL file and
913            command line.
914
915        Return
916        ------
917
918        '''
919
920        table128 = init128(_config.PATH_GRIBTABLE)
921        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
922
923        iid = None
924        index_vals = None
925
926        # get the values of the keys which are used for distinct access
927        # of grib messages via product
928        if '/' in self.number:
929            # more than one ensemble member is selected
930            index_keys = ["number", "date", "time", "step"]
931        else:
932            index_keys = ["date", "time", "step"]
933        iid, index_vals = self._mk_index_values(c.inputdir,
934                                                inputfiles,
935                                                index_keys)
936        # index_vals looks like e.g.:
937        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
938        # index_vals[1]: ('0', '600', '1200', '1800') ; time
939        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
940
941        if c.rrint:
942            if not c.purefc:
943                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
944                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
945            else:
946                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
947                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
948                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
949                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
950                end_date = end_date + timedelta(hours=c.maxstep)
951
952            info = get_informations(os.path.join(c.inputdir,
953                                                 inputfiles.files[0]))
954            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
955                                  start_date, end_date)
956            # create numpy array
957            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
958            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
959            it_lsp = 0
960            it_cp = 0
961            date_list = []
962            step_list = []
963
964        # initialize dictionaries to store values
965        orig_vals = {}
966        deac_vals = {}
967        for p in pars:
968            orig_vals[p] = []
969            deac_vals[p] = []
970
971        # "product" genereates each possible combination between the
972        # values of the index keys
973        for prod in product(*index_vals):
974            # e.g. prod = ('20170505', '0', '12')
975            #             (  date    ,time, step)
976
977            print('CURRENT PRODUCT: ', prod)
978
979            for i in range(len(index_keys)):
980                codes_index_select(iid, index_keys[i], prod[i])
981
982            # get first id from current product
983            gid = codes_new_from_index(iid)
984
985            # if there is no data for this specific time combination / product
986            # skip the rest of the for loop and start with next timestep/product
987            if not gid:
988                continue
989
990            # create correct timestamp from the three time informations
991            cdate = str(codes_get(gid, 'date'))
992            time = codes_get(gid, 'time')/100 # integer
993            step = codes_get(gid, 'step') # integer
994            ctime = '{:0>2}'.format(time)
995            cstep = '{:0>3}'.format(step)
996
997            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
998            t_dt = t_date + timedelta(hours=step)
999            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1000            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1001            if c.basetime is not None:
1002                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1003                                              '%Y%m%d%H')
1004            else:
1005                t_enddate = t_date + timedelta(2*int(c.dtime))
1006
1007            # if necessary, add ensemble member number to filename suffix
1008            # otherwise, add empty string
1009            if 'number' in index_keys:
1010                index_number = index_keys.index('number')
1011                if len(index_vals[index_number]) > 1:
1012                    numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1013            else:
1014                numbersuffix = ''
1015
1016            if c.purefc:
1017                fnout = os.path.join(c.inputdir, 'flux' +
1018                                     t_date.strftime('%Y%m%d.%H') +
1019                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1020                                     numbersuffix)
1021                gnout = os.path.join(c.inputdir, 'flux' +
1022                                     t_date.strftime('%Y%m%d.%H') +
1023                                     '.{:0>3}'.format(step-int(c.dtime)) +
1024                                     numbersuffix)
1025                hnout = os.path.join(c.inputdir, 'flux' +
1026                                     t_date.strftime('%Y%m%d.%H') +
1027                                     '.{:0>3}'.format(step) +
1028                                     numbersuffix)
1029            else:
1030                fnout = os.path.join(c.inputdir, 'flux' +
1031                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1032                gnout = os.path.join(c.inputdir, 'flux' +
1033                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1034                hnout = os.path.join(c.inputdir, 'flux' +
1035                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1036
1037            print("outputfile = " + fnout)
1038            f_handle = open(fnout, 'w')
1039            h_handle = open(hnout, 'w')
1040            g_handle = open(gnout, 'w')
1041
1042            # read message for message and store relevant data fields, where
1043            # data keywords are stored in pars
1044            while True:
1045                if not gid:
1046                    break
1047                parId = codes_get(gid, 'paramId') # integer
1048                step = codes_get(gid, 'step') # integer
1049                time = codes_get(gid, 'time') # integer
1050                ni = codes_get(gid, 'Ni') # integer
1051                nj = codes_get(gid, 'Nj') # integer
1052                if parId not in orig_vals.keys():
1053                    # parameter is not a flux, find next one
1054                    continue
1055
1056                # define conversion factor
1057                if parId == 142 or parId == 143:
1058                    fak = 1. / 1000.
1059                else:
1060                    fak = 3600.
1061
1062                # get parameter values and reshape
1063                values = codes_get_values(gid)
1064                values = (np.reshape(values, (nj, ni))).flatten() / fak
1065
1066                # save the original and accumulated values
1067                orig_vals[parId].append(values[:])
1068
1069                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1070                    # no de-accumulation needed
1071                    deac_vals[parId].append(values[:] / int(c.dtime))
1072                else:
1073                    # do de-accumulation
1074                    deac_vals[parId].append(
1075                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1076                         int(c.dtime))
1077
1078                # store precipitation if new disaggregation method is selected
1079                # only the exact days are needed
1080                if c.rrint:
1081                    if start_date <= t_dt <= end_date:
1082                        if not c.purefc:
1083                            if t_dt not in date_list:
1084                                date_list.append(t_dt)
1085                                step_list = [0]
1086                        else:
1087                            if t_date not in date_list:
1088                                date_list.append(t_date)
1089                            if step not in step_list:
1090                                step_list.append(step)
1091                        if c.rrint and parId == 142:
1092                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1093                            it_lsp += 1
1094                        elif c.rrint and parId == 143:
1095                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1096                            it_cp += 1
1097
1098                # information printout
1099                print(parId, time, step, len(values), values[0], np.std(values))
1100
1101                # length of deac_vals[parId] corresponds to the
1102                # number of time steps, max. 4 are needed for disaggegration
1103                # with the old and original method
1104                # run over all grib messages and perform
1105                # shifting in time
1106                if len(deac_vals[parId]) >= 3:
1107                    if len(deac_vals[parId]) > 3:
1108                        if not c.rrint and (parId == 142 or parId == 143):
1109                            values = disaggregation.darain(deac_vals[parId])
1110                        else:
1111                            values = disaggregation.dapoly(deac_vals[parId])
1112
1113                        if not (step == c.maxstep and c.purefc \
1114                                or t_dt == t_enddate):
1115                            # remove first time step in list to shift
1116                            # time line
1117                            orig_vals[parId].pop(0)
1118                            deac_vals[parId].pop(0)
1119                    else:
1120                        # if the third time step is read (per parId),
1121                        # write out the first one as a boundary value
1122                        if c.purefc:
1123                            values = deac_vals[parId][1]
1124                        else:
1125                            values = deac_vals[parId][0]
1126
1127                    if not (c.rrint and (parId == 142 or parId == 143)):
1128                        codes_set_values(gid, values)
1129
1130                        if c.purefc:
1131                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1132                        else:
1133                            codes_set(gid, 'stepRange', 0)
1134                            codes_set(gid, 'time', t_m2dt.hour*100)
1135                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1136
1137                        codes_write(gid, f_handle)
1138
1139                        # squeeze out information of last two steps
1140                        # contained in deac_vals[parId]
1141                        # Note that deac_vals[parId][0] has not been popped
1142                        # in this case
1143
1144                        if step == c.maxstep and c.purefc or \
1145                           t_dt == t_enddate:
1146                            # last step
1147                            if c.purefc:
1148                                values = deac_vals[parId][3]
1149                                codes_set_values(gid, values)
1150                                codes_set(gid, 'stepRange', step)
1151                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1152                                codes_write(gid, h_handle)
1153                            else:
1154                                values = deac_vals[parId][3]
1155                                codes_set_values(gid, values)
1156                                codes_set(gid, 'stepRange', 0)
1157                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1158                                codes_set(gid, 'time', truedatetime.hour * 100)
1159                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1160                                codes_write(gid, h_handle)
1161
1162                            if parId == 142 or parId == 143:
1163                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1164                            else:
1165                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1166
1167                            # step before last step
1168                            if c.purefc:
1169                                codes_set(gid, 'stepRange', step-int(c.dtime))
1170                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1171                                codes_set_values(gid, values)
1172                                codes_write(gid, g_handle)
1173                            else:
1174                                codes_set(gid, 'stepRange', 0)
1175                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1176                                codes_set(gid, 'time', truedatetime.hour * 100)
1177                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1178                                codes_set_values(gid, values)
1179                                codes_write(gid, g_handle)
1180
1181                codes_release(gid)
1182
1183                gid = codes_new_from_index(iid)
1184
1185            f_handle.close()
1186            g_handle.close()
1187            h_handle.close()
1188
1189        codes_index_release(iid)
1190
1191        if c.rrint:
1192            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1193
1194            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1195                                 cp_np, date_list, step_list, c)
1196
1197        return
1198
1199    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1200        '''Calculates and writes out the disaggregated precipitation fields.
1201
1202        Disaggregation is done in time and original times are written to the
1203        flux files, while the additional subgrid times are written to
1204        extra files.
1205
1206        Parameters
1207        ----------
1208        ni : int
1209            Amount of zonal grid points.
1210
1211        nj : int
1212            Amount of meridional grid points.
1213
1214        nt : int
1215            Number of time steps.
1216
1217        lsp_np : numpy array of float
1218            The large scale precipitation fields for each time step.
1219            Shape (ni * nj, nt).
1220
1221        cp_np : numpy array of float
1222            The convective precipitation fields for each time step.
1223            Shape (ni * nj, nt).
1224
1225        date_list : list of datetime
1226            The list of dates for which the disaggregation is to be done.
1227
1228        step_list : list of int
1229            The list of steps for a single forecast time.
1230            Only necessary for pure forecasts.
1231
1232        c : ControlFile
1233            Contains all the parameters of CONTROL file and
1234            command line.
1235
1236        Return
1237        ------
1238
1239        '''
1240        print('... disaggregation or precipitation with new method.')
1241        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1242        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1243
1244        # do the disaggregation, but neglect the last value of the
1245        # time series. This one corresponds for example to 24 hour,
1246        # which we don't need.
1247        for ix in range(ni*nj):
1248            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1249            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1250
1251        # write to grib files (full/orig times to flux file and inbetween
1252        # times into seperate end files)
1253        print('... write disaggregated precipitation to files.')
1254        it = 0
1255        for date in date_list:
1256            for step in step_list:
1257                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1258
1259                if c.purefc:
1260                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1261                                   '.{:0>3}'.format(step)
1262                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1263                                '.{:0>3}'.format(step) + '_1'
1264                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1265                                '.{:0>3}'.format(step) + '_2'
1266                else:
1267                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1268                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1269                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1270
1271                # collect for final processing
1272                self.outputfilelist.append(os.path.basename(fluxfilename))
1273                self.outputfilelist.append(os.path.basename(filename1))
1274                self.outputfilelist.append(os.path.basename(filename2))
1275
1276                # write original time step to flux file as usual
1277                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1278                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1279                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1280                                  keynames=['date','time','stepRange','values'],
1281                                  keyvalues=[int(date.strftime('%Y%m%d')),
1282                                             date.hour*100, step, lsp_new_np[:,it]],
1283                                 )
1284                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1285                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1286                                  keynames=['date','time','stepRange','values'],
1287                                  keyvalues=[int(date.strftime('%Y%m%d')),
1288                                             date.hour*100, step, cp_new_np[:,it]]
1289                                 )
1290
1291                # write first subgrid time step
1292                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1293                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1294                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1295                                  keynames=['date','time','stepRange','values'],
1296                                  keyvalues=[int(date.strftime('%Y%m%d')),
1297                                             date.hour*100, step, lsp_new_np[:,it+1]]
1298                                  )
1299                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1300                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1301                                  keynames=['date','time','stepRange','values'],
1302                                  keyvalues=[int(date.strftime('%Y%m%d')),
1303                                             date.hour*100, step, cp_new_np[:,it+1]]
1304                                 )
1305
1306                # write second subgrid time step
1307                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1308                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1309                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1310                                  keynames=['date','time','stepRange','values'],
1311                                  keyvalues=[int(date.strftime('%Y%m%d')),
1312                                             date.hour*100, step, lsp_new_np[:,it+2]]
1313                                 )
1314                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1315                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1316                                  keynames=['date','time','stepRange','values'],
1317                                  keyvalues=[int(date.strftime('%Y%m%d')),
1318                                             date.hour*100, step, cp_new_np[:,it+2]]
1319                                 )
1320                it = it + 3 # jump to next original time step
1321        return
1322
1323    def _create_rr_grib_dummy(self, ifile, inputdir):
1324        '''Creates a grib file with a dummy message for the two precipitation
1325        types lsp and cp each.
1326
1327        Parameters
1328        ----------
1329        ifile : str
1330            Filename of the input file to read the grib messages from.
1331
1332        inputdir : str, optional
1333            Path to the directory where the retrieved data is stored.
1334
1335        Return
1336        ------
1337
1338        '''
1339
1340        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1341
1342        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1343                      keyvalues=[142], filemode='w')
1344
1345        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1346                      keyvalues=[143], filemode='a')
1347
1348        return
1349
1350    def create(self, inputfiles, c):
1351        '''An index file will be created which depends on the combination
1352        of "date", "time" and "stepRange" values. This is used to iterate
1353        over all messages in each grib file which were passed through the
1354        parameter "inputfiles" to seperate specific parameters into fort.*
1355        files. Afterwards the FORTRAN program is called to convert
1356        the data fields all to the same grid and put them in one file
1357        per unique time step (combination of "date", "time" and
1358        "stepRange").
1359
1360        Note
1361        ----
1362        This method is based on the ECMWF example index.py
1363        https://software.ecmwf.int/wiki/display/GRIB/index.py
1364
1365        Parameters
1366        ----------
1367        inputfiles : UioFiles
1368            Contains a list of files.
1369
1370        c : ControlFile
1371            Contains all the parameters of CONTROL file and
1372            command line.
1373
1374        Return
1375        ------
1376
1377        '''
1378
1379        # generate start and end timestamp of the retrieval period
1380        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1381        start_period = start_period + timedelta(hours=int(c.step[0]))
1382        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1383        end_period = end_period + timedelta(hours=int(c.step[-1]))
1384
1385        if c.wrf:
1386            table128 = init128(_config.PATH_GRIBTABLE)
1387            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1388                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1389                                  table128)
1390
1391        # these numbers are indices for the temporary files "fort.xx"
1392        # which are used to seperate the grib fields to,
1393        # for the Fortran program input
1394        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1395        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1396        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1397                 '17':None, '18':None, '19':None, '21':None, '22':None}
1398
1399        iid = None
1400        index_vals = None
1401
1402        # get the values of the keys which are used for distinct access
1403        # of grib messages via product
1404        if '/' in self.number:
1405            # more than one ensemble member is selected
1406            index_keys = ["number", "date", "time", "step"]
1407        else:
1408            index_keys = ["date", "time", "step"]
1409        iid, index_vals = self._mk_index_values(c.inputdir,
1410                                                inputfiles,
1411                                                index_keys)
1412        # index_vals looks like e.g.:
1413        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1414        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1415        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1416
1417        # "product" genereates each possible combination between the
1418        # values of the index keys
1419        for prod in product(*index_vals):
1420            # e.g. prod = ('20170505', '0', '12')
1421            #             (  date    ,time, step)
1422
1423            print('current product: ', prod)
1424
1425            for i in range(len(index_keys)):
1426                codes_index_select(iid, index_keys[i], prod[i])
1427
1428            # get first id from current product
1429            gid = codes_new_from_index(iid)
1430
1431            # if there is no data for this specific time combination / product
1432            # skip the rest of the for loop and start with next timestep/product
1433            if not gid:
1434                continue
1435#============================================================================================
1436            # remove old fort.* files and open new ones
1437            # they are just valid for a single product
1438            for k, f in fdict.iteritems():
1439                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1440                silent_remove(fortfile)
1441                fdict[k] = open(fortfile, 'w')
1442#============================================================================================
1443            # create correct timestamp from the three time informations
1444            cdate = str(codes_get(gid, 'date'))
1445            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1446            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1447            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1448            timestamp += timedelta(hours=int(cstep))
1449            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1450
1451            # eliminate all temporary times
1452            # which are outside the retrieval period
1453            if timestamp < start_period or \
1454               timestamp > end_period:
1455                continue
1456
1457            # if the timestamp is out of basetime start/end date period,
1458            # skip this specific product
1459            if c.basetime is not None:
1460                time_delta = timedelta(hours=12-int(c.dtime))
1461                start_time = datetime.strptime(c.end_date + str(c.basetime),
1462                                                '%Y%m%d%H') - time_delta
1463                end_time = datetime.strptime(c.end_date + str(c.basetime),
1464                                             '%Y%m%d%H')
1465                if timestamp < start_time or timestamp > end_time:
1466                    continue
1467
1468            if c.wrf:
1469                if 'olddate' not in locals() or cdate != olddate:
1470                    fwrf = open(os.path.join(c.outputdir,
1471                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1472                    olddate = cdate[:]
1473#============================================================================================
1474            # savedfields remembers which fields were already used.
1475            savedfields = []
1476            # sum of cloud liquid and ice water content
1477            scwc = None
1478            while 1:
1479                if not gid:
1480                    break
1481                paramId = codes_get(gid, 'paramId')
1482                gridtype = codes_get(gid, 'gridType')
1483                levtype = codes_get(gid, 'typeOfLevel')
1484                if paramId == 77: # ETADOT
1485                    codes_write(gid, fdict['21'])
1486                elif paramId == 130: # T
1487                    codes_write(gid, fdict['11'])
1488                elif paramId == 131 or paramId == 132: # U, V wind component
1489                    codes_write(gid, fdict['10'])
1490                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1491                    codes_write(gid, fdict['17'])
1492                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1493                    codes_write(gid, fdict['18'])
1494                elif paramId == 135: # W
1495                    codes_write(gid, fdict['19'])
1496                elif paramId == 152: # LNSP
1497                    codes_write(gid, fdict['12'])
1498                elif paramId == 155 and gridtype == 'sh': # D
1499                    codes_write(gid, fdict['13'])
1500                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1501                    # sum cloud liquid water and ice
1502                    if scwc is None:
1503                        scwc = codes_get_values(gid)
1504                    else:
1505                        scwc += codes_get_values(gid)
1506                        codes_set_values(gid, scwc)
1507                        codes_set(gid, 'paramId', 201031)
1508                        codes_write(gid, fdict['22'])
1509                        scwc = None
1510                elif c.wrf and paramId in [129, 138, 155] and \
1511                      levtype == 'hybrid': # Z, VO, D
1512                    # do not do anything right now
1513                    # these are specific parameter for WRF
1514                    pass
1515                else:
1516                    if paramId not in savedfields:
1517                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1518                        # and all ADDPAR parameter
1519                        codes_write(gid, fdict['16'])
1520                        savedfields.append(paramId)
1521                    else:
1522                        print('duplicate ' + str(paramId) + ' not written')
1523
1524                try:
1525                    if c.wrf:
1526                        # model layer
1527                        if levtype == 'hybrid' and \
1528                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1529                            codes_write(gid, fwrf)
1530                        # sfc layer
1531                        elif paramId in wrfpars:
1532                            codes_write(gid, fwrf)
1533                except AttributeError:
1534                    pass
1535
1536                codes_release(gid)
1537                gid = codes_new_from_index(iid)
1538#============================================================================================
1539            for f in fdict.values():
1540                f.close()
1541#============================================================================================
1542            # call for Fortran program to convert e.g. reduced_gg grids to
1543            # regular_ll and calculate detadot/dp
1544            pwd = os.getcwd()
1545            os.chdir(c.inputdir)
1546            if os.stat('fort.21').st_size == 0 and c.eta:
1547                print('Parameter 77 (etadot) is missing, most likely it is \
1548                       not available for this type or date/time\n')
1549                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1550                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1551                         is set to 1 in CONTROL file')
1552#============================================================================================
1553            # write out all output to log file before starting fortran programm
1554            sys.stdout.flush()
1555
1556            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1557            execute_subprocess([os.path.join(c.exedir,
1558                                _config.FORTRAN_EXECUTABLE)],
1559                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1560
1561            os.chdir(pwd)
1562#============================================================================================
1563            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1564            if c.purefc:
1565                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1566            else:
1567                suffix = cdate_hour[2:10]
1568
1569            # if necessary, add ensemble member number to filename suffix
1570            if 'number' in index_keys:
1571                index_number = index_keys.index('number')
1572                if len(index_vals[index_number]) > 1:
1573                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1574
1575            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1576            print("outputfile = " + fnout)
1577            # collect for final processing
1578            self.outputfilelist.append(os.path.basename(fnout))
1579#============================================================================================
1580            # create outputfile and copy all data from intermediate files
1581            # to the outputfile (final GRIB input files for FLEXPART)
1582            orolsm = os.path.basename(glob.glob(c.inputdir +
1583                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1584            fluxfile = 'flux' + cdate[0:2] + suffix
1585            if not c.cwc:
1586                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1587            else:
1588                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1589
1590            with open(fnout, 'wb') as fout:
1591                for f in flist:
1592                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1593                                       fout)
1594
1595            if c.omega:
1596                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1597                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1598                                            'rb'), fout)
1599#============================================================================================
1600        if c.wrf:
1601            fwrf.close()
1602
1603        codes_index_release(iid)
1604
1605        return
1606
1607
1608    def calc_extra_elda(self, path, prefix):
1609        ''' Calculates extra ensemble members for ELDA - Stream.
1610
1611        Parameters
1612        ----------
1613        path : str
1614            Path to the output files.
1615
1616        prefix : str
1617            The prefix of the output filenames as defined in Control file.
1618
1619        Return
1620        ------
1621
1622        '''
1623        # max number
1624        maxnum = int(self.number.split('/')[-1])
1625
1626        # get a list of all prepared output files with control forecast (CF)
1627        CF_filelist = UioFiles(path, prefix + '*.N000')
1628
1629        for cffile in CF_filelist.files:
1630            with open(cffile, 'rb') as f:
1631                cfvalues=[]
1632                while True:
1633                    fid = codes_grib_new_from_file(f)
1634                    if fid is None:
1635                        break
1636                    cfvalues.append(codes_get_array(fid, 'values'))
1637                    codes_release(fid)
1638
1639            filename = cffile.split('N000')[0]
1640            for i in range(1, maxnum+1): # max number nehmen
1641
1642                # read an ensemble member
1643                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1644                # create file for newly calculated ensemble member
1645                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1646                # number of message in grib file
1647                j = 0
1648                while True:
1649                    gid = codes_grib_new_from_file(g)
1650                    if gid is None:
1651                        break
1652                    values = codes_get_array(gid, 'values')
1653                    codes_set_array(gid, 'values',
1654                                    values-2*(values-cfvalues[j]))
1655                    codes_set(gid, 'number', i+maxnum)
1656                    codes_write(gid, h)
1657                    codes_release(gid)
1658                    j += 1
1659
1660                g.close()
1661                h.close()
1662                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1663                self.outputfilelist.append(
1664                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1665
1666        return
1667
1668
1669    def process_output(self, c):
1670        '''Postprocessing of FLEXPART input files.
1671
1672        The grib files are postprocessed depending on the selection in
1673        CONTROL file. The resulting files are moved to the output
1674        directory if its not equal to the input directory.
1675        The following modifications might be done if
1676        properly switched in CONTROL file:
1677        GRIB2 - Conversion to GRIB2
1678        ECTRANS - Transfer of files to gateway server
1679        ECSTORAGE - Storage at ECMWF server
1680
1681        Parameters
1682        ----------
1683        c : ControlFile
1684            Contains all the parameters of CONTROL file and
1685            command line.
1686
1687        Return
1688        ------
1689
1690        '''
1691
1692        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1693
1694        if not c.ecapi:
1695            print('ecstorage: {}\n ecfsdir: {}\n'.
1696                  format(c.ecstorage, c.ecfsdir))
1697            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1698                  .format(c.ectrans, c.gateway, c.destination))
1699
1700        print('Output filelist: ')
1701        print(self.outputfilelist)
1702
1703        for ofile in self.outputfilelist:
1704            ofile = os.path.join(self.inputdir, ofile)
1705
1706            if c.format.lower() == 'grib2':
1707                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1708                                    'productDefinitionTemplateNumber=8',
1709                                    ofile, ofile + '_2'],
1710                                   error_msg='GRIB2 CONVERSION FAILED!')
1711
1712                execute_subprocess(['mv', ofile + '_2', ofile],
1713                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1714                                   'FILES FAILED!')
1715
1716            if c.ectrans and not c.ecapi:
1717                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1718                                    c.gateway, '-remote', c.destination,
1719                                    '-source', ofile],
1720                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1721
1722            if c.ecstorage and not c.ecapi:
1723                execute_subprocess(['ecp', '-o', ofile,
1724                                    os.path.expandvars(c.ecfsdir)],
1725                                   error_msg='COPY OF FILES TO ECSTORAGE '
1726                                   'AREA FAILED!')
1727
1728            if c.outputdir != c.inputdir:
1729                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1730                                    c.outputdir],
1731                                   error_msg='RELOCATION OF OUTPUT FILES '
1732                                   'TO OUTPUTDIR FAILED!')
1733
1734        return
1735
1736
1737    def prepare_fp_files(self, c):
1738        '''Conversion of GRIB files to FLEXPART binary format.
1739
1740        Parameters
1741        ----------
1742        c : ControlFile
1743            Contains all the parameters of CONTROL file and
1744            command line.
1745
1746        Return
1747        ------
1748
1749        '''
1750        # generate AVAILABLE file
1751        # Example of AVAILABLE file data:
1752        # 20131107 000000      EN13110700              ON DISC
1753        clist = []
1754        for ofile in self.outputfilelist:
1755            fname = ofile.split('/')
1756            if '.' in fname[-1]:
1757                l = fname[-1].split('.')
1758                timestamp = datetime.strptime(l[0][-6:] + l[1],
1759                                              '%y%m%d%H')
1760                timestamp += timedelta(hours=int(l[2]))
1761                cdate = datetime.strftime(timestamp, '%Y%m%d')
1762                chms = datetime.strftime(timestamp, '%H%M%S')
1763            else:
1764                cdate = '20' + fname[-1][-8:-2]
1765                chms = fname[-1][-2:] + '0000'
1766            clist.append(cdate + ' ' + chms + ' '*6 +
1767                         fname[-1] + ' '*14 + 'ON DISC')
1768        clist.sort()
1769        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1770            f.write('\n'.join(clist) + '\n')
1771
1772        # generate pathnames file
1773        pwd = os.path.abspath(c.outputdir)
1774        with open(pwd + '/pathnames', 'w') as f:
1775            f.write(pwd + '/Options/\n')
1776            f.write(pwd + '/\n')
1777            f.write(pwd + '/\n')
1778            f.write(pwd + '/AVAILABLE\n')
1779            f.write(' = == = == = == = == = == ==  = \n')
1780
1781        # create Options dir if necessary
1782        if not os.path.exists(pwd + '/Options'):
1783            make_dir(pwd+'/Options')
1784
1785        # read template COMMAND file
1786        with open(os.path.expandvars(os.path.expanduser(
1787            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1788            lflist = f.read().split('\n')
1789
1790        # find index of list where to put in the
1791        # date and time information
1792        # usually after the LDIRECT parameter
1793        i = 0
1794        for l in lflist:
1795            if 'LDIRECT' in l.upper():
1796                break
1797            i += 1
1798
1799        # insert the date and time information of run start and end
1800        # into the list of lines of COMMAND file
1801        lflist = lflist[:i+1] + \
1802                 [clist[0][:16], clist[-1][:16]] + \
1803                 lflist[i+3:]
1804
1805        # write the new COMMAND file
1806        with open(pwd + '/Options/COMMAND', 'w') as g:
1807            g.write('\n'.join(lflist) + '\n')
1808
1809        # change to outputdir and start the grib2flexpart run
1810        # afterwards switch back to the working dir
1811        os.chdir(c.outputdir)
1812        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1813         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1814        execute_subprocess(cmd)
1815        os.chdir(pwd)
1816
1817        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG