source: flex_extract.git/source/python/submit.py @ f2616a3

ctbtodev
Last change on this file since f2616a3 was f2616a3, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

implemented a job split with a new parameter 'job_chunk' so that huge time periods can automatically be splitted

  • Property mode set to 100755
File size: 8.2 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Fouilloux (University of Oslo)
5#
6# @Date: October 2014
7#
8# @Change History:
9#
10#    November 2015 - Leopold Haimberger (University of Vienna):
11#        - job submission on ecgate and cca
12#        - job templates suitable for twice daily operational dissemination
13#
14#    February 2018 - Anne Philipp (University of Vienna):
15#        - applied PEP8 style guide
16#        - added documentation
17#        - minor changes in programming style (for consistence)
18#        - changed path names to variables from config file
19#        - added option for writing mars requests to extra file
20#          additionally,as option without submitting the mars jobs
21#
22# @License:
23#    (C) Copyright 2014-2018.
24#
25#    This software is licensed under the terms of the Apache Licence Version 2.0
26#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
27#
28# @Program Functionality:
29#    This program is the main program of flex_extract and controls the
30#    program flow.
31#    If it is supposed to work locally then it works through the necessary
32#    functions get_mars_data and prepareFlexpart. Otherwise it prepares
33#    a shell job script which will do the necessary work on the
34#    ECMWF server and is submitted via ecaccess-job-submit.
35#
36# @Program Content:
37#    - main
38#    - submit
39#
40#*******************************************************************************
41
42# ------------------------------------------------------------------------------
43# MODULES
44# ------------------------------------------------------------------------------
45import os
46import sys
47import subprocess
48import inspect
49import collections
50from datetime import datetime, timedelta
51
52# software specific classes and modules from flex_extract
53import _config
54from mods.tools import (normal_exit, get_cmdline_args,
55                        submit_job_to_ecserver, read_ecenv)
56from mods.get_mars_data import get_mars_data
57from mods.prepare_flexpart import prepare_flexpart
58from classes.ControlFile import ControlFile
59
60# ------------------------------------------------------------------------------
61# FUNCTIONS
62# ------------------------------------------------------------------------------
63
64def main():
65    '''Get the arguments from script call and from CONTROL file.
66    Decides from the argument "queue" if the local version
67    is done "queue=None" or the gateway version with "queue=ecgate"
68    or "queue=cca".
69
70    Parameters
71    ----------
72
73    Return
74    ------
75
76    '''
77
78    args = get_cmdline_args()
79    c = ControlFile(args.controlfile)
80
81    env_parameter = read_ecenv(_config.PATH_ECMWF_ENV)
82    c.assign_args_to_control(args)
83    c.assign_envs_to_control(env_parameter)
84    c.check_conditions(args.queue)
85
86    # on local side
87    # on ECMWF server this would also be the local side
88    called_from_dir = os.getcwd()
89    if args.queue is None:
90        if c.inputdir[0] != '/':
91            c.inputdir = os.path.join(called_from_dir, c.inputdir)
92        if c.outputdir[0] != '/':
93            c.outputdir = os.path.join(called_from_dir, c.outputdir)
94        get_mars_data(c)
95        if c.request == 0 or c.request == 2:
96            prepare_flexpart(args.ppid, c)
97            exit_message = 'FLEX_EXTRACT IS DONE!'
98        else:
99            exit_message = 'PRINTING MARS_REQUESTS DONE!'
100    # send files to ECMWF server
101    else:
102        submit(args.job_template, c, args.queue)
103        exit_message = 'FLEX_EXTRACT JOB SCRIPT IS SUBMITED!'
104
105    normal_exit(exit_message)
106
107    return
108
109def submit(jtemplate, c, queue):
110    '''Prepares the job script and submits it to the specified queue.
111
112    Parameters
113    ----------
114    jtemplate : :obj:`string`
115        Job template file from sub-directory "_templates" for
116        submission to ECMWF. It contains all necessary
117        module and variable settings for the ECMWF environment as well as
118        the job call and mail report instructions.
119        Default is "job.temp".
120
121    c : :obj:`ControlFile`
122        Contains all the parameters of CONTROL file and
123        command line.
124
125    queue : :obj:`string`
126        Name of queue for submission to ECMWF (e.g. ecgate or cca )
127
128    Return
129    ------
130
131    '''
132
133    if not c.basetime:
134    # --------- create on demand job script ------------------------------------
135        if c.purefc:
136            print('---- Pure forecast mode! ----')
137        else:
138            print('---- On-demand mode! ----')
139
140        job_file = os.path.join(_config.PATH_JOBSCRIPTS,
141                                jtemplate[:-5] + '.ksh')
142
143        # divide time periode into specified number of job chunks
144        # to have multiple job scripts
145        if c.job_chunk:
146            start = datetime.strptime(c.start_date, '%Y%m%d')
147            end = datetime.strptime(c.end_date, '%Y%m%d')
148            chunk = timedelta(days=c.job_chunk)
149
150            while start <= end:
151                if (start + chunk) <= end:
152                    c.end_date = (start + chunk).strftime("%Y%m%d")
153                else:
154                    c.end_date = end.strftime("%Y%m%d")
155                print c.start_date +' bis ' + c.end_date
156
157                clist = c.to_list()
158
159                mk_jobscript(jtemplate, job_file, clist)
160
161                job_id = submit_job_to_ecserver(queue, job_file)
162                print('The job id is: ' + str(job_id.strip()))
163
164                start = start + chunk
165                c.start_date = start.strftime("%Y%m%d")
166        # submit a single job script
167        else:
168            clist = c.to_list()
169
170            mk_jobscript(jtemplate, job_file, clist)
171
172            job_id = submit_job_to_ecserver(queue, job_file)
173            print('The job id is: ' + str(job_id.strip()))
174
175    else:
176    # --------- create operational job script ----------------------------------
177        print('---- Operational mode! ----')
178
179        job_file = os.path.join(_config.PATH_JOBSCRIPTS,
180                                jtemplate[:-5] + 'oper.ksh')
181
182        c.start_date = '${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY}'
183        c.end_date = '${MSJ_YEAR}${MSJ_MONTH}${MSJ_DAY}'
184        c.base_time = '${MSJ_BASETIME}'
185        if c.maxstep > 24:
186            c.time = '${MSJ_BASETIME} {MSJ_BASETIME}'
187
188        clist = c.to_list()
189
190        mk_jobscript(jtemplate, job_file, clist)
191
192        job_id = submit_job_to_ecserver(queue, job_file)
193        print('The job id is: ' + str(job_id.strip()))
194
195
196    print('You should get an email per job with subject flex.hostname.pid')
197
198    return
199
200def mk_jobscript(jtemplate, job_file, clist):
201    '''Creates the job script from template.
202
203    Parameters
204    ----------
205    jtemplate : :obj:`string`
206        Job template file from sub-directory "_templates" for
207        submission to ECMWF. It contains all necessary
208        module and variable settings for the ECMWF environment as well as
209        the job call and mail report instructions.
210        Default is "job.temp".
211
212    job_file : :obj:`string`
213        Path to the job script file.
214
215    clist : :obj:`list` of :obj:`string`
216        Contains all necessary parameters for ECMWF CONTROL file.
217
218    Return
219    ------
220
221    '''
222    from genshi.template.text import NewTextTemplate
223    from genshi.template import  TemplateLoader
224    from genshi.template.eval import UndefinedError
225
226    # load template and insert control content as list
227    try:
228        loader = TemplateLoader(_config.PATH_TEMPLATES, auto_reload=False)
229        control_template = loader.load(jtemplate,
230                                       cls=NewTextTemplate)
231
232        stream = control_template.generate(control_content=clist)
233    except UndefinedError as e:
234        print('... ERROR ' + str(e))
235
236        sys.exit('\n... error occured while trying to generate jobscript')
237    except OSError as e:
238        print('... ERROR CODE: ' + str(e.errno))
239        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
240
241        sys.exit('\n... error occured while trying to generate jobscript')
242
243    # create jobscript file
244    try:
245        with open(job_file, 'w') as f:
246            f.write(stream.render('text'))
247    except OSError as e:
248        print('... ERROR CODE: ' + str(e.errno))
249        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
250
251        sys.exit('\n... error occured while trying to write ' + job_file)
252
253    return
254
255
256if __name__ == "__main__":
257    main()
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG