source: flex_extract.git/Source/Python/Classes/EcFlexpart.py @ 365c82a

dev
Last change on this file since 365c82a was 365c82a, checked in by Anne Tipka <anne.tipka@…>, 18 months ago

corrected date setting for operational retrieval for basetime 00

  • Property mode set to 100644
File size: 78.7 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#
27#    February 2018 - Anne Philipp (University of Vienna):
28#        - applied PEP8 style guide
29#        - added documentation
30#        - removed function getFlexpartTime in class EcFlexpart
31#        - outsourced class ControlFile
32#        - outsourced class MarsRetrieval
33#        - changed class name from EIFlexpart to EcFlexpart
34#        - applied minor code changes (style)
35#        - removed "dead code" , e.g. retrieval of Q since it is not needed
36#        - removed "times" parameter from retrieve-method since it is not used
37#        - seperated function "retrieve" into smaller functions (less code
38#          duplication, easier testing)
39#
40# @License:
41#    (C) Copyright 2014-2020.
42#    Anne Philipp, Leopold Haimberger
43#
44#    SPDX-License-Identifier: CC-BY-4.0
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55#pylint: disable=unsubscriptable-object
56# this error is a bug
57#pylint: disable=ungrouped-imports
58# not necessary that we group the imports
59# ------------------------------------------------------------------------------
60# MODULES
61# ------------------------------------------------------------------------------
62from __future__ import print_function
63
64import os
65import sys
66import glob
67import shutil
68from datetime import datetime, timedelta
69
70# software specific classes and modules from flex_extract
71#pylint: disable=wrong-import-position
72sys.path.append('../')
73import _config
74from Classes.GribUtil import GribUtil
75from Mods.tools import (init128, to_param_id, silent_remove, product,
76                        my_error, get_informations, get_dimensions,
77                        execute_subprocess, to_param_id_with_tablenumber,
78                        generate_retrieval_period_boundary)
79from Classes.MarsRetrieval import MarsRetrieval
80from Classes.UioFiles import UioFiles
81import Mods.disaggregation as disaggregation
82#pylint: enable=wrong-import-position
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes:# and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = list(range(len(ftime)))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        if self.purefc:
324            # need to retrieve forecasts for step 000 in case of pure forecast
325            steps = '{}/to/{}/by/{}'.format(0, self.accmaxstep, self.dtime)
326        else:
327            steps = '{}/to/{}/by/{}'.format(self.dtime,
328                                            self.accmaxstep,
329                                            self.dtime)
330
331        self.types[str(self.acctype)] = {'times': str(self.acctime),
332                                         'steps': steps}
333        return
334
335    def _create_params(self, gauss, eta, omega, cwc, wrf):
336        '''Define the specific parameter settings for retrievment.
337
338        The different parameters need specific grid types and level types
339        for retrievement. We might get following combination of types
340        (depending on selection and availability):
341        (These are short cuts for the grib file names (leading sequence)
342        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
343        where:
344            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
345            ML = Model Level, SL = Surface Level
346
347        For each of this combination there is a list of parameter names,
348        the level type, the level list and the grid resolution.
349
350        There are different scenarios for data extraction from MARS:
351        1) Retrieval of etadot
352           eta=1, gauss=0, omega=0
353        2) Calculation of etadot from divergence
354           eta=0, gauss=1, omega=0
355        3) Calculation of etadot from omega (for makes sense for debugging)
356           eta=0, gauss=0, omega=1
357        4) Retrieval and Calculation of etadot (only for debugging)
358           eta=1, gauss=1, omega=0
359        5) Download also specific model and surface level data for FLEXPART-WRF
360
361        Parameters:
362        -----------
363        gauss : int
364            Gaussian grid is retrieved.
365
366        eta : int
367            Etadot parameter will be directly retrieved.
368
369        omega : int
370            The omega paramterwill be retrieved.
371
372        cwc : int
373            The cloud liquid and ice water content will be retrieved.
374
375        wrf : int
376            Additional model level and surface level data will be retrieved for
377            WRF/FLEXPART-WRF simulations.
378
379        Return
380        ------
381
382        '''
383        # SURFACE FIELDS
384        #-----------------------------------------------------------------------
385        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
386        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
387                                 'SFC', '1', self.grid]
388        if self.addpar:
389            self.params['OG__SL'][0] += self.addpar
390
391        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
392            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
393                                            'SFC', '1', self.grid]
394        else:
395            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
396                                            'SFC', '1', self.grid]
397
398        # MODEL LEVEL FIELDS
399        #-----------------------------------------------------------------------
400        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
401
402        if not gauss and eta:
403            self.params['OG__ML'][0] += '/U/V/ETADOT'
404        elif gauss and not eta:
405            self.params['GG__SL'] = ['Q', 'ML', '1',
406                                     '{}'.format((int(self.resol) + 1) // 2)]
407            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
408        elif not gauss and not eta:
409            self.params['OG__ML'][0] += '/U/V'
410        else:  # GAUSS and ETA
411            print('Warning: Collecting etadot and parameters for gaussian grid '
412                  'is a very costly parameter combination, '
413                  'use this combination only for debugging!')
414            self.params['GG__SL'] = ['Q', 'ML', '1',
415                                     '{}'.format((int(self.resol) + 1) // 2)]
416            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
417                                     '{}'.format((int(self.resol) + 1) // 2)]
418
419        if omega:
420            self.params['OG__ML'][0] += '/W'
421
422        if cwc:
423            self.params['OG__ML'][0] += '/CLWC/CIWC'
424
425        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
426        # ----------------------------------------------------------------------
427        if wrf:
428            # @WRF
429            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
430            #
431            # UNDER CONSTRUCTION !!!
432            #
433
434            print('WRF VERSION IS UNDER CONSTRUCTION!') # dummy argument
435
436            #self.params['OG__ML'][0] += '/Z/VO'
437            #if '/D' not in self.params['OG__ML'][0]:
438            #    self.params['OG__ML'][0] += '/D'
439
440            #wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
441            #           'SWVL1','SWVL2','SWVL3','SWVL4']
442            #for par in wrf_sfc:
443            #    if par not in self.params['OG__SL'][0]:
444            #        self.params['OG__SL'][0] += '/' + par
445
446        return
447
448
449    def _create_params_fluxes(self):
450        '''Define the parameter setting for flux data.
451
452        Flux data are accumulated fields in time and are stored on the
453        surface level. The leading short cut name for the grib files is:
454        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
455        acc for Accumulated Grid.
456        The params dictionary stores a list of parameter names, the level type,
457        the level list and the grid resolution.
458
459        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
460
461        Parameters:
462        -----------
463
464        Return
465        ------
466
467        '''
468        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
469                                    'SFC', '1', self.grid]
470        return
471
472
473    def _mk_targetname(self, ftype, param, date):
474        '''Creates the filename for the requested grib data to be stored in.
475        This name is passed as the "target" parameter in the request.
476
477        Parameters
478        ----------
479        ftype : str
480            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
481
482        param : str
483            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
484            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
485
486        date : str
487            The date period of the grib data to be stored in this file.
488
489        Return
490        ------
491        targetname : str
492            The target filename for the grib data.
493        '''
494        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
495                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
496
497        return targetname
498
499
500    def _start_retrievement(self, request, par_dict):
501        '''Creates the Mars Retrieval and prints or submits the request
502        depending on the status of the request variable.
503
504        Parameters
505        ----------
506        request : int
507            Selects the mode of retrieval.
508            0: Retrieves the data from ECMWF.
509            1: Prints the mars requests to an output file.
510            2: Retrieves the data and prints the mars request.
511
512        par_dict : dictionary
513            Contains all parameter which have to be set for creating the
514            Mars Retrievals. The parameter are:
515            marsclass, dataset, stream, type, levtype, levelist, resol,
516            gaussian, accuracy, grid, target, area, date, time, number,
517            step, expver, param
518
519        Return
520        ------
521
522        '''
523        # increase number of mars requests
524        self.mreq_count += 1
525
526        MR = MarsRetrieval(self.server,
527                           self.public,
528                           marsclass=par_dict['marsclass'],
529                           dataset=par_dict['dataset'],
530                           stream=par_dict['stream'],
531                           type=par_dict['type'],
532                           levtype=par_dict['levtype'],
533                           levelist=par_dict['levelist'],
534                           resol=par_dict['resol'],
535                           gaussian=par_dict['gaussian'],
536                           accuracy=par_dict['accuracy'],
537                           grid=par_dict['grid'],
538                           target=par_dict['target'],
539                           area=par_dict['area'],
540                           date=par_dict['date'],
541                           time=par_dict['time'],
542                           number=par_dict['number'],
543                           step=par_dict['step'],
544                           expver=par_dict['expver'],
545                           param=par_dict['param'])
546
547        if request == 0:
548            MR.display_info()
549            MR.data_retrieve()
550        elif request == 1:
551            MR.print_infodata_csv(self.inputdir, self.mreq_count)
552        elif request == 2:
553            MR.print_infodata_csv(self.inputdir, self.mreq_count)
554            MR.display_info()
555            MR.data_retrieve()
556        else:
557            print('Failure')
558
559        return
560
561
562    def _mk_index_values(self, inputdir, inputfiles, keys):
563        '''Creates an index file for a set of grib parameter keys.
564        The values from the index keys are returned in a list.
565
566        Parameters
567        ----------
568        keys : dictionary
569            List of parameter names which serves as index.
570
571        inputfiles : UioFiles
572            Contains a list of files.
573
574        Return
575        ------
576        iid : codes_index
577            This is a grib specific index structure to access
578            messages in a file.
579
580        index_vals : list of list  of str
581            Contains the values from the keys used for a distinct selection
582            of grib messages in processing  the grib files.
583            Content looks like e.g.:
584            index_vals[0]: ('20171106', '20171107', '20171108') ; date
585            index_vals[1]: ('0', '1200', '1800', '600') ; time
586            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
587        '''
588        from eccodes import codes_index_get
589
590        iid = None
591        index_keys = keys
592
593        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
594        silent_remove(indexfile)
595        grib = GribUtil(inputfiles.files)
596        # creates new index file
597        iid = grib.index(index_keys=index_keys, index_file=indexfile)
598
599        # read the values of index keys
600        index_vals = []
601        for key in index_keys:
602            key_vals = codes_index_get(iid, key)
603            # have to sort the key values for correct order,
604            # therefore convert to int first
605            key_vals = [int(k) for k in key_vals]
606            key_vals.sort()
607            key_vals = [str(k) for k in key_vals]
608            index_vals.append(key_vals)
609            # index_vals looks for example like:
610            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
611            # index_vals[1]: ('0', '1200') ; time
612            # index_vals[2]: (3', '6', '9', '12') ; stepRange
613
614        return iid, index_vals
615
616
617    def retrieve(self, server, dates, public, request, inputdir='.'):
618        '''Finalizing the retrieval information by setting final details
619        depending on grid type.
620        Prepares MARS retrievals per grid type and submits them.
621
622        Parameters
623        ----------
624        server : ECMWFService or ECMWFDataServer
625            The connection to the ECMWF server. This is different
626            for member state users which have full access and non
627            member state users which have only access to the public
628            data sets. The decision is made from command line argument
629            "public"; for public access its True (ECMWFDataServer)
630            for member state users its False (ECMWFService)
631
632        dates : str
633            Contains start and end date of the retrieval in the format
634            "YYYYMMDD/to/YYYYMMDD"
635
636        request : int
637            Selects the mode of retrieval.
638            0: Retrieves the data from ECMWF.
639            1: Prints the mars requests to an output file.
640            2: Retrieves the data and prints the mars request.
641
642        inputdir : str, optional
643            Path to the directory where the retrieved data is about
644            to be stored. The default is the current directory ('.').
645
646        Return
647        ------
648
649        '''
650        self.dates = dates
651        self.server = server
652        self.public = public
653        self.inputdir = inputdir
654        oro = False
655
656        # define times with datetime module
657        t12h = timedelta(hours=12)
658        t24h = timedelta(hours=24)
659
660        # dictionary which contains all parameter for the mars request,
661        # entries with a "None" will change in different requests and will
662        # therefore be set in each request seperately
663        retr_param_dict = {'marsclass':self.marsclass,
664                           'dataset':self.dataset,
665                           'stream':None,
666                           'type':None,
667                           'levtype':None,
668                           'levelist':None,
669                           'resol':self.resol,
670                           'gaussian':None,
671                           'accuracy':self.accuracy,
672                           'grid':None,
673                           'target':None,
674                           'area':None,
675                           'date':None,
676                           'time':None,
677                           'number':self.number,
678                           'step':None,
679                           'expver':self.expver,
680                           'param':None}
681
682        for ftype in sorted(self.types):
683            # ftype contains field types such as
684            #     [AN, FC, PF, CV]
685            for pk, pv in sorted(self.params.items()):
686                # pk contains one of these keys of params
687                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
688                #      OG_OROLSM_SL, OG_acc_SL]
689                # pv contains all of the items of the belonging key
690                #     [param, levtype, levelist, grid]
691                if isinstance(pv, str):
692                    continue
693                retr_param_dict['type'] = ftype
694                retr_param_dict['time'] = self.types[ftype]['times']
695                retr_param_dict['step'] = self.types[ftype]['steps']
696                retr_param_dict['date'] = self.dates
697                retr_param_dict['stream'] = self.stream
698                retr_param_dict['target'] = \
699                    self._mk_targetname(ftype,
700                                        pk,
701                                        retr_param_dict['date'].split('/')[0])
702                table128 = init128(_config.PATH_GRIBTABLE)
703                ids = to_param_id_with_tablenumber(pv[0], table128)
704                retr_param_dict['param'] = ids
705                retr_param_dict['levtype'] = pv[1]
706                retr_param_dict['levelist'] = pv[2]
707                retr_param_dict['grid'] = pv[3]
708                retr_param_dict['area'] = self.area
709                retr_param_dict['gaussian'] = self.gaussian
710
711                if pk == 'OG_OROLSM__SL' and not oro:
712                    oro = True
713                    # in CERA20C (class EP) there is no stream "OPER"!
714                    if self.marsclass.upper() != 'EP':
715                        retr_param_dict['stream'] = 'OPER'
716                    retr_param_dict['type'] = 'AN'
717                    retr_param_dict['time'] = '00'
718                    retr_param_dict['step'] = '000'
719                    retr_param_dict['date'] = self.dates.split('/')[0]
720                    retr_param_dict['target'] = self._mk_targetname('',
721                                                                    pk,
722                                                                    retr_param_dict['date'])
723                elif pk == 'OG_OROLSM__SL' and oro:
724                    continue
725                if pk == 'GG__SL' and pv[0] == 'Q':
726                    retr_param_dict['area'] = ""
727                    retr_param_dict['gaussian'] = 'reduced'
728                if ftype.upper() == 'FC' and \
729                    'acc' not in retr_param_dict['target']:
730                    if (int(retr_param_dict['time'][0]) +
731                        int(retr_param_dict['step'][0])) > 23:
732                        dates = retr_param_dict['date'].split('/')
733                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
734                        sdate = sdate - timedelta(days=1)
735                        retr_param_dict['date'] = '/'.join(
736                            [sdate.strftime("%Y%m%d")] +
737                            retr_param_dict['date'][1:])
738
739                        print('CHANGED FC start date to ' +
740                              sdate.strftime("%Y%m%d") +
741                              ' to accomodate TIME=' +
742                              retr_param_dict['time'][0] +
743                              ', STEP=' +
744                              retr_param_dict['time'][0])
745
746    # ------  on demand path  --------------------------------------------------
747                if self.basetime is None:
748                    # ******* start retrievement
749                    self._start_retrievement(request, retr_param_dict)
750    # ------  operational path  ------------------------------------------------
751                else:
752                    # check if mars job requests fields beyond basetime.
753                    # if yes eliminate those fields since they may not
754                    # be accessible with user's credentials
755
756                    enddate = retr_param_dict['date'].split('/')[-1]
757                    elimit = datetime.strptime(enddate + str(self.basetime),
758                                               '%Y%m%d%H')
759
760                    if self.basetime == 12:
761                        # --------------  flux data ----------------------------
762                        if 'acc' in pk:
763                            startdate = retr_param_dict['date'].split('/')[0]
764                            enddate = datetime.strftime(elimit - t24h, '%Y%m%d')
765                            retr_param_dict['date'] = '/'.join([startdate,
766                                                                'to',
767                                                                enddate])
768
769                            # ******* start retrievement
770                            self._start_retrievement(request, retr_param_dict)
771
772                            retr_param_dict['date'] = \
773                                datetime.strftime(elimit - t12h, '%Y%m%d')
774                            retr_param_dict['time'] = '00'
775                            retr_param_dict['target'] = \
776                                self._mk_targetname(ftype, pk,
777                                                    retr_param_dict['date'])
778
779                            # ******* start retrievement
780                            self._start_retrievement(request, retr_param_dict)
781
782                        # --------------  non flux data ------------------------
783                        else:
784                            # ******* start retrievement
785                            self._start_retrievement(request, retr_param_dict)
786
787                    elif self.basetime == 0:
788
789                        timesave = ''.join(retr_param_dict['time'])
790
791                        if all(['/' in retr_param_dict['time'],
792                                pk != 'OG_OROLSM__SL',
793                                'acc' not in pk]):
794                            times = retr_param_dict['time'].split('/')
795                            steps = retr_param_dict['step'].split('/')
796
797                            while int(times[0]) + int(steps[0]) <= 12:
798                                times = times[1:]
799                                if len(times) > 1:
800                                    retr_param_dict['time'] = '/'.join(times)
801                                else:
802                                    retr_param_dict['time'] = times[0]
803
804                        if all([pk != 'OG_OROLSM__SL',
805                                int(retr_param_dict['step'].split('/')[0]) == 0,
806                                int(timesave.split('/')[0]) == 0]):
807
808                            retr_param_dict['date'] = \
809                                datetime.strftime(elimit, '%Y%m%d')
810                            retr_param_dict['time'] = '00'
811                            retr_param_dict['step'] = '000'
812                            retr_param_dict['target'] = \
813                                self._mk_targetname(ftype, pk,
814                                                    retr_param_dict['date'])
815
816                        if ftype.upper() == 'FC' and \
817                                'acc' not in retr_param_dict['target']:
818
819                            retr_param_dict['date'] = \
820                                datetime.strftime(elimit - t24h, '%Y%m%d')
821
822                        # ******* start retrievement
823                        self._start_retrievement(request, retr_param_dict)
824                    else:
825                        raise ValueError('ERROR: Basetime has an invalid value '
826                                         '-> {}'.format(str(self.basetime)))
827
828        if request == 0 or request == 2:
829            print('MARS retrieve done ... ')
830        elif request == 1:
831            print('MARS request printed ...')
832
833        return
834
835
836    def write_namelist(self, c):
837        '''Creates a namelist file in the temporary directory and writes
838        the following values to it: maxl, maxb, mlevel,
839        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
840        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
841
842        Parameters
843        ----------
844        c : ControlFile
845            Contains all the parameters of CONTROL file and
846            command line.
847
848        filename : str
849                Name of the namelist file.
850
851        Return
852        ------
853
854        '''
855
856        from genshi.template.text import NewTextTemplate
857        from genshi.template import  TemplateLoader
858        from genshi.template.eval import  UndefinedError
859        import numpy as np
860
861        try:
862            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
863            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
864                                            cls=NewTextTemplate)
865
866            self.inputdir = c.inputdir
867            area = np.asarray(self.area.split('/')).astype(float)
868            grid = np.asarray(self.grid.split('/')).astype(float)
869
870            if area[1] > area[3]:
871                area[1] -= 360
872            maxl = int(round((area[3] - area[1]) / grid[1])) + 1
873            maxb = int(round((area[0] - area[2]) / grid[0])) + 1
874
875            stream = namelist_template.generate(
876                maxl=str(maxl),
877                maxb=str(maxb),
878                mlevel=str(self.level),
879                mlevelist=str(self.levelist),
880                mnauf=str(self.resol),
881                metapar='77',
882                rlo0=str(area[1]),
883                rlo1=str(area[3]),
884                rla0=str(area[2]),
885                rla1=str(area[0]),
886                momega=str(c.omega),
887                momegadiff=str(c.omegadiff),
888                mgauss=str(c.gauss),
889                msmooth=str(c.smooth),
890                meta=str(c.eta),
891                metadiff=str(c.etadiff),
892                mdpdeta=str(c.dpdeta)
893            )
894        except UndefinedError as e:
895            print('... ERROR ' + str(e))
896
897            sys.exit('\n... error occured while trying to generate namelist ' +
898                     _config.TEMPFILE_NAMELIST)
899        except OSError as e:
900            print('... ERROR CODE: ' + str(e.errno))
901            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
902
903            sys.exit('\n... error occured while trying to generate template ' +
904                     _config.TEMPFILE_NAMELIST)
905
906        try:
907            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
908
909            with open(namelistfile, 'w') as f:
910                f.write(stream.render('text'))
911        except OSError as e:
912            print('... ERROR CODE: ' + str(e.errno))
913            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
914
915            sys.exit('\n... error occured while trying to write ' +
916                     namelistfile)
917
918        return
919
920
921    def deacc_fluxes(self, inputfiles, c):
922        '''De-accumulate and disaggregate flux data.
923
924        Goes through all flux fields in ordered time and de-accumulate
925        the fields. Afterwards the fields are disaggregated in time.
926        Different versions of disaggregation is provided for rainfall
927        data (darain, modified linear) and the surface fluxes and
928        stress data (dapoly, cubic polynomial).
929
930        Parameters
931        ----------
932        inputfiles : UioFiles
933            Contains the list of files that contain flux data.
934
935        c : ControlFile
936            Contains all the parameters of CONTROL file and
937            command line.
938
939        Return
940        ------
941
942        '''
943        import numpy as np
944        from eccodes import (codes_index_select, codes_get,
945                             codes_get_values, codes_set_values, codes_set,
946                             codes_write, codes_release, codes_new_from_index,
947                             codes_index_release)
948
949        table128 = init128(_config.PATH_GRIBTABLE)
950        # get ids from the flux parameter names
951        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
952
953        iid = None
954        index_vals = None
955
956        # get the values of the keys which are used for distinct access
957        # of grib messages via product and save the maximum number of
958        # ensemble members if there is more than one
959        if '/' in self.number:
960            # more than one ensemble member is selected
961            index_keys = ["number", "date", "time", "step"]
962            # maximum ensemble number retrieved
963            # + 1 for the control run (ensemble number 0)
964            maxnum = int(self.number.split('/')[-1]) + 1
965            # remember the index of the number values
966            index_number = index_keys.index('number')
967            # empty set to save ensemble numbers which were already processed
968            ens_numbers = set()
969            # index for the ensemble number
970            inumb = 0
971        else:
972            index_keys = ["date", "time", "step"]
973            # maximum ensemble number
974            maxnum = None
975
976        # get sorted lists of the index values
977        # this is very important for disaggregating
978        # the flux data in correct order
979        iid, index_vals = self._mk_index_values(c.inputdir,
980                                                inputfiles,
981                                                index_keys)
982        # index_vals looks like e.g.:
983        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
984        # index_vals[1]: ('0', '600', '1200', '1800') ; time
985        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
986
987        if c.rrint:
988            # set start and end timestamps for retrieval period
989            if not c.purefc:
990                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
991                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
992            else:
993                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
994                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
995                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
996                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
997                end_date = end_date + timedelta(hours=c.maxstep)
998
999            # get necessary grid dimensions from grib files for storing the
1000            # precipitation fields
1001            info = get_informations(os.path.join(c.inputdir,
1002                                                 inputfiles.files[0]))
1003            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
1004                                  start_date, end_date)
1005
1006            # create empty numpy arrays
1007            if not maxnum:
1008                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1009                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1010            else:
1011                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1012                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1013
1014            # index counter for time line
1015            it_lsp = 0
1016            it_cp = 0
1017
1018            # store the order of date and step
1019            date_list = []
1020            step_list = []
1021
1022        # initialize dictionaries to store flux values per parameter
1023        orig_vals = {}
1024        deac_vals = {}
1025        for p in pars:
1026            orig_vals[p] = []
1027            deac_vals[p] = []
1028
1029        # "product" genereates each possible combination between the
1030        # values of the index keys
1031        for prod in product(*index_vals):
1032            # e.g. prod = ('20170505', '0', '12')
1033            #             ( date     ,time, step)
1034
1035            print('CURRENT PRODUCT: ', prod)
1036
1037            # the whole process has to be done for each seperate ensemble member
1038            # therefore, for each new ensemble member we delete old flux values
1039            # and start collecting flux data from the beginning time step
1040            if maxnum and prod[index_number] not in ens_numbers:
1041                ens_numbers.add(prod[index_number])
1042                inumb = len(ens_numbers) - 1
1043                # re-initialize dictionaries to store flux values per parameter
1044                # for the next ensemble member
1045                it_lsp = 0
1046                it_cp = 0
1047                orig_vals = {}
1048                deac_vals = {}
1049                for p in pars:
1050                    orig_vals[p] = []
1051                    deac_vals[p] = []
1052
1053            for i in range(len(index_keys)):
1054                codes_index_select(iid, index_keys[i], prod[i])
1055
1056            # get first id from current product
1057            gid = codes_new_from_index(iid)
1058
1059            # if there is no data for this specific time combination / product
1060            # skip the rest of the for loop and start with next timestep/product
1061            if not gid:
1062                continue
1063
1064            # create correct timestamp from the three time informations
1065            cdate = str(codes_get(gid, 'date'))
1066            time = codes_get(gid, 'time') // 100  # integer
1067            step = codes_get(gid, 'step') # integer
1068            ctime = '{:0>2}'.format(time)
1069
1070            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1071            t_dt = t_date + timedelta(hours=step)
1072            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1073            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1074            if c.basetime is not None:
1075                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1076                                              '%Y%m%d%H')
1077            else:
1078                t_enddate = t_date + timedelta(2*int(c.dtime))
1079
1080            # if necessary, add ensemble member number to filename suffix
1081            # otherwise, add empty string
1082            if maxnum:
1083                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1084            else:
1085                numbersuffix = ''
1086
1087            if c.purefc:
1088                fnout = os.path.join(c.inputdir, 'flux' +
1089                                     t_date.strftime('%Y%m%d.%H') +
1090                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1091                                     numbersuffix)
1092                gnout = os.path.join(c.inputdir, 'flux' +
1093                                     t_date.strftime('%Y%m%d.%H') +
1094                                     '.{:0>3}'.format(step-int(c.dtime)) +
1095                                     numbersuffix)
1096                hnout = os.path.join(c.inputdir, 'flux' +
1097                                     t_date.strftime('%Y%m%d.%H') +
1098                                     '.{:0>3}'.format(step) +
1099                                     numbersuffix)
1100            else:
1101                fnout = os.path.join(c.inputdir, 'flux' +
1102                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1103                gnout = os.path.join(c.inputdir, 'flux' +
1104                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1105                hnout = os.path.join(c.inputdir, 'flux' +
1106                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1107
1108            print("outputfile = " + fnout)
1109            f_handle = open(fnout, 'wb')
1110            h_handle = open(hnout, 'wb')
1111            g_handle = open(gnout, 'wb')
1112
1113            # read message for message and store relevant data fields, where
1114            # data keywords are stored in pars
1115            while True:
1116                if not gid:
1117                    break
1118                parId = codes_get(gid, 'paramId') # integer
1119                step = codes_get(gid, 'step') # integer
1120                time = codes_get(gid, 'time') # integer
1121                ni = codes_get(gid, 'Ni') # integer
1122                nj = codes_get(gid, 'Nj') # integer
1123                if parId not in orig_vals.keys():
1124                    # parameter is not a flux, find next one
1125                    continue
1126
1127                # define conversion factor
1128                if parId == 142 or parId == 143:
1129                    fak = 1. / 1000.
1130                else:
1131                    fak = 3600.
1132
1133                # get parameter values and reshape
1134                values = codes_get_values(gid)
1135                values = (np.reshape(values, (nj, ni))).flatten() / fak
1136
1137                # save the original and accumulated values
1138                orig_vals[parId].append(values[:])
1139
1140                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1141                    # no de-accumulation needed
1142                    deac_vals[parId].append(values[:] / int(c.dtime))
1143                else:
1144                    # do de-accumulation
1145                    deac_vals[parId].append(
1146                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1147                        int(c.dtime))
1148
1149                # store precipitation if new disaggregation method is selected
1150                # only the exact days are needed
1151                if c.rrint:
1152                    if start_date <= t_dt <= end_date:
1153                        if not c.purefc:
1154                            if t_dt not in date_list:
1155                                date_list.append(t_dt)
1156                                step_list = [0]
1157                        else:
1158                            if t_date not in date_list:
1159                                date_list.append(t_date)
1160                            if step not in step_list:
1161                                step_list.append(step)
1162                        # store precipitation values
1163                        if maxnum and parId == 142:
1164                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1165                            it_lsp += 1
1166                        elif not maxnum and parId == 142:
1167                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1168                            it_lsp += 1
1169                        elif maxnum and parId == 143:
1170                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1171                            it_cp += 1
1172                        elif not maxnum and parId == 143:
1173                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1174                            it_cp += 1
1175
1176                # information printout
1177                print(parId, time, step, len(values), values[0], np.std(values))
1178
1179                # length of deac_vals[parId] corresponds to the
1180                # number of time steps, max. 4 are needed for disaggegration
1181                # with the old and original method
1182                # run over all grib messages and perform
1183                # shifting in time
1184                if len(deac_vals[parId]) >= 3:
1185                    if len(deac_vals[parId]) > 3:
1186                        if not c.rrint and (parId == 142 or parId == 143):
1187                            values = disaggregation.darain(deac_vals[parId])
1188                        else:
1189                            values = disaggregation.dapoly(deac_vals[parId])
1190
1191                        if not (step == c.maxstep and c.purefc \
1192                                or t_dt == t_enddate):
1193                            # remove first time step in list to shift
1194                            # time line
1195                            orig_vals[parId].pop(0)
1196                            deac_vals[parId].pop(0)
1197                    else:
1198                        # if the third time step is read (per parId),
1199                        # write out the first one as a boundary value
1200                        if c.purefc:
1201                            values = deac_vals[parId][1]
1202                        else:
1203                            values = deac_vals[parId][0]
1204
1205                    if not (c.rrint and (parId == 142 or parId == 143)):
1206                        codes_set_values(gid, values)
1207
1208                        if c.purefc:
1209                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1210                        else:
1211                            codes_set(gid, 'stepRange', 0)
1212                            codes_set(gid, 'time', t_m2dt.hour*100)
1213                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1214
1215                        codes_write(gid, f_handle)
1216
1217                        # squeeze out information of last two steps
1218                        # contained in deac_vals[parId]
1219                        # Note that deac_vals[parId][0] has not been popped
1220                        # in this case
1221
1222                        if step == c.maxstep and c.purefc or \
1223                           t_dt == t_enddate:
1224                            # last step
1225                            if c.purefc:
1226                                values = deac_vals[parId][3]
1227                                codes_set_values(gid, values)
1228                                codes_set(gid, 'stepRange', step)
1229                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1230                                codes_write(gid, h_handle)
1231                            else:
1232                                values = deac_vals[parId][3]
1233                                codes_set_values(gid, values)
1234                                codes_set(gid, 'stepRange', 0)
1235                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1236                                codes_set(gid, 'time', truedatetime.hour * 100)
1237                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1238                                codes_write(gid, h_handle)
1239
1240                            if parId == 142 or parId == 143:
1241                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1242                            else:
1243                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1244
1245                            # step before last step
1246                            if c.purefc:
1247                                codes_set(gid, 'stepRange', step-int(c.dtime))
1248                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1249                                codes_set_values(gid, values)
1250                                codes_write(gid, g_handle)
1251                            else:
1252                                codes_set(gid, 'stepRange', 0)
1253                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1254                                codes_set(gid, 'time', truedatetime.hour * 100)
1255                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1256                                codes_set_values(gid, values)
1257                                codes_write(gid, g_handle)
1258
1259                codes_release(gid)
1260
1261                gid = codes_new_from_index(iid)
1262
1263            f_handle.close()
1264            g_handle.close()
1265            h_handle.close()
1266
1267        codes_index_release(iid)
1268
1269        if c.rrint:
1270            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1271
1272            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1273                                 cp_np, maxnum, index_keys, index_vals, c)
1274
1275        return
1276
1277    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1278        '''Calculates and writes out the disaggregated precipitation fields.
1279
1280        Disaggregation is done in time and original times are written to the
1281        flux files, while the additional subgrid times are written to
1282        extra files output files. They are named like the original files with
1283        suffixes "_1" and "_2" for the first and second subgrid point.
1284
1285        Parameters
1286        ----------
1287        ni : int
1288            Amount of zonal grid points.
1289
1290        nj : int
1291            Amount of meridional grid points.
1292
1293        nt : int
1294            Number of time steps.
1295
1296        lsp_np : numpy array of float
1297            The large scale precipitation fields for each time step.
1298            Shape (ni * nj, nt).
1299
1300        cp_np : numpy array of float
1301            The convective precipitation fields for each time step.
1302            Shape (ni * nj, nt).
1303
1304        maxnum : int
1305            The maximum number of ensemble members. It is None
1306            if there are no or just one ensemble.
1307
1308        index_keys : dictionary
1309            List of parameter names which serves as index.
1310
1311        index_vals : list of list  of str
1312            Contains the values from the keys used for a distinct selection
1313            of grib messages in processing  the grib files.
1314            Content looks like e.g.:
1315            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1316            index_vals[1]: ('0', '1200', '1800', '600') ; time
1317            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1318
1319        c : ControlFile
1320            Contains all the parameters of CONTROL file and
1321            command line.
1322
1323        Return
1324        ------
1325
1326        '''
1327        import numpy as np
1328
1329        print('... disaggregation of precipitation with new method.')
1330
1331        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1332
1333        # initialize new numpy arrays for disaggregated fields
1334        if maxnum:
1335            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1336            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1337        else:
1338            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1339            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1340
1341        # do the disaggregation, but neglect the last value of the
1342        # original time series. This one corresponds for example to
1343        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1344        if maxnum:
1345            for inum in range(maxnum):
1346                for ix in range(ni*nj):
1347                    lsp_new_np[inum, ix, :] = disaggregation.IA3(lsp_np[inum, ix, :])[:-1]
1348                    cp_new_np[inum, ix, :] = disaggregation.IA3(cp_np[inum, ix, :])[:-1]
1349        else:
1350            for ix in range(ni*nj):
1351                lsp_new_np[0, ix, :] = disaggregation.IA3(lsp_np[ix, :])[:-1]
1352                cp_new_np[0, ix, :] = disaggregation.IA3(cp_np[ix, :])[:-1]
1353
1354        # write to grib files (full/orig times to flux file and inbetween
1355        # times with step 1 and 2, respectively)
1356        print('... write disaggregated precipitation to files.')
1357
1358        if maxnum:
1359            # remember the index of the number values
1360            index_number = index_keys.index('number')
1361            # empty set to save unique ensemble numbers which were already processed
1362            ens_numbers = set()
1363            # index for the ensemble number
1364            inumb = 0
1365        else:
1366            inumb = 0
1367
1368        # index variable of disaggregated fields
1369        it = 0
1370
1371        # "product" genereates each possible combination between the
1372        # values of the index keys
1373        for prod in product(*index_vals):
1374            # e.g. prod = ('20170505', '0', '12')
1375            #             ( date     ,time, step)
1376            # or   prod = ('0'   , '20170505', '0', '12')
1377            #             (number, date      ,time, step)
1378
1379            cdate = prod[index_keys.index('date')]
1380            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1381            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1382
1383            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1384            date += timedelta(hours=int(cstep))
1385
1386            start_period, end_period = generate_retrieval_period_boundary(c)
1387            # skip all temporary times
1388            # which are outside the retrieval period
1389            if date < start_period or \
1390               date > end_period:
1391                continue
1392
1393            # the whole process has to be done for each seperate ensemble member
1394            # therefore, for each new ensemble member we delete old flux values
1395            # and start collecting flux data from the beginning time step
1396            if maxnum and prod[index_number] not in ens_numbers:
1397                ens_numbers.add(prod[index_number])
1398                inumb = int(prod[index_number])
1399                it = 0
1400
1401            # if necessary, add ensemble member number to filename suffix
1402            # otherwise, add empty string
1403            if maxnum:
1404                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1405            else:
1406                numbersuffix = ''
1407
1408            # per original time stamp: write original time step and
1409            # the two newly generated sub time steps
1410            if c.purefc:
1411                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1412            else:
1413                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1414
1415            # write original time step to flux file as usual
1416            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1417            fluxfile.set_keys(tmpfile, filemode='ab',
1418                              wherekeynames=['paramId'], wherekeyvalues=[142],
1419                              keynames=['perturbationNumber', 'date', 'time',
1420                                        'stepRange', 'values'],
1421                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1422                                         date.hour*100, 0, lsp_new_np[inumb, :, it]]
1423                             )
1424            fluxfile.set_keys(tmpfile, filemode='ab',
1425                              wherekeynames=['paramId'], wherekeyvalues=[143],
1426                              keynames=['perturbationNumber', 'date', 'time',
1427                                        'stepRange', 'values'],
1428                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1429                                         date.hour*100, 0, cp_new_np[inumb, :, it]]
1430                             )
1431
1432            # rr for first subgrid point is identified by step = 1
1433            fluxfile.set_keys(tmpfile, filemode='ab',
1434                              wherekeynames=['paramId'], wherekeyvalues=[142],
1435                              keynames=['perturbationNumber', 'date', 'time',
1436                                        'stepRange', 'values'],
1437                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1438                                         date.hour*100, '1', lsp_new_np[inumb, :, it+1]]
1439                             )
1440            fluxfile.set_keys(tmpfile, filemode='ab',
1441                              wherekeynames=['paramId'], wherekeyvalues=[143],
1442                              keynames=['perturbationNumber', 'date', 'time',
1443                                        'stepRange', 'values'],
1444                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1445                                         date.hour*100, '1', cp_new_np[inumb, :, it+1]]
1446                             )
1447
1448            # rr for second subgrid point is identified by step = 2
1449            fluxfile.set_keys(tmpfile, filemode='ab',
1450                              wherekeynames=['paramId'], wherekeyvalues=[142],
1451                              keynames=['perturbationNumber', 'date', 'time',
1452                                        'stepRange', 'values'],
1453                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1454                                         date.hour*100, '2', lsp_new_np[inumb, :, it+2]]
1455                             )
1456            fluxfile.set_keys(tmpfile, filemode='ab',
1457                              wherekeynames=['paramId'], wherekeyvalues=[143],
1458                              keynames=['perturbationNumber', 'date', 'time',
1459                                        'stepRange', 'values'],
1460                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1461                                         date.hour*100, '2', cp_new_np[inumb, :, it+2]]
1462                             )
1463
1464            it = it + 3 # jump to next original time step in rr fields
1465        return
1466
1467    def _create_rr_grib_dummy(self, ifile, inputdir):
1468        '''Creates a grib file with a dummy message for the two precipitation
1469        types lsp and cp each.
1470
1471        Parameters
1472        ----------
1473        ifile : str
1474            Filename of the input file to read the grib messages from.
1475
1476        inputdir : str, optional
1477            Path to the directory where the retrieved data is stored.
1478
1479        Return
1480        ------
1481
1482        '''
1483
1484        gribfile = GribUtil(os.path.join(inputdir, 'rr_grib_dummy.grb'))
1485
1486        gribfile.copy_dummy_msg(ifile, keynames=['paramId','paramId'],
1487                                keyvalues=[142,143], filemode='wb')
1488
1489        return
1490
1491    def create(self, inputfiles, c):
1492        '''An index file will be created which depends on the combination
1493        of "date", "time" and "stepRange" values. This is used to iterate
1494        over all messages in each grib file which were passed through the
1495        parameter "inputfiles" to seperate specific parameters into fort.*
1496        files. Afterwards the FORTRAN program is called to convert
1497        the data fields all to the same grid and put them in one file
1498        per unique time step (combination of "date", "time" and
1499        "stepRange").
1500
1501        Note
1502        ----
1503        This method is based on the ECMWF example index.py
1504        https://software.ecmwf.int/wiki/display/GRIB/index.py
1505
1506        Parameters
1507        ----------
1508        inputfiles : UioFiles
1509            Contains a list of files.
1510
1511        c : ControlFile
1512            Contains all the parameters of CONTROL file and
1513            command line.
1514
1515        Return
1516        ------
1517
1518        '''
1519        from eccodes import (codes_index_select, codes_get,
1520                             codes_get_values, codes_set_values, codes_set,
1521                             codes_write, codes_release, codes_new_from_index,
1522                             codes_index_release)
1523
1524        # generate start and end timestamp of the retrieval period
1525        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1526        start_period = start_period + timedelta(hours=int(c.step[0]))
1527        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1528        end_period = end_period + timedelta(hours=int(c.step[-1]))
1529
1530        # @WRF
1531        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1532        #
1533        # UNDER CONSTRUCTION !!!
1534        #
1535        #if c.wrf:
1536        #    table128 = init128(_config.PATH_GRIBTABLE)
1537        #    wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1538        #                           stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1539        #                          table128)
1540
1541        # these numbers are indices for the temporary files "fort.xx"
1542        # which are used to seperate the grib fields to,
1543        # for the Fortran program input
1544        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1545        # 17: Q | 18: Q, SL, GG| 19: omega | 21: etadot | 22: clwc+ciwc
1546        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1547                 '17':None, '18':None, '19':None, '21':None, '22':None}
1548
1549        iid = None
1550        index_vals = None
1551
1552        # get the values of the keys which are used for distinct access
1553        # of grib messages via product
1554        if '/' in self.number:
1555            # more than one ensemble member is selected
1556            index_keys = ["number", "date", "time", "step"]
1557        else:
1558            index_keys = ["date", "time", "step"]
1559        iid, index_vals = self._mk_index_values(c.inputdir,
1560                                                inputfiles,
1561                                                index_keys)
1562        # index_vals looks like e.g.:
1563        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1564        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1565        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1566
1567        # "product" genereates each possible combination between the
1568        # values of the index keys
1569        for prod in product(*index_vals):
1570            # e.g. prod = ('20170505', '0', '12')
1571            #             (  date    ,time, step)
1572
1573            print('current product: ', prod)
1574
1575            for i in range(len(index_keys)):
1576                codes_index_select(iid, index_keys[i], prod[i])
1577
1578            # get first id from current product
1579            gid = codes_new_from_index(iid)
1580
1581            # if there is no data for this specific time combination / product
1582            # skip the rest of the for loop and start with next timestep/product
1583            if not gid:
1584                continue
1585#============================================================================================
1586            # remove old fort.* files and open new ones
1587            # they are just valid for a single product
1588            for k, f in fdict.items():
1589                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1590                silent_remove(fortfile)
1591                fdict[k] = open(fortfile, 'wb')
1592#============================================================================================
1593            # create correct timestamp from the three time informations
1594            cdate = str(codes_get(gid, 'date'))
1595            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1596            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1597            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1598            timestamp += timedelta(hours=int(cstep))
1599            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1600
1601            # if basetime is used, adapt start/end date period
1602            if c.basetime is not None:
1603                time_delta = timedelta(hours=12-int(c.dtime))
1604                start_period = datetime.strptime(c.end_date + str(c.basetime),
1605                                               '%Y%m%d%H') - time_delta
1606                end_period = datetime.strptime(c.end_date + str(c.basetime),
1607                                             '%Y%m%d%H')
1608
1609            # skip all temporary times
1610            # which are outside the retrieval period
1611            if timestamp < start_period or \
1612               timestamp > end_period:
1613                continue
1614
1615
1616            # @WRF
1617            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1618            #
1619            # UNDER CONSTRUCTION !!!
1620            #
1621            #if c.wrf:
1622            #    if 'olddate' not in locals() or cdate != olddate:
1623            #        fwrf = open(os.path.join(c.outputdir,
1624            #                    'WRF' + cdate + '.' + ctime + '.000.grb2'), 'wb')
1625            #        olddate = cdate[:]
1626#============================================================================================
1627            # savedfields remembers which fields were already used.
1628            savedfields = []
1629            # sum of cloud liquid and ice water content
1630            scwc = None
1631            while 1:
1632                if not gid:
1633                    break
1634                paramId = codes_get(gid, 'paramId')
1635                gridtype = codes_get(gid, 'gridType')
1636                if paramId == 77: # ETADOT
1637                    codes_write(gid, fdict['21'])
1638                elif paramId == 130: # T
1639                    codes_write(gid, fdict['11'])
1640                elif paramId == 131 or paramId == 132: # U, V wind component
1641                    codes_write(gid, fdict['10'])
1642                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1643                    codes_write(gid, fdict['17'])
1644                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1645                    codes_write(gid, fdict['18'])
1646                elif paramId == 135: # W
1647                    codes_write(gid, fdict['19'])
1648                elif paramId == 152: # LNSP
1649                    codes_write(gid, fdict['12'])
1650                elif paramId == 155 and gridtype == 'sh': # D
1651                    codes_write(gid, fdict['13'])
1652                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1653                    # sum cloud liquid water and ice
1654                    if scwc is None:
1655                        scwc = codes_get_values(gid)
1656                    else:
1657                        scwc += codes_get_values(gid)
1658                        codes_set_values(gid, scwc)
1659                        codes_set(gid, 'paramId', 201031)
1660                        codes_write(gid, fdict['22'])
1661                        scwc = None
1662                # @WRF
1663                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1664                #
1665                # UNDER CONSTRUCTION !!!
1666                #
1667                #elif c.wrf and paramId in [129, 138, 155] and \
1668                #      levtype == 'hybrid': # Z, VO, D
1669                #    # do not do anything right now
1670                #    # these are specific parameter for WRF
1671                #    pass
1672                else:
1673                    if paramId not in savedfields:
1674                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1675                        # and all ADDPAR parameter
1676                        codes_write(gid, fdict['16'])
1677                        savedfields.append(paramId)
1678                    else:
1679                        print('duplicate ' + str(paramId) + ' not written')
1680                # @WRF
1681                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1682                #
1683                # UNDER CONSTRUCTION !!!
1684                #
1685                #try:
1686                #    if c.wrf:
1687                #        # model layer
1688                #        if levtype == 'hybrid' and \
1689                #           paramId in [129, 130, 131, 132, 133, 138, 155]:
1690                #            codes_write(gid, fwrf)
1691                #        # sfc layer
1692                #        elif paramId in wrfpars:
1693                #            codes_write(gid, fwrf)
1694                #except AttributeError:
1695                #    pass
1696
1697                codes_release(gid)
1698                gid = codes_new_from_index(iid)
1699#============================================================================================
1700            for f in fdict.values():
1701                f.close()
1702#============================================================================================
1703            # call for Fortran program to convert e.g. reduced_gg grids to
1704            # regular_ll and calculate detadot/dp
1705            pwd = os.getcwd()
1706            os.chdir(c.inputdir)
1707            if os.stat('fort.21').st_size == 0 and c.eta:
1708                print('Parameter 77 (etadot) is missing, most likely it is '
1709                      'not available for this type or date / time\n')
1710                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1711                my_error('fort.21 is empty while parameter eta '
1712                         'is set to 1 in CONTROL file')
1713# ============================================================================================
1714            # write out all output to log file before starting fortran programm
1715            sys.stdout.flush()
1716
1717            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1718            execute_subprocess([os.path.join(c.exedir,
1719                                             _config.FORTRAN_EXECUTABLE)],
1720                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1721
1722            os.chdir(pwd)
1723# ============================================================================================
1724            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1725            # for CERA-20C we need all 4 digits for the year sinc 1900 - 2010
1726            if c.purefc:
1727                if c.marsclass == 'EP':
1728                    suffix = cdate[0:8] + '.' + ctime + '.' + cstep
1729                else:
1730                    suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1731            else:
1732                if c.marsclass == 'EP':
1733                    suffix = cdate_hour[0:10]
1734                else:
1735                    suffix = cdate_hour[2:10]
1736
1737            # if necessary, add ensemble member number to filename suffix
1738            if 'number' in index_keys:
1739                index_number = index_keys.index('number')
1740                if len(index_vals[index_number]) > 1:
1741                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1742
1743            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1744            print("outputfile = " + fnout)
1745            # collect for final processing
1746            self.outputfilelist.append(os.path.basename(fnout))
1747            # # get additional precipitation subgrid data if available
1748            # if c.rrint:
1749                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1750                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1751# ============================================================================================
1752            # create outputfile and copy all data from intermediate files
1753            # to the outputfile (final GRIB input files for FLEXPART)
1754            orolsm = os.path.basename(glob.glob(c.inputdir +
1755                                                '/OG_OROLSM__SL.*.' +
1756                                                c.ppid +
1757                                                '*')[0])
1758            if c.marsclass == 'EP':
1759                fluxfile = 'flux' + suffix
1760            else:
1761                fluxfile = 'flux' + cdate[0:2] + suffix
1762            if not c.cwc:
1763                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1764            else:
1765                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1766
1767            with open(fnout, 'wb') as fout:
1768                for f in flist:
1769                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1770                                       fout)
1771
1772            if c.omega:
1773                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1774                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1775                                            'rb'), fout)
1776# ============================================================================================
1777
1778        # @WRF
1779        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1780        #
1781        # UNDER CONSTRUCTION !!!
1782        #
1783        #if c.wrf:
1784        #    fwrf.close()
1785
1786        codes_index_release(iid)
1787
1788        return
1789
1790
1791    def calc_extra_elda(self, path, prefix):
1792        ''' Calculates extra ensemble members for ELDA - Stream.
1793
1794        This is a specific feature which doubles the number of ensemble members
1795        for the ELDA Stream.
1796
1797        Parameters
1798        ----------
1799        path : str
1800            Path to the output files.
1801
1802        prefix : str
1803            The prefix of the output filenames as defined in Control file.
1804
1805        Return
1806        ------
1807
1808        '''
1809        from eccodes import (codes_grib_new_from_file, codes_get_array,
1810                             codes_set_array, codes_release,
1811                             codes_set, codes_write)
1812
1813        # max number
1814        maxnum = int(self.number.split('/')[-1])
1815
1816        # get a list of all prepared output files with control forecast (CF)
1817        cf_filelist = UioFiles(path, prefix + '*.N000')
1818        cf_filelist.files = sorted(cf_filelist.files)
1819
1820        for cffile in cf_filelist.files:
1821            with open(cffile, 'rb') as f:
1822                cfvalues = []
1823                while True:
1824                    fid = codes_grib_new_from_file(f)
1825                    if fid is None:
1826                        break
1827                    cfvalues.append(codes_get_array(fid, 'values'))
1828                    codes_release(fid)
1829
1830            filename = cffile.split('N000')[0]
1831            for i in range(1, maxnum + 1):
1832                # read an ensemble member
1833                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1834                # create file for newly calculated ensemble member
1835                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1836                # number of message in grib file
1837                j = 0
1838                while True:
1839                    gid = codes_grib_new_from_file(g)
1840                    if gid is None:
1841                        break
1842                    values = codes_get_array(gid, 'values')
1843                    # generate a new ensemble member by subtracting
1844                    # 2 * ( current time step value - last time step value )
1845                    codes_set_array(gid, 'values',
1846                                    values-2*(values-cfvalues[j]))
1847                    codes_set(gid, 'number', i+maxnum)
1848                    codes_write(gid, h)
1849                    codes_release(gid)
1850                    j += 1
1851
1852                g.close()
1853                h.close()
1854                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1855                self.outputfilelist.append(
1856                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1857
1858        return
1859
1860
1861    def process_output(self, c):
1862        '''Postprocessing of FLEXPART input files.
1863
1864        The grib files are postprocessed depending on the selection in
1865        CONTROL file. The resulting files are moved to the output
1866        directory if its not equal to the input directory.
1867        The following modifications might be done if
1868        properly switched in CONTROL file:
1869        GRIB2 - Conversion to GRIB2
1870        ECTRANS - Transfer of files to gateway server
1871        ECSTORAGE - Storage at ECMWF server
1872
1873        Parameters
1874        ----------
1875        c : ControlFile
1876            Contains all the parameters of CONTROL file and
1877            command line.
1878
1879        Return
1880        ------
1881
1882        '''
1883
1884        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1885
1886        print('\n\nGrib compression type:\n packingType: {}\n'.format(c.compression))
1887
1888        if _config.FLAG_ON_ECMWFSERVER:
1889            print('ecstorage: {}\n ecfsdir: {}\n'.
1890                  format(c.ecstorage, c.ecfsdir))
1891            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1892                  .format(c.ectrans, c.gateway, c.destination))
1893
1894        print('Output filelist: ')
1895        print(sorted(self.outputfilelist))
1896
1897        for ofile in self.outputfilelist:
1898            ofile = os.path.join(self.inputdir, ofile)
1899
1900            if c.format.lower() == 'grib2':
1901                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1902                                    'productDefinitionTemplateNumber=8',
1903                                    ofile, ofile + '_2'],
1904                                   error_msg='GRIB2 CONVERSION FAILED!')
1905
1906                execute_subprocess(['mv', ofile + '_2', ofile],
1907                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1908                                   'FILES FAILED!')
1909
1910            if c.compression.lower() != 'grid_simple':
1911                execute_subprocess(['grib_set', '-r', '-s',
1912                                    'packingType=' + c.compression,
1913                                    ofile, ofile + '_2'],
1914                                   error_msg='GRIB COMPRESSION FAILED!')
1915
1916                execute_subprocess(['mv', ofile + '_2', ofile],
1917                                   error_msg='RENAMING FOR NEW GRIB COMPRESSION '
1918                                             'FILES FAILED!')
1919
1920            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1921                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1922                                    c.gateway, '-remote', c.destination,
1923                                    '-source', ofile],
1924                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1925
1926            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1927                execute_subprocess(['ecp', '-o', ofile,
1928                                    os.path.expandvars(c.ecfsdir)],
1929                                   error_msg='COPY OF FILES TO ECSTORAGE '
1930                                   'AREA FAILED!')
1931
1932            if c.outputdir != c.inputdir:
1933                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1934                                    c.outputdir],
1935                                   error_msg='RELOCATION OF OUTPUT FILES '
1936                                   'TO OUTPUTDIR FAILED!')
1937
1938        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG