source: flex_extract.git/source/python/classes/EcFlexpart.py @ c49aa73

ctbtodev
Last change on this file since c49aa73 was c49aa73, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

python2 downgrade/ corrected maxl/maxb calculation for Fortran prog. / rm .fp conversion calculation / bug fix for new disaggregation call / bug fix of loops over types and params (sorted) / bugfix flux selection

  • Property mode set to 100644
File size: 76.2 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#
27#    February 2018 - Anne Philipp (University of Vienna):
28#        - applied PEP8 style guide
29#        - added documentation
30#        - removed function getFlexpartTime in class EcFlexpart
31#        - outsourced class ControlFile
32#        - outsourced class MarsRetrieval
33#        - changed class name from EIFlexpart to EcFlexpart
34#        - applied minor code changes (style)
35#        - removed "dead code" , e.g. retrieval of Q since it is not needed
36#        - removed "times" parameter from retrieve-method since it is not used
37#        - seperated function "retrieve" into smaller functions (less code
38#          duplication, easier testing)
39#
40# @License:
41#    (C) Copyright 2014-2019.
42#    Anne Philipp, Leopold Haimberger
43#
44#    This work is licensed under the Creative Commons Attribution 4.0
45#    International License. To view a copy of this license, visit
46#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
47#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
48#*******************************************************************************
49#pylint: disable=unsupported-assignment-operation
50# this is disabled because for this specific case its an error in pylint
51#pylint: disable=consider-using-enumerate
52# this is not useful in this case
53# ------------------------------------------------------------------------------
54# MODULES
55# ------------------------------------------------------------------------------
56from __future__ import print_function
57
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64
65# software specific classes and modules from flex_extract
66sys.path.append('../')
67import _config
68from classes.GribUtil import GribUtil
69from mods.tools import (init128, to_param_id, silent_remove, product,
70                        my_error, make_dir, get_informations, get_dimensions,
71                        execute_subprocess, to_param_id_with_tablenumber,
72                        generate_retrieval_period_boundary)
73from classes.MarsRetrieval import MarsRetrieval
74from classes.UioFiles import UioFiles
75import mods.disaggregation as disaggregation
76
77# ------------------------------------------------------------------------------
78# CLASS
79# ------------------------------------------------------------------------------
80class EcFlexpart(object):
81    '''
82    Class to represent FLEXPART specific ECMWF data.
83
84    FLEXPART needs grib files in a specifc format. All necessary data fields
85    for one time step are stored in a single file. The class represents an
86    instance with all the parameter and settings necessary for retrieving
87    MARS data and modifing them so they are fitting FLEXPART needs. The class
88    is able to disaggregate the fluxes and convert grid types to the one needed
89    by FLEXPART, therefore using the FORTRAN program.
90
91    Attributes
92    ----------
93    mreq_count : int
94        Counter for the number of generated mars requests.
95
96    inputdir : str
97        Path to the directory where the retrieved data is stored.
98
99    dataset : str
100        For public datasets there is the specific naming and parameter
101        dataset which has to be used to characterize the type of
102        data.
103
104    basetime : int
105        The time for a half day retrieval. The 12 hours upfront are to be
106        retrieved.
107
108    dtime : str
109        Time step in hours.
110
111    acctype : str
112        The field type for the accumulated forecast fields.
113
114    acctime : str
115        The starting time from the accumulated forecasts.
116
117    accmaxstep : str
118        The maximum forecast step for the accumulated forecast fields.
119
120    marsclass : str
121        Characterisation of dataset.
122
123    stream : str
124        Identifies the forecasting system used to generate the data.
125
126    number : str
127        Selects the member in ensemble forecast run.
128
129    resol : str
130        Specifies the desired triangular truncation of retrieved data,
131        before carrying out any other selected post-processing.
132
133    accuracy : str
134        Specifies the number of bits per value to be used in the
135        generated GRIB coded fields.
136
137    addpar : str
138        List of additional parameters to be retrieved.
139
140    level : str
141        Specifies the maximum level.
142
143    expver : str
144        The version of the dataset.
145
146    levelist : str
147        Specifies the required levels.
148
149    glevelist : str
150        Specifies the required levels for gaussian grids.
151
152    gaussian : str
153        This parameter is deprecated and should no longer be used.
154        Specifies the desired type of Gaussian grid for the output.
155
156    grid : str
157        Specifies the output grid which can be either a Gaussian grid
158        or a Latitude/Longitude grid.
159
160    area : str
161        Specifies the desired sub-area of data to be extracted.
162
163    purefc : int
164        Switch for definition of pure forecast mode or not.
165
166    outputfilelist : list of str
167        The final list of FLEXPART ready input files.
168
169    types : dictionary
170        Determines the combination of type of fields, time and forecast step
171        to be retrieved.
172
173    params : dictionary
174        Collection of grid types and their corresponding parameters,
175        levels, level types and the grid definition.
176
177    server : ECMWFService or ECMWFDataServer
178        This is the connection to the ECMWF data servers.
179
180    public : int
181        Decides which Web API Server version is used.
182
183    dates : str
184        Contains start and end date of the retrieval in the format
185        "YYYYMMDD/to/YYYYMMDD"
186    '''
187
188    # --------------------------------------------------------------------------
189    # CLASS FUNCTIONS
190    # --------------------------------------------------------------------------
191    def __init__(self, c, fluxes=False):
192        '''Creates an object/instance of EcFlexpart with the associated
193        settings of its attributes for the retrieval.
194
195        Parameters:
196        -----------
197        c : ControlFile
198            Contains all the parameters of CONTROL file and
199            command line.
200
201        fluxes : boolean, optional
202            Decides if the flux parameter settings are stored or
203            the rest of the parameter list.
204            Default value is False.
205
206        Return
207        ------
208
209        '''
210        # set a counter for the number of generated mars requests
211        self.mreq_count = 0
212
213        self.inputdir = c.inputdir
214        self.dataset = c.dataset
215        self.basetime = c.basetime
216        self.dtime = c.dtime
217        self.acctype = c.acctype
218        self.acctime = c.acctime
219        self.accmaxstep = c.accmaxstep
220        self.marsclass = c.marsclass
221        self.stream = c.stream
222        self.number = c.number
223        self.resol = c.resol
224        self.accuracy = c.accuracy
225        self.addpar = c.addpar
226        self.level = c.level
227        self.expver = c.expver
228        self.levelist = c.levelist
229        self.glevelist = '1/to/' + c.level # in case of gaussian grid
230        self.gaussian = c.gaussian
231        self.grid = c.grid
232        self.area = c.area
233        self.purefc = c.purefc
234        self.outputfilelist = []
235
236        # Define the different types of field combinations (type, time, step)
237        self.types = {}
238        # Define the parameters and their level types, level list and
239        # grid resolution for the retrieval job
240        self.params = {}
241
242        if fluxes:
243            self._create_params_fluxes()
244        else:
245            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
246
247        if fluxes:# and not c.purefc:
248            self._create_field_types_fluxes()
249        else:
250            self._create_field_types(c.type, c.time, c.step)
251        return
252
253    def _create_field_types(self, ftype, ftime, fstep):
254        '''Create the combination of field type, time and forecast step.
255
256        Parameters:
257        -----------
258        ftype : list of str
259            List of field types.
260
261        ftime : list of str
262            The time in hours of the field.
263
264        fstep : str
265            Specifies the forecast time step from forecast base time.
266            Valid values are hours (HH) from forecast base time.
267
268        Return
269        ------
270
271        '''
272        i = 0
273        for ty, st, ti in zip(ftype, fstep, ftime):
274            btlist = list(range(len(ftime)))
275            if self.basetime == 12:
276                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
277            if self.basetime == 0:
278                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
279
280            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
281                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
282                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
283                 # (int(c.time[i]) in btlist or c.purefc):
284
285            if (i in btlist) or self.purefc:
286
287                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
288                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
289
290                    if ty not in self.types.keys():
291                        self.types[ty] = {'times': '', 'steps': ''}
292
293                    if ti not in self.types[ty]['times']:
294                        if self.types[ty]['times']:
295                            self.types[ty]['times'] += '/'
296                        self.types[ty]['times'] += ti
297
298                    if st not in self.types[ty]['steps']:
299                        if self.types[ty]['steps']:
300                            self.types[ty]['steps'] += '/'
301                        self.types[ty]['steps'] += st
302            i += 1
303
304        return
305
306    def _create_field_types_fluxes(self):
307        '''Create the combination of field type, time and forecast step
308        for the flux data.
309
310        Parameters:
311        -----------
312
313        Return
314        ------
315
316        '''
317        self.types[str(self.acctype)] = {'times': str(self.acctime),
318                                         'steps': '{}/to/{}/by/{}'.format(
319                                             self.dtime,
320                                             self.accmaxstep,
321                                             self.dtime)}
322        return
323
324    def _create_params(self, gauss, eta, omega, cwc, wrf):
325        '''Define the specific parameter settings for retrievment.
326
327        The different parameters need specific grid types and level types
328        for retrievement. We might get following combination of types
329        (depending on selection and availability):
330        (These are short cuts for the grib file names (leading sequence)
331        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
332        where:
333            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
334            ML = Model Level, SL = Surface Level
335
336        For each of this combination there is a list of parameter names,
337        the level type, the level list and the grid resolution.
338
339        There are different scenarios for data extraction from MARS:
340        1) Retrieval of etadot
341           eta=1, gauss=0, omega=0
342        2) Calculation of etadot from divergence
343           eta=0, gauss=1, omega=0
344        3) Calculation of etadot from omega (for makes sense for debugging)
345           eta=0, gauss=0, omega=1
346        4) Retrieval and Calculation of etadot (only for debugging)
347           eta=1, gauss=1, omega=0
348        5) Download also specific model and surface level data for FLEXPART-WRF
349
350        Parameters:
351        -----------
352        gauss : int
353            Gaussian grid is retrieved.
354
355        eta : int
356            Etadot parameter will be directly retrieved.
357
358        omega : int
359            The omega paramterwill be retrieved.
360
361        cwc : int
362            The cloud liquid and ice water content will be retrieved.
363
364        wrf : int
365            Additional model level and surface level data will be retrieved for
366            WRF/FLEXPART-WRF simulations.
367
368        Return
369        ------
370
371        '''
372        # SURFACE FIELDS
373        #-----------------------------------------------------------------------
374        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
375        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
376                                 'SFC', '1', self.grid]
377        if self.addpar:
378            self.params['OG__SL'][0] += self.addpar
379
380        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
381            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
382                                            'SFC', '1', self.grid]
383        else:
384            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
385                                            'SFC', '1', self.grid]
386
387        # MODEL LEVEL FIELDS
388        #-----------------------------------------------------------------------
389        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
390
391        if not gauss and eta:
392            self.params['OG__ML'][0] += '/U/V/ETADOT'
393        elif gauss and not eta:
394            self.params['GG__SL'] = ['Q', 'ML', '1',
395                                     '{}'.format((int(self.resol) + 1) // 2)]
396            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
397        elif not gauss and not eta:
398            self.params['OG__ML'][0] += '/U/V'
399        else:  # GAUSS and ETA
400            print('Warning: Collecting etadot and parameters for gaussian grid '
401                           'is a very costly parameter combination, '
402                           'use this combination only for debugging!')
403            self.params['GG__SL'] = ['Q', 'ML', '1',
404                                     '{}'.format((int(self.resol) + 1) // 2)]
405            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
406                                     '{}'.format((int(self.resol) + 1) // 2)]
407
408        if omega:
409            self.params['OG__ML'][0] += '/W'
410
411        if cwc:
412            self.params['OG__ML'][0] += '/CLWC/CIWC'
413
414        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
415        # -----------------------------------------------------------------------
416        if wrf:
417            self.params['OG__ML'][0] += '/Z/VO'
418            if '/D' not in self.params['OG__ML'][0]:
419                self.params['OG__ML'][0] += '/D'
420
421            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
422                       'SWVL1','SWVL2','SWVL3','SWVL4']
423            for par in wrf_sfc:
424                if par not in self.params['OG__SL'][0]:
425                    self.params['OG__SL'][0] += '/' + par
426
427        return
428
429
430    def _create_params_fluxes(self):
431        '''Define the parameter setting for flux data.
432
433        Flux data are accumulated fields in time and are stored on the
434        surface level. The leading short cut name for the grib files is:
435        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
436        acc for Accumulated Grid.
437        The params dictionary stores a list of parameter names, the level type,
438        the level list and the grid resolution.
439
440        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
441
442        Parameters:
443        -----------
444
445        Return
446        ------
447
448        '''
449        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
450                                    'SFC', '1', self.grid]
451        return
452
453
454    def _mk_targetname(self, ftype, param, date):
455        '''Creates the filename for the requested grib data to be stored in.
456        This name is passed as the "target" parameter in the request.
457
458        Parameters
459        ----------
460        ftype : str
461            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
462
463        param : str
464            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
465            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
466
467        date : str
468            The date period of the grib data to be stored in this file.
469
470        Return
471        ------
472        targetname : str
473            The target filename for the grib data.
474        '''
475        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
476                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
477
478        return targetname
479
480
481    def _start_retrievement(self, request, par_dict):
482        '''Creates the Mars Retrieval and prints or submits the request
483        depending on the status of the request variable.
484
485        Parameters
486        ----------
487        request : int
488            Selects the mode of retrieval.
489            0: Retrieves the data from ECMWF.
490            1: Prints the mars requests to an output file.
491            2: Retrieves the data and prints the mars request.
492
493        par_dict : dictionary
494            Contains all parameter which have to be set for creating the
495            Mars Retrievals. The parameter are:
496            marsclass, dataset, stream, type, levtype, levelist, resol,
497            gaussian, accuracy, grid, target, area, date, time, number,
498            step, expver, param
499
500        Return
501        ------
502
503        '''
504        # increase number of mars requests
505        self.mreq_count += 1
506
507        MR = MarsRetrieval(self.server,
508                           self.public,
509                           marsclass=par_dict['marsclass'],
510                           dataset=par_dict['dataset'],
511                           stream=par_dict['stream'],
512                           type=par_dict['type'],
513                           levtype=par_dict['levtype'],
514                           levelist=par_dict['levelist'],
515                           resol=par_dict['resol'],
516                           gaussian=par_dict['gaussian'],
517                           accuracy=par_dict['accuracy'],
518                           grid=par_dict['grid'],
519                           target=par_dict['target'],
520                           area=par_dict['area'],
521                           date=par_dict['date'],
522                           time=par_dict['time'],
523                           number=par_dict['number'],
524                           step=par_dict['step'],
525                           expver=par_dict['expver'],
526                           param=par_dict['param'])
527
528        if request == 0:
529            MR.display_info()
530            MR.data_retrieve()
531        elif request == 1:
532            MR.print_infodata_csv(self.inputdir, self.mreq_count)
533        elif request == 2:
534            MR.print_infodata_csv(self.inputdir, self.mreq_count)
535            MR.display_info()
536            MR.data_retrieve()
537        else:
538            print('Failure')
539
540        return
541
542
543    def _mk_index_values(self, inputdir, inputfiles, keys):
544        '''Creates an index file for a set of grib parameter keys.
545        The values from the index keys are returned in a list.
546
547        Parameters
548        ----------
549        keys : dictionary
550            List of parameter names which serves as index.
551
552        inputfiles : UioFiles
553            Contains a list of files.
554
555        Return
556        ------
557        iid : codes_index
558            This is a grib specific index structure to access
559            messages in a file.
560
561        index_vals : list of list  of str
562            Contains the values from the keys used for a distinct selection
563            of grib messages in processing  the grib files.
564            Content looks like e.g.:
565            index_vals[0]: ('20171106', '20171107', '20171108') ; date
566            index_vals[1]: ('0', '1200', '1800', '600') ; time
567            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
568        '''
569        from eccodes import codes_index_get
570
571        iid = None
572        index_keys = keys
573
574        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
575        silent_remove(indexfile)
576        grib = GribUtil(inputfiles.files)
577        # creates new index file
578        iid = grib.index(index_keys=index_keys, index_file=indexfile)
579
580        # read the values of index keys
581        index_vals = []
582        for key in index_keys:
583            key_vals = codes_index_get(iid, key)
584            # have to sort the key values for correct order,
585            # therefore convert to int first
586            key_vals = [int(k) for k in key_vals]
587            key_vals.sort()
588            key_vals = [str(k) for k in key_vals]
589            index_vals.append(key_vals)
590            # index_vals looks for example like:
591            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
592            # index_vals[1]: ('0', '1200') ; time
593            # index_vals[2]: (3', '6', '9', '12') ; stepRange
594
595        return iid, index_vals
596
597
598    def retrieve(self, server, dates, public, request, inputdir='.'):
599        '''Finalizing the retrieval information by setting final details
600        depending on grid type.
601        Prepares MARS retrievals per grid type and submits them.
602
603        Parameters
604        ----------
605        server : ECMWFService or ECMWFDataServer
606            The connection to the ECMWF server. This is different
607            for member state users which have full access and non
608            member state users which have only access to the public
609            data sets. The decision is made from command line argument
610            "public"; for public access its True (ECMWFDataServer)
611            for member state users its False (ECMWFService)
612
613        dates : str
614            Contains start and end date of the retrieval in the format
615            "YYYYMMDD/to/YYYYMMDD"
616
617        request : int
618            Selects the mode of retrieval.
619            0: Retrieves the data from ECMWF.
620            1: Prints the mars requests to an output file.
621            2: Retrieves the data and prints the mars request.
622
623        inputdir : str, optional
624            Path to the directory where the retrieved data is about
625            to be stored. The default is the current directory ('.').
626
627        Return
628        ------
629
630        '''
631        self.dates = dates
632        self.server = server
633        self.public = public
634        self.inputdir = inputdir
635        oro = False
636
637        # define times with datetime module
638        t12h = timedelta(hours=12)
639        t24h = timedelta(hours=24)
640
641        # dictionary which contains all parameter for the mars request,
642        # entries with a "None" will change in different requests and will
643        # therefore be set in each request seperately
644        retr_param_dict = {'marsclass':self.marsclass,
645                           'dataset':self.dataset,
646                           'stream':None,
647                           'type':None,
648                           'levtype':None,
649                           'levelist':None,
650                           'resol':self.resol,
651                           'gaussian':None,
652                           'accuracy':self.accuracy,
653                           'grid':None,
654                           'target':None,
655                           'area':None,
656                           'date':None,
657                           'time':None,
658                           'number':self.number,
659                           'step':None,
660                           'expver':self.expver,
661                           'param':None}
662
663        for ftype in sorted(self.types):
664            # ftype contains field types such as
665            #     [AN, FC, PF, CV]
666            for pk, pv in sorted(self.params.items()):
667                # pk contains one of these keys of params
668                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
669                #      OG_OROLSM_SL, OG_acc_SL]
670                # pv contains all of the items of the belonging key
671                #     [param, levtype, levelist, grid]
672                if isinstance(pv, str):
673                    continue
674                retr_param_dict['type'] = ftype
675                retr_param_dict['time'] = self.types[ftype]['times']
676                retr_param_dict['step'] = self.types[ftype]['steps']
677                retr_param_dict['date'] = self.dates
678                retr_param_dict['stream'] = self.stream
679                retr_param_dict['target'] = \
680                    self._mk_targetname(ftype,
681                                        pk,
682                                        retr_param_dict['date'].split('/')[0])
683                table128 = init128(_config.PATH_GRIBTABLE)
684                ids = to_param_id_with_tablenumber(pv[0], table128)
685                retr_param_dict['param'] = ids
686                retr_param_dict['levtype'] = pv[1]
687                retr_param_dict['levelist'] = pv[2]
688                retr_param_dict['grid'] = pv[3]
689                retr_param_dict['area'] = self.area
690                retr_param_dict['gaussian'] = self.gaussian
691
692                if pk == 'OG_OROLSM__SL' and not oro:
693                    oro = True
694                    # in CERA20C (class EP) there is no stream "OPER"!
695                    if self.marsclass.upper() != 'EP':
696                        retr_param_dict['stream'] = 'OPER'
697                    retr_param_dict['type'] = 'AN'
698                    retr_param_dict['time'] = '00'
699                    retr_param_dict['step'] = '000'
700                    retr_param_dict['date'] = self.dates.split('/')[0]
701                    retr_param_dict['target'] = self._mk_targetname('',
702                                            pk, retr_param_dict['date'])
703                elif pk == 'OG_OROLSM__SL' and oro:
704                    continue
705                if pk == 'GG__SL' and pv[0] == 'Q':
706                    retr_param_dict['area'] = ""
707                    retr_param_dict['gaussian'] = 'reduced'
708                if ftype.upper() == 'FC' and \
709                    'acc' not in retr_param_dict['target']:
710                    if (int(retr_param_dict['time'][0]) +
711                        int(retr_param_dict['step'][0])) > 23:
712                        dates = retr_param_dict['date'].split('/')
713                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
714                        sdate = sdate - timedelta(days=1)
715                        retr_param_dict['date'] = '/'.join(
716                            [sdate.strftime("%Y%m%d")] +
717                            retr_param_dict['date'][1:])
718
719                        print('CHANGED FC start date to ' +
720                              sdate.strftime("%Y%m%d") +
721                              ' to accomodate TIME=' +
722                              retr_param_dict['time'][0] +
723                              ', STEP=' +
724                              retr_param_dict['time'][0])
725
726    # ------  on demand path  --------------------------------------------------
727                if self.basetime is None:
728                    # ******* start retrievement
729                    self._start_retrievement(request, retr_param_dict)
730    # ------  operational path  ------------------------------------------------
731                else:
732                    # check if mars job requests fields beyond basetime.
733                    # if yes eliminate those fields since they may not
734                    # be accessible with user's credentials
735
736                    enddate = retr_param_dict['date'].split('/')[-1]
737                    elimit = datetime.strptime(enddate + str(self.basetime),
738                                               '%Y%m%d%H')
739
740                    if self.basetime == 12:
741                        # --------------  flux data ----------------------------
742                        if 'acc' in pk:
743                            startdate = retr_param_dict['date'].split('/')[0]
744                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
745                            retr_param_dict['date'] = '/'.join([startdate,
746                                                                'to',
747                                                                enddate])
748
749                            # ******* start retrievement
750                            self._start_retrievement(request, retr_param_dict)
751
752                            retr_param_dict['date'] = \
753                                datetime.strftime(elimit - t12h, '%Y%m%d')
754                            retr_param_dict['time'] = '00'
755                            retr_param_dict['target'] = \
756                                self._mk_targetname(ftype, pk,
757                                                    retr_param_dict['date'])
758
759                            # ******* start retrievement
760                            self._start_retrievement(request, retr_param_dict)
761
762                        # --------------  non flux data ------------------------
763                        else:
764                            # ******* start retrievement
765                            self._start_retrievement(request, retr_param_dict)
766
767                    elif self.basetime == 0:
768                        retr_param_dict['date'] = \
769                            datetime.strftime(elimit - t24h, '%Y%m%d')
770
771                        timesave = ''.join(retr_param_dict['time'])
772
773                        if ('/' in retr_param_dict['time'] and
774                            pk != 'OG_OROLSM__SL' and
775                            'acc' not in pk ) :
776                            times = retr_param_dict['time'].split('/')
777                            steps = retr_param_dict['step'].split('/')
778
779                            while int(times[0]) + int(steps[0]) <= 12:
780                                times = times[1:]
781                                if len(times) > 1:
782                                    retr_param_dict['time'] = '/'.join(times)
783                                else:
784                                    retr_param_dict['time'] = times[0]
785
786                        if (pk != 'OG_OROLSM__SL' and
787                            int(retr_param_dict['step'].split('/')[0]) == 0 and
788                            int(timesave.split('/')[0]) == 0):
789
790                            retr_param_dict['date'] = \
791                                datetime.strftime(elimit, '%Y%m%d')
792                            retr_param_dict['time'] = '00'
793                            retr_param_dict['step'] = '000'
794                            retr_param_dict['target'] = \
795                                self._mk_targetname(ftype, pk,
796                                                    retr_param_dict['date'])
797
798                        # ******* start retrievement
799                        self._start_retrievement(request, retr_param_dict)
800                    else:
801                        raise ValueError('ERROR: Basetime has an invalid value '
802                                                 '-> {}'.format(str(basetime)))
803
804        if request == 0 or request == 2:
805            print('MARS retrieve done ... ')
806        elif request == 1:
807            print('MARS request printed ...')
808
809        return
810
811
812    def write_namelist(self, c):
813        '''Creates a namelist file in the temporary directory and writes
814        the following values to it: maxl, maxb, mlevel,
815        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
816        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
817
818        Parameters
819        ----------
820        c : ControlFile
821            Contains all the parameters of CONTROL file and
822            command line.
823
824        filename : str
825                Name of the namelist file.
826
827        Return
828        ------
829
830        '''
831
832        from genshi.template.text import NewTextTemplate
833        from genshi.template import  TemplateLoader
834        from genshi.template.eval import  UndefinedError
835        import numpy as np
836
837        try:
838            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
839            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
840                                            cls=NewTextTemplate)
841
842            self.inputdir = c.inputdir
843            area = np.asarray(self.area.split('/')).astype(float)
844            grid = np.asarray(self.grid.split('/')).astype(float)
845
846            if area[1] > area[3]:
847                area[1] -= 360
848            maxl = round((area[3] - area[1]) / grid[1]) + 1
849            maxb = round((area[0] - area[2]) / grid[0]) + 1
850
851            stream = namelist_template.generate(
852                maxl = str(maxl),
853                maxb = str(maxb),
854                mlevel = str(self.level),
855                mlevelist = str(self.levelist),
856                mnauf = str(self.resol),
857                metapar = '77',
858                rlo0 = str(area[1]),
859                rlo1 = str(area[3]),
860                rla0 = str(area[2]),
861                rla1 = str(area[0]),
862                momega = str(c.omega),
863                momegadiff = str(c.omegadiff),
864                mgauss = str(c.gauss),
865                msmooth = str(c.smooth),
866                meta = str(c.eta),
867                metadiff = str(c.etadiff),
868                mdpdeta = str(c.dpdeta)
869            )
870        except UndefinedError as e:
871            print('... ERROR ' + str(e))
872
873            sys.exit('\n... error occured while trying to generate namelist ' +
874                     _config.TEMPFILE_NAMELIST)
875        except OSError as e:
876            print('... ERROR CODE: ' + str(e.errno))
877            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
878
879            sys.exit('\n... error occured while trying to generate template ' +
880                     _config.TEMPFILE_NAMELIST)
881
882        try:
883            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
884
885            with open(namelistfile, 'w') as f:
886                f.write(stream.render('text'))
887        except OSError as e:
888            print('... ERROR CODE: ' + str(e.errno))
889            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
890
891            sys.exit('\n... error occured while trying to write ' +
892                     namelistfile)
893
894        return
895
896
897    def deacc_fluxes(self, inputfiles, c):
898        '''De-accumulate and disaggregate flux data.
899
900        Goes through all flux fields in ordered time and de-accumulate
901        the fields. Afterwards the fields are disaggregated in time.
902        Different versions of disaggregation is provided for rainfall
903        data (darain, modified linear) and the surface fluxes and
904        stress data (dapoly, cubic polynomial).
905
906        Parameters
907        ----------
908        inputfiles : UioFiles
909            Contains the list of files that contain flux data.
910
911        c : ControlFile
912            Contains all the parameters of CONTROL file and
913            command line.
914
915        Return
916        ------
917
918        '''
919        import numpy as np
920        from eccodes import (codes_index_select, codes_new_from_index, codes_get,
921                             codes_get_values, codes_set_values, codes_set,
922                             codes_write, codes_release, codes_new_from_index,
923                             codes_index_release)
924
925        table128 = init128(_config.PATH_GRIBTABLE)
926        # get ids from the flux parameter names
927        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
928
929        iid = None
930        index_vals = None
931
932        # get the values of the keys which are used for distinct access
933        # of grib messages via product and save the maximum number of
934        # ensemble members if there is more than one
935        if '/' in self.number:
936            # more than one ensemble member is selected
937            index_keys = ["number", "date", "time", "step"]
938            # maximum ensemble number retrieved
939            # + 1 for the control run (ensemble number 0)
940            maxnum = int(self.number.split('/')[-1]) + 1
941            # remember the index of the number values
942            index_number = index_keys.index('number')
943            # empty set to save ensemble numbers which were already processed
944            ens_numbers = set()
945            # index for the ensemble number
946            inumb = 0
947        else:
948            index_keys = ["date", "time", "step"]
949            # maximum ensemble number
950            maxnum = None
951
952        # get sorted lists of the index values
953        # this is very important for disaggregating
954        # the flux data in correct order
955        iid, index_vals = self._mk_index_values(c.inputdir,
956                                                inputfiles,
957                                                index_keys)
958        # index_vals looks like e.g.:
959        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
960        # index_vals[1]: ('0', '600', '1200', '1800') ; time
961        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
962
963        if c.rrint:
964            # set start and end timestamps for retrieval period
965            if not c.purefc:
966                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
967                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
968            else:
969                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
970                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
971                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
972                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
973                end_date = end_date + timedelta(hours=c.maxstep)
974
975            # get necessary grid dimensions from grib files for storing the
976            # precipitation fields
977            info = get_informations(os.path.join(c.inputdir,
978                                                 inputfiles.files[0]))
979            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
980                                  start_date, end_date)
981
982            # create empty numpy arrays
983            if not maxnum:
984                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
985                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
986            else:
987                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
988                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
989
990            # index counter for time line
991            it_lsp = 0
992            it_cp = 0
993
994            # store the order of date and step
995            date_list = []
996            step_list = []
997
998        # initialize dictionaries to store flux values per parameter
999        orig_vals = {}
1000        deac_vals = {}
1001        for p in pars:
1002            orig_vals[p] = []
1003            deac_vals[p] = []
1004
1005        # "product" genereates each possible combination between the
1006        # values of the index keys
1007        for prod in product(*index_vals):
1008            # e.g. prod = ('20170505', '0', '12')
1009            #             ( date     ,time, step)
1010
1011            print('CURRENT PRODUCT: ', prod)
1012
1013            # the whole process has to be done for each seperate ensemble member
1014            # therefore, for each new ensemble member we delete old flux values
1015            # and start collecting flux data from the beginning time step
1016            if maxnum and prod[index_number] not in ens_numbers:
1017                ens_numbers.add(prod[index_number])
1018                inumb = len(ens_numbers) - 1
1019                # re-initialize dictionaries to store flux values per parameter
1020                # for the next ensemble member
1021                it_lsp = 0
1022                it_cp = 0
1023                orig_vals = {}
1024                deac_vals = {}
1025                for p in pars:
1026                    orig_vals[p] = []
1027                    deac_vals[p] = []
1028
1029            for i in range(len(index_keys)):
1030                codes_index_select(iid, index_keys[i], prod[i])
1031
1032            # get first id from current product
1033            gid = codes_new_from_index(iid)
1034
1035            # if there is no data for this specific time combination / product
1036            # skip the rest of the for loop and start with next timestep/product
1037            if not gid:
1038                continue
1039
1040            # create correct timestamp from the three time informations
1041            cdate = str(codes_get(gid, 'date'))
1042            time = codes_get(gid, 'time') // 100  # integer
1043            step = codes_get(gid, 'step') # integer
1044            ctime = '{:0>2}'.format(time)
1045            cstep = '{:0>3}'.format(step)
1046
1047            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1048            t_dt = t_date + timedelta(hours=step)
1049            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1050            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1051            if c.basetime is not None:
1052                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1053                                              '%Y%m%d%H')
1054            else:
1055                t_enddate = t_date + timedelta(2*int(c.dtime))
1056
1057            # if necessary, add ensemble member number to filename suffix
1058            # otherwise, add empty string
1059            if maxnum:
1060                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1061            else:
1062                numbersuffix = ''
1063
1064            if c.purefc:
1065                fnout = os.path.join(c.inputdir, 'flux' +
1066                                     t_date.strftime('%Y%m%d.%H') +
1067                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1068                                     numbersuffix)
1069                gnout = os.path.join(c.inputdir, 'flux' +
1070                                     t_date.strftime('%Y%m%d.%H') +
1071                                     '.{:0>3}'.format(step-int(c.dtime)) +
1072                                     numbersuffix)
1073                hnout = os.path.join(c.inputdir, 'flux' +
1074                                     t_date.strftime('%Y%m%d.%H') +
1075                                     '.{:0>3}'.format(step) +
1076                                     numbersuffix)
1077            else:
1078                fnout = os.path.join(c.inputdir, 'flux' +
1079                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1080                gnout = os.path.join(c.inputdir, 'flux' +
1081                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1082                hnout = os.path.join(c.inputdir, 'flux' +
1083                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1084
1085            print("outputfile = " + fnout)
1086            f_handle = open(fnout, 'w')
1087            h_handle = open(hnout, 'w')
1088            g_handle = open(gnout, 'w')
1089
1090            # read message for message and store relevant data fields, where
1091            # data keywords are stored in pars
1092            while True:
1093                if not gid:
1094                    break
1095                parId = codes_get(gid, 'paramId') # integer
1096                step = codes_get(gid, 'step') # integer
1097                time = codes_get(gid, 'time') # integer
1098                ni = codes_get(gid, 'Ni') # integer
1099                nj = codes_get(gid, 'Nj') # integer
1100                if parId not in orig_vals.keys():
1101                    # parameter is not a flux, find next one
1102                    continue
1103
1104                # define conversion factor
1105                if parId == 142 or parId == 143:
1106                    fak = 1. / 1000.
1107                else:
1108                    fak = 3600.
1109
1110                # get parameter values and reshape
1111                values = codes_get_values(gid)
1112                values = (np.reshape(values, (nj, ni))).flatten() / fak
1113
1114                # save the original and accumulated values
1115                orig_vals[parId].append(values[:])
1116
1117                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1118                    # no de-accumulation needed
1119                    deac_vals[parId].append(values[:] / int(c.dtime))
1120                else:
1121                    # do de-accumulation
1122                    deac_vals[parId].append(
1123                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1124                         int(c.dtime))
1125
1126                # store precipitation if new disaggregation method is selected
1127                # only the exact days are needed
1128                if c.rrint:
1129                    if start_date <= t_dt <= end_date:
1130                        if not c.purefc:
1131                            if t_dt not in date_list:
1132                                date_list.append(t_dt)
1133                                step_list = [0]
1134                        else:
1135                            if t_date not in date_list:
1136                                date_list.append(t_date)
1137                            if step not in step_list:
1138                                step_list.append(step)
1139                        # store precipitation values
1140                        if maxnum and parId == 142:
1141                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1142                            it_lsp += 1
1143                        elif not maxnum and parId == 142:
1144                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1145                            it_lsp += 1
1146                        elif maxnum and parId == 143:
1147                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1148                            it_cp += 1
1149                        elif not maxnum and parId == 143:
1150                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1151                            it_cp += 1
1152
1153                # information printout
1154                print(parId, time, step, len(values), values[0], np.std(values))
1155
1156                # length of deac_vals[parId] corresponds to the
1157                # number of time steps, max. 4 are needed for disaggegration
1158                # with the old and original method
1159                # run over all grib messages and perform
1160                # shifting in time
1161                if len(deac_vals[parId]) >= 3:
1162                    if len(deac_vals[parId]) > 3:
1163                        if not c.rrint and (parId == 142 or parId == 143):
1164                            values = disaggregation.darain(deac_vals[parId])
1165                        else:
1166                            values = disaggregation.dapoly(deac_vals[parId])
1167
1168                        if not (step == c.maxstep and c.purefc \
1169                                or t_dt == t_enddate):
1170                            # remove first time step in list to shift
1171                            # time line
1172                            orig_vals[parId].pop(0)
1173                            deac_vals[parId].pop(0)
1174                    else:
1175                        # if the third time step is read (per parId),
1176                        # write out the first one as a boundary value
1177                        if c.purefc:
1178                            values = deac_vals[parId][1]
1179                        else:
1180                            values = deac_vals[parId][0]
1181
1182                    if not (c.rrint and (parId == 142 or parId == 143)):
1183                        codes_set_values(gid, values)
1184
1185                        if c.purefc:
1186                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1187                        else:
1188                            codes_set(gid, 'stepRange', 0)
1189                            codes_set(gid, 'time', t_m2dt.hour*100)
1190                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1191
1192                        codes_write(gid, f_handle)
1193
1194                        # squeeze out information of last two steps
1195                        # contained in deac_vals[parId]
1196                        # Note that deac_vals[parId][0] has not been popped
1197                        # in this case
1198
1199                        if step == c.maxstep and c.purefc or \
1200                           t_dt == t_enddate:
1201                            # last step
1202                            if c.purefc:
1203                                values = deac_vals[parId][3]
1204                                codes_set_values(gid, values)
1205                                codes_set(gid, 'stepRange', step)
1206                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1207                                codes_write(gid, h_handle)
1208                            else:
1209                                values = deac_vals[parId][3]
1210                                codes_set_values(gid, values)
1211                                codes_set(gid, 'stepRange', 0)
1212                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1213                                codes_set(gid, 'time', truedatetime.hour * 100)
1214                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1215                                codes_write(gid, h_handle)
1216
1217                            if parId == 142 or parId == 143:
1218                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1219                            else:
1220                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1221
1222                            # step before last step
1223                            if c.purefc:
1224                                codes_set(gid, 'stepRange', step-int(c.dtime))
1225                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1226                                codes_set_values(gid, values)
1227                                codes_write(gid, g_handle)
1228                            else:
1229                                codes_set(gid, 'stepRange', 0)
1230                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1231                                codes_set(gid, 'time', truedatetime.hour * 100)
1232                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1233                                codes_set_values(gid, values)
1234                                codes_write(gid, g_handle)
1235
1236                codes_release(gid)
1237
1238                gid = codes_new_from_index(iid)
1239
1240            f_handle.close()
1241            g_handle.close()
1242            h_handle.close()
1243
1244        codes_index_release(iid)
1245
1246        if c.rrint:
1247            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1248
1249            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1250                                 cp_np, maxnum, index_keys, index_vals, c)
1251
1252        return
1253
1254    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1255        '''Calculates and writes out the disaggregated precipitation fields.
1256
1257        Disaggregation is done in time and original times are written to the
1258        flux files, while the additional subgrid times are written to
1259        extra files output files. They are named like the original files with
1260        suffixes "_1" and "_2" for the first and second subgrid point.
1261
1262        Parameters
1263        ----------
1264        ni : int
1265            Amount of zonal grid points.
1266
1267        nj : int
1268            Amount of meridional grid points.
1269
1270        nt : int
1271            Number of time steps.
1272
1273        lsp_np : numpy array of float
1274            The large scale precipitation fields for each time step.
1275            Shape (ni * nj, nt).
1276
1277        cp_np : numpy array of float
1278            The convective precipitation fields for each time step.
1279            Shape (ni * nj, nt).
1280
1281        maxnum : int
1282            The maximum number of ensemble members. It is None
1283            if there are no or just one ensemble.
1284
1285        index_keys : dictionary
1286            List of parameter names which serves as index.
1287
1288        index_vals : list of list  of str
1289            Contains the values from the keys used for a distinct selection
1290            of grib messages in processing  the grib files.
1291            Content looks like e.g.:
1292            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1293            index_vals[1]: ('0', '1200', '1800', '600') ; time
1294            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1295
1296        c : ControlFile
1297            Contains all the parameters of CONTROL file and
1298            command line.
1299
1300        Return
1301        ------
1302
1303        '''
1304        import numpy as np
1305
1306        print('... disaggregation of precipitation with new method.')
1307
1308        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1309
1310        # initialize new numpy arrays for disaggregated fields
1311        if maxnum:
1312            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1313            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1314        else:
1315            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1316            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1317
1318        # do the disaggregation, but neglect the last value of the
1319        # original time series. This one corresponds for example to
1320        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1321        if maxnum:
1322            for inum in range(maxnum):
1323                for ix in range(ni*nj):
1324                    lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1]
1325                    cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1]
1326        else:
1327            for ix in range(ni*nj):
1328                lsp_new_np[0,ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1329                cp_new_np[0,ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1330
1331        # write to grib files (full/orig times to flux file and inbetween
1332        # times with step 1 and 2, respectively)
1333        print('... write disaggregated precipitation to files.')
1334
1335        if maxnum:
1336            # remember the index of the number values
1337            index_number = index_keys.index('number')
1338            # empty set to save unique ensemble numbers which were already processed
1339            ens_numbers = set()
1340            # index for the ensemble number
1341            inumb = 0
1342        else:
1343            inumb = 0
1344
1345        # index variable of disaggregated fields
1346        it = 0
1347
1348        # "product" genereates each possible combination between the
1349        # values of the index keys
1350        for prod in product(*index_vals):
1351            # e.g. prod = ('20170505', '0', '12')
1352            #             ( date     ,time, step)
1353            # or   prod = ('0'   , '20170505', '0', '12')
1354            #             (number, date      ,time, step)
1355
1356            cdate = prod[index_keys.index('date')]
1357            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1358            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1359
1360            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1361            date += timedelta(hours=int(cstep))
1362
1363            start_period, end_period = generate_retrieval_period_boundary(c)
1364            # skip all temporary times
1365            # which are outside the retrieval period
1366            if date < start_period or \
1367               date > end_period:
1368                continue
1369
1370            # the whole process has to be done for each seperate ensemble member
1371            # therefore, for each new ensemble member we delete old flux values
1372            # and start collecting flux data from the beginning time step
1373            if maxnum and prod[index_number] not in ens_numbers:
1374                ens_numbers.add(prod[index_number])
1375                inumb = int(prod[index_number])
1376                it = 0
1377
1378            # if necessary, add ensemble member number to filename suffix
1379            # otherwise, add empty string
1380            if maxnum:
1381                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1382            else:
1383                numbersuffix = ''
1384
1385            # per original time stamp: write original time step and
1386            # the two newly generated sub time steps
1387            if c.purefc:
1388                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1389            else:
1390                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1391
1392            # write original time step to flux file as usual
1393            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1394            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1395                              wherekeynames=['paramId'], wherekeyvalues=[142],
1396                              keynames=['perturbationNumber','date','time','stepRange','values'],
1397                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1398                                         date.hour*100, 0, lsp_new_np[inumb,:,it]],
1399                             )
1400            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1401                              wherekeynames=['paramId'], wherekeyvalues=[143],
1402                              keynames=['perturbationNumber','date','time','stepRange','values'],
1403                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1404                                         date.hour*100, 0, cp_new_np[inumb,:,it]]
1405                             )
1406
1407            # rr for first subgrid point is identified by step = 1
1408            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1409                              wherekeynames=['paramId'], wherekeyvalues=[142],
1410                              keynames=['perturbationNumber','date','time','stepRange','values'],
1411                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1412                                         date.hour*100, '1', lsp_new_np[inumb,:,it+1]]
1413                              )
1414            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1415                              wherekeynames=['paramId'], wherekeyvalues=[143],
1416                              keynames=['perturbationNumber','date','time','stepRange','values'],
1417                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1418                                         date.hour*100, '1', cp_new_np[inumb,:,it+1]]
1419                              )
1420
1421            # rr for second subgrid point is identified by step = 2
1422            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1423                              wherekeynames=['paramId'], wherekeyvalues=[142],
1424                              keynames=['perturbationNumber','date','time','stepRange','values'],
1425                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1426                                         date.hour*100, '2', lsp_new_np[inumb,:,it+2]]
1427                              )
1428            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1429                              wherekeynames=['paramId'], wherekeyvalues=[143],
1430                              keynames=['perturbationNumber','date','time','stepRange','values'],
1431                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1432                                         date.hour*100, '2', cp_new_np[inumb,:,it+2]]
1433                              )
1434
1435            it = it + 3 # jump to next original time step in rr fields
1436        return
1437
1438    def _create_rr_grib_dummy(self, ifile, inputdir):
1439        '''Creates a grib file with a dummy message for the two precipitation
1440        types lsp and cp each.
1441
1442        Parameters
1443        ----------
1444        ifile : str
1445            Filename of the input file to read the grib messages from.
1446
1447        inputdir : str, optional
1448            Path to the directory where the retrieved data is stored.
1449
1450        Return
1451        ------
1452
1453        '''
1454
1455        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1456
1457        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1458                      keyvalues=[142], filemode='w')
1459
1460        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1461                      keyvalues=[143], filemode='a')
1462
1463        return
1464
1465    def create(self, inputfiles, c):
1466        '''An index file will be created which depends on the combination
1467        of "date", "time" and "stepRange" values. This is used to iterate
1468        over all messages in each grib file which were passed through the
1469        parameter "inputfiles" to seperate specific parameters into fort.*
1470        files. Afterwards the FORTRAN program is called to convert
1471        the data fields all to the same grid and put them in one file
1472        per unique time step (combination of "date", "time" and
1473        "stepRange").
1474
1475        Note
1476        ----
1477        This method is based on the ECMWF example index.py
1478        https://software.ecmwf.int/wiki/display/GRIB/index.py
1479
1480        Parameters
1481        ----------
1482        inputfiles : UioFiles
1483            Contains a list of files.
1484
1485        c : ControlFile
1486            Contains all the parameters of CONTROL file and
1487            command line.
1488
1489        Return
1490        ------
1491
1492        '''
1493        from eccodes import (codes_index_select, codes_new_from_index, codes_get,
1494                             codes_get_values, codes_set_values, codes_set,
1495                             codes_write, codes_release, codes_new_from_index,
1496                             codes_index_release)
1497
1498        # generate start and end timestamp of the retrieval period
1499        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1500        start_period = start_period + timedelta(hours=int(c.step[0]))
1501        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1502        end_period = end_period + timedelta(hours=int(c.step[-1]))
1503
1504        if c.wrf:
1505            table128 = init128(_config.PATH_GRIBTABLE)
1506            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1507                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1508                                  table128)
1509
1510        # these numbers are indices for the temporary files "fort.xx"
1511        # which are used to seperate the grib fields to,
1512        # for the Fortran program input
1513        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1514        # 17: Q | 18: Q, SL, GG| 19: omega | 21: etadot | 22: clwc+ciwc
1515        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1516                 '17':None, '18':None, '19':None, '21':None, '22':None}
1517
1518        iid = None
1519        index_vals = None
1520
1521        # get the values of the keys which are used for distinct access
1522        # of grib messages via product
1523        if '/' in self.number:
1524            # more than one ensemble member is selected
1525            index_keys = ["number", "date", "time", "step"]
1526        else:
1527            index_keys = ["date", "time", "step"]
1528        iid, index_vals = self._mk_index_values(c.inputdir,
1529                                                inputfiles,
1530                                                index_keys)
1531        # index_vals looks like e.g.:
1532        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1533        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1534        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1535
1536        # "product" genereates each possible combination between the
1537        # values of the index keys
1538        for prod in product(*index_vals):
1539            # e.g. prod = ('20170505', '0', '12')
1540            #             (  date    ,time, step)
1541
1542            print('current product: ', prod)
1543
1544            for i in range(len(index_keys)):
1545                codes_index_select(iid, index_keys[i], prod[i])
1546
1547            # get first id from current product
1548            gid = codes_new_from_index(iid)
1549
1550            # if there is no data for this specific time combination / product
1551            # skip the rest of the for loop and start with next timestep/product
1552            if not gid:
1553                continue
1554#============================================================================================
1555            # remove old fort.* files and open new ones
1556            # they are just valid for a single product
1557            for k, f in fdict.items():
1558                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1559                silent_remove(fortfile)
1560                fdict[k] = open(fortfile, 'w')
1561#============================================================================================
1562            # create correct timestamp from the three time informations
1563            cdate = str(codes_get(gid, 'date'))
1564            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1565            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1566            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1567            timestamp += timedelta(hours=int(cstep))
1568            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1569
1570            # skip all temporary times
1571            # which are outside the retrieval period
1572            if timestamp < start_period or \
1573               timestamp > end_period:
1574                continue
1575
1576            # if the timestamp is out of basetime start/end date period,
1577            # skip this specific product
1578            if c.basetime is not None:
1579                time_delta = timedelta(hours=12-int(c.dtime))
1580                start_time = datetime.strptime(c.end_date + str(c.basetime),
1581                                                '%Y%m%d%H') - time_delta
1582                end_time = datetime.strptime(c.end_date + str(c.basetime),
1583                                             '%Y%m%d%H')
1584                if timestamp < start_time or timestamp > end_time:
1585                    continue
1586
1587            if c.wrf:
1588                if 'olddate' not in locals() or cdate != olddate:
1589                    fwrf = open(os.path.join(c.outputdir,
1590                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1591                    olddate = cdate[:]
1592#============================================================================================
1593            # savedfields remembers which fields were already used.
1594            savedfields = []
1595            # sum of cloud liquid and ice water content
1596            scwc = None
1597            while 1:
1598                if not gid:
1599                    break
1600                paramId = codes_get(gid, 'paramId')
1601                gridtype = codes_get(gid, 'gridType')
1602                levtype = codes_get(gid, 'typeOfLevel')
1603                if paramId == 77: # ETADOT
1604                    codes_write(gid, fdict['21'])
1605                elif paramId == 130: # T
1606                    codes_write(gid, fdict['11'])
1607                elif paramId == 131 or paramId == 132: # U, V wind component
1608                    codes_write(gid, fdict['10'])
1609                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1610                    codes_write(gid, fdict['17'])
1611                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1612                    codes_write(gid, fdict['18'])
1613                elif paramId == 135: # W
1614                    codes_write(gid, fdict['19'])
1615                elif paramId == 152: # LNSP
1616                    codes_write(gid, fdict['12'])
1617                elif paramId == 155 and gridtype == 'sh': # D
1618                    codes_write(gid, fdict['13'])
1619                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1620                    # sum cloud liquid water and ice
1621                    if scwc is None:
1622                        scwc = codes_get_values(gid)
1623                    else:
1624                        scwc += codes_get_values(gid)
1625                        codes_set_values(gid, scwc)
1626                        codes_set(gid, 'paramId', 201031)
1627                        codes_write(gid, fdict['22'])
1628                        scwc = None
1629                elif c.wrf and paramId in [129, 138, 155] and \
1630                      levtype == 'hybrid': # Z, VO, D
1631                    # do not do anything right now
1632                    # these are specific parameter for WRF
1633                    pass
1634                else:
1635                    if paramId not in savedfields:
1636                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1637                        # and all ADDPAR parameter
1638                        codes_write(gid, fdict['16'])
1639                        savedfields.append(paramId)
1640                    else:
1641                        print('duplicate ' + str(paramId) + ' not written')
1642
1643                try:
1644                    if c.wrf:
1645                        # model layer
1646                        if levtype == 'hybrid' and \
1647                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1648                            codes_write(gid, fwrf)
1649                        # sfc layer
1650                        elif paramId in wrfpars:
1651                            codes_write(gid, fwrf)
1652                except AttributeError:
1653                    pass
1654
1655                codes_release(gid)
1656                gid = codes_new_from_index(iid)
1657#============================================================================================
1658            for f in fdict.values():
1659                f.close()
1660#============================================================================================
1661            # call for Fortran program to convert e.g. reduced_gg grids to
1662            # regular_ll and calculate detadot/dp
1663            pwd = os.getcwd()
1664            os.chdir(c.inputdir)
1665            if os.stat('fort.21').st_size == 0 and c.eta:
1666                print('Parameter 77 (etadot) is missing, most likely it is '
1667                      'not available for this type or date / time\n')
1668                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1669                my_error('fort.21 is empty while parameter eta '
1670                         'is set to 1 in CONTROL file')
1671# ============================================================================================
1672            # write out all output to log file before starting fortran programm
1673            sys.stdout.flush()
1674
1675            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1676            execute_subprocess([os.path.join(c.exedir,
1677                                _config.FORTRAN_EXECUTABLE)],
1678                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1679
1680            os.chdir(pwd)
1681# ============================================================================================
1682            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1683            if c.purefc:
1684                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1685            else:
1686                suffix = cdate_hour[2:10]
1687
1688            # if necessary, add ensemble member number to filename suffix
1689            if 'number' in index_keys:
1690                index_number = index_keys.index('number')
1691                if len(index_vals[index_number]) > 1:
1692                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1693
1694            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1695            print("outputfile = " + fnout)
1696            # collect for final processing
1697            self.outputfilelist.append(os.path.basename(fnout))
1698            # # get additional precipitation subgrid data if available
1699            # if c.rrint:
1700                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1701                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1702# ============================================================================================
1703            # create outputfile and copy all data from intermediate files
1704            # to the outputfile (final GRIB input files for FLEXPART)
1705            orolsm = os.path.basename(glob.glob(c.inputdir +
1706                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1707            fluxfile = 'flux' + cdate[0:2] + suffix
1708            if not c.cwc:
1709                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1710            else:
1711                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1712
1713            with open(fnout, 'wb') as fout:
1714                for f in flist:
1715                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1716                                       fout)
1717
1718            if c.omega:
1719                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1720                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1721                                            'rb'), fout)
1722# ============================================================================================
1723        if c.wrf:
1724            fwrf.close()
1725
1726        codes_index_release(iid)
1727
1728        return
1729
1730
1731    def calc_extra_elda(self, path, prefix):
1732        ''' Calculates extra ensemble members for ELDA - Stream.
1733
1734        This is a specific feature which doubles the number of ensemble members
1735        for the ELDA Stream.
1736
1737        Parameters
1738        ----------
1739        path : str
1740            Path to the output files.
1741
1742        prefix : str
1743            The prefix of the output filenames as defined in Control file.
1744
1745        Return
1746        ------
1747
1748        '''
1749        from eccodes import (codes_grib_new_from_file, codes_get_array,
1750                             codes_set_array, codes_release, codes_set_values,
1751                             codes_set, codes_write, codes_release)
1752
1753        # max number
1754        maxnum = int(self.number.split('/')[-1])
1755
1756        # get a list of all prepared output files with control forecast (CF)
1757        CF_filelist = UioFiles(path, prefix + '*.N000')
1758
1759        for cffile in CF_filelist.files:
1760            with open(cffile, 'rb') as f:
1761                cfvalues=[]
1762                while True:
1763                    fid = codes_grib_new_from_file(f)
1764                    if fid is None:
1765                        break
1766                    cfvalues.append(codes_get_array(fid, 'values'))
1767                    codes_release(fid)
1768
1769            filename = cffile.split('N000')[0]
1770            for i in range(1, maxnum + 1):
1771
1772                # read an ensemble member
1773                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1774                # create file for newly calculated ensemble member
1775                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1776                # number of message in grib file
1777                j = 0
1778                while True:
1779                    gid = codes_grib_new_from_file(g)
1780                    if gid is None:
1781                        break
1782                    values = codes_get_array(gid, 'values')
1783                    # generate a new ensemble member by subtracting
1784                    # 2 * ( current time step value - last time step value )
1785                    codes_set_array(gid, 'values',
1786                                    values-2*(values-cfvalues[j]))
1787                    codes_set(gid, 'number', i+maxnum)
1788                    codes_write(gid, h)
1789                    codes_release(gid)
1790                    j += 1
1791
1792                g.close()
1793                h.close()
1794                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1795                self.outputfilelist.append(
1796                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1797
1798        return
1799
1800
1801    def process_output(self, c):
1802        '''Postprocessing of FLEXPART input files.
1803
1804        The grib files are postprocessed depending on the selection in
1805        CONTROL file. The resulting files are moved to the output
1806        directory if its not equal to the input directory.
1807        The following modifications might be done if
1808        properly switched in CONTROL file:
1809        GRIB2 - Conversion to GRIB2
1810        ECTRANS - Transfer of files to gateway server
1811        ECSTORAGE - Storage at ECMWF server
1812
1813        Parameters
1814        ----------
1815        c : ControlFile
1816            Contains all the parameters of CONTROL file and
1817            command line.
1818
1819        Return
1820        ------
1821
1822        '''
1823
1824        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1825
1826        if _config.FLAG_ON_ECMWFSERVER:
1827            print('ecstorage: {}\n ecfsdir: {}\n'.
1828                  format(c.ecstorage, c.ecfsdir))
1829            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1830                  .format(c.ectrans, c.gateway, c.destination))
1831
1832        print('Output filelist: ')
1833        print(sorted(self.outputfilelist))
1834
1835        for ofile in self.outputfilelist:
1836            ofile = os.path.join(self.inputdir, ofile)
1837
1838            if c.format.lower() == 'grib2':
1839                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1840                                    'productDefinitionTemplateNumber=8',
1841                                    ofile, ofile + '_2'],
1842                                   error_msg='GRIB2 CONVERSION FAILED!')
1843
1844                execute_subprocess(['mv', ofile + '_2', ofile],
1845                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1846                                   'FILES FAILED!')
1847
1848            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1849                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1850                                    c.gateway, '-remote', c.destination,
1851                                    '-source', ofile],
1852                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1853
1854            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1855                execute_subprocess(['ecp', '-o', ofile,
1856                                    os.path.expandvars(c.ecfsdir)],
1857                                   error_msg='COPY OF FILES TO ECSTORAGE '
1858                                   'AREA FAILED!')
1859
1860            if c.outputdir != c.inputdir:
1861                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1862                                    c.outputdir],
1863                                   error_msg='RELOCATION OF OUTPUT FILES '
1864                                   'TO OUTPUTDIR FAILED!')
1865
1866        return
1867
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG