source: flex_extract.git/source/python/classes/EcFlexpart.py @ d727af2

ctbtodev
Last change on this file since d727af2 was d727af2, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

updated rrint processes (added missing rr for ensembles and wrote all precip fields into the output files with different steps to distingush)

  • Property mode set to 100644
File size: 78.5 KB
RevLine 
[79729d5]1#!/usr/bin/env python3
[efdb01a]2# -*- coding: utf-8 -*-
[991df6a]3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
[ff99eae]13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
[991df6a]15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
[ff99eae]32#        - removed function getFlexpartTime in class EcFlexpart
[991df6a]33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
[ff99eae]35#        - changed class name from EIFlexpart to EcFlexpart
[991df6a]36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
[27fe969]39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
[991df6a]41#
42# @License:
[6f951ca]43#    (C) Copyright 2014-2019.
44#    Anne Philipp, Leopold Haimberger
[ff99eae]45#
[6f951ca]46#    This work is licensed under the Creative Commons Attribution 4.0
47#    International License. To view a copy of this license, visit
48#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
49#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
[991df6a]50#*******************************************************************************
[ff99eae]51#pylint: disable=unsupported-assignment-operation
[25b14be]52# this is disabled because for this specific case its an error in pylint
[ff99eae]53#pylint: disable=consider-using-enumerate
54# this is not useful in this case
[efdb01a]55# ------------------------------------------------------------------------------
56# MODULES
57# ------------------------------------------------------------------------------
58import os
[ca867de]59import sys
[efdb01a]60import glob
[ca867de]61import shutil
62import subprocess
[ff99eae]63from datetime import datetime, timedelta
64import numpy as np
[aa275fc]65
[2e62398]66from eccodes import (codes_index_select, codes_new_from_index, codes_get,
67                     codes_get_values, codes_set_values, codes_set,
68                     codes_write, codes_release, codes_new_from_index,
[45b99e6]69                     codes_index_release, codes_index_get, codes_get_array,
70                     codes_set_array, codes_grib_new_from_file)
[991df6a]71
72# software specific classes and modules from flex_extract
[ca867de]73sys.path.append('../')
[2fb99de]74import _config
[d727af2]75from classes.GribUtil import GribUtil
[5bad6ec]76from mods.tools import (init128, to_param_id, silent_remove, product,
[bf48c8a]77                        my_error, make_dir, get_informations, get_dimensions,
[d727af2]78                        execute_subprocess, to_param_id_with_tablenumber,
79                        generate_retrieval_period_boundary)
80from classes.MarsRetrieval import MarsRetrieval
81from classes.UioFiles import UioFiles
[ca867de]82import mods.disaggregation as disaggregation
[991df6a]83
[efdb01a]84# ------------------------------------------------------------------------------
85# CLASS
86# ------------------------------------------------------------------------------
[ff99eae]87class EcFlexpart(object):
[efdb01a]88    '''
[6f951ca]89    Class to represent FLEXPART specific ECMWF data.
90
91    FLEXPART needs grib files in a specifc format. All necessary data fields
92    for one time step are stored in a single file. The class represents an
93    instance with all the parameter and settings necessary for retrieving
94    MARS data and modifing them so they are fitting FLEXPART needs. The class
95    is able to disaggregate the fluxes and convert grid types to the one needed
96    by FLEXPART, therefore using the FORTRAN program.
97
98    Attributes
99    ----------
100    mreq_count : int
101        Counter for the number of generated mars requests.
102
103    inputdir : str
104        Path to the directory where the retrieved data is stored.
105
106    dataset : str
107        For public datasets there is the specific naming and parameter
108        dataset which has to be used to characterize the type of
109        data.
110
[d4696e0]111    basetime : int
[6f951ca]112        The time for a half day retrieval. The 12 hours upfront are to be
113        retrieved.
114
115    dtime : str
116        Time step in hours.
117
118    acctype : str
119        The field type for the accumulated forecast fields.
120
121    acctime : str
122        The starting time from the accumulated forecasts.
123
124    accmaxstep : str
125        The maximum forecast step for the accumulated forecast fields.
126
127    marsclass : str
128        Characterisation of dataset.
129
130    stream : str
131        Identifies the forecasting system used to generate the data.
132
133    number : str
134        Selects the member in ensemble forecast run.
135
136    resol : str
137        Specifies the desired triangular truncation of retrieved data,
138        before carrying out any other selected post-processing.
139
140    accuracy : str
141        Specifies the number of bits per value to be used in the
142        generated GRIB coded fields.
143
144    addpar : str
145        List of additional parameters to be retrieved.
146
147    level : str
148        Specifies the maximum level.
149
150    expver : str
151        The version of the dataset.
152
153    levelist : str
154        Specifies the required levels.
155
156    glevelist : str
157        Specifies the required levels for gaussian grids.
158
159    gaussian : str
160        This parameter is deprecated and should no longer be used.
161        Specifies the desired type of Gaussian grid for the output.
162
163    grid : str
164        Specifies the output grid which can be either a Gaussian grid
165        or a Latitude/Longitude grid.
166
167    area : str
168        Specifies the desired sub-area of data to be extracted.
169
170    purefc : int
171        Switch for definition of pure forecast mode or not.
172
173    outputfilelist : list of str
174        The final list of FLEXPART ready input files.
175
176    types : dictionary
177        Determines the combination of type of fields, time and forecast step
178        to be retrieved.
179
180    params : dictionary
181        Collection of grid types and their corresponding parameters,
182        levels, level types and the grid definition.
183
184    server : ECMWFService or ECMWFDataServer
185        This is the connection to the ECMWF data servers.
186
187    public : int
188        Decides which Web API Server version is used.
189
190    dates : str
191        Contains start and end date of the retrieval in the format
192        "YYYYMMDD/to/YYYYMMDD"
[efdb01a]193    '''
[6f951ca]194
[efdb01a]195    # --------------------------------------------------------------------------
196    # CLASS FUNCTIONS
197    # --------------------------------------------------------------------------
[991df6a]198    def __init__(self, c, fluxes=False):
[274f9ef]199        '''Creates an object/instance of EcFlexpart with the associated
200        settings of its attributes for the retrieval.
201
202        Parameters:
203        -----------
[6f951ca]204        c : ControlFile
[274f9ef]205            Contains all the parameters of CONTROL file and
206            command line.
207
[6f951ca]208        fluxes : boolean, optional
[274f9ef]209            Decides if the flux parameter settings are stored or
210            the rest of the parameter list.
211            Default value is False.
212
213        Return
214        ------
215
[efdb01a]216        '''
[092aaf1]217        # set a counter for the number of generated mars requests
[295ff45]218        self.mreq_count = 0
[efdb01a]219
220        self.inputdir = c.inputdir
[5bad6ec]221        self.dataset = c.dataset
[efdb01a]222        self.basetime = c.basetime
223        self.dtime = c.dtime
[092aaf1]224        self.acctype = c.acctype
225        self.acctime = c.acctime
226        self.accmaxstep = c.accmaxstep
227        self.marsclass = c.marsclass
228        self.stream = c.stream
229        self.number = c.number
230        self.resol = c.resol
231        self.accuracy = c.accuracy
232        self.addpar = c.addpar
233        self.level = c.level
234        self.expver = c.expver
235        self.levelist = c.levelist
236        self.glevelist = '1/to/' + c.level # in case of gaussian grid
237        self.gaussian = c.gaussian
238        self.grid = c.grid
239        self.area = c.area
240        self.purefc = c.purefc
241        self.outputfilelist = []
242
243        # Define the different types of field combinations (type, time, step)
244        self.types = {}
245        # Define the parameters and their level types, level list and
246        # grid resolution for the retrieval job
247        self.params = {}
248
249        if fluxes:
250            self._create_params_fluxes()
251        else:
252            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
253
[a4531f1]254        if fluxes and not c.purefc:
[092aaf1]255            self._create_field_types_fluxes()
[efdb01a]256        else:
[092aaf1]257            self._create_field_types(c.type, c.time, c.step)
258        return
259
260    def _create_field_types(self, ftype, ftime, fstep):
261        '''Create the combination of field type, time and forecast step.
262
263        Parameters:
264        -----------
[6f951ca]265        ftype : list of str
[092aaf1]266            List of field types.
267
[6f951ca]268        ftime : list of str
[092aaf1]269            The time in hours of the field.
270
[6f951ca]271        fstep : str
[092aaf1]272            Specifies the forecast time step from forecast base time.
273            Valid values are hours (HH) from forecast base time.
274
275        Return
276        ------
277
278        '''
279        i = 0
280        for ty, st, ti in zip(ftype, fstep, ftime):
[79729d5]281            btlist = list(range(len(ftime)))
[d4696e0]282            if self.basetime == 12:
[092aaf1]283                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
[d4696e0]284            if self.basetime == 0:
[092aaf1]285                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
286
287            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
288                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
289                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
290                 # (int(c.time[i]) in btlist or c.purefc):
291
[d4696e0]292            if (i in btlist) or self.purefc:
[092aaf1]293
294                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
295                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
[efdb01a]296
297                    if ty not in self.types.keys():
298                        self.types[ty] = {'times': '', 'steps': ''}
299
300                    if ti not in self.types[ty]['times']:
[ff99eae]301                        if self.types[ty]['times']:
[efdb01a]302                            self.types[ty]['times'] += '/'
303                        self.types[ty]['times'] += ti
304
305                    if st not in self.types[ty]['steps']:
[ff99eae]306                        if self.types[ty]['steps']:
[efdb01a]307                            self.types[ty]['steps'] += '/'
308                        self.types[ty]['steps'] += st
[092aaf1]309            i += 1
[d4696e0]310
[092aaf1]311        return
[991df6a]312
[092aaf1]313    def _create_field_types_fluxes(self):
314        '''Create the combination of field type, time and forecast step
315        for the flux data.
[efdb01a]316
[092aaf1]317        Parameters:
318        -----------
[efdb01a]319
[092aaf1]320        Return
321        ------
[efdb01a]322
[092aaf1]323        '''
324        self.types[str(self.acctype)] = {'times': str(self.acctime),
325                                         'steps': '{}/to/{}/by/{}'.format(
326                                             self.dtime,
327                                             self.accmaxstep,
328                                             self.dtime)}
329        return
[efdb01a]330
[092aaf1]331    def _create_params(self, gauss, eta, omega, cwc, wrf):
332        '''Define the specific parameter settings for retrievment.
333
334        The different parameters need specific grid types and level types
335        for retrievement. We might get following combination of types
336        (depending on selection and availability):
337        (These are short cuts for the grib file names (leading sequence)
338        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
339        where:
340            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
341            ML = Model Level, SL = Surface Level
342
343        For each of this combination there is a list of parameter names,
344        the level type, the level list and the grid resolution.
345
346        There are different scenarios for data extraction from MARS:
347        1) Retrieval of etadot
348           eta=1, gauss=0, omega=0
349        2) Calculation of etadot from divergence
350           eta=0, gauss=1, omega=0
351        3) Calculation of etadot from omega (for makes sense for debugging)
352           eta=0, gauss=0, omega=1
353        4) Retrieval and Calculation of etadot (only for debugging)
354           eta=1, gauss=1, omega=0
355        5) Download also specific model and surface level data for FLEXPART-WRF
[ff99eae]356
[092aaf1]357        Parameters:
358        -----------
[6f951ca]359        gauss : int
[092aaf1]360            Gaussian grid is retrieved.
361
[6f951ca]362        eta : int
[092aaf1]363            Etadot parameter will be directly retrieved.
364
[6f951ca]365        omega : int
[092aaf1]366            The omega paramterwill be retrieved.
367
[6f951ca]368        cwc : int
[092aaf1]369            The cloud liquid and ice water content will be retrieved.
[efdb01a]370
[6f951ca]371        wrf : int
[092aaf1]372            Additional model level and surface level data will be retrieved for
373            WRF/FLEXPART-WRF simulations.
374
375        Return
376        ------
377
378        '''
379        # SURFACE FIELDS
380        #-----------------------------------------------------------------------
381        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
382        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
383                                 'SFC', '1', self.grid]
384        if self.addpar:
385            self.params['OG__SL'][0] += self.addpar
386
387        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
388            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
389                                            'SFC', '1', self.grid]
390        else:
391            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
392                                            'SFC', '1', self.grid]
393
394        # MODEL LEVEL FIELDS
395        #-----------------------------------------------------------------------
396        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
397
398        if not gauss and eta:
399            self.params['OG__ML'][0] += '/U/V/ETADOT'
400        elif gauss and not eta:
[79729d5]401            self.params['GG__SL'] = ['Q', 'ML', '1',
402                                     '{}'.format((int(self.resol) + 1) // 2)]
[092aaf1]403            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
404        elif not gauss and not eta:
405            self.params['OG__ML'][0] += '/U/V'
[f20af73]406        else:  # GAUSS and ETA
407            print('Warning: Collecting etadot and parameters for gaussian grid '
408                           'is a very costly parameter combination, '
409                           'use this combination only for debugging!')
410            self.params['GG__SL'] = ['Q', 'ML', '1',
[79729d5]411                                     '{}'.format((int(self.resol) + 1) // 2)]
[f20af73]412            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
[79729d5]413                                     '{}'.format((int(self.resol) + 1) // 2)]
[092aaf1]414
415        if omega:
416            self.params['OG__ML'][0] += '/W'
417
418        if cwc:
419            self.params['OG__ML'][0] += '/CLWC/CIWC'
420
421        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
[f20af73]422        # -----------------------------------------------------------------------
[092aaf1]423        if wrf:
424            self.params['OG__ML'][0] += '/Z/VO'
425            if '/D' not in self.params['OG__ML'][0]:
426                self.params['OG__ML'][0] += '/D'
[f20af73]427
[092aaf1]428            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
429                       'SWVL1','SWVL2','SWVL3','SWVL4']
430            for par in wrf_sfc:
431                if par not in self.params['OG__SL'][0]:
432                    self.params['OG__SL'][0] += '/' + par
433
434        return
435
436
437    def _create_params_fluxes(self):
438        '''Define the parameter setting for flux data.
439
440        Flux data are accumulated fields in time and are stored on the
441        surface level. The leading short cut name for the grib files is:
442        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
443        acc for Accumulated Grid.
444        The params dictionary stores a list of parameter names, the level type,
445        the level list and the grid resolution.
446
447        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
448
449        Parameters:
450        -----------
451
452        Return
453        ------
[efdb01a]454
[092aaf1]455        '''
[f20af73]456        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
[092aaf1]457                                    'SFC', '1', self.grid]
[efdb01a]458        return
459
460
[27fe969]461    def _mk_targetname(self, ftype, param, date):
[274f9ef]462        '''Creates the filename for the requested grib data to be stored in.
463        This name is passed as the "target" parameter in the request.
[27fe969]464
[274f9ef]465        Parameters
466        ----------
[6f951ca]467        ftype : str
[274f9ef]468            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
[27fe969]469
[6f951ca]470        param : str
[274f9ef]471            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
472            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
[27fe969]473
[6f951ca]474        date : str
[274f9ef]475            The date period of the grib data to be stored in this file.
[27fe969]476
[274f9ef]477        Return
478        ------
[6f951ca]479        targetname : str
[274f9ef]480            The target filename for the grib data.
[27fe969]481        '''
482        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
483                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
484
485        return targetname
486
487
[295ff45]488    def _start_retrievement(self, request, par_dict):
[274f9ef]489        '''Creates the Mars Retrieval and prints or submits the request
490        depending on the status of the request variable.
491
492        Parameters
493        ----------
[6f951ca]494        request : int
[274f9ef]495            Selects the mode of retrieval.
496            0: Retrieves the data from ECMWF.
497            1: Prints the mars requests to an output file.
498            2: Retrieves the data and prints the mars request.
499
[6f951ca]500        par_dict : dictionary
[274f9ef]501            Contains all parameter which have to be set for creating the
502            Mars Retrievals. The parameter are:
503            marsclass, dataset, stream, type, levtype, levelist, resol,
504            gaussian, accuracy, grid, target, area, date, time, number,
505            step, expver, param
506
507        Return
508        ------
509
[295ff45]510        '''
511        # increase number of mars requests
512        self.mreq_count += 1
513
514        MR = MarsRetrieval(self.server,
[5bad6ec]515                           self.public,
[295ff45]516                           marsclass=par_dict['marsclass'],
[5bad6ec]517                           dataset=par_dict['dataset'],
[295ff45]518                           stream=par_dict['stream'],
519                           type=par_dict['type'],
520                           levtype=par_dict['levtype'],
521                           levelist=par_dict['levelist'],
522                           resol=par_dict['resol'],
523                           gaussian=par_dict['gaussian'],
524                           accuracy=par_dict['accuracy'],
525                           grid=par_dict['grid'],
526                           target=par_dict['target'],
527                           area=par_dict['area'],
528                           date=par_dict['date'],
529                           time=par_dict['time'],
530                           number=par_dict['number'],
531                           step=par_dict['step'],
532                           expver=par_dict['expver'],
533                           param=par_dict['param'])
534
535        if request == 0:
536            MR.display_info()
537            MR.data_retrieve()
538        elif request == 1:
539            MR.print_infodata_csv(self.inputdir, self.mreq_count)
540        elif request == 2:
541            MR.print_infodata_csv(self.inputdir, self.mreq_count)
542            MR.display_info()
543            MR.data_retrieve()
544        else:
545            print('Failure')
546
547        return
548
549
[ca867de]550    def _mk_index_values(self, inputdir, inputfiles, keys):
[274f9ef]551        '''Creates an index file for a set of grib parameter keys.
552        The values from the index keys are returned in a list.
553
554        Parameters
555        ----------
[6f951ca]556        keys : dictionary
[274f9ef]557            List of parameter names which serves as index.
558
[6f951ca]559        inputfiles : UioFiles
[274f9ef]560            Contains a list of files.
561
562        Return
563        ------
[6f951ca]564        iid : codes_index
[274f9ef]565            This is a grib specific index structure to access
566            messages in a file.
567
[6f951ca]568        index_vals : list of list  of str
[274f9ef]569            Contains the values from the keys used for a distinct selection
570            of grib messages in processing  the grib files.
571            Content looks like e.g.:
572            index_vals[0]: ('20171106', '20171107', '20171108') ; date
573            index_vals[1]: ('0', '1200', '1800', '600') ; time
574            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
[ca867de]575        '''
576        iid = None
577        index_keys = keys
578
579        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
580        silent_remove(indexfile)
[092aaf1]581        grib = GribUtil(inputfiles.files)
[ca867de]582        # creates new index file
583        iid = grib.index(index_keys=index_keys, index_file=indexfile)
584
585        # read the values of index keys
586        index_vals = []
587        for key in index_keys:
[2e62398]588            key_vals = codes_index_get(iid, key)
[d727af2]589            # have to sort the key values for correct order,
[ca867de]590            # therefore convert to int first
[d43706b]591            key_vals = [int(k) for k in key_vals]
592            key_vals.sort()
593            key_vals = [str(k) for k in key_vals]
[ca867de]594            index_vals.append(key_vals)
595            # index_vals looks for example like:
596            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
597            # index_vals[1]: ('0', '1200') ; time
598            # index_vals[2]: (3', '6', '9', '12') ; stepRange
599
600        return iid, index_vals
601
602
[5bad6ec]603    def retrieve(self, server, dates, public, request, inputdir='.'):
[274f9ef]604        '''Finalizing the retrieval information by setting final details
605        depending on grid type.
606        Prepares MARS retrievals per grid type and submits them.
607
608        Parameters
609        ----------
[6f951ca]610        server : ECMWFService or ECMWFDataServer
[274f9ef]611            The connection to the ECMWF server. This is different
612            for member state users which have full access and non
613            member state users which have only access to the public
614            data sets. The decision is made from command line argument
615            "public"; for public access its True (ECMWFDataServer)
616            for member state users its False (ECMWFService)
617
[6f951ca]618        dates : str
[274f9ef]619            Contains start and end date of the retrieval in the format
620            "YYYYMMDD/to/YYYYMMDD"
621
[6f951ca]622        request : int
[274f9ef]623            Selects the mode of retrieval.
624            0: Retrieves the data from ECMWF.
625            1: Prints the mars requests to an output file.
626            2: Retrieves the data and prints the mars request.
627
[6f951ca]628        inputdir : str, optional
[274f9ef]629            Path to the directory where the retrieved data is about
630            to be stored. The default is the current directory ('.').
631
632        Return
633        ------
634
[efdb01a]635        '''
636        self.dates = dates
637        self.server = server
[5bad6ec]638        self.public = public
[991df6a]639        self.inputdir = inputdir
[efdb01a]640        oro = False
[991df6a]641
[65748f4]642        # define times with datetime module
[295ff45]643        t12h = timedelta(hours=12)
644        t24h = timedelta(hours=24)
645
[ca867de]646        # dictionary which contains all parameter for the mars request,
[65748f4]647        # entries with a "None" will change in different requests and will
648        # therefore be set in each request seperately
[295ff45]649        retr_param_dict = {'marsclass':self.marsclass,
[5bad6ec]650                           'dataset':self.dataset,
[295ff45]651                           'stream':None,
652                           'type':None,
653                           'levtype':None,
654                           'levelist':None,
655                           'resol':self.resol,
656                           'gaussian':None,
657                           'accuracy':self.accuracy,
658                           'grid':None,
659                           'target':None,
660                           'area':None,
661                           'date':None,
662                           'time':None,
663                           'number':self.number,
664                           'step':None,
665                           'expver':self.expver,
666                           'param':None}
667
[efdb01a]668        for ftype in self.types:
[45b99e6]669            # ftype contains field types such as
[295ff45]670            #     [AN, FC, PF, CV]
[79729d5]671            for pk, pv in self.params.items():
[295ff45]672                # pk contains one of these keys of params
673                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
674                #      OG_OROLSM_SL, OG_acc_SL]
675                # pv contains all of the items of the belonging key
676                #     [param, levtype, levelist, grid]
[efdb01a]677                if isinstance(pv, str):
678                    continue
[70fee58]679                retr_param_dict['type'] = ftype
[295ff45]680                retr_param_dict['time'] = self.types[ftype]['times']
681                retr_param_dict['step'] = self.types[ftype]['steps']
682                retr_param_dict['date'] = self.dates
683                retr_param_dict['stream'] = self.stream
[65748f4]684                retr_param_dict['target'] = \
[70fee58]685                    self._mk_targetname(ftype,
686                                        pk,
[65748f4]687                                        retr_param_dict['date'].split('/')[0])
[f20af73]688                table128 = init128(_config.PATH_GRIBTABLE)
689                ids = to_param_id_with_tablenumber(pv[0], table128)
690                retr_param_dict['param'] = ids
[295ff45]691                retr_param_dict['levtype'] = pv[1]
692                retr_param_dict['levelist'] = pv[2]
693                retr_param_dict['grid'] = pv[3]
694                retr_param_dict['area'] = self.area
695                retr_param_dict['gaussian'] = self.gaussian
696
697                if pk == 'OG_OROLSM__SL' and not oro:
698                    oro = True
[0e576fc]699                    # in CERA20C (class EP) there is no stream "OPER"!
700                    if self.marsclass.upper() != 'EP':
701                        retr_param_dict['stream'] = 'OPER'
[295ff45]702                    retr_param_dict['type'] = 'AN'
703                    retr_param_dict['time'] = '00'
704                    retr_param_dict['step'] = '000'
705                    retr_param_dict['date'] = self.dates.split('/')[0]
706                    retr_param_dict['target'] = self._mk_targetname('',
707                                            pk, retr_param_dict['date'])
708                elif pk == 'OG_OROLSM__SL' and oro:
709                    continue
[efdb01a]710                if pk == 'GG__SL' and pv[0] == 'Q':
[295ff45]711                    retr_param_dict['area'] = ""
712                    retr_param_dict['gaussian'] = 'reduced'
[45b99e6]713                if ftype.upper() == 'FC' and \
714                    'acc' not in retr_param_dict['target']:
715                    if (int(retr_param_dict['time'][0]) +
716                        int(retr_param_dict['step'][0])) > 23:
717                        dates = retr_param_dict['date'].split('/')
718                        sdate = datetime.strptime(dates[0], '%Y%m%d%H')
719                        sdate = sdate - timedelta(days=1)
720                        retr_param_dict['date'] = '/'.join(
721                            [sdate.strftime("%Y%m%d")] +
722                            retr_param_dict['date'][1:])
723
724                        print('CHANGED FC start date to ' +
725                              sdate.strftime("%Y%m%d") +
726                              ' to accomodate TIME=' +
727                              retr_param_dict['time'][0] +
728                              ', STEP=' +
729                              retr_param_dict['time'][0])
[efdb01a]730
[991df6a]731    # ------  on demand path  --------------------------------------------------
[d4696e0]732                if self.basetime is None:
[65748f4]733                    # ******* start retrievement
[295ff45]734                    self._start_retrievement(request, retr_param_dict)
[991df6a]735    # ------  operational path  ------------------------------------------------
[efdb01a]736                else:
737                    # check if mars job requests fields beyond basetime.
[65748f4]738                    # if yes eliminate those fields since they may not
[efdb01a]739                    # be accessible with user's credentials
[991df6a]740
[65748f4]741                    enddate = retr_param_dict['date'].split('/')[-1]
[d4696e0]742                    elimit = datetime.strptime(enddate + str(self.basetime),
[65748f4]743                                               '%Y%m%d%H')
[efdb01a]744
[d4696e0]745                    if self.basetime == 12:
[991df6a]746                        # --------------  flux data ----------------------------
[efdb01a]747                        if 'acc' in pk:
[295ff45]748                            startdate = retr_param_dict['date'].split('/')[0]
749                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
[65748f4]750                            retr_param_dict['date'] = '/'.join([startdate,
751                                                                'to',
752                                                                enddate])
[295ff45]753
[65748f4]754                            # ******* start retrievement
[295ff45]755                            self._start_retrievement(request, retr_param_dict)
756
757                            retr_param_dict['date'] = \
758                                datetime.strftime(elimit - t12h, '%Y%m%d')
759                            retr_param_dict['time'] = '00'
760                            retr_param_dict['target'] = \
761                                self._mk_targetname(ftype, pk,
762                                                    retr_param_dict['date'])
763
[65748f4]764                            # ******* start retrievement
[295ff45]765                            self._start_retrievement(request, retr_param_dict)
766
[991df6a]767                        # --------------  non flux data ------------------------
[efdb01a]768                        else:
[65748f4]769                            # ******* start retrievement
[295ff45]770                            self._start_retrievement(request, retr_param_dict)
771
[d4696e0]772                    elif self.basetime == 0:
[295ff45]773                        retr_param_dict['date'] = \
774                            datetime.strftime(elimit - t24h, '%Y%m%d')
775
776                        timesave = ''.join(retr_param_dict['time'])
777
[d4696e0]778                        if ('/' in retr_param_dict['time'] and
779                            pk != 'OG_OROLSM__SL' and
780                            'acc' not in pk ) :
[295ff45]781                            times = retr_param_dict['time'].split('/')
782                            steps = retr_param_dict['step'].split('/')
[45b99e6]783
[d4696e0]784                            while int(times[0]) + int(steps[0]) <= 12:
[efdb01a]785                                times = times[1:]
[d4696e0]786                                if len(times) > 1:
787                                    retr_param_dict['time'] = '/'.join(times)
788                                else:
789                                    retr_param_dict['time'] = times[0]
[295ff45]790
791                        if (pk != 'OG_OROLSM__SL' and
792                            int(retr_param_dict['step'].split('/')[0]) == 0 and
793                            int(timesave.split('/')[0]) == 0):
794
795                            retr_param_dict['date'] = \
796                                datetime.strftime(elimit, '%Y%m%d')
797                            retr_param_dict['time'] = '00'
798                            retr_param_dict['step'] = '000'
799                            retr_param_dict['target'] = \
800                                self._mk_targetname(ftype, pk,
801                                                    retr_param_dict['date'])
802
[d4696e0]803                        # ******* start retrievement
804                        self._start_retrievement(request, retr_param_dict)
805                    else:
806                        raise ValueError('ERROR: Basetime has an invalid value '
807                                                 '-> {}'.format(str(basetime)))
[ff99eae]808
[2fb99de]809        if request == 0 or request == 2:
810            print('MARS retrieve done ... ')
811        elif request == 1:
812            print('MARS request printed ...')
[efdb01a]813
814        return
815
816
[c5074d2]817    def write_namelist(self, c):
[274f9ef]818        '''Creates a namelist file in the temporary directory and writes
819        the following values to it: maxl, maxb, mlevel,
820        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
821        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
822
823        Parameters
824        ----------
[6f951ca]825        c : ControlFile
[274f9ef]826            Contains all the parameters of CONTROL file and
827            command line.
828
[6f951ca]829        filename : str
[27fe969]830                Name of the namelist file.
831
[274f9ef]832        Return
833        ------
834
[efdb01a]835        '''
836
[c5074d2]837        from genshi.template.text import NewTextTemplate
838        from genshi.template import  TemplateLoader
[3f36e42]839        from genshi.template.eval import  UndefinedError
840
841        try:
842            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
843            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
844                                            cls=NewTextTemplate)
845
846            self.inputdir = c.inputdir
847            area = np.asarray(self.area.split('/')).astype(float)
848            grid = np.asarray(self.grid.split('/')).astype(float)
849
850            if area[1] > area[3]:
851                area[1] -= 360
852            maxl = int((area[3] - area[1]) / grid[1]) + 1
853            maxb = int((area[0] - area[2]) / grid[0]) + 1
854
855            stream = namelist_template.generate(
856                maxl = str(maxl),
857                maxb = str(maxb),
858                mlevel = str(self.level),
859                mlevelist = str(self.levelist),
860                mnauf = str(self.resol),
861                metapar = '77',
862                rlo0 = str(area[1]),
863                rlo1 = str(area[3]),
864                rla0 = str(area[2]),
865                rla1 = str(area[0]),
866                momega = str(c.omega),
867                momegadiff = str(c.omegadiff),
868                mgauss = str(c.gauss),
869                msmooth = str(c.smooth),
870                meta = str(c.eta),
871                metadiff = str(c.etadiff),
872                mdpdeta = str(c.dpdeta)
873            )
874        except UndefinedError as e:
875            print('... ERROR ' + str(e))
876
877            sys.exit('\n... error occured while trying to generate namelist ' +
878                     _config.TEMPFILE_NAMELIST)
879        except OSError as e:
880            print('... ERROR CODE: ' + str(e.errno))
881            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
882
883            sys.exit('\n... error occured while trying to generate template ' +
884                     _config.TEMPFILE_NAMELIST)
885
886        try:
887            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
888
889            with open(namelistfile, 'w') as f:
890                f.write(stream.render('text'))
891        except OSError as e:
892            print('... ERROR CODE: ' + str(e.errno))
893            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
894
895            sys.exit('\n... error occured while trying to write ' +
896                     namelistfile)
[efdb01a]897
898        return
899
[27fe969]900
901    def deacc_fluxes(self, inputfiles, c):
[3f36e42]902        '''De-accumulate and disaggregate flux data.
903
904        Goes through all flux fields in ordered time and de-accumulate
[274f9ef]905        the fields. Afterwards the fields are disaggregated in time.
906        Different versions of disaggregation is provided for rainfall
907        data (darain, modified linear) and the surface fluxes and
908        stress data (dapoly, cubic polynomial).
909
910        Parameters
911        ----------
[6f951ca]912        inputfiles : UioFiles
[c274d9a]913            Contains the list of files that contain flux data.
[274f9ef]914
[6f951ca]915        c : ControlFile
[274f9ef]916            Contains all the parameters of CONTROL file and
917            command line.
918
919        Return
920        ------
921
[efdb01a]922        '''
923
[dda0e9d]924        table128 = init128(_config.PATH_GRIBTABLE)
[d727af2]925        # get ids from the flux parameter names
[27fe969]926        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
[efdb01a]927
[ca867de]928        iid = None
929        index_vals = None
930
931        # get the values of the keys which are used for distinct access
[d727af2]932        # of grib messages via product and save the maximum number of
933        # ensemble members if there is more than one
[45b99e6]934        if '/' in self.number:
935            # more than one ensemble member is selected
936            index_keys = ["number", "date", "time", "step"]
[d727af2]937            # maximum ensemble number retrieved
938            # + 1 for the control run (ensemble number 0)
939            maxnum = int(self.number.split('/')[-1]) + 1
940            # remember the index of the number values
941            index_number = index_keys.index('number')
942            # empty set to save ensemble numbers which were already processed
943            ens_numbers = set()
944            # index for the ensemble number
945            inumb = 0
[45b99e6]946        else:
947            index_keys = ["date", "time", "step"]
[d727af2]948            # maximum ensemble number
949            maxnum = None
950
951        # get sorted lists of the index values
952        # this is very important for disaggregating
953        # the flux data in correct order
[ca867de]954        iid, index_vals = self._mk_index_values(c.inputdir,
955                                                inputfiles,
956                                                index_keys)
957        # index_vals looks like e.g.:
958        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
[092aaf1]959        # index_vals[1]: ('0', '600', '1200', '1800') ; time
[6f951ca]960        # index_vals[2]: ('0', '3', '6', '9', '12') ; stepRange
[efdb01a]961
[c274d9a]962        if c.rrint:
[d727af2]963            # set start and end timestamps for retrieval period
[092aaf1]964            if not c.purefc:
965                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
966                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
967            else:
968                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
969                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
970                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
971                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
972                end_date = end_date + timedelta(hours=c.maxstep)
973
[d727af2]974            # get necessary grid dimensions from grib files for storing the
975            # precipitation fields
[c274d9a]976            info = get_informations(os.path.join(c.inputdir,
977                                                 inputfiles.files[0]))
[092aaf1]978            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
979                                  start_date, end_date)
[d727af2]980
981            # create empty numpy arrays
982            if not maxnum:
983                lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
984                cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
985            else:
986                lsp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
987                cp_np = np.zeros((maxnum, dims[1] * dims[0], dims[2]), dtype=np.float64)
988
989            # index counter for time line
[c274d9a]990            it_lsp = 0
991            it_cp = 0
[d727af2]992
993            # store the order of date and step
[092aaf1]994            date_list = []
995            step_list = []
[c274d9a]996
[d727af2]997        # initialize dictionaries to store flux values per parameter
[c274d9a]998        orig_vals = {}
999        deac_vals = {}
[27fe969]1000        for p in pars:
[c274d9a]1001            orig_vals[p] = []
1002            deac_vals[p] = []
[efdb01a]1003
[ca867de]1004        # "product" genereates each possible combination between the
1005        # values of the index keys
[efdb01a]1006        for prod in product(*index_vals):
[2fb99de]1007            # e.g. prod = ('20170505', '0', '12')
[d727af2]1008            #             ( date     ,time, step)
[ca867de]1009
[d43706b]1010            print('CURRENT PRODUCT: ', prod)
[ca867de]1011
[d727af2]1012            # the whole process has to be done for each seperate ensemble member
1013            # therefore, for each new ensemble member we delete old flux values
1014            # and start collecting flux data from the beginning time step
1015            if maxnum and prod[index_number] not in ens_numbers:
1016                ens_numbers.add(prod[index_number])
1017                inumb = len(ens_numbers) - 1
1018                # re-initialize dictionaries to store flux values per parameter
1019                # for the next ensemble member
1020                it_lsp = 0
1021                it_cp = 0
1022                orig_vals = {}
1023                deac_vals = {}
1024                for p in pars:
1025                    orig_vals[p] = []
1026                    deac_vals[p] = []
1027
[efdb01a]1028            for i in range(len(index_keys)):
[2e62398]1029                codes_index_select(iid, index_keys[i], prod[i])
[efdb01a]1030
[991df6a]1031            # get first id from current product
[2e62398]1032            gid = codes_new_from_index(iid)
[991df6a]1033
[ca867de]1034            # if there is no data for this specific time combination / product
1035            # skip the rest of the for loop and start with next timestep/product
1036            if not gid:
1037                continue
1038
1039            # create correct timestamp from the three time informations
[2e62398]1040            cdate = str(codes_get(gid, 'date'))
[79729d5]1041            time = codes_get(gid, 'time') // 100  # integer
[092aaf1]1042            step = codes_get(gid, 'step') # integer
1043            ctime = '{:0>2}'.format(time)
1044            cstep = '{:0>3}'.format(step)
1045
[ca867de]1046            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
[092aaf1]1047            t_dt = t_date + timedelta(hours=step)
1048            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
1049            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
[d4696e0]1050            if c.basetime is not None:
1051                t_enddate = datetime.strptime(c.end_date + str(c.basetime),
1052                                              '%Y%m%d%H')
1053            else:
1054                t_enddate = t_date + timedelta(2*int(c.dtime))
[27fe969]1055
[45b99e6]1056            # if necessary, add ensemble member number to filename suffix
1057            # otherwise, add empty string
[d727af2]1058            if maxnum:
1059                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
[45b99e6]1060            else:
1061                numbersuffix = ''
1062
[a4531f1]1063            if c.purefc:
[27fe969]1064                fnout = os.path.join(c.inputdir, 'flux' +
[ca867de]1065                                     t_date.strftime('%Y%m%d.%H') +
[45b99e6]1066                                     '.{:0>3}'.format(step-2*int(c.dtime)) +
1067                                     numbersuffix)
[27fe969]1068                gnout = os.path.join(c.inputdir, 'flux' +
[ca867de]1069                                     t_date.strftime('%Y%m%d.%H') +
[45b99e6]1070                                     '.{:0>3}'.format(step-int(c.dtime)) +
1071                                     numbersuffix)
[27fe969]1072                hnout = os.path.join(c.inputdir, 'flux' +
[ca867de]1073                                     t_date.strftime('%Y%m%d.%H') +
[45b99e6]1074                                     '.{:0>3}'.format(step) +
1075                                     numbersuffix)
[27fe969]1076            else:
1077                fnout = os.path.join(c.inputdir, 'flux' +
[45b99e6]1078                                     t_m2dt.strftime('%Y%m%d%H') + numbersuffix)
[27fe969]1079                gnout = os.path.join(c.inputdir, 'flux' +
[45b99e6]1080                                     t_m1dt.strftime('%Y%m%d%H') + numbersuffix)
[27fe969]1081                hnout = os.path.join(c.inputdir, 'flux' +
[45b99e6]1082                                     t_dt.strftime('%Y%m%d%H') + numbersuffix)
[27fe969]1083
1084            print("outputfile = " + fnout)
[2e62398]1085            f_handle = open(fnout, 'w')
1086            h_handle = open(hnout, 'w')
1087            g_handle = open(gnout, 'w')
[27fe969]1088
[092aaf1]1089            # read message for message and store relevant data fields, where
[27fe969]1090            # data keywords are stored in pars
[c274d9a]1091            while True:
[ca867de]1092                if not gid:
[27fe969]1093                    break
[092aaf1]1094                parId = codes_get(gid, 'paramId') # integer
1095                step = codes_get(gid, 'step') # integer
1096                time = codes_get(gid, 'time') # integer
1097                ni = codes_get(gid, 'Ni') # integer
1098                nj = codes_get(gid, 'Nj') # integer
[c274d9a]1099                if parId not in orig_vals.keys():
1100                    # parameter is not a flux, find next one
1101                    continue
[27fe969]1102
[c274d9a]1103                # define conversion factor
1104                if parId == 142 or parId == 143:
1105                    fak = 1. / 1000.
1106                else:
1107                    fak = 3600.
1108
1109                # get parameter values and reshape
1110                values = codes_get_values(gid)
1111                values = (np.reshape(values, (nj, ni))).flatten() / fak
[27fe969]1112
[c274d9a]1113                # save the original and accumulated values
1114                orig_vals[parId].append(values[:])
1115
1116                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
1117                    # no de-accumulation needed
1118                    deac_vals[parId].append(values[:] / int(c.dtime))
1119                else:
1120                    # do de-accumulation
1121                    deac_vals[parId].append(
1122                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
1123                         int(c.dtime))
1124
1125                # store precipitation if new disaggregation method is selected
1126                # only the exact days are needed
[092aaf1]1127                if c.rrint:
1128                    if start_date <= t_dt <= end_date:
1129                        if not c.purefc:
1130                            if t_dt not in date_list:
1131                                date_list.append(t_dt)
1132                                step_list = [0]
1133                        else:
1134                            if t_date not in date_list:
1135                                date_list.append(t_date)
1136                            if step not in step_list:
1137                                step_list.append(step)
[d727af2]1138                        # store precipitation values
1139                        if maxnum and parId == 142:
1140                            lsp_np[inumb, :, it_lsp] = deac_vals[parId][-1][:]
1141                            it_lsp += 1
1142                        elif not maxnum and parId == 142:
1143                            lsp_np[:, it_lsp] = deac_vals[parId][-1][:]
[092aaf1]1144                            it_lsp += 1
[d727af2]1145                        elif maxnum and parId == 143:
1146                            cp_np[inumb, :, it_cp] = deac_vals[parId][-1][:]
1147                            it_cp += 1
1148                        elif not maxnum and parId == 143:
1149                            cp_np[:, it_cp] = deac_vals[parId][-1][:]
[092aaf1]1150                            it_cp += 1
[c274d9a]1151
1152                # information printout
1153                print(parId, time, step, len(values), values[0], np.std(values))
1154
1155                # length of deac_vals[parId] corresponds to the
[092aaf1]1156                # number of time steps, max. 4 are needed for disaggegration
1157                # with the old and original method
[c274d9a]1158                # run over all grib messages and perform
[092aaf1]1159                # shifting in time
[c274d9a]1160                if len(deac_vals[parId]) >= 3:
1161                    if len(deac_vals[parId]) > 3:
1162                        if not c.rrint and (parId == 142 or parId == 143):
1163                            values = disaggregation.darain(deac_vals[parId])
[27fe969]1164                        else:
[c274d9a]1165                            values = disaggregation.dapoly(deac_vals[parId])
1166
[a4531f1]1167                        if not (step == c.maxstep and c.purefc \
[c274d9a]1168                                or t_dt == t_enddate):
1169                            # remove first time step in list to shift
1170                            # time line
1171                            orig_vals[parId].pop(0)
1172                            deac_vals[parId].pop(0)
1173                    else:
1174                        # if the third time step is read (per parId),
1175                        # write out the first one as a boundary value
[a4531f1]1176                        if c.purefc:
[c274d9a]1177                            values = deac_vals[parId][1]
1178                        else:
1179                            values = deac_vals[parId][0]
[27fe969]1180
[c274d9a]1181                    if not (c.rrint and (parId == 142 or parId == 143)):
[2e62398]1182                        codes_set_values(gid, values)
[ca867de]1183
[a4531f1]1184                        if c.purefc:
[d43706b]1185                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
[27fe969]1186                        else:
[d43706b]1187                            codes_set(gid, 'stepRange', 0)
[2e62398]1188                            codes_set(gid, 'time', t_m2dt.hour*100)
1189                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
[ca867de]1190
[2e62398]1191                        codes_write(gid, f_handle)
[27fe969]1192
[d4696e0]1193                        # squeeze out information of last two steps
1194                        # contained in deac_vals[parId]
1195                        # Note that deac_vals[parId][0] has not been popped
1196                        # in this case
1197
1198                        if step == c.maxstep and c.purefc or \
1199                           t_dt == t_enddate:
1200                            # last step
1201                            if c.purefc:
1202                                values = deac_vals[parId][3]
1203                                codes_set_values(gid, values)
1204                                codes_set(gid, 'stepRange', step)
1205                                #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1206                                codes_write(gid, h_handle)
1207                            else:
1208                                values = deac_vals[parId][3]
1209                                codes_set_values(gid, values)
1210                                codes_set(gid, 'stepRange', 0)
1211                                truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1212                                codes_set(gid, 'time', truedatetime.hour * 100)
1213                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1214                                codes_write(gid, h_handle)
1215
1216                            if parId == 142 or parId == 143:
1217                                values = disaggregation.darain(list(reversed(deac_vals[parId])))
1218                            else:
1219                                values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1220
1221                            # step before last step
1222                            if c.purefc:
1223                                codes_set(gid, 'stepRange', step-int(c.dtime))
1224                                #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1225                                codes_set_values(gid, values)
1226                                codes_write(gid, g_handle)
1227                            else:
1228                                codes_set(gid, 'stepRange', 0)
1229                                truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1230                                codes_set(gid, 'time', truedatetime.hour * 100)
1231                                codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1232                                codes_set_values(gid, values)
1233                                codes_write(gid, g_handle)
[2e62398]1234
1235                codes_release(gid)
[27fe969]1236
[2e62398]1237                gid = codes_new_from_index(iid)
[27fe969]1238
[2e62398]1239            f_handle.close()
1240            g_handle.close()
1241            h_handle.close()
[27fe969]1242
[2e62398]1243        codes_index_release(iid)
[27fe969]1244
[092aaf1]1245        if c.rrint:
1246            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1247
[d727af2]1248            self._prep_new_rrint(dims[0], dims[1], dims[2], lsp_np,
1249                                 cp_np, maxnum, index_keys, index_vals, c)
[092aaf1]1250
1251        return
1252
[d727af2]1253    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, maxnum, index_keys, index_vals, c):
[092aaf1]1254        '''Calculates and writes out the disaggregated precipitation fields.
[c274d9a]1255
[092aaf1]1256        Disaggregation is done in time and original times are written to the
1257        flux files, while the additional subgrid times are written to
[d727af2]1258        extra files output files. They are named like the original files with
1259        suffixes "_1" and "_2" for the first and second subgrid point.
[c274d9a]1260
[092aaf1]1261        Parameters
1262        ----------
[6f951ca]1263        ni : int
[092aaf1]1264            Amount of zonal grid points.
1265
[6f951ca]1266        nj : int
[092aaf1]1267            Amount of meridional grid points.
1268
[6f951ca]1269        nt : int
[092aaf1]1270            Number of time steps.
1271
[6f951ca]1272        lsp_np : numpy array of float
[092aaf1]1273            The large scale precipitation fields for each time step.
1274            Shape (ni * nj, nt).
1275
[6f951ca]1276        cp_np : numpy array of float
[092aaf1]1277            The convective precipitation fields for each time step.
1278            Shape (ni * nj, nt).
1279
[d727af2]1280        maxnum : int
1281            The maximum number of ensemble members. It is None
1282            if there are no or just one ensemble.
[092aaf1]1283
[d727af2]1284        index_keys : dictionary
1285            List of parameter names which serves as index.
1286
1287        index_vals : list of list  of str
1288            Contains the values from the keys used for a distinct selection
1289            of grib messages in processing  the grib files.
1290            Content looks like e.g.:
1291            index_vals[0]: ('20171106', '20171107', '20171108') ; date
1292            index_vals[1]: ('0', '1200', '1800', '600') ; time
1293            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
[c274d9a]1294
[6f951ca]1295        c : ControlFile
[092aaf1]1296            Contains all the parameters of CONTROL file and
1297            command line.
1298
1299        Return
1300        ------
1301
1302        '''
[f20af73]1303        print('... disaggregation of precipitation with new method.')
[d727af2]1304
1305        tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1306
1307        # initialize new numpy arrays for disaggregated fields
1308        if maxnum:
1309            lsp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1310            cp_new_np = np.zeros((maxnum, ni * nj, nt * 3), dtype=np.float64)
1311        else:
1312            lsp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
1313            cp_new_np = np.zeros((1, ni * nj, nt * 3), dtype=np.float64)
[092aaf1]1314
1315        # do the disaggregation, but neglect the last value of the
[d727af2]1316        # original time series. This one corresponds for example to
1317        # 24 hour, which we don't need. we use 0 - 23 UTC for a day.
1318        if maxnum:
1319            for inum in range(maxnum):
1320                for ix in range(ni*nj):
1321                    lsp_new_np[inum,ix,:] = disaggregation.IA3(lsp_np[inum,ix,:])[:-1]
1322                    cp_new_np[inum,ix,:] = disaggregation.IA3(cp_np[inum,ix,:])[:-1]
1323        else:
1324            for ix in range(ni*nj):
1325                lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[0,ix,:])[:-1]
1326                cp_new_np[ix,:] = disaggregation.IA3(cp_np[0,ix,:])[:-1]
[092aaf1]1327
1328        # write to grib files (full/orig times to flux file and inbetween
1329        # times into seperate end files)
1330        print('... write disaggregated precipitation to files.')
[d727af2]1331
1332        if maxnum:
1333            # remember the index of the number values
1334            index_number = index_keys.index('number')
1335            # empty set to save unique ensemble numbers which were already processed
1336            ens_numbers = set()
1337            # index for the ensemble number
1338            inumb = 0
1339        else:
1340            inumb = 0
1341
[79729d5]1342        # index variable of disaggregated fields
[092aaf1]1343        it = 0
[d727af2]1344
1345        # "product" genereates each possible combination between the
1346        # values of the index keys
1347        for prod in product(*index_vals):
1348            # e.g. prod = ('20170505', '0', '12')
1349            #             ( date     ,time, step)
1350            # or   prod = ('0'   , '20170505', '0', '12')
1351            #             (number, date      ,time, step)
1352
1353            cdate = prod[index_keys.index('date')]
1354            ctime = '{:0>2}'.format(int(prod[index_keys.index('time')])//100)
1355            cstep = '{:0>3}'.format(int(prod[index_keys.index('step')]))
1356
1357            date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1358            date += timedelta(hours=int(cstep))
1359
1360            start_period, end_period = generate_retrieval_period_boundary(c)
1361            # skip all temporary times
1362            # which are outside the retrieval period
1363            if date < start_period or \
1364               date > end_period:
1365                continue
1366
1367            # the whole process has to be done for each seperate ensemble member
1368            # therefore, for each new ensemble member we delete old flux values
1369            # and start collecting flux data from the beginning time step
1370            if maxnum and prod[index_number] not in ens_numbers:
1371                ens_numbers.add(prod[index_number])
1372                inumb = int(prod[index_number])
1373                it = 0
1374
1375            # if necessary, add ensemble member number to filename suffix
1376            # otherwise, add empty string
1377            if maxnum:
1378                numbersuffix = '.N{:0>3}'.format(int(prod[index_number]))
1379            else:
1380                numbersuffix = ''
1381
1382            # per original time stamp: write original time step and
1383            # the two newly generated sub time steps
1384            if c.purefc:
1385                fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + '.' + cstep
1386            else:
1387                fluxfilename = 'flux' + date.strftime('%Y%m%d%H') + numbersuffix
1388
1389            # write original time step to flux file as usual
1390            fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1391            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1392                              wherekeynames=['paramId'], wherekeyvalues=[142],
1393                              keynames=['perturbationNumber','date','time','stepRange','values'],
1394                              keyvalues=[inumb, int(date.strftime('%Y%m%d')),
1395                                         date.hour*100, 0, lsp_new_np[inumb,:,it]],
1396                             )
1397            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1398                              wherekeynames=['paramId'], wherekeyvalues=[143],
1399                              keynames=['perturbationNumber','date','time','stepRange','values'],
1400                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1401                                         date.hour*100, 0, cp_new_np[inumb,:,it]]
1402                             )
1403
1404            # rr for first subgrid point is identified by step = 1
1405            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1406                              wherekeynames=['paramId'], wherekeyvalues=[142],
1407                              keynames=['perturbationNumber','date','time','stepRange','values'],
1408                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1409                                         date.hour*100, '1', lsp_new_np[inumb,:,it+1]]
1410                              )
1411            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1412                              wherekeynames=['paramId'], wherekeyvalues=[143],
1413                              keynames=['perturbationNumber','date','time','stepRange','values'],
1414                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1415                                         date.hour*100, '1', cp_new_np[inumb,:,it+1]]
1416                              )
1417
1418            # rr for second subgrid point is identified by step = 2
1419            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1420                              wherekeynames=['paramId'], wherekeyvalues=[142],
1421                              keynames=['perturbationNumber','date','time','stepRange','values'],
1422                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1423                                         date.hour*100, '2', lsp_new_np[inumb,:,it+2]]
1424                              )
1425            fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1426                              wherekeynames=['paramId'], wherekeyvalues=[143],
1427                              keynames=['perturbationNumber','date','time','stepRange','values'],
1428                              keyvalues=[inumb,int(date.strftime('%Y%m%d')),
1429                                         date.hour*100, '2', cp_new_np[inumb,:,it+2]]
1430                              )
1431
1432            it = it + 3 # jump to next original time step in rr fields
[27fe969]1433        return
1434
[092aaf1]1435    def _create_rr_grib_dummy(self, ifile, inputdir):
1436        '''Creates a grib file with a dummy message for the two precipitation
1437        types lsp and cp each.
1438
1439        Parameters
1440        ----------
[6f951ca]1441        ifile : str
[092aaf1]1442            Filename of the input file to read the grib messages from.
1443
[6f951ca]1444        inputdir : str, optional
[092aaf1]1445            Path to the directory where the retrieved data is stored.
1446
1447        Return
1448        ------
1449
1450        '''
1451
1452        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1453
1454        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1455                      keyvalues=[142], filemode='w')
1456
1457        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1458                      keyvalues=[143], filemode='a')
1459
1460        return
[27fe969]1461
1462    def create(self, inputfiles, c):
[274f9ef]1463        '''An index file will be created which depends on the combination
1464        of "date", "time" and "stepRange" values. This is used to iterate
1465        over all messages in each grib file which were passed through the
1466        parameter "inputfiles" to seperate specific parameters into fort.*
1467        files. Afterwards the FORTRAN program is called to convert
1468        the data fields all to the same grid and put them in one file
1469        per unique time step (combination of "date", "time" and
1470        "stepRange").
1471
1472        Note
1473        ----
1474        This method is based on the ECMWF example index.py
1475        https://software.ecmwf.int/wiki/display/GRIB/index.py
1476
1477        Parameters
1478        ----------
[6f951ca]1479        inputfiles : UioFiles
[274f9ef]1480            Contains a list of files.
1481
[6f951ca]1482        c : ControlFile
[274f9ef]1483            Contains all the parameters of CONTROL file and
1484            command line.
1485
1486        Return
1487        ------
1488
[27fe969]1489        '''
1490
[e446e85]1491        # generate start and end timestamp of the retrieval period
1492        start_period = datetime.strptime(c.start_date + c.time[0], '%Y%m%d%H')
1493        start_period = start_period + timedelta(hours=int(c.step[0]))
1494        end_period = datetime.strptime(c.end_date + c.time[-1], '%Y%m%d%H')
1495        end_period = end_period + timedelta(hours=int(c.step[-1]))
1496
[ca867de]1497        if c.wrf:
1498            table128 = init128(_config.PATH_GRIBTABLE)
1499            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1500                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1501                                  table128)
1502
1503        # these numbers are indices for the temporary files "fort.xx"
1504        # which are used to seperate the grib fields to,
1505        # for the Fortran program input
1506        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1507        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
[27fe969]1508        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
[ca867de]1509                 '17':None, '18':None, '19':None, '21':None, '22':None}
[27fe969]1510
[ca867de]1511        iid = None
1512        index_vals = None
1513
1514        # get the values of the keys which are used for distinct access
1515        # of grib messages via product
[45b99e6]1516        if '/' in self.number:
1517            # more than one ensemble member is selected
1518            index_keys = ["number", "date", "time", "step"]
1519        else:
1520            index_keys = ["date", "time", "step"]
[ca867de]1521        iid, index_vals = self._mk_index_values(c.inputdir,
1522                                                inputfiles,
1523                                                index_keys)
1524        # index_vals looks like e.g.:
1525        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
[092aaf1]1526        # index_vals[1]: ('0', '600', '1200', '1800') ; time
[ca867de]1527        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1528
1529        # "product" genereates each possible combination between the
1530        # values of the index keys
[27fe969]1531        for prod in product(*index_vals):
1532            # e.g. prod = ('20170505', '0', '12')
1533            #             (  date    ,time, step)
[ca867de]1534
1535            print('current product: ', prod)
1536
[27fe969]1537            for i in range(len(index_keys)):
[2e62398]1538                codes_index_select(iid, index_keys[i], prod[i])
[27fe969]1539
1540            # get first id from current product
[2e62398]1541            gid = codes_new_from_index(iid)
[27fe969]1542
[ca867de]1543            # if there is no data for this specific time combination / product
1544            # skip the rest of the for loop and start with next timestep/product
1545            if not gid:
1546                continue
[3f36e42]1547#============================================================================================
[ca867de]1548            # remove old fort.* files and open new ones
1549            # they are just valid for a single product
[79729d5]1550            for k, f in fdict.items():
[ca867de]1551                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1552                silent_remove(fortfile)
1553                fdict[k] = open(fortfile, 'w')
[3f36e42]1554#============================================================================================
[ca867de]1555            # create correct timestamp from the three time informations
[2e62398]1556            cdate = str(codes_get(gid, 'date'))
[79729d5]1557            ctime = '{:0>2}'.format(codes_get(gid, 'time') // 100)
[2e62398]1558            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
[ca867de]1559            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1560            timestamp += timedelta(hours=int(cstep))
1561            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1562
[f20af73]1563            # skip all temporary times
[e446e85]1564            # which are outside the retrieval period
1565            if timestamp < start_period or \
1566               timestamp > end_period:
1567                continue
1568
[ca867de]1569            # if the timestamp is out of basetime start/end date period,
1570            # skip this specific product
[d4696e0]1571            if c.basetime is not None:
1572                time_delta = timedelta(hours=12-int(c.dtime))
1573                start_time = datetime.strptime(c.end_date + str(c.basetime),
[ca867de]1574                                                '%Y%m%d%H') - time_delta
[d4696e0]1575                end_time = datetime.strptime(c.end_date + str(c.basetime),
[ca867de]1576                                             '%Y%m%d%H')
1577                if timestamp < start_time or timestamp > end_time:
1578                    continue
1579
1580            if c.wrf:
1581                if 'olddate' not in locals() or cdate != olddate:
1582                    fwrf = open(os.path.join(c.outputdir,
1583                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1584                    olddate = cdate[:]
[3f36e42]1585#============================================================================================
[ca867de]1586            # savedfields remembers which fields were already used.
[efdb01a]1587            savedfields = []
[ca867de]1588            # sum of cloud liquid and ice water content
1589            scwc = None
[efdb01a]1590            while 1:
[ca867de]1591                if not gid:
[efdb01a]1592                    break
[2e62398]1593                paramId = codes_get(gid, 'paramId')
1594                gridtype = codes_get(gid, 'gridType')
1595                levtype = codes_get(gid, 'typeOfLevel')
[ca867de]1596                if paramId == 77: # ETADOT
[2e62398]1597                    codes_write(gid, fdict['21'])
[ca867de]1598                elif paramId == 130: # T
[2e62398]1599                    codes_write(gid, fdict['11'])
[ca867de]1600                elif paramId == 131 or paramId == 132: # U, V wind component
[2e62398]1601                    codes_write(gid, fdict['10'])
[ca867de]1602                elif paramId == 133 and gridtype != 'reduced_gg': # Q
[2e62398]1603                    codes_write(gid, fdict['17'])
[ca867de]1604                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
[2e62398]1605                    codes_write(gid, fdict['18'])
[ca867de]1606                elif paramId == 135: # W
[2e62398]1607                    codes_write(gid, fdict['19'])
[ca867de]1608                elif paramId == 152: # LNSP
[2e62398]1609                    codes_write(gid, fdict['12'])
[ca867de]1610                elif paramId == 155 and gridtype == 'sh': # D
[2e62398]1611                    codes_write(gid, fdict['13'])
[ca867de]1612                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1613                    # sum cloud liquid water and ice
[0934db1]1614                    if scwc is None:
[2e62398]1615                        scwc = codes_get_values(gid)
[efdb01a]1616                    else:
[2e62398]1617                        scwc += codes_get_values(gid)
1618                        codes_set_values(gid, scwc)
1619                        codes_set(gid, 'paramId', 201031)
1620                        codes_write(gid, fdict['22'])
[092aaf1]1621                        scwc = None
[ca867de]1622                elif c.wrf and paramId in [129, 138, 155] and \
1623                      levtype == 'hybrid': # Z, VO, D
1624                    # do not do anything right now
1625                    # these are specific parameter for WRF
1626                    pass
[efdb01a]1627                else:
1628                    if paramId not in savedfields:
[ca867de]1629                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1630                        # and all ADDPAR parameter
[2e62398]1631                        codes_write(gid, fdict['16'])
[efdb01a]1632                        savedfields.append(paramId)
1633                    else:
[2fb99de]1634                        print('duplicate ' + str(paramId) + ' not written')
[efdb01a]1635
1636                try:
[2fb99de]1637                    if c.wrf:
1638                        # model layer
1639                        if levtype == 'hybrid' and \
1640                           paramId in [129, 130, 131, 132, 133, 138, 155]:
[2e62398]1641                            codes_write(gid, fwrf)
[2fb99de]1642                        # sfc layer
1643                        elif paramId in wrfpars:
[2e62398]1644                            codes_write(gid, fwrf)
[efdb01a]1645                except AttributeError:
1646                    pass
1647
[2e62398]1648                codes_release(gid)
1649                gid = codes_new_from_index(iid)
[3f36e42]1650#============================================================================================
[efdb01a]1651            for f in fdict.values():
1652                f.close()
[3f36e42]1653#============================================================================================
[ca867de]1654            # call for Fortran program to convert e.g. reduced_gg grids to
1655            # regular_ll and calculate detadot/dp
1656            pwd = os.getcwd()
1657            os.chdir(c.inputdir)
1658            if os.stat('fort.21').st_size == 0 and c.eta:
[f20af73]1659                print('Parameter 77 (etadot) is missing, most likely it is '
1660                      'not available for this type or date / time\n')
[ca867de]1661                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
[f20af73]1662                my_error('fort.21 is empty while parameter eta '
1663                         'is set to 1 in CONTROL file')
1664# ============================================================================================
[2d56c04]1665            # write out all output to log file before starting fortran programm
1666            sys.stdout.flush()
1667
[ca867de]1668            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
[bf48c8a]1669            execute_subprocess([os.path.join(c.exedir,
1670                                _config.FORTRAN_EXECUTABLE)],
1671                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1672
[ca867de]1673            os.chdir(pwd)
[f20af73]1674# ============================================================================================
[ca867de]1675            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
[a4531f1]1676            if c.purefc:
[ca867de]1677                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1678            else:
1679                suffix = cdate_hour[2:10]
[45b99e6]1680
1681            # if necessary, add ensemble member number to filename suffix
1682            if 'number' in index_keys:
1683                index_number = index_keys.index('number')
1684                if len(index_vals[index_number]) > 1:
1685                    suffix = suffix + '.N{:0>3}'.format(int(prod[index_number]))
1686
[ca867de]1687            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1688            print("outputfile = " + fnout)
1689            # collect for final processing
1690            self.outputfilelist.append(os.path.basename(fnout))
[d727af2]1691            # # get additional precipitation subgrid data if available
1692            # if c.rrint:
1693                # self.outputfilelist.append(os.path.basename(fnout + '_1'))
1694                # self.outputfilelist.append(os.path.basename(fnout + '_2'))
[f20af73]1695# ============================================================================================
[ca867de]1696            # create outputfile and copy all data from intermediate files
1697            # to the outputfile (final GRIB input files for FLEXPART)
1698            orolsm = os.path.basename(glob.glob(c.inputdir +
1699                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1700            fluxfile = 'flux' + cdate[0:2] + suffix
1701            if not c.cwc:
1702                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
[2fb99de]1703            else:
[ca867de]1704                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1705
1706            with open(fnout, 'wb') as fout:
1707                for f in flist:
1708                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1709                                       fout)
1710
1711            if c.omega:
1712                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1713                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1714                                            'rb'), fout)
[f20af73]1715# ============================================================================================
[2fb99de]1716        if c.wrf:
[ff99eae]1717            fwrf.close()
[efdb01a]1718
[2e62398]1719        codes_index_release(iid)
[efdb01a]1720
1721        return
1722
[27fe969]1723
[45b99e6]1724    def calc_extra_elda(self, path, prefix):
1725        ''' Calculates extra ensemble members for ELDA - Stream.
1726
[d727af2]1727        This is a specific feature which doubles the number of ensemble members
1728        for the ELDA Stream.
1729
[45b99e6]1730        Parameters
1731        ----------
1732        path : str
1733            Path to the output files.
1734
1735        prefix : str
1736            The prefix of the output filenames as defined in Control file.
1737
1738        Return
1739        ------
1740
1741        '''
1742        # max number
1743        maxnum = int(self.number.split('/')[-1])
1744
1745        # get a list of all prepared output files with control forecast (CF)
1746        CF_filelist = UioFiles(path, prefix + '*.N000')
1747
1748        for cffile in CF_filelist.files:
1749            with open(cffile, 'rb') as f:
1750                cfvalues=[]
1751                while True:
1752                    fid = codes_grib_new_from_file(f)
1753                    if fid is None:
1754                        break
1755                    cfvalues.append(codes_get_array(fid, 'values'))
1756                    codes_release(fid)
1757
1758            filename = cffile.split('N000')[0]
[d727af2]1759            for i in range(1, maxnum + 1):
[45b99e6]1760
1761                # read an ensemble member
1762                g = open(filename + 'N{:0>3}'.format(i), 'rb')
1763                # create file for newly calculated ensemble member
1764                h = open(filename + 'N{:0>3}'.format(i+maxnum), 'wb')
1765                # number of message in grib file
1766                j = 0
1767                while True:
1768                    gid = codes_grib_new_from_file(g)
1769                    if gid is None:
1770                        break
1771                    values = codes_get_array(gid, 'values')
[d727af2]1772                    # generate a new ensemble member by subtracting
1773                    # 2 * ( current time step value - last time step value )
[45b99e6]1774                    codes_set_array(gid, 'values',
1775                                    values-2*(values-cfvalues[j]))
1776                    codes_set(gid, 'number', i+maxnum)
1777                    codes_write(gid, h)
1778                    codes_release(gid)
1779                    j += 1
1780
1781                g.close()
1782                h.close()
1783                print('wrote ' + filename + 'N{:0>3}'.format(i+maxnum))
1784                self.outputfilelist.append(
1785                    os.path.basename(filename + 'N{:0>3}'.format(i+maxnum)))
1786
1787        return
1788
1789
[27fe969]1790    def process_output(self, c):
[3f36e42]1791        '''Postprocessing of FLEXPART input files.
1792
1793        The grib files are postprocessed depending on the selection in
[274f9ef]1794        CONTROL file. The resulting files are moved to the output
1795        directory if its not equal to the input directory.
1796        The following modifications might be done if
1797        properly switched in CONTROL file:
1798        GRIB2 - Conversion to GRIB2
1799        ECTRANS - Transfer of files to gateway server
1800        ECSTORAGE - Storage at ECMWF server
1801
1802        Parameters
1803        ----------
[6f951ca]1804        c : ControlFile
[274f9ef]1805            Contains all the parameters of CONTROL file and
1806            command line.
1807
1808        Return
1809        ------
[efdb01a]1810
[27fe969]1811        '''
[efdb01a]1812
[27fe969]1813        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
[efdb01a]1814
[f20af73]1815        if _config.FLAG_ON_ECMWFSERVER:
[27fe969]1816            print('ecstorage: {}\n ecfsdir: {}\n'.
1817                  format(c.ecstorage, c.ecfsdir))
1818            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1819                  .format(c.ectrans, c.gateway, c.destination))
[efdb01a]1820
[ca867de]1821        print('Output filelist: ')
[d727af2]1822        print(sorted(self.outputfilelist))
[efdb01a]1823
[70fee58]1824        for ofile in self.outputfilelist:
1825            ofile = os.path.join(self.inputdir, ofile)
1826
1827            if c.format.lower() == 'grib2':
[bf48c8a]1828                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1829                                    'productDefinitionTemplateNumber=8',
1830                                    ofile, ofile + '_2'],
1831                                   error_msg='GRIB2 CONVERSION FAILED!')
1832
1833                execute_subprocess(['mv', ofile + '_2', ofile],
1834                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1835                                   'FILES FAILED!')
[efdb01a]1836
[f20af73]1837            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
[bf48c8a]1838                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1839                                    c.gateway, '-remote', c.destination,
1840                                    '-source', ofile],
1841                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
[efdb01a]1842
[f20af73]1843            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
[bf48c8a]1844                execute_subprocess(['ecp', '-o', ofile,
1845                                    os.path.expandvars(c.ecfsdir)],
1846                                   error_msg='COPY OF FILES TO ECSTORAGE '
1847                                   'AREA FAILED!')
[efdb01a]1848
[70fee58]1849            if c.outputdir != c.inputdir:
[bf48c8a]1850                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1851                                    c.outputdir],
1852                                   error_msg='RELOCATION OF OUTPUT FILES '
1853                                   'TO OUTPUTDIR FAILED!')
[efdb01a]1854
[27fe969]1855        return
[efdb01a]1856
1857
[27fe969]1858    def prepare_fp_files(self, c):
[274f9ef]1859        '''Conversion of GRIB files to FLEXPART binary format.
[efdb01a]1860
[274f9ef]1861        Parameters
1862        ----------
[6f951ca]1863        c : ControlFile
[274f9ef]1864            Contains all the parameters of CONTROL file and
1865            command line.
[efdb01a]1866
[274f9ef]1867        Return
1868        ------
[efdb01a]1869
[27fe969]1870        '''
1871        # generate AVAILABLE file
1872        # Example of AVAILABLE file data:
1873        # 20131107 000000      EN13110700              ON DISC
1874        clist = []
1875        for ofile in self.outputfilelist:
1876            fname = ofile.split('/')
1877            if '.' in fname[-1]:
1878                l = fname[-1].split('.')
1879                timestamp = datetime.strptime(l[0][-6:] + l[1],
1880                                              '%y%m%d%H')
1881                timestamp += timedelta(hours=int(l[2]))
1882                cdate = datetime.strftime(timestamp, '%Y%m%d')
1883                chms = datetime.strftime(timestamp, '%H%M%S')
1884            else:
1885                cdate = '20' + fname[-1][-8:-2]
1886                chms = fname[-1][-2:] + '0000'
1887            clist.append(cdate + ' ' + chms + ' '*6 +
1888                         fname[-1] + ' '*14 + 'ON DISC')
1889        clist.sort()
1890        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1891            f.write('\n'.join(clist) + '\n')
1892
1893        # generate pathnames file
1894        pwd = os.path.abspath(c.outputdir)
1895        with open(pwd + '/pathnames', 'w') as f:
1896            f.write(pwd + '/Options/\n')
1897            f.write(pwd + '/\n')
1898            f.write(pwd + '/\n')
1899            f.write(pwd + '/AVAILABLE\n')
1900            f.write(' = == = == = == = == = == ==  = \n')
1901
1902        # create Options dir if necessary
1903        if not os.path.exists(pwd + '/Options'):
[5bad6ec]1904            make_dir(pwd+'/Options')
[27fe969]1905
1906        # read template COMMAND file
1907        with open(os.path.expandvars(os.path.expanduser(
[092aaf1]1908            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
[27fe969]1909            lflist = f.read().split('\n')
1910
1911        # find index of list where to put in the
1912        # date and time information
1913        # usually after the LDIRECT parameter
1914        i = 0
1915        for l in lflist:
1916            if 'LDIRECT' in l.upper():
1917                break
1918            i += 1
1919
1920        # insert the date and time information of run start and end
1921        # into the list of lines of COMMAND file
1922        lflist = lflist[:i+1] + \
1923                 [clist[0][:16], clist[-1][:16]] + \
1924                 lflist[i+3:]
1925
1926        # write the new COMMAND file
1927        with open(pwd + '/Options/COMMAND', 'w') as g:
1928            g.write('\n'.join(lflist) + '\n')
1929
1930        # change to outputdir and start the grib2flexpart run
1931        # afterwards switch back to the working dir
1932        os.chdir(c.outputdir)
[bf48c8a]1933        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1934         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1935        execute_subprocess(cmd)
[27fe969]1936        os.chdir(pwd)
[efdb01a]1937
1938        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG