234 lines
8.9 KiB
Python

import argparse
import os
import re
import tarfile
import tempfile
import time
import zipfile
from functools import wraps
import gitlab
try:
from typing import Any, Callable, Dict, List, Optional
TR = Callable[..., Any]
except ImportError:
pass
def retry(func):  # type: (TR) -> TR
    """
    Decorator for ``Gitlab`` methods: re-run the whole call when it fails with
    an error that looks like a network problem.

    ``IOError`` and ``EOFError`` are always retried. A ``GitlabError`` is retried
    only when its ``response_code`` is 500; any other response code is re-raised
    immediately. The wait before retry number ``n`` is ``2 ** n`` seconds. Once
    the number of retries exceeds ``self.DOWNLOAD_ERROR_MAX_RETRIES`` the last
    exception is re-raised.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):  # type: (Gitlab, Any, Any) -> Any
        attempts = 0
        while True:
            try:
                # success leaves the loop straight away with the wrapped result
                return func(self, *args, **kwargs)
            except (IOError, EOFError, gitlab.exceptions.GitlabError) as e:
                is_gitlab_error = isinstance(e, gitlab.exceptions.GitlabError)
                if is_gitlab_error and e.response_code != 500:
                    # Only retry on error 500
                    raise e
                attempts += 1
                if attempts > self.DOWNLOAD_ERROR_MAX_RETRIES:
                    # retry budget used up, give the error to the caller
                    raise e
                func_name = getattr(func, '__name__', '(unknown callable)')
                print('Network failure in {}, retrying ({})'.format(func_name, attempts))
                # wait a bit more after each retry
                time.sleep(2 ** attempts)
    return wrapper
class Gitlab(object):
    """
    Helper around a ``python-gitlab`` connection, optionally bound to one project.

    Methods that talk to the server are decorated with ``retry``, so a failing
    call may be executed several times before the error reaches the caller.
    """
    # Matches "<name>" optionally followed by " <N>/<M>". Group 1 is the name and
    # group 3 is N (presumably the index of a parallel job -- confirm against CI config).
    JOB_NAME_PATTERN = re.compile(r"(\w+)(\s+(\d+)/(\d+))?")

    # Number of retries the ``retry`` decorator allows before re-raising.
    DOWNLOAD_ERROR_MAX_RETRIES = 3

    def __init__(self, project_id=None):  # type: (Optional[int]) -> None
        """
        Connect to the Gitlab server and optionally load a project.

        If the environment variable ``PYTHON_GITLAB_CONFIG`` is set, its content is
        used as the python-gitlab configuration; otherwise python-gitlab falls back
        to its own config file lookup.

        :param project_id: ID of the project to bind to. ``self.project`` is None if not given.
        """
        config_data_from_env = os.getenv("PYTHON_GITLAB_CONFIG")
        temp_config_path = None  # type: Optional[str]
        if config_data_from_env:
            # prefer to load config from env variable
            with tempfile.NamedTemporaryFile('w', delete=False) as temp_file:
                temp_file.write(config_data_from_env)
            temp_config_path = temp_file.name
            config_files = [temp_config_path]  # type: Optional[List[str]]
        else:
            # otherwise try to use config file at local filesystem
            config_files = None
        try:
            self._init_gitlab_inst(project_id, config_files)
        finally:
            # The temporary config may contain credentials; do not leave it on disk.
            # It is removed only after all (possibly retried) init attempts are done.
            if temp_config_path is not None:
                self._remove_quietly(temp_config_path)

    @staticmethod
    def _remove_quietly(path):  # type: (str) -> None
        """
        Best-effort removal of a temporary file.

        :param path: file to delete. A failure to delete is ignored so that cleanup
                     never hides the result (or the error) of the real operation.
        """
        try:
            os.remove(path)
        except OSError:
            pass

    @retry
    def _init_gitlab_inst(self, project_id, config_files):  # type: (Optional[int], Optional[List[str]]) -> None
        """
        Create and authenticate the python-gitlab instance, then load the project.

        :param project_id: project to load into ``self.project``; ``self.project`` is None when falsy
        :param config_files: python-gitlab config file paths, or None to use its defaults
        """
        gitlab_id = os.getenv('LOCAL_GITLAB_HTTPS_HOST')  # if None, will use the default gitlab server
        self.gitlab_inst = gitlab.Gitlab.from_config(gitlab_id=gitlab_id, config_files=config_files)
        self.gitlab_inst.auth()
        if project_id:
            self.project = self.gitlab_inst.projects.get(project_id)
        else:
            self.project = None

    @retry
    def get_project_id(self, name, namespace=None):  # type: (str, Optional[str]) -> int
        """
        search project ID by name

        :param name: project name
        :param namespace: namespace to match when we have multiple project with same name
        :return: project ID
        :raise ValueError: if no project matches
        """
        projects = self.gitlab_inst.projects.list(search=name)
        res = []
        for project in projects:
            if namespace is None:
                # without a namespace the result is only accepted when the search is unambiguous
                if len(projects) == 1:
                    res.append(project.id)
                    break
            if project.namespace["path"] == namespace:
                if project.name == name:
                    # exact name match takes priority over partial search hits
                    res.insert(0, project.id)
                else:
                    res.append(project.id)
        if not res:
            raise ValueError("Can't find project")
        return int(res[0])

    @retry
    def download_artifacts(self, job_id, destination):  # type: (int, str) -> None
        """
        download full job artifacts and extract to destination.

        :param job_id: Gitlab CI job ID
        :param destination: extract artifacts to path.
        """
        job = self.project.jobs.get(job_id)
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                job.artifacts(streamed=True, action=temp_file.write)
            # the file must be closed (flushed) before it is reopened by name
            with zipfile.ZipFile(temp_file.name, "r") as archive_file:
                archive_file.extractall(destination)
        finally:
            # every (re)try downloads into a fresh temp file, so always clean up this one
            self._remove_quietly(temp_file.name)

    @retry
    def download_artifact(self, job_id, artifact_path, destination=None):  # type: (int, List[str], Optional[str]) -> List[bytes]
        """
        download specific path of job artifacts and extract to destination.

        :param job_id: Gitlab CI job ID
        :param artifact_path: list of path in artifacts (relative path to artifact root path)
        :param destination: destination of artifact. Do not save to file if destination is None
        :return: A list of artifact file raw data.
        :raise gitlab.GitlabGetError: re-raised after logging when one artifact can't be fetched
        """
        job = self.project.jobs.get(job_id)
        raw_data_list = []
        for a_path in artifact_path:
            try:
                data = job.artifact(a_path)  # type: bytes
            except gitlab.GitlabGetError as e:
                print("Failed to download '{}' form job {}".format(a_path, job_id))
                raise e
            raw_data_list.append(data)
            if destination:
                # keep the artifact's relative path below the destination directory
                file_path = os.path.join(destination, a_path)
                try:
                    os.makedirs(os.path.dirname(file_path))
                except OSError:
                    # already exists
                    pass
                with open(file_path, "wb") as f:
                    f.write(data)
        return raw_data_list

    @retry
    def find_job_id(self, job_name, pipeline_id=None, job_status='success'):  # type: (str, Optional[str], str) -> List[Dict]
        """
        Get Job ID from job name of specific pipeline

        :param job_name: job name
        :param pipeline_id: If None, will get pipeline id from CI pre-defined variable.
        :param job_status: status of job. One pipeline could have multiple jobs with same name after retry.
                           job_status is used to filter these jobs.
        :return: a list of dicts ``{"id": ..., "parallel_num": ...}``, one per matching job
                 (parallel job will generate multiple jobs). ``parallel_num`` is None when the
                 job name has no " N/M" suffix.
        """
        job_id_list = []
        if pipeline_id is None:
            pipeline_id = os.getenv("CI_PIPELINE_ID")
        pipeline = self.project.pipelines.get(pipeline_id)
        jobs = pipeline.jobs.list(all=True)
        for job in jobs:
            match = self.JOB_NAME_PATTERN.match(job.name)
            if match:
                if match.group(1) == job_name and job.status == job_status:
                    job_id_list.append({"id": job.id, "parallel_num": match.group(3)})
        return job_id_list

    @retry
    def download_archive(self, ref, destination, project_id=None):  # type: (str, str, Optional[int]) -> str
        """
        Download archive of certain commit of a repository and extract to destination path

        :param ref: commit or branch name
        :param destination: destination path of extracted archive file
        :param project_id: download project of current instance if project_id is None
        :return: root path name of archive file
        :raise gitlab.GitlabGetError: re-raised after logging when the archive can't be fetched
        """
        if project_id is None:
            project = self.project
        else:
            project = self.gitlab_inst.projects.get(project_id)

        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                try:
                    project.repository_archive(sha=ref, streamed=True, action=temp_file.write)
                except gitlab.GitlabGetError as e:
                    print("Failed to archive from project {}".format(project_id))
                    raise e
            print("archive size: {:.03f}MB".format(float(os.path.getsize(temp_file.name)) / (1024 * 1024)))
            with tarfile.open(temp_file.name, "r") as archive_file:
                # the first member is used as the archive's root directory name
                root_name = archive_file.getnames()[0]
                # NOTE(review): extractall() trusts the member paths stored in the archive;
                # only use this with a Gitlab server you trust.
                archive_file.extractall(destination)
        finally:
            # every (re)try downloads into a fresh temp file, so always clean up this one
            self._remove_quietly(temp_file.name)
        return os.path.join(os.path.realpath(destination), root_name)
def main():  # type: () -> None
    """
    Command line entry point.

    Parses the arguments, connects to Gitlab for ``project_id`` and runs the
    requested ``action``: ``download_artifacts``, ``download_artifact``,
    ``find_job_id``, ``download_archive`` or ``get_project_id``. Any other
    action value does nothing after the connection is made.

    ``find_job_id`` prints the jobs as ``id,parallel_num`` pairs joined by ``;``.
    The ``parallel_num`` field is left empty for jobs whose name has no " N/M" suffix.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("action")
    parser.add_argument("project_id", type=int)
    parser.add_argument("--pipeline_id", "-i", type=int, default=None)
    parser.add_argument("--ref", "-r", default="master")
    parser.add_argument("--job_id", "-j", type=int, default=None)
    parser.add_argument("--job_name", "-n", default=None)
    parser.add_argument("--project_name", "-m", default=None)
    parser.add_argument("--destination", "-d", default=None)
    parser.add_argument("--artifact_path", "-a", nargs="*", default=None)
    args = parser.parse_args()

    gitlab_inst = Gitlab(args.project_id)
    if args.action == "download_artifacts":
        gitlab_inst.download_artifacts(args.job_id, args.destination)
    elif args.action == "download_artifact":
        gitlab_inst.download_artifact(args.job_id, args.artifact_path, args.destination)
    elif args.action == "find_job_id":
        job_ids = gitlab_inst.find_job_id(args.job_name, args.pipeline_id)
        # parallel_num is None for non-parallel jobs; str.join() would raise TypeError on it
        print(";".join(",".join([str(j["id"]), j["parallel_num"] or ""]) for j in job_ids))
    elif args.action == "download_archive":
        gitlab_inst.download_archive(args.ref, args.destination)
    elif args.action == "get_project_id":
        ret = gitlab_inst.get_project_id(args.project_name)
        print("project id: {}".format(ret))


if __name__ == '__main__':
    main()