elastic/endpoint/track_processors/assets_loader.py (103 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import shutil
from urllib.parse import urlparse
from esrally.track import ComponentTemplate, Index, IndexTemplate
logger = logging.getLogger(__name__)
def load_index_template(track, asset_content, kibana_space="default"):
index_name = asset_content["name"]
track.composable_templates += [
IndexTemplate(
index_name,
index_pattern,
asset_content,
)
for index_pattern in asset_content["index_template"]["index_patterns"]
]
track.data_streams.append(Index(f"{index_name}-{kibana_space}"))
def load_component_template(track, asset_content):
track.component_templates.append(
ComponentTemplate(
asset_content["name"],
asset_content["component_template"],
)
)
def load_ingest_pipeline(track, asset_content):
pass
def load_ilm_policy(track, asset_content):
pass
asset_loaders = {
"component_templates": load_component_template,
"index_templates": load_index_template,
"ingest_pipelines": load_ingest_pipeline,
"ilm_policies": load_ilm_policy,
}
def download_from_github(track, packages, repo_path, assets_root):
from elastic.package import assets
from github import Github
if not packages:
raise ValueError("Required param 'packages' is empty or not configured")
github = Github(os.getenv("ASSETS_AUTH_TOKEN") or None)
repo = github.get_repo(repo_path)
for package in packages:
logger.info(f"Downloading assets of [{package}] from [{repo.html_url}]")
entries = assets.get_remote_assets(package, repo)
dest_package_path = os.path.join(assets_root, package)
if os.path.exists(dest_package_path):
shutil.rmtree(dest_package_path)
count = 0
for path, content in assets.download_assets(entries):
asset_path = os.path.join(assets_root, path)
os.makedirs(os.path.dirname(asset_path), exist_ok=True)
with open(asset_path, "wb") as f:
f.write(content)
path_parts = os.path.split(path[len(package) + 1 :])
if not path_parts[0]:
continue
if path_parts[0] in asset_loaders:
asset_loaders[path_parts[0]](track, json.loads(content))
count += 1
logger.info(f"Loaded [{count}] assets")
def load_from_path(track, packages, path):
from elastic.package import assets
if not packages:
raise ValueError("Required param 'packages' is empty or not configured")
for package in packages:
logger.info(f"Loading assets of [{package}] from [{path}]")
count = 0
for path, content in assets.get_local_assets(package, path):
path_parts = os.path.split(path[len(package) + 1 :])
if not path_parts[0]:
continue
if path_parts[0] in asset_loaders:
asset_loaders[path_parts[0]](track, json.loads(content))
count += 1
logger.info(f"Loaded [{count}] assets")
class AssetsLoader:
def on_after_load_track(self, track):
try:
from elastic.package import assets # noqa: F401
except ModuleNotFoundError:
logger.warning("Cannot import module [elastic.package.assets], assets are not loaded")
return
asset_groups = track.selected_challenge_or_default.parameters.get("assets", [])
for assets_group in asset_groups:
repository = assets_group.get("repository", "https://github.com/elastic/package-assets")
packages = assets_group.get("packages", [])
repo_parts = urlparse(repository)
if repo_parts.scheme.startswith("http") and repo_parts.netloc == "github.com":
assets_root = os.path.join(track.root, "assets", repo_parts.path[1:])
download_from_github(track, packages, repo_parts.path[1:], assets_root)
elif repo_parts.scheme == "file":
if repo_parts.netloc == ".":
assets_root = os.path.join(track.root, "." + repo_parts.path)
else:
assets_root = repo_parts.path
load_from_path(track, packages, assets_root)
else:
raise ValueError(f"Unsupported repository: {repository}")
assets_group["path"] = os.path.abspath(assets_root)
logger.info(f"Assets group path is [{assets_group['path']}]")
def on_prepare_track(self, track, data_root_dir):
return []