source: flex_extract.git/source/python/mods/tools.py @ e446e85

ctbtodev
Last change on this file since e446e85 was d4696e0, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

bugfix retrievement with basetime parameter

  • Property mode set to 100644
File size: 24.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Philipp (University of Vienna)
5#
6# @Date: May 2018
7#
8# @Change History:
9#    October 2014 - Anne Fouilloux (University of Oslo)
10#        - created functions silent_remove and product (taken from ECMWF)
11#
12#    November 2015 - Leopold Haimberger (University of Vienna)
13#        - created functions: interpret_args_and_control, clean_up
14#          my_error, normal_exit, init128, to_param_id
15#
16#    April - December 2018 - Anne Philipp (University of Vienna):
17#        - applied PEP8 style guide
18#        - added documentation
19#        - moved all non class methods from former file Flexparttools in here
20#        - separated args and control interpretation
21#        - added functions get_list_as_string, read_ecenv, send_mail, make_dir,
22#          put_file_to_ecserver, submit_job_to_ecserver, get_informations,
23#          get_dimensions, execute_subprocess, none_or_int, none_or_str
24#
25# @License:
26#    (C) Copyright 2014-2019.
27#    Anne Philipp, Leopold Haimberger
28#
29#    This work is licensed under the Creative Commons Attribution 4.0
30#    International License. To view a copy of this license, visit
31#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
32#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
33#
34# @Methods:
35#    none_or_str
36#    none_or_int
37#    get_cmdline_args
38#    read_ecenv
39#    clean_up
40#    my_error
41#    send_mail
42#    normal_exit
43#    product
44#    silent_remove
45#    init128
46#    to_param_id
47#    get_list_as_string
48#    make_dir
49#    put_file_to_ecserver
50#    submit_job_to_ecserver
51#    get_informations
52#    get_dimensions
53#    execute_subprocess
54#*******************************************************************************
55'''This module contains a collection of diverse tasks within flex_extract.
56'''
57
58# ------------------------------------------------------------------------------
59# MODULES
60# ------------------------------------------------------------------------------
61import os
62import errno
63import sys
64import glob
65import subprocess
66import traceback
67import exceptions
68from datetime import datetime
69from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
70
71# ------------------------------------------------------------------------------
72# METHODS
73# ------------------------------------------------------------------------------
74
def none_or_str(value):
    '''Map the literal text "None" onto Python's None object.

    Intended as a ``type`` converter for command line arguments, so that
    a user can explicitly reset a parameter by passing the word "None".

    Parameters
    ----------
    value : str
        Text to inspect.

    Return
    ------
    None or value :
        None if ``value`` equals "None" exactly (case sensitive);
        in every other case the input is handed back unchanged.
    '''
    return None if value == 'None' else value
93
def none_or_int(value):
    '''Map the literal text "None" onto None, anything else onto an int.

    Intended as a ``type`` converter for numeric command line arguments.

    Parameters
    ----------
    value : str
        Text to inspect.

    Return
    ------
    None or int(value) :
        None if ``value`` equals "None" exactly (case sensitive);
        otherwise the result of ``int(value)``. A text that is not a
        valid integer makes ``int`` raise a ValueError.
    '''
    return None if value == 'None' else int(value)
113
def get_cmdline_args(argv=None):
    '''Decomposes the command line arguments and assigns them to variables.
    Apply default values for non mentioned arguments.

    Every option accepts the word "None" as value, which is converted to
    the python None-type by the converters none_or_str / none_or_int.

    Parameters
    ----------
    argv : list of str, optional
        The argument strings to parse. With the default value None
        the arguments of the running program (sys.argv[1:]) are parsed,
        which is the behaviour of a call without any argument.

    Return
    ------
    args : Namespace
        Contains the commandline arguments from script/program call.
    '''

    # NOTE: long texts are built with implicit string concatenation.
    # A backslash continuation inside a string literal would make the
    # indentation of the following source line part of the text.
    parser = ArgumentParser(description='Retrieve FLEXPART input from '
                                        'ECMWF MARS archive',
                            formatter_class=ArgumentDefaultsHelpFormatter)

    # control parameters that override control file values
    parser.add_argument("--start_date", dest="start_date",
                        type=none_or_str, default=None,
                        help="start date YYYYMMDD")
    parser.add_argument("--end_date", dest="end_date",
                        type=none_or_str, default=None,
                        help="end_date YYYYMMDD")
    parser.add_argument("--date_chunk", dest="date_chunk",
                        type=none_or_int, default=None,
                        help="# of days to be retrieved at once")
    parser.add_argument("--job_chunk", dest="job_chunk",
                        type=none_or_int, default=None,
                        help="# of days to be retrieved within a single job")
    parser.add_argument("--controlfile", dest="controlfile",
                        type=none_or_str, default='CONTROL.temp',
                        help="file with CONTROL parameters")
    parser.add_argument("--basetime", dest="basetime",
                        type=none_or_int, default=None,
                        help="base such as 0 or 12 (for half day retrievals)")
    parser.add_argument("--step", dest="step",
                        type=none_or_str, default=None,
                        help="steps such as 00/to/48")
    parser.add_argument("--levelist", dest="levelist",
                        type=none_or_str, default=None,
                        help="Vertical levels to be retrieved, e.g. 30/to/60")
    parser.add_argument("--area", dest="area",
                        type=none_or_str, default=None,
                        help="area defined as north/west/south/east")

    # some switches
    parser.add_argument("--debug", dest="debug",
                        type=none_or_int, default=None,
                        help="debug mode - leave temporary files intact")
    parser.add_argument("--oper", dest="oper",
                        type=none_or_int, default=None,
                        help="operational mode - prepares dates with "
                             "environment variables")
    parser.add_argument("--request", dest="request",
                        type=none_or_int, default=None,
                        help="list all mars requests in file mars_requests.dat")
    parser.add_argument("--public", dest="public",
                        type=none_or_int, default=None,
                        help="public mode - retrieves the public datasets")
    parser.add_argument("--rrint", dest="rrint",
                        type=none_or_int, default=None,
                        help="select old or new precipitation interpolation "
                             "0 - old method "
                             "1 - new method (additional subgrid points)")

    # set directories
    parser.add_argument("--inputdir", dest="inputdir",
                        type=none_or_str, default=None,
                        help="root directory for storing intermediate files")
    parser.add_argument("--outputdir", dest="outputdir",
                        type=none_or_str, default=None,
                        help="root directory for storing output files")
    parser.add_argument("--flexpartdir", dest="flexpartdir",
                        type=none_or_str, default=None,
                        help="FLEXPART root directory (to find grib2flexpart "
                             "and COMMAND file) Normally flex_extract resides "
                             "in the scripts directory of the FLEXPART "
                             "distribution")

    # this is only used by prepare_flexpart.py to rerun a postprocessing step
    parser.add_argument("--ppid", dest="ppid",
                        type=none_or_str, default=None,
                        help="specify parent process id for "
                             "rerun of prepare_flexpart")

    # arguments for job submission to ECMWF, only needed by submit.py
    parser.add_argument("--job_template", dest='job_template',
                        type=none_or_str, default="job.temp",
                        help="job template file for submission to ECMWF")
    parser.add_argument("--queue", dest="queue",
                        type=none_or_str, default=None,
                        help="queue for submission to ECMWF "
                             "(e.g. ecgate or cca )")

    # parse_args(None) falls back to sys.argv[1:]
    args = parser.parse_args(argv)

    return args
211
def read_ecenv(filepath):
    '''Reads the file into a dictionary where the key values are the parameter
    names.

    Each non-empty line is split at white space; the first field is used
    as parameter name and the second field as its value. Further fields
    on a line are ignored and blank lines are skipped.

    Parameters
    ----------
    filepath : str
        Path to file where the ECMWF environment parameters are stored.

    Return
    ------
    envs : dict
        Maps the parameter names found in the file to their values
        (both as strings).

    Note
    ----
    The program is terminated via sys.exit if the file cannot be opened
    or read. A non-empty line with only a single field raises an
    IndexError.
    '''
    envs = {}
    try:
        with open(filepath, 'r') as f:
            for line in f:
                data = line.split()
                if not data:
                    # blank line (e.g. at the end of the file): nothing
                    # to store, and data[0] would raise an IndexError
                    continue
                envs[str(data[0])] = str(data[1])
    # Python 2 raises IOError for a failing open(), which is not a
    # subclass of OSError there; in Python 3 both names are the same class
    except (IOError, OSError) as e:
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read ECMWF_ENV '
                 'file: ' + str(filepath))

    return envs
241
def clean_up(c):
    '''Remove files from the intermediate directory (inputdir).

    Every entry of the input directory whose file name does not contain
    the prefix (c.prefix) is removed. Entries whose file name contains
    the prefix are removed as well if the program runs without
    ECMWF Api (c.ecapi is False) and one of the keywords "ectrans" or
    "ecstorage" is set to 1; otherwise they are kept.

    NOTE(review): an earlier description of this function stated that
    the prefixed files are *kept* in the ectrans/ecstorage case, while
    the code always removed them - confirm the intended behaviour.

    Parameters
    ----------
    c : ControlFile
        Contains all the parameters of CONTROL file and
        command line. The attributes inputdir, prefix, ecapi, ectrans
        and ecstorage are used.

    Return
    ------

    '''

    print("... clean inputdir!")

    cleanlist = glob.glob(os.path.join(c.inputdir, "*"))

    if cleanlist:
        # in this mode also the prefixed files are removed
        remove_prefixed = (c.ecapi is False and
                           (c.ectrans == 1 or c.ecstorage == 1))
        for element in cleanlist:
            # test the file name only: a directory name that contains
            # the prefix must not protect all files from being removed
            has_prefix = c.prefix in os.path.basename(element)
            if not has_prefix or remove_prefixed:
                silent_remove(element)
        print("... done!")
    else:
        print("... nothing to clean!")

    return
274
275
def my_error(users, message='ERROR'):
    '''Report an error on screen and by mail, then terminate the program.

    The error message is extended by the current call stack, printed,
    passed to send_mail with the success mode "ERROR" and finally the
    program is stopped with exit code 1.

    Parameters
    ----------
    users : list of str
        Recipients of the notification; handed over to send_mail
        without modification.

    message : str, optional
        Error message. Default value is "ERROR".

    Return
    ------
    This function does not return; it ends in sys.exit(1).
    '''

    stack_lines = traceback.format_stack()
    report = message + '\n\n' + '\n'.join(stack_lines)

    print(report)
    send_mail(users, 'ERROR', report)

    sys.exit(1)
305
306
def send_mail(users, success_mode, message):
    '''Sends the message to each user by piping it into the "mail" command.

    Parameters
    ----------
    users : list of str
        Contains all email addresses which should be notified.
        An entry containing the text "${USER}" is replaced by the
        content of the environment variable USER. Environment variables
        within an entry are expanded.

    success_mode : str
        States the exit mode of the program to put into
        the mail subject line.

    message : str
        The text of the mail.

    Return
    ------

    Note
    ----
    The program is terminated via sys.exit if the mail command
    cannot be started. If the mail command runs but reports a failure,
    the error is printed and the next user is processed.
    '''

    subject = 'flex_extract_v7.1 ' + success_mode

    for user in users:
        if '${USER}' in user:
            user = os.getenv('USER')
            if user is None:
                # expandvars would fail on None
                print('... WARNING: environment variable USER is not set, '
                      'recipient skipped!')
                continue
        address = os.path.expandvars(user)
        try:
            # option and subject are separate arguments, otherwise the
            # subject starts with a blank; universal_newlines switches
            # the pipes to text mode so that a str can be sent with
            # Python 2 as well as with Python 3
            p = subprocess.Popen(['mail', '-s', subject, address],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=True)
            perr = p.communicate(input=message + '\n\n')[1]
        except ValueError as e:
            print('... ERROR: ' + str(e))
            sys.exit('... Email could not be sent!')
        except OSError as e:
            print('... ERROR CODE: ' + str(e.errno))
            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
            sys.exit('... Email could not be sent!')
        else:
            if p.returncode != 0:
                # do not claim success if the mail command failed
                print('... ERROR CODE: ' + str(p.returncode))
                print('... ERROR MESSAGE:\n \t ' + str(perr))
                print('... Email could not be sent to ' + address)
            else:
                print('Email sent to ' + address)

    return
351
352
def normal_exit(message='Done!'):
    '''Write a closing message to standard output.

    Despite its name the function only prints; it does not terminate
    the program.

    Parameters
    ----------
    message : str, optional
        Text to print; it is converted with str() first.
        Default value is "Done!".

    Return
    ------

    '''
    text = str(message)
    print(text)
369
370
def product(*args, **kwds):
    r'''Creates combinations of all passed arguments.

    This method combines the single elements of the passed arguments
    with each other. So that each element of each argument value
    will be combined with each element of the other arguments as a tuple.

    Note
    ----
    This method is taken from an example at the ECMWF wiki website.
    https://software.ecmwf.int/wiki/display/GRIB/index.py; 2018-03-16

    Example
    -------
    product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy

    product(range(2), repeat = 3) --> 000 001 010 011 100 101 110 111

    Parameters
    ----------
    \*args : list or str
        Positional arguments (arbitrary number). Each one has to be
        iterable.

    \*\*kwds : dict
        Keyword arguments. Only "repeat" is evaluated: the number of
        times the sequence of positional arguments is repeated
        (default 1).

    Return
    ------
    prod : :obj:`tuple`
        Return will be done with "yield". A tuple of combined arguments.
        See example in description above.

    Note
    ----
    The program is terminated via sys.exit if the combinations cannot
    be built, e.g. because an argument is not iterable. As this is a
    generator, this happens when the first element is requested.
    '''
    try:
        # a real list is required for the repetition: in Python 3
        # map() returns an iterator, which cannot be multiplied
        pools = [tuple(pool) for pool in args] * kwds.get('repeat', 1)
        result = [[]]
        for pool in pools:
            result = [x + [y] for x in result for y in pool]
    except TypeError:
        sys.exit('... PRODUCT GENERATION FAILED!')

    for prod in result:
        yield tuple(prod)

    return
414
415
def silent_remove(filename):
    '''Delete a file and tolerate that it is already gone.

    Parameters
    ----------
    filename : str
        Path of the file to delete.

    Return
    ------

    Note
    ----
    Only the "no such file or directory" error is suppressed. Every
    other OSError raised by os.remove is passed on to the caller.
    '''
    try:
        os.remove(filename)
    except OSError as err:
        # anything but a missing file is a real problem
        if err.errno != errno.ENOENT:
            raise
439
440
def init128(filepath):
    '''Opens and reads the grib file with table 128 information.

    Lines starting with "!" are treated as comments and blank lines
    are skipped. From every other line the first three characters are
    used as key and the characters 59 to 63 (zero based, stripped from
    white space) as value.

    Parameters
    ----------
    filepath : str
        Path to file of ECMWF grib table number 128.

    Return
    ------
    table128 : dict
        Contains the ECMWF grib table 128 information.
        The key is the parameter number and the value is the
        short name of the parameter.

    Note
    ----
    The program is terminated via sys.exit if the file cannot be opened
    or read.
    '''
    table128 = dict()
    try:
        with open(filepath) as f:
            fdata = f.read().splitlines()
    # Python 2 raises IOError for a failing open(), which is not a
    # subclass of OSError there; in Python 3 both names are the same class
    except (IOError, OSError) as e:
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read parameter '
                 'table file: ' + str(filepath))
    else:
        for data in fdata:
            # a blank line carries no table entry; an empty one would
            # even raise an IndexError in the comment test below
            if not data.strip():
                continue
            if data[0] != '!':
                table128[data[0:3]] = data[59:64].strip()

    return table128
472
473
def to_param_id(pars, table):
    '''Transform parameter names to parameter ids with ECMWF grib table 128.

    Parameters
    ----------
    pars : str
        Addpar argument from CONTROL file in the format of
        parameter names instead of ids. The parameter short
        names are separated with "/" and they are passed as
        one single string. The names are converted to upper case
        before the lookup. A value of another type is converted with
        str() first; an empty value results in an empty list.

    table : dict
        Contains the ECMWF grib table 128 information.
        The key is the parameter number and the value is the
        short name of the parameter.

    Return
    ------
    ipar : list of int
        List of addpar parameters from CONTROL file transformed to
        parameter ids in the format of integer. Parameters which are
        neither a key nor a value of the table are left out and a
        warning is printed for each of them.
    '''
    if not pars:
        return []
    if not isinstance(pars, str):
        pars = str(pars)

    cpar = pars.upper().split('/')
    ipar = []
    for par in cpar:
        # items() exists in Python 2 and 3 (iteritems() only in Python 2)
        for k, v in table.items():
            if par == k or par == v:
                ipar.append(int(k))
                break
        else:
            # inner loop finished without a match
            print('Warning: par ' + par + ' not found in table 128')

    return ipar
512
def get_list_as_string(list_obj, concatenate_sign=', '):
    '''Join the elements of a sequence into one string.

    Parameters
    ----------
    list_obj : list of *
        The elements to join. An object which is not a list is
        converted with list() first; every element is converted
        with str().

    concatenate_sign : str, optional
        The separator placed between two elements.
        Default value is ", ".

    Return
    ------
    str_of_list : str
        All elements as text, separated by ``concatenate_sign``.
    '''
    if isinstance(list_obj, list):
        elements = list_obj
    else:
        elements = list(list_obj)

    return concatenate_sign.join(map(str, elements))
536
def make_dir(directory):
    '''Create a directory including missing parent directories.

    If the path exists already, only a warning is printed and the
    program continues.

    Parameters
    ----------
    directory : str
        Path of the directory to create.

    Return
    ------

    Note
    ----
    Every OSError other than "file exists" raised by os.makedirs
    is passed on to the caller.
    '''
    try:
        os.makedirs(directory)
    except OSError as err:
        # only an already existing path is tolerated
        if err.errno != errno.EEXIST:
            raise
        print('WARNING: Directory {0} already exists!'.format(directory))
562
def put_file_to_ecserver(ecd, filename, target, ecuid, ecgid):
    '''Send a file to the ECMWF servers with the ecaccess-file-put command.

    The file <ecd>/<filename> is transferred to
    <target>:/home/ms/<ecgid>/<ecuid>/<filename>.

    Note
    ----
    The whole error handling is done in here: if the command cannot
    be started or returns a non-zero exit code, the error is printed
    and the program is terminated via sys.exit.

    Parameters
    ----------
    ecd : str
        The path of the directory where the file is stored.

    filename : str
        The name of the file to send to the ECMWF server.

    target : str
        The first part of the remote address, placed in front of
        ":/home/ms/".

    ecuid : str
        The user id on ECMWF server.

    ecgid : str
        The group id on ECMWF server.

    Return
    ------

    '''

    local_path = ecd + '/' + filename
    remote_path = (target + ':/home/ms/' + ecgid + '/' + ecuid +
                   '/' + filename)
    command = ['ecaccess-file-put', local_path, remote_path]

    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        # the command ran, but reported a failure
        code = err.returncode
        text = str(err)
        hint = '\n... Do you have a valid ecaccess certification key?'
    except OSError as err:
        # the command could not be started at all
        code = err.errno
        text = str(err.strerror)
        hint = '\n... Most likely the ECACCESS library is not available!'
    else:
        return

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + text)

    print(hint)
    sys.exit('... ECACCESS-FILE-PUT FAILED!')
615
def submit_job_to_ecserver(target, jobname):
    '''Submit a job to the ECMWF server with the ecaccess-job-submit command.

    Note
    ----
    The whole error handling is done in here: if the command cannot
    be started or returns a non-zero exit code, the error is printed
    and the program is terminated via sys.exit.

    Parameters
    ----------
    target : str
        The value passed to the "-queueName" option of the command.

    jobname : str
        The name of the jobfile to be submitted to the ECMWF server.

    Return
    ------
    job_id :
        The unmodified output of the command as delivered by
        subprocess.check_output. It is presumably the id number of
        the job at the ecmwf server - TODO confirm. It is not converted
        to an integer in here.
    '''

    command = ['ecaccess-job-submit', '-queueName', target, jobname]

    try:
        return subprocess.check_output(command)
    except subprocess.CalledProcessError as err:
        # the command ran, but reported a failure
        code = err.returncode
        text = str(err)
        hint = '\n... Do you have a valid ecaccess certification key?'
    except OSError as err:
        # the command could not be started at all
        code = err.errno
        text = str(err.strerror)
        hint = '\n... Most likely the ECACCESS library is not available!'

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + text)

    print(hint)
    sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
657
658
def get_informations(filename):
    '''Gets basic information from an example grib file.

    These information are important for later use and the
    initialization of numpy arrays for data storing. Only the first
    message of the file is evaluated.

    Parameters
    ----------
    filename : str
            Name of the file which will be opened to extract basic information.

    Return
    ------
    data : dict
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'

    Note
    ----
    The program is terminated via sys.exit if the file does not contain
    any grib message.
    '''
    from eccodes import codes_grib_new_from_file, codes_get, codes_release

    # information needed from grib message
    keys = [
            'Ni',
            'Nj',
            'latitudeOfFirstGridPointInDegrees',
            'longitudeOfFirstGridPointInDegrees',
            'latitudeOfLastGridPointInDegrees',
            'longitudeOfLastGridPointInDegrees',
            'jDirectionIncrementInDegrees',
            'iDirectionIncrementInDegrees',
            'missingValue',
           ]

    data = {}

    # --- open file ---
    print("Opening file for getting information data --- %s" % filename)
    # grib is a binary format: text mode would try to decode the content
    # in Python 3
    with open(filename, 'rb') as f:
        # load first message from file
        gid = codes_grib_new_from_file(f)

        if gid is None:
            # no message could be loaded, there is nothing to query
            sys.exit('... ERROR: no grib message found in file: ' +
                     str(filename))

        try:
            print('\nInformations are: ')
            for key in keys:
                # Get the value of the key in a grib message.
                data[key] = codes_get(gid, key)
                print("%s = %s" % (key, data[key]))
        finally:
            # Free the memory for the message referred as gribid,
            # also if reading one of the keys failed.
            codes_release(gid)

    return data
712
713
def get_dimensions(info, purefc, dtime, index_vals, start_date, end_date):
    '''This function specifies the correct dimensions for x, y and t.

    Parameters
    ----------
    info : dict
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'.
        Only 'Ni' and 'Nj' are used in here.

    purefc : int
        Switch for definition of pure forecast mode or not.

    dtime : str
        Time step in hours. Only used if purefc is not set.

    index_vals : list of list of str
        Contains the values from the keys used for a distinct selection
        of grib messages in processing  the grib files.
        Content looks like e.g.:
        index_vals[0]: ('20171106', '20171107', '20171108') ; date
        index_vals[1]: ('0', '1200', '1800', '600') ; time
        index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
        Only used if purefc is set.

    start_date : datetime
        The start date of the retrieval job. The difference
        "end_date - start_date" has to provide the attribute "days",
        as e.g. datetime.datetime objects do.
        Only used if purefc is not set.

    end_date : datetime
        The end date of the retrieval job. See start_date.

    Return
    ------
    (ix, jy, it) : tuple of int
        Dimension in x-direction, y-direction and in time.
    '''

    ix = info['Ni']

    jy = info['Nj']

    if not purefc:
        # #no of days * #no of time steps per day
        # floor division keeps the result an integer in Python 3 as well
        # ("/" is a true division there and would deliver a float)
        it = ((end_date - start_date).days + 1) * 24 // int(dtime)
    else:
        # #no of step * #no of times * #no of days
        it = len(index_vals[2]) * len(index_vals[1]) * len(index_vals[0])

    return (ix, jy, it)
763
764
def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'):
    '''Run a command line instruction in a subprocess.

    If the command cannot be started or returns a non-zero exit code,
    the error is printed and the program is terminated via sys.exit.

    Parameters
    ----------
    cmd_list : list of str
        The components of the command line, one list entry for each
        part which would be separated by a blank on the command line.
        E.g. ['mv', file1, file2]

    error_msg : str, optional
        The text used as exit message (with "... " in front of it)
        if the subprocess failed.
        By default it will just tell "SUBPROCESS FAILED!".

    Return
    ------

    '''

    try:
        subprocess.check_call(cmd_list)
    except subprocess.CalledProcessError as err:
        # the command ran, but reported a failure
        code = err.returncode
        text = str(err)
    except OSError as err:
        # the command could not be started at all
        code = err.errno
        text = str(err.strerror)
    else:
        return

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + text)

    sys.exit('... ' + error_msg)
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG