in src/plugins/scanners/git-sync.py [0:0]
def _run_git(args, cwd):
    """Run ``git <args>`` with *cwd* as working directory; return its output.

    The command is passed as an argument list (no shell), so URLs, paths and
    branch names are never interpreted by a shell. GIT_TERMINAL_PROMPT=0 is
    placed in the environment of the git process itself so git fails instead
    of waiting for credentials on a terminal.

    Returns the combined stdout/stderr as bytes.
    Raises subprocess.CalledProcessError when git exits non-zero, or when the
    process could not be started at all (reported with exit status 127, the
    status a shell uses for "command not found").
    """
    cmd = ["git"] + list(args)
    env = dict(os.environ, GIT_TERMINAL_PROMPT="0")
    try:
        return subprocess.check_output(cmd, cwd=cwd, env=env, stderr=subprocess.STDOUT)
    except OSError as err:
        # Keep a single failure type for callers, as the shell-based
        # invocation did when git was missing.
        raise subprocess.CalledProcessError(127, cmd, output=str(err).encode()) from err


def _report_sync(KibbleBit, source, status, running, good, exception=None):
    """Store the state of the 'sync' step on *source* and hand it to KibbleBit.updateSource."""
    step = {
        'time': time.time(),
        'status': status,
        'running': running,
        'good': good
    }
    if exception is not None:
        step['exception'] = exception
    source['steps']['sync'] = step
    KibbleBit.updateSource(source)


def scan(KibbleBit, source):
    """Clone the git repository described by *source*, or update an existing clone.

    The working copy lives at <scratchdir>/<organisation>/git/<sourceID>,
    where scratchdir is read from KibbleBit.config['scanner']['scratchdir'].
    Progress and the final outcome are written to source['steps']['sync'] and
    passed to KibbleBit.updateSource(); nothing is returned.

    KibbleBit -- object providing .config, .pprint() and .updateSource()
    source    -- dict with 'sourceID', 'sourceURL', 'organisation' and 'steps'

    Git failures are not raised to the caller; they are recorded in the sync
    step with 'good' set to False.
    """
    # Get some vars, construct a data path for the repo
    path = source['sourceID']
    url = source['sourceURL']
    rootpath = "%s/%s/git" % (KibbleBit.config['scanner']['scratchdir'], source['organisation'])

    # If the root path does not exist, try to make it recursively.
    if not os.path.exists(rootpath):
        try:
            os.makedirs(rootpath, exist_ok=True)
        except OSError:
            _report_sync(
                KibbleBit, source,
                'Could not create root scratch dir - permision denied?',
                running=False, good=False
            )
            return
        print("Created root path %s" % rootpath)

    # This is where the repo should be cloned
    datapath = os.path.join(rootpath, path)
    KibbleBit.pprint("Checking out %s as %s" % (url, path))

    try:
        _report_sync(
            KibbleBit, source,
            'Fetching code data from source location...',
            running=True, good=True
        )

        # If we already checked this out earlier, just sync it.
        if os.path.exists(datapath):
            KibbleBit.pprint("Repo %s exists, fetching changes..." % datapath)

            # Do we have a default branch here?
            branch = plugins.utils.git.defaultBranch(source, datapath, KibbleBit)
            if not branch:
                source['default_branch'] = branch
                _report_sync(
                    KibbleBit, source,
                    "Could not sync with source",
                    running=False, good=False,
                    exception="No default branch was found in this repository"
                )
                KibbleBit.pprint("No default branch found for %s (%s)" % (source['sourceID'], source['sourceURL']))
                return
            KibbleBit.pprint("Using branch %s" % branch)

            # Try twice checking out the main branch and fetching changes.
            # Sometimes we need to clean up after older scanners, which is
            # why we try twice. If first attempt fails, clean up and try again.
            for attempt in range(2):
                try:
                    _run_git(["checkout", branch], datapath)
                    _run_git(["fetch", "--all"], datapath)
                    _run_git(["merge", "-X", "theirs", "--no-edit"], datapath)
                    break
                except subprocess.CalledProcessError as err:
                    message = str(err.output).lower()
                    # We're interested in merge conflicts, which we can resolve through trickery.
                    is_conflict = 'resolve' in message or 'merge' in message or 'overwritten' in message
                    if attempt > 0 or not is_conflict:
                        # Not a (recoverable) merge conflict, pass it to the outer handler
                        raise

                    # Switch to the first commit. A repository can have more
                    # than one root commit; use the first one listed.
                    listing = _run_git(
                        ["rev-list", "--max-parents=0", "--abbrev-commit", "HEAD"],
                        datapath
                    )
                    roots = listing.decode('ascii', errors='replace').split()
                    if not roots:
                        raise
                    _run_git(["reset", "--hard", roots[0]], datapath)

                    # Removing untracked files is best-effort: a failure here
                    # must not prevent the second sync attempt.
                    try:
                        _run_git(["clean", "-xfd"], datapath)
                    except subprocess.CalledProcessError as clean_err:
                        KibbleBit.pprint("Could not clean %s: %s" % (datapath, clean_err.output))

        # This is a new repo, clone it!
        else:
            KibbleBit.pprint("%s is new, cloning...!" % datapath)
            # "--" stops a URL that begins with "-" from being read as an option.
            _run_git(["clone", "--", url, path], rootpath)

    except subprocess.CalledProcessError as err:
        KibbleBit.pprint("Repository sync failed (no master?)")
        KibbleBit.pprint(str(err.output))
        _report_sync(
            KibbleBit, source,
            'Sync failed at ' + time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()),
            running=False, good=False,
            exception=str(err.output)
        )
        return

    # All good, yay!
    _report_sync(
        KibbleBit, source,
        'Source code fetched successfully at ' + time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()),
        running=False, good=True
    )