sdk/build/lib/faropt/__init__.py [12:501]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
logging.basicConfig(level=logging.INFO)


class FarOpt(object):
    """FarOpt class used to initialize, configure and submit jobs to the back end
    :param framework: Currently only ortools, TO DO is to extend to other frameworks. Note that other frameworks like pyomo, DEAP, inspyred and pulp are supported
    :type framework: string, optional
    :param stackname: Points to the backend CDK stack that needs to be launched separately. Default name is faropt, but you many need to pass in another name while testing
    :type stackname: string, optional 
    """
    def __init__(self, framework = 'ortools', stackname = 'faropt'):
        """Constructor method: Gets buckets and tables associated with the already launched stack
        """
        # Check if backend stack is launched
        cf = boto3.client('cloudformation')
        try:
            response = cf.describe_stacks(StackName=stackname)
            if response['Stacks'][0]['StackStatus'] in ['CREATE_COMPLETE','UPDATE_COMPLETE']:
                logging.info('FarOpt backend is ready!')
                self.ready = True
                self.stackname = stackname
                
                outputs = response['Stacks'][0]['Outputs']
                for output in outputs:
                    
                    if output['OutputKey']=='s3asyncoutput':
                        self.asyncbucket = output['OutputValue']
                        logging.info('Async Bucket: ' + self.asyncbucket)
                    
                    if output['OutputKey']=='s3output':
                        self.bucket = output['OutputValue']
                        logging.info('Bucket: ' + self.bucket)
                    
                    if output['OutputKey']=='recipetable':
                        self.recipetable = output['OutputValue']
                        logging.info('Recipe Table: ' + self.recipetable)
                    
                    if output['OutputKey']=='jobtable':
                        self.jobtable = output['OutputValue']
                        logging.info('Job table: ' + self.jobtable)
                    
                    if output['OutputKey']=='lambdaopt':
                        self.lambdaopt = output['OutputValue']
                        logging.info('Lambda Opt function: ' + self.lambdaopt)
                        
                        
                
                self.configured = False
                self.submitted = False
                self.micro = False
            else:
                self.ready = False
                logging.warning('Stack found, but status is ' + response['Stacks'][0]['StackStatus'])
        except Exception as e:
            self.ready = False
            logging.error(e)
        
        self.allowed_frameworks = ['ortools']
        
        if framework not in self.allowed_frameworks:
            logging.warning("Only ortools is supported for now. You entered "+framework)
            #exit(0)
        else:
            self.framework = framework
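
    # Usage sketch (illustrative; assumes the 'faropt' CDK stack is already
    # deployed in this account/region, and './src/' is a placeholder for a local
    # folder containing your main.py):
    #
    #   fo = FarOpt()              # defaults: framework='ortools', stackname='faropt'
    #   fo.configure('./src/')     # zip up the project
    #   fo.submit()                # run it on the backend
    #   fo.wait()                  # poll until the task stops
    #   fo.logs()                  # print the job's CloudWatch logs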
    
    def configure(self, source_dir):
        """Zips up a local folder containing your main.py code, and any other subfolders/files required to run your project. Check the file listing that is printed to confirm that everything you need is included.
        
        :param source_dir: path to your source, such as './home/src/'
        :type source_dir: string
        """
        logging.info("Listing project files ...")
        file_name = "source.zip"
        zf = zipfile.ZipFile("source.zip", "w")
    
        for dirname, subdirs, files in os.walk(source_dir):
            #zf.write(dirname)
            print(dirname, subdirs, files)
            for filename in files:
                print(filename)
                zf.write(os.path.join(dirname,filename),os.path.relpath(dirname+'/'+filename,source_dir))
        zf.close()
        
        self.path_file_name = os.path.abspath(file_name)
        self.file_name = file_name
            
        self.configured = True
        logging.info("Configured job!")
        
    def add_recipe(self, recipe_name, maintainer='Faropt SDK user'):
        """Adds a recipe referencing the job that you submitted (see self object params).
        
        :param recipe_name: Friendly name for your recipe
        :type recipe_name: string
        :param maintainer: Recipe author/maintainer. Defaults to 'Faropt SDK user'
        :type maintainer: string, optional
        """
        if self.configured:
            self.ddb_resource = boto3.resource('dynamodb')
            self.ddb_table = self.ddb_resource.Table(self.recipetable)
            UID = str(uuid4())
            
            if self.micro:
                
                job = {
                    'recipeid': UID,
                    'description':recipe_name,
                    'bucket': self.asyncbucket,
                    'path': self.stagedkey+'/source.zip',
                    'maintainer':maintainer,
                    'code':'see path'
                }
            
            
            else:
                job = {
                    'recipeid': UID,
                    'description':recipe_name,
                    'bucket': self.bucket,
                    'path': self.jobname+'/'+self.file_name,
                    'maintainer':maintainer,
                    'code':'see path'
                }
                
            self.ddb_table.put_item(Item=job)
            
        else:
            logging.error('Please configure the job first!')
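
    # Example (illustrative; the recipe name and maintainer below are placeholders).
    # A recipe references the most recently configured/submitted job:
    #
    #   fo.configure('./src/')
    #   fo.submit()
    #   fo.add_recipe('vehicle-routing-baseline', maintainer='ops-team')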
            
    def run_s3_job(self, bucket, key, micro=False):
        """Runs a job based on a source archive in bucket/key. For example, place your source.zip at s3://bucket/path/to/key/source.zip and submit a job
        
        :param bucket: Bucket name
        :type bucket: string
        :param key: key inside the bucket that looks like path/to/key/source.zip
        :type key: string
        :param micro: Submit a micro job
        :type micro: bool
        """
        
        logging.info("Downloading source...")
        s3 = boto3.client('s3')
        with open('/tmp/source.zip', 'wb') as f:
            s3.download_fileobj(bucket, key, f)
            
        self.path_file_name = os.path.abspath('/tmp/source.zip')
        self.file_name = 'source.zip'
        logging.info("Configured job!")
        self.configured = True
        self.submit(micro=micro)  # submit() stages the package itself for micro jobs
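
    # Example (illustrative; bucket and key are placeholders for your own objects):
    #
    #   fo.run_s3_job('my-bucket', 'path/to/source.zip')              # Fargate job
    #   fo.run_s3_job('my-bucket', 'path/to/source.zip', micro=True)  # Lambda micro job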
        
    def get_recipe_id_from_description(self, description):
        """Returns the UUID of a recipe based on its friendly description/recipe name
        
        :param description: friendly description/recipe name
        :type description: string
        :return: First recipe ID that matches the description, or None if no recipe matches
        :rtype: string
        """
        self.list_recipes(verbose=False)
        for r in self.recipes:
            if r['description'] == description:
                return r['recipeid']
        
        
    def run_recipe(self, recipe_id, micro=False):
        """Runs an already registered recipe
        
        :param recipe_id: UUID of the recipe
        :type recipe_id: string
        :param micro: Submit as a micro job
        :type micro: bool
        """
        try:
            self.ddb_resource = boto3.resource('dynamodb')
            self.ddb_table = self.ddb_resource.Table(self.recipetable)

            response = self.ddb_table.get_item(Key={'recipeid': recipe_id})
            path = response['Item']['path']
            bucket = response['Item']['bucket']
            
            logging.info("Downloading recipe...")
            s3 = boto3.client('s3')
            with open('/tmp/source.zip', 'wb') as f:
                s3.download_fileobj(bucket, path, f)
            
            self.path_file_name = os.path.abspath('/tmp/source.zip')
            self.file_name = 'source.zip'
            logging.info("Configured job!")
            self.configured = True
            self.submit(micro=micro)
        
        except ClientError as e:
            logging.error(e)
            return False
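
    # Example (illustrative; 'vehicle-routing-baseline' is a placeholder recipe name):
    #
    #   rid = fo.get_recipe_id_from_description('vehicle-routing-baseline')
    #   if rid is not None:
    #       fo.run_recipe(rid, micro=False)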
        
        
    def wait(self):
        """Polls the primary status of the container task that runs this job until it stops, printing each status. You should see PROVISIONING > PENDING > RUNNING > STOPPED > JOB COMPLETED
        """
        while self.primary_status()!='STOPPED':
            print(self.primary_status())
            time.sleep(3)
            
        logging.info("JOB COMPLETED!")
        
        
    def stage(self):
        """Uploads the source.zip to the async bucket but does not submit it to Fargate. Useful when you want to run the job later
        """
        logging.info("Staging job")
        s3_client = boto3.client('s3')
        try:
            eventid = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')+'-'+str(uuid4())
            response = s3_client.upload_file(self.path_file_name, self.asyncbucket,'staged/'+eventid+'/source.zip')
            
            logging.info("Staged job! id: " + str(eventid))

            self.jobname = eventid
            self.stagedkey = 'staged/'+eventid
    
            logging.info(f"Look for s3://{self.asyncbucket}/staged/{self.jobname}/source.zip")
        except Exception as e:
            logging.error("Could not stage job: " + str(e))
        
        
    def submit(self, micro=False):
        """Runs job defined in object params. Creates a new job ID to track and sets submitted to True. Check self.jobname to reference the job that was submitted. View self.logs() once the job has completed
        
        :param micro: Submit a micro job. By submitting a micro job, you are restricted to using ortools, pyomo and deap libraries for jobs that last up to 5 minutes
        :type micro: bool
        """
        if self.configured:
            logging.info("Submitting job")
            s3_client = boto3.client('s3')
            self.ddb_resource = boto3.resource('dynamodb')
            self.ddb_table = self.ddb_resource.Table(self.jobtable)
            
            if not micro:
                try:
                    self.micro = False
                    eventid = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')+'-'+str(uuid4())
                    response = s3_client.upload_file(self.path_file_name, self.bucket,eventid+'/source.zip')
                    
                    logging.info("Submitted job! id: " + str(eventid))
                    
                    self.jobname = eventid
                    
                    # Add job to DynamoDB
                    job = {
                        'jobid': self.jobname,
                        'bucket': self.bucket,
                        'path': eventid + '/source.zip'
                    }
                    
                    self.ddb_table.put_item(Item=job)
                except ClientError as e:
                    logging.error(e)
                    return False
            
            else:
                try:
                    self.stage()
                    self.micro = True
                    logging.info("By submitting a micro job, you are restricted to using ortools, pyomo and deap libraries for jobs that last up to 5 minutes")
                    lamclient = boto3.client("lambda")
                    response = lamclient.invoke(
                        FunctionName=self.lambdaopt,
                        InvocationType='RequestResponse',
                        LogType='Tail',
                        Payload=json.dumps({'s3bucket':self.asyncbucket,'s3key':self.stagedkey}).encode())
                        
                    # Add job to DynamoDB
                    job = {
                        'jobid': self.stagedkey,
                        'bucket': self.asyncbucket,
                        'path': self.stagedkey + '/source.zip'
                    }
                    
                    self.ddb_table.put_item(Item=job)
                    
                    base64_message = response['LogResult']
                    print(base64.b64decode(base64_message.encode()).decode())
                    self.micrologs = base64.b64decode(base64_message.encode()).decode()
                    
                except ClientError as e:
                    logging.error(e)
                    return False
            
        else:
            logging.error('Please configure the job first!')
            return False

        self.submitted = True
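
    # Example (illustrative): a regular Fargate submission vs. a micro (Lambda)
    # submission; micro jobs print their logs as soon as the Lambda returns:
    #
    #   fo.submit()                # Fargate task; follow up with fo.wait() and fo.logs()
    #   fo.submit(micro=True)      # Lambda micro job; logs are also kept in fo.micrologs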
    
    def list_recipes(self, limit=10, verbose=True):
        """Returns the list of registered recipes
        
        :param limit: Number of recipes to return. Defaults to 10
        :type limit: int, optional
        :param verbose: Verbose print of the recipe table. Defaults to True
        :type verbose: bool, optional
        :return: Recipe table scan (raw) results
        :rtype: boto3 response
        """
        ddb_client = boto3.client('dynamodb')
        
        response = ddb_client.scan(
            TableName=self.recipetable,
            Limit=limit)
        
        allrecipes = []
        for job in response['Items']:
            allrecipes.append({'recipeid':job['recipeid']['S'], 'bucket':job['bucket']['S'], 'path':job['path']['S'], 'description':job['description']['S'], 'maintainer':job['maintainer']['S'], 'code':job['code']['S']})
            if verbose:
                print(f"recipeid:{job['recipeid']['S']} | bucket:{job['bucket']['S']} | path:{job['path']['S']} | description:{job['description']['S']} | maintainer:{job['maintainer']['S']}")
        self.recipes = allrecipes
        
        return response
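
    # Example (illustrative): list recipes and use the parsed copy cached on the instance:
    #
    #   fo.list_recipes(limit=5)              # prints up to 5 recipes
    #   for r in fo.recipes:
    #       print(r['recipeid'], r['description'])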
                  
    def list_jobs(self, limit=10, verbose=True):
        """Returns the list of submitted jobs
        
        :param limit: Number of jobs to return. Defaults to 10
        :type limit: int, optional
        :param verbose: Verbose print of the job table. Defaults to True
        :type verbose: bool, optional
        :return: Job table scan (raw) results
        :rtype: boto3 response
        """
        ddb_client = boto3.client('dynamodb')
        
        response = ddb_client.scan(
            TableName=self.jobtable,
            Limit=limit)
        
        alljobs = []
        for job in response['Items']:
            alljobs.append({'jobid':job['jobid']['S'], 'bucket':job['bucket']['S'], 'path':job['path']['S']})
            if verbose:
                print(f"jobid:{job['jobid']['S']} | bucket:{job['bucket']['S']} | path:{job['path']['S']}")
        
        self.jobs = alljobs
        
        return response

    
    def stream_logs(self, start_time=0, skip=0):
        """Internal helper that yields CloudWatch log events for the running task; prefer self.logs() for most use cases
        """
        #def log_stream(client, log_group, stream_name, start_time=0, skip=0): from SM stream logs

        next_token = None
    
        event_count = 1
        while event_count > 0:
            if next_token is not None:
                token_arg = {"nextToken": next_token}
            else:
                token_arg = {}
            
            taskarn = self.status()['tasks'][0]['taskArn'].split('/')[-1]
            client = boto3.client('logs')
                        
            response = client.get_log_events(
                logGroupName='faroptlogGroup',
                logStreamName='faroptlogs/FarOptImage/' + taskarn,
                startTime=start_time,
                startFromHead=True,
                **token_arg
            )
            
            next_token = response["nextForwardToken"]
            events = response["events"]
            event_count = len(events)
            if event_count > skip:
                events = events[skip:]
                skip = 0
            else:
                skip = skip - event_count
                events = []
            for ev in events:
                yield ev
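
    # Example (illustrative): stream_logs() is a generator, so events can be
    # consumed while the Fargate task is still running:
    #
    #   for event in fo.stream_logs():
    #       print(event['timestamp'], event['message'])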
     
    
    def stop(self):
        """Stops a submitted task
        """
        # if self.primary_status() in ['STOPPED','DEPROVISIONING','RUNNING']:
        if self.submitted and self.micro==False:
            client = boto3.client('ecs')
            taskarn = self.status()['tasks'][0]['taskArn'].split('/')[-1]
            response = client.stop_task(
                cluster='FarOptCluster',
                task=taskarn,
                reason='User stopped task'
                )
        else:
            logging.info("Please ensure you have submitted a non-micro job")
        # else:
        #     logging.info('Job status: ' + self.primary_status())
            
    
    def printlogs(self, response):
        """Internal helper: prints timestamp | message for each event in a CloudWatch Logs get_log_events response
        """
        for ev in response['events']:
            print(str(ev['timestamp']) + ' | ' + ev['message'])
    
    def logs(self):
        """Prints logs of a submitted job. 
        """
        if self.primary_status() in ['STOPPED','DEPROVISIONING','RUNNING']:
            if self.micro == False:
                taskarn = self.status()['tasks'][0]['taskArn'].split('/')[-1]
                client = boto3.client('logs')
                response = client.get_log_events(
                            logGroupName='faroptlogGroup',
                            logStreamName='faroptlogs/FarOptImage/' + taskarn)
                
                self.printlogs(response)
            
            else:
                print(self.micrologs)

        else:
            print('Please wait for the task to start running | ' + self.primary_status())
                
                
    def primary_status(self):
        """Returns the last status of the submitted job; can be PROVISIONING > PENDING > RUNNING > STOPPED (micro jobs always report STOPPED)
        
        :return: primary status
        :rtype: string
        """
        if self.micro == False:
            return self.status()['tasks'][0]['lastStatus']
        else:
            return 'STOPPED'
        
    def status(self):
        """Returns the full status of the submitted job; used in primary_status, which should be enough for most use cases
        """
        if self.submitted and self.micro==False:

            client = boto3.client('ecs')
            response1 = client.list_tasks(
                        cluster='FarOptCluster',
                        startedBy=self.jobname)
            
            running_tasks = response1['taskArns']
            
            if running_tasks == []:
                logging.info("No running tasks. Checking completed tasks...")
                #check if stopped tasks exist
                response1 = client.list_tasks(
                        cluster='FarOptCluster',
                        startedBy=self.jobname,
                        desiredStatus='STOPPED')
                        
                stopped_tasks = response1['taskArns']
                        
            response = client.describe_tasks(cluster='FarOptCluster',
                                             tasks=[response1['taskArns'][0]])
            
            return response
                        
        else:
            logging.error("Please submit a non-micro job first!")
                      
    def get_metric_data(self, metric_name):
        """Returns raw metric data that was published from the backend. To use this, add from utils import * to your main.py, and then call log_metric, for example: log_metric('total_distance', total_distance)
        
        :param metric_name: Name of the metric that was logged from main.py
        :type metric_name: string
        :return: response from CloudWatch
        :rtype: dict
        """

        cloudwatch = boto3.resource('cloudwatch')
        metric = cloudwatch.Metric('FarOpt',metric_name)

        response = metric.get_statistics(
            Dimensions=[
                {
                    'Name': 'jobid',
                    'Value': self.jobname
                },
            ],
            StartTime=datetime.now() - timedelta(minutes=24),
            EndTime=datetime.now(),
            Period=1,
            Statistics=['Average', 'Minimum', 'Maximum'])

        return response
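
    # Example (illustrative; 'total_distance' is a placeholder metric name). Publish
    # the metric from the packaged main.py, then read it back through the SDK:
    #
    #   # inside main.py (runs on the backend):
    #   #   from utils import *
    #   #   log_metric('total_distance', total_distance)
    #
    #   # from the SDK side, after the job has run:
    #   stats = fo.get_metric_data('total_distance')
    #   print(stats['Datapoints'])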
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -


