source: flex_extract.git/source/python/classes/EcFlexpart.py @ d4696e0

ctbtodev
Last change on this file since d4696e0 was d4696e0, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

bugfix retrievement with basetime parameter

  • Property mode set to 100644
File size: 68.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
45#
46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
50#*******************************************************************************
51#pylint: disable=unsupported-assignment-operation
52# this is disabled because for this specific case its an error in pylint
53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
59import sys
60import glob
61import shutil
62import subprocess
63from datetime import datetime, timedelta
64import numpy as np
65
66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
69                     codes_index_release, codes_index_get)
70
71# software specific classes and modules from flex_extract
72sys.path.append('../')
73import _config
74from GribUtil import GribUtil
75from mods.tools import (init128, to_param_id, silent_remove, product,
76                        my_error, make_dir, get_informations, get_dimensions,
77                        execute_subprocess)
78from MarsRetrieval import MarsRetrieval
79import mods.disaggregation as disaggregation
80
81# ------------------------------------------------------------------------------
82# CLASS
83# ------------------------------------------------------------------------------
84class EcFlexpart(object):
85    '''
86    Class to represent FLEXPART specific ECMWF data.
87
88    FLEXPART needs grib files in a specifc format. All necessary data fields
89    for one time step are stored in a single file. The class represents an
90    instance with all the parameter and settings necessary for retrieving
91    MARS data and modifing them so they are fitting FLEXPART needs. The class
92    is able to disaggregate the fluxes and convert grid types to the one needed
93    by FLEXPART, therefore using the FORTRAN program.
94
95    Attributes
96    ----------
97    mreq_count : int
98        Counter for the number of generated mars requests.
99
100    inputdir : str
101        Path to the directory where the retrieved data is stored.
102
103    dataset : str
104        For public datasets there is the specific naming and parameter
105        dataset which has to be used to characterize the type of
106        data.
107
108    basetime : int
109        The time for a half day retrieval. The 12 hours upfront are to be
110        retrieved.
111
112    dtime : str
113        Time step in hours.
114
115    acctype : str
116        The field type for the accumulated forecast fields.
117
118    acctime : str
119        The starting time from the accumulated forecasts.
120
121    accmaxstep : str
122        The maximum forecast step for the accumulated forecast fields.
123
124    marsclass : str
125        Characterisation of dataset.
126
127    stream : str
128        Identifies the forecasting system used to generate the data.
129
130    number : str
131        Selects the member in ensemble forecast run.
132
133    resol : str
134        Specifies the desired triangular truncation of retrieved data,
135        before carrying out any other selected post-processing.
136
137    accuracy : str
138        Specifies the number of bits per value to be used in the
139        generated GRIB coded fields.
140
141    addpar : str
142        List of additional parameters to be retrieved.
143
144    level : str
145        Specifies the maximum level.
146
147    expver : str
148        The version of the dataset.
149
150    levelist : str
151        Specifies the required levels.
152
153    glevelist : str
154        Specifies the required levels for gaussian grids.
155
156    gaussian : str
157        This parameter is deprecated and should no longer be used.
158        Specifies the desired type of Gaussian grid for the output.
159
160    grid : str
161        Specifies the output grid which can be either a Gaussian grid
162        or a Latitude/Longitude grid.
163
164    area : str
165        Specifies the desired sub-area of data to be extracted.
166
167    purefc : int
168        Switch for definition of pure forecast mode or not.
169
170    outputfilelist : list of str
171        The final list of FLEXPART ready input files.
172
173    types : dictionary
174        Determines the combination of type of fields, time and forecast step
175        to be retrieved.
176
177    params : dictionary
178        Collection of grid types and their corresponding parameters,
179        levels, level types and the grid definition.
180
181    server : ECMWFService or ECMWFDataServer
182        This is the connection to the ECMWF data servers.
183
184    public : int
185        Decides which Web API Server version is used.
186
187    dates : str
188        Contains start and end date of the retrieval in the format
189        "YYYYMMDD/to/YYYYMMDD"
190    '''
191
192    # --------------------------------------------------------------------------
193    # CLASS FUNCTIONS
194    # --------------------------------------------------------------------------
195    def __init__(self, c, fluxes=False):
196        '''Creates an object/instance of EcFlexpart with the associated
197        settings of its attributes for the retrieval.
198
199        Parameters:
200        -----------
201        c : ControlFile
202            Contains all the parameters of CONTROL file and
203            command line.
204
205        fluxes : boolean, optional
206            Decides if the flux parameter settings are stored or
207            the rest of the parameter list.
208            Default value is False.
209
210        Return
211        ------
212
213        '''
214        # set a counter for the number of generated mars requests
215        self.mreq_count = 0
216
217        self.inputdir = c.inputdir
218        self.dataset = c.dataset
219        self.basetime = c.basetime
220        self.dtime = c.dtime
221        self.acctype = c.acctype
222        self.acctime = c.acctime
223        self.accmaxstep = c.accmaxstep
224        self.marsclass = c.marsclass
225        self.stream = c.stream
226        self.number = c.number
227        self.resol = c.resol
228        self.accuracy = c.accuracy
229        self.addpar = c.addpar
230        self.level = c.level
231        self.expver = c.expver
232        self.levelist = c.levelist
233        self.glevelist = '1/to/' + c.level # in case of gaussian grid
234        self.gaussian = c.gaussian
235        self.grid = c.grid
236        self.area = c.area
237        self.purefc = c.purefc
238        self.outputfilelist = []
239
240        # Define the different types of field combinations (type, time, step)
241        self.types = {}
242        # Define the parameters and their level types, level list and
243        # grid resolution for the retrieval job
244        self.params = {}
245
246        if fluxes:
247            self._create_params_fluxes()
248        else:
249            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
250
251        if fluxes and not c.purefc:
252            self._create_field_types_fluxes()
253        else:
254            self._create_field_types(c.type, c.time, c.step)
255        return
256
257    def _create_field_types(self, ftype, ftime, fstep):
258        '''Create the combination of field type, time and forecast step.
259
260        Parameters:
261        -----------
262        ftype : list of str
263            List of field types.
264
265        ftime : list of str
266            The time in hours of the field.
267
268        fstep : str
269            Specifies the forecast time step from forecast base time.
270            Valid values are hours (HH) from forecast base time.
271
272        Return
273        ------
274
275        '''
276        i = 0
277        for ty, st, ti in zip(ftype, fstep, ftime):
278            btlist = range(len(ftime))
279            if self.basetime == 12:
280                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
281            if self.basetime == 0:
282                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
283
284            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
285                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
286                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
287                 # (int(c.time[i]) in btlist or c.purefc):
288
289            if (i in btlist) or self.purefc:
290
291                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
292                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
293
294                    if ty not in self.types.keys():
295                        self.types[ty] = {'times': '', 'steps': ''}
296
297                    if ti not in self.types[ty]['times']:
298                        if self.types[ty]['times']:
299                            self.types[ty]['times'] += '/'
300                        self.types[ty]['times'] += ti
301
302                    if st not in self.types[ty]['steps']:
303                        if self.types[ty]['steps']:
304                            self.types[ty]['steps'] += '/'
305                        self.types[ty]['steps'] += st
306            i += 1
307
308        return
309
310    def _create_field_types_fluxes(self):
311        '''Create the combination of field type, time and forecast step
312        for the flux data.
313
314        Parameters:
315        -----------
316
317        Return
318        ------
319
320        '''
321        self.types[str(self.acctype)] = {'times': str(self.acctime),
322                                         'steps': '{}/to/{}/by/{}'.format(
323                                             self.dtime,
324                                             self.accmaxstep,
325                                             self.dtime)}
326        return
327
328    def _create_params(self, gauss, eta, omega, cwc, wrf):
329        '''Define the specific parameter settings for retrievment.
330
331        The different parameters need specific grid types and level types
332        for retrievement. We might get following combination of types
333        (depending on selection and availability):
334        (These are short cuts for the grib file names (leading sequence)
335        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
336        where:
337            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
338            ML = Model Level, SL = Surface Level
339
340        For each of this combination there is a list of parameter names,
341        the level type, the level list and the grid resolution.
342
343        There are different scenarios for data extraction from MARS:
344        1) Retrieval of etadot
345           eta=1, gauss=0, omega=0
346        2) Calculation of etadot from divergence
347           eta=0, gauss=1, omega=0
348        3) Calculation of etadot from omega (for makes sense for debugging)
349           eta=0, gauss=0, omega=1
350        4) Retrieval and Calculation of etadot (only for debugging)
351           eta=1, gauss=1, omega=0
352        5) Download also specific model and surface level data for FLEXPART-WRF
353
354        Parameters:
355        -----------
356        gauss : int
357            Gaussian grid is retrieved.
358
359        eta : int
360            Etadot parameter will be directly retrieved.
361
362        omega : int
363            The omega paramterwill be retrieved.
364
365        cwc : int
366            The cloud liquid and ice water content will be retrieved.
367
368        wrf : int
369            Additional model level and surface level data will be retrieved for
370            WRF/FLEXPART-WRF simulations.
371
372        Return
373        ------
374
375        '''
376        # SURFACE FIELDS
377        #-----------------------------------------------------------------------
378        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
379        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
380                                 'SFC', '1', self.grid]
381        if self.addpar:
382            self.params['OG__SL'][0] += self.addpar
383
384        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
385            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
386                                            'SFC', '1', self.grid]
387        else:
388            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
389                                            'SFC', '1', self.grid]
390
391        # MODEL LEVEL FIELDS
392        #-----------------------------------------------------------------------
393        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
394
395        if not gauss and eta:
396            self.params['OG__ML'][0] += '/U/V/ETADOT'
397        elif gauss and not eta:
398            self.params['GG__SL'] = ['Q', 'ML', '1', \
399                                     '{}'.format((int(self.resol) + 1) / 2)]
400            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
401        elif not gauss and not eta:
402            self.params['OG__ML'][0] += '/U/V'
403        else:
404            print('Warning: Collecting etadot and parameters for gaussian grid \
405                            is a very costly parameter combination, \
406                            use this combination only for debugging!')
407            self.params['GG__SL'] = ['Q', 'ML', '1', \
408                                     '{}'.format((int(self.resol) + 1) / 2)]
409            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
410                                     '{}'.format((int(self.resol) + 1) / 2)]
411
412        if omega:
413            self.params['OG__ML'][0] += '/W'
414
415        if cwc:
416            self.params['OG__ML'][0] += '/CLWC/CIWC'
417
418        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
419        #-----------------------------------------------------------------------
420        if wrf:
421            self.params['OG__ML'][0] += '/Z/VO'
422            if '/D' not in self.params['OG__ML'][0]:
423                self.params['OG__ML'][0] += '/D'
424            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
425                       'SWVL1','SWVL2','SWVL3','SWVL4']
426            for par in wrf_sfc:
427                if par not in self.params['OG__SL'][0]:
428                    self.params['OG__SL'][0] += '/' + par
429
430        return
431
432
433    def _create_params_fluxes(self):
434        '''Define the parameter setting for flux data.
435
436        Flux data are accumulated fields in time and are stored on the
437        surface level. The leading short cut name for the grib files is:
438        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
439        acc for Accumulated Grid.
440        The params dictionary stores a list of parameter names, the level type,
441        the level list and the grid resolution.
442
443        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
444
445        Parameters:
446        -----------
447
448        Return
449        ------
450
451        '''
452        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
453                                    'SFC', '1', self.grid]
454        return
455
456
457    def _mk_targetname(self, ftype, param, date):
458        '''Creates the filename for the requested grib data to be stored in.
459        This name is passed as the "target" parameter in the request.
460
461        Parameters
462        ----------
463        ftype : str
464            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
465
466        param : str
467            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
468            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
469
470        date : str
471            The date period of the grib data to be stored in this file.
472
473        Return
474        ------
475        targetname : str
476            The target filename for the grib data.
477        '''
478        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
479                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
480
481        return targetname
482
483
484    def _start_retrievement(self, request, par_dict):
485        '''Creates the Mars Retrieval and prints or submits the request
486        depending on the status of the request variable.
487
488        Parameters
489        ----------
490        request : int
491            Selects the mode of retrieval.
492            0: Retrieves the data from ECMWF.
493            1: Prints the mars requests to an output file.
494            2: Retrieves the data and prints the mars request.
495
496        par_dict : dictionary
497            Contains all parameter which have to be set for creating the
498            Mars Retrievals. The parameter are:
499            marsclass, dataset, stream, type, levtype, levelist, resol,
500            gaussian, accuracy, grid, target, area, date, time, number,
501            step, expver, param
502
503        Return
504        ------
505
506        '''
507        # increase number of mars requests
508        self.mreq_count += 1
509
510        MR = MarsRetrieval(self.server,
511                           self.public,
512                           marsclass=par_dict['marsclass'],
513                           dataset=par_dict['dataset'],
514                           stream=par_dict['stream'],
515                           type=par_dict['type'],
516                           levtype=par_dict['levtype'],
517                           levelist=par_dict['levelist'],
518                           resol=par_dict['resol'],
519                           gaussian=par_dict['gaussian'],
520                           accuracy=par_dict['accuracy'],
521                           grid=par_dict['grid'],
522                           target=par_dict['target'],
523                           area=par_dict['area'],
524                           date=par_dict['date'],
525                           time=par_dict['time'],
526                           number=par_dict['number'],
527                           step=par_dict['step'],
528                           expver=par_dict['expver'],
529                           param=par_dict['param'])
530
531        if request == 0:
532            MR.display_info()
533            MR.data_retrieve()
534        elif request == 1:
535            MR.print_infodata_csv(self.inputdir, self.mreq_count)
536        elif request == 2:
537            MR.print_infodata_csv(self.inputdir, self.mreq_count)
538            MR.display_info()
539            MR.data_retrieve()
540        else:
541            print('Failure')
542
543        return
544
545
546    def _mk_index_values(self, inputdir, inputfiles, keys):
547        '''Creates an index file for a set of grib parameter keys.
548        The values from the index keys are returned in a list.
549
550        Parameters
551        ----------
552        keys : dictionary
553            List of parameter names which serves as index.
554
555        inputfiles : UioFiles
556            Contains a list of files.
557
558        Return
559        ------
560        iid : codes_index
561            This is a grib specific index structure to access
562            messages in a file.
563
564        index_vals : list of list  of str
565            Contains the values from the keys used for a distinct selection
566            of grib messages in processing  the grib files.
567            Content looks like e.g.:
568            index_vals[0]: ('20171106', '20171107', '20171108') ; date
569            index_vals[1]: ('0', '1200', '1800', '600') ; time
570            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
571        '''
572        iid = None
573        index_keys = keys
574
575        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
576        silent_remove(indexfile)
577        grib = GribUtil(inputfiles.files)
578        # creates new index file
579        iid = grib.index(index_keys=index_keys, index_file=indexfile)
580
581        # read the values of index keys
582        index_vals = []
583        for key in index_keys:
584            key_vals = codes_index_get(iid, key)
585            # have to sort the key values for correct disaggregation,
586            # therefore convert to int first
587            key_vals = [int(k) for k in key_vals]
588            key_vals.sort()
589            key_vals = [str(k) for k in key_vals]
590            index_vals.append(key_vals)
591            # index_vals looks for example like:
592            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
593            # index_vals[1]: ('0', '1200') ; time
594            # index_vals[2]: (3', '6', '9', '12') ; stepRange
595
596        return iid, index_vals
597
598
599    def retrieve(self, server, dates, public, request, inputdir='.'):
600        '''Finalizing the retrieval information by setting final details
601        depending on grid type.
602        Prepares MARS retrievals per grid type and submits them.
603
604        Parameters
605        ----------
606        server : ECMWFService or ECMWFDataServer
607            The connection to the ECMWF server. This is different
608            for member state users which have full access and non
609            member state users which have only access to the public
610            data sets. The decision is made from command line argument
611            "public"; for public access its True (ECMWFDataServer)
612            for member state users its False (ECMWFService)
613
614        dates : str
615            Contains start and end date of the retrieval in the format
616            "YYYYMMDD/to/YYYYMMDD"
617
618        request : int
619            Selects the mode of retrieval.
620            0: Retrieves the data from ECMWF.
621            1: Prints the mars requests to an output file.
622            2: Retrieves the data and prints the mars request.
623
624        inputdir : str, optional
625            Path to the directory where the retrieved data is about
626            to be stored. The default is the current directory ('.').
627
628        Return
629        ------
630
631        '''
632        self.dates = dates
633        self.server = server
634        self.public = public
635        self.inputdir = inputdir
636        oro = False
637
638        # define times with datetime module
639        t12h = timedelta(hours=12)
640        t24h = timedelta(hours=24)
641
642        # dictionary which contains all parameter for the mars request,
643        # entries with a "None" will change in different requests and will
644        # therefore be set in each request seperately
645        retr_param_dict = {'marsclass':self.marsclass,
646                           'dataset':self.dataset,
647                           'stream':None,
648                           'type':None,
649                           'levtype':None,
650                           'levelist':None,
651                           'resol':self.resol,
652                           'gaussian':None,
653                           'accuracy':self.accuracy,
654                           'grid':None,
655                           'target':None,
656                           'area':None,
657                           'date':None,
658                           'time':None,
659                           'number':self.number,
660                           'step':None,
661                           'expver':self.expver,
662                           'param':None}
663
664        for ftype in self.types:
665            # fk contains field types such as
666            #     [AN, FC, PF, CV]
667            # fv contains all of the items of the belonging key
668            #     [times, steps]
669            for pk, pv in self.params.iteritems():
670                # pk contains one of these keys of params
671                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
672                #      OG_OROLSM_SL, OG_acc_SL]
673                # pv contains all of the items of the belonging key
674                #     [param, levtype, levelist, grid]
675                if isinstance(pv, str):
676                    continue
677                retr_param_dict['type'] = ftype
678                retr_param_dict['time'] = self.types[ftype]['times']
679                retr_param_dict['step'] = self.types[ftype]['steps']
680                retr_param_dict['date'] = self.dates
681                retr_param_dict['stream'] = self.stream
682                retr_param_dict['target'] = \
683                    self._mk_targetname(ftype,
684                                        pk,
685                                        retr_param_dict['date'].split('/')[0])
686                retr_param_dict['param'] = pv[0]
687                retr_param_dict['levtype'] = pv[1]
688                retr_param_dict['levelist'] = pv[2]
689                retr_param_dict['grid'] = pv[3]
690                retr_param_dict['area'] = self.area
691                retr_param_dict['gaussian'] = self.gaussian
692
693                if pk == 'OG_OROLSM__SL' and not oro:
694                    oro = True
695                    # in CERA20C (class EP) there is no stream "OPER"!
696                    if self.marsclass.upper() != 'EP':
697                        retr_param_dict['stream'] = 'OPER'
698                    retr_param_dict['type'] = 'AN'
699                    retr_param_dict['time'] = '00'
700                    retr_param_dict['step'] = '000'
701                    retr_param_dict['date'] = self.dates.split('/')[0]
702                    retr_param_dict['target'] = self._mk_targetname('',
703                                            pk, retr_param_dict['date'])
704                elif pk == 'OG_OROLSM__SL' and oro:
705                    continue
706                if pk == 'GG__SL' and pv[0] == 'Q':
707                    retr_param_dict['area'] = ""
708                    retr_param_dict['gaussian'] = 'reduced'
709
710    # ------  on demand path  --------------------------------------------------
711                if self.basetime is None:
712                    # ******* start retrievement
713                    self._start_retrievement(request, retr_param_dict)
714    # ------  operational path  ------------------------------------------------
715                else:
716                    # check if mars job requests fields beyond basetime.
717                    # if yes eliminate those fields since they may not
718                    # be accessible with user's credentials
719
720                    enddate = retr_param_dict['date'].split('/')[-1]
721                    elimit = datetime.strptime(enddate + str(self.basetime),
722                                               '%Y%m%d%H')
723
724                    if self.basetime == 12:
725                        # --------------  flux data ----------------------------
726                        if 'acc' in pk:
727                            startdate = retr_param_dict['date'].split('/')[0]
728                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
729                            retr_param_dict['date'] = '/'.join([startdate,
730                                                                'to',
731                                                                enddate])
732
733                            # ******* start retrievement
734                            self._start_retrievement(request, retr_param_dict)
735
736                            retr_param_dict['date'] = \
737                                datetime.strftime(elimit - t12h, '%Y%m%d')
738                            retr_param_dict['time'] = '00'
739                            retr_param_dict['target'] = \
740                                self._mk_targetname(ftype, pk,
741                                                    retr_param_dict['date'])
742
743                            # ******* start retrievement
744                            self._start_retrievement(request, retr_param_dict)
745
746                        # --------------  non flux data ------------------------
747                        else:
748                            # ******* start retrievement
749                            self._start_retrievement(request, retr_param_dict)
750
751                    elif self.basetime == 0:
752                        retr_param_dict['date'] = \
753                            datetime.strftime(elimit - t24h, '%Y%m%d')
754
755                        timesave = ''.join(retr_param_dict['time'])
756
757                        if ('/' in retr_param_dict['time'] and
758                            pk != 'OG_OROLSM__SL' and
759                            'acc' not in pk ) :
760                            times = retr_param_dict['time'].split('/')
761                            steps = retr_param_dict['step'].split('/')
762                            print 'times', times, int(times[0]), times[1:]
763                            print 'steps', steps, int(steps[0])
764                            while int(times[0]) + int(steps[0]) <= 12:
765                                print 'HELLO'
766                                times = times[1:]
767                                print 'in while 1 ', times
768
769                                if len(times) > 1:
770                                    retr_param_dict['time'] = '/'.join(times)
771                                else:
772                                    retr_param_dict['time'] = times[0]
773
774                                print 'in while 2 ', times
775                                print retr_param_dict['time']
776
777                        if (pk != 'OG_OROLSM__SL' and
778                            int(retr_param_dict['step'].split('/')[0]) == 0 and
779                            int(timesave.split('/')[0]) == 0):
780
781                            retr_param_dict['date'] = \
782                                datetime.strftime(elimit, '%Y%m%d')
783                            retr_param_dict['time'] = '00'
784                            retr_param_dict['step'] = '000'
785                            retr_param_dict['target'] = \
786                                self._mk_targetname(ftype, pk,
787                                                    retr_param_dict['date'])
788
789                        # ******* start retrievement
790                        self._start_retrievement(request, retr_param_dict)
791                    else:
792                        raise ValueError('ERROR: Basetime has an invalid value '
793                                                 '-> {}'.format(str(basetime)))
794
795        if request == 0 or request == 2:
796            print('MARS retrieve done ... ')
797        elif request == 1:
798            print('MARS request printed ...')
799
800        return
801
802
803    def write_namelist(self, c):
804        '''Creates a namelist file in the temporary directory and writes
805        the following values to it: maxl, maxb, mlevel,
806        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
807        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
808
809        Parameters
810        ----------
811        c : ControlFile
812            Contains all the parameters of CONTROL file and
813            command line.
814
815        filename : str
816                Name of the namelist file.
817
818        Return
819        ------
820
821        '''
822
823        from genshi.template.text import NewTextTemplate
824        from genshi.template import  TemplateLoader
825        from genshi.template.eval import  UndefinedError
826
827        try:
828            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
829            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
830                                            cls=NewTextTemplate)
831
832            self.inputdir = c.inputdir
833            area = np.asarray(self.area.split('/')).astype(float)
834            grid = np.asarray(self.grid.split('/')).astype(float)
835
836            if area[1] > area[3]:
837                area[1] -= 360
838            maxl = int((area[3] - area[1]) / grid[1]) + 1
839            maxb = int((area[0] - area[2]) / grid[0]) + 1
840
841            stream = namelist_template.generate(
842                maxl = str(maxl),
843                maxb = str(maxb),
844                mlevel = str(self.level),
845                mlevelist = str(self.levelist),
846                mnauf = str(self.resol),
847                metapar = '77',
848                rlo0 = str(area[1]),
849                rlo1 = str(area[3]),
850                rla0 = str(area[2]),
851                rla1 = str(area[0]),
852                momega = str(c.omega),
853                momegadiff = str(c.omegadiff),
854                mgauss = str(c.gauss),
855                msmooth = str(c.smooth),
856                meta = str(c.eta),
857                metadiff = str(c.etadiff),
858                mdpdeta = str(c.dpdeta)
859            )
860        except UndefinedError as e:
861            print('... ERROR ' + str(e))
862
863            sys.exit('\n... error occured while trying to generate namelist ' +
864                     _config.TEMPFILE_NAMELIST)
865        except OSError as e:
866            print('... ERROR CODE: ' + str(e.errno))
867            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
868
869            sys.exit('\n... error occured while trying to generate template ' +
870                     _config.TEMPFILE_NAMELIST)
871
872        try:
873            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
874
875            with open(namelistfile, 'w') as f:
876                f.write(stream.render('text'))
877        except OSError as e:
878            print('... ERROR CODE: ' + str(e.errno))
879            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
880
881            sys.exit('\n... error occured while trying to write ' +
882                     namelistfile)
883
884        return
885
886
887    def deacc_fluxes(self, inputfiles, c):
888        '''De-accumulate and disaggregate flux data.
889
890        Goes through all flux fields in ordered time and de-accumulate
891        the fields. Afterwards the fields are disaggregated in time.
892        Different versions of disaggregation is provided for rainfall
893        data (darain, modified linear) and the surface fluxes and
894        stress data (dapoly, cubic polynomial).
895
896        Parameters
897        ----------
898        inputfiles : UioFiles
899            Contains the list of files that contain flux data.
900
901        c : ControlFile
902            Contains all the parameters of CONTROL file and
903            command line.
904
905        Return
906        ------
907
908        '''
909
910        table128 = init128(_config.PATH_GRIBTABLE)
911        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
912
913        iid = None
914        index_vals = None
915
916        # get the values of the keys which are used for distinct access
917        # of grib messages via product
918        index_keys = ["date", "time", "step"]
919        iid, index_vals = self._mk_index_values(c.inputdir,
920                                                inputfiles,
921                                                index_keys)
922        # index_vals looks like e.g.:
923        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
924        # index_vals[1]: ('0', '600', '1200', '1800') ; time
925        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
926
927        if c.rrint:
928            if not c.purefc:
929                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
930                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
931            else:
932                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
933                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
934                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
935                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
936                end_date = end_date + timedelta(hours=c.maxstep)
937
938            info = get_informations(os.path.join(c.inputdir,
939                                                 inputfiles.files[0]))
940            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
941                                  start_date, end_date)
942            # create numpy array
943            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
944            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
945            it_lsp = 0
946            it_cp = 0
947            date_list = []
948            step_list = []
949
950        # initialize dictionaries to store values
951        orig_vals = {}
952        deac_vals = {}
953        for p in pars:
954            orig_vals[p] = []
955            deac_vals[p] = []
956
957        # "product" genereates each possible combination between the
958        # values of the index keys
959        for prod in product(*index_vals):
960            # e.g. prod = ('20170505', '0', '12')
961            #             (  date    ,time, step)
962
963            print('CURRENT PRODUCT: ', prod)
964
965            for i in range(len(index_keys)):
966                codes_index_select(iid, index_keys[i], prod[i])
967
968            # get first id from current product
969            gid = codes_new_from_index(iid)
970
971            # if there is no data for this specific time combination / product
972            # skip the rest of the for loop and start with next timestep/product
973            if not gid:
974                continue
975
976            # create correct timestamp from the three time informations
977            cdate = str(codes_get(gid, 'date'))
978            time = codes_get(gid, 'time')/100 # integer
979            step = codes_get(gid, 'step') # integer
980            ctime = '{:0>2}'.format(time)
981            cstep = '{:0>3}'.format(step)
982
983            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
984            t_dt = t_date + timedelta(hours=step)
985            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
986            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
987            if c.basetime is not None:
988                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
989                                              '%Y%m%d%H')
990            else:
991                t_enddate = t_date + timedelta(2*int(c.dtime))
992
993            if c.purefc:
994                fnout = os.path.join(c.inputdir, 'flux' +
995                                     t_date.strftime('%Y%m%d.%H') +
996                                     '.{:0>3}'.format(step-2*int(c.dtime)))
997                gnout = os.path.join(c.inputdir, 'flux' +
998                                     t_date.strftime('%Y%m%d.%H') +
999                                     '.{:0>3}'.format(step-int(c.dtime)))
1000                hnout = os.path.join(c.inputdir, 'flux' +
1001                                     t_date.strftime('%Y%m%d.%H') +
1002                                     '.{:0>3}'.format(step))
1003            else:
1004                fnout = os.path.join(c.inputdir, 'flux' +
1005                                     t_m2dt.strftime('%Y%m%d%H'))
1006                gnout = os.path.join(c.inputdir, 'flux' +
1007                                     t_m1dt.strftime('%Y%m%d%H'))
1008                hnout = os.path.join(c.inputdir, 'flux' +
1009                                     t_dt.strftime('%Y%m%d%H'))
1010
1011            print("outputfile = " + fnout)
1012            f_handle = open(fnout, 'w')
1013            h_handle = open(hnout, 'w')
1014            g_handle = open(gnout, 'w')
1015
1016            # read message for message and store relevant data fields, where
1017            # data keywords are stored in pars
1018            while True:
1019                if not gid:
1020                    break
1021                parId = codes_get(gid, 'paramId') # integer
1022                step = codes_get(gid, 'step') # integer
1023                time = codes_get(gid, 'time') # integer
1024                ni = codes_get(gid, 'Ni') # integer
1025                nj = codes_get(gid, 'Nj') # integer
1026                if parId not in orig_vals.keys():
1027                    # parameter is not a flux, find next one
1028                    continue
1029
1030                # define conversion factor
1031                if parId == 142 or parId == 143:
1032                    fak = 1. / 1000.
1033                else:
1034                    fak = 3600.
1035
1036                # get parameter values and reshape
1037                values = codes_get_values(gid)
1038                values = (np.reshape(values, (nj, ni))).flatten() / fak
1039
1040                # save the original and accumulated values
1041                orig_vals[parId].append(values[:])
1042
1043                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1044                    # no de-accumulation needed
1045                    deac_vals[parId].append(values[:] / int(c.dtime))
1046                else:
1047                    # do de-accumulation
1048                    deac_vals[parId].append(
1049                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1050                         int(c.dtime))
1051
1052                # store precipitation if new disaggregation method is selected
1053                # only the exact days are needed
1054                if c.rrint:
1055                    if start_date <= t_dt <= end_date:
1056                        if not c.purefc:
1057                            if t_dt not in date_list:
1058                                date_list.append(t_dt)
1059                                step_list = [0]
1060                        else:
1061                            if t_date not in date_list:
1062                                date_list.append(t_date)
1063                            if step not in step_list:
1064                                step_list.append(step)
1065                        if c.rrint and parId == 142:
1066                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
1067                            it_lsp += 1
1068                        elif c.rrint and parId == 143:
1069                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
1070                            it_cp += 1
1071
1072                # information printout
1073                print(parId, time, step, len(values), values[0], np.std(values))
1074
1075                # length of deac_vals[parId] corresponds to the
1076                # number of time steps, max. 4 are needed for disaggegration
1077                # with the old and original method
1078                # run over all grib messages and perform
1079                # shifting in time
1080                if len(deac_vals[parId]) >= 3:
1081                    if len(deac_vals[parId]) > 3:
1082                        if not c.rrint and (parId == 142 or parId == 143):
1083                            values = disaggregation.darain(deac_vals[parId])
1084                        else:
1085                            values = disaggregation.dapoly(deac_vals[parId])
1086
1087                        if not (step == c.maxstep and c.purefc \
1088                                or t_dt == t_enddate):
1089                            # remove first time step in list to shift
1090                            # time line
1091                            orig_vals[parId].pop(0)
1092                            deac_vals[parId].pop(0)
1093                    else:
1094                        # if the third time step is read (per parId),
1095                        # write out the first one as a boundary value
1096                        if c.purefc:
1097                            values = deac_vals[parId][1]
1098                        else:
1099                            values = deac_vals[parId][0]
1100
1101                    if not (c.rrint and (parId == 142 or parId == 143)):
1102                        codes_set_values(gid, values)
1103
1104                        if c.purefc:
1105                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1106                        else:
1107                            codes_set(gid, 'stepRange', 0)
1108                            codes_set(gid, 'time', t_m2dt.hour*100)
1109                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1110
1111                        codes_write(gid, f_handle)
1112
1113                        # squeeze out information of last two steps
1114                        # contained in deac_vals[parId]
1115                        # Note that deac_vals[parId][0] has not been popped
1116                        # in this case
1117
1118                        if step == c.maxstep and c.purefc or \
1119                           t_dt == t_enddate:
1120                            # last step
1121                            if c.purefc:
1122                                values = deac_vals[parId][3]
1123                                codes_set_values(gid, values)
1124                                codes_set(gid, 'stepRange', step)
1125                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1126                                codes_write(gid, h_handle)
1127                            else:
1128                                values = deac_vals[parId][3]
1129                                codes_set_values(gid, values)
1130                                codes_set(gid, 'stepRange', 0)
1131                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1132                                codes_set(gid, 'time', truedatetime.hour * 100)
1133                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1134                                codes_write(gid, h_handle)
1135
1136                            if parId == 142 or parId == 143:
1137                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1138                            else:
1139                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1140
1141                            # step before last step
1142                            if c.purefc:
1143                                codes_set(gid, 'stepRange', step-int(c.dtime))
1144                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1145                                codes_set_values(gid, values)
1146                                codes_write(gid, g_handle)
1147                            else:
1148                                codes_set(gid, 'stepRange', 0)
1149                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1150                                codes_set(gid, 'time', truedatetime.hour * 100)
1151                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1152                                codes_set_values(gid, values)
1153                                codes_write(gid, g_handle)
1154
1155                codes_release(gid)
1156
1157                gid = codes_new_from_index(iid)
1158
1159            f_handle.close()
1160            g_handle.close()
1161            h_handle.close()
1162
1163        codes_index_release(iid)
1164
1165        if c.rrint:
1166            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1167
1168            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1169                                 cp_np, date_list, step_list, c)
1170
1171        return
1172
1173    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1174        '''Calculates and writes out the disaggregated precipitation fields.
1175
1176        Disaggregation is done in time and original times are written to the
1177        flux files, while the additional subgrid times are written to
1178        extra files.
1179
1180        Parameters
1181        ----------
1182        ni : int
1183            Amount of zonal grid points.
1184
1185        nj : int
1186            Amount of meridional grid points.
1187
1188        nt : int
1189            Number of time steps.
1190
1191        lsp_np : numpy array of float
1192            The large scale precipitation fields for each time step.
1193            Shape (ni * nj, nt).
1194
1195        cp_np : numpy array of float
1196            The convective precipitation fields for each time step.
1197            Shape (ni * nj, nt).
1198
1199        date_list : list of datetime
1200            The list of dates for which the disaggregation is to be done.
1201
1202        step_list : list of int
1203            The list of steps for a single forecast time.
1204            Only necessary for pure forecasts.
1205
1206        c : ControlFile
1207            Contains all the parameters of CONTROL file and
1208            command line.
1209
1210        Return
1211        ------
1212
1213        '''
1214        print('... disaggregation or precipitation with new method.')
1215        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1216        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1217
1218        # do the disaggregation, but neglect the last value of the
1219        # time series. This one corresponds for example to 24 hour,
1220        # which we don't need.
1221        for ix in range(ni*nj):
1222            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1223            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1224
1225        # write to grib files (full/orig times to flux file and inbetween
1226        # times into seperate end files)
1227        print('... write disaggregated precipitation to files.')
1228        it = 0
1229        for date in date_list:
1230            for step in step_list:
1231                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1232
1233                if c.purefc:
1234                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1235                                   '.{:0>3}'.format(step)
1236                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1237                                '.{:0>3}'.format(step) + '_1'
1238                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1239                                '.{:0>3}'.format(step) + '_2'
1240                else:
1241                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1242                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1243                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1244
1245                # collect for final processing
1246                self.outputfilelist.append(os.path.basename(fluxfilename))
1247                self.outputfilelist.append(os.path.basename(filename1))
1248                self.outputfilelist.append(os.path.basename(filename2))
1249
1250                # write original time step to flux file as usual
1251                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1252                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1253                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1254                                  keynames=['date','time','stepRange','values'],
1255                                  keyvalues=[int(date.strftime('%Y%m%d')),
1256                                             date.hour*100, step, lsp_new_np[:,it]],
1257                                 )
1258                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1259                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1260                                  keynames=['date','time','stepRange','values'],
1261                                  keyvalues=[int(date.strftime('%Y%m%d')),
1262                                             date.hour*100, step, cp_new_np[:,it]]
1263                                 )
1264
1265                # write first subgrid time step
1266                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1267                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1268                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1269                                  keynames=['date','time','stepRange','values'],
1270                                  keyvalues=[int(date.strftime('%Y%m%d')),
1271                                             date.hour*100, step, lsp_new_np[:,it+1]]
1272                                  )
1273                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1274                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1275                                  keynames=['date','time','stepRange','values'],
1276                                  keyvalues=[int(date.strftime('%Y%m%d')),
1277                                             date.hour*100, step, cp_new_np[:,it+1]]
1278                                 )
1279
1280                # write second subgrid time step
1281                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1282                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1283                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1284                                  keynames=['date','time','stepRange','values'],
1285                                  keyvalues=[int(date.strftime('%Y%m%d')),
1286                                             date.hour*100, step, lsp_new_np[:,it+2]]
1287                                 )
1288                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1289                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1290                                  keynames=['date','time','stepRange','values'],
1291                                  keyvalues=[int(date.strftime('%Y%m%d')),
1292                                             date.hour*100, step, cp_new_np[:,it+2]]
1293                                 )
1294                it = it + 3 # jump to next original time step
1295        return
1296
1297    def _create_rr_grib_dummy(self, ifile, inputdir):
1298        '''Creates a grib file with a dummy message for the two precipitation
1299        types lsp and cp each.
1300
1301        Parameters
1302        ----------
1303        ifile : str
1304            Filename of the input file to read the grib messages from.
1305
1306        inputdir : str, optional
1307            Path to the directory where the retrieved data is stored.
1308
1309        Return
1310        ------
1311
1312        '''
1313
1314        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1315
1316        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1317                      keyvalues=[142], filemode='w')
1318
1319        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1320                      keyvalues=[143], filemode='a')
1321
1322        return
1323
1324    def create(self, inputfiles, c):
1325        '''An index file will be created which depends on the combination
1326        of "date", "time" and "stepRange" values. This is used to iterate
1327        over all messages in each grib file which were passed through the
1328        parameter "inputfiles" to seperate specific parameters into fort.*
1329        files. Afterwards the FORTRAN program is called to convert
1330        the data fields all to the same grid and put them in one file
1331        per unique time step (combination of "date", "time" and
1332        "stepRange").
1333
1334        Note
1335        ----
1336        This method is based on the ECMWF example index.py
1337        https://software.ecmwf.int/wiki/display/GRIB/index.py
1338
1339        Parameters
1340        ----------
1341        inputfiles : UioFiles
1342            Contains a list of files.
1343
1344        c : ControlFile
1345            Contains all the parameters of CONTROL file and
1346            command line.
1347
1348        Return
1349        ------
1350
1351        '''
1352
1353        if c.wrf:
1354            table128 = init128(_config.PATH_GRIBTABLE)
1355            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1356                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1357                                  table128)
1358
1359        # these numbers are indices for the temporary files "fort.xx"
1360        # which are used to seperate the grib fields to,
1361        # for the Fortran program input
1362        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1363        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1364        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1365                 '17':None, '18':None, '19':None, '21':None, '22':None}
1366
1367        iid = None
1368        index_vals = None
1369
1370        # get the values of the keys which are used for distinct access
1371        # of grib messages via product
1372        index_keys = ["date", "time", "step"]
1373        iid, index_vals = self._mk_index_values(c.inputdir,
1374                                                inputfiles,
1375                                                index_keys)
1376        # index_vals looks like e.g.:
1377        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1378        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1379        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1380
1381        # "product" genereates each possible combination between the
1382        # values of the index keys
1383        for prod in product(*index_vals):
1384            # e.g. prod = ('20170505', '0', '12')
1385            #             (  date    ,time, step)
1386
1387            print('current product: ', prod)
1388
1389            for i in range(len(index_keys)):
1390                codes_index_select(iid, index_keys[i], prod[i])
1391
1392            # get first id from current product
1393            gid = codes_new_from_index(iid)
1394
1395            # if there is no data for this specific time combination / product
1396            # skip the rest of the for loop and start with next timestep/product
1397            if not gid:
1398                continue
1399#============================================================================================
1400            # remove old fort.* files and open new ones
1401            # they are just valid for a single product
1402            for k, f in fdict.iteritems():
1403                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1404                silent_remove(fortfile)
1405                fdict[k] = open(fortfile, 'w')
1406#============================================================================================
1407            # create correct timestamp from the three time informations
1408            cdate = str(codes_get(gid, 'date'))
1409            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1410            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1411            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1412            timestamp += timedelta(hours=int(cstep))
1413            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1414
1415            # if the timestamp is out of basetime start/end date period,
1416            # skip this specific product
1417            if c.basetime is not None:
1418                time_delta = timedelta(hours=12-int(c.dtime))
1419                start_time = datetime.strptime(c.end_date + str(c.basetime),
1420                                                '%Y%m%d%H') - time_delta
1421                end_time = datetime.strptime(c.end_date + str(c.basetime),
1422                                             '%Y%m%d%H')
1423                if timestamp < start_time or timestamp > end_time:
1424                    continue
1425
1426            if c.wrf:
1427                if 'olddate' not in locals() or cdate != olddate:
1428                    fwrf = open(os.path.join(c.outputdir,
1429                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1430                    olddate = cdate[:]
1431#============================================================================================
1432            # savedfields remembers which fields were already used.
1433            savedfields = []
1434            # sum of cloud liquid and ice water content
1435            scwc = None
1436            while 1:
1437                if not gid:
1438                    break
1439                paramId = codes_get(gid, 'paramId')
1440                gridtype = codes_get(gid, 'gridType')
1441                levtype = codes_get(gid, 'typeOfLevel')
1442                if paramId == 77: # ETADOT
1443                    codes_write(gid, fdict['21'])
1444                elif paramId == 130: # T
1445                    codes_write(gid, fdict['11'])
1446                elif paramId == 131 or paramId == 132: # U, V wind component
1447                    codes_write(gid, fdict['10'])
1448                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1449                    codes_write(gid, fdict['17'])
1450                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1451                    codes_write(gid, fdict['18'])
1452                elif paramId == 135: # W
1453                    codes_write(gid, fdict['19'])
1454                elif paramId == 152: # LNSP
1455                    codes_write(gid, fdict['12'])
1456                elif paramId == 155 and gridtype == 'sh': # D
1457                    codes_write(gid, fdict['13'])
1458                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1459                    # sum cloud liquid water and ice
1460                    if scwc is None:
1461                        scwc = codes_get_values(gid)
1462                    else:
1463                        scwc += codes_get_values(gid)
1464                        codes_set_values(gid, scwc)
1465                        codes_set(gid, 'paramId', 201031)
1466                        codes_write(gid, fdict['22'])
1467                        scwc = None
1468                elif c.wrf and paramId in [129, 138, 155] and \
1469                      levtype == 'hybrid': # Z, VO, D
1470                    # do not do anything right now
1471                    # these are specific parameter for WRF
1472                    pass
1473                else:
1474                    if paramId not in savedfields:
1475                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1476                        # and all ADDPAR parameter
1477                        codes_write(gid, fdict['16'])
1478                        savedfields.append(paramId)
1479                    else:
1480                        print('duplicate ' + str(paramId) + ' not written')
1481
1482                try:
1483                    if c.wrf:
1484                        # model layer
1485                        if levtype == 'hybrid' and \
1486                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1487                            codes_write(gid, fwrf)
1488                        # sfc layer
1489                        elif paramId in wrfpars:
1490                            codes_write(gid, fwrf)
1491                except AttributeError:
1492                    pass
1493
1494                codes_release(gid)
1495                gid = codes_new_from_index(iid)
1496#============================================================================================
1497            for f in fdict.values():
1498                f.close()
1499#============================================================================================
1500            # call for Fortran program to convert e.g. reduced_gg grids to
1501            # regular_ll and calculate detadot/dp
1502            pwd = os.getcwd()
1503            os.chdir(c.inputdir)
1504            if os.stat('fort.21').st_size == 0 and c.eta:
1505                print('Parameter 77 (etadot) is missing, most likely it is \
1506                       not available for this type or date/time\n')
1507                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1508                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1509                         is set to 1 in CONTROL file')
1510#============================================================================================
1511            # write out all output to log file before starting fortran programm
1512            sys.stdout.flush()
1513
1514            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1515            execute_subprocess([os.path.join(c.exedir,
1516                                _config.FORTRAN_EXECUTABLE)],
1517                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1518
1519            os.chdir(pwd)
1520#============================================================================================
1521            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1522            if c.purefc:
1523                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1524            else:
1525                suffix = cdate_hour[2:10]
1526            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1527            print("outputfile = " + fnout)
1528            # collect for final processing
1529            self.outputfilelist.append(os.path.basename(fnout))
1530#============================================================================================
1531            # create outputfile and copy all data from intermediate files
1532            # to the outputfile (final GRIB input files for FLEXPART)
1533            orolsm = os.path.basename(glob.glob(c.inputdir +
1534                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1535            fluxfile = 'flux' + cdate[0:2] + suffix
1536            if not c.cwc:
1537                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1538            else:
1539                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1540
1541            with open(fnout, 'wb') as fout:
1542                for f in flist:
1543                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1544                                       fout)
1545
1546            if c.omega:
1547                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1548                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1549                                            'rb'), fout)
1550#============================================================================================
1551        if c.wrf:
1552            fwrf.close()
1553
1554        codes_index_release(iid)
1555
1556        return
1557
1558
1559    def process_output(self, c):
1560        '''Postprocessing of FLEXPART input files.
1561
1562        The grib files are postprocessed depending on the selection in
1563        CONTROL file. The resulting files are moved to the output
1564        directory if its not equal to the input directory.
1565        The following modifications might be done if
1566        properly switched in CONTROL file:
1567        GRIB2 - Conversion to GRIB2
1568        ECTRANS - Transfer of files to gateway server
1569        ECSTORAGE - Storage at ECMWF server
1570
1571        Parameters
1572        ----------
1573        c : ControlFile
1574            Contains all the parameters of CONTROL file and
1575            command line.
1576
1577        Return
1578        ------
1579
1580        '''
1581
1582        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1583
1584        if not c.ecapi:
1585            print('ecstorage: {}\n ecfsdir: {}\n'.
1586                  format(c.ecstorage, c.ecfsdir))
1587            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1588                  .format(c.ectrans, c.gateway, c.destination))
1589
1590        print('Output filelist: ')
1591        print(self.outputfilelist)
1592
1593        for ofile in self.outputfilelist:
1594            ofile = os.path.join(self.inputdir, ofile)
1595
1596            if c.format.lower() == 'grib2':
1597                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1598                                    'productDefinitionTemplateNumber=8',
1599                                    ofile, ofile + '_2'],
1600                                   error_msg='GRIB2 CONVERSION FAILED!')
1601
1602                execute_subprocess(['mv', ofile + '_2', ofile],
1603                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1604                                   'FILES FAILED!')
1605
1606            if c.ectrans and not c.ecapi:
1607                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1608                                    c.gateway, '-remote', c.destination,
1609                                    '-source', ofile],
1610                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1611
1612            if c.ecstorage and not c.ecapi:
1613                execute_subprocess(['ecp', '-o', ofile,
1614                                    os.path.expandvars(c.ecfsdir)],
1615                                   error_msg='COPY OF FILES TO ECSTORAGE '
1616                                   'AREA FAILED!')
1617
1618            if c.outputdir != c.inputdir:
1619                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1620                                    c.outputdir],
1621                                   error_msg='RELOCATION OF OUTPUT FILES '
1622                                   'TO OUTPUTDIR FAILED!')
1623
1624        return
1625
1626
1627    def prepare_fp_files(self, c):
1628        '''Conversion of GRIB files to FLEXPART binary format.
1629
1630        Parameters
1631        ----------
1632        c : ControlFile
1633            Contains all the parameters of CONTROL file and
1634            command line.
1635
1636        Return
1637        ------
1638
1639        '''
1640        # generate AVAILABLE file
1641        # Example of AVAILABLE file data:
1642        # 20131107 000000      EN13110700              ON DISC
1643        clist = []
1644        for ofile in self.outputfilelist:
1645            fname = ofile.split('/')
1646            if '.' in fname[-1]:
1647                l = fname[-1].split('.')
1648                timestamp = datetime.strptime(l[0][-6:] + l[1],
1649                                              '%y%m%d%H')
1650                timestamp += timedelta(hours=int(l[2]))
1651                cdate = datetime.strftime(timestamp, '%Y%m%d')
1652                chms = datetime.strftime(timestamp, '%H%M%S')
1653            else:
1654                cdate = '20' + fname[-1][-8:-2]
1655                chms = fname[-1][-2:] + '0000'
1656            clist.append(cdate + ' ' + chms + ' '*6 +
1657                         fname[-1] + ' '*14 + 'ON DISC')
1658        clist.sort()
1659        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1660            f.write('\n'.join(clist) + '\n')
1661
1662        # generate pathnames file
1663        pwd = os.path.abspath(c.outputdir)
1664        with open(pwd + '/pathnames', 'w') as f:
1665            f.write(pwd + '/Options/\n')
1666            f.write(pwd + '/\n')
1667            f.write(pwd + '/\n')
1668            f.write(pwd + '/AVAILABLE\n')
1669            f.write(' = == = == = == = == = == ==  = \n')
1670
1671        # create Options dir if necessary
1672        if not os.path.exists(pwd + '/Options'):
1673            make_dir(pwd+'/Options')
1674
1675        # read template COMMAND file
1676        with open(os.path.expandvars(os.path.expanduser(
1677            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1678            lflist = f.read().split('\n')
1679
1680        # find index of list where to put in the
1681        # date and time information
1682        # usually after the LDIRECT parameter
1683        i = 0
1684        for l in lflist:
1685            if 'LDIRECT' in l.upper():
1686                break
1687            i += 1
1688
1689        # insert the date and time information of run start and end
1690        # into the list of lines of COMMAND file
1691        lflist = lflist[:i+1] + \
1692                 [clist[0][:16], clist[-1][:16]] + \
1693                 lflist[i+3:]
1694
1695        # write the new COMMAND file
1696        with open(pwd + '/Options/COMMAND', 'w') as g:
1697            g.write('\n'.join(lflist) + '\n')
1698
1699        # change to outputdir and start the grib2flexpart run
1700        # afterwards switch back to the working dir
1701        os.chdir(c.outputdir)
1702        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1703         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1704        execute_subprocess(cmd)
1705        os.chdir(pwd)
1706
1707        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG