source: flex_extract.git/source/python/classes/EcFlexpart.py @ 2d56c04

devfeature/makefilesorigin/task/language-editingtask/language-editing
Last change on this file since 2d56c04 was 2d56c04, checked in by Anne Philipp <anne.philipp@…>, 21 months ago

automatic detection of grid and area component formats (1/1000 or normal degree format)

  • Property mode set to 100644
File size: 51.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - extended with class Control
12#        - removed functions mkdir_p, daterange, years_between, months_between
13#        - added functions darain, dapoly, to_param_id, init128, normal_exit,
14#          my_error, clean_up, install_args_and_control,
15#          interpret_args_and_control,
16#        - removed function __del__ in class EIFLexpart
17#        - added the following functions in EIFlexpart:
18#            - create_namelist
19#            - process_output
20#            - deacc_fluxes
21#        - modified existing EIFlexpart - functions for the use in
22#          flex_extract
23#        - retrieve also longer term forecasts, not only analyses and
24#          short term forecast data
25#        - added conversion into GRIB2
26#        - added conversion into .fp format for faster execution of FLEXPART
27#          (see https://www.flexpart.eu/wiki/FpCtbtoWo4FpFormat)
28#
29#    February 2018 - Anne Philipp (University of Vienna):
30#        - applied PEP8 style guide
31#        - added documentation
32#        - removed function getFlexpartTime in class EcFlexpart
33#        - outsourced class ControlFile
34#        - outsourced class MarsRetrieval
35#        - changed class name from EIFlexpart to EcFlexpart
36#        - applied minor code changes (style)
37#        - removed "dead code" , e.g. retrieval of Q since it is not needed
38#        - removed "times" parameter from retrieve-method since it is not used
39#        - seperated function "retrieve" into smaller functions (less code
40#          duplication, easier testing)
41#
42# @License:
43#    (C) Copyright 2014-2018.
44#
45#    This software is licensed under the terms of the Apache Licence Version 2.0
46#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
47#
48# @Class Description:
49#    FLEXPART needs grib files in a specifc format. All necessary data fields
50#    for one time step are stored in a single file. The class represents an
51#    instance with all the parameter and settings necessary for retrieving
52#    MARS data and modifing them so they are fitting FLEXPART need. The class
53#    is able to disaggregate the fluxes and convert grid types to the one needed
54#    by FLEXPART, therefore using the FORTRAN program.
55#
56# @Class Content:
57#    - __init__
58#    - write_namelist
59#    - retrieve
60#    - process_output
61#    - create
62#    - deacc_fluxes
63#
64# @Class Attributes:
65#
66#  TODO
67#
68#*******************************************************************************
69#pylint: disable=unsupported-assignment-operation
70# this is disabled because for this specific case its an error in pylint
71#pylint: disable=consider-using-enumerate
72# this is not useful in this case
73# ------------------------------------------------------------------------------
74# MODULES
75# ------------------------------------------------------------------------------
76import os
77import sys
78import glob
79import shutil
80import subprocess
81from datetime import datetime, timedelta
82import numpy as np
83
84from eccodes import (codes_index_select, codes_new_from_index, codes_get,
85                     codes_get_values, codes_set_values, codes_set,
86                     codes_write, codes_release, codes_new_from_index,
87                     codes_index_release, codes_index_get)
88
89# software specific classes and modules from flex_extract
90sys.path.append('../')
91import _config
92from GribTools import GribTools
93from mods.tools import (init128, to_param_id, silent_remove, product,
94                        my_error, make_dir)
95from MarsRetrieval import MarsRetrieval
96import mods.disaggregation as disaggregation
97
98# ------------------------------------------------------------------------------
99# CLASS
100# ------------------------------------------------------------------------------
101class EcFlexpart(object):
102    '''
103    Class to retrieve FLEXPART specific ECMWF data.
104    '''
105    # --------------------------------------------------------------------------
106    # CLASS FUNCTIONS
107    # --------------------------------------------------------------------------
108    def __init__(self, c, fluxes=False):
109        '''Creates an object/instance of EcFlexpart with the associated
110        settings of its attributes for the retrieval.
111
112        Parameters:
113        -----------
114        c : :obj:`ControlFile`
115            Contains all the parameters of CONTROL file and
116            command line.
117
118        fluxes : :obj:`boolean`, optional
119            Decides if the flux parameter settings are stored or
120            the rest of the parameter list.
121            Default value is False.
122
123        Return
124        ------
125
126        '''
127        # set a counter for the number of mars requests generated
128        self.mreq_count = 0
129
130        # different mars types for retrieving data for flexpart
131        self.types = dict()
132
133        # Pure forecast mode
134        if c.maxstep > len(c.type) and 'AN' not in c.type:
135            c.type = [c.type[0]]
136            c.step = ['{:0>3}'.format(int(c.step[0]))]
137            c.time = [c.time[0]]
138            for i in range(1, c.maxstep + 1):
139                c.type.append(c.type[0])
140                c.step.append('{:0>3}'.format(i))
141                c.time.append(c.time[0])
142
143        self.inputdir = c.inputdir
144        self.dataset = c.dataset
145        self.basetime = c.basetime
146        self.dtime = c.dtime
147        i = 0
148        if fluxes and c.maxstep <= 24:
149            # no forecast beyond one day is needed!
150            # Thus, prepare flux data manually as usual
151            # with only forecast fields with start times at 00/12
152            # (but without 00/12 fields since these are
153            # the initialisation times of the flux fields
154            # and therefore are zero all the time)
155            self.types[str(c.acctype)] = {'times': str(c.acctime),
156                                          'steps': '{}/to/{}/by/{}'.format(
157                                              c.dtime, c.accmaxstep, c.dtime)}
158        else:
159            for ty, st, ti in zip(c.type, c.step, c.time):
160                btlist = range(24)
161                if c.basetime == '12':
162                    btlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
163                if c.basetime == '00':
164                    btlist = [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0]
165
166                if ((ty.upper() == 'AN' and (int(c.time[i]) % int(c.dtime)) == 0) or
167                    (ty.upper() != 'AN' and (int(c.step[i]) % int(c.dtime)) == 0 and
168                     (int(c.step[i]) % int(c.dtime) == 0)) ) and \
169                    (int(c.time[i]) in btlist or c.maxstep > 24):
170
171                    if ty not in self.types.keys():
172                        self.types[ty] = {'times': '', 'steps': ''}
173
174                    if ti not in self.types[ty]['times']:
175                        if self.types[ty]['times']:
176                            self.types[ty]['times'] += '/'
177                        self.types[ty]['times'] += ti
178
179                    if st not in self.types[ty]['steps']:
180                        if self.types[ty]['steps']:
181                            self.types[ty]['steps'] += '/'
182                        self.types[ty]['steps'] += st
183                i += 1
184
185        self.marsclass = c.marsclass
186        self.stream = c.stream
187        self.number = c.number
188        self.resol = c.resol
189        self.accuracy = c.accuracy
190        self.level = c.level
191        self.expver = c.expver
192        self.levelist = c.levelist
193        # for gaussian grid retrieval
194        self.glevelist = '1/to/' + c.level
195        self.gaussian = c.gaussian
196        self.grid = c.grid
197        self.area = c.area
198        self.outputfilelist = []
199
200
201        # Now comes the nasty part that deals with the different
202        # scenarios we have:
203        # 1) Calculation of etadot on
204        #    a) Gaussian grid
205        #    b) Output grid
206        #    c) Output grid using parameter 77 retrieved from MARS
207        # 3) Calculation/Retrieval of omega
208        # 4) Download also data for WRF
209
210        # Different grids need different retrievals
211        # SH = Spherical Harmonics, GG = Gaussian Grid,
212        # OG = Output Grid, ML = MultiLevel, SL = SingleLevel
213        self.params = {'SH__ML': '', 'SH__SL': '',
214                       'GG__ML': '', 'GG__SL': '',
215                       'OG__ML': '', 'OG__SL': '',
216                       'OG_OROLSM_SL': '', 'OG_acc_SL': ''}
217        # the self.params dictionary stores a list of
218        # [param, levtype, levelist, grid] per key
219
220        if fluxes is False:
221            self.params['SH__SL'] = ['LNSP', 'ML', '1', 'OFF']
222            #                        "SD/MSL/TCC/10U/10V/2T/2D/129/172"
223            self.params['OG__SL'] = ["141/151/164/165/166/167/168/129/172", \
224                                     'SFC', '1', self.grid]
225            if c.addpar:
226                if c.addpar[0] == '/':
227                    c.addpar = c.addpar[1:]
228                self.params['OG__SL'][0] += '/' + '/'.join(c.addpar)
229
230            if c.marsclass.upper() == 'EA' or c.marsclass.upper() == 'EP':
231                self.params['OG_OROLSM__SL'] = ["160/27/28/244",
232                                                'SFC', '1', self.grid]
233            else:
234                self.params['OG_OROLSM__SL'] = ["160/27/28/173", \
235                                                'SFC', '1', self.grid]
236
237            self.params['OG__ML'] = ['T/Q', 'ML', self.levelist, self.grid]
238
239            #if c.gauss == '0' and c.eta == '1':
240            if not c.gauss and c.eta:
241                # the simplest case
242                self.params['OG__ML'][0] += '/U/V/77'
243            #elif c.gauss == '0' and c.eta == '0':
244            elif not c.gauss and not c.eta:
245            # this is not recommended (inaccurate)
246                self.params['OG__ML'][0] += '/U/V'
247            #elif c.gauss == '1' and c.eta == '0':
248            elif c.gauss and not c.eta:
249                # this is needed for data before 2008, or for reanalysis data
250                self.params['GG__SL'] = ['Q', 'ML', '1', \
251                                         '{}'.format((int(self.resol) + 1) / 2)]
252                self.params['SH__ML'] = ['U/V/D', 'ML', self.glevelist, 'OFF']
253            else:
254                print('Warning: This is a very costly parameter combination, \
255                      use only for debugging!')
256                self.params['GG__SL'] = ['Q', 'ML', '1', \
257                                         '{}'.format((int(self.resol) + 1) / 2)]
258                self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
259                                         '{}'.format((int(self.resol) + 1) / 2)]
260
261            if c.omega:
262                self.params['OG__ML'][0] += '/W'
263
264            # add cloud water content if necessary
265            if c.cwc:
266                self.params['OG__ML'][0] += '/CLWC/CIWC'
267
268            # add vorticity and geopotential height for WRF if necessary
269            if c.wrf:
270                self.params['OG__ML'][0] += '/Z/VO'
271                if '/D' not in self.params['OG__ML'][0]:
272                    self.params['OG__ML'][0] += '/D'
273                #wrf_sfc = 'sp/msl/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/stl1/ /
274                #           stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4'.upper()
275                wrf_sfc = '134/235/167/165/166/168/129/172/34/31/141/ \
276                           139/170/183/236/39/40/41/42'
277                lwrt_sfc = wrf_sfc.split('/')
278                for par in lwrt_sfc:
279                    if par not in self.params['OG__SL'][0]:
280                        self.params['OG__SL'][0] += '/' + par
281
282        else:
283            self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
284                                        'SFC', '1', self.grid]
285
286        return
287
288
289    def _mk_targetname(self, ftype, param, date):
290        '''Creates the filename for the requested grib data to be stored in.
291        This name is passed as the "target" parameter in the request.
292
293        Parameters
294        ----------
295        ftype : :obj:`string`
296            Shortcut name of the type of the field. E.g. AN, FC, PF, ...
297
298        param : :obj:`string`
299            Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,
300            GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL
301
302        date : :obj:`string`
303            The date period of the grib data to be stored in this file.
304
305        Return
306        ------
307        targetname : :obj:`string`
308            The target filename for the grib data.
309        '''
310        targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +
311                      str(os.getppid()) + '.' + str(os.getpid()) + '.grb')
312
313        return targetname
314
315
316    def _start_retrievement(self, request, par_dict):
317        '''Creates the Mars Retrieval and prints or submits the request
318        depending on the status of the request variable.
319
320        Parameters
321        ----------
322        request : :obj:`integer`
323            Selects the mode of retrieval.
324            0: Retrieves the data from ECMWF.
325            1: Prints the mars requests to an output file.
326            2: Retrieves the data and prints the mars request.
327
328        par_dict : :obj:`dictionary`
329            Contains all parameter which have to be set for creating the
330            Mars Retrievals. The parameter are:
331            marsclass, dataset, stream, type, levtype, levelist, resol,
332            gaussian, accuracy, grid, target, area, date, time, number,
333            step, expver, param
334
335        Return
336        ------
337
338        '''
339        # increase number of mars requests
340        self.mreq_count += 1
341
342        MR = MarsRetrieval(self.server,
343                           self.public,
344                           marsclass=par_dict['marsclass'],
345                           dataset=par_dict['dataset'],
346                           stream=par_dict['stream'],
347                           type=par_dict['type'],
348                           levtype=par_dict['levtype'],
349                           levelist=par_dict['levelist'],
350                           resol=par_dict['resol'],
351                           gaussian=par_dict['gaussian'],
352                           accuracy=par_dict['accuracy'],
353                           grid=par_dict['grid'],
354                           target=par_dict['target'],
355                           area=par_dict['area'],
356                           date=par_dict['date'],
357                           time=par_dict['time'],
358                           number=par_dict['number'],
359                           step=par_dict['step'],
360                           expver=par_dict['expver'],
361                           param=par_dict['param'])
362
363        if request == 0:
364            MR.display_info()
365            MR.data_retrieve()
366        elif request == 1:
367            MR.print_infodata_csv(self.inputdir, self.mreq_count)
368        elif request == 2:
369            MR.print_infodata_csv(self.inputdir, self.mreq_count)
370            MR.display_info()
371            MR.data_retrieve()
372        else:
373            print('Failure')
374
375        return
376
377
378    def _mk_index_values(self, inputdir, inputfiles, keys):
379        '''Creates an index file for a set of grib parameter keys.
380        The values from the index keys are returned in a list.
381
382        Parameters
383        ----------
384        keys : :obj:`dictionary`
385            List of parameter names which serves as index.
386
387        inputfiles : :obj:`UioFiles`
388            Contains a list of files.
389
390        Return
391        ------
392        iid : :obj:`codes_index`
393            This is a grib specific index structure to access
394            messages in a file.
395
396        index_vals : :obj:`list`
397            Contains the values from the keys used for a distinct selection
398            of grib messages in processing  the grib files.
399            Content looks like e.g.:
400            index_vals[0]: ('20171106', '20171107', '20171108') ; date
401            index_vals[1]: ('0', '1200', '1800', '600') ; time
402            index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
403        '''
404        iid = None
405        index_keys = keys
406
407        indexfile = os.path.join(inputdir, _config.FILE_GRIB_INDEX)
408        silent_remove(indexfile)
409        grib = GribTools(inputfiles.files)
410        # creates new index file
411        iid = grib.index(index_keys=index_keys, index_file=indexfile)
412
413        # read the values of index keys
414        index_vals = []
415        for key in index_keys:
416            key_vals = codes_index_get(iid, key)
417            # have to sort the key values for correct disaggregation,
418            # therefore convert to int first
419            key_vals = [int(k) for k in key_vals]
420            key_vals.sort()
421            key_vals = [str(k) for k in key_vals]
422            index_vals.append(key_vals)
423            # index_vals looks for example like:
424            # index_vals[0]: ('20171106', '20171107', '20171108') ; date
425            # index_vals[1]: ('0', '1200') ; time
426            # index_vals[2]: (3', '6', '9', '12') ; stepRange
427
428        return iid, index_vals
429
430
431    def retrieve(self, server, dates, public, request, inputdir='.'):
432        '''Finalizing the retrieval information by setting final details
433        depending on grid type.
434        Prepares MARS retrievals per grid type and submits them.
435
436        Parameters
437        ----------
438        server : :obj:`ECMWFService` or :obj:`ECMWFDataServer`
439            The connection to the ECMWF server. This is different
440            for member state users which have full access and non
441            member state users which have only access to the public
442            data sets. The decision is made from command line argument
443            "public"; for public access its True (ECMWFDataServer)
444            for member state users its False (ECMWFService)
445
446        dates : :obj:`string`
447            Contains start and end date of the retrieval in the format
448            "YYYYMMDD/to/YYYYMMDD"
449
450        request : :obj:`integer`
451            Selects the mode of retrieval.
452            0: Retrieves the data from ECMWF.
453            1: Prints the mars requests to an output file.
454            2: Retrieves the data and prints the mars request.
455
456        inputdir : :obj:`string`, optional
457            Path to the directory where the retrieved data is about
458            to be stored. The default is the current directory ('.').
459
460        Return
461        ------
462
463        '''
464        self.dates = dates
465        self.server = server
466        self.public = public
467        self.inputdir = inputdir
468        oro = False
469
470        # define times with datetime module
471        t12h = timedelta(hours=12)
472        t24h = timedelta(hours=24)
473
474        # dictionary which contains all parameter for the mars request,
475        # entries with a "None" will change in different requests and will
476        # therefore be set in each request seperately
477        retr_param_dict = {'marsclass':self.marsclass,
478                           'dataset':self.dataset,
479                           'stream':None,
480                           'type':None,
481                           'levtype':None,
482                           'levelist':None,
483                           'resol':self.resol,
484                           'gaussian':None,
485                           'accuracy':self.accuracy,
486                           'grid':None,
487                           'target':None,
488                           'area':None,
489                           'date':None,
490                           'time':None,
491                           'number':self.number,
492                           'step':None,
493                           'expver':self.expver,
494                           'param':None}
495
496        for ftype in self.types:
497            # fk contains field types such as
498            #     [AN, FC, PF, CV]
499            # fv contains all of the items of the belonging key
500            #     [times, steps]
501            for pk, pv in self.params.iteritems():
502                # pk contains one of these keys of params
503                #     [SH__ML, SH__SL, GG__ML, GG__SL, OG__ML, OG__SL,
504                #      OG_OROLSM_SL, OG_acc_SL]
505                # pv contains all of the items of the belonging key
506                #     [param, levtype, levelist, grid]
507                if isinstance(pv, str):
508                    continue
509                retr_param_dict['type'] = ftype
510                retr_param_dict['time'] = self.types[ftype]['times']
511                retr_param_dict['step'] = self.types[ftype]['steps']
512                retr_param_dict['date'] = self.dates
513                retr_param_dict['stream'] = self.stream
514                retr_param_dict['target'] = \
515                    self._mk_targetname(ftype,
516                                        pk,
517                                        retr_param_dict['date'].split('/')[0])
518                retr_param_dict['param'] = pv[0]
519                retr_param_dict['levtype'] = pv[1]
520                retr_param_dict['levelist'] = pv[2]
521                retr_param_dict['grid'] = pv[3]
522                retr_param_dict['area'] = self.area
523                retr_param_dict['gaussian'] = self.gaussian
524
525                if pk == 'OG__SL':
526                    pass
527                if pk == 'OG_OROLSM__SL' and not oro:
528                    oro = True
529                    # in CERA20C (class EP) there is no stream "OPER"!
530                    if self.marsclass.upper() != 'EP':
531                        retr_param_dict['stream'] = 'OPER'
532                    retr_param_dict['type'] = 'AN'
533                    retr_param_dict['time'] = '00'
534                    retr_param_dict['step'] = '000'
535                    retr_param_dict['date'] = self.dates.split('/')[0]
536                    retr_param_dict['target'] = self._mk_targetname('',
537                                            pk, retr_param_dict['date'])
538                elif pk == 'OG_OROLSM__SL' and oro:
539                    continue
540                if pk == 'GG__SL' and pv[0] == 'Q':
541                    retr_param_dict['area'] = ""
542                    retr_param_dict['gaussian'] = 'reduced'
543
544    # ------  on demand path  --------------------------------------------------
545                if not self.basetime:
546                    # ******* start retrievement
547                    self._start_retrievement(request, retr_param_dict)
548    # ------  operational path  ------------------------------------------------
549                else:
550                    # check if mars job requests fields beyond basetime.
551                    # if yes eliminate those fields since they may not
552                    # be accessible with user's credentials
553
554                    enddate = retr_param_dict['date'].split('/')[-1]
555                    elimit = datetime.strptime(enddate + self.basetime,
556                                               '%Y%m%d%H')
557
558                    if self.basetime == '12':
559                        # --------------  flux data ----------------------------
560                        if 'acc' in pk:
561
562                        # Strategy:
563                        # if maxtime-elimit >= 24h reduce date by 1,
564                        # if 12h <= maxtime-elimit<12h reduce time for last date
565                        # if maxtime-elimit<12h reduce step for last time
566                        # A split of the MARS job into 2 is likely necessary.
567
568
569                            startdate = retr_param_dict['date'].split('/')[0]
570                            enddate = datetime.strftime(elimit - t24h,'%Y%m%d')
571                            retr_param_dict['date'] = '/'.join([startdate,
572                                                                'to',
573                                                                enddate])
574
575                            # ******* start retrievement
576                            self._start_retrievement(request, retr_param_dict)
577
578                            retr_param_dict['date'] = \
579                                datetime.strftime(elimit - t12h, '%Y%m%d')
580                            retr_param_dict['time'] = '00'
581                            retr_param_dict['target'] = \
582                                self._mk_targetname(ftype, pk,
583                                                    retr_param_dict['date'])
584
585                            # ******* start retrievement
586                            self._start_retrievement(request, retr_param_dict)
587
588                        # --------------  non flux data ------------------------
589                        else:
590                            # ******* start retrievement
591                            self._start_retrievement(request, retr_param_dict)
592
593                    else: # basetime = 0
594                        retr_param_dict['date'] = \
595                            datetime.strftime(elimit - t24h, '%Y%m%d')
596
597                        timesave = ''.join(retr_param_dict['time'])
598
599                        if '/' in retr_param_dict['time']:
600                            times = retr_param_dict['time'].split('/')
601                            steps = retr_param_dict['step'].split('/')
602                            while (pk != 'OG_OROLSM__SL' and
603                                   'acc' not in pk and
604                                   (int(times[0]) + int(steps[0])) <= 12):
605                                times = times[1:]
606
607                            if len(times) > 1:
608                                retr_param_dict['time'] = '/'.join(times)
609                            else:
610                                retr_param_dict['time'] = times[0]
611
612                        # ******* start retrievement
613                        self._start_retrievement(request, retr_param_dict)
614
615                        if (pk != 'OG_OROLSM__SL' and
616                            int(retr_param_dict['step'].split('/')[0]) == 0 and
617                            int(timesave.split('/')[0]) == 0):
618
619                            retr_param_dict['date'] = \
620                                datetime.strftime(elimit, '%Y%m%d')
621                            retr_param_dict['time'] = '00'
622                            retr_param_dict['step'] = '000'
623                            retr_param_dict['target'] = \
624                                self._mk_targetname(ftype, pk,
625                                                    retr_param_dict['date'])
626
627                            # ******* start retrievement
628                            self._start_retrievement(request, retr_param_dict)
629
630        if request == 0 or request == 2:
631            print('MARS retrieve done ... ')
632        elif request == 1:
633            print('MARS request printed ...')
634
635        return
636
637
638    def write_namelist(self, c):
639        '''Creates a namelist file in the temporary directory and writes
640        the following values to it: maxl, maxb, mlevel,
641        mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,
642        momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta
643
644        Parameters
645        ----------
646        c : :obj:`ControlFile`
647            Contains all the parameters of CONTROL file and
648            command line.
649
650        filename : :obj:`string`
651                Name of the namelist file.
652
653        Return
654        ------
655
656        '''
657
658        from genshi.template.text import NewTextTemplate
659        from genshi.template import  TemplateLoader
660
661        loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
662        namelist_template = loader.load(_config.TEMPFILE_NAMELIST,
663                                       cls=NewTextTemplate)
664
665        self.inputdir = c.inputdir
666        area = np.asarray(self.area.split('/')).astype(float)
667        grid = np.asarray(self.grid.split('/')).astype(float)
668
669        if area[1] > area[3]:
670            area[1] -= 360
671        maxl = int((area[3] - area[1]) / grid[1]) + 1
672        maxb = int((area[0] - area[2]) / grid[0]) + 1
673
674        stream = namelist_template.generate(
675            maxl = str(maxl),
676            maxb = str(maxb),
677            mlevel = str(self.level),
678            mlevelist = str(self.levelist),
679            mnauf = str(self.resol),
680            metapar = '77',
681            rlo0 = str(area[1]),
682            rlo1 = str(area[3]),
683            rla0 = str(area[2]),
684            rla1 = str(area[0]),
685            momega = str(c.omega),
686            momegadiff = str(c.omegadiff),
687            mgauss = str(c.gauss),
688            msmooth = str(c.smooth),
689            meta = str(c.eta),
690            metadiff = str(c.etadiff),
691            mdpdeta = str(c.dpdeta)
692        )
693
694        namelistfile = os.path.join(self.inputdir, _config.FILE_NAMELIST)
695
696        with open(namelistfile, 'w') as f:
697            f.write(stream.render('text'))
698
699        return
700
701
702    def deacc_fluxes(self, inputfiles, c):
703        '''Goes through all flux fields in ordered time and de-accumulate
704        the fields. Afterwards the fields are disaggregated in time.
705        Different versions of disaggregation is provided for rainfall
706        data (darain, modified linear) and the surface fluxes and
707        stress data (dapoly, cubic polynomial).
708
709        Parameters
710        ----------
711        inputfiles : :obj:`UioFiles`
712            Contains a list of files.
713
714        c : :obj:`ControlFile`
715            Contains all the parameters of CONTROL file and
716            command line.
717
718        Return
719        ------
720
721        '''
722
723        table128 = init128(_config.PATH_GRIBTABLE)
724        pars = to_param_id(self.params['OG_acc_SL'][0], table128)
725
726        iid = None
727        index_vals = None
728
729        # get the values of the keys which are used for distinct access
730        # of grib messages via product
731        index_keys = ["date", "time", "step"]
732        iid, index_vals = self._mk_index_values(c.inputdir,
733                                                inputfiles,
734                                                index_keys)
735        # index_vals looks like e.g.:
736        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
737        # index_vals[1]: ('0', '1200', '1800', '600') ; time
738        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
739
740        # initialize dictionaries
741        valsdict = {}
742        svalsdict = {}
743        for p in pars:
744            valsdict[str(p)] = []
745            svalsdict[str(p)] = []
746
747        # "product" genereates each possible combination between the
748        # values of the index keys
749        for prod in product(*index_vals):
750            # e.g. prod = ('20170505', '0', '12')
751            #             (  date    ,time, step)
752
753            print('CURRENT PRODUCT: ', prod)
754
755            for i in range(len(index_keys)):
756                codes_index_select(iid, index_keys[i], prod[i])
757
758            # get first id from current product
759            gid = codes_new_from_index(iid)
760
761            # if there is no data for this specific time combination / product
762            # skip the rest of the for loop and start with next timestep/product
763            if not gid:
764                continue
765
766            # create correct timestamp from the three time informations
767            cdate = str(codes_get(gid, 'date'))
768            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
769            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
770            t_date = datetime.strptime(cdate + ctime, '%Y%m%d%H')
771            t_dt = t_date + timedelta(hours=int(cstep))
772            t_m1dt = t_date + timedelta(hours=int(cstep)-int(c.dtime))
773            t_m2dt = t_date + timedelta(hours=int(cstep)-2*int(c.dtime))
774            t_enddate = None
775
776            if c.maxstep > 12:
777                fnout = os.path.join(c.inputdir, 'flux' +
778                                     t_date.strftime('%Y%m%d.%H') +
779                                     '.{:0>3}'.format(step-2*int(c.dtime)))
780                gnout = os.path.join(c.inputdir, 'flux' +
781                                     t_date.strftime('%Y%m%d.%H') +
782                                     '.{:0>3}'.format(step-int(c.dtime)))
783                hnout = os.path.join(c.inputdir, 'flux' +
784                                     t_date.strftime('%Y%m%d.%H') +
785                                     '.{:0>3}'.format(step))
786            else:
787                fnout = os.path.join(c.inputdir, 'flux' +
788                                     t_m2dt.strftime('%Y%m%d%H'))
789                gnout = os.path.join(c.inputdir, 'flux' +
790                                     t_m1dt.strftime('%Y%m%d%H'))
791                hnout = os.path.join(c.inputdir, 'flux' +
792                                     t_dt.strftime('%Y%m%d%H'))
793
794            print("outputfile = " + fnout)
795            f_handle = open(fnout, 'w')
796            h_handle = open(hnout, 'w')
797            g_handle = open(gnout, 'w')
798
799            # read message for message and store relevant data fields
800            # data keywords are stored in pars
801            while 1:
802                if not gid:
803                    break
804                cparamId = str(codes_get(gid, 'paramId'))
805                step = codes_get(gid, 'step')
806                time = codes_get(gid, 'time')
807                ni = codes_get(gid, 'Ni')
808                nj = codes_get(gid, 'Nj')
809                if cparamId in valsdict.keys():
810                    values = codes_get_values(gid)
811                    vdp = valsdict[cparamId]
812                    svdp = svalsdict[cparamId]
813
814                    if cparamId == '142' or cparamId == '143':
815                        fak = 1. / 1000.
816                    else:
817                        fak = 3600.
818
819                    values = (np.reshape(values, (nj, ni))).flatten() / fak
820                    vdp.append(values[:])  # save the accumulated values
821                    if c.marsclass.upper() == 'EA' or \
822                       step <= int(c.dtime):
823                        svdp.append(values[:] / int(c.dtime))
824                    else:  # deaccumulate values
825                        svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime))
826
827                    print(cparamId, time, step, len(values),
828                          values[0], np.std(values))
829
830                    # len(svdp) corresponds to the time
831                    if len(svdp) >= 3:
832                        if len(svdp) > 3:
833                            if cparamId == '142' or cparamId == '143':
834                                values = disaggregation.darain(svdp)
835                            else:
836                                values = disaggregation.dapoly(svdp)
837
838                            if not (step == c.maxstep and c.maxstep > 12 \
839                                    or t_dt == t_enddate):
840                                vdp.pop(0)
841                                svdp.pop(0)
842                        else:
843                            if c.maxstep > 12:
844                                values = svdp[1]
845                            else:
846                                values = svdp[0]
847
848                        codes_set_values(gid, values)
849
850                        if c.maxstep > 12:
851                            codes_set(gid, 'stepRange', max(0, step-2*int(c.dtime)))
852                        else:
853                            codes_set(gid, 'stepRange', 0)
854                            codes_set(gid, 'time', t_m2dt.hour*100)
855                            codes_set(gid, 'date', int(t_m2dt.strftime('%Y%m%d')))
856
857                        codes_write(gid, f_handle)
858
859                        if c.basetime:
860                            t_enddate = datetime.strptime(c.end_date +
861                                                          c.basetime,
862                                                          '%Y%m%d%H')
863                        else:
864                            t_enddate = t_date + timedelta(2*int(c.dtime))
865
866                        # squeeze out information of last two steps contained
867                        # in svdp
868                        # if step+int(c.dtime) == c.maxstep and c.maxstep>12
869                        # or t_dt+timedelta(hours = int(c.dtime))
870                        # >= t_enddate:
871                        # Note that svdp[0] has not been popped in this case
872
873                        if step == c.maxstep and c.maxstep > 12 or \
874                           t_dt == t_enddate:
875
876                            values = svdp[3]
877                            codes_set_values(gid, values)
878                            codes_set(gid, 'stepRange', 0)
879                            truedatetime = t_m2dt + timedelta(hours=
880                                                              2*int(c.dtime))
881                            codes_set(gid, 'time', truedatetime.hour * 100)
882                            codes_set(gid, 'date', truedatetime.year * 10000 +
883                                      truedatetime.month * 100 +
884                                      truedatetime.day)
885                            codes_write(gid, h_handle)
886
887                            #values = (svdp[1]+svdp[2])/2.
888                            if cparamId == '142' or cparamId == '143':
889                                values = disaggregation.darain(list(reversed(svdp)))
890                            else:
891                                values = disaggregation.dapoly(list(reversed(svdp)))
892
893                            codes_set(gid, 'stepRange', 0)
894                            truedatetime = t_m2dt + timedelta(hours=int(c.dtime))
895                            codes_set(gid, 'time', truedatetime.hour * 100)
896                            codes_set(gid, 'date', truedatetime.year * 10000 +
897                                      truedatetime.month * 100 +
898                                      truedatetime.day)
899                            codes_set_values(gid, values)
900                            codes_write(gid, g_handle)
901
902                codes_release(gid)
903
904                gid = codes_new_from_index(iid)
905
906            f_handle.close()
907            g_handle.close()
908            h_handle.close()
909
910        codes_index_release(iid)
911
912        return
913
914
915    def create(self, inputfiles, c):
916        '''An index file will be created which depends on the combination
917        of "date", "time" and "stepRange" values. This is used to iterate
918        over all messages in each grib file which were passed through the
919        parameter "inputfiles" to seperate specific parameters into fort.*
920        files. Afterwards the FORTRAN program is called to convert
921        the data fields all to the same grid and put them in one file
922        per unique time step (combination of "date", "time" and
923        "stepRange").
924
925        Note
926        ----
927        This method is based on the ECMWF example index.py
928        https://software.ecmwf.int/wiki/display/GRIB/index.py
929
930        Parameters
931        ----------
932        inputfiles : :obj:`UioFiles`
933            Contains a list of files.
934
935        c : :obj:`ControlFile`
936            Contains all the parameters of CONTROL file and
937            command line.
938
939        Return
940        ------
941
942        '''
943
944        if c.wrf:
945            table128 = init128(_config.PATH_GRIBTABLE)
946            wrfpars = to_param_id('sp/mslp/skt/2t/10u/10v/2d/z/lsm/sst/ci/sd/\
947                                   stl1/stl2/stl3/stl4/swvl1/swvl2/swvl3/swvl4',
948                                  table128)
949
950        # these numbers are indices for the temporary files "fort.xx"
951        # which are used to seperate the grib fields to,
952        # for the Fortran program input
953        # 10: U,V | 11: T | 12: lnsp | 13: D | 16: sfc fields
954        # 17: Q | 18: Q , gaussian| 19: w | 21: etadot | 22: clwc+ciwc
955        fdict = {'10':None, '11':None, '12':None, '13':None, '16':None,
956                 '17':None, '18':None, '19':None, '21':None, '22':None}
957
958        iid = None
959        index_vals = None
960
961        # get the values of the keys which are used for distinct access
962        # of grib messages via product
963        index_keys = ["date", "time", "step"]
964        iid, index_vals = self._mk_index_values(c.inputdir,
965                                                inputfiles,
966                                                index_keys)
967        # index_vals looks like e.g.:
968        # index_vals[0]: ('20171106', '20171107', '20171108') ; date
969        # index_vals[1]: ('0', '1200', '1800', '600') ; time
970        # index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
971
972        # "product" genereates each possible combination between the
973        # values of the index keys
974        for prod in product(*index_vals):
975            # e.g. prod = ('20170505', '0', '12')
976            #             (  date    ,time, step)
977
978            print('current product: ', prod)
979
980            for i in range(len(index_keys)):
981                codes_index_select(iid, index_keys[i], prod[i])
982
983            # get first id from current product
984            gid = codes_new_from_index(iid)
985
986            # if there is no data for this specific time combination / product
987            # skip the rest of the for loop and start with next timestep/product
988            if not gid:
989                continue
990
991            # remove old fort.* files and open new ones
992            # they are just valid for a single product
993            for k, f in fdict.iteritems():
994                fortfile = os.path.join(c.inputdir, 'fort.' + k)
995                silent_remove(fortfile)
996                fdict[k] = open(fortfile, 'w')
997
998            # create correct timestamp from the three time informations
999            cdate = str(codes_get(gid, 'date'))
1000            ctime = '{:0>2}'.format(codes_get(gid, 'time')/100)
1001            cstep = '{:0>3}'.format(codes_get(gid, 'step'))
1002            timestamp = datetime.strptime(cdate + ctime, '%Y%m%d%H')
1003            timestamp += timedelta(hours=int(cstep))
1004            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
1005
1006            # if the timestamp is out of basetime start/end date period,
1007            # skip this specific product
1008            if c.basetime:
1009                start_time = datetime.strptime(c.end_date + c.basetime,
1010                                                '%Y%m%d%H') - time_delta
1011                end_time = datetime.strptime(c.end_date + c.basetime,
1012                                             '%Y%m%d%H')
1013                if timestamp < start_time or timestamp > end_time:
1014                    continue
1015
1016            if c.wrf:
1017                if 'olddate' not in locals() or cdate != olddate:
1018                    fwrf = open(os.path.join(c.outputdir,
1019                                'WRF' + cdate + '.' + ctime + '.000.grb2'), 'w')
1020                    olddate = cdate[:]
1021
1022            # savedfields remembers which fields were already used.
1023            savedfields = []
1024            # sum of cloud liquid and ice water content
1025            scwc = None
1026            while 1:
1027                if not gid:
1028                    break
1029                paramId = codes_get(gid, 'paramId')
1030                gridtype = codes_get(gid, 'gridType')
1031                levtype = codes_get(gid, 'typeOfLevel')
1032                if paramId == 77: # ETADOT
1033                    codes_write(gid, fdict['21'])
1034                elif paramId == 130: # T
1035                    codes_write(gid, fdict['11'])
1036                elif paramId == 131 or paramId == 132: # U, V wind component
1037                    codes_write(gid, fdict['10'])
1038                elif paramId == 133 and gridtype != 'reduced_gg': # Q
1039                    codes_write(gid, fdict['17'])
1040                elif paramId == 133 and gridtype == 'reduced_gg': # Q, gaussian
1041                    codes_write(gid, fdict['18'])
1042                elif paramId == 135: # W
1043                    codes_write(gid, fdict['19'])
1044                elif paramId == 152: # LNSP
1045                    codes_write(gid, fdict['12'])
1046                elif paramId == 155 and gridtype == 'sh': # D
1047                    codes_write(gid, fdict['13'])
1048                elif paramId == 246 or paramId == 247: # CLWC, CIWC
1049                    # sum cloud liquid water and ice
1050                    if scwc is None:
1051                        scwc = codes_get_values(gid)
1052                    else:
1053                        scwc += codes_get_values(gid)
1054                        codes_set_values(gid, scwc)
1055                        codes_set(gid, 'paramId', 201031)
1056                        codes_write(gid, fdict['22'])
1057                elif c.wrf and paramId in [129, 138, 155] and \
1058                      levtype == 'hybrid': # Z, VO, D
1059                    # do not do anything right now
1060                    # these are specific parameter for WRF
1061                    pass
1062                else:
1063                    if paramId not in savedfields:
1064                        # SD/MSL/TCC/10U/10V/2T/2D/Z/LSM/SDOR/CVL/CVH/SR
1065                        # and all ADDPAR parameter
1066                        codes_write(gid, fdict['16'])
1067                        savedfields.append(paramId)
1068                    else:
1069                        print('duplicate ' + str(paramId) + ' not written')
1070
1071                try:
1072                    if c.wrf:
1073                        # model layer
1074                        if levtype == 'hybrid' and \
1075                           paramId in [129, 130, 131, 132, 133, 138, 155]:
1076                            codes_write(gid, fwrf)
1077                        # sfc layer
1078                        elif paramId in wrfpars:
1079                            codes_write(gid, fwrf)
1080                except AttributeError:
1081                    pass
1082
1083                codes_release(gid)
1084                gid = codes_new_from_index(iid)
1085
1086            for f in fdict.values():
1087                f.close()
1088
1089            # call for Fortran program to convert e.g. reduced_gg grids to
1090            # regular_ll and calculate detadot/dp
1091            pwd = os.getcwd()
1092            os.chdir(c.inputdir)
1093            if os.stat('fort.21').st_size == 0 and c.eta:
1094                print('Parameter 77 (etadot) is missing, most likely it is \
1095                       not available for this type or date/time\n')
1096                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
1097                my_error(c.mailfail, 'fort.21 is empty while parameter eta \
1098                         is set to 1 in CONTROL file')
1099
1100            # write out all output to log file before starting fortran programm
1101            sys.stdout.flush()
1102
1103            # Fortran program creates file fort.15 (with u,v,etadot,t,sp,q)
1104            p = subprocess.check_call([os.path.join(
1105                c.exedir, _config.FORTRAN_EXECUTABLE)], shell=True)
1106            os.chdir(pwd)
1107
1108            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
1109            if c.maxstep > 12:
1110                suffix = cdate[2:8] + '.' + ctime + '.' + cstep
1111            else:
1112                suffix = cdate_hour[2:10]
1113            fnout = os.path.join(c.inputdir, c.prefix + suffix)
1114            print("outputfile = " + fnout)
1115            # collect for final processing
1116            self.outputfilelist.append(os.path.basename(fnout))
1117
1118            # create outputfile and copy all data from intermediate files
1119            # to the outputfile (final GRIB input files for FLEXPART)
1120            orolsm = os.path.basename(glob.glob(c.inputdir +
1121                                        '/OG_OROLSM__SL.*.' + c.ppid + '*')[0])
1122            fluxfile = 'flux' + cdate[0:2] + suffix
1123            if not c.cwc:
1124                flist = ['fort.15', fluxfile, 'fort.16', orolsm]
1125            else:
1126                flist = ['fort.15', 'fort.22', fluxfile, 'fort.16', orolsm]
1127
1128            with open(fnout, 'wb') as fout:
1129                for f in flist:
1130                    shutil.copyfileobj(open(os.path.join(c.inputdir, f), 'rb'),
1131                                       fout)
1132
1133            if c.omega:
1134                with open(os.path.join(c.outputdir, 'OMEGA'), 'wb') as fout:
1135                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
1136                                            'rb'), fout)
1137
1138        if c.wrf:
1139            fwrf.close()
1140
1141        codes_index_release(iid)
1142
1143        return
1144
1145
1146    def process_output(self, c):
1147        '''The grib files are postprocessed depending on the selection in
1148        CONTROL file. The resulting files are moved to the output
1149        directory if its not equal to the input directory.
1150        The following modifications might be done if
1151        properly switched in CONTROL file:
1152        GRIB2 - Conversion to GRIB2
1153        ECTRANS - Transfer of files to gateway server
1154        ECSTORAGE - Storage at ECMWF server
1155
1156        Parameters
1157        ----------
1158        c : :obj:`ControlFile`
1159            Contains all the parameters of CONTROL file and
1160            command line.
1161
1162        Return
1163        ------
1164
1165        '''
1166
1167        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
1168
1169        if not c.ecapi:
1170            print('ecstorage: {}\n ecfsdir: {}\n'.
1171                  format(c.ecstorage, c.ecfsdir))
1172            print('ectrans: {}\n gateway: {}\n destination: {}\n '
1173                  .format(c.ectrans, c.gateway, c.destination))
1174
1175        print('Output filelist: ')
1176        print(self.outputfilelist)
1177
1178        for ofile in self.outputfilelist:
1179            ofile = os.path.join(self.inputdir, ofile)
1180
1181            if c.format.lower() == 'grib2':
1182                p = subprocess.check_call(['codes_set', '-s', 'edition=2,',
1183                                           'productDefinitionTemplateNumber=8',
1184                                           ofile, ofile + '_2'])
1185                p = subprocess.check_call(['mv', ofile + '_2', ofile])
1186
1187            if c.ectrans and not c.ecapi:
1188                p = subprocess.check_call(['ectrans', '-overwrite', '-gateway',
1189                                           c.gateway, '-remote', c.destination,
1190                                           '-source', ofile])
1191
1192            if c.ecstorage and not c.ecapi:
1193                p = subprocess.check_call(['ecp', '-o', ofile,
1194                                           os.path.expandvars(c.ecfsdir)])
1195
1196            if c.outputdir != c.inputdir:
1197                p = subprocess.check_call(['mv',
1198                                           os.path.join(c.inputdir, ofile),
1199                                           c.outputdir])
1200
1201        return
1202
1203
1204    def prepare_fp_files(self, c):
1205        '''Conversion of GRIB files to FLEXPART binary format.
1206
1207        Parameters
1208        ----------
1209        c : :obj:`ControlFile`
1210            Contains all the parameters of CONTROL file and
1211            command line.
1212
1213        Return
1214        ------
1215
1216        '''
1217        # generate AVAILABLE file
1218        # Example of AVAILABLE file data:
1219        # 20131107 000000      EN13110700              ON DISC
1220        clist = []
1221        for ofile in self.outputfilelist:
1222            fname = ofile.split('/')
1223            if '.' in fname[-1]:
1224                l = fname[-1].split('.')
1225                timestamp = datetime.strptime(l[0][-6:] + l[1],
1226                                              '%y%m%d%H')
1227                timestamp += timedelta(hours=int(l[2]))
1228                cdate = datetime.strftime(timestamp, '%Y%m%d')
1229                chms = datetime.strftime(timestamp, '%H%M%S')
1230            else:
1231                cdate = '20' + fname[-1][-8:-2]
1232                chms = fname[-1][-2:] + '0000'
1233            clist.append(cdate + ' ' + chms + ' '*6 +
1234                         fname[-1] + ' '*14 + 'ON DISC')
1235        clist.sort()
1236        with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f:
1237            f.write('\n'.join(clist) + '\n')
1238
1239        # generate pathnames file
1240        pwd = os.path.abspath(c.outputdir)
1241        with open(pwd + '/pathnames', 'w') as f:
1242            f.write(pwd + '/Options/\n')
1243            f.write(pwd + '/\n')
1244            f.write(pwd + '/\n')
1245            f.write(pwd + '/AVAILABLE\n')
1246            f.write(' = == = == = == = == = == ==  = \n')
1247
1248        # create Options dir if necessary
1249        if not os.path.exists(pwd + '/Options'):
1250            make_dir(pwd+'/Options')
1251
1252        # read template COMMAND file
1253        with open(os.path.expandvars(os.path.expanduser(
1254            c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f:
1255            lflist = f.read().split('\n')
1256
1257        # find index of list where to put in the
1258        # date and time information
1259        # usually after the LDIRECT parameter
1260        i = 0
1261        for l in lflist:
1262            if 'LDIRECT' in l.upper():
1263                break
1264            i += 1
1265
1266        # insert the date and time information of run start and end
1267        # into the list of lines of COMMAND file
1268        lflist = lflist[:i+1] + \
1269                 [clist[0][:16], clist[-1][:16]] + \
1270                 lflist[i+3:]
1271
1272        # write the new COMMAND file
1273        with open(pwd + '/Options/COMMAND', 'w') as g:
1274            g.write('\n'.join(lflist) + '\n')
1275
1276        # change to outputdir and start the grib2flexpart run
1277        # afterwards switch back to the working dir
1278        os.chdir(c.outputdir)
1279        p = subprocess.check_call([
1280            os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts))
1281            + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.'])
1282        os.chdir(pwd)
1283
1284        return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG