source: flex_extract.git/source/python/mods/tools.py @ 8778c5a

ctbtodev
Last change on this file since 8778c5a was 8778c5a, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

avoid sending mail if an error occurs because its already in the log file.

  • Property mode set to 100644
File size: 24.4 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Philipp (University of Vienna)
5#
6# @Date: May 2018
7#
8# @Change History:
9#    October 2014 - Anne Fouilloux (University of Oslo)
10#        - created functions silent_remove and product (taken from ECMWF)
11#
12#    November 2015 - Leopold Haimberger (University of Vienna)
13#        - created functions: interpret_args_and_control, clean_up
14#          my_error, normal_exit, init128, to_param_id
15#
16#    April - December 2018 - Anne Philipp (University of Vienna):
17#        - applied PEP8 style guide
18#        - added documentation
19#        - moved all non class methods from former file Flexparttools in here
20#        - seperated args and control interpretation
21#        - added functions get_list_as_string, read_ecenv, send_mail, make_dir,
22#          put_file_to_ecserver, submit_job_to_ecserver, get_informations,
23#          get_dimensions, execute_subprocess, none_or_int, none_or_str
24#
25# @License:
26#    (C) Copyright 2014-2019.
27#    Anne Philipp, Leopold Haimberger
28#
29#    This work is licensed under the Creative Commons Attribution 4.0
30#    International License. To view a copy of this license, visit
31#    http://creativecommons.org/licenses/by/4.0/ or send a letter to
32#    Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
33#
34# @Methods:
35#    none_or_str
36#    none_or_int
37#    get_cmdline_args
38#    read_ecenv
39#    clean_up
40#    my_error
41#    send_mail
42#    normal_exit
43#    product
44#    silent_remove
45#    init128
46#    to_param_id
47#    get_list_as_string
48#    make_dir
49#    put_file_to_ecserver
50#    submit_job_to_ecserver
51#    get_informations
52#    get_dimensions
53#    execute_subprocess
54#*******************************************************************************
55'''This module contains a collection of diverse tasks within flex_extract.
56'''
57
58# ------------------------------------------------------------------------------
59# MODULES
60# ------------------------------------------------------------------------------
61import os
62import errno
63import sys
64import glob
65import subprocess
66import traceback
67import exceptions
68from datetime import datetime
69from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
70
71# ------------------------------------------------------------------------------
72# METHODS
73# ------------------------------------------------------------------------------
74
75def none_or_str(value):
76    '''Converts the input string into pythons None-type if the string
77    contains string "None".
78
79    Parameters
80    ----------
81    value : str
82        String to be checked for the "None" word.
83
84    Return
85    ------
86    None or value:
87        Return depends on the content of the input value. If it was "None",
88        then the python type None is returned. Otherwise the string itself.
89    '''
90    if value == 'None':
91        return None
92    return value
93
94def none_or_int(value):
95    '''Converts the input string into pythons None-type if the string
96    contains string "None". Otherwise it is converted to an integer value.
97
98    Parameters
99    ----------
100    value : str
101        String to be checked for the "None" word.
102
103    Return
104    ------
105    None or int(value):
106        Return depends on the content of the input value. If it was "None",
107        then the python type None is returned. Otherwise the string is
108        converted into an integer value.
109    '''
110    if value == 'None':
111        return None
112    return int(value)
113
114def get_cmdline_args():
115    '''Decomposes the command line arguments and assigns them to variables.
116    Apply default values for non mentioned arguments.
117
118    Parameters
119    ----------
120
121    Return
122    ------
123    args : Namespace
124        Contains the commandline arguments from script/program call.
125    '''
126
127    parser = ArgumentParser(description='Retrieve FLEXPART input from \
128                                ECMWF MARS archive',
129                            formatter_class=ArgumentDefaultsHelpFormatter)
130
131    # control parameters that override control file values
132    parser.add_argument("--start_date", dest="start_date",
133                        type=none_or_str, default=None,
134                        help="start date YYYYMMDD")
135    parser.add_argument("--end_date", dest="end_date",
136                        type=none_or_str, default=None,
137                        help="end_date YYYYMMDD")
138    parser.add_argument("--date_chunk", dest="date_chunk",
139                        type=none_or_int, default=None,
140                        help="# of days to be retrieved at once")
141    parser.add_argument("--job_chunk", dest="job_chunk",
142                        type=none_or_int, default=None,
143                        help="# of days to be retrieved within a single job")
144    parser.add_argument("--controlfile", dest="controlfile",
145                        type=none_or_str, default='CONTROL.temp',
146                        help="file with CONTROL parameters")
147    parser.add_argument("--basetime", dest="basetime",
148                        type=none_or_int, default=None,
149                        help="base such as 0 or 12 (for half day retrievals)")
150    parser.add_argument("--step", dest="step",
151                        type=none_or_str, default=None,
152                        help="steps such as 00/to/48")
153    parser.add_argument("--levelist", dest="levelist",
154                        type=none_or_str, default=None,
155                        help="Vertical levels to be retrieved, e.g. 30/to/60")
156    parser.add_argument("--area", dest="area",
157                        type=none_or_str, default=None,
158                        help="area defined as north/west/south/east")
159
160    # some switches
161    parser.add_argument("--debug", dest="debug",
162                        type=none_or_int, default=None,
163                        help="debug mode - leave temporary files intact")
164    parser.add_argument("--oper", dest="oper",
165                        type=none_or_int, default=None,
166                        help="operational mode - prepares dates with \
167                        environment variables")
168    parser.add_argument("--request", dest="request",
169                        type=none_or_int, default=None,
170                        help="list all mars requests in file mars_requests.dat")
171    parser.add_argument("--public", dest="public",
172                        type=none_or_int, default=None,
173                        help="public mode - retrieves the public datasets")
174    parser.add_argument("--rrint", dest="rrint",
175                        type=none_or_int, default=None,
176                        help="select old or new precipitation interpolation \
177                        0 - old method\
178                        1 - new method (additional subgrid points)")
179
180    # set directories
181    parser.add_argument("--inputdir", dest="inputdir",
182                        type=none_or_str, default=None,
183                        help="root directory for storing intermediate files")
184    parser.add_argument("--outputdir", dest="outputdir",
185                        type=none_or_str, default=None,
186                        help="root directory for storing output files")
187    parser.add_argument("--flexpartdir", dest="flexpartdir",
188                        type=none_or_str, default=None,
189                        help="FLEXPART root directory (to find grib2flexpart \
190                        and COMMAND file)\n Normally flex_extract resides in \
191                        the scripts directory of the FLEXPART distribution")
192
193    # this is only used by prepare_flexpart.py to rerun a postprocessing step
194    parser.add_argument("--ppid", dest="ppid",
195                        type=none_or_str, default=None,
196                        help="specify parent process id for \
197                        rerun of prepare_flexpart")
198
199    # arguments for job submission to ECMWF, only needed by submit.py
200    parser.add_argument("--job_template", dest='job_template',
201                        type=none_or_str, default="job.temp",
202                        help="job template file for submission to ECMWF")
203    parser.add_argument("--queue", dest="queue",
204                        type=none_or_str, default=None,
205                        help="queue for submission to ECMWF \
206                        (e.g. ecgate or cca )")
207
208    args = parser.parse_args()
209
210    return args
211
212def read_ecenv(filepath):
213    '''Reads the file into a dictionary where the key values are the parameter
214    names.
215
216    Parameters
217    ----------
218    filepath : str
219        Path to file where the ECMWF environment parameters are stored.
220
221    Return
222    ------
223    envs : dict
224        Contains the environment parameter ecuid, ecgid, gateway
225        and destination for ECMWF server environments.
226    '''
227    envs= {}
228    try:
229        with open(filepath, 'r') as f:
230            for line in f:
231                data = line.strip().split()
232                envs[str(data[0])] = str(data[1])
233    except OSError as e:
234        print('... ERROR CODE: ' + str(e.errno))
235        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
236
237        sys.exit('\n... Error occured while trying to read ECMWF_ENV '
238                     'file: ' + str(filepath))
239
240    return envs
241
242def clean_up(c):
243    '''Remove files from the intermediate directory (inputdir).
244
245    It keeps the final FLEXPART input files if program runs without
246    ECMWF Api and keywords "ectrans" or "ecstorage" are set to "1".
247
248    Parameters
249    ----------
250    c : ControlFile
251        Contains all the parameters of CONTROL file and
252        command line.
253
254    Return
255    ------
256
257    '''
258
259    print("... clean inputdir!")
260
261    cleanlist = glob.glob(os.path.join(c.inputdir, "*"))
262
263    if cleanlist:
264        for element in cleanlist:
265            if c.prefix not in element:
266                silent_remove(element)
267            if c.ecapi is False and (c.ectrans == 1 or c.ecstorage == 1):
268                silent_remove(element)
269        print("... done!")
270    else:
271        print("... nothing to clean!")
272
273    return
274
275
276def my_error(users, message='ERROR'):
277    '''Prints a specified error message which can be passed to the function
278    before exiting the program.
279
280    Parameters
281    ----------
282    user : list of str
283        Contains all email addresses which should be notified.
284        It might also contain just the ecmwf user name which wil trigger
285        mailing to the associated email address for this user.
286
287    message : str, optional
288        Error message. Default value is "ERROR".
289
290    Return
291    ------
292
293    '''
294
295    trace = '\n'.join(traceback.format_stack())
296    full_message = message + '\n\n' + trace
297
298    print(full_message)
299
300    sys.exit(1)
301
302    return
303
304
305def send_mail(users, success_mode, message):
306    '''Prints a specific exit message which can be passed to the function.
307
308    Parameters
309    ----------
310    users : list of str
311        Contains all email addresses which should be notified.
312        It might also contain just the ecmwf user name which wil trigger
313        mailing to the associated email address for this user.
314
315    success_mode : str
316        States the exit mode of the program to put into
317        the mail subject line.
318
319    message : str, optional
320        Message for exiting program. Default value is "Done!".
321
322    Return
323    ------
324
325    '''
326
327    for user in users:
328        if '${USER}' in user:
329            user = os.getenv('USER')
330        try:
331            p = subprocess.Popen(['mail', '-s flex_extract_v7.1 ' +
332                                  success_mode, os.path.expandvars(user)],
333                                 stdin=subprocess.PIPE,
334                                 stdout=subprocess.PIPE,
335                                 stderr=subprocess.PIPE,
336                                 bufsize=1)
337            pout = p.communicate(input=message + '\n\n')[0]
338        except ValueError as e:
339            print('... ERROR: ' + str(e))
340            sys.exit('... Email could not be sent!')
341        except OSError as e:
342            print('... ERROR CODE: ' + str(e.errno))
343            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
344            sys.exit('... Email could not be sent!')
345        else:
346            print('Email sent to ' + os.path.expandvars(user))
347
348    return
349
350
351def normal_exit(message='Done!'):
352    '''Prints a specific exit message which can be passed to the function.
353
354    Parameters
355    ----------
356    message : str, optional
357        Message for exiting program. Default value is "Done!".
358
359    Return
360    ------
361
362    '''
363
364    print(str(message))
365
366    return
367
368
369def product(*args, **kwds):
370    '''Creates combinations of all passed arguments.
371
372    This method combines the single characters of the passed arguments
373    with each other. So that each character of each argument value
374    will be combined with each character of the other arguments as a tuple.
375
376    Note
377    ----
378    This method is taken from an example at the ECMWF wiki website.
379    https://software.ecmwf.int/wiki/display/GRIB/index.py; 2018-03-16
380
381    Example
382    -------
383    product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
384
385    product(range(2), repeat = 3) --> 000 001 010 011 100 101 110 111
386
387    Parameters
388    ----------
389    \*args : list or str
390        Positional arguments (arbitrary number).
391
392    \*\*kwds : dict
393        Contains all the keyword arguments from \*args.
394
395    Return
396    ------
397    prod : :obj:`tuple`
398        Return will be done with "yield". A tuple of combined arguments.
399        See example in description above.
400    '''
401    try:
402        pools = map(tuple, args) * kwds.get('repeat', 1)
403        result = [[]]
404        for pool in pools:
405            result = [x + [y] for x in result for y in pool]
406        for prod in result:
407            yield tuple(prod)
408    except TypeError as e:
409        sys.exit('... PRODUCT GENERATION FAILED!')
410
411    return
412
413
414def silent_remove(filename):
415    '''Remove file if it exists.
416    The function does not fail if the file does not exist.
417
418    Parameters
419    ----------
420    filename : str
421        The name of the file to be removed without notification.
422
423    Return
424    ------
425
426    '''
427    try:
428        os.remove(filename)
429    except OSError as e:
430        # errno.ENOENT  =  no such file or directory
431        if e.errno == errno.ENOENT:
432            pass
433        else:
434            raise  # re-raise exception if a different error occured
435
436    return
437
438
439def init128(filepath):
440    '''Opens and reads the grib file with table 128 information.
441
442    Parameters
443    ----------
444    filepath : str
445        Path to file of ECMWF grib table number 128.
446
447    Return
448    ------
449    table128 : dict
450        Contains the ECMWF grib table 128 information.
451        The key is the parameter number and the value is the
452        short name of the parameter.
453    '''
454    table128 = dict()
455    try:
456        with open(filepath) as f:
457            fdata = f.read().split('\n')
458    except OSError as e:
459        print('... ERROR CODE: ' + str(e.errno))
460        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
461
462        sys.exit('\n... Error occured while trying to read parameter '
463                 'table file: ' + str(filepath))
464    else:
465        for data in fdata:
466            if data[0] != '!':
467                table128[data[0:3]] = data[59:64].strip()
468
469    return table128
470
471
472def to_param_id(pars, table):
473    '''Transform parameter names to parameter ids with ECMWF grib table 128.
474
475    Parameters
476    ----------
477    pars : str
478        Addpar argument from CONTROL file in the format of
479        parameter names instead of ids. The parameter short
480        names are sepearted with "/" and they are passed as
481        one single string.
482
483    table : dict
484        Contains the ECMWF grib table 128 information.
485        The key is the parameter number and the value is the
486        short name of the parameter.
487
488    Return
489    ------
490    ipar : list of int
491        List of addpar parameters from CONTROL file transformed to
492        parameter ids in the format of integer.
493    '''
494    if not pars:
495        return []
496    if not isinstance(pars, str):
497        pars=str(pars)
498
499    cpar = pars.upper().split('/')
500    ipar = []
501    for par in cpar:
502        for k, v in table.iteritems():
503            if par == k or par == v:
504                ipar.append(int(k))
505                break
506        else:
507            print('Warning: par ' + par + ' not found in table 128')
508
509    return ipar
510
511def get_list_as_string(list_obj, concatenate_sign=', '):
512    '''Converts a list of arbitrary content into a single string.
513
514    Parameters
515    ----------
516    list_obj : list of *
517        A list with arbitrary content.
518
519    concatenate_sign : str, optional
520        A string which is used to concatenate the single
521        list elements. Default value is ", ".
522
523    Return
524    ------
525    str_of_list : str
526        The content of the list as a single string.
527    '''
528
529    if not isinstance(list_obj, list):
530        list_obj = list(list_obj)
531    str_of_list = concatenate_sign.join(str(l) for l in list_obj)
532
533    return str_of_list
534
535def make_dir(directory):
536    '''Creates a directory.
537
538    It gives a warning if the directory already exists and skips process.
539    The program stops only if there is another problem.
540
541    Parameters
542    ----------
543    directory : str
544        The path to directory which should be created.
545
546    Return
547    ------
548
549    '''
550    try:
551        os.makedirs(directory)
552    except OSError as e:
553        # errno.EEXIST = directory already exists
554        if e.errno == errno.EEXIST:
555            print('WARNING: Directory {0} already exists!'.format(directory))
556        else:
557            raise # re-raise exception if a different error occured
558
559    return
560
561def put_file_to_ecserver(ecd, filename, target, ecuid, ecgid):
562    '''Uses the ecaccess-file-put command to send a file to the ECMWF servers.
563
564    Note
565    ----
566    The return value is just for testing reasons. It does not have
567    to be used from the calling function since the whole error handling
568    is done in here.
569
570    Parameters
571    ----------
572    ecd : str
573        The path were the file is stored.
574
575    filename : str
576        The name of the file to send to the ECMWF server.
577
578    target : str
579        The target queue where the file should be sent to.
580
581    ecuid : str
582        The user id on ECMWF server.
583
584    ecgid : str
585        The group id on ECMWF server.
586
587    Return
588    ------
589
590    '''
591
592    try:
593        subprocess.check_output(['ecaccess-file-put',
594                                 ecd + '/' + filename,
595                                 target + ':/home/ms/' +
596                                 ecgid + '/' + ecuid +
597                                 '/' + filename],
598                                stderr=subprocess.STDOUT)
599    except subprocess.CalledProcessError as e:
600        print('... ERROR CODE: ' + str(e.returncode))
601        print('... ERROR MESSAGE:\n \t ' + str(e))
602
603        print('\n... Do you have a valid ecaccess certification key?')
604        sys.exit('... ECACCESS-FILE-PUT FAILED!')
605    except OSError as e:
606        print('... ERROR CODE: ' + str(e.errno))
607        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
608
609        print('\n... Most likely the ECACCESS library is not available!')
610        sys.exit('... ECACCESS-FILE-PUT FAILED!')
611
612    return
613
614def submit_job_to_ecserver(target, jobname):
615    '''Uses ecaccess-job-submit command to submit a job to the ECMWF server.
616
617    Note
618    ----
619    The return value is just for testing reasons. It does not have
620    to be used from the calling function since the whole error handling
621    is done in here.
622
623    Parameters
624    ----------
625    target : str
626        The target where the file should be sent to, e.g. the queue.
627
628    jobname : str
629        The name of the jobfile to be submitted to the ECMWF server.
630
631    Return
632    ------
633    job_id : int
634        The id number of the job as a reference at the ecmwf server.
635    '''
636
637    try:
638        job_id = subprocess.check_output(['ecaccess-job-submit', '-queueName',
639                                          target, jobname])
640
641    except subprocess.CalledProcessError as e:
642        print('... ERROR CODE: ' + str(e.returncode))
643        print('... ERROR MESSAGE:\n \t ' + str(e))
644
645        print('\n... Do you have a valid ecaccess certification key?')
646        sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
647    except OSError as e:
648        print('... ERROR CODE: ' + str(e.errno))
649        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
650
651        print('\n... Most likely the ECACCESS library is not available!')
652        sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
653
654    return job_id
655
656
657def get_informations(filename):
658    '''Gets basic information from an example grib file.
659
660    These information are important for later use and the
661    initialization of numpy arrays for data storing.
662
663    Parameters
664    ----------
665    filename : str
666            Name of the file which will be opened to extract basic information.
667
668    Return
669    ------
670    data : dict
671        Contains basic informations of the ECMWF grib files, e.g.
672        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
673        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
674        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
675        'iDirectionIncrementInDegrees', 'missingValue'
676    '''
677    from eccodes import codes_grib_new_from_file, codes_get, codes_release
678
679    data = {}
680
681    # --- open file ---
682    print("Opening file for getting information data --- %s" % filename)
683    with open(filename) as f:
684        # load first message from file
685        gid = codes_grib_new_from_file(f)
686
687        # information needed from grib message
688        keys = [
689                'Ni',
690                'Nj',
691                'latitudeOfFirstGridPointInDegrees',
692                'longitudeOfFirstGridPointInDegrees',
693                'latitudeOfLastGridPointInDegrees',
694                'longitudeOfLastGridPointInDegrees',
695                'jDirectionIncrementInDegrees',
696                'iDirectionIncrementInDegrees',
697                'missingValue',
698               ]
699
700        print('\nInformations are: ')
701        for key in keys:
702            # Get the value of the key in a grib message.
703            data[key] = codes_get(gid,key)
704            print("%s = %s" % (key,data[key]))
705
706        # Free the memory for the message referred as gribid.
707        codes_release(gid)
708
709    return data
710
711
712def get_dimensions(info, purefc, dtime, index_vals, start_date, end_date):
713    '''This function specifies the correct dimensions for x, y and t.
714
715    Parameters
716    ----------
717    info : dict
718        Contains basic informations of the ECMWF grib files, e.g.
719        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
720        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
721        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
722        'iDirectionIncrementInDegrees', 'missingValue'
723
724    purefc : int
725        Switch for definition of pure forecast mode or not.
726
727    dtime : str
728        Time step in hours.
729
730    index_vals : list of list of str
731        Contains the values from the keys used for a distinct selection
732        of grib messages in processing  the grib files.
733        Content looks like e.g.:
734        index_vals[0]: ('20171106', '20171107', '20171108') ; date
735        index_vals[1]: ('0', '1200', '1800', '600') ; time
736        index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
737
738    start_date : str
739        The start date of the retrieval job.
740
741    end_date : str
742        The end date of the retrieval job.
743
744    Return
745    ------
746    (ix, jy, it) : tuple of int
747        Dimension in x-direction, y-direction and in time.
748    '''
749
750    ix = info['Ni']
751
752    jy = info['Nj']
753
754    if not purefc:
755        it = ((end_date - start_date).days + 1) * 24/int(dtime)
756    else:
757        # #no of step * #no of times * #no of days
758        it = len(index_vals[2]) * len(index_vals[1]) * len(index_vals[0])
759
760    return (ix, jy, it)
761
762
763def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'):
764    '''Executes a command line instruction via a subprocess.
765
766    Error handling is done if an error occures.
767
768    Parameters
769    ----------
770    cmd_list : list of str
771        A list of the components for the command line execution. Each
772        list entry is a single part of the command which is seperated from
773        the rest by a blank space.
774        E.g. ['mv', file1, file2]
775
776    Return
777    ------
778    error_msg : str, optional
779        The possible error message if the subprocess failed.
780        By default it will just tell "SUBPROCESS FAILED!".
781    '''
782
783    try:
784        subprocess.check_call(cmd_list)
785    except subprocess.CalledProcessError as e:
786        print('... ERROR CODE: ' + str(e.returncode))
787        print('... ERROR MESSAGE:\n \t ' + str(e))
788
789        sys.exit('... ' + error_msg)
790    except OSError as e:
791        print('... ERROR CODE: ' + str(e.errno))
792        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
793
794        sys.exit('... ' + error_msg)
795
796    return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG