pygenie/jobs/presto.py (60 lines of code) (raw):
"""
pygenie.jobs.presto

This module implements creating Presto jobs.

Example:
>>> from pygenie.jobs import PrestoJob
>>> job = PrestoJob() \\
... .job_id('my-job-id-1234') \\
... .job_name('my-job') \\
... .script('select * from db.table') \\
... .headers() \\
... .option('debug') \\
... .option('source', 'genie') \\
... .session('hive.max_initial_split_size', '4MB')
>>> running_job = job.execute()
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import os
from ..utils import unicodify
from .core import GenieJob
from .utils import (add_to_repr,
arg_string,
is_file)
# Module-level logger. The name is a fixed dotted string (not __name__), so
# logging configuration has to target 'com.netflix.genie.jobs.presto' exactly.
logger = logging.getLogger('com.netflix.genie.jobs.presto')
class PrestoJob(GenieJob):
    """
    Presto job.

    Collects a script (inline code or a path to a file), ``--name value``
    options and ``--session name=value`` properties, and renders them into the
    command-line argument string exposed by :py:attr:`cmd_args`.

    Example:
        >>> job = PrestoJob() \\
        ...     .job_name('presto example') \\
        ...     .script("SELECT * FROM mydb.mytable") \\
        ...     .headers() \\
        ...     .option('debug') \\
        ...     .session('hive.max_initial_split_size', '4MB')
    """

    # File name used for the script dependency when the script was given as
    # inline code rather than as a path to an existing file.
    DEFAULT_SCRIPT_NAME = 'script.presto'

    def __init__(self, conf=None):
        """
        Initializes the job with no script set.

        Args:
            conf (optional): Passed through unchanged to the parent
                constructor.
        """
        super(PrestoJob, self).__init__(conf=conf)
        self._script = None

    @property
    def cmd_args(self):
        """
        The command line arguments built from the job's definition.

        When the arguments were set explicitly (by calling
        :py:meth:`command_arguments`) that value is returned untouched.
        Otherwise the result has the shape
        ``<sessions> <options> -f <filename> <post cmd args>`` with
        surrounding whitespace stripped.

        Note that reading this property has side effects: the script is
        registered as a job dependency, and an inline script that does not
        already end with ';' is rewritten in place to end with one.

        Returns:
            str: The command line arguments.
        """
        explicit_args = self._command_arguments
        if explicit_args is not None:
            return explicit_args

        if is_file(self._script):
            # Script is a path: attach the file itself and refer to it by
            # its base name.
            script_name = os.path.basename(self._script)
            self._add_dependency(self._script)
        else:
            script_name = PrestoJob.DEFAULT_SCRIPT_NAME
            if self._script is not None:
                if not self._script.strip().endswith(';'):
                    # The terminator goes on its own line so that a trailing
                    # line comment in the script cannot swallow it.
                    self._script = '{}\n;'.format(self._script)
                self._add_dependency({'name': script_name, 'data': self._script})

        # Options without a value (e.g. 'debug') render as a bare flag.
        option_parts = []
        for opt_name, opt_value in self._command_options.get('--', {}).items():
            if opt_value is None:
                option_parts.append('--{}'.format(opt_name))
            else:
                option_parts.append('--{} {}'.format(opt_name, opt_value))

        session_props = self._command_options.get('--session', {})
        session_parts = (
            '--session {}={}'.format(prop_name, prop_value)
            for prop_name, prop_value in session_props.items()
        )

        rendered = '{sessions} {options} -f {filename} {post_cmd_args}'.format(
            sessions=' '.join(session_parts),
            options=' '.join(option_parts),
            filename=script_name,
            post_cmd_args=' '.join(self._post_cmd_args))
        return rendered.strip()

    def headers(self):
        """
        Turns on column headers in the job's output.

        Tags the job with 'headers' and sets the option
        ``--output-format CSV_HEADER``.

        Example:
            >>> job = PrestoJob() \\
            ...     .headers()

        Returns:
            :py:class:`PrestoJob`: self
        """
        self.tags('headers')
        return self.option('output-format', 'CSV_HEADER')

    @unicodify
    @add_to_repr('append')
    def option(self, name, value=None):
        """
        Sets a command-line option for the job.

        The option is rendered on the command line as ``--name value``, or as
        just ``--name`` when no value is given. Any leading dashes in ``name``
        are removed before it is stored, so ``'debug'`` and ``'--debug'`` are
        treated the same.

        Example:
            >>> # presto --output-format CSV_HEADER --debug
            >>> job = PrestoJob() \\
            ...     .option('output-format', 'CSV_HEADER') \\
            ...     .option('debug')

        Args:
            name (str): The option name.
            value (str, optional): The option value; omit it for flags that
                take no value, like '--debug'.

        Returns:
            :py:class:`PrestoJob`: self
        """
        bare_name = name.lstrip('-')
        self._set_command_option('--', bare_name, value)
        return self

    def query(self, script):
        """Alias for :py:meth:`PrestoJob.script`"""
        return self.script(script)

    @unicodify
    @arg_string
    @add_to_repr('overwrite')
    def script(self, _script):
        """
        Sets the script to run for the job: either a path to a script file or
        the code to execute.

        Example:
            >>> job = PrestoJob() \\
            ...     .script("SELECT * FROM mydb.mytable")
            >>> job = PrestoJob() \\
            ...     .script("/Users/jdoe/my_query.presto")

        Args:
            _script (str): A path to a script file or the code to run.

        Returns:
            :py:class:`PrestoJob`: self
        """
        # NOTE(review): the body is intentionally empty. Storing the value on
        # the instance and returning self is presumably done by the stacked
        # decorators (``arg_string`` in particular) -- confirm in ``.utils``.

    @unicodify
    @add_to_repr('append')
    def session(self, name, value):
        """
        Sets a session property for the job.

        The property is rendered on the command line as
        ``--session name=value``.

        Example:
            >>> # presto --session hive.max_initial_split_size=4MB
            >>> job = PrestoJob() \\
            ...     .session('hive.max_initial_split_size', '4MB')

        Args:
            name (str): The session property name.
            value (str): The session property value.

        Returns:
            :py:class:`PrestoJob`: self
        """
        self._set_command_option('--session', name, value)
        return self