source: flex_extract.git/Source/Python/Classes/EcFlexpart.py @ 2c3c135

ctbtodev
Last change on this file since 2c3c135 was 2c3c135, checked in by Anne Philipp <anne.philipp@…>, 4 years ago

eliminated bug with eccodes file opening for python3 for rrint feature

  • Property mode set to 100644
File size: 78.0 KB
Line 
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#
27#    February 2018 - Anne Philipp (University of Vienna):
28#        - applied PEP8 style guide
29#        - added documentation
30#        - removed function getFlexpartTime in class EcFlexpart
31#        - outsourced class ControlFile
32#        - outsourced class MarsRetrieval
33#        - changed class name from EIFlexpart to EcFlexpart
34#        - applied minor code changes (style)
35#        - removed "dead code" , e.g. retrieval of Q since it is not needed
36#        - removed "times" parameter from retrieve-method since it is not used
37#        - seperated function "retrieve" into smaller functions (less code
38#          duplication, easier testing)
39#
40# @License:
41#    (C) Copyright 2014-2020.
42#    Anne Philipp, Leopold Haimberger
43#
44#    SPDX-License-Identifier: CC-BY-4.0
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55#pylint: disable=unsubscriptable-object
56# this error is a bug
57#pylint: disable=ungrouped-imports
58# not necessary that we group the imports
59# ------------------------------------------------------------------------------
60# MODULES
61# ------------------------------------------------------------------------------
62from __future__ import print_function
63
64import os
65import sys
66import glob
67import shutil
68from datetime import datetime, timedelta
69
70# software specific classes and modules from flex_extract
71#pylint: disable=wrong-import-position
72sys.path.append('../')
73import _config
74from Classes.GribUtil import GribUtil
75from Mods.tools import (init128, to_param_id, silent_remove, product,
76                        my_error, get_informations, get_dimensions,
77                        execute_subprocess, to_param_id_with_tablenumber,
78                        generate_retrieval_period_boundary)
79from Classes.MarsRetrieval import MarsRetrieval
80from Classes.UioFiles import UioFiles
81import Mods.disaggregation as disaggregation
82#pylint: enable=wrong-import-position
83# ------------------------------------------------------------------------------
84# CLASS
85# ------------------------------------------------------------------------------
86class EcFlexpart(object):
87    '''
88    Class to represent FLEXPART specific ECMWF data.
89
90    FLEXPART needs grib files in a specifc format. All necessary data fields
91    for one time step are stored in a single file. The class represents an
92    instance with all the parameter and settings necessary for retrieving
93    MARS data and modifing them so they are fitting FLEXPART needs. The class
94    is able to disaggregate the fluxes and convert grid types to the one needed
95    by FLEXPART, therefore using the FORTRAN program.
96
97    Attributes
98    ----------
99    mreq_count : int
100        Counter for the number of generated mars requests.
101
102    inputdir : str
103        Path to the directory where the retrieved data is stored.
104
105    dataset : str
106        For public datasets there is the specific naming and parameter
107        dataset which has to be used to characterize the type of
108        data.
109
110    basetime : int
111        The time for a half day retrieval. The 12 hours upfront are to be
112        retrieved.
113
114    dtime : str
115        Time step in hours.
116
117    acctype : str
118        The field type for the accumulated forecast fields.
119
120    acctime : str
121        The starting time from the accumulated forecasts.
122
123    accmaxstep : str
124        The maximum forecast step for the accumulated forecast fields.
125
126    marsclass : str
127        Characterisation of dataset.
128
129    stream : str
130        Identifies the forecasting system used to generate the data.
131
132    number : str
133        Selects the member in ensemble forecast run.
134
135    resol : str
136        Specifies the desired triangular truncation of retrieved data,
137        before carrying out any other selected post-processing.
138
139    accuracy : str
140        Specifies the number of bits per value to be used in the
141        generated GRIB coded fields.
142
143    addpar : str
144        List of additional parameters to be retrieved.
145
146    level : str
147        Specifies the maximum level.
148
149    expver : str
150        The version of the dataset.
151
152    levelist : str
153        Specifies the required levels.
154
155    glevelist : str
156        Specifies the required levels for gaussian grids.
157
158    gaussian : str
159        This parameter is deprecated and should no longer be used.
160        Specifies the desired type of Gaussian grid for the output.
161
162    grid : str
163        Specifies the output grid which can be either a Gaussian grid
164        or a Latitude/Longitude grid.
165
166    area : str
167        Specifies the desired sub-area of data to be extracted.
168
169    purefc : int
170        Switch for definition of pure forecast mode or not.
171
172    outputfilelist : list of str
173        The final list of FLEXPART ready input files.
174
175    types : dictionary
176        Determines the combination of type of fields, time and forecast step
177        to be retrieved.
178
179    params : dictionary
180        Collection of grid types and their corresponding parameters,
181        levels, level types and the grid definition.
182
183    server : ECMWFService or ECMWFDataServer
184        This is the connection to the ECMWF data servers.
185
186    public : int
187        Decides which Web API Server version is used.
188
189    dates : str
190        Contains start and end date of the retrieval in the format
191        "YYYYMMDD/to/YYYYMMDD"
192    '''
193
194    # --------------------------------------------------------------------------
195    # CLASS FUNCTIONS
196    # --------------------------------------------------------------------------
197    def __init__(self, c, fluxes=False):
198        '''Creates an object/instance of EcFlexpart with the associated
199        settings of its attributes for the retrieval.
200
201        Parameters:
202        -----------
203        c : ControlFile
204            Contains all the parameters of CONTROL file and
205            command line.
206
207        fluxes : boolean, optional
208            Decides if the flux parameter settings are stored or
209            the rest of the parameter list.
210            Default value is False.
211
212        Return
213        ------
214
215        '''
216        # set a counter for the number of generated mars requests
217        self.mreq_count = 0
218
219        self.inputdir = c.inputdir
220        self.dataset = c.dataset
221        self.basetime = c.basetime
222        self.dtime = c.dtime
223        self.acctype = c.acctype
224        self.acctime = c.acctime
225        self.accmaxstep = c.accmaxstep
226        self.marsclass = c.marsclass
227        self.stream = c.stream
228        self.number = c.number
229        self.resol = c.resol
230        self.accuracy = c.accuracy
231        self.addpar = c.addpar
232        self.level = c.level
233        self.expver = c.expver
234        self.levelist = c.levelist
235        self.glevelist = '1/to/' + c.level # in case of gaussian grid
236        self.gaussian = c.gaussian
237        self.grid = c.grid
238        self.area = c.area
239        self.purefc = c.purefc
240        self.outputfilelist = []
241
242        # Define the different types of field combinations (type, time, step)
243        self.types = {}
244        # Define the parameters and their level types, level list and
245        # grid resolution for the retrieval job
246        self.params = {}
247
248        if fluxes:
249            self._create_params_fluxes()
250        else:
251            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
252
253        if fluxes:# and not c.purefc:
254            self._create_field_types_fluxes()
255        else:
256            self._create_field_types(c.type, c.time, c.step)
257        return
258
259    def _create_field_types(self, ftype, ftime, fstep):
260        '''Create the combination of field type, time and forecast step.
261
262        Parameters:
263        -----------
264        ftype : list of str
265            List of field types.
266
267        ftime : list of str
268            The time in hours of the field.
269
270        fstep : str
271            Specifies the forecast time step from forecast base time.
272            Valid values are hours (HH) from forecast base time.
273
274        Return
275        ------
276
277        '''
278        i = 0
279        for ty, st, ti in zip(ftype, fstep, ftime):
280            btlist = list(range(len(ftime)))
281            if self.basetime == 12:
282                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
283            if self.basetime == 0:
284                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
285
286            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
287                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
288                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
289                 # (int(c.time[i]) in btlist or c.purefc):
290
291            if (i in btlist) or self.purefc:
292
293                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
294                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
295
296                    if ty not in self.types.keys():
297                        self.types[ty] = {'times': '', 'steps': ''}
298
299                    if ti not in self.types[ty]['times']:
300                        if self.types[ty]['times']:
301                            self.types[ty]['times'] += '/'
302                        self.types[ty]['times'] += ti
303
304                    if st not in self.types[ty]['steps']:
305                        if self.types[ty]['steps']:
306                            self.types[ty]['steps'] += '/'
307                        self.types[ty]['steps'] += st
308            i += 1
309
310        return
311
312    def _create_field_types_fluxes(self):
313        '''Create the combination of field type, time and forecast step
314        for the flux data.
315
316        Parameters:
317        -----------
318
319        Return
320        ------
321
322        '''
323        if self.purefc:
324            # need to retrieve forecasts for step 000 in case of pure forecast
325            steps = '{}/to/{}/by/{}'.format(0, self.accmaxstep, self.dtime)
326        else:
327            steps = '{}/to/{}/by/{}'.format(self.dtime,
328                                            self.accmaxstep,
329                                            self.dtime)
330
331        self.types[str(self.acctype)] = {'times': str(self.acctime),
332                                         'steps': steps}
333        return
334
335    def _create_params(self, gauss, eta, omega, cwc, wrf):
336        '''Define the specific parameter settings for retrievment.
337
338        The different parameters need specific grid types and level types
339        for retrievement. We might get following combination of types
340        (depending on selection and availability):
341        (These are short cuts for the grib file names (leading sequence)
342        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
343        where:
344            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
345            ML = Model Level, SL = Surface Level
346
347        For each of this combination there is a list of parameter names,
348        the level type, the level list and the grid resolution.
349
350        There are different scenarios for data extraction from MARS:
351        1) Retrieval of etadot
352           eta=1, gauss=0, omega=0
353        2) Calculation of etadot from divergence
354           eta=0, gauss=1, omega=0
355        3) Calculation of etadot from omega (for makes sense for debugging)
356           eta=0, gauss=0, omega=1
357        4) Retrieval and Calculation of etadot (only for debugging)
358           eta=1, gauss=1, omega=0
359        5) Download also specific model and surface level data for FLEXPART-WRF
360
361        Parameters:
362        -----------
363        gauss : int
364            Gaussian grid is retrieved.
365
366        eta : int
367            Etadot parameter will be directly retrieved.
368
369        omega : int
370            The omega paramterwill be retrieved.
371
372        cwc : int
373            The cloud liquid and ice water content will be retrieved.
374
375        wrf : int
376            Additional model level and surface level data will be retrieved for
377            WRF/FLEXPART-WRF simulations.
378
379        Return
380        ------
381
382        '''
383        # SURFACE FIELDS
384        #-----------------------------------------------------------------------
385        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
386        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
387                                 'SFC', '1', self.grid]
388        if self.addpar:
389            self.params['OG__SL'][0] += self.addpar
390
391        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
392            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
393                                            'SFC', '1', self.grid]
394        else:
395            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
396                                            'SFC', '1', self.grid]
397
398        # MODEL LEVEL FIELDS
399        #-----------------------------------------------------------------------
400        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
401
402        if not gauss and eta:
403            self.params['OG__ML'][0] += '/U/V/ETADOT'
404        elif gauss and not eta:
405            self.params['GG__SL'] = ['Q', 'ML', '1',
406                                     '{}'.format((int(self.resol) + 1) // 2)]
407            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
408        elif not gauss and not eta:
409            self.params['OG__ML'][0] += '/U/V'
410        else:  # GAUSS and ETA
411            print('Warning: Collecting etadot and parameters for gaussian grid '
412                  'is a very costly parameter combination, '
413                  'use this combination only for debugging!')
414            self.params['GG__SL'] = ['Q', 'ML', '1',
415                                     '{}'.format((int(self.resol) + 1) // 2)]
416            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
417                                     '{}'.format((int(self.resol) + 1) // 2)]
418
419        if omega:
420            self.params['OG__ML'][0] += '/W'
421
422        if cwc:
423            self.params['OG__ML'][0] += '/CLWC/CIWC'
424
425        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
426        # ----------------------------------------------------------------------
427        if wrf:
428            # @WRF
429            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
430            #
431            # UNDER CONSTRUCTION !!!
432            #
433
434            print('WRF VERSION IS UNDER CONSTRUCTION!') # dummy argument
435
436            #self.params['OG__ML'][0] += '/Z/VO'
437            #if '/D' not in self.params['OG__ML'][0]:
438            #    self.params['OG__ML'][0] += '/D'
439
440            #wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
441            #           'SWVL1','SWVL2','SWVL3','SWVL4']
442            #for par in wrf_sfc:
443            #    if par not in self.params['OG__SL'][0]:
444            #        self.params['OG__SL'][0] += '/' + par
445
446        return
447
448
449    def _create_params_fluxes(self):
450        '''Define the parameter setting for flux data.
451
452        Flux data are accumulated fields in time and are stored on the
453        surface level. The leading short cut name for the grib files is:
454        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
455        acc for Accumulated Grid.
456        The params dictionary stores a list of parameter names, the level type,
457        the level list and the grid resolution.
458
459        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
460
461        Parameters:
462        -----------
463
464        Return
465        ------
466
467        '''
468        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
469                                    'SFC', '1', self.grid]
470        return
471
472
473    def _mk_targetname(self, ftype, param, date):
474        '''Creates the filename for the requested grib data to be stored in.
475        This name is passed as the "target" parameter in the request.
476
477        Parameters
478        ----------
479        ftype : str
480            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
481
482        param : str
483            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
484            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
485
486        date : str
487            The date period of the grib data to be stored in this file.
488
489        Return
490        ------
491        targetname : str
492            The target filename for the grib data.
493        '''
494        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
495                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
496
497        return targetname
498
499
500    def _start_retrievement(self, request, par_dict):
501        '''Creates the Mars Retrieval and prints or submits the request
502        depending on the status of the request variable.
503
504        Parameters
505        ----------
506        request : int
507            Selects the mode of retrieval.
508            0: Retrieves the data from ECMWF.
509            1: Prints the mars requests to an output file.
510            2: Retrieves the data and prints the mars request.
511
512        par_dict : dictionary
513            Contains all parameter which have to be set for creating the
514            Mars Retrievals. The parameter are:
515            marsclass, dataset, stream, type, levtype, levelist, resol,
516            gaussian, accuracy, grid, target, area, date, time, number,
517            step, expver, param
518
519        Return
520        ------
521
522        '''
523        # increase number of mars requests
524        self.mreq_count += 1
525
526        MR = MarsRetrieval(self.server,
527                           self.public,
528                           marsclass=par_dict['marsclass'],
529                           dataset=par_dict['dataset'],
530                           stream=par_dict['stream'],
531                           type=par_dict['type'],
532                           levtype=par_dict['levtype'],
533                           levelist=par_dict['levelist'],
534                           resol=par_dict['resol'],
535                           gaussian=par_dict['gaussian'],
536                           accuracy=par_dict['accuracy'],
537                           grid=par_dict['grid'],
538                           target=par_dict['target'],
539                           area=par_dict['area'],
540                           date=par_dict['date'],
541                           time=par_dict['time'],
542                           number=par_dict['number'],
543                           step=par_dict['step'],
544                           expver=par_dict['expver'],
545                           param=par_dict['param'])
546
547        if request == 0:
548            MR.display_info()
549            MR.data_retrieve()
550        elif request == 1:
551            MR.print_infodata_csv(self.inputdir, self.mreq_count)
552        elif request == 2:
553            MR.print_infodata_csv(self.inputdir, self.mreq_count)
554            MR.display_info()
555            MR.data_retrieve()
556        else:
557            print('Failure')
558
559        return
560
561
562    def _mk_index_values(self, inputdir, inputfiles, keys):
563        '''Creates an index file for a set of grib parameter keys.
564        The values from the index keys are returned in a list.
565
566        Parameters
567        ----------
568        keys : dictionary
569            List of parameter names which serves as index.
570
571        inputfiles : UioFiles
572            Contains a list of files.
573
574        Return
575        ------
576        iid : codes_index
577            This is a grib specific index structure to access
578            messages in a file.
579
580        index_vals : list of list  of str
581            Contains the values from the keys used for a distinct selection
582            of grib messages in processing  the grib files.
583            Content looks like e.g.:
584            index_vals[0]: ('20171106', '20171107', '20171108') ; date
585            index_vals[1]: ('0', '1200', '1800', '600') ; time
586            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
587        '''
588        from eccodes import codes_index_get
589
590        iid = None
591        index_keys = keys
592
593        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
594        silent_remove(indexfile)
595        grib = GribUtil(inputfiles.files)
596        # creates new index file
597        iid = grib.index(index_keys=index_keys, index_file=indexfile)
598
599        # read the values of index keys
600        index_vals = []
601        for key in index_keys:
602            key_vals = codes_index_get(iid, key)
603            # have to sort the key values for correct order,
604            # therefore convert to int first
605            key_vals = [int(k) for k in key_vals]
606            key_vals.sort()
607            key_vals = [str(k) for k in key_vals]
608            index_vals.append(key_vals)
609            # index_vals looks for example like:
610            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
611            # index_vals[1]: ('0', '1200') ; time
612            # index_vals[2]: (3', '6', '9', '12') ; stepRange
613
614        return iid, index_vals
615
616
617    def retrieve(self, server, dates, public, request, inputdir='.'):
618        '''Finalizing the retrieval information by setting final details
619        depending on grid type.
620        Prepares MARS retrievals per grid type and submits them.
621
622        Parameters
623        ----------
624        server : ECMWFService or ECMWFDataServer
625            The connection to the ECMWF server. This is different
626            for member state users which have full access and non
627            member state users which have only access to the public
628            data sets. The decision is made from command line argument
629            "public"; for public access its True (ECMWFDataServer)
630            for member state users its False (ECMWFService)
631
632        dates : str
633            Contains start and end date of the retrieval in the format
634            "YYYYMMDD/to/YYYYMMDD"
635
636        request : int
637            Selects the mode of retrieval.
638            0: Retrieves the data from ECMWF.
639            1: Prints the mars requests to an output file.
640            2: Retrieves the data and prints the mars request.
641
642        inputdir : str, optional
643            Path to the directory where the retrieved data is about
644            to be stored. The default is the current directory ('.').
645
646        Return
647        ------
648
649        '''
650        self.dates = dates
651        self.server = server
652        self.public = public
653        self.inputdir = inputdir
654        oro = False
655
656        # define times with datetime module
657        t12h = timedelta(hours=12)
658        t24h = timedelta(hours=24)
659
660        # dictionary which contains all parameter for the mars request,
661        # entries with a "None" will change in different requests and will
662        # therefore be set in each request seperately
663        retr_param_dict = {'marsclass':self.marsclass,
664                           'dataset':self.dataset,
665                           'stream':None,
666                           'type':None,
667                           'levtype':None,
668                           'levelist':None,
669                           'resol':self.resol,
670                           'gaussian':None,
671                           'accuracy':self.accuracy,
672                           'grid':None,
673                           'target':None,
674                           'area':None,
675                           'date':None,
676                           'time':None,
677                           'number':self.number,
678                           'step':None,
679                           'expver':self.expver,
680                           'param':None}
681
682        for ftype in sorted(self.types):
683            # ftype contains field types such as
684            #     [AN, FC, PF, CV]
685            for pk, pv in sorted(self.params.items()):
686                # pk contains one of these keys of params
687                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
688                #      OG_OROLSM_SL, OG_acc_SL]
689                # pv contains all of the items of the belonging key
690                #     [param, levtype, levelist, grid]
691                if isinstance(pv, str):
692                    continue
693                retr_param_dict['type'] = ftype
694                retr_param_dict['time'] = self.types[ftype]['times']
695                retr_param_dict['step'] = self.types[ftype]['steps']
696                retr_param_dict['date'] = self.dates
697                retr_param_dict['stream'] = self.stream
698                retr_param_dict['target'] = \
699                    self._mk_targetname(ftype,
700                                        pk,
701                                        retr_param_dict['date'].split('/')[0])
702                table128 = init128(_config.PATH_GRIBTABLE)
703                ids = to_param_id_with_tablenumber(pv[0], table128)
704                retr_param_dict['param'] = ids
705                retr_param_dict['levtype'] = pv[1]
706                retr_param_dict['levelist'] = pv[2]
707                retr_param_dict['grid'] = pv[3]
708                retr_param_dict['area'] = self.area
709                retr_param_dict['gaussian'] = self.gaussian
710
711                if pk == 'OG_OROLSM__SL' and not oro:
712                    oro = True
713                    # in CERA20C (class EP) there is no stream "OPER"!
714                    if self.marsclass.upper() != 'EP':
715                        retr_param_dict['stream'] = 'OPER'
716                    retr_param_dict['type'] = 'AN'
717                    retr_param_dict['time'] = '00'
718                    retr_param_dict['step'] = '000'
719                    retr_param_dict['date'] = self.dates.split('/')[0]
720                    retr_param_dict['target'] = self._mk_targetname('',
721                                                                    pk,
722                                                                    retr_param_dict['date'])
723                elif pk == 'OG_OROLSM__SL' and oro:
724                    continue
725                if pk == 'GG__SL' and pv[0] == 'Q':
726                    retr_param_dict['area'] = ""
727                    retr_param_dict['gaussian'] = 'reduced'
728                if ftype.upper() == 'FC' and \
729                    'acc' not in retr_param_dict['target']:
730                    if (int(retr_param_dict['time'][0]) +
731                        int(retr_param_dict['step'][0])) > 23:
732                        dates = retr_param_dict['date'].split('/')
733                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
734                        sdate = sdate - timedelta(days=1)
735                        retr_param_dict['date'] = '/'.join(
736                            [sdate.strftime("%Y%m%d")] +
737                            retr_param_dict['date'][1:])
738
739                        print('CHANGED FC start date to ' +
740                              sdate.strftime("%Y%m%d") +
741                              ' to accomodate TIME=' +
742                              retr_param_dict['time'][0] +
743                              ', STEP=' +
744                              retr_param_dict['time'][0])
745
746    # ------  on demand path  --------------------------------------------------
747                if self.basetime is None:
748                    # ******* start retrievement
749                    self._start_retrievement(request, retr_param_dict)
750    # ------  operational path  ------------------------------------------------
751                else:
752                    # check if mars job requests fields beyond basetime.
753                    # if yes eliminate those fields since they may not
754                    # be accessible with user's credentials
755
756                    enddate = retr_param_dict['date'].split('/')[-1]
757                    elimit = datetime.strptime(enddate + str(self.basetime),
758                                               '%Y%m%d%H')
759
760                    if self.basetime == 12:
761                        # --------------  flux data ----------------------------
762                        if 'acc' in pk:
763                            startdate = retr_param_dict['date'].split('/')[0]
764                            enddate = datetime.strftime(elimit - t24h, '%Y%m%d')
765                            retr_param_dict['date'] = '/'.join([startdate,
766                                                                'to',
767                                                                enddate])
768
769                            # ******* start retrievement
770                            self._start_retrievement(request, retr_param_dict)
771
772                            retr_param_dict['date'] = \
773                                datetime.strftime(elimit - t12h, '%Y%m%d')
774                            retr_param_dict['time'] = '00'
775                            retr_param_dict['target'] = \
776                                self._mk_targetname(ftype, pk,
777                                                    retr_param_dict['date'])
778
779                            # ******* start retrievement
780                            self._start_retrievement(request, retr_param_dict)
781
782                        # --------------  non flux data ------------------------
783                        else:
784                            # ******* start retrievement
785                            self._start_retrievement(request, retr_param_dict)
786
787                    elif self.basetime == 0:
788#                        retr_param_dict['date'] = \
789#                            datetime.strftime(elimit - t24h, '%Y%m%d')
790
791                        timesave = ''.join(retr_param_dict['time'])
792
793                        if all(['/' in retr_param_dict['time'],
794                                pk != 'OG_OROLSM__SL',
795                                'acc' not in pk]):
796                            times = retr_param_dict['time'].split('/')
797                            steps = retr_param_dict['step'].split('/')
798
799                            while int(times[0]) + int(steps[0]) <= 12:
800                                times = times[1:]
801                                if len(times) > 1:
802                                    retr_param_dict['time'] = '/'.join(times)
803                                else:
804                                    retr_param_dict['time'] = times[0]
805
806                        if all([pk != 'OG_OROLSM__SL',
807                                int(retr_param_dict['step'].split('/')[0]) == 0,
808                                int(timesave.split('/')[0]) == 0]):
809
810                            retr_param_dict['date'] = \
811                                datetime.strftime(elimit, '%Y%m%d')
812                            retr_param_dict['time'] = '00'
813                            retr_param_dict['step'] = '000'
814                            retr_param_dict['target'] = \
815                                self._mk_targetname(ftype, pk,
816                                                    retr_param_dict['date'])
817
818                        # ******* start retrievement
819                        self._start_retrievement(request, retr_param_dict)
820                    else:
821                        raise ValueError('ERROR: Basetime has an invalid value '
822                                         '-> {}'.format(str(self.basetime)))
823
824        if request == 0 or request == 2:
825            print('MARS retrieve done ... ')
826        elif request == 1:
827            print('MARS request printed ...')
828
829        return
830
831
832    def write_namelist(self, c):
833        '''Creates a namelist file in the temporary directory and writes
834        the following values to it: maxl, maxb, mlevel,
835        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
836        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
837
838        Parameters
839        ----------
840        c : ControlFile
841            Contains all the parameters of CONTROL file and
842            command line.
843
844        filename : str
845                Name of the namelist file.
846
847        Return
848        ------
849
850        '''
851
852        from genshi.template.text import NewTextTemplate
853        from genshi.template import  TemplateLoader
854        from genshi.template.eval import  UndefinedError
855        import numpy as np
856
857        try:
858            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
859            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
860                                            cls=NewTextTemplate)
861
862            self.inputdir = c.inputdir
863            area = np.asarray(self.area.split('/')).astype(float)
864            grid = np.asarray(self.grid.split('/')).astype(float)
865
866            if area[1] > area[3]:
867                area[1] -= 360
868            maxl = int(round((area[3] - area[1]) / grid[1])) + 1
869            maxb = int(round((area[0] - area[2]) / grid[0])) + 1
870
871            stream = namelist_template.generate(
872                maxl=str(maxl),
873                maxb=str(maxb),
874                mlevel=str(self.level),
875                mlevelist=str(self.levelist),
876                mnauf=str(self.resol),
877                metapar='77',
878                rlo0=str(area[1]),
879                rlo1=str(area[3]),
880                rla0=str(area[2]),
881                rla1=str(area[0]),
882                momega=str(c.omega),
883                momegadiff=str(c.omegadiff),
884                mgauss=str(c.gauss),
885                msmooth=str(c.smooth),
886                meta=str(c.eta),
887                metadiff=str(c.etadiff),
888                mdpdeta=str(c.dpdeta)
889            )
890        except UndefinedError as e:
891            print('... ERROR ' + str(e))
892
893            sys.exit('\n... error occured while trying to generate namelist ' +
894                     _config.TEMPFILE_NAMELIST)
895        except OSError as e:
896            print('... ERROR CODE: ' + str(e.errno))
897            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
898
899            sys.exit('\n... error occured while trying to generate template ' +
900                     _config.TEMPFILE_NAMELIST)
901
902        try:
903            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
904
905            with open(namelistfile, 'w') as f:
906                f.write(stream.render('text'))
907        except OSError as e:
908            print('... ERROR CODE: ' + str(e.errno))
909            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
910
911            sys.exit('\n... error occured while trying to write ' +
912                     namelistfile)
913
914        return
915
916
917    def deacc_fluxes(self, inputfiles, c):
918        '''De-accumulate and disaggregate flux data.
919
920        Goes through all flux fields in ordered time and de-accumulate
921        the fields. Afterwards the fields are disaggregated in time.
922        Different versions of disaggregation is provided for rainfall
923        data (darain, modified linear) and the surface fluxes and
924        stress data (dapoly, cubic polynomial).
925
926        Parameters
927        ----------
928        inputfiles : UioFiles
929            Contains the list of files that contain flux data.
930
931        c : ControlFile
932            Contains all the parameters of CONTROL file and
933            command line.
934
935        Return
936        ------
937
938        '''
939        import numpy as np
940        from eccodes import (codes_index_select, codes_get,
941                             codes_get_values, codes_set_values, codes_set,
942                             codes_write, codes_release, codes_new_from_index,
943                             codes_index_release)
944
945        table128 = init128(_config.PATH_GRIBTABLE)
946        # get ids from the flux parameter names
947        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
948
949        iid = None
950        index_vals = None
951
952        # get the values of the keys which are used for distinct access
953        # of grib messages via product and save the maximum number of
954        # ensemble members if there is more than one
955        if '/' in self.number:
956            # more than one ensemble member is selected
957            index_keys = ["number", "date", "time", "step"]
958            # maximum ensemble number retrieved
959            # + 1 for the control run (ensemble number 0)
960            maxnum = int(self.number.split('/')[-1]) + 1
961            # remember the index of the number values
962            index_number = index_keys.index('number')
963            # empty set to save ensemble numbers which were already processed
964            ens_numbers = set()
965            # index for the ensemble number
966            inumb = 0
967        else:
968            index_keys = ["date", "time", "step"]
969            # maximum ensemble number
970            maxnum = None
971
972        # get sorted lists of the index values
973        # this is very important for disaggregating
974        # the flux data in correct order
975        iid, index_vals = self._mk_index_values(c.inputdir,
976                                                inputfiles,
977                                                index_keys)
978        # index_vals looks like e.g.:
979        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
980        # index_vals[1]: ('0', '600', '1200', '1800') ; time
981        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
982
983        if c.rrint:
984            # set start and end timestamps for retrieval period
985            if not c.purefc:
986                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
987                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
988            else:
989                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
990                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
991                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
992                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
993                end_date = end_date + timedelta(hours=c.maxstep)
994
995            # get necessary grid dimensions from grib files for storing the
996            # precipitation fields
997            info = get_informations(os.path.join(c.inputdir,
998                                                 inputfiles.files[0]))
999            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
1000                                  start_date, end_date)
1001
1002            # create empty numpy arrays
1003            if not maxnum:
1004                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1005                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
1006            else:
1007                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1008                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
1009
1010            # index counter for time line
1011            it_lsp = 0
1012            it_cp = 0
1013
1014            # store the order of date and step
1015            date_list = []
1016            step_list = []
1017
1018        # initialize dictionaries to store flux values per parameter
1019        orig_vals = {}
1020        deac_vals = {}
1021        for p in pars:
1022            orig_vals[p] = []
1023            deac_vals[p] = []
1024
1025        # "product" genereates each possible combination between the
1026        # values of the index keys
1027        for prod in product(*index_vals):
1028            # e.g. prod = ('20170505', '0', '12')
1029            #             ( date     ,time, step)
1030
1031            print('CURRENT PRODUCT: ', prod)
1032
1033            # the whole process has to be done for each seperate ensemble member
1034            # therefore, for each new ensemble member we delete old flux values
1035            # and start collecting flux data from the beginning time step
1036            if maxnum and prod[index_number] not in ens_numbers:
1037                ens_numbers.add(prod[index_number])
1038                inumb = len(ens_numbers) - 1
1039                # re-initialize dictionaries to store flux values per parameter
1040                # for the next ensemble member
1041                it_lsp = 0
1042                it_cp = 0
1043                orig_vals = {}
1044                deac_vals = {}
1045                for p in pars:
1046                    orig_vals[p] = []
1047                    deac_vals[p] = []
1048
1049            for i in range(len(index_keys)):
1050                codes_index_select(iid, index_keys[i], prod[i])
1051
1052            # get first id from current product
1053            gid = codes_new_from_index(iid)
1054
1055            # if there is no data for this specific time combination / product
1056            # skip the rest of the for loop and start with next timestep/product
1057            if not gid:
1058                continue
1059
1060            # create correct timestamp from the three time informations
1061            cdate = str(codes_get(gid, 'date'))
1062            time = codes_get(gid, 'time') // 100  # integer
1063            step = codes_get(gid, 'step') # integer
1064            ctime = '{:0>2}'.format(time)
1065
1066            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1067            t_dt = t_date + timedelta(hours=step)
1068            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1069            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
1070            if c.basetime is not None:
1071                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1072                                              '%Y%m%d%H')
1073            else:
1074                t_enddate = t_date + timedelta(2*int(c.dtime))
1075
1076            # if necessary, add ensemble member number to filename suffix
1077            # otherwise, add empty string
1078            if maxnum:
1079                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1080            else:
1081                numbersuffix = ''
1082
1083            if c.purefc:
1084                fnout = os.path.join(c.inputdir, 'flux' +
1085                                     t_date.strftime('%Y%m%d.%H') +
1086                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1087                                     numbersuffix)
1088                gnout = os.path.join(c.inputdir, 'flux' +
1089                                     t_date.strftime('%Y%m%d.%H') +
1090                                     '.{:0>3}'.format(step-int(c.dtime)) +
1091                                     numbersuffix)
1092                hnout = os.path.join(c.inputdir, 'flux' +
1093                                     t_date.strftime('%Y%m%d.%H') +
1094                                     '.{:0>3}'.format(step) +
1095                                     numbersuffix)
1096            else:
1097                fnout = os.path.join(c.inputdir, 'flux' +
1098                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
1099                gnout = os.path.join(c.inputdir, 'flux' +
1100                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
1101                hnout = os.path.join(c.inputdir, 'flux' +
1102                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
1103
1104            print("outputfile = " + fnout)
1105            f_handle = open(fnout, 'wb')
1106            h_handle = open(hnout, 'wb')
1107            g_handle = open(gnout, 'wb')
1108
1109            # read message for message and store relevant data fields, where
1110            # data keywords are stored in pars
1111            while True:
1112                if not gid:
1113                    break
1114                parId = codes_get(gid, 'paramId') # integer
1115                step = codes_get(gid, 'step') # integer
1116                time = codes_get(gid, 'time') # integer
1117                ni = codes_get(gid, 'Ni') # integer
1118                nj = codes_get(gid, 'Nj') # integer
1119                if parId not in orig_vals.keys():
1120                    # parameter is not a flux, find next one
1121                    continue
1122
1123                # define conversion factor
1124                if parId == 142 or parId == 143:
1125                    fak = 1. / 1000.
1126                else:
1127                    fak = 3600.
1128
1129                # get parameter values and reshape
1130                values = codes_get_values(gid)
1131                values = (np.reshape(values, (nj, ni))).flatten() / fak
1132
1133                # save the original and accumulated values
1134                orig_vals[parId].append(values[:])
1135
1136                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1137                    # no de-accumulation needed
1138                    deac_vals[parId].append(values[:] / int(c.dtime))
1139                else:
1140                    # do de-accumulation
1141                    deac_vals[parId].append(
1142                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1143                        int(c.dtime))
1144
1145                # store precipitation if new disaggregation method is selected
1146                # only the exact days are needed
1147                if c.rrint:
1148                    if start_date <= t_dt <= end_date:
1149                        if not c.purefc:
1150                            if t_dt not in date_list:
1151                                date_list.append(t_dt)
1152                                step_list = [0]
1153                        else:
1154                            if t_date not in date_list:
1155                                date_list.append(t_date)
1156                            if step not in step_list:
1157                                step_list.append(step)
1158                        # store precipitation values
1159                        if maxnum and parId == 142:
1160                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1161                            it_lsp += 1
1162                        elif not maxnum and parId == 142:
1163                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
1164                            it_lsp += 1
1165                        elif maxnum and parId == 143:
1166                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1167                            it_cp += 1
1168                        elif not maxnum and parId == 143:
1169                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
1170                            it_cp += 1
1171
1172                # information printout
1173                print(parId, time, step, len(values), values[0], np.std(values))
1174
1175                # length of deac_vals[parId] corresponds to the
1176                # number of time steps, max. 4 are needed for disaggegration
1177                # with the old and original method
1178                # run over all grib messages and perform
1179                # shifting in time
1180                if len(deac_vals[parId]) >= 3:
1181                    if len(deac_vals[parId]) > 3:
1182                        if not c.rrint and (parId == 142 or parId == 143):
1183                            values = disaggregation.darain(deac_vals[parId])
1184                        else:
1185                            values = disaggregation.dapoly(deac_vals[parId])
1186
1187                        if not (step == c.maxstep and c.purefc \
1188                                or t_dt == t_enddate):
1189                            # remove first time step in list to shift
1190                            # time line
1191                            orig_vals[parId].pop(0)
1192                            deac_vals[parId].pop(0)
1193                    else:
1194                        # if the third time step is read (per parId),
1195                        # write out the first one as a boundary value
1196                        if c.purefc:
1197                            values = deac_vals[parId][1]
1198                        else:
1199                            values = deac_vals[parId][0]
1200
1201                    if not (c.rrint and (parId == 142 or parId == 143)):
1202                        codes_set_values(gid, values)
1203
1204                        if c.purefc:
1205                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1206                        else:
1207                            codes_set(gid, 'stepRange', 0)
1208                            codes_set(gid, 'time', t_m2dt.hour*100)
1209                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1210
1211                        codes_write(gid, f_handle)
1212
1213                        # squeeze out information of last two steps
1214                        # contained in deac_vals[parId]
1215                        # Note that deac_vals[parId][0] has not been popped
1216                        # in this case
1217
1218                        if step == c.maxstep and c.purefc or \
1219                           t_dt == t_enddate:
1220                            # last step
1221                            if c.purefc:
1222                                values = deac_vals[parId][3]
1223                                codes_set_values(gid, values)
1224                                codes_set(gid, 'stepRange', step)
1225                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1226                                codes_write(gid, h_handle)
1227                            else:
1228                                values = deac_vals[parId][3]
1229                                codes_set_values(gid, values)
1230                                codes_set(gid, 'stepRange', 0)
1231                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1232                                codes_set(gid, 'time', truedatetime.hour * 100)
1233                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1234                                codes_write(gid, h_handle)
1235
1236                            if parId == 142 or parId == 143:
1237                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1238                            else:
1239                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1240
1241                            # step before last step
1242                            if c.purefc:
1243                                codes_set(gid, 'stepRange', step-int(c.dtime))
1244                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1245                                codes_set_values(gid, values)
1246                                codes_write(gid, g_handle)
1247                            else:
1248                                codes_set(gid, 'stepRange', 0)
1249                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1250                                codes_set(gid, 'time', truedatetime.hour * 100)
1251                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1252                                codes_set_values(gid, values)
1253                                codes_write(gid, g_handle)
1254
1255                codes_release(gid)
1256
1257                gid = codes_new_from_index(iid)
1258
1259            f_handle.close()
1260            g_handle.close()
1261            h_handle.close()
1262
1263        codes_index_release(iid)
1264
1265        if c.rrint:
1266            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1267
1268            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1269                                 cp_np, maxnum, index_keys, index_vals, c)
1270
1271        return
1272
1273    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
1274        '''Calculates and writes out the disaggregated precipitation fields.
1275
1276        Disaggregation is done in time and original times are written to the
1277        flux files, while the additional subgrid times are written to
1278        extra files output files. They are named like the original files with
1279        suffixes "_1" and "_2" for the first and second subgrid point.
1280
1281        Parameters
1282        ----------
1283        ni : int
1284            Amount of zonal grid points.
1285
1286        nj : int
1287            Amount of meridional grid points.
1288
1289        nt : int
1290            Number of time steps.
1291
1292        lsp_np : numpy array of float
1293            The large scale precipitation fields for each time step.
1294            Shape (ni * nj, nt).
1295
1296        cp_np : numpy array of float
1297            The convective precipitation fields for each time step.
1298            Shape (ni * nj, nt).
1299
1300        maxnum : int
1301            The maximum number of ensemble members. It is None
1302            if there are no or just one ensemble.
1303
1304        index_keys : dictionary
1305            List of parameter names which serves as index.
1306
1307        index_vals : list of list  of str
1308            Contains the values from the keys used for a distinct selection
1309            of grib messages in processing  the grib files.
1310            Content looks like e.g.:
1311            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1312            index_vals[1]: ('0', '1200', '1800', '600') ; time
1313            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1314
1315        c : ControlFile
1316            Contains all the parameters of CONTROL file and
1317            command line.
1318
1319        Return
1320        ------
1321
1322        '''
1323        import numpy as np
1324
1325        print('... disaggregation of precipitation with new method.')
1326
1327        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1328
1329        # initialize new numpy arrays for disaggregated fields
1330        if maxnum:
1331            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1332            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1333        else:
1334            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1335            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1336
1337        # do the disaggregation, but neglect the last value of the
1338        # original time series. This one corresponds for example to
1339        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1340        if maxnum:
1341            for inum in range(maxnum):
1342                for ix in range(ni*nj):
1343                    lsp_new_np[inum, ix, :] = disaggregation.IA3(lsp_np[inum, ix, :])[:-1]
1344                    cp_new_np[inum, ix, :] = disaggregation.IA3(cp_np[inum, ix, :])[:-1]
1345        else:
1346            for ix in range(ni*nj):
1347                lsp_new_np[0, ix, :] = disaggregation.IA3(lsp_np[ix, :])[:-1]
1348                cp_new_np[0, ix, :] = disaggregation.IA3(cp_np[ix, :])[:-1]
1349
1350        # write to grib files (full/orig times to flux file and inbetween
1351        # times with step 1 and 2, respectively)
1352        print('... write disaggregated precipitation to files.')
1353
1354        if maxnum:
1355            # remember the index of the number values
1356            index_number = index_keys.index('number')
1357            # empty set to save unique ensemble numbers which were already processed
1358            ens_numbers = set()
1359            # index for the ensemble number
1360            inumb = 0
1361        else:
1362            inumb = 0
1363
1364        # index variable of disaggregated fields
1365        it = 0
1366
1367        # "product" genereates each possible combination between the
1368        # values of the index keys
1369        for prod in product(*index_vals):
1370            # e.g. prod = ('20170505', '0', '12')
1371            #             ( date     ,time, step)
1372            # or   prod = ('0'   , '20170505', '0', '12')
1373            #             (number, date      ,time, step)
1374
1375            cdate = prod[index_keys.index('date')]
1376            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1377            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1378
1379            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1380            date += timedelta(hours=int(cstep))
1381
1382            start_period, end_period = generate_retrieval_period_boundary(c)
1383            # skip all temporary times
1384            # which are outside the retrieval period
1385            if date < start_period or \
1386               date > end_period:
1387                continue
1388
1389            # the whole process has to be done for each seperate ensemble member
1390            # therefore, for each new ensemble member we delete old flux values
1391            # and start collecting flux data from the beginning time step
1392            if maxnum and prod[index_number] not in ens_numbers:
1393                ens_numbers.add(prod[index_number])
1394                inumb = int(prod[index_number])
1395                it = 0
1396
1397            # if necessary, add ensemble member number to filename suffix
1398            # otherwise, add empty string
1399            if maxnum:
1400                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1401            else:
1402                numbersuffix = ''
1403
1404            # per original time stamp: write original time step and
1405            # the two newly generated sub time steps
1406            if c.purefc:
1407                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1408            else:
1409                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1410
1411            # write original time step to flux file as usual
1412            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1413            fluxfile.set_keys(tmpfile, filemode='ab',
1414                              wherekeynames=['paramId'], wherekeyvalues=[142],
1415                              keynames=['perturbationNumber', 'date', 'time',
1416                                        'stepRange', 'values'],
1417                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1418                                         date.hour*100, 0, lsp_new_np[inumb, :, it]]
1419                             )
1420            fluxfile.set_keys(tmpfile, filemode='ab',
1421                              wherekeynames=['paramId'], wherekeyvalues=[143],
1422                              keynames=['perturbationNumber', 'date', 'time',
1423                                        'stepRange', 'values'],
1424                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1425                                         date.hour*100, 0, cp_new_np[inumb, :, it]]
1426                             )
1427
1428            # rr for first subgrid point is identified by step = 1
1429            fluxfile.set_keys(tmpfile, filemode='ab',
1430                              wherekeynames=['paramId'], wherekeyvalues=[142],
1431                              keynames=['perturbationNumber', 'date', 'time',
1432                                        'stepRange', 'values'],
1433                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1434                                         date.hour*100, '1', lsp_new_np[inumb, :, it+1]]
1435                             )
1436            fluxfile.set_keys(tmpfile, filemode='ab',
1437                              wherekeynames=['paramId'], wherekeyvalues=[143],
1438                              keynames=['perturbationNumber', 'date', 'time',
1439                                        'stepRange', 'values'],
1440                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1441                                         date.hour*100, '1', cp_new_np[inumb, :, it+1]]
1442                             )
1443
1444            # rr for second subgrid point is identified by step = 2
1445            fluxfile.set_keys(tmpfile, filemode='ab',
1446                              wherekeynames=['paramId'], wherekeyvalues=[142],
1447                              keynames=['perturbationNumber', 'date', 'time',
1448                                        'stepRange', 'values'],
1449                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1450                                         date.hour*100, '2', lsp_new_np[inumb, :, it+2]]
1451                             )
1452            fluxfile.set_keys(tmpfile, filemode='ab',
1453                              wherekeynames=['paramId'], wherekeyvalues=[143],
1454                              keynames=['perturbationNumber', 'date', 'time',
1455                                        'stepRange', 'values'],
1456                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1457                                         date.hour*100, '2', cp_new_np[inumb, :, it+2]]
1458                             )
1459
1460            it = it + 3 # jump to next original time step in rr fields
1461        return
1462
1463    def _create_rr_grib_dummy(self, ifile, inputdir):
1464        '''Creates a grib file with a dummy message for the two precipitation
1465        types lsp and cp each.
1466
1467        Parameters
1468        ----------
1469        ifile : str
1470            Filename of the input file to read the grib messages from.
1471
1472        inputdir : str, optional
1473            Path to the directory where the retrieved data is stored.
1474
1475        Return
1476        ------
1477
1478        '''
1479
1480        gribfile = GribUtil(os.path.join(inputdir, 'rr_grib_dummy.grb'))
1481       
1482        gribfile.copy_dummy_msg(ifile, keynames=['paramId','paramId'],
1483                                keyvalues=[142,143], filemode='wb')       
1484
1485        return
1486
1487    def create(self, inputfiles, c):
1488        '''An index file will be created which depends on the combination
1489        of "date", "time" and "stepRange" values. This is used to iterate
1490        over all messages in each grib file which were passed through the
1491        parameter "inputfiles" to seperate specific parameters into fort.*
1492        files. Afterwards the FORTRAN program is called to convert
1493        the data fields all to the same grid and put them in one file
1494        per unique time step (combination of "date", "time" and
1495        "stepRange").
1496
1497        Note
1498        ----
1499        This method is based on the ECMWF example index.py
1500        https://software.ecmwf.int/wiki/display/GRIB/index.py
1501
1502        Parameters
1503        ----------
1504        inputfiles : UioFiles
1505            Contains a list of files.
1506
1507        c : ControlFile
1508            Contains all the parameters of CONTROL file and
1509            command line.
1510
1511        Return
1512        ------
1513
1514        '''
1515        from eccodes import (codes_index_select, codes_get,
1516                             codes_get_values, codes_set_values, codes_set,
1517                             codes_write, codes_release, codes_new_from_index,
1518                             codes_index_release)
1519
1520        # generate start and end timestamp of the retrieval period
1521        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1522        start_period = start_period + timedelta(hours=int(c.step[0]))
1523        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1524        end_period = end_period + timedelta(hours=int(c.step[-1]))
1525
1526        # @WRF
1527        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1528        #
1529        # UNDER CONSTRUCTION !!!
1530        #
1531        #if c.wrf:
1532        #    table128 = init128(_config.PATH_GRIBTABLE)
1533        #    wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1534        #                           stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1535        #                          table128)
1536
1537        # these numbers are indices for the temporary files "fort.xx"
1538        # which are used to seperate the grib fields to,
1539        # for the Fortran program input
1540        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1541        # 17: Q | 18: Q, SL, GG| 19: omega | 21: etadot | 22: clwc+ciwc
1542        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1543                 '17':None, '18':None, '19':None, '21':None, '22':None}
1544
1545        iid = None
1546        index_vals = None
1547
1548        # get the values of the keys which are used for distinct access
1549        # of grib messages via product
1550        if '/' in self.number:
1551            # more than one ensemble member is selected
1552            index_keys = ["number", "date", "time", "step"]
1553        else:
1554            index_keys = ["date", "time", "step"]
1555        iid, index_vals = self._mk_index_values(c.inputdir,
1556                                                inputfiles,
1557                                                index_keys)
1558        # index_vals looks like e.g.:
1559        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1560        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1561        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1562
1563        # "product" genereates each possible combination between the
1564        # values of the index keys
1565        for prod in product(*index_vals):
1566            # e.g. prod = ('20170505', '0', '12')
1567            #             (  date    ,time, step)
1568
1569            print('current product: ', prod)
1570
1571            for i in range(len(index_keys)):
1572                codes_index_select(iid, index_keys[i], prod[i])
1573
1574            # get first id from current product
1575            gid = codes_new_from_index(iid)
1576
1577            # if there is no data for this specific time combination / product
1578            # skip the rest of the for loop and start with next timestep/product
1579            if not gid:
1580                continue
1581#============================================================================================
1582            # remove old fort.* files and open new ones
1583            # they are just valid for a single product
1584            for k, f in fdict.items():
1585                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1586                silent_remove(fortfile)
1587                fdict[k] = open(fortfile, 'wb')
1588#============================================================================================
1589            # create correct timestamp from the three time informations
1590            cdate = str(codes_get(gid, 'date'))
1591            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
1592            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1593            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1594            timestamp += timedelta(hours=int(cstep))
1595            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1596
1597            # if basetime is used, adapt start/end date period
1598            if c.basetime is not None:
1599                time_delta = timedelta(hours=12-int(c.dtime))
1600                start_period = datetime.strptime(c.end_date + str(c.basetime),
1601                                               '%Y%m%d%H') - time_delta
1602                end_period = datetime.strptime(c.end_date + str(c.basetime),
1603                                             '%Y%m%d%H')
1604
1605            # skip all temporary times
1606            # which are outside the retrieval period
1607            if timestamp < start_period or \
1608               timestamp > end_period:
1609                continue
1610
1611
1612            # @WRF
1613            # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1614            #
1615            # UNDER CONSTRUCTION !!!
1616            #
1617            #if c.wrf:
1618            #    if 'olddate' not in locals() or cdate != olddate:
1619            #        fwrf = open(os.path.join(c.outputdir,
1620            #                    'WRF' + cdate + '.' + ctime + '.000.grb2'), 'wb')
1621            #        olddate = cdate[:]
1622#============================================================================================
1623            # savedfields remembers which fields were already used.
1624            savedfields = []
1625            # sum of cloud liquid and ice water content
1626            scwc = None
1627            while 1:
1628                if not gid:
1629                    break
1630                paramId = codes_get(gid, 'paramId')
1631                gridtype = codes_get(gid, 'gridType')
1632                if paramId == 77: # ETADOT
1633                    codes_write(gid, fdict['21'])
1634                elif paramId == 130: # T
1635                    codes_write(gid, fdict['11'])
1636                elif paramId == 131 or paramId == 132: # U, V wind component
1637                    codes_write(gid, fdict['10'])
1638                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1639                    codes_write(gid, fdict['17'])
1640                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1641                    codes_write(gid, fdict['18'])
1642                elif paramId == 135: # W
1643                    codes_write(gid, fdict['19'])
1644                elif paramId == 152: # LNSP
1645                    codes_write(gid, fdict['12'])
1646                elif paramId == 155 and gridtype == 'sh': # D
1647                    codes_write(gid, fdict['13'])
1648                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1649                    # sum cloud liquid water and ice
1650                    if scwc is None:
1651                        scwc = codes_get_values(gid)
1652                    else:
1653                        scwc += codes_get_values(gid)
1654                        codes_set_values(gid, scwc)
1655                        codes_set(gid, 'paramId', 201031)
1656                        codes_write(gid, fdict['22'])
1657                        scwc = None
1658                # @WRF
1659                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1660                #
1661                # UNDER CONSTRUCTION !!!
1662                #
1663                #elif c.wrf and paramId in [129, 138, 155] and \
1664                #      levtype == 'hybrid': # Z, VO, D
1665                #    # do not do anything right now
1666                #    # these are specific parameter for WRF
1667                #    pass
1668                else:
1669                    if paramId not in savedfields:
1670                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1671                        # and all ADDPAR parameter
1672                        codes_write(gid, fdict['16'])
1673                        savedfields.append(paramId)
1674                    else:
1675                        print('duplicate ' + str(paramId) + ' not written')
1676                # @WRF
1677                # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1678                #
1679                # UNDER CONSTRUCTION !!!
1680                #
1681                #try:
1682                #    if c.wrf:
1683                #        # model layer
1684                #        if levtype == 'hybrid' and \
1685                #           paramId in [129, 130, 131, 132, 133, 138, 155]:
1686                #            codes_write(gid, fwrf)
1687                #        # sfc layer
1688                #        elif paramId in wrfpars:
1689                #            codes_write(gid, fwrf)
1690                #except AttributeError:
1691                #    pass
1692
1693                codes_release(gid)
1694                gid = codes_new_from_index(iid)
1695#============================================================================================
1696            for f in fdict.values():
1697                f.close()
1698#============================================================================================
1699            # call for Fortran program to convert e.g. reduced_gg grids to
1700            # regular_ll and calculate detadot/dp
1701            pwd = os.getcwd()
1702            os.chdir(c.inputdir)
1703            if os.stat('fort.21').st_size == 0 and c.eta:
1704                print('Parameter 77 (etadot) is missing, most likely it is '
1705                      'not available for this type or date / time\n')
1706                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1707                my_error('fort.21 is empty while parameter eta '
1708                         'is set to 1 in CONTROL file')
1709# ============================================================================================
1710            # write out all output to log file before starting fortran programm
1711            sys.stdout.flush()
1712
1713            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1714            execute_subprocess([os.path.join(c.exedir,
1715                                             _config.FORTRAN_EXECUTABLE)],
1716                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1717
1718            os.chdir(pwd)
1719# ============================================================================================
1720            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1721            # for CERA-20C we need all 4 digits for the year sinc 1900 - 2010
1722            if c.purefc:
1723                if c.marsclass == 'EP':
1724                    suffix = cdate[0:8] + '.' + ctime + '.' + cstep
1725                else:
1726                    suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1727            else:
1728                if c.marsclass == 'EP':
1729                    suffix = cdate_hour[0:10]
1730                else:
1731                    suffix = cdate_hour[2:10]
1732
1733            # if necessary, add ensemble member number to filename suffix
1734            if 'number' in index_keys:
1735                index_number = index_keys.index('number')
1736                if len(index_vals[index_number]) > 1:
1737                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1738
1739            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1740            print("outputfile = " + fnout)
1741            # collect for final processing
1742            self.outputfilelist.append(os.path.basename(fnout))
1743            # # get additional precipitation subgrid data if available
1744            # if c.rrint:
1745                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1746                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
1747# ============================================================================================
1748            # create outputfile and copy all data from intermediate files
1749            # to the outputfile (final GRIB input files for FLEXPART)
1750            orolsm = os.path.basename(glob.glob(c.inputdir +
1751                                                '/OG_OROLSM__SL.*.' +
1752                                                c.ppid +
1753                                                '*')[0])
1754            if c.marsclass == 'EP':
1755                fluxfile = 'flux' + suffix
1756            else:
1757                fluxfile = 'flux' + cdate[0:2] + suffix
1758            if not c.cwc:
1759                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1760            else:
1761                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1762
1763            with open(fnout, 'wb') as fout:
1764                for f in flist:
1765                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1766                                       fout)
1767
1768            if c.omega:
1769                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1770                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1771                                            'rb'), fout)
1772# ============================================================================================
1773
1774        # @WRF
1775        # THIS IS NOT YET CORRECTLY IMPLEMENTED !!!
1776        #
1777        # UNDER CONSTRUCTION !!!
1778        #
1779        #if c.wrf:
1780        #    fwrf.close()
1781
1782        codes_index_release(iid)
1783
1784        return
1785
1786
1787    def calc_extra_elda(self, path, prefix):
1788        ''' Calculates extra ensemble members for ELDA - Stream.
1789
1790        This is a specific feature which doubles the number of ensemble members
1791        for the ELDA Stream.
1792
1793        Parameters
1794        ----------
1795        path : str
1796            Path to the output files.
1797
1798        prefix : str
1799            The prefix of the output filenames as defined in Control file.
1800
1801        Return
1802        ------
1803
1804        '''
1805        from eccodes import (codes_grib_new_from_file, codes_get_array,
1806                             codes_set_array, codes_release,
1807                             codes_set, codes_write)
1808
1809        # max number
1810        maxnum = int(self.number.split('/')[-1])
1811
1812        # get a list of all prepared output files with control forecast (CF)
1813        cf_filelist = UioFiles(path, prefix + '*.N000')
1814        cf_filelist.files = sorted(cf_filelist.files)
1815
1816        for cffile in cf_filelist.files:
1817            with open(cffile, 'rb') as f:
1818                cfvalues = []
1819                while True:
1820                    fid = codes_grib_new_from_file(f)
1821                    if fid is None:
1822                        break
1823                    cfvalues.append(codes_get_array(fid, 'values'))
1824                    codes_release(fid)
1825
1826            filename = cffile.split('N000')[0]
1827            for i in range(1, maxnum + 1):
1828                # read an ensemble member
1829                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1830                # create file for newly calculated ensemble member
1831                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1832                # number of message in grib file
1833                j = 0
1834                while True:
1835                    gid = codes_grib_new_from_file(g)
1836                    if gid is None:
1837                        break
1838                    values = codes_get_array(gid, 'values')
1839                    # generate a new ensemble member by subtracting
1840                    # 2 * ( current time step value - last time step value )
1841                    codes_set_array(gid, 'values',
1842                                    values-2*(values-cfvalues[j]))
1843                    codes_set(gid, 'number', i+maxnum)
1844                    codes_write(gid, h)
1845                    codes_release(gid)
1846                    j += 1
1847
1848                g.close()
1849                h.close()
1850                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1851                self.outputfilelist.append(
1852                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1853
1854        return
1855
1856
1857    def process_output(self, c):
1858        '''Postprocessing of FLEXPART input files.
1859
1860        The grib files are postprocessed depending on the selection in
1861        CONTROL file. The resulting files are moved to the output
1862        directory if its not equal to the input directory.
1863        The following modifications might be done if
1864        properly switched in CONTROL file:
1865        GRIB2 - Conversion to GRIB2
1866        ECTRANS - Transfer of files to gateway server
1867        ECSTORAGE - Storage at ECMWF server
1868
1869        Parameters
1870        ----------
1871        c : ControlFile
1872            Contains all the parameters of CONTROL file and
1873            command line.
1874
1875        Return
1876        ------
1877
1878        '''
1879
1880        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1881
1882        if _config.FLAG_ON_ECMWFSERVER:
1883            print('ecstorage: {}\n ecfsdir: {}\n'.
1884                  format(c.ecstorage, c.ecfsdir))
1885            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1886                  .format(c.ectrans, c.gateway, c.destination))
1887
1888        print('Output filelist: ')
1889        print(sorted(self.outputfilelist))
1890
1891        for ofile in self.outputfilelist:
1892            ofile = os.path.join(self.inputdir, ofile)
1893
1894            if c.format.lower() == 'grib2':
1895                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1896                                    'productDefinitionTemplateNumber=8',
1897                                    ofile, ofile + '_2'],
1898                                   error_msg='GRIB2 CONVERSION FAILED!')
1899
1900                execute_subprocess(['mv', ofile + '_2', ofile],
1901                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1902                                   'FILES FAILED!')
1903
1904            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
1905                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1906                                    c.gateway, '-remote', c.destination,
1907                                    '-source', ofile],
1908                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1909
1910            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
1911                execute_subprocess(['ecp', '-o', ofile,
1912                                    os.path.expandvars(c.ecfsdir)],
1913                                   error_msg='COPY OF FILES TO ECSTORAGE '
1914                                   'AREA FAILED!')
1915
1916            if c.outputdir != c.inputdir:
1917                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1918                                    c.outputdir],
1919                                   error_msg='RELOCATION OF OUTPUT FILES '
1920                                   'TO OUTPUTDIR FAILED!')
1921
1922        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG