source: flex_extract.git/source/python/classes/EcFlexpart.py @ f20af73

dev
Last change on this file since f20af73 was f20af73, checked in by Anne Philipp <anne.philipp@…>, 6 months ago

added CDS API support for ERA5 instead of ECMWFAPI / refactored setup of CONTROL parameter because for local version it wanted to use not installed ECMWF_ENV file.

  • Property mode set to 100644
File size: 73.3 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
71
72# software specific classes and modules from flex_extract
73sys.path.append('../')
74import _config
75from .GribUtil import GribUtil
76from mods.tools import (init128, to_param_id, silent_remove, product,
77                        my_error, make_dir, get_informations, get_dimensions,
78                        execute_subprocess, to_param_id_with_tablenumber)
79from .MarsRetrieval import MarsRetrieval
80from .UioFiles import UioFiles
81import mods.disaggregation as disaggregation
82
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = range(len(ftime))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        self.types[str(self.acctype)] = {'times': str(self.acctime),
324                                         'steps': '{}/to/{}/by/{}'.format(
325                                             self.dtime,
326                                             self.accmaxstep,
327                                             self.dtime)}
328        return
329
330    def _create_params(self, gauss, eta, omega, cwc, wrf):
331        '''Define the specific parameter settings for retrievment.
332
333        The different parameters need specific grid types and level types
334        for retrievement. We might get following combination of types
335        (depending on selection and availability):
336        (These are short cuts for the grib file names (leading sequence)
337        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
338        where:
339            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
340            ML = Model Level, SL = Surface Level
341
342        For each of this combination there is a list of parameter names,
343        the level type, the level list and the grid resolution.
344
345        There are different scenarios for data extraction from MARS:
346        1) Retrieval of etadot
347           eta=1, gauss=0, omega=0
348        2) Calculation of etadot from divergence
349           eta=0, gauss=1, omega=0
350        3) Calculation of etadot from omega (for makes sense for debugging)
351           eta=0, gauss=0, omega=1
352        4) Retrieval and Calculation of etadot (only for debugging)
353           eta=1, gauss=1, omega=0
354        5) Download also specific model and surface level data for FLEXPART-WRF
355
356        Parameters:
357        -----------
358        gauss : int
359            Gaussian grid is retrieved.
360
361        eta : int
362            Etadot parameter will be directly retrieved.
363
364        omega : int
365            The omega paramterwill be retrieved.
366
367        cwc : int
368            The cloud liquid and ice water content will be retrieved.
369
370        wrf : int
371            Additional model level and surface level data will be retrieved for
372            WRF/FLEXPART-WRF simulations.
373
374        Return
375        ------
376
377        '''
378        # SURFACE FIELDS
379        #-----------------------------------------------------------------------
380        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
381        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
382                                 'SFC', '1', self.grid]
383        if self.addpar:
384            self.params['OG__SL'][0] += self.addpar
385
386        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
387            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
388                                            'SFC', '1', self.grid]
389        else:
390            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
391                                            'SFC', '1', self.grid]
392
393        # MODEL LEVEL FIELDS
394        #-----------------------------------------------------------------------
395        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
396
397        if not gauss and eta:
398            self.params['OG__ML'][0] += '/U/V/ETADOT'
399        elif gauss and not eta:
400            self.params['GG__SL'] = ['Q', 'ML', '1', \
401                                     '{}'.format((int(self.resol) + 1) / 2)]
402            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
403        elif not gauss and not eta:
404            self.params['OG__ML'][0] += '/U/V'
405        else:  # GAUSS and ETA
406            print('Warning: Collecting etadot and parameters for gaussian grid '
407                           'is a very costly parameter combination, '
408                           'use this combination only for debugging!')
409            self.params['GG__SL'] = ['Q', 'ML', '1',
410                                     '{}'.format((int(self.resol) + 1) / 2)]
411            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
412                                     '{}'.format((int(self.resol) + 1) / 2)]
413
414        if omega:
415            self.params['OG__ML'][0] += '/W'
416
417        if cwc:
418            self.params['OG__ML'][0] += '/CLWC/CIWC'
419
420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
421        # -----------------------------------------------------------------------
422        if wrf:
423            self.params['OG__ML'][0] += '/Z/VO'
424            if '/D' not in self.params['OG__ML'][0]:
425                self.params['OG__ML'][0] += '/D'
426
427            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
428                       'SWVL1','SWVL2','SWVL3','SWVL4']
429            for par in wrf_sfc:
430                if par not in self.params['OG__SL'][0]:
431                    self.params['OG__SL'][0] += '/' + par
432
433        return
434
435
436    def _create_params_fluxes(self):
437        '''Define the parameter setting for flux data.
438
439        Flux data are accumulated fields in time and are stored on the
440        surface level. The leading short cut name for the grib files is:
441        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
442        acc for Accumulated Grid.
443        The params dictionary stores a list of parameter names, the level type,
444        the level list and the grid resolution.
445
446        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
447
448        Parameters:
449        -----------
450
451        Return
452        ------
453
454        '''
455        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
456                                    'SFC', '1', self.grid]
457        return
458
459
460    def _mk_targetname(self, ftype, param, date):
461        '''Creates the filename for the requested grib data to be stored in.
462        This name is passed as the "target" parameter in the request.
463
464        Parameters
465        ----------
466        ftype : str
467            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
468
469        param : str
470            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
471            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
472
473        date : str
474            The date period of the grib data to be stored in this file.
475
476        Return
477        ------
478        targetname : str
479            The target filename for the grib data.
480        '''
481        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
482                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
483
484        return targetname
485
486
487    def _start_retrievement(self, request, par_dict):
488        '''Creates the Mars Retrieval and prints or submits the request
489        depending on the status of the request variable.
490
491        Parameters
492        ----------
493        request : int
494            Selects the mode of retrieval.
495            0: Retrieves the data from ECMWF.
496            1: Prints the mars requests to an output file.
497            2: Retrieves the data and prints the mars request.
498
499        par_dict : dictionary
500            Contains all parameter which have to be set for creating the
501            Mars Retrievals. The parameter are:
502            marsclass, dataset, stream, type, levtype, levelist, resol,
503            gaussian, accuracy, grid, target, area, date, time, number,
504            step, expver, param
505
506        Return
507        ------
508
509        '''
510        # increase number of mars requests
511        self.mreq_count += 1
512
513        MR = MarsRetrieval(self.server,
514                           self.public,
515                           marsclass=par_dict['marsclass'],
516                           dataset=par_dict['dataset'],
517                           stream=par_dict['stream'],
518                           type=par_dict['type'],
519                           levtype=par_dict['levtype'],
520                           levelist=par_dict['levelist'],
521                           resol=par_dict['resol'],
522                           gaussian=par_dict['gaussian'],
523                           accuracy=par_dict['accuracy'],
524                           grid=par_dict['grid'],
525                           target=par_dict['target'],
526                           area=par_dict['area'],
527                           date=par_dict['date'],
528                           time=par_dict['time'],
529                           number=par_dict['number'],
530                           step=par_dict['step'],
531                           expver=par_dict['expver'],
532                           param=par_dict['param'])
533
534        if request == 0:
535            MR.display_info()
536            MR.data_retrieve()
537        elif request == 1:
538            MR.print_infodata_csv(self.inputdir, self.mreq_count)
539        elif request == 2:
540            MR.print_infodata_csv(self.inputdir, self.mreq_count)
541            MR.display_info()
542            MR.data_retrieve()
543        else:
544            print('Failure')
545
546        return
547
548
549    def _mk_index_values(self, inputdir, inputfiles, keys):
550        '''Creates an index file for a set of grib parameter keys.
551        The values from the index keys are returned in a list.
552
553        Parameters
554        ----------
555        keys : dictionary
556            List of parameter names which serves as index.
557
558        inputfiles : UioFiles
559            Contains a list of files.
560
561        Return
562        ------
563        iid : codes_index
564            This is a grib specific index structure to access
565            messages in a file.
566
567        index_vals : list of list  of str
568            Contains the values from the keys used for a distinct selection
569            of grib messages in processing  the grib files.
570            Content looks like e.g.:
571            index_vals[0]: ('20171106', '20171107', '20171108') ; date
572            index_vals[1]: ('0', '1200', '1800', '600') ; time
573            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
574        '''
575        iid = None
576        index_keys = keys
577
578        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
579        silent_remove(indexfile)
580        grib = GribUtil(inputfiles.files)
581        # creates new index file
582        iid = grib.index(index_keys=index_keys, index_file=indexfile)
583
584        # read the values of index keys
585        index_vals = []
586        for key in index_keys:
587            key_vals = codes_index_get(iid, key)
588            # have to sort the key values for correct disaggregation,
589            # therefore convert to int first
590            key_vals = [int(k) for k in key_vals]
591            key_vals.sort()
592            key_vals = [str(k) for k in key_vals]
593            index_vals.append(key_vals)
594            # index_vals looks for example like:
595            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
596            # index_vals[1]: ('0', '1200') ; time
597            # index_vals[2]: (3', '6', '9', '12') ; stepRange
598
599        return iid, index_vals
600
601
602    def retrieve(self, server, dates, public, request, inputdir='.'):
603        '''Finalizing the retrieval information by setting final details
604        depending on grid type.
605        Prepares MARS retrievals per grid type and submits them.
606
607        Parameters
608        ----------
609        server : ECMWFService or ECMWFDataServer
610            The connection to the ECMWF server. This is different
611            for member state users which have full access and non
612            member state users which have only access to the public
613            data sets. The decision is made from command line argument
614            "public"; for public access its True (ECMWFDataServer)
615            for member state users its False (ECMWFService)
616
617        dates : str
618            Contains start and end date of the retrieval in the format
619            "YYYYMMDD/to/YYYYMMDD"
620
621        request : int
622            Selects the mode of retrieval.
623            0: Retrieves the data from ECMWF.
624            1: Prints the mars requests to an output file.
625            2: Retrieves the data and prints the mars request.
626
627        inputdir : str, optional
628            Path to the directory where the retrieved data is about
629            to be stored. The default is the current directory ('.').
630
631        Return
632        ------
633
634        '''
635        self.dates = dates
636        self.server = server
637        self.public = public
638        self.inputdir = inputdir
639        oro = False
640
641        # define times with datetime module
642        t12h = timedelta(hours=12)
643        t24h = timedelta(hours=24)
644
645        # dictionary which contains all parameter for the mars request,
646        # entries with a "None" will change in different requests and will
647        # therefore be set in each request seperately
648        retr_param_dict = {'marsclass':self.marsclass,
649                           'dataset':self.dataset,
650                           'stream':None,
651                           'type':None,
652                           'levtype':None,
653                           'levelist':None,
654                           'resol':self.resol,
655                           'gaussian':None,
656                           'accuracy':self.accuracy,
657                           'grid':None,
658                           'target':None,
659                           'area':None,
660                           'date':None,
661                           'time':None,
662                           'number':self.number,
663                           'step':None,
664                           'expver':self.expver,
665                           'param':None}
666
667        for ftype in self.types:
668            # ftype contains field types such as
669            #     [AN, FC, PF, CV]
670            for pk, pv in self.params.iteritems():
671                # pk contains one of these keys of params
672                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
673                #      OG_OROLSM_SL, OG_acc_SL]
674                # pv contains all of the items of the belonging key
675                #     [param, levtype, levelist, grid]
676                if isinstance(pv, str):
677                    continue
678                retr_param_dict['type'] = ftype
679                retr_param_dict['time'] = self.types[ftype]['times']
680                retr_param_dict['step'] = self.types[ftype]['steps']
681                retr_param_dict['date'] = self.dates
682                retr_param_dict['stream'] = self.stream
683                retr_param_dict['target'] = \
684                    self._mk_targetname(ftype,
685                                        pk,
686                                        retr_param_dict['date'].split('/')[0])
687                table128 = init128(_config.PATH_GRIBTABLE)
688                ids = to_param_id_with_tablenumber(pv[0], table128)
689                retr_param_dict['param'] = ids
690                retr_param_dict['levtype'] = pv[1]
691                retr_param_dict['levelist'] = pv[2]
692                retr_param_dict['grid'] = pv[3]
693                retr_param_dict['area'] = self.area
694                retr_param_dict['gaussian'] = self.gaussian
695
696                if pk == 'OG_OROLSM__SL' and not oro:
697                    oro = True
698                    # in CERA20C (class EP) there is no stream "OPER"!
699                    if self.marsclass.upper() != 'EP':
700                        retr_param_dict['stream'] = 'OPER'
701                    retr_param_dict['type'] = 'AN'
702                    retr_param_dict['time'] = '00'
703                    retr_param_dict['step'] = '000'
704                    retr_param_dict['date'] = self.dates.split('/')[0]
705                    retr_param_dict['target'] = self._mk_targetname('',
706                                            pk, retr_param_dict['date'])
707                elif pk == 'OG_OROLSM__SL' and oro:
708                    continue
709                if pk == 'GG__SL' and pv[0] == 'Q':
710                    retr_param_dict['area'] = ""
711                    retr_param_dict['gaussian'] = 'reduced'
712                if ftype.upper() == 'FC' and \
713                    'acc' not in retr_param_dict['target']:
714                    if (int(retr_param_dict['time'][0]) +
715                        int(retr_param_dict['step'][0])) > 23:
716                        dates = retr_param_dict['date'].split('/')
717                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
718                        sdate = sdate - timedelta(days=1)
719                        retr_param_dict['date'] = '/'.join(
720                            [sdate.strftime("%Y%m%d")] +
721                            retr_param_dict['date'][1:])
722
723                        print('CHANGED FC start date to ' +
724                              sdate.strftime("%Y%m%d") +
725                              ' to accomodate TIME=' +
726                              retr_param_dict['time'][0] +
727                              ', STEP=' +
728                              retr_param_dict['time'][0])
729
730    # ------  on demand path  --------------------------------------------------
731                if self.basetime is None:
732                    # ******* start retrievement
733                    self._start_retrievement(request, retr_param_dict)
734    # ------  operational path  ------------------------------------------------
735                else:
736                    # check if mars job requests fields beyond basetime.
737                    # if yes eliminate those fields since they may not
738                    # be accessible with user's credentials
739
740                    enddate = retr_param_dict['date'].split('/')[-1]
741                    elimit = datetime.strptime(enddate + str(self.basetime),
742                                               '%Y%m%d%H')
743
744                    if self.basetime == 12:
745                        # --------------  flux data ----------------------------
746                        if 'acc' in pk:
747                            startdate = retr_param_dict['date'].split('/')[0]
748                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
749                            retr_param_dict['date'] = '/'.join([startdate,
750                                                                'to',
751                                                                enddate])
752
753                            # ******* start retrievement
754                            self._start_retrievement(request, retr_param_dict)
755
756                            retr_param_dict['date'] = \
757                                datetime.strftime(elimit - t12h, '%Y%m%d')
758                            retr_param_dict['time'] = '00'
759                            retr_param_dict['target'] = \
760                                self._mk_targetname(ftype, pk,
761                                                    retr_param_dict['date'])
762
763                            # ******* start retrievement
764                            self._start_retrievement(request, retr_param_dict)
765
766                        # --------------  non flux data ------------------------
767                        else:
768                            # ******* start retrievement
769                            self._start_retrievement(request, retr_param_dict)
770
771                    elif self.basetime == 0:
772                        retr_param_dict['date'] = \
773                            datetime.strftime(elimit - t24h, '%Y%m%d')
774
775                        timesave = ''.join(retr_param_dict['time'])
776
777                        if ('/' in retr_param_dict['time'] and
778                            pk != 'OG_OROLSM__SL' and
779                            'acc' not in pk ) :
780                            times = retr_param_dict['time'].split('/')
781                            steps = retr_param_dict['step'].split('/')
782
783                            while int(times[0]) + int(steps[0]) <= 12:
784                                times = times[1:]
785                                if len(times) > 1:
786                                    retr_param_dict['time'] = '/'.join(times)
787                                else:
788                                    retr_param_dict['time'] = times[0]
789
790                        if (pk != 'OG_OROLSM__SL' and
791                            int(retr_param_dict['step'].split('/')[0]) == 0 and
792                            int(timesave.split('/')[0]) == 0):
793
794                            retr_param_dict['date'] = \
795                                datetime.strftime(elimit, '%Y%m%d')
796                            retr_param_dict['time'] = '00'
797                            retr_param_dict['step'] = '000'
798                            retr_param_dict['target'] = \
799                                self._mk_targetname(ftype, pk,
800                                                    retr_param_dict['date'])
801
802                        # ******* start retrievement
803                        self._start_retrievement(request, retr_param_dict)
804                    else:
805                        raise ValueError('ERROR: Basetime has an invalid value '
806                                                 '-> {}'.format(str(basetime)))
807
808        if request == 0 or request == 2:
809            print('MARS retrieve done ... ')
810        elif request == 1:
811            print('MARS request printed ...')
812
813        return
814
815
816    def write_namelist(self, c):
817        '''Creates a namelist file in the temporary directory and writes
818        the following values to it: maxl, maxb, mlevel,
819        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
820        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
821
822        Parameters
823        ----------
824        c : ControlFile
825            Contains all the parameters of CONTROL file and
826            command line.
827
828        filename : str
829                Name of the namelist file.
830
831        Return
832        ------
833
834        '''
835
836        from genshi.template.text import NewTextTemplate
837        from genshi.template import  TemplateLoader
838        from genshi.template.eval import  UndefinedError
839
840        try:
841            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
842            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
843                                            cls=NewTextTemplate)
844
845            self.inputdir = c.inputdir
846            area = np.asarray(self.area.split('/')).astype(float)
847            grid = np.asarray(self.grid.split('/')).astype(float)
848
849            if area[1] > area[3]:
850                area[1] -= 360
851            maxl = int((area[3] - area[1]) / grid[1]) + 1
852            maxb = int((area[0] - area[2]) / grid[0]) + 1
853
854            stream = namelist_template.generate(
855                maxl = str(maxl),
856                maxb = str(maxb),
857                mlevel = str(self.level),
858                mlevelist = str(self.levelist),
859                mnauf = str(self.resol),
860                metapar = '77',
861                rlo0 = str(area[1]),
862                rlo1 = str(area[3]),
863                rla0 = str(area[2]),
864                rla1 = str(area[0]),
865                momega = str(c.omega),
866                momegadiff = str(c.omegadiff),
867                mgauss = str(c.gauss),
868                msmooth = str(c.smooth),
869                meta = str(c.eta),
870                metadiff = str(c.etadiff),
871                mdpdeta = str(c.dpdeta)
872            )
873        except UndefinedError as e:
874            print('... ERROR ' + str(e))
875
876            sys.exit('\n... error occured while trying to generate namelist ' +
877                     _config.TEMPFILE_NAMELIST)
878        except OSError as e:
879            print('... ERROR CODE: ' + str(e.errno))
880            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
881
882            sys.exit('\n... error occured while trying to generate template ' +
883                     _config.TEMPFILE_NAMELIST)
884
885        try:
886            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
887
888            with open(namelistfile, 'w') as f:
889                f.write(stream.render('text'))
890        except OSError as e:
891            print('... ERROR CODE: ' + str(e.errno))
892            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
893
894            sys.exit('\n... error occured while trying to write ' +
895                     namelistfile)
896
897        return
898
899
900    def deacc_fluxes(self, inputfiles, c):
901        '''De-accumulate and disaggregate flux data.
902
903        Goes through all flux fields in ordered time and de-accumulate
904        the fields. Afterwards the fields are disaggregated in time.
905        Different versions of disaggregation is provided for rainfall
906        data (darain, modified linear) and the surface fluxes and
907        stress data (dapoly, cubic polynomial).
908
909        Parameters
910        ----------
911        inputfiles : UioFiles
912            Contains the list of files that contain flux data.
913
914        c : ControlFile
915            Contains all the parameters of CONTROL file and
916            command line.
917
918        Return
919        ------
920
921        '''
922
923        table128 = init128(_config.PATH_GRIBTABLE)
924        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
925
926        iid = None
927        index_vals = None
928
929        # get the values of the keys which are used for distinct access
930        # of grib messages via product
931        if '/' in self.number:
932            # more than one ensemble member is selected
933            index_keys = ["number", "date", "time", "step"]
934        else:
935            index_keys = ["date", "time", "step"]
936        iid, index_vals = self._mk_index_values(c.inputdir,
937                                                inputfiles,
938                                                index_keys)
939        # index_vals looks like e.g.:
940        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
941        # index_vals[1]: ('0', '600', '1200', '1800') ; time
942        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
943
944        if c.rrint:
945            if not c.purefc:
946                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
947                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
948            else:
949                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
950                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
951                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
952                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
953                end_date = end_date + timedelta(hours=c.maxstep)
954
955            info = get_informations(os.path.join(c.inputdir,
956                                                 inputfiles.files[0]))
957            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
958                                  start_date, end_date)
959            # create numpy array
960            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
961            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
962            it_lsp = 0
963            it_cp = 0
964            date_list = []
965            step_list = []
966
967        # initialize dictionaries to store values
968        orig_vals = {}
969        deac_vals = {}
970        for p in pars:
971            orig_vals[p] = []
972            deac_vals[p] = []
973
974        # "product" genereates each possible combination between the
975        # values of the index keys
976        for prod in product(*index_vals):
977            # e.g. prod = ('20170505', '0', '12')
978            #             (  date    ,time, step)
979
980            print('CURRENT PRODUCT: ', prod)
981
982            for i in range(len(index_keys)):
983                codes_index_select(iid, index_keys[i], prod[i])
984
985            # get first id from current product
986            gid = codes_new_from_index(iid)
987
988            # if there is no data for this specific time combination / product
989            # skip the rest of the for loop and start with next timestep/product
990            if not gid:
991                continue
992
993            # create correct timestamp from the three time informations
994            cdate = str(codes_get(gid, 'date'))
995            time = codes_get(gid, 'time')/100 # integer
996            step = codes_get(gid, 'step') # integer
997            ctime = '{:0>2}'.format(time)
998            cstep = '{:0>3}'.format(step)
999
1000            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1001            t_dt = t_date + timedelta(hours=step)
1002            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1003            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1004            if c.basetime is not None:
1005                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1006                                              '%Y%m%d%H')
1007            else:
1008                t_enddate = t_date + timedelta(2*int(c.dtime))
1009
1010            # if necessary, add ensemble member number to filename suffix
1011            # otherwise, add empty string
1012            if 'number' in index_keys:
1013                index_number = index_keys.index('number')
1014                if len(index_vals[index_number]) > 1:
1015                    numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1016            else:
1017                numbersuffix = ''
1018
1019            if c.purefc:
1020                fnout = os.path.join(c.inputdir, 'flux' +
1021                                     t_date.strftime('%Y%m%d.%H') +
1022                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1023                                     numbersuffix)
1024                gnout = os.path.join(c.inputdir, 'flux' +
1025                                     t_date.strftime('%Y%m%d.%H') +
1026                                     '.{:0>3}'.format(step-int(c.dtime)) +
1027                                     numbersuffix)
1028                hnout = os.path.join(c.inputdir, 'flux' +
1029                                     t_date.strftime('%Y%m%d.%H') +
1030                                     '.{:0>3}'.format(step) +
1031                                     numbersuffix)
1032            else:
1033                fnout = os.path.join(c.inputdir, 'flux' +
1034                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1035                gnout = os.path.join(c.inputdir, 'flux' +
1036                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1037                hnout = os.path.join(c.inputdir, 'flux' +
1038                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1039
1040            print("outputfile = " + fnout)
1041            f_handle = open(fnout, 'w')
1042            h_handle = open(hnout, 'w')
1043            g_handle = open(gnout, 'w')
1044
1045            # read message for message and store relevant data fields, where
1046            # data keywords are stored in pars
1047            while True:
1048                if not gid:
1049                    break
1050                parId = codes_get(gid, 'paramId') # integer
1051                step = codes_get(gid, 'step') # integer
1052                time = codes_get(gid, 'time') # integer
1053                ni = codes_get(gid, 'Ni') # integer
1054                nj = codes_get(gid, 'Nj') # integer
1055                if parId not in orig_vals.keys():
1056                    # parameter is not a flux, find next one
1057                    continue
1058
1059                # define conversion factor
1060                if parId == 142 or parId == 143:
1061                    fak = 1. / 1000.
1062                else:
1063                    fak = 3600.
1064
1065                # get parameter values and reshape
1066                values = codes_get_values(gid)
1067                values = (np.reshape(values, (nj, ni))).flatten() / fak
1068
1069                # save the original and accumulated values
1070                orig_vals[parId].append(values[:])
1071
1072                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1073                    # no de-accumulation needed
1074                    deac_vals[parId].append(values[:] / int(c.dtime))
1075                else:
1076                    # do de-accumulation
1077                    deac_vals[parId].append(
1078                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1079                         int(c.dtime))
1080
1081                # store precipitation if new disaggregation method is selected
1082                # only the exact days are needed
1083                if c.rrint:
1084                    if start_date <= t_dt <= end_date:
1085                        if not c.purefc:
1086                            if t_dt not in date_list:
1087                                date_list.append(t_dt)
1088                                step_list = [0]
1089                        else:
1090                            if t_date not in date_list:
1091                                date_list.append(t_date)
1092                            if step not in step_list:
1093                                step_list.append(step)
1094                        if c.rrint and parId == 142:
1095                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1096                            it_lsp += 1
1097                        elif c.rrint and parId == 143:
1098                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1099                            it_cp += 1
1100
1101                # information printout
1102                print(parId, time, step, len(values), values[0], np.std(values))
1103
1104                # length of deac_vals[parId] corresponds to the
1105                # number of time steps, max. 4 are needed for disaggegration
1106                # with the old and original method
1107                # run over all grib messages and perform
1108                # shifting in time
1109                if len(deac_vals[parId]) >= 3:
1110                    if len(deac_vals[parId]) > 3:
1111                        if not c.rrint and (parId == 142 or parId == 143):
1112                            values = disaggregation.darain(deac_vals[parId])
1113                        else:
1114                            values = disaggregation.dapoly(deac_vals[parId])
1115
1116                        if not (step == c.maxstep and c.purefc \
1117                                or t_dt == t_enddate):
1118                            # remove first time step in list to shift
1119                            # time line
1120                            orig_vals[parId].pop(0)
1121                            deac_vals[parId].pop(0)
1122                    else:
1123                        # if the third time step is read (per parId),
1124                        # write out the first one as a boundary value
1125                        if c.purefc:
1126                            values = deac_vals[parId][1]
1127                        else:
1128                            values = deac_vals[parId][0]
1129
1130                    if not (c.rrint and (parId == 142 or parId == 143)):
1131                        codes_set_values(gid, values)
1132
1133                        if c.purefc:
1134                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1135                        else:
1136                            codes_set(gid, 'stepRange', 0)
1137                            codes_set(gid, 'time', t_m2dt.hour*100)
1138                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1139
1140                        codes_write(gid, f_handle)
1141
1142                        # squeeze out information of last two steps
1143                        # contained in deac_vals[parId]
1144                        # Note that deac_vals[parId][0] has not been popped
1145                        # in this case
1146
1147                        if step == c.maxstep and c.purefc or \
1148                           t_dt == t_enddate:
1149                            # last step
1150                            if c.purefc:
1151                                values = deac_vals[parId][3]
1152                                codes_set_values(gid, values)
1153                                codes_set(gid, 'stepRange', step)
1154                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1155                                codes_write(gid, h_handle)
1156                            else:
1157                                values = deac_vals[parId][3]
1158                                codes_set_values(gid, values)
1159                                codes_set(gid, 'stepRange', 0)
1160                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1161                                codes_set(gid, 'time', truedatetime.hour * 100)
1162                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1163                                codes_write(gid, h_handle)
1164
1165                            if parId == 142 or parId == 143:
1166                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1167                            else:
1168                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1169
1170                            # step before last step
1171                            if c.purefc:
1172                                codes_set(gid, 'stepRange', step-int(c.dtime))
1173                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1174                                codes_set_values(gid, values)
1175                                codes_write(gid, g_handle)
1176                            else:
1177                                codes_set(gid, 'stepRange', 0)
1178                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1179                                codes_set(gid, 'time', truedatetime.hour * 100)
1180                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1181                                codes_set_values(gid, values)
1182                                codes_write(gid, g_handle)
1183
1184                codes_release(gid)
1185
1186                gid = codes_new_from_index(iid)
1187
1188            f_handle.close()
1189            g_handle.close()
1190            h_handle.close()
1191
1192        codes_index_release(iid)
1193
1194        if c.rrint:
1195            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1196
1197            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1198                                 cp_np, date_list, step_list, c)
1199
1200        return
1201
1202    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1203        '''Calculates and writes out the disaggregated precipitation fields.
1204
1205        Disaggregation is done in time and original times are written to the
1206        flux files, while the additional subgrid times are written to
1207        extra files.
1208
1209        Parameters
1210        ----------
1211        ni : int
1212            Amount of zonal grid points.
1213
1214        nj : int
1215            Amount of meridional grid points.
1216
1217        nt : int
1218            Number of time steps.
1219
1220        lsp_np : numpy array of float
1221            The large scale precipitation fields for each time step.
1222            Shape (ni * nj, nt).
1223
1224        cp_np : numpy array of float
1225            The convective precipitation fields for each time step.
1226            Shape (ni * nj, nt).
1227
1228        date_list : list of datetime
1229            The list of dates for which the disaggregation is to be done.
1230
1231        step_list : list of int
1232            The list of steps for a single forecast time.
1233            Only necessary for pure forecasts.
1234
1235        c : ControlFile
1236            Contains all the parameters of CONTROL file and
1237            command line.
1238
1239        Return
1240        ------
1241
1242        '''
1243        print('... disaggregation of precipitation with new method.')
1244        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1245        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1246
1247        # do the disaggregation, but neglect the last value of the
1248        # time series. This one corresponds for example to 24 hour,
1249        # which we don't need.
1250        for ix in range(ni*nj):
1251            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1252            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1253
1254        # write to grib files (full/orig times to flux file and inbetween
1255        # times into seperate end files)
1256        print('... write disaggregated precipitation to files.')
1257        it = 0
1258        for date in date_list:
1259            for step in step_list:
1260                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1261
1262                if c.purefc:
1263                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1264                                   '.{:0>3}'.format(step)
1265                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1266                                '.{:0>3}'.format(step) + '_1'
1267                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1268                                '.{:0>3}'.format(step) + '_2'
1269                else:
1270                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1271                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1272                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1273
1274                # write original time step to flux file as usual
1275                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1276                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1277                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1278                                  keynames=['date','time','stepRange','values'],
1279                                  keyvalues=[int(date.strftime('%Y%m%d')),
1280                                             date.hour*100, step, lsp_new_np[:,it]],
1281                                 )
1282                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1283                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1284                                  keynames=['date','time','stepRange','values'],
1285                                  keyvalues=[int(date.strftime('%Y%m%d')),
1286                                             date.hour*100, step, cp_new_np[:,it]]
1287                                 )
1288
1289                # write first subgrid time step
1290                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1291                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1292                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1293                                  keynames=['date','time','stepRange','values'],
1294                                  keyvalues=[int(date.strftime('%Y%m%d')),
1295                                             date.hour*100, step, lsp_new_np[:,it+1]]
1296                                  )
1297                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1298                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1299                                  keynames=['date','time','stepRange','values'],
1300                                  keyvalues=[int(date.strftime('%Y%m%d')),
1301                                             date.hour*100, step, cp_new_np[:,it+1]]
1302                                 )
1303
1304                # write second subgrid time step
1305                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1306                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1307                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1308                                  keynames=['date','time','stepRange','values'],
1309                                  keyvalues=[int(date.strftime('%Y%m%d')),
1310                                             date.hour*100, step, lsp_new_np[:,it+2]]
1311                                 )
1312                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1313                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1314                                  keynames=['date','time','stepRange','values'],
1315                                  keyvalues=[int(date.strftime('%Y%m%d')),
1316                                             date.hour*100, step, cp_new_np[:,it+2]]
1317                                 )
1318                it = it + 3 # jump to next original time step
1319        return
1320
1321    def _create_rr_grib_dummy(self, ifile, inputdir):
1322        '''Creates a grib file with a dummy message for the two precipitation
1323        types lsp and cp each.
1324
1325        Parameters
1326        ----------
1327        ifile : str
1328            Filename of the input file to read the grib messages from.
1329
1330        inputdir : str, optional
1331            Path to the directory where the retrieved data is stored.
1332
1333        Return
1334        ------
1335
1336        '''
1337
1338        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1339
1340        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1341                      keyvalues=[142], filemode='w')
1342
1343        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1344                      keyvalues=[143], filemode='a')
1345
1346        return
1347
1348    def create(self, inputfiles, c):
1349        '''An index file will be created which depends on the combination
1350        of "date", "time" and "stepRange" values. This is used to iterate
1351        over all messages in each grib file which were passed through the
1352        parameter "inputfiles" to seperate specific parameters into fort.*
1353        files. Afterwards the FORTRAN program is called to convert
1354        the data fields all to the same grid and put them in one file
1355        per unique time step (combination of "date", "time" and
1356        "stepRange").
1357
1358        Note
1359        ----
1360        This method is based on the ECMWF example index.py
1361        https://software.ecmwf.int/wiki/display/GRIB/index.py
1362
1363        Parameters
1364        ----------
1365        inputfiles : UioFiles
1366            Contains a list of files.
1367
1368        c : ControlFile
1369            Contains all the parameters of CONTROL file and
1370            command line.
1371
1372        Return
1373        ------
1374
1375        '''
1376
1377        # generate start and end timestamp of the retrieval period
1378        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1379        start_period = start_period + timedelta(hours=int(c.step[0]))
1380        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1381        end_period = end_period + timedelta(hours=int(c.step[-1]))
1382
1383        if c.wrf:
1384            table128 = init128(_config.PATH_GRIBTABLE)
1385            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1386                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1387                                  table128)
1388
1389        # these numbers are indices for the temporary files "fort.xx"
1390        # which are used to seperate the grib fields to,
1391        # for the Fortran program input
1392        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1393        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1394        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1395                 '17':None, '18':None, '19':None, '21':None, '22':None}
1396
1397        iid = None
1398        index_vals = None
1399
1400        # get the values of the keys which are used for distinct access
1401        # of grib messages via product
1402        if '/' in self.number:
1403            # more than one ensemble member is selected
1404            index_keys = ["number", "date", "time", "step"]
1405        else:
1406            index_keys = ["date", "time", "step"]
1407        iid, index_vals = self._mk_index_values(c.inputdir,
1408                                                inputfiles,
1409                                                index_keys)
1410        # index_vals looks like e.g.:
1411        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1412        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1413        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1414
1415        # "product" genereates each possible combination between the
1416        # values of the index keys
1417        for prod in product(*index_vals):
1418            # e.g. prod = ('20170505', '0', '12')
1419            #             (  date    ,time, step)
1420
1421            print('current product: ', prod)
1422
1423            for i in range(len(index_keys)):
1424                codes_index_select(iid, index_keys[i], prod[i])
1425
1426            # get first id from current product
1427            gid = codes_new_from_index(iid)
1428
1429            # if there is no data for this specific time combination / product
1430            # skip the rest of the for loop and start with next timestep/product
1431            if not gid:
1432                continue
1433#============================================================================================
1434            # remove old fort.* files and open new ones
1435            # they are just valid for a single product
1436            for k, f in fdict.iteritems():
1437                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1438                silent_remove(fortfile)
1439                fdict[k] = open(fortfile, 'w')
1440#============================================================================================
1441            # create correct timestamp from the three time informations
1442            cdate = str(codes_get(gid, 'date'))
1443            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1444            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1445            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1446            timestamp += timedelta(hours=int(cstep))
1447            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1448
1449            # skip all temporary times
1450            # which are outside the retrieval period
1451            if timestamp < start_period or \
1452               timestamp > end_period:
1453                continue
1454
1455            # if the timestamp is out of basetime start/end date period,
1456            # skip this specific product
1457            if c.basetime is not None:
1458                time_delta = timedelta(hours=12-int(c.dtime))
1459                start_time = datetime.strptime(c.end_date + str(c.basetime),
1460                                                '%Y%m%d%H') - time_delta
1461                end_time = datetime.strptime(c.end_date + str(c.basetime),
1462                                             '%Y%m%d%H')
1463                if timestamp < start_time or timestamp > end_time:
1464                    continue
1465
1466            if c.wrf:
1467                if 'olddate' not in locals() or cdate != olddate:
1468                    fwrf = open(os.path.join(c.outputdir,
1469                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1470                    olddate = cdate[:]
1471#============================================================================================
1472            # savedfields remembers which fields were already used.
1473            savedfields = []
1474            # sum of cloud liquid and ice water content
1475            scwc = None
1476            while 1:
1477                if not gid:
1478                    break
1479                paramId = codes_get(gid, 'paramId')
1480                gridtype = codes_get(gid, 'gridType')
1481                levtype = codes_get(gid, 'typeOfLevel')
1482                if paramId == 77: # ETADOT
1483                    codes_write(gid, fdict['21'])
1484                elif paramId == 130: # T
1485                    codes_write(gid, fdict['11'])
1486                elif paramId == 131 or paramId == 132: # U, V wind component
1487                    codes_write(gid, fdict['10'])
1488                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1489                    codes_write(gid, fdict['17'])
1490                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1491                    codes_write(gid, fdict['18'])
1492                elif paramId == 135: # W
1493                    codes_write(gid, fdict['19'])
1494                elif paramId == 152: # LNSP
1495                    codes_write(gid, fdict['12'])
1496                elif paramId == 155 and gridtype == 'sh': # D
1497                    codes_write(gid, fdict['13'])
1498                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1499                    # sum cloud liquid water and ice
1500                    if scwc is None:
1501                        scwc = codes_get_values(gid)
1502                    else:
1503                        scwc += codes_get_values(gid)
1504                        codes_set_values(gid, scwc)
1505                        codes_set(gid, 'paramId', 201031)
1506                        codes_write(gid, fdict['22'])
1507                        scwc = None
1508                elif c.wrf and paramId in [129, 138, 155] and \
1509                      levtype == 'hybrid': # Z, VO, D
1510                    # do not do anything right now
1511                    # these are specific parameter for WRF
1512                    pass
1513                else:
1514                    if paramId not in savedfields:
1515                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1516                        # and all ADDPAR parameter
1517                        codes_write(gid, fdict['16'])
1518                        savedfields.append(paramId)
1519                    else:
1520                        print('duplicate ' + str(paramId) + ' not written')
1521
1522                try:
1523                    if c.wrf:
1524                        # model layer
1525                        if levtype == 'hybrid' and \
1526                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1527                            codes_write(gid, fwrf)
1528                        # sfc layer
1529                        elif paramId in wrfpars:
1530                            codes_write(gid, fwrf)
1531                except AttributeError:
1532                    pass
1533
1534                codes_release(gid)
1535                gid = codes_new_from_index(iid)
1536#============================================================================================
1537            for f in fdict.values():
1538                f.close()
1539#============================================================================================
1540            # call for Fortran program to convert e.g. reduced_gg grids to
1541            # regular_ll and calculate detadot/dp
1542            pwd = os.getcwd()
1543            os.chdir(c.inputdir)
1544            if os.stat('fort.21').st_size == 0 and c.eta:
1545                print('Parameter 77 (etadot) is missing, most likely it is '
1546                      'not available for this type or date / time\n')
1547                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1548                my_error('fort.21 is empty while parameter eta '
1549                         'is set to 1 in CONTROL file')
1550# ============================================================================================
1551            # write out all output to log file before starting fortran programm
1552            sys.stdout.flush()
1553
1554            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1555            execute_subprocess([os.path.join(c.exedir,
1556                                _config.FORTRAN_EXECUTABLE)],
1557                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1558
1559            os.chdir(pwd)
1560# ============================================================================================
1561            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1562            if c.purefc:
1563                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1564            else:
1565                suffix = cdate_hour[2:10]
1566
1567            # if necessary, add ensemble member number to filename suffix
1568            if 'number' in index_keys:
1569                index_number = index_keys.index('number')
1570                if len(index_vals[index_number]) > 1:
1571                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1572
1573            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1574            print("outputfile = " + fnout)
1575            # collect for final processing
1576            self.outputfilelist.append(os.path.basename(fnout))
1577            # get additional precipitation subgrid data if available
1578            if c.rrint:
1579                self.outputfilelist.append(os.path.basename(fnout + '_1'))
1580                self.outputfilelist.append(os.path.basename(fnout + '_2'))
1581# ============================================================================================
1582            # create outputfile and copy all data from intermediate files
1583            # to the outputfile (final GRIB input files for FLEXPART)
1584            orolsm = os.path.basename(glob.glob(c.inputdir +
1585                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1586            fluxfile = 'flux' + cdate[0:2] + suffix
1587            if not c.cwc:
1588                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1589            else:
1590                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1591
1592            with open(fnout, 'wb') as fout:
1593                for f in flist:
1594                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1595                                       fout)
1596
1597            if c.omega:
1598                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1599                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1600                                            'rb'), fout)
1601# ============================================================================================
1602        if c.wrf:
1603            fwrf.close()
1604
1605        codes_index_release(iid)
1606
1607        return
1608
1609
1610    def calc_extra_elda(self, path, prefix):
1611        ''' Calculates extra ensemble members for ELDA - Stream.
1612
1613        Parameters
1614        ----------
1615        path : str
1616            Path to the output files.
1617
1618        prefix : str
1619            The prefix of the output filenames as defined in Control file.
1620
1621        Return
1622        ------
1623
1624        '''
1625        # max number
1626        maxnum = int(self.number.split('/')[-1])
1627
1628        # get a list of all prepared output files with control forecast (CF)
1629        CF_filelist = UioFiles(path, prefix + '*.N000')
1630
1631        for cffile in CF_filelist.files:
1632            with open(cffile, 'rb') as f:
1633                cfvalues=[]
1634                while True:
1635                    fid = codes_grib_new_from_file(f)
1636                    if fid is None:
1637                        break
1638                    cfvalues.append(codes_get_array(fid, 'values'))
1639                    codes_release(fid)
1640
1641            filename = cffile.split('N000')[0]
1642            for i in range(1, maxnum+1): # max number nehmen
1643
1644                # read an ensemble member
1645                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1646                # create file for newly calculated ensemble member
1647                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1648                # number of message in grib file
1649                j = 0
1650                while True:
1651                    gid = codes_grib_new_from_file(g)
1652                    if gid is None:
1653                        break
1654                    values = codes_get_array(gid, 'values')
1655                    codes_set_array(gid, 'values',
1656                                    values-2*(values-cfvalues[j]))
1657                    codes_set(gid, 'number', i+maxnum)
1658                    codes_write(gid, h)
1659                    codes_release(gid)
1660                    j += 1
1661
1662                g.close()
1663                h.close()
1664                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1665                self.outputfilelist.append(
1666                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1667
1668        return
1669
1670
1671    def process_output(self, c):
1672        '''Postprocessing of FLEXPART input files.
1673
1674        The grib files are postprocessed depending on the selection in
1675        CONTROL file. The resulting files are moved to the output
1676        directory if its not equal to the input directory.
1677        The following modifications might be done if
1678        properly switched in CONTROL file:
1679        GRIB2 - Conversion to GRIB2
1680        ECTRANS - Transfer of files to gateway server
1681        ECSTORAGE - Storage at ECMWF server
1682
1683        Parameters
1684        ----------
1685        c : ControlFile
1686            Contains all the parameters of CONTROL file and
1687            command line.
1688
1689        Return
1690        ------
1691
1692        '''
1693
1694        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1695
1696        if _config.FLAG_ON_ECMWFSERVER:
1697            print('ecstorage: {}\n ecfsdir: {}\n'.
1698                  format(c.ecstorage, c.ecfsdir))
1699            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1700                  .format(c.ectrans, c.gateway, c.destination))
1701
1702        print('Output filelist: ')
1703        print(self.outputfilelist)
1704
1705        for ofile in self.outputfilelist:
1706            ofile = os.path.join(self.inputdir, ofile)
1707
1708            if c.format.lower() == 'grib2':
1709                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1710                                    'productDefinitionTemplateNumber=8',
1711                                    ofile, ofile + '_2'],
1712                                   error_msg='GRIB2 CONVERSION FAILED!')
1713
1714                execute_subprocess(['mv', ofile + '_2', ofile],
1715                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1716                                   'FILES FAILED!')
1717
1718            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1719                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1720                                    c.gateway, '-remote', c.destination,
1721                                    '-source', ofile],
1722                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1723
1724            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1725                execute_subprocess(['ecp', '-o', ofile,
1726                                    os.path.expandvars(c.ecfsdir)],
1727                                   error_msg='COPY OF FILES TO ECSTORAGE '
1728                                   'AREA FAILED!')
1729
1730            if c.outputdir != c.inputdir:
1731                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1732                                    c.outputdir],
1733                                   error_msg='RELOCATION OF OUTPUT FILES '
1734                                   'TO OUTPUTDIR FAILED!')
1735
1736        return
1737
1738
1739    def prepare_fp_files(self, c):
1740        '''Conversion of GRIB files to FLEXPART binary format.
1741
1742        Parameters
1743        ----------
1744        c : ControlFile
1745            Contains all the parameters of CONTROL file and
1746            command line.
1747
1748        Return
1749        ------
1750
1751        '''
1752        # generate AVAILABLE file
1753        # Example of AVAILABLE file data:
1754        # 20131107 000000      EN13110700              ON DISC
1755        clist = []
1756        for ofile in self.outputfilelist:
1757            fname = ofile.split('/')
1758            if '.' in fname[-1]:
1759                l = fname[-1].split('.')
1760                timestamp = datetime.strptime(l[0][-6:] + l[1],
1761                                              '%y%m%d%H')
1762                timestamp += timedelta(hours=int(l[2]))
1763                cdate = datetime.strftime(timestamp, '%Y%m%d')
1764                chms = datetime.strftime(timestamp, '%H%M%S')
1765            else:
1766                cdate = '20' + fname[-1][-8:-2]
1767                chms = fname[-1][-2:] + '0000'
1768            clist.append(cdate + ' ' + chms + ' '*6 +
1769                         fname[-1] + ' '*14 + 'ON DISC')
1770        clist.sort()
1771        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1772            f.write('\n'.join(clist) + '\n')
1773
1774        # generate pathnames file
1775        pwd = os.path.abspath(c.outputdir)
1776        with open(pwd + '/pathnames', 'w') as f:
1777            f.write(pwd + '/Options/\n')
1778            f.write(pwd + '/\n')
1779            f.write(pwd + '/\n')
1780            f.write(pwd + '/AVAILABLE\n')
1781            f.write(' = == = == = == = == = == ==  = \n')
1782
1783        # create Options dir if necessary
1784        if not os.path.exists(pwd + '/Options'):
1785            make_dir(pwd+'/Options')
1786
1787        # read template COMMAND file
1788        with open(os.path.expandvars(os.path.expanduser(
1789            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1790            lflist = f.read().split('\n')
1791
1792        # find index of list where to put in the
1793        # date and time information
1794        # usually after the LDIRECT parameter
1795        i = 0
1796        for l in lflist:
1797            if 'LDIRECT' in l.upper():
1798                break
1799            i += 1
1800
1801        # insert the date and time information of run start and end
1802        # into the list of lines of COMMAND file
1803        lflist = lflist[:i+1] + \
1804                 [clist[0][:16], clist[-1][:16]] + \
1805                 lflist[i+3:]
1806
1807        # write the new COMMAND file
1808        with open(pwd + '/Options/COMMAND', 'w') as g:
1809            g.write('\n'.join(lflist) + '\n')
1810
1811        # change to outputdir and start the grib2flexpart run
1812        # afterwards switch back to the working dir
1813        os.chdir(c.outputdir)
1814        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1815         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1816        execute_subprocess(cmd)
1817        os.chdir(pwd)
1818
1819        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG