source: flex_extract.git/source/python/classes/EcFlexpart.py @ 092aaf1

ctbtodev
Last change on this file since 092aaf1 was 092aaf1, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

modularized the init function; implemented disaggregation with new interpolation method

  • Property mode set to 100644
File size: 66.1 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83
84from eccodes import (codes_index_select, codes_new_from_index, codes_get,
85                     codes_get_values, codes_set_values, codes_set,
86                     codes_write, codes_release, codes_new_from_index,
87                     codes_index_release, codes_index_get)
88
89# software specific classes and modules from flex_extract
90sys.path.append('../')
91import _config
92from GribUtil import GribUtil
93from mods.tools import (init128, to_param_id, silent_remove, product,
94                        my_error, make_dir, get_informations, get_dimensions)
95from MarsRetrieval import MarsRetrieval
96import mods.disaggregation as disaggregation
97
98# ------------------------------------------------------------------------------
99# CLASS
100# ------------------------------------------------------------------------------
101class EcFlexpart(object):
102    '''
103    Class to retrieve FLEXPART specific ECMWF data.
104    '''
105    # --------------------------------------------------------------------------
106    # CLASS FUNCTIONS
107    # --------------------------------------------------------------------------
108    def __init__(self, c, fluxes=False):
109        '''Creates an object/instance of EcFlexpart with the associated
110        settings of its attributes for the retrieval.
111
112        Parameters:
113        -----------
114        c : :obj:`ControlFile`
115            Contains all the parameters of CONTROL file and
116            command line.
117
118        fluxes : :obj:`boolean`, optional
119            Decides if the flux parameter settings are stored or
120            the rest of the parameter list.
121            Default value is False.
122
123        Return
124        ------
125
126        '''
127        # set a counter for the number of generated mars requests
128        self.mreq_count = 0
129
130        self.inputdir = c.inputdir
131        self.dataset = c.dataset
132        self.basetime = c.basetime
133        self.dtime = c.dtime
134        self.acctype = c.acctype
135        self.acctime = c.acctime
136        self.accmaxstep = c.accmaxstep
137        self.marsclass = c.marsclass
138        self.stream = c.stream
139        self.number = c.number
140        self.resol = c.resol
141        self.accuracy = c.accuracy
142        self.addpar = c.addpar
143        self.level = c.level
144        self.expver = c.expver
145        self.levelist = c.levelist
146        self.glevelist = '1/to/' + c.level # in case of gaussian grid
147        self.gaussian = c.gaussian
148        self.grid = c.grid
149        self.area = c.area
150        self.purefc = c.purefc
151        self.outputfilelist = []
152
153        # Define the different types of field combinations (type, time, step)
154        self.types = {}
155        # Define the parameters and their level types, level list and
156        # grid resolution for the retrieval job
157        self.params = {}
158
159        if fluxes:
160            self._create_params_fluxes()
161        else:
162            self._create_params(c.gauss, c.eta, c.omega, c.cwc, c.wrf)
163
164        if fluxes and not c.purefc:
165            self._create_field_types_fluxes()
166        else:
167            self._create_field_types(c.type, c.time, c.step)
168        return
169
170    def _create_field_types(self, ftype, ftime, fstep):
171        '''Create the combination of field type, time and forecast step.
172
173        Parameters:
174        -----------
175        ftype : :obj:`list` of :obj:`string`
176            List of field types.
177
178        ftime : :obj:`list` of :obj:`string`
179            The time in hours of the field.
180
181        fstep : :obj:`string`
182            Specifies the forecast time step from forecast base time.
183            Valid values are hours (HH) from forecast base time.
184
185        Return
186        ------
187
188        '''
189        i = 0
190        for ty, st, ti in zip(ftype, fstep, ftime):
191            btlist = range(24)
192            if self.basetime == '12':
193                btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
194            if self.basetime == '00':
195                btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
196
197            # if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
198                # (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
199                 # (int(c.step[i]) % int(c.dtime) == 0)) ) and \
200                 # (int(c.time[i]) in btlist or c.purefc):
201
202            if (int(ti) in btlist) or self.purefc:
203
204                if ((ty.upper() == 'AN' and (int(ti) % int(self.dtime)) == 0) or
205                    (ty.upper() != 'AN' and (int(st) % int(self.dtime)) == 0)):
206
207                    if ty not in self.types.keys():
208                        self.types[ty] = {'times': '', 'steps': ''}
209
210                    if ti not in self.types[ty]['times']:
211                        if self.types[ty]['times']:
212                            self.types[ty]['times'] += '/'
213                        self.types[ty]['times'] += ti
214
215                    if st not in self.types[ty]['steps']:
216                        if self.types[ty]['steps']:
217                            self.types[ty]['steps'] += '/'
218                        self.types[ty]['steps'] += st
219            i += 1
220        return
221
222    def _create_field_types_fluxes(self):
223        '''Create the combination of field type, time and forecast step
224        for the flux data.
225
226        Parameters:
227        -----------
228
229        Return
230        ------
231
232        '''
233        self.types[str(self.acctype)] = {'times': str(self.acctime),
234                                         'steps': '{}/to/{}/by/{}'.format(
235                                             self.dtime,
236                                             self.accmaxstep,
237                                             self.dtime)}
238        return
239
240    def _create_params(self, gauss, eta, omega, cwc, wrf):
241        '''Define the specific parameter settings for retrievment.
242
243        The different parameters need specific grid types and level types
244        for retrievement. We might get following combination of types
245        (depending on selection and availability):
246        (These are short cuts for the grib file names (leading sequence)
247        SH__ML, OG__ML, GG__ML, SH__SL, OG__SL, GG__SL, OG_OROLSM_SL
248        where:
249            SH = Spherical Harmonics, GG = Gaussian Grid, OG = Output Grid,
250            ML = Model Level, SL = Surface Level
251
252        For each of this combination there is a list of parameter names,
253        the level type, the level list and the grid resolution.
254
255        There are different scenarios for data extraction from MARS:
256        1) Retrieval of etadot
257           eta=1, gauss=0, omega=0
258        2) Calculation of etadot from divergence
259           eta=0, gauss=1, omega=0
260        3) Calculation of etadot from omega (for makes sense for debugging)
261           eta=0, gauss=0, omega=1
262        4) Retrieval and Calculation of etadot (only for debugging)
263           eta=1, gauss=1, omega=0
264        5) Download also specific model and surface level data for FLEXPART-WRF
265
266        Parameters:
267        -----------
268        gauss : :obj:`integer`
269            Gaussian grid is retrieved.
270
271        eta : :obj:`integer`
272            Etadot parameter will be directly retrieved.
273
274        omega : :obj:`integer`
275            The omega paramterwill be retrieved.
276
277        cwc : :obj:`integer`
278            The cloud liquid and ice water content will be retrieved.
279
280        wrf : :obj:`integer`
281            Additional model level and surface level data will be retrieved for
282            WRF/FLEXPART-WRF simulations.
283
284        Return
285        ------
286
287        '''
288        # SURFACE FIELDS
289        #-----------------------------------------------------------------------
290        self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
291        self.params['OG__SL'] = ['SD/MSL/TCC/10U/10V/2T/2D/Z/LSM', \
292                                 'SFC', '1', self.grid]
293        if self.addpar:
294            self.params['OG__SL'][0] += self.addpar
295
296        if self.marsclass.upper() == 'EA' or self.marsclass.upper() == 'EP':
297            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/FSR",
298                                            'SFC', '1', self.grid]
299        else:
300            self.params['OG_OROLSM__SL'] = ["SDOR/CVL/CVH/SR", \
301                                            'SFC', '1', self.grid]
302
303        # MODEL LEVEL FIELDS
304        #-----------------------------------------------------------------------
305        self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
306
307        if not gauss and eta:
308            self.params['OG__ML'][0] += '/U/V/ETADOT'
309        elif gauss and not eta:
310            self.params['GG__SL'] = ['Q', 'ML', '1', \
311                                     '{}'.format((int(self.resol) + 1) / 2)]
312            self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
313        elif not gauss and not eta:
314            self.params['OG__ML'][0] += '/U/V'
315        else:
316            print('Warning: Collecting etadot and parameters for gaussian grid \
317                            is a very costly parameter combination, \
318                            use this combination only for debugging!')
319            self.params['GG__SL'] = ['Q', 'ML', '1', \
320                                     '{}'.format((int(self.resol) + 1) / 2)]
321            self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
322                                     '{}'.format((int(self.resol) + 1) / 2)]
323
324        if omega:
325            self.params['OG__ML'][0] += '/W'
326
327        if cwc:
328            self.params['OG__ML'][0] += '/CLWC/CIWC'
329
330        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
331        #-----------------------------------------------------------------------
332        if wrf:
333            self.params['OG__ML'][0] += '/Z/VO'
334            if '/D' not in self.params['OG__ML'][0]:
335                self.params['OG__ML'][0] += '/D'
336            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
337                       'SWVL1','SWVL2','SWVL3','SWVL4']
338            for par in wrf_sfc:
339                if par not in self.params['OG__SL'][0]:
340                    self.params['OG__SL'][0] += '/' + par
341
342        return
343
344
345    def _create_params_fluxes(self):
346        '''Define the parameter setting for flux data.
347
348        Flux data are accumulated fields in time and are stored on the
349        surface level. The leading short cut name for the grib files is:
350        "OG_acc_SL" with OG for Regular Output Grid, SL for Surface Level, and
351        acc for Accumulated Grid.
352        The params dictionary stores a list of parameter names, the level type,
353        the level list and the grid resolution.
354
355        The flux data are: LSP/CP/SSHF/EWSS/NSSS/SSR
356
357        Parameters:
358        -----------
359
360        Return
361        ------
362
363        '''
364        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
365                                    'SFC', '1', self.grid]
366        return
367
368
369    def _mk_targetname(self, ftype, param, date):
370        '''Creates the filename for the requested grib data to be stored in.
371        This name is passed as the "target" parameter in the request.
372
373        Parameters
374        ----------
375        ftype : :obj:`string`
376            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
377
378        param : :obj:`string`
379            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
380            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
381
382        date : :obj:`string`
383            The date period of the grib data to be stored in this file.
384
385        Return
386        ------
387        targetname : :obj:`string`
388            The target filename for the grib data.
389        '''
390        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
391                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
392
393        return targetname
394
395
396    def _start_retrievement(self, request, par_dict):
397        '''Creates the Mars Retrieval and prints or submits the request
398        depending on the status of the request variable.
399
400        Parameters
401        ----------
402        request : :obj:`integer`
403            Selects the mode of retrieval.
404            0: Retrieves the data from ECMWF.
405            1: Prints the mars requests to an output file.
406            2: Retrieves the data and prints the mars request.
407
408        par_dict : :obj:`dictionary`
409            Contains all parameter which have to be set for creating the
410            Mars Retrievals. The parameter are:
411            marsclass, dataset, stream, type, levtype, levelist, resol,
412            gaussian, accuracy, grid, target, area, date, time, number,
413            step, expver, param
414
415        Return
416        ------
417
418        '''
419        # increase number of mars requests
420        self.mreq_count += 1
421
422        MR = MarsRetrieval(self.server,
423                           self.public,
424                           marsclass=par_dict['marsclass'],
425                           dataset=par_dict['dataset'],
426                           stream=par_dict['stream'],
427                           type=par_dict['type'],
428                           levtype=par_dict['levtype'],
429                           levelist=par_dict['levelist'],
430                           resol=par_dict['resol'],
431                           gaussian=par_dict['gaussian'],
432                           accuracy=par_dict['accuracy'],
433                           grid=par_dict['grid'],
434                           target=par_dict['target'],
435                           area=par_dict['area'],
436                           date=par_dict['date'],
437                           time=par_dict['time'],
438                           number=par_dict['number'],
439                           step=par_dict['step'],
440                           expver=par_dict['expver'],
441                           param=par_dict['param'])
442
443        if request == 0:
444            MR.display_info()
445            MR.data_retrieve()
446        elif request == 1:
447            MR.print_infodata_csv(self.inputdir, self.mreq_count)
448        elif request == 2:
449            MR.print_infodata_csv(self.inputdir, self.mreq_count)
450            MR.display_info()
451            MR.data_retrieve()
452        else:
453            print('Failure')
454
455        return
456
457
458    def _mk_index_values(self, inputdir, inputfiles, keys):
459        '''Creates an index file for a set of grib parameter keys.
460        The values from the index keys are returned in a list.
461
462        Parameters
463        ----------
464        keys : :obj:`dictionary`
465            List of parameter names which serves as index.
466
467        inputfiles : :obj:`UioFiles`
468            Contains a list of files.
469
470        Return
471        ------
472        iid : :obj:`codes_index`
473            This is a grib specific index structure to access
474            messages in a file.
475
476        index_vals : :obj:`list`
477            Contains the values from the keys used for a distinct selection
478            of grib messages in processing  the grib files.
479            Content looks like e.g.:
480            index_vals[0]: ('20171106', '20171107', '20171108') ; date
481            index_vals[1]: ('0', '1200', '1800', '600') ; time
482            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
483        '''
484        iid = None
485        index_keys = keys
486
487        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
488        silent_remove(indexfile)
489        grib = GribUtil(inputfiles.files)
490        # creates new index file
491        iid = grib.index(index_keys=index_keys, index_file=indexfile)
492
493        # read the values of index keys
494        index_vals = []
495        for key in index_keys:
496            key_vals = codes_index_get(iid, key)
497            # have to sort the key values for correct disaggregation,
498            # therefore convert to int first
499            key_vals = [int(k) for k in key_vals]
500            key_vals.sort()
501            key_vals = [str(k) for k in key_vals]
502            index_vals.append(key_vals)
503            # index_vals looks for example like:
504            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
505            # index_vals[1]: ('0', '1200') ; time
506            # index_vals[2]: (3', '6', '9', '12') ; stepRange
507
508        return iid, index_vals
509
510
511    def retrieve(self, server, dates, public, request, inputdir='.'):
512        '''Finalizing the retrieval information by setting final details
513        depending on grid type.
514        Prepares MARS retrievals per grid type and submits them.
515
516        Parameters
517        ----------
518        server : :obj:`ECMWFService` or :obj:`ECMWFDataServer`
519            The connection to the ECMWF server. This is different
520            for member state users which have full access and non
521            member state users which have only access to the public
522            data sets. The decision is made from command line argument
523            "public"; for public access its True (ECMWFDataServer)
524            for member state users its False (ECMWFService)
525
526        dates : :obj:`string`
527            Contains start and end date of the retrieval in the format
528            "YYYYMMDD/to/YYYYMMDD"
529
530        request : :obj:`integer`
531            Selects the mode of retrieval.
532            0: Retrieves the data from ECMWF.
533            1: Prints the mars requests to an output file.
534            2: Retrieves the data and prints the mars request.
535
536        inputdir : :obj:`string`, optional
537            Path to the directory where the retrieved data is about
538            to be stored. The default is the current directory ('.').
539
540        Return
541        ------
542
543        '''
544        self.dates = dates
545        self.server = server
546        self.public = public
547        self.inputdir = inputdir
548        oro = False
549
550        # define times with datetime module
551        t12h = timedelta(hours=12)
552        t24h = timedelta(hours=24)
553
554        # dictionary which contains all parameter for the mars request,
555        # entries with a "None" will change in different requests and will
556        # therefore be set in each request seperately
557        retr_param_dict = {'marsclass':self.marsclass,
558                           'dataset':self.dataset,
559                           'stream':None,
560                           'type':None,
561                           'levtype':None,
562                           'levelist':None,
563                           'resol':self.resol,
564                           'gaussian':None,
565                           'accuracy':self.accuracy,
566                           'grid':None,
567                           'target':None,
568                           'area':None,
569                           'date':None,
570                           'time':None,
571                           'number':self.number,
572                           'step':None,
573                           'expver':self.expver,
574                           'param':None}
575
576        for ftype in self.types:
577            # fk contains field types such as
578            #     [AN, FC, PF, CV]
579            # fv contains all of the items of the belonging key
580            #     [times, steps]
581            for pk, pv in self.params.iteritems():
582                # pk contains one of these keys of params
583                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
584                #      OG_OROLSM_SL, OG_acc_SL]
585                # pv contains all of the items of the belonging key
586                #     [param, levtype, levelist, grid]
587                if isinstance(pv, str):
588                    continue
589                retr_param_dict['type'] = ftype
590                retr_param_dict['time'] = self.types[ftype]['times']
591                retr_param_dict['step'] = self.types[ftype]['steps']
592                retr_param_dict['date'] = self.dates
593                retr_param_dict['stream'] = self.stream
594                retr_param_dict['target'] = \
595                    self._mk_targetname(ftype,
596                                        pk,
597                                        retr_param_dict['date'].split('/')[0])
598                retr_param_dict['param'] = pv[0]
599                retr_param_dict['levtype'] = pv[1]
600                retr_param_dict['levelist'] = pv[2]
601                retr_param_dict['grid'] = pv[3]
602                retr_param_dict['area'] = self.area
603                retr_param_dict['gaussian'] = self.gaussian
604
605                if pk == 'OG_OROLSM__SL' and not oro:
606                    oro = True
607                    # in CERA20C (class EP) there is no stream "OPER"!
608                    if self.marsclass.upper() != 'EP':
609                        retr_param_dict['stream'] = 'OPER'
610                    retr_param_dict['type'] = 'AN'
611                    retr_param_dict['time'] = '00'
612                    retr_param_dict['step'] = '000'
613                    retr_param_dict['date'] = self.dates.split('/')[0]
614                    retr_param_dict['target'] = self._mk_targetname('',
615                                            pk, retr_param_dict['date'])
616                elif pk == 'OG_OROLSM__SL' and oro:
617                    continue
618                if pk == 'GG__SL' and pv[0] == 'Q':
619                    retr_param_dict['area'] = ""
620                    retr_param_dict['gaussian'] = 'reduced'
621
622    # ------  on demand path  --------------------------------------------------
623                if not self.basetime:
624                    # ******* start retrievement
625                    self._start_retrievement(request, retr_param_dict)
626    # ------  operational path  ------------------------------------------------
627                else:
628                    # check if mars job requests fields beyond basetime.
629                    # if yes eliminate those fields since they may not
630                    # be accessible with user's credentials
631
632                    enddate = retr_param_dict['date'].split('/')[-1]
633                    elimit = datetime.strptime(enddate + self.basetime,
634                                               '%Y%m%d%H')
635
636                    if self.basetime == '12':
637                        # --------------  flux data ----------------------------
638                        if 'acc' in pk:
639
640                        # Strategy:
641                        # if maxtime-elimit >= 24h reduce date by 1,
642                        # if 12h <= maxtime-elimit<12h reduce time for last date
643                        # if maxtime-elimit<12h reduce step for last time
644                        # A split of the MARS job into 2 is likely necessary.
645
646
647                            startdate = retr_param_dict['date'].split('/')[0]
648                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
649                            retr_param_dict['date'] = '/'.join([startdate,
650                                                                'to',
651                                                                enddate])
652
653                            # ******* start retrievement
654                            self._start_retrievement(request, retr_param_dict)
655
656                            retr_param_dict['date'] = \
657                                datetime.strftime(elimit - t12h, '%Y%m%d')
658                            retr_param_dict['time'] = '00'
659                            retr_param_dict['target'] = \
660                                self._mk_targetname(ftype, pk,
661                                                    retr_param_dict['date'])
662
663                            # ******* start retrievement
664                            self._start_retrievement(request, retr_param_dict)
665
666                        # --------------  non flux data ------------------------
667                        else:
668                            # ******* start retrievement
669                            self._start_retrievement(request, retr_param_dict)
670
671                    else: # basetime = 0
672                        retr_param_dict['date'] = \
673                            datetime.strftime(elimit - t24h, '%Y%m%d')
674
675                        timesave = ''.join(retr_param_dict['time'])
676
677                        if '/' in retr_param_dict['time']:
678                            times = retr_param_dict['time'].split('/')
679                            steps = retr_param_dict['step'].split('/')
680                            while (pk != 'OG_OROLSM__SL' and
681                                   'acc' not in pk and
682                                   (int(times[0]) + int(steps[0])) <= 12):
683                                times = times[1:]
684
685                            if len(times) > 1:
686                                retr_param_dict['time'] = '/'.join(times)
687                            else:
688                                retr_param_dict['time'] = times[0]
689
690                        # ******* start retrievement
691                        self._start_retrievement(request, retr_param_dict)
692
693                        if (pk != 'OG_OROLSM__SL' and
694                            int(retr_param_dict['step'].split('/')[0]) == 0 and
695                            int(timesave.split('/')[0]) == 0):
696
697                            retr_param_dict['date'] = \
698                                datetime.strftime(elimit, '%Y%m%d')
699                            retr_param_dict['time'] = '00'
700                            retr_param_dict['step'] = '000'
701                            retr_param_dict['target'] = \
702                                self._mk_targetname(ftype, pk,
703                                                    retr_param_dict['date'])
704
705                            # ******* start retrievement
706                            self._start_retrievement(request, retr_param_dict)
707
708        if request == 0 or request == 2:
709            print('MARS retrieve done ... ')
710        elif request == 1:
711            print('MARS request printed ...')
712
713        return
714
715
716    def write_namelist(self, c):
717        '''Creates a namelist file in the temporary directory and writes
718        the following values to it: maxl, maxb, mlevel,
719        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
720        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
721
722        Parameters
723        ----------
724        c : :obj:`ControlFile`
725            Contains all the parameters of CONTROL file and
726            command line.
727
728        filename : :obj:`string`
729                Name of the namelist file.
730
731        Return
732        ------
733
734        '''
735
736        from genshi.template.text import NewTextTemplate
737        from genshi.template import  TemplateLoader
738        from genshi.template.eval import  UndefinedError
739
740        try:
741            loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
742            namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
743                                            cls=NewTextTemplate)
744
745            self.inputdir = c.inputdir
746            area = np.asarray(self.area.split('/')).astype(float)
747            grid = np.asarray(self.grid.split('/')).astype(float)
748
749            if area[1] > area[3]:
750                area[1] -= 360
751            maxl = int((area[3] - area[1]) / grid[1]) + 1
752            maxb = int((area[0] - area[2]) / grid[0]) + 1
753
754            stream = namelist_template.generate(
755                maxl = str(maxl),
756                maxb = str(maxb),
757                mlevel = str(self.level),
758                mlevelist = str(self.levelist),
759                mnauf = str(self.resol),
760                metapar = '77',
761                rlo0 = str(area[1]),
762                rlo1 = str(area[3]),
763                rla0 = str(area[2]),
764                rla1 = str(area[0]),
765                momega = str(c.omega),
766                momegadiff = str(c.omegadiff),
767                mgauss = str(c.gauss),
768                msmooth = str(c.smooth),
769                meta = str(c.eta),
770                metadiff = str(c.etadiff),
771                mdpdeta = str(c.dpdeta)
772            )
773        except UndefinedError as e:
774            print('... ERROR ' + str(e))
775
776            sys.exit('\n... error occured while trying to generate namelist ' +
777                     _config.TEMPFILE_NAMELIST)
778        except OSError as e:
779            print('... ERROR CODE: ' + str(e.errno))
780            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
781
782            sys.exit('\n... error occured while trying to generate template ' +
783                     _config.TEMPFILE_NAMELIST)
784
785        try:
786            namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
787
788            with open(namelistfile, 'w') as f:
789                f.write(stream.render('text'))
790        except OSError as e:
791            print('... ERROR CODE: ' + str(e.errno))
792            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
793
794            sys.exit('\n... error occured while trying to write ' +
795                     namelistfile)
796
797        return
798
799
800    def deacc_fluxes(self, inputfiles, c):
801        '''De-accumulate and disaggregate flux data.
802
803        Goes through all flux fields in ordered time and de-accumulate
804        the fields. Afterwards the fields are disaggregated in time.
805        Different versions of disaggregation is provided for rainfall
806        data (darain, modified linear) and the surface fluxes and
807        stress data (dapoly, cubic polynomial).
808
809        Parameters
810        ----------
811        inputfiles : :obj:`UioFiles`
812            Contains the list of files that contain flux data.
813
814        c : :obj:`ControlFile`
815            Contains all the parameters of CONTROL file and
816            command line.
817
818        Return
819        ------
820
821        '''
822
823        table128 = init128(_config.PATH_GRIBTABLE)
824        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
825
826        iid = None
827        index_vals = None
828
829        # get the values of the keys which are used for distinct access
830        # of grib messages via product
831        index_keys = ["date", "time", "step"]
832        iid, index_vals = self._mk_index_values(c.inputdir,
833                                                inputfiles,
834                                                index_keys)
835        # index_vals looks like e.g.:
836        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
837        # index_vals[1]: ('0', '600', '1200', '1800') ; time
838        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
839
840        if c.rrint:
841            if not c.purefc:
842                start_date = datetime.strptime(c.start_date + '00', '%Y%m%d%H')
843                end_date = datetime.strptime(c.end_date + '23', '%Y%m%d%H')
844            else:
845                sdate_str = c.start_date + '{:0>2}'.format(index_vals[1][0])
846                start_date = datetime.strptime(sdate_str, '%Y%m%d%H')
847                edate_str = c.end_date + '{:0>2}'.format(index_vals[1][-1])
848                end_date = datetime.strptime(edate_str, '%Y%m%d%H')
849                end_date = end_date + timedelta(hours=c.maxstep)
850
851            info = get_informations(os.path.join(c.inputdir,
852                                                 inputfiles.files[0]))
853            dims = get_dimensions(info, c.purefc, c.dtime, index_vals,
854                                  start_date, end_date)
855            # create numpy array
856            lsp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
857            cp_np = np.zeros((dims[1] * dims[0], dims[2]), dtype=np.float64)
858            it_lsp = 0
859            it_cp = 0
860            date_list = []
861            step_list = []
862
863        # initialize dictionaries to store values
864        orig_vals = {}
865        deac_vals = {}
866        for p in pars:
867            orig_vals[p] = []
868            deac_vals[p] = []
869
870        # "product" genereates each possible combination between the
871        # values of the index keys
872        for prod in product(*index_vals):
873            # e.g. prod = ('20170505', '0', '12')
874            #             (  date    ,time, step)
875
876            print('CURRENT PRODUCT: ', prod)
877
878            for i in range(len(index_keys)):
879                codes_index_select(iid, index_keys[i], prod[i])
880
881            # get first id from current product
882            gid = codes_new_from_index(iid)
883
884            # if there is no data for this specific time combination / product
885            # skip the rest of the for loop and start with next timestep/product
886            if not gid:
887                continue
888
889            # create correct timestamp from the three time informations
890            cdate = str(codes_get(gid, 'date'))
891            time = codes_get(gid, 'time')/100 # integer
892            step = codes_get(gid, 'step') # integer
893            ctime = '{:0>2}'.format(time)
894            cstep = '{:0>3}'.format(step)
895
896            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
897            t_dt = t_date + timedelta(hours=step)
898            t_m1dt = t_date + timedelta(hours=step-int(c.dtime))
899            t_m2dt = t_date + timedelta(hours=step-2*int(c.dtime))
900            t_enddate = None
901
902            if c.purefc:
903                fnout = os.path.join(c.inputdir, 'flux' +
904                                     t_date.strftime('%Y%m%d.%H') +
905                                     '.{:0>3}'.format(step-2*int(c.dtime)))
906                gnout = os.path.join(c.inputdir, 'flux' +
907                                     t_date.strftime('%Y%m%d.%H') +
908                                     '.{:0>3}'.format(step-int(c.dtime)))
909                hnout = os.path.join(c.inputdir, 'flux' +
910                                     t_date.strftime('%Y%m%d.%H') +
911                                     '.{:0>3}'.format(step))
912            else:
913                fnout = os.path.join(c.inputdir, 'flux' +
914                                     t_m2dt.strftime('%Y%m%d%H'))
915                gnout = os.path.join(c.inputdir, 'flux' +
916                                     t_m1dt.strftime('%Y%m%d%H'))
917                hnout = os.path.join(c.inputdir, 'flux' +
918                                     t_dt.strftime('%Y%m%d%H'))
919
920            print("outputfile = " + fnout)
921            f_handle = open(fnout, 'w')
922            h_handle = open(hnout, 'w')
923            g_handle = open(gnout, 'w')
924
925            # read message for message and store relevant data fields, where
926            # data keywords are stored in pars
927            while True:
928                if not gid:
929                    break
930                parId = codes_get(gid, 'paramId') # integer
931                step = codes_get(gid, 'step') # integer
932                time = codes_get(gid, 'time') # integer
933                ni = codes_get(gid, 'Ni') # integer
934                nj = codes_get(gid, 'Nj') # integer
935                if parId not in orig_vals.keys():
936                    # parameter is not a flux, find next one
937                    continue
938
939                # define conversion factor
940                if parId == 142 or parId == 143:
941                    fak = 1. / 1000.
942                else:
943                    fak = 3600.
944
945                # get parameter values and reshape
946                values = codes_get_values(gid)
947                values = (np.reshape(values, (nj, ni))).flatten() / fak
948
949                # save the original and accumulated values
950                orig_vals[parId].append(values[:])
951
952                if c.marsclass.upper() == 'EA' or step <= int(c.dtime):
953                    # no de-accumulation needed
954                    deac_vals[parId].append(values[:] / int(c.dtime))
955                else:
956                    # do de-accumulation
957                    deac_vals[parId].append(
958                        (orig_vals[parId][-1] - orig_vals[parId][-2]) /
959                         int(c.dtime))
960
961                # store precipitation if new disaggregation method is selected
962                # only the exact days are needed
963                if c.rrint:
964                    if start_date <= t_dt <= end_date:
965                        if not c.purefc:
966                            if t_dt not in date_list:
967                                date_list.append(t_dt)
968                                step_list = [0]
969                        else:
970                            if t_date not in date_list:
971                                date_list.append(t_date)
972                            if step not in step_list:
973                                step_list.append(step)
974                        if c.rrint and parId == 142:
975                            lsp_np[:,it_lsp] = deac_vals[parId][-1][:]
976                            it_lsp += 1
977                        elif c.rrint and parId == 143:
978                            cp_np[:,it_cp] = deac_vals[parId][-1][:]
979                            it_cp += 1
980
981                # information printout
982                print(parId, time, step, len(values), values[0], np.std(values))
983
984                # length of deac_vals[parId] corresponds to the
985                # number of time steps, max. 4 are needed for disaggegration
986                # with the old and original method
987                # run over all grib messages and perform
988                # shifting in time
989                if len(deac_vals[parId]) >= 3:
990                    if len(deac_vals[parId]) > 3:
991                        if not c.rrint and (parId == 142 or parId == 143):
992                            values = disaggregation.darain(deac_vals[parId])
993                        else:
994                            values = disaggregation.dapoly(deac_vals[parId])
995
996                        if not (step == c.maxstep and c.purefc \
997                                or t_dt == t_enddate):
998                            # remove first time step in list to shift
999                            # time line
1000                            orig_vals[parId].pop(0)
1001                            deac_vals[parId].pop(0)
1002                    else:
1003                        # if the third time step is read (per parId),
1004                        # write out the first one as a boundary value
1005                        if c.purefc:
1006                            values = deac_vals[parId][1]
1007                        else:
1008                            values = deac_vals[parId][0]
1009
1010                    if not (c.rrint and (parId == 142 or parId == 143)):
1011                        codes_set_values(gid, values)
1012
1013                        if c.purefc:
1014                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
1015                        else:
1016                            codes_set(gid, 'stepRange', 0)
1017                            codes_set(gid, 'time', t_m2dt.hour*100)
1018                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
1019
1020                        codes_write(gid, f_handle)
1021
1022                        if c.basetime:
1023                            t_enddate = datetime.strptime(c.end_date + c.basetime,
1024                                                          '%Y%m%d%H')
1025                        else:
1026                            t_enddate = t_date + timedelta(2*int(c.dtime))
1027
1028                            # squeeze out information of last two steps
1029                            # contained in deac_vals[parId]
1030                            # Note that deac_vals[parId][0] has not been popped
1031                            # in this case
1032
1033                            if step == c.maxstep and c.purefc or \
1034                               t_dt == t_enddate:
1035                                # last step
1036                                if c.purefc:
1037                                    values = deac_vals[parId][3]
1038                                    codes_set_values(gid, values)
1039                                    codes_set(gid, 'stepRange', step)
1040                                    #truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1041                                    codes_write(gid, h_handle)
1042                                else:
1043                                    values = deac_vals[parId][3]
1044                                    codes_set_values(gid, values)
1045                                    codes_set(gid, 'stepRange', 0)
1046                                    truedatetime = t_m2dt + timedelta(hours=2*int(c.dtime))
1047                                    codes_set(gid, 'time', truedatetime.hour * 100)
1048                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1049                                    codes_write(gid, h_handle)
1050
1051                                if parId == 142 or parId == 143:
1052                                    values = disaggregation.darain(list(reversed(deac_vals[parId])))
1053                                else:
1054                                    values = disaggregation.dapoly(list(reversed(deac_vals[parId])))
1055
1056                                # step before last step
1057                                if c.purefc:
1058                                    codes_set(gid, 'stepRange', step-int(c.dtime))
1059                                    #truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1060                                    codes_set_values(gid, values)
1061                                    codes_write(gid, g_handle)
1062                                else:
1063                                    codes_set(gid, 'stepRange', 0)
1064                                    truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
1065                                    codes_set(gid, 'time', truedatetime.hour * 100)
1066                                    codes_set(gid, 'date', int(truedatetime.strftime('%Y%m%d')))
1067                                    codes_set_values(gid, values)
1068                                    codes_write(gid, g_handle)
1069
1070                codes_release(gid)
1071
1072                gid = codes_new_from_index(iid)
1073
1074            f_handle.close()
1075            g_handle.close()
1076            h_handle.close()
1077
1078        codes_index_release(iid)
1079
1080        if c.rrint:
1081            self._create_rr_grib_dummy(inputfiles.files[0], c.inputdir)
1082
1083            self._prep_new_rrint(ni, nj, lsp_np.shape[1], lsp_np,
1084                                 cp_np, date_list, step_list, c)
1085
1086        return
1087
1088    def _prep_new_rrint(self, ni, nj, nt, lsp_np, cp_np, date_list, step_list, c):
1089        '''Calculates and writes out the disaggregated precipitation fields.
1090
1091        Disaggregation is done in time and original times are written to the
1092        flux files, while the additional subgrid times are written to
1093        extra files.
1094
1095        Parameters
1096        ----------
1097        ni : :obj:`integer`
1098            Amount of zonal grid points.
1099
1100        nj : :obj:`integer`
1101            Amount of meridional grid points.
1102
1103        nt : :obj:`integer`
1104            Number of time steps.
1105
1106        lsp_np : :obj:`numpy array` of :obj:`float`
1107            The large scale precipitation fields for each time step.
1108            Shape (ni * nj, nt).
1109
1110        cp_np : :obj:`numpy array` of :obj:`float`
1111            The convective precipitation fields for each time step.
1112            Shape (ni * nj, nt).
1113
1114        date_list : :obj:`list` of :obj:`datetime`
1115            The list of dates for which the disaggregation is to be done.
1116
1117        step_list : :obj:`list` of :obj:`integer`
1118            The list of steps for a single forecast time.
1119            Only necessary for pure forecasts.
1120
1121        c : :obj:`ControlFile`
1122            Contains all the parameters of CONTROL file and
1123            command line.
1124
1125        Return
1126        ------
1127
1128        '''
1129        print('... disaggregation or precipitation with new method.')
1130        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1131        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
1132
1133        # do the disaggregation, but neglect the last value of the
1134        # time series. This one corresponds for example to 24 hour,
1135        # which we don't need.
1136        for ix in range(ni*nj):
1137            lsp_new_np[ix,:] = disaggregation.IA3(lsp_np[ix,:])[:-1]
1138            cp_new_np[ix,:] = disaggregation.IA3(cp_np[ix,:])[:-1]
1139
1140        # write to grib files (full/orig times to flux file and inbetween
1141        # times into seperate end files)
1142        print('... write disaggregated precipitation to files.')
1143        it = 0
1144        for date in date_list:
1145            for step in step_list:
1146                tmpfile = os.path.join(c.inputdir, 'rr_grib_dummy.grb')
1147
1148                if c.purefc:
1149                    fluxfilename = 'flux' + date.strftime('%Y%m%d.%H') + \
1150                                   '.{:0>3}'.format(step)
1151                    filename1 = c.prefix + date.strftime('%y%m%d.%H') + \
1152                                '.{:0>3}'.format(step) + '_1'
1153                    filename2 = c.prefix + date.strftime('%y%m%d.%H') + \
1154                                '.{:0>3}'.format(step) + '_2'
1155                else:
1156                    fluxfilename = 'flux' + date.strftime('%Y%m%d%H')
1157                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
1158                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
1159
1160                # collect for final processing
1161                self.outputfilelist.append(os.path.basename(fluxfilename))
1162                self.outputfilelist.append(os.path.basename(filename1))
1163                self.outputfilelist.append(os.path.basename(filename2))
1164
1165                # write original time step to flux file as usual
1166                fluxfile = GribUtil(os.path.join(c.inputdir, fluxfilename))
1167                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1168                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1169                                  keynames=['date','time','stepRange','values'],
1170                                  keyvalues=[int(date.strftime('%Y%m%d')),
1171                                             date.hour*100, step, lsp_new_np[:,it]],
1172                                 )
1173                fluxfile.set_keys(tmpfile, filemode='a', strict=True,
1174                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1175                                  keynames=['date','time','stepRange','values'],
1176                                  keyvalues=[int(date.strftime('%Y%m%d')),
1177                                             date.hour*100, step, cp_new_np[:,it]]
1178                                 )
1179
1180                # write first subgrid time step
1181                endfile1 = GribUtil(os.path.join(c.inputdir, filename1))
1182                endfile1.set_keys(tmpfile, filemode='w', strict=True,
1183                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1184                                  keynames=['date','time','stepRange','values'],
1185                                  keyvalues=[int(date.strftime('%Y%m%d')),
1186                                             date.hour*100, step, lsp_new_np[:,it+1]]
1187                                  )
1188                endfile1.set_keys(tmpfile, filemode='a', strict=True,
1189                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1190                                  keynames=['date','time','stepRange','values'],
1191                                  keyvalues=[int(date.strftime('%Y%m%d')),
1192                                             date.hour*100, step, cp_new_np[:,it+1]]
1193                                 )
1194
1195                # write second subgrid time step
1196                endfile2 = GribUtil(os.path.join(c.inputdir, filename2))
1197                endfile2.set_keys(tmpfile, filemode='w', strict=True,
1198                                  wherekeynames=['paramId'], wherekeyvalues=[142],
1199                                  keynames=['date','time','stepRange','values'],
1200                                  keyvalues=[int(date.strftime('%Y%m%d')),
1201                                             date.hour*100, step, lsp_new_np[:,it+2]]
1202                                 )
1203                endfile2.set_keys(tmpfile, filemode='a', strict=True,
1204                                  wherekeynames=['paramId'], wherekeyvalues=[143],
1205                                  keynames=['date','time','stepRange','values'],
1206                                  keyvalues=[int(date.strftime('%Y%m%d')),
1207                                             date.hour*100, step, cp_new_np[:,it+2]]
1208                                 )
1209                it = it + 3 # jump to next original time step
1210        return
1211
1212    def _create_rr_grib_dummy(self, ifile, inputdir):
1213        '''Creates a grib file with a dummy message for the two precipitation
1214        types lsp and cp each.
1215
1216        Parameters
1217        ----------
1218        ifile : :obj:`string`
1219            Filename of the input file to read the grib messages from.
1220
1221        inputdir : :obj:`string`, optional
1222            Path to the directory where the retrieved data is stored.
1223
1224        Return
1225        ------
1226
1227        '''
1228
1229        gribfile = GribUtil(os.path.join(inputdir,'rr_grib_dummy.grb'))
1230
1231        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1232                      keyvalues=[142], filemode='w')
1233
1234        gribfile.copy_dummy_msg(ifile, keynames=['paramId'],
1235                      keyvalues=[143], filemode='a')
1236
1237        return
1238
1239    def create(self, inputfiles, c):
1240        '''An index file will be created which depends on the combination
1241        of "date", "time" and "stepRange" values. This is used to iterate
1242        over all messages in each grib file which were passed through the
1243        parameter "inputfiles" to seperate specific parameters into fort.*
1244        files. Afterwards the FORTRAN program is called to convert
1245        the data fields all to the same grid and put them in one file
1246        per unique time step (combination of "date", "time" and
1247        "stepRange").
1248
1249        Note
1250        ----
1251        This method is based on the ECMWF example index.py
1252        https://software.ecmwf.int/wiki/display/GRIB/index.py
1253
1254        Parameters
1255        ----------
1256        inputfiles : :obj:`UioFiles`
1257            Contains a list of files.
1258
1259        c : :obj:`ControlFile`
1260            Contains all the parameters of CONTROL file and
1261            command line.
1262
1263        Return
1264        ------
1265
1266        '''
1267
1268        if c.wrf:
1269            table128 = init128(_config.PATH_GRIBTABLE)
1270            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
1271                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
1272                                  table128)
1273
1274        # these numbers are indices for the temporary files "fort.xx"
1275        # which are used to seperate the grib fields to,
1276        # for the Fortran program input
1277        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
1278        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
1279        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
1280                 '17':None, '18':None, '19':None, '21':None, '22':None}
1281
1282        iid = None
1283        index_vals = None
1284
1285        # get the values of the keys which are used for distinct access
1286        # of grib messages via product
1287        index_keys = ["date", "time", "step"]
1288        iid, index_vals = self._mk_index_values(c.inputdir,
1289                                                inputfiles,
1290                                                index_keys)
1291        # index_vals looks like e.g.:
1292        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
1293        # index_vals[1]: ('0', '600', '1200', '1800') ; time
1294        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
1295
1296        # "product" genereates each possible combination between the
1297        # values of the index keys
1298        for prod in product(*index_vals):
1299            # e.g. prod = ('20170505', '0', '12')
1300            #             (  date    ,time, step)
1301
1302            print('current product: ', prod)
1303
1304            for i in range(len(index_keys)):
1305                codes_index_select(iid, index_keys[i], prod[i])
1306
1307            # get first id from current product
1308            gid = codes_new_from_index(iid)
1309
1310            # if there is no data for this specific time combination / product
1311            # skip the rest of the for loop and start with next timestep/product
1312            if not gid:
1313                continue
1314#============================================================================================
1315            # remove old fort.* files and open new ones
1316            # they are just valid for a single product
1317            for k, f in fdict.iteritems():
1318                fortfile = os.path.join(c.inputdir, 'fort.' + k)
1319                silent_remove(fortfile)
1320                fdict[k] = open(fortfile, 'w')
1321#============================================================================================
1322            # create correct timestamp from the three time informations
1323            cdate = str(codes_get(gid, 'date'))
1324            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1325            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1326            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1327            timestamp += timedelta(hours=int(cstep))
1328            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1329
1330            # if the timestamp is out of basetime start/end date period,
1331            # skip this specific product
1332            if c.basetime:
1333                start_time = datetime.strptime(c.end_date + c.basetime,
1334                                                '%Y%m%d%H') - time_delta
1335                end_time = datetime.strptime(c.end_date + c.basetime,
1336                                             '%Y%m%d%H')
1337                if timestamp < start_time or timestamp > end_time:
1338                    continue
1339
1340            if c.wrf:
1341                if 'olddate' not in locals() or cdate != olddate:
1342                    fwrf = open(os.path.join(c.outputdir,
1343                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1344                    olddate = cdate[:]
1345#============================================================================================
1346            # savedfields remembers which fields were already used.
1347            savedfields = []
1348            # sum of cloud liquid and ice water content
1349            scwc = None
1350            while 1:
1351                if not gid:
1352                    break
1353                paramId = codes_get(gid, 'paramId')
1354                gridtype = codes_get(gid, 'gridType')
1355                levtype = codes_get(gid, 'typeOfLevel')
1356                if paramId == 77: # ETADOT
1357                    codes_write(gid, fdict['21'])
1358                elif paramId == 130: # T
1359                    codes_write(gid, fdict['11'])
1360                elif paramId == 131 or paramId == 132: # U, V wind component
1361                    codes_write(gid, fdict['10'])
1362                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1363                    codes_write(gid, fdict['17'])
1364                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1365                    codes_write(gid, fdict['18'])
1366                elif paramId == 135: # W
1367                    codes_write(gid, fdict['19'])
1368                elif paramId == 152: # LNSP
1369                    codes_write(gid, fdict['12'])
1370                elif paramId == 155 and gridtype == 'sh': # D
1371                    codes_write(gid, fdict['13'])
1372                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1373                    # sum cloud liquid water and ice
1374                    if scwc is None:
1375                        scwc = codes_get_values(gid)
1376                    else:
1377                        scwc += codes_get_values(gid)
1378                        codes_set_values(gid, scwc)
1379                        codes_set(gid, 'paramId', 201031)
1380                        codes_write(gid, fdict['22'])
1381                        scwc = None
1382                elif c.wrf and paramId in [129, 138, 155] and \
1383                      levtype == 'hybrid': # Z, VO, D
1384                    # do not do anything right now
1385                    # these are specific parameter for WRF
1386                    pass
1387                else:
1388                    if paramId not in savedfields:
1389                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1390                        # and all ADDPAR parameter
1391                        codes_write(gid, fdict['16'])
1392                        savedfields.append(paramId)
1393                    else:
1394                        print('duplicate ' + str(paramId) + ' not written')
1395
1396                try:
1397                    if c.wrf:
1398                        # model layer
1399                        if levtype == 'hybrid' and \
1400                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1401                            codes_write(gid, fwrf)
1402                        # sfc layer
1403                        elif paramId in wrfpars:
1404                            codes_write(gid, fwrf)
1405                except AttributeError:
1406                    pass
1407
1408                codes_release(gid)
1409                gid = codes_new_from_index(iid)
1410#============================================================================================
1411            for f in fdict.values():
1412                f.close()
1413#============================================================================================
1414            # call for Fortran program to convert e.g. reduced_gg grids to
1415            # regular_ll and calculate detadot/dp
1416            pwd = os.getcwd()
1417            os.chdir(c.inputdir)
1418            if os.stat('fort.21').st_size == 0 and c.eta:
1419                print('Parameter 77 (etadot) is missing, most likely it is \
1420                       not available for this type or date/time\n')
1421                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1422                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1423                         is set to 1 in CONTROL file')
1424#============================================================================================
1425            # write out all output to log file before starting fortran programm
1426            sys.stdout.flush()
1427
1428            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1429            p = subprocess.check_call([os.path.join(
1430                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1431            os.chdir(pwd)
1432#============================================================================================
1433            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1434            if c.purefc:
1435                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1436            else:
1437                suffix = cdate_hour[2:10]
1438            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1439            print("outputfile = " + fnout)
1440            # collect for final processing
1441            self.outputfilelist.append(os.path.basename(fnout))
1442#============================================================================================
1443            # create outputfile and copy all data from intermediate files
1444            # to the outputfile (final GRIB input files for FLEXPART)
1445            orolsm = os.path.basename(glob.glob(c.inputdir +
1446                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1447            fluxfile = 'flux' + cdate[0:2] + suffix
1448            if not c.cwc:
1449                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1450            else:
1451                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1452
1453            with open(fnout, 'wb') as fout:
1454                for f in flist:
1455                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1456                                       fout)
1457
1458            if c.omega:
1459                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1460                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1461                                            'rb'), fout)
1462#============================================================================================
1463        if c.wrf:
1464            fwrf.close()
1465
1466        codes_index_release(iid)
1467
1468        return
1469
1470
1471    def process_output(self, c):
1472        '''Postprocessing of FLEXPART input files.
1473
1474        The grib files are postprocessed depending on the selection in
1475        CONTROL file. The resulting files are moved to the output
1476        directory if its not equal to the input directory.
1477        The following modifications might be done if
1478        properly switched in CONTROL file:
1479        GRIB2 - Conversion to GRIB2
1480        ECTRANS - Transfer of files to gateway server
1481        ECSTORAGE - Storage at ECMWF server
1482
1483        Parameters
1484        ----------
1485        c : :obj:`ControlFile`
1486            Contains all the parameters of CONTROL file and
1487            command line.
1488
1489        Return
1490        ------
1491
1492        '''
1493
1494        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1495
1496        if not c.ecapi:
1497            print('ecstorage: {}\n ecfsdir: {}\n'.
1498                  format(c.ecstorage, c.ecfsdir))
1499            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1500                  .format(c.ectrans, c.gateway, c.destination))
1501
1502        print('Output filelist: ')
1503        print(self.outputfilelist)
1504
1505        for ofile in self.outputfilelist:
1506            ofile = os.path.join(self.inputdir, ofile)
1507
1508            if c.format.lower() == 'grib2':
1509                p = subprocess.check_call(['grib_set', '-s', 'edition=2,',
1510                                           'productDefinitionTemplateNumber=8',
1511                                           ofile, ofile + '_2'])
1512                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1513
1514            if c.ectrans and not c.ecapi:
1515                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1516                                           c.gateway, '-remote', c.destination,
1517                                           '-source', ofile])
1518
1519            if c.ecstorage and not c.ecapi:
1520                p = subprocess.check_call(['ecp', '-o', ofile,
1521                                           os.path.expandvars(c.ecfsdir)])
1522
1523            if c.outputdir != c.inputdir:
1524                p = subprocess.check_call(['mv',
1525                                           os.path.join(c.inputdir, ofile),
1526                                           c.outputdir])
1527
1528        return
1529
1530
1531    def prepare_fp_files(self, c):
1532        '''Conversion of GRIB files to FLEXPART binary format.
1533
1534        Parameters
1535        ----------
1536        c : :obj:`ControlFile`
1537            Contains all the parameters of CONTROL file and
1538            command line.
1539
1540        Return
1541        ------
1542
1543        '''
1544        # generate AVAILABLE file
1545        # Example of AVAILABLE file data:
1546        # 20131107 000000      EN13110700              ON DISC
1547        clist = []
1548        for ofile in self.outputfilelist:
1549            fname = ofile.split('/')
1550            if '.' in fname[-1]:
1551                l = fname[-1].split('.')
1552                timestamp = datetime.strptime(l[0][-6:] + l[1],
1553                                              '%y%m%d%H')
1554                timestamp += timedelta(hours=int(l[2]))
1555                cdate = datetime.strftime(timestamp, '%Y%m%d')
1556                chms = datetime.strftime(timestamp, '%H%M%S')
1557            else:
1558                cdate = '20' + fname[-1][-8:-2]
1559                chms = fname[-1][-2:] + '0000'
1560            clist.append(cdate + ' ' + chms + ' '*6 +
1561                         fname[-1] + ' '*14 + 'ON DISC')
1562        clist.sort()
1563        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1564            f.write('\n'.join(clist) + '\n')
1565
1566        # generate pathnames file
1567        pwd = os.path.abspath(c.outputdir)
1568        with open(pwd + '/pathnames', 'w') as f:
1569            f.write(pwd + '/Options/\n')
1570            f.write(pwd + '/\n')
1571            f.write(pwd + '/\n')
1572            f.write(pwd + '/AVAILABLE\n')
1573            f.write(' = == = == = == = == = == ==  = \n')
1574
1575        # create Options dir if necessary
1576        if not os.path.exists(pwd + '/Options'):
1577            make_dir(pwd+'/Options')
1578
1579        # read template COMMAND file
1580        with open(os.path.expandvars(os.path.expanduser(
1581            c.flexpartdir)) + '/../Options/COMMAND', 'r') as f:
1582            lflist = f.read().split('\n')
1583
1584        # find index of list where to put in the
1585        # date and time information
1586        # usually after the LDIRECT parameter
1587        i = 0
1588        for l in lflist:
1589            if 'LDIRECT' in l.upper():
1590                break
1591            i += 1
1592
1593        # insert the date and time information of run start and end
1594        # into the list of lines of COMMAND file
1595        lflist = lflist[:i+1] + \
1596                 [clist[0][:16], clist[-1][:16]] + \
1597                 lflist[i+3:]
1598
1599        # write the new COMMAND file
1600        with open(pwd + '/Options/COMMAND', 'w') as g:
1601            g.write('\n'.join(lflist) + '\n')
1602
1603        # change to outputdir and start the grib2flexpart run
1604        # afterwards switch back to the working dir
1605        os.chdir(c.outputdir)
1606        p = subprocess.check_call([
1607            os.path.expandvars(os.path.expanduser(c.flexpartdir))
1608            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1609        os.chdir(pwd)
1610
1611        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG