source: flex_extract.git/source/python/classes/EcFlexpart.py @ e811e1a

ctbtodev
Last change on this file since e811e1a was 45b99e6, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

added possibility to extract ensemble members with ELDA stream (and ENFO)

  • Property mode set to 100644
File size: 72.5 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
71
72# software specific classes and modules from flex_extract
73sys.path.append('../')
74import _config
75from GribUtil import GribUtil
76from mods.tools import (init128, to_param_id, silent_remove, product,
77                        my_error, make_dir, get_informations, get_dimensions,
78                        execute_subprocess)
79from MarsRetrieval import MarsRetrieval
80from UioFiles import UioFiles
81import mods.disaggregation as disaggregation
82
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = range(len(ftime))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        self.types[str(self.acctype)] = {'times': str(self.acctime),
324                                         'steps': '{}/to/{}/by/{}'.format(
325                                             self.dtime,
326                                             self.accmaxstep,
327                                             self.dtime)}
328        return
329
330    def _create_params(self, gauss, eta, omega, cwc, wrf):
331        '''Define the specific parameter settings for retrievment.
332
333        The different parameters need specific grid types and level types
334        for retrievement. We might get following combination of types
335        (depending on selection and availability):
336        (These are short cuts for the grib file names (leading sequence)
337        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
338        where:
339            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
340            ML = Model Level, SL = Surface Level
341
342        For each of this combination there is a list of parameter names,
343        the level type, the level list and the grid resolution.
344
345        There are different scenarios for data extraction from MARS:
346        1) Retrieval of etadot
347           eta=1, gauss=0, omega=0
348        2) Calculation of etadot from divergence
349           eta=0, gauss=1, omega=0
350        3) Calculation of etadot from omega (for makes sense for debugging)
351           eta=0, gauss=0, omega=1
352        4) Retrieval and Calculation of etadot (only for debugging)
353           eta=1, gauss=1, omega=0
354        5) Download also specific model and surface level data for FLEXPART-WRF
355
356        Parameters:
357        -----------
358        gauss : int
359            Gaussian grid is retrieved.
360
361        eta : int
362            Etadot parameter will be directly retrieved.
363
364        omega : int
365            The omega paramterwill be retrieved.
366
367        cwc : int
368            The cloud liquid and ice water content will be retrieved.
369
370        wrf : int
371            Additional model level and surface level data will be retrieved for
372            WRF/FLEXPART-WRF simulations.
373
374        Return
375        ------
376
377        '''
378        # SURFACE FIELDS
379        #-----------------------------------------------------------------------
380        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
381        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
382                                 'SFC', '1', self.grid]
383        if self.addpar:
384            self.params['OG__SL'][0] += self.addpar
385
386        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
388                                            'SFC', '1', self.grid]
389        else:
390            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
391                                            'SFC', '1', self.grid]
392
393        # MODEL LEVEL FIELDS
394        #-----------------------------------------------------------------------
395        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
396
397        if not gauss and eta:
398            self.params['OG__ML'][0] += '/U/V/ETADOT'
399        elif gauss and not eta:
400            self.params['GG__SL'] = ['Q', 'ML', '1', \
401                                     '{}'.format((int(self.resol) + 1) / 2)]
402            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
403        elif not gauss and not eta:
404            self.params['OG__ML'][0] += '/U/V'
405        else:
406            print('Warning: Collecting etadot and parameters for gaussian grid \
407                            is a very costly parameter combination, \
408                            use this combination only for debugging!')
409            self.params['GG__SL'] = ['Q', 'ML', '1', \
410                                     '{}'.format((int(self.resol) + 1) / 2)]
411            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
412                                     '{}'.format((int(self.resol) + 1) / 2)]
413
414        if omega:
415            self.params['OG__ML'][0] += '/W'
416
417        if cwc:
418            self.params['OG__ML'][0] += '/CLWC/CIWC'
419
420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
421        #-----------------------------------------------------------------------
422        if wrf:
423            self.params['OG__ML'][0] += '/Z/VO'
424            if '/D' not in self.params['OG__ML'][0]:
425                self.params['OG__ML'][0] += '/D'
426            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
427                       'SWVL1','SWVL2','SWVL3','SWVL4']
428            for par in wrf_sfc:
429                if par not in self.params['OG__SL'][0]:
430                    self.params['OG__SL'][0] += '/' + par
431
432        return
433
434
435    def _create_params_fluxes(self):
436        '''Define the parameter setting for flux data.
437
438        Flux data are accumulated fields in time and are stored on the
439        surface level. The leading short cut name for the grib files is:
440        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
441        acc for Accumulated Grid.
442        The params dictionary stores a list of parameter names, the level type,
443        the level list and the grid resolution.
444
445        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
446
447        Parameters:
448        -----------
449
450        Return
451        ------
452
453        '''
454        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
455                                    'SFC', '1', self.grid]
456        return
457
458
459    def _mk_targetname(self, ftype, param, date):
460        '''Creates the filename for the requested grib data to be stored in.
461        This name is passed as the "target" parameter in the request.
462
463        Parameters
464        ----------
465        ftype : str
466            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
467
468        param : str
469            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
470            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
471
472        date : str
473            The date period of the grib data to be stored in this file.
474
475        Return
476        ------
477        targetname : str
478            The target filename for the grib data.
479        '''
480        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
481                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
482
483        return targetname
484
485
486    def _start_retrievement(self, request, par_dict):
487        '''Creates the Mars Retrieval and prints or submits the request
488        depending on the status of the request variable.
489
490        Parameters
491        ----------
492        request : int
493            Selects the mode of retrieval.
494            0: Retrieves the data from ECMWF.
495            1: Prints the mars requests to an output file.
496            2: Retrieves the data and prints the mars request.
497
498        par_dict : dictionary
499            Contains all parameter which have to be set for creating the
500            Mars Retrievals. The parameter are:
501            marsclass, dataset, stream, type, levtype, levelist, resol,
502            gaussian, accuracy, grid, target, area, date, time, number,
503            step, expver, param
504
505        Return
506        ------
507
508        '''
509        # increase number of mars requests
510        self.mreq_count += 1
511
512        MR = MarsRetrieval(self.server,
513                           self.public,
514                           marsclass=par_dict['marsclass'],
515                           dataset=par_dict['dataset'],
516                           stream=par_dict['stream'],
517                           type=par_dict['type'],
518                           levtype=par_dict['levtype'],
519                           levelist=par_dict['levelist'],
520                           resol=par_dict['resol'],
521                           gaussian=par_dict['gaussian'],
522                           accuracy=par_dict['accuracy'],
523                           grid=par_dict['grid'],
524                           target=par_dict['target'],
525                           area=par_dict['area'],
526                           date=par_dict['date'],
527                           time=par_dict['time'],
528                           number=par_dict['number'],
529                           step=par_dict['step'],
530                           expver=par_dict['expver'],
531                           param=par_dict['param'])
532
533        if request == 0:
534            MR.display_info()
535            MR.data_retrieve()
536        elif request == 1:
537            MR.print_infodata_csv(self.inputdir, self.mreq_count)
538        elif request == 2:
539            MR.print_infodata_csv(self.inputdir, self.mreq_count)
540            MR.display_info()
541            MR.data_retrieve()
542        else:
543            print('Failure')
544
545        return
546
547
548    def _mk_index_values(self, inputdir, inputfiles, keys):
549        '''Creates an index file for a set of grib parameter keys.
550        The values from the index keys are returned in a list.
551
552        Parameters
553        ----------
554        keys : dictionary
555            List of parameter names which serves as index.
556
557        inputfiles : UioFiles
558            Contains a list of files.
559
560        Return
561        ------
562        iid : codes_index
563            This is a grib specific index structure to access
564            messages in a file.
565
566        index_vals : list of list  of str
567            Contains the values from the keys used for a distinct selection
568            of grib messages in processing  the grib files.
569            Content looks like e.g.:
570            index_vals[0]: ('20171106', '20171107', '20171108') ; date
571            index_vals[1]: ('0', '1200', '1800', '600') ; time
572            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
573        '''
574        iid = None
575        index_keys = keys
576
577        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
578        silent_remove(indexfile)
579        grib = GribUtil(inputfiles.files)
580        # creates new index file
581        iid = grib.index(index_keys=index_keys, index_file=indexfile)
582
583        # read the values of index keys
584        index_vals = []
585        for key in index_keys:
586            key_vals = codes_index_get(iid, key)
587            # have to sort the key values for correct disaggregation,
588            # therefore convert to int first
589            key_vals = [int(k) for k in key_vals]
590            key_vals.sort()
591            key_vals = [str(k) for k in key_vals]
592            index_vals.append(key_vals)
593            # index_vals looks for example like:
594            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
595            # index_vals[1]: ('0', '1200') ; time
596            # index_vals[2]: (3', '6', '9', '12') ; stepRange
597
598        return iid, index_vals
599
600
601    def retrieve(self, server, dates, public, request, inputdir='.'):
602        '''Finalizing the retrieval information by setting final details
603        depending on grid type.
604        Prepares MARS retrievals per grid type and submits them.
605
606        Parameters
607        ----------
608        server : ECMWFService or ECMWFDataServer
609            The connection to the ECMWF server. This is different
610            for member state users which have full access and non
611            member state users which have only access to the public
612            data sets. The decision is made from command line argument
613            "public"; for public access its True (ECMWFDataServer)
614            for member state users its False (ECMWFService)
615
616        dates : str
617            Contains start and end date of the retrieval in the format
618            "YYYYMMDD/to/YYYYMMDD"
619
620        request : int
621            Selects the mode of retrieval.
622            0: Retrieves the data from ECMWF.
623            1: Prints the mars requests to an output file.
624            2: Retrieves the data and prints the mars request.
625
626        inputdir : str, optional
627            Path to the directory where the retrieved data is about
628            to be stored. The default is the current directory ('.').
629
630        Return
631        ------
632
633        '''
634        self.dates = dates
635        self.server = server
636        self.public = public
637        self.inputdir = inputdir
638        oro = False
639
640        # define times with datetime module
641        t12h = timedelta(hours=12)
642        t24h = timedelta(hours=24)
643
644        # dictionary which contains all parameter for the mars request,
645        # entries with a "None" will change in different requests and will
646        # therefore be set in each request seperately
647        retr_param_dict = {'marsclass':self.marsclass,
648                           'dataset':self.dataset,
649                           'stream':None,
650                           'type':None,
651                           'levtype':None,
652                           'levelist':None,
653                           'resol':self.resol,
654                           'gaussian':None,
655                           'accuracy':self.accuracy,
656                           'grid':None,
657                           'target':None,
658                           'area':None,
659                           'date':None,
660                           'time':None,
661                           'number':self.number,
662                           'step':None,
663                           'expver':self.expver,
664                           'param':None}
665
666        for ftype in self.types:
667            # ftype contains field types such as
668            #     [AN, FC, PF, CV]
669            for pk, pv in self.params.iteritems():
670                # pk contains one of these keys of params
671                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
672                #      OG_OROLSM_SL, OG_acc_SL]
673                # pv contains all of the items of the belonging key
674                #     [param, levtype, levelist, grid]
675                if isinstance(pv, str):
676                    continue
677                retr_param_dict['type'] = ftype
678                retr_param_dict['time'] = self.types[ftype]['times']
679                retr_param_dict['step'] = self.types[ftype]['steps']
680                retr_param_dict['date'] = self.dates
681                retr_param_dict['stream'] = self.stream
682                retr_param_dict['target'] = \
683                    self._mk_targetname(ftype,
684                                        pk,
685                                        retr_param_dict['date'].split('/')[0])
686                retr_param_dict['param'] = pv[0]
687                retr_param_dict['levtype'] = pv[1]
688                retr_param_dict['levelist'] = pv[2]
689                retr_param_dict['grid'] = pv[3]
690                retr_param_dict['area'] = self.area
691                retr_param_dict['gaussian'] = self.gaussian
692
693                if pk == 'OG_OROLSM__SL' and not oro:
694                    oro = True
695                    # in CERA20C (class EP) there is no stream "OPER"!
696                    if self.marsclass.upper() != 'EP':
697                        retr_param_dict['stream'] = 'OPER'
698                    retr_param_dict['type'] = 'AN'
699                    retr_param_dict['time'] = '00'
700                    retr_param_dict['step'] = '000'
701                    retr_param_dict['date'] = self.dates.split('/')[0]
702                    retr_param_dict['target'] = self._mk_targetname('',
703                                            pk, retr_param_dict['date'])
704                elif pk == 'OG_OROLSM__SL' and oro:
705                    continue
706                if pk == 'GG__SL' and pv[0] == 'Q':
707                    retr_param_dict['area'] = ""
708                    retr_param_dict['gaussian'] = 'reduced'
709                if ftype.upper() == 'FC' and \
710                    'acc' not in retr_param_dict['target']:
711                    if (int(retr_param_dict['time'][0]) +
712                        int(retr_param_dict['step'][0])) > 23:
713                        dates = retr_param_dict['date'].split('/')
714                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
715                        sdate = sdate - timedelta(days=1)
716                        retr_param_dict['date'] = '/'.join(
717                            [sdate.strftime("%Y%m%d")] +
718                            retr_param_dict['date'][1:])
719
720                        print('CHANGED FC start date to ' +
721                              sdate.strftime("%Y%m%d") +
722                              ' to accomodate TIME=' +
723                              retr_param_dict['time'][0] +
724                              ', STEP=' +
725                              retr_param_dict['time'][0])
726
727    # ------  on demand path  --------------------------------------------------
728                if self.basetime is None:
729                    # ******* start retrievement
730                    self._start_retrievement(request, retr_param_dict)
731    # ------  operational path  ------------------------------------------------
732                else:
733                    # check if mars job requests fields beyond basetime.
734                    # if yes eliminate those fields since they may not
735                    # be accessible with user's credentials
736
737                    enddate = retr_param_dict['date'].split('/')[-1]
738                    elimit = datetime.strptime(enddate + str(self.basetime),
739                                               '%Y%m%d%H')
740
741                    if self.basetime == 12:
742                        # --------------  flux data ----------------------------
743                        if 'acc' in pk:
744                            startdate = retr_param_dict['date'].split('/')[0]
745                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
746                            retr_param_dict['date'] = '/'.join([startdate,
747                                                                'to',
748                                                                enddate])
749
750                            # ******* start retrievement
751                            self._start_retrievement(request, retr_param_dict)
752
753                            retr_param_dict['date'] = \
754                                datetime.strftime(elimit - t12h, '%Y%m%d')
755                            retr_param_dict['time'] = '00'
756                            retr_param_dict['target'] = \
757                                self._mk_targetname(ftype, pk,
758                                                    retr_param_dict['date'])
759
760                            # ******* start retrievement
761                            self._start_retrievement(request, retr_param_dict)
762
763                        # --------------  non flux data ------------------------
764                        else:
765                            # ******* start retrievement
766                            self._start_retrievement(request, retr_param_dict)
767
768                    elif self.basetime == 0:
769                        retr_param_dict['date'] = \
770                            datetime.strftime(elimit - t24h, '%Y%m%d')
771
772                        timesave = ''.join(retr_param_dict['time'])
773
774                        if ('/' in retr_param_dict['time'] and
775                            pk != 'OG_OROLSM__SL' and
776                            'acc' not in pk ) :
777                            times = retr_param_dict['time'].split('/')
778                            steps = retr_param_dict['step'].split('/')
779
780                            while int(times[0]) + int(steps[0]) <= 12:
781                                times = times[1:]
782                                if len(times) > 1:
783                                    retr_param_dict['time'] = '/'.join(times)
784                                else:
785                                    retr_param_dict['time'] = times[0]
786
787                        if (pk != 'OG_OROLSM__SL' and
788                            int(retr_param_dict['step'].split('/')[0]) == 0 and
789                            int(timesave.split('/')[0]) == 0):
790
791                            retr_param_dict['date'] = \
792                                datetime.strftime(elimit, '%Y%m%d')
793                            retr_param_dict['time'] = '00'
794                            retr_param_dict['step'] = '000'
795                            retr_param_dict['target'] = \
796                                self._mk_targetname(ftype, pk,
797                                                    retr_param_dict['date'])
798
799                        # ******* start retrievement
800                        self._start_retrievement(request, retr_param_dict)
801                    else:
802                        raise ValueError('ERROR: Basetime has an invalid value '
803                                                 '-> {}'.format(str(basetime)))
804
805        if request == 0 or request == 2:
806            print('MARS retrieve done ... ')
807        elif request == 1:
808            print('MARS request printed ...')
809
810        return
811
812
813    def write_namelist(self, c):
814        '''Creates a namelist file in the temporary directory and writes
815        the following values to it: maxl, maxb, mlevel,
816        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
817        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
818
819        Parameters
820        ----------
821        c : ControlFile
822            Contains all the parameters of CONTROL file and
823            command line.
824
825        filename : str
826                Name of the namelist file.
827
828        Return
829        ------
830
831        '''
832
833        from genshi.template.text import NewTextTemplate
834        from genshi.template import  TemplateLoader
835        from genshi.template.eval import  UndefinedError
836
837        try:
838            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
839            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
840                                            cls=NewTextTemplate)
841
842            self.inputdir = c.inputdir
843            area = np.asarray(self.area.split('/')).astype(float)
844            grid = np.asarray(self.grid.split('/')).astype(float)
845
846            if area[1] > area[3]:
847                area[1] -= 360
848            maxl = int((area[3] - area[1]) / grid[1]) + 1
849            maxb = int((area[0] - area[2]) / grid[0]) + 1
850
851            stream = namelist_template.generate(
852                maxl = str(maxl),
853                maxb = str(maxb),
854                mlevel = str(self.level),
855                mlevelist = str(self.levelist),
856                mnauf = str(self.resol),
857                metapar = '77',
858                rlo0 = str(area[1]),
859                rlo1 = str(area[3]),
860                rla0 = str(area[2]),
861                rla1 = str(area[0]),
862                momega = str(c.omega),
863                momegadiff = str(c.omegadiff),
864                mgauss = str(c.gauss),
865                msmooth = str(c.smooth),
866                meta = str(c.eta),
867                metadiff = str(c.etadiff),
868                mdpdeta = str(c.dpdeta)
869            )
870        except UndefinedError as e:
871            print('... ERROR ' + str(e))
872
873            sys.exit('\n... error occured while trying to generate namelist ' +
874                     _config.TEMPFILE_NAMELIST)
875        except OSError as e:
876            print('... ERROR CODE: ' + str(e.errno))
877            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
878
879            sys.exit('\n... error occured while trying to generate template ' +
880                     _config.TEMPFILE_NAMELIST)
881
882        try:
883            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
884
885            with open(namelistfile, 'w') as f:
886                f.write(stream.render('text'))
887        except OSError as e:
888            print('... ERROR CODE: ' + str(e.errno))
889            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
890
891            sys.exit('\n... error occured while trying to write ' +
892                     namelistfile)
893
894        return
895
896
897    def deacc_fluxes(self, inputfiles, c):
898        '''De-accumulate and disaggregate flux data.
899
900        Goes through all flux fields in ordered time and de-accumulate
901        the fields. Afterwards the fields are disaggregated in time.
902        Different versions of disaggregation is provided for rainfall
903        data (darain, modified linear) and the surface fluxes and
904        stress data (dapoly, cubic polynomial).
905
906        Parameters
907        ----------
908        inputfiles : UioFiles
909            Contains the list of files that contain flux data.
910
911        c : ControlFile
912            Contains all the parameters of CONTROL file and
913            command line.
914
915        Return
916        ------
917
918        '''
919
920        table128 = init128(_config.PATH_GRIBTABLE)
921        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
922
923        iid = None
924        index_vals = None
925
926        # get the values of the keys which are used for distinct access
927        # of grib messages via product
928        if '/' in self.number:
929            # more than one ensemble member is selected
930            index_keys = ["number", "date", "time", "step"]
931        else:
932            index_keys = ["date", "time", "step"]
933        iid, index_vals = self._mk_index_values(c.inputdir,
934                                                inputfiles,
935                                                index_keys)
936        # index_vals looks like e.g.:
937        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
938        # index_vals[1]: ('0', '600', '1200', '1800') ; time
939        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
940
941        if c.rrint:
942            if not c.purefc:
943                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
944                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
945            else:
946                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
947                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
948                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
949                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
950                end_date = end_date + timedelta(hours=c.maxstep)
951
952            info = get_informations(os.path.join(c.inputdir,
953                                                 inputfiles.files[0]))
954            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
955                                  start_date, end_date)
956            # create numpy array
957            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
958            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
959            it_lsp = 0
960            it_cp = 0
961            date_list = []
962            step_list = []
963
964        # initialize dictionaries to store values
965        orig_vals = {}
966        deac_vals = {}
967        for p in pars:
968            orig_vals[p] = []
969            deac_vals[p] = []
970
971        # "product" genereates each possible combination between the
972        # values of the index keys
973        for prod in product(*index_vals):
974            # e.g. prod = ('20170505', '0', '12')
975            #             (  date    ,time, step)
976
977            print('CURRENT PRODUCT: ', prod)
978
979            for i in range(len(index_keys)):
980                codes_index_select(iid, index_keys[i], prod[i])
981
982            # get first id from current product
983            gid = codes_new_from_index(iid)
984
985            # if there is no data for this specific time combination / product
986            # skip the rest of the for loop and start with next timestep/product
987            if not gid:
988                continue
989
990            # create correct timestamp from the three time informations
991            cdate = str(codes_get(gid, 'date'))
992            time = codes_get(gid, 'time')/100 # integer
993            step = codes_get(gid, 'step') # integer
994            ctime = '{:0>2}'.format(time)
995            cstep = '{:0>3}'.format(step)
996
997            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
998            t_dt = t_date + timedelta(hours=step)
999            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1000            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1001            if c.basetime is not None:
1002                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1003                                              '%Y%m%d%H')
1004            else:
1005                t_enddate = t_date + timedelta(2*int(c.dtime))
1006
1007            # if necessary, add ensemble member number to filename suffix
1008            # otherwise, add empty string
1009            if 'number' in index_keys:
1010                index_number = index_keys.index('number')
1011                if len(index_vals[index_number]) > 1:
1012                    numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1013            else:
1014                numbersuffix = ''
1015
1016            if c.purefc:
1017                fnout = os.path.join(c.inputdir, 'flux' +
1018                                     t_date.strftime('%Y%m%d.%H') +
1019                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1020                                     numbersuffix)
1021                gnout = os.path.join(c.inputdir, 'flux' +
1022                                     t_date.strftime('%Y%m%d.%H') +
1023                                     '.{:0>3}'.format(step-int(c.dtime)) +
1024                                     numbersuffix)
1025                hnout = os.path.join(c.inputdir, 'flux' +
1026                                     t_date.strftime('%Y%m%d.%H') +
1027                                     '.{:0>3}'.format(step) +
1028                                     numbersuffix)
1029            else:
1030                fnout = os.path.join(c.inputdir, 'flux' +
1031                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1032                gnout = os.path.join(c.inputdir, 'flux' +
1033                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1034                hnout = os.path.join(c.inputdir, 'flux' +
1035                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1036
1037            print("outputfile = " + fnout)
1038            f_handle = open(fnout, 'w')
1039            h_handle = open(hnout, 'w')
1040            g_handle = open(gnout, 'w')
1041
1042            # read message for message and store relevant data fields, where
1043            # data keywords are stored in pars
1044            while True:
1045                if not gid:
1046                    break
1047                parId = codes_get(gid, 'paramId') # integer
1048                step = codes_get(gid, 'step') # integer
1049                time = codes_get(gid, 'time') # integer
1050                ni = codes_get(gid, 'Ni') # integer
1051                nj = codes_get(gid, 'Nj') # integer
1052                if parId not in orig_vals.keys():
1053                    # parameter is not a flux, find next one
1054                    continue
1055
1056                # define conversion factor
1057                if parId == 142 or parId == 143:
1058                    fak = 1. / 1000.
1059                else:
1060                    fak = 3600.
1061
1062                # get parameter values and reshape
1063                values = codes_get_values(gid)
1064                values = (np.reshape(values, (nj, ni))).flatten() / fak
1065
1066                # save the original and accumulated values
1067                orig_vals[parId].append(values[:])
1068
1069                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1070                    # no de-accumulation needed
1071                    deac_vals[parId].append(values[:] / int(c.dtime))
1072                else:
1073                    # do de-accumulation
1074                    deac_vals[parId].append(
1075                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1076                         int(c.dtime))
1077
1078                # store precipitation if new disaggregation method is selected
1079                # only the exact days are needed
1080                if c.rrint:
1081                    if start_date <= t_dt <= end_date:
1082                        if not c.purefc:
1083                            if t_dt not in date_list:
1084                                date_list.append(t_dt)
1085                                step_list = [0]
1086                        else:
1087                            if t_date not in date_list:
1088                                date_list.append(t_date)
1089                            if step not in step_list:
1090                                step_list.append(step)
1091                        if c.rrint and parId == 142:
1092                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1093                            it_lsp += 1
1094                        elif c.rrint and parId == 143:
1095                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1096                            it_cp += 1
1097
1098                # information printout
1099                print(parId, time, step, len(values), values[0], np.std(values))
1100
1101                # length of deac_vals[parId] corresponds to the
1102                # number of time steps, max. 4 are needed for disaggegration
1103                # with the old and original method
1104                # run over all grib messages and perform
1105                # shifting in time
1106                if len(deac_vals[parId]) >= 3:
1107                    if len(deac_vals[parId]) > 3:
1108                        if not c.rrint and (parId == 142 or parId == 143):
1109                            values = disaggregation.darain(deac_vals[parId])
1110                        else:
1111                            values = disaggregation.dapoly(deac_vals[parId])
1112
1113                        if not (step == c.maxstep and c.purefc \
1114                                or t_dt == t_enddate):
1115                            # remove first time step in list to shift
1116                            # time line
1117                            orig_vals[parId].pop(0)
1118                            deac_vals[parId].pop(0)
1119                    else:
1120                        # if the third time step is read (per parId),
1121                        # write out the first one as a boundary value
1122                        if c.purefc:
1123                            values = deac_vals[parId][1]
1124                        else:
1125                            values = deac_vals[parId][0]
1126
1127                    if not (c.rrint and (parId == 142 or parId == 143)):
1128                        codes_set_values(gid, values)
1129
1130                        if c.purefc:
1131                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1132                        else:
1133                            codes_set(gid, 'stepRange', 0)
1134                            codes_set(gid, 'time', t_m2dt.hour*100)
1135                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1136
1137                        codes_write(gid, f_handle)
1138
1139                        # squeeze out information of last two steps
1140                        # contained in deac_vals[parId]
1141                        # Note that deac_vals[parId][0] has not been popped
1142                        # in this case
1143
1144                        if step == c.maxstep and c.purefc or \
1145                           t_dt == t_enddate:
1146                            # last step
1147                            if c.purefc:
1148                                values = deac_vals[parId][3]
1149                                codes_set_values(gid, values)
1150                                codes_set(gid, 'stepRange', step)
1151                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1152                                codes_write(gid, h_handle)
1153                            else:
1154                                values = deac_vals[parId][3]
1155                                codes_set_values(gid, values)
1156                                codes_set(gid, 'stepRange', 0)
1157                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1158                                codes_set(gid, 'time', truedatetime.hour * 100)
1159                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1160                                codes_write(gid, h_handle)
1161
1162                            if parId == 142 or parId == 143:
1163                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1164                            else:
1165                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1166
1167                            # step before last step
1168                            if c.purefc:
1169                                codes_set(gid, 'stepRange', step-int(c.dtime))
1170                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1171                                codes_set_values(gid, values)
1172                                codes_write(gid, g_handle)
1173                            else:
1174                                codes_set(gid, 'stepRange', 0)
1175                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1176                                codes_set(gid, 'time', truedatetime.hour * 100)
1177                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1178                                codes_set_values(gid, values)
1179                                codes_write(gid, g_handle)
1180
1181                codes_release(gid)
1182
1183                gid = codes_new_from_index(iid)
1184
1185            f_handle.close()
1186            g_handle.close()
1187            h_handle.close()
1188
1189        codes_index_release(iid)
1190
1191        if c.rrint:
1192            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1193
1194            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1195                                 cp_np, date_list, step_list, c)
1196
1197        return
1198
1199    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1200        '''Calculates and writes out the disaggregated precipitation fields.
1201
1202        Disaggregation is done in time and original times are written to the
1203        flux files, while the additional subgrid times are written to
1204        extra files.
1205
1206        Parameters
1207        ----------
1208        ni : int
1209            Amount of zonal grid points.
1210
1211        nj : int
1212            Amount of meridional grid points.
1213
1214        nt : int
1215            Number of time steps.
1216
1217        lsp_np : numpy array of float
1218            The large scale precipitation fields for each time step.
1219            Shape (ni * nj, nt).
1220
1221        cp_np : numpy array of float
1222            The convective precipitation fields for each time step.
1223            Shape (ni * nj, nt).
1224
1225        date_list : list of datetime
1226            The list of dates for which the disaggregation is to be done.
1227
1228        step_list : list of int
1229            The list of steps for a single forecast time.
1230            Only necessary for pure forecasts.
1231
1232        c : ControlFile
1233            Contains all the parameters of CONTROL file and
1234            command line.
1235
1236        Return
1237        ------
1238
1239        '''
1240        print('... disaggregation or precipitation with new method.')
1241        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1242        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1243
1244        # do the disaggregation, but neglect the last value of the
1245        # time series. This one corresponds for example to 24 hour,
1246        # which we don't need.
1247        for ix in range(ni*nj):
1248            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1249            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1250
1251        # write to grib files (full/orig times to flux file and inbetween
1252        # times into seperate end files)
1253        print('... write disaggregated precipitation to files.')
1254        it = 0
1255        for date in date_list:
1256            for step in step_list:
1257                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1258
1259                if c.purefc:
1260                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1261                                   '.{:0>3}'.format(step)
1262                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1263                                '.{:0>3}'.format(step) + '_1'
1264                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1265                                '.{:0>3}'.format(step) + '_2'
1266                else:
1267                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1268                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1269                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1270
1271                # collect for final processing
1272                self.outputfilelist.append(os.path.basename(fluxfilename))
1273                self.outputfilelist.append(os.path.basename(filename1))
1274                self.outputfilelist.append(os.path.basename(filename2))
1275
1276                # write original time step to flux file as usual
1277                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1278                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1279                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1280                                  keynames=['date','time','stepRange','values'],
1281                                  keyvalues=[int(date.strftime('%Y%m%d')),
1282                                             date.hour*100, step, lsp_new_np[:,it]],
1283                                 )
1284                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1285                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1286                                  keynames=['date','time','stepRange','values'],
1287                                  keyvalues=[int(date.strftime('%Y%m%d')),
1288                                             date.hour*100, step, cp_new_np[:,it]]
1289                                 )
1290
1291                # write first subgrid time step
1292                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1293                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1294                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1295                                  keynames=['date','time','stepRange','values'],
1296                                  keyvalues=[int(date.strftime('%Y%m%d')),
1297                                             date.hour*100, step, lsp_new_np[:,it+1]]
1298                                  )
1299                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1300                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1301                                  keynames=['date','time','stepRange','values'],
1302                                  keyvalues=[int(date.strftime('%Y%m%d')),
1303                                             date.hour*100, step, cp_new_np[:,it+1]]
1304                                 )
1305
1306                # write second subgrid time step
1307                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1308                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1309                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1310                                  keynames=['date','time','stepRange','values'],
1311                                  keyvalues=[int(date.strftime('%Y%m%d')),
1312                                             date.hour*100, step, lsp_new_np[:,it+2]]
1313                                 )
1314                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1315                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1316                                  keynames=['date','time','stepRange','values'],
1317                                  keyvalues=[int(date.strftime('%Y%m%d')),
1318                                             date.hour*100, step, cp_new_np[:,it+2]]
1319                                 )
1320                it = it + 3 # jump to next original time step
1321        return
1322
1323    def _create_rr_grib_dummy(self, ifile, inputdir):
1324        '''Creates a grib file with a dummy message for the two precipitation
1325        types lsp and cp each.
1326
1327        Parameters
1328        ----------
1329        ifile : str
1330            Filename of the input file to read the grib messages from.
1331
1332        inputdir : str, optional
1333            Path to the directory where the retrieved data is stored.
1334
1335        Return
1336        ------
1337
1338        '''
1339
1340        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1341
1342        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1343                      keyvalues=[142], filemode='w')
1344
1345        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1346                      keyvalues=[143], filemode='a')
1347
1348        return
1349
1350    def create(self, inputfiles, c):
1351        '''An index file will be created which depends on the combination
1352        of "date", "time" and "stepRange" values. This is used to iterate
1353        over all messages in each grib file which were passed through the
1354        parameter "inputfiles" to seperate specific parameters into fort.*
1355        files. Afterwards the FORTRAN program is called to convert
1356        the data fields all to the same grid and put them in one file
1357        per unique time step (combination of "date", "time" and
1358        "stepRange").
1359
1360        Note
1361        ----
1362        This method is based on the ECMWF example index.py
1363        https://software.ecmwf.int/wiki/display/GRIB/index.py
1364
1365        Parameters
1366        ----------
1367        inputfiles : UioFiles
1368            Contains a list of files.
1369
1370        c : ControlFile
1371            Contains all the parameters of CONTROL file and
1372            command line.
1373
1374        Return
1375        ------
1376
1377        '''
1378
1379        if c.wrf:
1380            table128 = init128(_config.PATH_GRIBTABLE)
1381            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1382                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1383                                  table128)
1384
1385        # these numbers are indices for the temporary files "fort.xx"
1386        # which are used to seperate the grib fields to,
1387        # for the Fortran program input
1388        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1389        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1390        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1391                 '17':None, '18':None, '19':None, '21':None, '22':None}
1392
1393        iid = None
1394        index_vals = None
1395
1396        # get the values of the keys which are used for distinct access
1397        # of grib messages via product
1398        if '/' in self.number:
1399            # more than one ensemble member is selected
1400            index_keys = ["number", "date", "time", "step"]
1401        else:
1402            index_keys = ["date", "time", "step"]
1403        iid, index_vals = self._mk_index_values(c.inputdir,
1404                                                inputfiles,
1405                                                index_keys)
1406        # index_vals looks like e.g.:
1407        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1408        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1409        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1410
1411        # "product" genereates each possible combination between the
1412        # values of the index keys
1413        for prod in product(*index_vals):
1414            # e.g. prod = ('20170505', '0', '12')
1415            #             (  date    ,time, step)
1416
1417            print('current product: ', prod)
1418
1419            for i in range(len(index_keys)):
1420                codes_index_select(iid, index_keys[i], prod[i])
1421
1422            # get first id from current product
1423            gid = codes_new_from_index(iid)
1424
1425            # if there is no data for this specific time combination / product
1426            # skip the rest of the for loop and start with next timestep/product
1427            if not gid:
1428                continue
1429#============================================================================================
1430            # remove old fort.* files and open new ones
1431            # they are just valid for a single product
1432            for k, f in fdict.iteritems():
1433                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1434                silent_remove(fortfile)
1435                fdict[k] = open(fortfile, 'w')
1436#============================================================================================
1437            # create correct timestamp from the three time informations
1438            cdate = str(codes_get(gid, 'date'))
1439            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1440            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1441            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1442            timestamp += timedelta(hours=int(cstep))
1443            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1444
1445            # if the timestamp is out of basetime start/end date period,
1446            # skip this specific product
1447            if c.basetime is not None:
1448                time_delta = timedelta(hours=12-int(c.dtime))
1449                start_time = datetime.strptime(c.end_date + str(c.basetime),
1450                                                '%Y%m%d%H') - time_delta
1451                end_time = datetime.strptime(c.end_date + str(c.basetime),
1452                                             '%Y%m%d%H')
1453                if timestamp < start_time or timestamp > end_time:
1454                    continue
1455
1456            if c.wrf:
1457                if 'olddate' not in locals() or cdate != olddate:
1458                    fwrf = open(os.path.join(c.outputdir,
1459                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1460                    olddate = cdate[:]
1461#============================================================================================
1462            # savedfields remembers which fields were already used.
1463            savedfields = []
1464            # sum of cloud liquid and ice water content
1465            scwc = None
1466            while 1:
1467                if not gid:
1468                    break
1469                paramId = codes_get(gid, 'paramId')
1470                gridtype = codes_get(gid, 'gridType')
1471                levtype = codes_get(gid, 'typeOfLevel')
1472                if paramId == 77: # ETADOT
1473                    codes_write(gid, fdict['21'])
1474                elif paramId == 130: # T
1475                    codes_write(gid, fdict['11'])
1476                elif paramId == 131 or paramId == 132: # U, V wind component
1477                    codes_write(gid, fdict['10'])
1478                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1479                    codes_write(gid, fdict['17'])
1480                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1481                    codes_write(gid, fdict['18'])
1482                elif paramId == 135: # W
1483                    codes_write(gid, fdict['19'])
1484                elif paramId == 152: # LNSP
1485                    codes_write(gid, fdict['12'])
1486                elif paramId == 155 and gridtype == 'sh': # D
1487                    codes_write(gid, fdict['13'])
1488                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1489                    # sum cloud liquid water and ice
1490                    if scwc is None:
1491                        scwc = codes_get_values(gid)
1492                    else:
1493                        scwc += codes_get_values(gid)
1494                        codes_set_values(gid, scwc)
1495                        codes_set(gid, 'paramId', 201031)
1496                        codes_write(gid, fdict['22'])
1497                        scwc = None
1498                elif c.wrf and paramId in [129, 138, 155] and \
1499                      levtype == 'hybrid': # Z, VO, D
1500                    # do not do anything right now
1501                    # these are specific parameter for WRF
1502                    pass
1503                else:
1504                    if paramId not in savedfields:
1505                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1506                        # and all ADDPAR parameter
1507                        codes_write(gid, fdict['16'])
1508                        savedfields.append(paramId)
1509                    else:
1510                        print('duplicate ' + str(paramId) + ' not written')
1511
1512                try:
1513                    if c.wrf:
1514                        # model layer
1515                        if levtype == 'hybrid' and \
1516                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1517                            codes_write(gid, fwrf)
1518                        # sfc layer
1519                        elif paramId in wrfpars:
1520                            codes_write(gid, fwrf)
1521                except AttributeError:
1522                    pass
1523
1524                codes_release(gid)
1525                gid = codes_new_from_index(iid)
1526#============================================================================================
1527            for f in fdict.values():
1528                f.close()
1529#============================================================================================
1530            # call for Fortran program to convert e.g. reduced_gg grids to
1531            # regular_ll and calculate detadot/dp
1532            pwd = os.getcwd()
1533            os.chdir(c.inputdir)
1534            if os.stat('fort.21').st_size == 0 and c.eta:
1535                print('Parameter 77 (etadot) is missing, most likely it is \
1536                       not available for this type or date/time\n')
1537                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1538                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1539                         is set to 1 in CONTROL file')
1540#============================================================================================
1541            # write out all output to log file before starting fortran programm
1542            sys.stdout.flush()
1543
1544            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1545            execute_subprocess([os.path.join(c.exedir,
1546                                _config.FORTRAN_EXECUTABLE)],
1547                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1548
1549            os.chdir(pwd)
1550#============================================================================================
1551            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1552            if c.purefc:
1553                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1554            else:
1555                suffix = cdate_hour[2:10]
1556
1557            # if necessary, add ensemble member number to filename suffix
1558            if 'number' in index_keys:
1559                index_number = index_keys.index('number')
1560                if len(index_vals[index_number]) > 1:
1561                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1562
1563            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1564            print("outputfile = " + fnout)
1565            # collect for final processing
1566            self.outputfilelist.append(os.path.basename(fnout))
1567#============================================================================================
1568            # create outputfile and copy all data from intermediate files
1569            # to the outputfile (final GRIB input files for FLEXPART)
1570            orolsm = os.path.basename(glob.glob(c.inputdir +
1571                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1572            fluxfile = 'flux' + cdate[0:2] + suffix
1573            if not c.cwc:
1574                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1575            else:
1576                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1577
1578            with open(fnout, 'wb') as fout:
1579                for f in flist:
1580                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1581                                       fout)
1582
1583            if c.omega:
1584                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1585                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1586                                            'rb'), fout)
1587#============================================================================================
1588        if c.wrf:
1589            fwrf.close()
1590
1591        codes_index_release(iid)
1592
1593        return
1594
1595
1596    def calc_extra_elda(self, path, prefix):
1597        ''' Calculates extra ensemble members for ELDA - Stream.
1598
1599        Parameters
1600        ----------
1601        path : str
1602            Path to the output files.
1603
1604        prefix : str
1605            The prefix of the output filenames as defined in Control file.
1606
1607        Return
1608        ------
1609
1610        '''
1611        # max number
1612        maxnum = int(self.number.split('/')[-1])
1613
1614        # get a list of all prepared output files with control forecast (CF)
1615        CF_filelist = UioFiles(path, prefix + '*.N000')
1616
1617        for cffile in CF_filelist.files:
1618            with open(cffile, 'rb') as f:
1619                cfvalues=[]
1620                while True:
1621                    fid = codes_grib_new_from_file(f)
1622                    if fid is None:
1623                        break
1624                    cfvalues.append(codes_get_array(fid, 'values'))
1625                    codes_release(fid)
1626
1627            filename = cffile.split('N000')[0]
1628            for i in range(1, maxnum+1): # max number nehmen
1629
1630                # read an ensemble member
1631                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1632                # create file for newly calculated ensemble member
1633                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1634                # number of message in grib file
1635                j = 0
1636                while True:
1637                    gid = codes_grib_new_from_file(g)
1638                    if gid is None:
1639                        break
1640                    values = codes_get_array(gid, 'values')
1641                    codes_set_array(gid, 'values',
1642                                    values-2*(values-cfvalues[j]))
1643                    codes_set(gid, 'number', i+maxnum)
1644                    codes_write(gid, h)
1645                    codes_release(gid)
1646                    j += 1
1647
1648                g.close()
1649                h.close()
1650                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1651                self.outputfilelist.append(
1652                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1653
1654        return
1655
1656
1657    def process_output(self, c):
1658        '''Postprocessing of FLEXPART input files.
1659
1660        The grib files are postprocessed depending on the selection in
1661        CONTROL file. The resulting files are moved to the output
1662        directory if its not equal to the input directory.
1663        The following modifications might be done if
1664        properly switched in CONTROL file:
1665        GRIB2 - Conversion to GRIB2
1666        ECTRANS - Transfer of files to gateway server
1667        ECSTORAGE - Storage at ECMWF server
1668
1669        Parameters
1670        ----------
1671        c : ControlFile
1672            Contains all the parameters of CONTROL file and
1673            command line.
1674
1675        Return
1676        ------
1677
1678        '''
1679
1680        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1681
1682        if not c.ecapi:
1683            print('ecstorage: {}\n ecfsdir: {}\n'.
1684                  format(c.ecstorage, c.ecfsdir))
1685            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1686                  .format(c.ectrans, c.gateway, c.destination))
1687
1688        print('Output filelist: ')
1689        print(self.outputfilelist)
1690
1691        for ofile in self.outputfilelist:
1692            ofile = os.path.join(self.inputdir, ofile)
1693
1694            if c.format.lower() == 'grib2':
1695                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1696                                    'productDefinitionTemplateNumber=8',
1697                                    ofile, ofile + '_2'],
1698                                   error_msg='GRIB2 CONVERSION FAILED!')
1699
1700                execute_subprocess(['mv', ofile + '_2', ofile],
1701                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1702                                   'FILES FAILED!')
1703
1704            if c.ectrans and not c.ecapi:
1705                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1706                                    c.gateway, '-remote', c.destination,
1707                                    '-source', ofile],
1708                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1709
1710            if c.ecstorage and not c.ecapi:
1711                execute_subprocess(['ecp', '-o', ofile,
1712                                    os.path.expandvars(c.ecfsdir)],
1713                                   error_msg='COPY OF FILES TO ECSTORAGE '
1714                                   'AREA FAILED!')
1715
1716            if c.outputdir != c.inputdir:
1717                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1718                                    c.outputdir],
1719                                   error_msg='RELOCATION OF OUTPUT FILES '
1720                                   'TO OUTPUTDIR FAILED!')
1721
1722        return
1723
1724
1725    def prepare_fp_files(self, c):
1726        '''Conversion of GRIB files to FLEXPART binary format.
1727
1728        Parameters
1729        ----------
1730        c : ControlFile
1731            Contains all the parameters of CONTROL file and
1732            command line.
1733
1734        Return
1735        ------
1736
1737        '''
1738        # generate AVAILABLE file
1739        # Example of AVAILABLE file data:
1740        # 20131107 000000      EN13110700              ON DISC
1741        clist = []
1742        for ofile in self.outputfilelist:
1743            fname = ofile.split('/')
1744            if '.' in fname[-1]:
1745                l = fname[-1].split('.')
1746                timestamp = datetime.strptime(l[0][-6:] + l[1],
1747                                              '%y%m%d%H')
1748                timestamp += timedelta(hours=int(l[2]))
1749                cdate = datetime.strftime(timestamp, '%Y%m%d')
1750                chms = datetime.strftime(timestamp, '%H%M%S')
1751            else:
1752                cdate = '20' + fname[-1][-8:-2]
1753                chms = fname[-1][-2:] + '0000'
1754            clist.append(cdate + ' ' + chms + ' '*6 +
1755                         fname[-1] + ' '*14 + 'ON DISC')
1756        clist.sort()
1757        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1758            f.write('\n'.join(clist) + '\n')
1759
1760        # generate pathnames file
1761        pwd = os.path.abspath(c.outputdir)
1762        with open(pwd + '/pathnames', 'w') as f:
1763            f.write(pwd + '/Options/\n')
1764            f.write(pwd + '/\n')
1765            f.write(pwd + '/\n')
1766            f.write(pwd + '/AVAILABLE\n')
1767            f.write(' = == = == = == = == = == ==  = \n')
1768
1769        # create Options dir if necessary
1770        if not os.path.exists(pwd + '/Options'):
1771            make_dir(pwd+'/Options')
1772
1773        # read template COMMAND file
1774        with open(os.path.expandvars(os.path.expanduser(
1775            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1776            lflist = f.read().split('\n')
1777
1778        # find index of list where to put in the
1779        # date and time information
1780        # usually after the LDIRECT parameter
1781        i = 0
1782        for l in lflist:
1783            if 'LDIRECT' in l.upper():
1784                break
1785            i += 1
1786
1787        # insert the date and time information of run start and end
1788        # into the list of lines of COMMAND file
1789        lflist = lflist[:i+1] + \
1790                 [clist[0][:16], clist[-1][:16]] + \
1791                 lflist[i+3:]
1792
1793        # write the new COMMAND file
1794        with open(pwd + '/Options/COMMAND', 'w') as g:
1795            g.write('\n'.join(lflist) + '\n')
1796
1797        # change to outputdir and start the grib2flexpart run
1798        # afterwards switch back to the working dir
1799        os.chdir(c.outputdir)
1800        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1801         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1802        execute_subprocess(cmd)
1803        os.chdir(pwd)
1804
1805        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG