source: flex_extract.git/source/python/classes/EcFlexpart.py @ bf48c8a

ctbtodev
Last change on this file since bf48c8a was bf48c8a, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

introduced a function for subprocess check_call to do error handling once

  • Property mode set to 100644
File size: 66.6 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83
84from eccodes import (codes_index_select, codes_new_from_index, codes_get,
85                     codes_get_values, codes_set_values, codes_set,
86                     codes_write, codes_release, codes_new_from_index,
87                     codes_index_release, codes_index_get)
88
89# software specific classes and modules from flex_extract
90sys.path.append('../')
91import _config
92from GribUtil import GribUtil
93from mods.tools import (init128, to_param_id, silent_remove, product,
94                        my_error, make_dir, get_informations, get_dimensions,
95                        execute_subprocess)
96from MarsRetrieval import MarsRetrieval
97import mods.disaggregation as disaggregation
98
99# ------------------------------------------------------------------------------
100# CLASS
101# ------------------------------------------------------------------------------
102class EcFlexpart(object):
103    '''
104    Class to retrieve FLEXPART specific ECMWF data.
105    '''
106    # --------------------------------------------------------------------------
107    # CLASS FUNCTIONS
108    # --------------------------------------------------------------------------
109    def __init__(self, c, fluxes=False):
110        '''Creates an object/instance of EcFlexpart with the associated
111        settings of its attributes for the retrieval.
112
113        Parameters:
114        -----------
115        c : :obj:`ControlFile`
116            Contains all the parameters of CONTROL file and
117            command line.
118
119        fluxes : :obj:`boolean`, optional
120            Decides if the flux parameter settings are stored or
121            the rest of the parameter list.
122            Default value is False.
123
124        Return
125        ------
126
127        '''
128        # set a counter for the number of generated mars requests
129        self.mreq_count = 0
130
131        self.inputdir = c.inputdir
132        self.dataset = c.dataset
133        self.basetime = c.basetime
134        self.dtime = c.dtime
135        self.acctype = c.acctype
136        self.acctime = c.acctime
137        self.accmaxstep = c.accmaxstep
138        self.marsclass = c.marsclass
139        self.stream = c.stream
140        self.number = c.number
141        self.resol = c.resol
142        self.accuracy = c.accuracy
143        self.addpar = c.addpar
144        self.level = c.level
145        self.expver = c.expver
146        self.levelist = c.levelist
147        self.glevelist = '1/to/' + c.level # in case of gaussian grid
148        self.gaussian = c.gaussian
149        self.grid = c.grid
150        self.area = c.area
151        self.purefc = c.purefc
152        self.outputfilelist = []
153
154        # Define the different types of field combinations (type, time, step)
155        self.types = {}
156        # Define the parameters and their level types, level list and
157        # grid resolution for the retrieval job
158        self.params = {}
159
160        if fluxes:
161            self._create_params_fluxes()
162        else:
163            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
164
165        if fluxes and not c.purefc:
166            self._create_field_types_fluxes()
167        else:
168            self._create_field_types(c.type, c.time, c.step)
169        return
170
171    def _create_field_types(self, ftype, ftime, fstep):
172        '''Create the combination of field type, time and forecast step.
173
174        Parameters:
175        -----------
176        ftype : :obj:`list` of :obj:`string`
177            List of field types.
178
179        ftime : :obj:`list` of :obj:`string`
180            The time in hours of the field.
181
182        fstep : :obj:`string`
183            Specifies the forecast time step from forecast base time.
184            Valid values are hours (HH) from forecast base time.
185
186        Return
187        ------
188
189        '''
190        i = 0
191        for ty, st, ti in zip(ftype, fstep, ftime):
192            btlist = range(24)
193            if self.basetime == '12':
194                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
195            if self.basetime == '00':
196                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
197
198            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
199                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
200                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
201                 # (int(c.time[i]) in btlist or c.purefc):
202
203            if (int(ti) in btlist) or self.purefc:
204
205                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
206                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
207
208                    if ty not in self.types.keys():
209                        self.types[ty] = {'times': '', 'steps': ''}
210
211                    if ti not in self.types[ty]['times']:
212                        if self.types[ty]['times']:
213                            self.types[ty]['times'] += '/'
214                        self.types[ty]['times'] += ti
215
216                    if st not in self.types[ty]['steps']:
217                        if self.types[ty]['steps']:
218                            self.types[ty]['steps'] += '/'
219                        self.types[ty]['steps'] += st
220            i += 1
221        return
222
223    def _create_field_types_fluxes(self):
224        '''Create the combination of field type, time and forecast step
225        for the flux data.
226
227        Parameters:
228        -----------
229
230        Return
231        ------
232
233        '''
234        self.types[str(self.acctype)] = {'times': str(self.acctime),
235                                         'steps': '{}/to/{}/by/{}'.format(
236                                             self.dtime,
237                                             self.accmaxstep,
238                                             self.dtime)}
239        return
240
241    def _create_params(self, gauss, eta, omega, cwc, wrf):
242        '''Define the specific parameter settings for retrievment.
243
244        The different parameters need specific grid types and level types
245        for retrievement. We might get following combination of types
246        (depending on selection and availability):
247        (These are short cuts for the grib file names (leading sequence)
248        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
249        where:
250            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
251            ML = Model Level, SL = Surface Level
252
253        For each of this combination there is a list of parameter names,
254        the level type, the level list and the grid resolution.
255
256        There are different scenarios for data extraction from MARS:
257        1) Retrieval of etadot
258           eta=1, gauss=0, omega=0
259        2) Calculation of etadot from divergence
260           eta=0, gauss=1, omega=0
261        3) Calculation of etadot from omega (for makes sense for debugging)
262           eta=0, gauss=0, omega=1
263        4) Retrieval and Calculation of etadot (only for debugging)
264           eta=1, gauss=1, omega=0
265        5) Download also specific model and surface level data for FLEXPART-WRF
266
267        Parameters:
268        -----------
269        gauss : :obj:`integer`
270            Gaussian grid is retrieved.
271
272        eta : :obj:`integer`
273            Etadot parameter will be directly retrieved.
274
275        omega : :obj:`integer`
276            The omega paramterwill be retrieved.
277
278        cwc : :obj:`integer`
279            The cloud liquid and ice water content will be retrieved.
280
281        wrf : :obj:`integer`
282            Additional model level and surface level data will be retrieved for
283            WRF/FLEXPART-WRF simulations.
284
285        Return
286        ------
287
288        '''
289        # SURFACE FIELDS
290        #-----------------------------------------------------------------------
291        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
292        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
293                                 'SFC', '1', self.grid]
294        if self.addpar:
295            self.params['OG__SL'][0] += self.addpar
296
297        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
298            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
299                                            'SFC', '1', self.grid]
300        else:
301            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
302                                            'SFC', '1', self.grid]
303
304        # MODEL LEVEL FIELDS
305        #-----------------------------------------------------------------------
306        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
307
308        if not gauss and eta:
309            self.params['OG__ML'][0] += '/U/V/ETADOT'
310        elif gauss and not eta:
311            self.params['GG__SL'] = ['Q', 'ML', '1', \
312                                     '{}'.format((int(self.resol) + 1) / 2)]
313            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
314        elif not gauss and not eta:
315            self.params['OG__ML'][0] += '/U/V'
316        else:
317            print('Warning: Collecting etadot and parameters for gaussian grid \
318                            is a very costly parameter combination, \
319                            use this combination only for debugging!')
320            self.params['GG__SL'] = ['Q', 'ML', '1', \
321                                     '{}'.format((int(self.resol) + 1) / 2)]
322            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
323                                     '{}'.format((int(self.resol) + 1) / 2)]
324
325        if omega:
326            self.params['OG__ML'][0] += '/W'
327
328        if cwc:
329            self.params['OG__ML'][0] += '/CLWC/CIWC'
330
331        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
332        #-----------------------------------------------------------------------
333        if wrf:
334            self.params['OG__ML'][0] += '/Z/VO'
335            if '/D' not in self.params['OG__ML'][0]:
336                self.params['OG__ML'][0] += '/D'
337            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
338                       'SWVL1','SWVL2','SWVL3','SWVL4']
339            for par in wrf_sfc:
340                if par not in self.params['OG__SL'][0]:
341                    self.params['OG__SL'][0] += '/' + par
342
343        return
344
345
346    def _create_params_fluxes(self):
347        '''Define the parameter setting for flux data.
348
349        Flux data are accumulated fields in time and are stored on the
350        surface level. The leading short cut name for the grib files is:
351        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
352        acc for Accumulated Grid.
353        The params dictionary stores a list of parameter names, the level type,
354        the level list and the grid resolution.
355
356        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
357
358        Parameters:
359        -----------
360
361        Return
362        ------
363
364        '''
365        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
366                                    'SFC', '1', self.grid]
367        return
368
369
370    def _mk_targetname(self, ftype, param, date):
371        '''Creates the filename for the requested grib data to be stored in.
372        This name is passed as the "target" parameter in the request.
373
374        Parameters
375        ----------
376        ftype : :obj:`string`
377            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
378
379        param : :obj:`string`
380            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
381            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
382
383        date : :obj:`string`
384            The date period of the grib data to be stored in this file.
385
386        Return
387        ------
388        targetname : :obj:`string`
389            The target filename for the grib data.
390        '''
391        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
392                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
393
394        return targetname
395
396
397    def _start_retrievement(self, request, par_dict):
398        '''Creates the Mars Retrieval and prints or submits the request
399        depending on the status of the request variable.
400
401        Parameters
402        ----------
403        request : :obj:`integer`
404            Selects the mode of retrieval.
405            0: Retrieves the data from ECMWF.
406            1: Prints the mars requests to an output file.
407            2: Retrieves the data and prints the mars request.
408
409        par_dict : :obj:`dictionary`
410            Contains all parameter which have to be set for creating the
411            Mars Retrievals. The parameter are:
412            marsclass, dataset, stream, type, levtype, levelist, resol,
413            gaussian, accuracy, grid, target, area, date, time, number,
414            step, expver, param
415
416        Return
417        ------
418
419        '''
420        # increase number of mars requests
421        self.mreq_count += 1
422
423        MR = MarsRetrieval(self.server,
424                           self.public,
425                           marsclass=par_dict['marsclass'],
426                           dataset=par_dict['dataset'],
427                           stream=par_dict['stream'],
428                           type=par_dict['type'],
429                           levtype=par_dict['levtype'],
430                           levelist=par_dict['levelist'],
431                           resol=par_dict['resol'],
432                           gaussian=par_dict['gaussian'],
433                           accuracy=par_dict['accuracy'],
434                           grid=par_dict['grid'],
435                           target=par_dict['target'],
436                           area=par_dict['area'],
437                           date=par_dict['date'],
438                           time=par_dict['time'],
439                           number=par_dict['number'],
440                           step=par_dict['step'],
441                           expver=par_dict['expver'],
442                           param=par_dict['param'])
443
444        if request == 0:
445            MR.display_info()
446            MR.data_retrieve()
447        elif request == 1:
448            MR.print_infodata_csv(self.inputdir, self.mreq_count)
449        elif request == 2:
450            MR.print_infodata_csv(self.inputdir, self.mreq_count)
451            MR.display_info()
452            MR.data_retrieve()
453        else:
454            print('Failure')
455
456        return
457
458
459    def _mk_index_values(self, inputdir, inputfiles, keys):
460        '''Creates an index file for a set of grib parameter keys.
461        The values from the index keys are returned in a list.
462
463        Parameters
464        ----------
465        keys : :obj:`dictionary`
466            List of parameter names which serves as index.
467
468        inputfiles : :obj:`UioFiles`
469            Contains a list of files.
470
471        Return
472        ------
473        iid : :obj:`codes_index`
474            This is a grib specific index structure to access
475            messages in a file.
476
477        index_vals : :obj:`list`
478            Contains the values from the keys used for a distinct selection
479            of grib messages in processing  the grib files.
480            Content looks like e.g.:
481            index_vals[0]: ('20171106', '20171107', '20171108') ; date
482            index_vals[1]: ('0', '1200', '1800', '600') ; time
483            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
484        '''
485        iid = None
486        index_keys = keys
487
488        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
489        silent_remove(indexfile)
490        grib = GribUtil(inputfiles.files)
491        # creates new index file
492        iid = grib.index(index_keys=index_keys, index_file=indexfile)
493
494        # read the values of index keys
495        index_vals = []
496        for key in index_keys:
497            key_vals = codes_index_get(iid, key)
498            # have to sort the key values for correct disaggregation,
499            # therefore convert to int first
500            key_vals = [int(k) for k in key_vals]
501            key_vals.sort()
502            key_vals = [str(k) for k in key_vals]
503            index_vals.append(key_vals)
504            # index_vals looks for example like:
505            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
506            # index_vals[1]: ('0', '1200') ; time
507            # index_vals[2]: (3', '6', '9', '12') ; stepRange
508
509        return iid, index_vals
510
511
512    def retrieve(self, server, dates, public, request, inputdir='.'):
513        '''Finalizing the retrieval information by setting final details
514        depending on grid type.
515        Prepares MARS retrievals per grid type and submits them.
516
517        Parameters
518        ----------
519        server : :obj:`ECMWFService` or :obj:`ECMWFDataServer`
520            The connection to the ECMWF server. This is different
521            for member state users which have full access and non
522            member state users which have only access to the public
523            data sets. The decision is made from command line argument
524            "public"; for public access its True (ECMWFDataServer)
525            for member state users its False (ECMWFService)
526
527        dates : :obj:`string`
528            Contains start and end date of the retrieval in the format
529            "YYYYMMDD/to/YYYYMMDD"
530
531        request : :obj:`integer`
532            Selects the mode of retrieval.
533            0: Retrieves the data from ECMWF.
534            1: Prints the mars requests to an output file.
535            2: Retrieves the data and prints the mars request.
536
537        inputdir : :obj:`string`, optional
538            Path to the directory where the retrieved data is about
539            to be stored. The default is the current directory ('.').
540
541        Return
542        ------
543
544        '''
545        self.dates = dates
546        self.server = server
547        self.public = public
548        self.inputdir = inputdir
549        oro = False
550
551        # define times with datetime module
552        t12h = timedelta(hours=12)
553        t24h = timedelta(hours=24)
554
555        # dictionary which contains all parameter for the mars request,
556        # entries with a "None" will change in different requests and will
557        # therefore be set in each request seperately
558        retr_param_dict = {'marsclass':self.marsclass,
559                           'dataset':self.dataset,
560                           'stream':None,
561                           'type':None,
562                           'levtype':None,
563                           'levelist':None,
564                           'resol':self.resol,
565                           'gaussian':None,
566                           'accuracy':self.accuracy,
567                           'grid':None,
568                           'target':None,
569                           'area':None,
570                           'date':None,
571                           'time':None,
572                           'number':self.number,
573                           'step':None,
574                           'expver':self.expver,
575                           'param':None}
576
577        for ftype in self.types:
578            # fk contains field types such as
579            #     [AN, FC, PF, CV]
580            # fv contains all of the items of the belonging key
581            #     [times, steps]
582            for pk, pv in self.params.iteritems():
583                # pk contains one of these keys of params
584                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
585                #      OG_OROLSM_SL, OG_acc_SL]
586                # pv contains all of the items of the belonging key
587                #     [param, levtype, levelist, grid]
588                if isinstance(pv, str):
589                    continue
590                retr_param_dict['type'] = ftype
591                retr_param_dict['time'] = self.types[ftype]['times']
592                retr_param_dict['step'] = self.types[ftype]['steps']
593                retr_param_dict['date'] = self.dates
594                retr_param_dict['stream'] = self.stream
595                retr_param_dict['target'] = \
596                    self._mk_targetname(ftype,
597                                        pk,
598                                        retr_param_dict['date'].split('/')[0])
599                retr_param_dict['param'] = pv[0]
600                retr_param_dict['levtype'] = pv[1]
601                retr_param_dict['levelist'] = pv[2]
602                retr_param_dict['grid'] = pv[3]
603                retr_param_dict['area'] = self.area
604                retr_param_dict['gaussian'] = self.gaussian
605
606                if pk == 'OG_OROLSM__SL' and not oro:
607                    oro = True
608                    # in CERA20C (class EP) there is no stream "OPER"!
609                    if self.marsclass.upper() != 'EP':
610                        retr_param_dict['stream'] = 'OPER'
611                    retr_param_dict['type'] = 'AN'
612                    retr_param_dict['time'] = '00'
613                    retr_param_dict['step'] = '000'
614                    retr_param_dict['date'] = self.dates.split('/')[0]
615                    retr_param_dict['target'] = self._mk_targetname('',
616                                            pk, retr_param_dict['date'])
617                elif pk == 'OG_OROLSM__SL' and oro:
618                    continue
619                if pk == 'GG__SL' and pv[0] == 'Q':
620                    retr_param_dict['area'] = ""
621                    retr_param_dict['gaussian'] = 'reduced'
622
623    # ------  on demand path  --------------------------------------------------
624                if not self.basetime:
625                    # ******* start retrievement
626                    self._start_retrievement(request, retr_param_dict)
627    # ------  operational path  ------------------------------------------------
628                else:
629                    # check if mars job requests fields beyond basetime.
630                    # if yes eliminate those fields since they may not
631                    # be accessible with user's credentials
632
633                    enddate = retr_param_dict['date'].split('/')[-1]
634                    elimit = datetime.strptime(enddate + self.basetime,
635                                               '%Y%m%d%H')
636
637                    if self.basetime == '12':
638                        # --------------  flux data ----------------------------
639                        if 'acc' in pk:
640
641                        # Strategy:
642                        # if maxtime-elimit >= 24h reduce date by 1,
643                        # if 12h <= maxtime-elimit<12h reduce time for last date
644                        # if maxtime-elimit<12h reduce step for last time
645                        # A split of the MARS job into 2 is likely necessary.
646
647
648                            startdate = retr_param_dict['date'].split('/')[0]
649                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
650                            retr_param_dict['date'] = '/'.join([startdate,
651                                                                'to',
652                                                                enddate])
653
654                            # ******* start retrievement
655                            self._start_retrievement(request, retr_param_dict)
656
657                            retr_param_dict['date'] = \
658                                datetime.strftime(elimit - t12h, '%Y%m%d')
659                            retr_param_dict['time'] = '00'
660                            retr_param_dict['target'] = \
661                                self._mk_targetname(ftype, pk,
662                                                    retr_param_dict['date'])
663
664                            # ******* start retrievement
665                            self._start_retrievement(request, retr_param_dict)
666
667                        # --------------  non flux data ------------------------
668                        else:
669                            # ******* start retrievement
670                            self._start_retrievement(request, retr_param_dict)
671
672                    else: # basetime = 0
673                        retr_param_dict['date'] = \
674                            datetime.strftime(elimit - t24h, '%Y%m%d')
675
676                        timesave = ''.join(retr_param_dict['time'])
677
678                        if '/' in retr_param_dict['time']:
679                            times = retr_param_dict['time'].split('/')
680                            steps = retr_param_dict['step'].split('/')
681                            while (pk != 'OG_OROLSM__SL' and
682                                   'acc' not in pk and
683                                   (int(times[0]) + int(steps[0])) <= 12):
684                                times = times[1:]
685
686                            if len(times) > 1:
687                                retr_param_dict['time'] = '/'.join(times)
688                            else:
689                                retr_param_dict['time'] = times[0]
690
691                        # ******* start retrievement
692                        self._start_retrievement(request, retr_param_dict)
693
694                        if (pk != 'OG_OROLSM__SL' and
695                            int(retr_param_dict['step'].split('/')[0]) == 0 and
696                            int(timesave.split('/')[0]) == 0):
697
698                            retr_param_dict['date'] = \
699                                datetime.strftime(elimit, '%Y%m%d')
700                            retr_param_dict['time'] = '00'
701                            retr_param_dict['step'] = '000'
702                            retr_param_dict['target'] = \
703                                self._mk_targetname(ftype, pk,
704                                                    retr_param_dict['date'])
705
706                            # ******* start retrievement
707                            self._start_retrievement(request, retr_param_dict)
708
709        if request == 0 or request == 2:
710            print('MARS retrieve done ... ')
711        elif request == 1:
712            print('MARS request printed ...')
713
714        return
715
716
717    def write_namelist(self, c):
718        '''Creates a namelist file in the temporary directory and writes
719        the following values to it: maxl, maxb, mlevel,
720        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
721        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
722
723        Parameters
724        ----------
725        c : :obj:`ControlFile`
726            Contains all the parameters of CONTROL file and
727            command line.
728
729        filename : :obj:`string`
730                Name of the namelist file.
731
732        Return
733        ------
734
735        '''
736
737        from genshi.template.text import NewTextTemplate
738        from genshi.template import  TemplateLoader
739        from genshi.template.eval import  UndefinedError
740
741        try:
742            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
743            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
744                                            cls=NewTextTemplate)
745
746            self.inputdir = c.inputdir
747            area = np.asarray(self.area.split('/')).astype(float)
748            grid = np.asarray(self.grid.split('/')).astype(float)
749
750            if area[1] > area[3]:
751                area[1] -= 360
752            maxl = int((area[3] - area[1]) / grid[1]) + 1
753            maxb = int((area[0] - area[2]) / grid[0]) + 1
754
755            stream = namelist_template.generate(
756                maxl = str(maxl),
757                maxb = str(maxb),
758                mlevel = str(self.level),
759                mlevelist = str(self.levelist),
760                mnauf = str(self.resol),
761                metapar = '77',
762                rlo0 = str(area[1]),
763                rlo1 = str(area[3]),
764                rla0 = str(area[2]),
765                rla1 = str(area[0]),
766                momega = str(c.omega),
767                momegadiff = str(c.omegadiff),
768                mgauss = str(c.gauss),
769                msmooth = str(c.smooth),
770                meta = str(c.eta),
771                metadiff = str(c.etadiff),
772                mdpdeta = str(c.dpdeta)
773            )
774        except UndefinedError as e:
775            print('... ERROR ' + str(e))
776
777            sys.exit('\n... error occured while trying to generate namelist ' +
778                     _config.TEMPFILE_NAMELIST)
779        except OSError as e:
780            print('... ERROR CODE: ' + str(e.errno))
781            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
782
783            sys.exit('\n... error occured while trying to generate template ' +
784                     _config.TEMPFILE_NAMELIST)
785
786        try:
787            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
788
789            with open(namelistfile, 'w') as f:
790                f.write(stream.render('text'))
791        except OSError as e:
792            print('... ERROR CODE: ' + str(e.errno))
793            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
794
795            sys.exit('\n... error occured while trying to write ' +
796                     namelistfile)
797
798        return
799
800
801    def deacc_fluxes(self, inputfiles, c):
802        '''De-accumulate and disaggregate flux data.
803
804        Goes through all flux fields in ordered time and de-accumulate
805        the fields. Afterwards the fields are disaggregated in time.
806        Different versions of disaggregation is provided for rainfall
807        data (darain, modified linear) and the surface fluxes and
808        stress data (dapoly, cubic polynomial).
809
810        Parameters
811        ----------
812        inputfiles : :obj:`UioFiles`
813            Contains the list of files that contain flux data.
814
815        c : :obj:`ControlFile`
816            Contains all the parameters of CONTROL file and
817            command line.
818
819        Return
820        ------
821
822        '''
823
824        table128 = init128(_config.PATH_GRIBTABLE)
825        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
826
827        iid = None
828        index_vals = None
829
830        # get the values of the keys which are used for distinct access
831        # of grib messages via product
832        index_keys = ["date", "time", "step"]
833        iid, index_vals = self._mk_index_values(c.inputdir,
834                                                inputfiles,
835                                                index_keys)
836        # index_vals looks like e.g.:
837        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
838        # index_vals[1]: ('0', '600', '1200', '1800') ; time
839        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
840
841        if c.rrint:
842            if not c.purefc:
843                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
844                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
845            else:
846                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
847                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
848                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
849                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
850                end_date = end_date + timedelta(hours=c.maxstep)
851
852            info = get_informations(os.path.join(c.inputdir,
853                                                 inputfiles.files[0]))
854            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
855                                  start_date, end_date)
856            # create numpy array
857            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
858            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
859            it_lsp = 0
860            it_cp = 0
861            date_list = []
862            step_list = []
863
864        # initialize dictionaries to store values
865        orig_vals = {}
866        deac_vals = {}
867        for p in pars:
868            orig_vals[p] = []
869            deac_vals[p] = []
870
871        # "product" genereates each possible combination between the
872        # values of the index keys
873        for prod in product(*index_vals):
874            # e.g. prod = ('20170505', '0', '12')
875            #             (  date    ,time, step)
876
877            print('CURRENT PRODUCT: ', prod)
878
879            for i in range(len(index_keys)):
880                codes_index_select(iid, index_keys[i], prod[i])
881
882            # get first id from current product
883            gid = codes_new_from_index(iid)
884
885            # if there is no data for this specific time combination / product
886            # skip the rest of the for loop and start with next timestep/product
887            if not gid:
888                continue
889
890            # create correct timestamp from the three time informations
891            cdate = str(codes_get(gid, 'date'))
892            time = codes_get(gid, 'time')/100 # integer
893            step = codes_get(gid, 'step') # integer
894            ctime = '{:0>2}'.format(time)
895            cstep = '{:0>3}'.format(step)
896
897            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
898            t_dt = t_date + timedelta(hours=step)
899            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
900            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
901            t_enddate = None
902
903            if c.purefc:
904                fnout = os.path.join(c.inputdir, 'flux' +
905                                     t_date.strftime('%Y%m%d.%H') +
906                                     '.{:0>3}'.format(step-2*int(c.dtime)))
907                gnout = os.path.join(c.inputdir, 'flux' +
908                                     t_date.strftime('%Y%m%d.%H') +
909                                     '.{:0>3}'.format(step-int(c.dtime)))
910                hnout = os.path.join(c.inputdir, 'flux' +
911                                     t_date.strftime('%Y%m%d.%H') +
912                                     '.{:0>3}'.format(step))
913            else:
914                fnout = os.path.join(c.inputdir, 'flux' +
915                                     t_m2dt.strftime('%Y%m%d%H'))
916                gnout = os.path.join(c.inputdir, 'flux' +
917                                     t_m1dt.strftime('%Y%m%d%H'))
918                hnout = os.path.join(c.inputdir, 'flux' +
919                                     t_dt.strftime('%Y%m%d%H'))
920
921            print("outputfile = " + fnout)
922            f_handle = open(fnout, 'w')
923            h_handle = open(hnout, 'w')
924            g_handle = open(gnout, 'w')
925
926            # read message for message and store relevant data fields, where
927            # data keywords are stored in pars
928            while True:
929                if not gid:
930                    break
931                parId = codes_get(gid, 'paramId') # integer
932                step = codes_get(gid, 'step') # integer
933                time = codes_get(gid, 'time') # integer
934                ni = codes_get(gid, 'Ni') # integer
935                nj = codes_get(gid, 'Nj') # integer
936                if parId not in orig_vals.keys():
937                    # parameter is not a flux, find next one
938                    continue
939
940                # define conversion factor
941                if parId == 142 or parId == 143:
942                    fak = 1. / 1000.
943                else:
944                    fak = 3600.
945
946                # get parameter values and reshape
947                values = codes_get_values(gid)
948                values = (np.reshape(values, (nj, ni))).flatten() / fak
949
950                # save the original and accumulated values
951                orig_vals[parId].append(values[:])
952
953                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
954                    # no de-accumulation needed
955                    deac_vals[parId].append(values[:] / int(c.dtime))
956                else:
957                    # do de-accumulation
958                    deac_vals[parId].append(
959                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
960                         int(c.dtime))
961
962                # store precipitation if new disaggregation method is selected
963                # only the exact days are needed
964                if c.rrint:
965                    if start_date <= t_dt <= end_date:
966                        if not c.purefc:
967                            if t_dt not in date_list:
968                                date_list.append(t_dt)
969                                step_list = [0]
970                        else:
971                            if t_date not in date_list:
972                                date_list.append(t_date)
973                            if step not in step_list:
974                                step_list.append(step)
975                        if c.rrint and parId == 142:
976                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
977                            it_lsp += 1
978                        elif c.rrint and parId == 143:
979                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
980                            it_cp += 1
981
982                # information printout
983                print(parId, time, step, len(values), values[0], np.std(values))
984
985                # length of deac_vals[parId] corresponds to the
986                # number of time steps, max. 4 are needed for disaggegration
987                # with the old and original method
988                # run over all grib messages and perform
989                # shifting in time
990                if len(deac_vals[parId]) >= 3:
991                    if len(deac_vals[parId]) > 3:
992                        if not c.rrint and (parId == 142 or parId == 143):
993                            values = disaggregation.darain(deac_vals[parId])
994                        else:
995                            values = disaggregation.dapoly(deac_vals[parId])
996
997                        if not (step == c.maxstep and c.purefc \
998                                or t_dt == t_enddate):
999                            # remove first time step in list to shift
1000                            # time line
1001                            orig_vals[parId].pop(0)
1002                            deac_vals[parId].pop(0)
1003                    else:
1004                        # if the third time step is read (per parId),
1005                        # write out the first one as a boundary value
1006                        if c.purefc:
1007                            values = deac_vals[parId][1]
1008                        else:
1009                            values = deac_vals[parId][0]
1010
1011                    if not (c.rrint and (parId == 142 or parId == 143)):
1012                        codes_set_values(gid, values)
1013
1014                        if c.purefc:
1015                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1016                        else:
1017                            codes_set(gid, 'stepRange', 0)
1018                            codes_set(gid, 'time', t_m2dt.hour*100)
1019                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1020
1021                        codes_write(gid, f_handle)
1022
1023                        if c.basetime:
1024                            t_enddate = datetime.strptime(c.end_date + c.basetime,
1025                                                          '%Y%m%d%H')
1026                        else:
1027                            t_enddate = t_date + timedelta(2*int(c.dtime))
1028
1029                            # squeeze out information of last two steps
1030                            # contained in deac_vals[parId]
1031                            # Note that deac_vals[parId][0] has not been popped
1032                            # in this case
1033
1034                            if step == c.maxstep and c.purefc or \
1035                               t_dt == t_enddate:
1036                                # last step
1037                                if c.purefc:
1038                                    values = deac_vals[parId][3]
1039                                    codes_set_values(gid, values)
1040                                    codes_set(gid, 'stepRange', step)
1041                                    #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1042                                    codes_write(gid, h_handle)
1043                                else:
1044                                    values = deac_vals[parId][3]
1045                                    codes_set_values(gid, values)
1046                                    codes_set(gid, 'stepRange', 0)
1047                                    truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1048                                    codes_set(gid, 'time', truedatetime.hour * 100)
1049                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1050                                    codes_write(gid, h_handle)
1051
1052                                if parId == 142 or parId == 143:
1053                                    values = disaggregation.darain(list(reversed(deac_vals[parId])))
1054                                else:
1055                                    values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1056
1057                                # step before last step
1058                                if c.purefc:
1059                                    codes_set(gid, 'stepRange', step-int(c.dtime))
1060                                    #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1061                                    codes_set_values(gid, values)
1062                                    codes_write(gid, g_handle)
1063                                else:
1064                                    codes_set(gid, 'stepRange', 0)
1065                                    truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1066                                    codes_set(gid, 'time', truedatetime.hour * 100)
1067                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1068                                    codes_set_values(gid, values)
1069                                    codes_write(gid, g_handle)
1070
1071                codes_release(gid)
1072
1073                gid = codes_new_from_index(iid)
1074
1075            f_handle.close()
1076            g_handle.close()
1077            h_handle.close()
1078
1079        codes_index_release(iid)
1080
1081        if c.rrint:
1082            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1083
1084            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1085                                 cp_np, date_list, step_list, c)
1086
1087        return
1088
1089    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1090        '''Calculates and writes out the disaggregated precipitation fields.
1091
1092        Disaggregation is done in time and original times are written to the
1093        flux files, while the additional subgrid times are written to
1094        extra files.
1095
1096        Parameters
1097        ----------
1098        ni : :obj:`integer`
1099            Amount of zonal grid points.
1100
1101        nj : :obj:`integer`
1102            Amount of meridional grid points.
1103
1104        nt : :obj:`integer`
1105            Number of time steps.
1106
1107        lsp_np : :obj:`numpy array` of :obj:`float`
1108            The large scale precipitation fields for each time step.
1109            Shape (ni * nj, nt).
1110
1111        cp_np : :obj:`numpy array` of :obj:`float`
1112            The convective precipitation fields for each time step.
1113            Shape (ni * nj, nt).
1114
1115        date_list : :obj:`list` of :obj:`datetime`
1116            The list of dates for which the disaggregation is to be done.
1117
1118        step_list : :obj:`list` of :obj:`integer`
1119            The list of steps for a single forecast time.
1120            Only necessary for pure forecasts.
1121
1122        c : :obj:`ControlFile`
1123            Contains all the parameters of CONTROL file and
1124            command line.
1125
1126        Return
1127        ------
1128
1129        '''
1130        print('... disaggregation or precipitation with new method.')
1131        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1132        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1133
1134        # do the disaggregation, but neglect the last value of the
1135        # time series. This one corresponds for example to 24 hour,
1136        # which we don't need.
1137        for ix in range(ni*nj):
1138            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1139            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1140
1141        # write to grib files (full/orig times to flux file and inbetween
1142        # times into seperate end files)
1143        print('... write disaggregated precipitation to files.')
1144        it = 0
1145        for date in date_list:
1146            for step in step_list:
1147                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1148
1149                if c.purefc:
1150                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1151                                   '.{:0>3}'.format(step)
1152                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1153                                '.{:0>3}'.format(step) + '_1'
1154                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1155                                '.{:0>3}'.format(step) + '_2'
1156                else:
1157                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1158                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1159                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1160
1161                # collect for final processing
1162                self.outputfilelist.append(os.path.basename(fluxfilename))
1163                self.outputfilelist.append(os.path.basename(filename1))
1164                self.outputfilelist.append(os.path.basename(filename2))
1165
1166                # write original time step to flux file as usual
1167                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1168                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1169                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1170                                  keynames=['date','time','stepRange','values'],
1171                                  keyvalues=[int(date.strftime('%Y%m%d')),
1172                                             date.hour*100, step, lsp_new_np[:,it]],
1173                                 )
1174                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1175                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1176                                  keynames=['date','time','stepRange','values'],
1177                                  keyvalues=[int(date.strftime('%Y%m%d')),
1178                                             date.hour*100, step, cp_new_np[:,it]]
1179                                 )
1180
1181                # write first subgrid time step
1182                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1183                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1184                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1185                                  keynames=['date','time','stepRange','values'],
1186                                  keyvalues=[int(date.strftime('%Y%m%d')),
1187                                             date.hour*100, step, lsp_new_np[:,it+1]]
1188                                  )
1189                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1190                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1191                                  keynames=['date','time','stepRange','values'],
1192                                  keyvalues=[int(date.strftime('%Y%m%d')),
1193                                             date.hour*100, step, cp_new_np[:,it+1]]
1194                                 )
1195
1196                # write second subgrid time step
1197                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1198                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1199                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1200                                  keynames=['date','time','stepRange','values'],
1201                                  keyvalues=[int(date.strftime('%Y%m%d')),
1202                                             date.hour*100, step, lsp_new_np[:,it+2]]
1203                                 )
1204                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1205                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1206                                  keynames=['date','time','stepRange','values'],
1207                                  keyvalues=[int(date.strftime('%Y%m%d')),
1208                                             date.hour*100, step, cp_new_np[:,it+2]]
1209                                 )
1210                it = it + 3 # jump to next original time step
1211        return
1212
1213    def _create_rr_grib_dummy(self, ifile, inputdir):
1214        '''Creates a grib file with a dummy message for the two precipitation
1215        types lsp and cp each.
1216
1217        Parameters
1218        ----------
1219        ifile : :obj:`string`
1220            Filename of the input file to read the grib messages from.
1221
1222        inputdir : :obj:`string`, optional
1223            Path to the directory where the retrieved data is stored.
1224
1225        Return
1226        ------
1227
1228        '''
1229
1230        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1231
1232        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1233                      keyvalues=[142], filemode='w')
1234
1235        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1236                      keyvalues=[143], filemode='a')
1237
1238        return
1239
1240    def create(self, inputfiles, c):
1241        '''An index file will be created which depends on the combination
1242        of "date", "time" and "stepRange" values. This is used to iterate
1243        over all messages in each grib file which were passed through the
1244        parameter "inputfiles" to seperate specific parameters into fort.*
1245        files. Afterwards the FORTRAN program is called to convert
1246        the data fields all to the same grid and put them in one file
1247        per unique time step (combination of "date", "time" and
1248        "stepRange").
1249
1250        Note
1251        ----
1252        This method is based on the ECMWF example index.py
1253        https://software.ecmwf.int/wiki/display/GRIB/index.py
1254
1255        Parameters
1256        ----------
1257        inputfiles : :obj:`UioFiles`
1258            Contains a list of files.
1259
1260        c : :obj:`ControlFile`
1261            Contains all the parameters of CONTROL file and
1262            command line.
1263
1264        Return
1265        ------
1266
1267        '''
1268
1269        if c.wrf:
1270            table128 = init128(_config.PATH_GRIBTABLE)
1271            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1272                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1273                                  table128)
1274
1275        # these numbers are indices for the temporary files "fort.xx"
1276        # which are used to seperate the grib fields to,
1277        # for the Fortran program input
1278        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1279        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1280        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1281                 '17':None, '18':None, '19':None, '21':None, '22':None}
1282
1283        iid = None
1284        index_vals = None
1285
1286        # get the values of the keys which are used for distinct access
1287        # of grib messages via product
1288        index_keys = ["date", "time", "step"]
1289        iid, index_vals = self._mk_index_values(c.inputdir,
1290                                                inputfiles,
1291                                                index_keys)
1292        # index_vals looks like e.g.:
1293        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1294        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1295        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1296
1297        # "product" genereates each possible combination between the
1298        # values of the index keys
1299        for prod in product(*index_vals):
1300            # e.g. prod = ('20170505', '0', '12')
1301            #             (  date    ,time, step)
1302
1303            print('current product: ', prod)
1304
1305            for i in range(len(index_keys)):
1306                codes_index_select(iid, index_keys[i], prod[i])
1307
1308            # get first id from current product
1309            gid = codes_new_from_index(iid)
1310
1311            # if there is no data for this specific time combination / product
1312            # skip the rest of the for loop and start with next timestep/product
1313            if not gid:
1314                continue
1315#============================================================================================
1316            # remove old fort.* files and open new ones
1317            # they are just valid for a single product
1318            for k, f in fdict.iteritems():
1319                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1320                silent_remove(fortfile)
1321                fdict[k] = open(fortfile, 'w')
1322#============================================================================================
1323            # create correct timestamp from the three time informations
1324            cdate = str(codes_get(gid, 'date'))
1325            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1326            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1327            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1328            timestamp += timedelta(hours=int(cstep))
1329            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1330
1331            # if the timestamp is out of basetime start/end date period,
1332            # skip this specific product
1333            if c.basetime:
1334                start_time = datetime.strptime(c.end_date + c.basetime,
1335                                                '%Y%m%d%H') - time_delta
1336                end_time = datetime.strptime(c.end_date + c.basetime,
1337                                             '%Y%m%d%H')
1338                if timestamp < start_time or timestamp > end_time:
1339                    continue
1340
1341            if c.wrf:
1342                if 'olddate' not in locals() or cdate != olddate:
1343                    fwrf = open(os.path.join(c.outputdir,
1344                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1345                    olddate = cdate[:]
1346#============================================================================================
1347            # savedfields remembers which fields were already used.
1348            savedfields = []
1349            # sum of cloud liquid and ice water content
1350            scwc = None
1351            while 1:
1352                if not gid:
1353                    break
1354                paramId = codes_get(gid, 'paramId')
1355                gridtype = codes_get(gid, 'gridType')
1356                levtype = codes_get(gid, 'typeOfLevel')
1357                if paramId == 77: # ETADOT
1358                    codes_write(gid, fdict['21'])
1359                elif paramId == 130: # T
1360                    codes_write(gid, fdict['11'])
1361                elif paramId == 131 or paramId == 132: # U, V wind component
1362                    codes_write(gid, fdict['10'])
1363                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1364                    codes_write(gid, fdict['17'])
1365                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1366                    codes_write(gid, fdict['18'])
1367                elif paramId == 135: # W
1368                    codes_write(gid, fdict['19'])
1369                elif paramId == 152: # LNSP
1370                    codes_write(gid, fdict['12'])
1371                elif paramId == 155 and gridtype == 'sh': # D
1372                    codes_write(gid, fdict['13'])
1373                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1374                    # sum cloud liquid water and ice
1375                    if scwc is None:
1376                        scwc = codes_get_values(gid)
1377                    else:
1378                        scwc += codes_get_values(gid)
1379                        codes_set_values(gid, scwc)
1380                        codes_set(gid, 'paramId', 201031)
1381                        codes_write(gid, fdict['22'])
1382                        scwc = None
1383                elif c.wrf and paramId in [129, 138, 155] and \
1384                      levtype == 'hybrid': # Z, VO, D
1385                    # do not do anything right now
1386                    # these are specific parameter for WRF
1387                    pass
1388                else:
1389                    if paramId not in savedfields:
1390                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1391                        # and all ADDPAR parameter
1392                        codes_write(gid, fdict['16'])
1393                        savedfields.append(paramId)
1394                    else:
1395                        print('duplicate ' + str(paramId) + ' not written')
1396
1397                try:
1398                    if c.wrf:
1399                        # model layer
1400                        if levtype == 'hybrid' and \
1401                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1402                            codes_write(gid, fwrf)
1403                        # sfc layer
1404                        elif paramId in wrfpars:
1405                            codes_write(gid, fwrf)
1406                except AttributeError:
1407                    pass
1408
1409                codes_release(gid)
1410                gid = codes_new_from_index(iid)
1411#============================================================================================
1412            for f in fdict.values():
1413                f.close()
1414#============================================================================================
1415            # call for Fortran program to convert e.g. reduced_gg grids to
1416            # regular_ll and calculate detadot/dp
1417            pwd = os.getcwd()
1418            os.chdir(c.inputdir)
1419            if os.stat('fort.21').st_size == 0 and c.eta:
1420                print('Parameter 77 (etadot) is missing, most likely it is \
1421                       not available for this type or date/time\n')
1422                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1423                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1424                         is set to 1 in CONTROL file')
1425#============================================================================================
1426            # write out all output to log file before starting fortran programm
1427            sys.stdout.flush()
1428
1429            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1430            execute_subprocess([os.path.join(c.exedir,
1431                                _config.FORTRAN_EXECUTABLE)],
1432                               error_msg='FORTRAN PROGRAM FAILED!')#shell=True)
1433
1434            os.chdir(pwd)
1435#============================================================================================
1436            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1437            if c.purefc:
1438                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1439            else:
1440                suffix = cdate_hour[2:10]
1441            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1442            print("outputfile = " + fnout)
1443            # collect for final processing
1444            self.outputfilelist.append(os.path.basename(fnout))
1445#============================================================================================
1446            # create outputfile and copy all data from intermediate files
1447            # to the outputfile (final GRIB input files for FLEXPART)
1448            orolsm = os.path.basename(glob.glob(c.inputdir +
1449                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1450            fluxfile = 'flux' + cdate[0:2] + suffix
1451            if not c.cwc:
1452                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1453            else:
1454                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1455
1456            with open(fnout, 'wb') as fout:
1457                for f in flist:
1458                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1459                                       fout)
1460
1461            if c.omega:
1462                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1463                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1464                                            'rb'), fout)
1465#============================================================================================
1466        if c.wrf:
1467            fwrf.close()
1468
1469        codes_index_release(iid)
1470
1471        return
1472
1473
1474    def process_output(self, c):
1475        '''Postprocessing of FLEXPART input files.
1476
1477        The grib files are postprocessed depending on the selection in
1478        CONTROL file. The resulting files are moved to the output
1479        directory if its not equal to the input directory.
1480        The following modifications might be done if
1481        properly switched in CONTROL file:
1482        GRIB2 - Conversion to GRIB2
1483        ECTRANS - Transfer of files to gateway server
1484        ECSTORAGE - Storage at ECMWF server
1485
1486        Parameters
1487        ----------
1488        c : :obj:`ControlFile`
1489            Contains all the parameters of CONTROL file and
1490            command line.
1491
1492        Return
1493        ------
1494
1495        '''
1496
1497        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1498
1499        if not c.ecapi:
1500            print('ecstorage: {}\n ecfsdir: {}\n'.
1501                  format(c.ecstorage, c.ecfsdir))
1502            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1503                  .format(c.ectrans, c.gateway, c.destination))
1504
1505        print('Output filelist: ')
1506        print(self.outputfilelist)
1507
1508        for ofile in self.outputfilelist:
1509            ofile = os.path.join(self.inputdir, ofile)
1510
1511            if c.format.lower() == 'grib2':
1512                execute_subprocess(['grib_set', '-s', 'edition=2,' +
1513                                    'productDefinitionTemplateNumber=8',
1514                                    ofile, ofile + '_2'],
1515                                   error_msg='GRIB2 CONVERSION FAILED!')
1516
1517                execute_subprocess(['mv', ofile + '_2', ofile],
1518                                   error_msg='RENAMING FOR NEW GRIB2 FORMAT '
1519                                   'FILES FAILED!')
1520
1521            if c.ectrans and not c.ecapi:
1522                execute_subprocess(['ectrans', '-overwrite', '-gateway',
1523                                    c.gateway, '-remote', c.destination,
1524                                    '-source', ofile],
1525                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
1526
1527            if c.ecstorage and not c.ecapi:
1528                execute_subprocess(['ecp', '-o', ofile,
1529                                    os.path.expandvars(c.ecfsdir)],
1530                                   error_msg='COPY OF FILES TO ECSTORAGE '
1531                                   'AREA FAILED!')
1532
1533            if c.outputdir != c.inputdir:
1534                execute_subprocess(['mv', os.path.join(c.inputdir, ofile),
1535                                    c.outputdir],
1536                                   error_msg='RELOCATION OF OUTPUT FILES '
1537                                   'TO OUTPUTDIR FAILED!')
1538
1539        return
1540
1541
1542    def prepare_fp_files(self, c):
1543        '''Conversion of GRIB files to FLEXPART binary format.
1544
1545        Parameters
1546        ----------
1547        c : :obj:`ControlFile`
1548            Contains all the parameters of CONTROL file and
1549            command line.
1550
1551        Return
1552        ------
1553
1554        '''
1555        # generate AVAILABLE file
1556        # Example of AVAILABLE file data:
1557        # 20131107 000000      EN13110700              ON DISC
1558        clist = []
1559        for ofile in self.outputfilelist:
1560            fname = ofile.split('/')
1561            if '.' in fname[-1]:
1562                l = fname[-1].split('.')
1563                timestamp = datetime.strptime(l[0][-6:] + l[1],
1564                                              '%y%m%d%H')
1565                timestamp += timedelta(hours=int(l[2]))
1566                cdate = datetime.strftime(timestamp, '%Y%m%d')
1567                chms = datetime.strftime(timestamp, '%H%M%S')
1568            else:
1569                cdate = '20' + fname[-1][-8:-2]
1570                chms = fname[-1][-2:] + '0000'
1571            clist.append(cdate + ' ' + chms + ' '*6 +
1572                         fname[-1] + ' '*14 + 'ON DISC')
1573        clist.sort()
1574        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1575            f.write('\n'.join(clist) + '\n')
1576
1577        # generate pathnames file
1578        pwd = os.path.abspath(c.outputdir)
1579        with open(pwd + '/pathnames', 'w') as f:
1580            f.write(pwd + '/Options/\n')
1581            f.write(pwd + '/\n')
1582            f.write(pwd + '/\n')
1583            f.write(pwd + '/AVAILABLE\n')
1584            f.write(' = == = == = == = == = == ==  = \n')
1585
1586        # create Options dir if necessary
1587        if not os.path.exists(pwd + '/Options'):
1588            make_dir(pwd+'/Options')
1589
1590        # read template COMMAND file
1591        with open(os.path.expandvars(os.path.expanduser(
1592            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1593            lflist = f.read().split('\n')
1594
1595        # find index of list where to put in the
1596        # date and time information
1597        # usually after the LDIRECT parameter
1598        i = 0
1599        for l in lflist:
1600            if 'LDIRECT' in l.upper():
1601                break
1602            i += 1
1603
1604        # insert the date and time information of run start and end
1605        # into the list of lines of COMMAND file
1606        lflist = lflist[:i+1] + \
1607                 [clist[0][:16], clist[-1][:16]] + \
1608                 lflist[i+3:]
1609
1610        # write the new COMMAND file
1611        with open(pwd + '/Options/COMMAND', 'w') as g:
1612            g.write('\n'.join(lflist) + '\n')
1613
1614        # change to outputdir and start the grib2flexpart run
1615        # afterwards switch back to the working dir
1616        os.chdir(c.outputdir)
1617        cmd = [os.path.expandvars(os.path.expanduser(c.flexpartdir)) +
1618         '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']
1619        execute_subprocess(cmd)
1620        os.chdir(pwd)
1621
1622        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG