#!/usr/bin/env python3
r"""
A module to keep track of all package information (repository, conda package info, etc.).

Package List
------------

One of the main purposes of this module is to maintain a list of "packages". Some packages have an
associated github repository, which can be owned by one or more organizations. Some packages
have an associated conda package, which is listed in one or more conda channels. The package list is
the union of the conda packages and the github repositories. The name of the package, the name of
the repository and the name of the conda package might not be the same.

To assemble the package list, this module uses:

- All skare3/pkg_defs/\*/meta.yaml files within the skare3 repository
- The list of all repositories for a given list of organizations (sot, acisops)

The package list is cached locally. The cache expires after one day.
To get the package list, use :func:`~skare3_tools.packages.get_package_list`::

    >>> from skare3_tools import packages
    >>> pkgs = packages.get_package_list()
    >>> pkgs[0]
    {'name': 'ska3-core',
     'package': 'ska3-core',
     'repository': None,
     'owner': None}

Package Info
------------

Some information about each package is cached locally. The cache expires whenever there is an
"update" or a "push" to the associated Github repository. The cached information includes items
such as the number of open pull requests and the number of branches, as well as the versions
available in conda channels.

To get the current information associated with a package, use
:func:`~skare3_tools.packages.get_repository_info`::

    >>> from skare3_tools import packages
    >>> pkg = packages.get_repository_info('sot/Quaternion')
    >>> pkg.keys()
    dict_keys(['owner', 'name', 'pushed_at', 'updated_at', 'last_tag', 'last_tag_date',
    'commits', 'merges', 'merge_info', 'release_info', 'issues', 'n_pull_requests',
    'branches', 'pull_requests', 'workflows', 'master_version'])

The information on all packages can be accessed with
:func:`~skare3_tools.packages.get_repositories_info`::

    >>> from skare3_tools import packages
    >>> pkg = packages.get_repositories_info()

Conda Info
----------

As part of the call to :func:`~skare3_tools.packages.get_repository_info`, the conda package
versions are also fetched. This is done with :func:`~skare3_tools.packages.get_conda_pkg_info`,
something like::

    >>> from skare3_tools import packages
    >>> info = packages.get_conda_pkg_info('quaternion')

By default, this function looks for package information in a set of channels specified as
the "main" channels. Extra sets of channels (e.g. test, masters, shiny) can be specified as part
of the :ref:`Configuration`, in which case one can do::

    >>> from skare3_tools import packages
    >>> info = packages.get_conda_pkg_info('quaternion', conda_channel='masters')
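
The ``conda_channel`` argument also accepts an explicit list of channel URLs instead of the name
of a channel set from the configuration (a sketch; the URL below is just an example)::

    >>> from skare3_tools import packages
    >>> info = packages.get_conda_pkg_info(
    ...     'quaternion',
    ...     conda_channel=['https://conda.anaconda.org/conda-forge']
    ... )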

"""

import argparse
import datetime
import glob
import json
import logging
import os
import re
import subprocess
import sys
import urllib
from pathlib import Path

import jinja2
import requests
import yaml
from packaging.version import InvalidVersion, Version

from skare3_tools import github
from skare3_tools.config import CONFIG


class NetworkException(Exception):
    pass


def dir_access_ok(path):
    """
    Returns true if the given path has write access or can be created.
    """
    path = Path(path).resolve()
    if os.path.exists(path):
        return os.access(path, os.W_OK)
    # if path does not exist, climb up the hierarchy to see if it can be created
    if path.parent != path:
        return dir_access_ok(path.parent)
    return False


def json_cache(name, directory="", ignore=None, expires=None, update_policy=None):
    r"""
    Decorator to cache function results in json format.

    This decorator adds an 'update' argument to decorated functions. update is False by default,
    but one can set it to True to force-update the cache entry.

    Data is saved in json files. The file names can include a special separator character to
    denote the function arguments. Currently that character is ':'.

    :param name: str
        name used as a prefix for the cache file name
    :param directory: str
        path where to save json file. Either absolute or relative to CONFIG['data_dir']
    :param ignore: list
        list of argument names to ignore in the cache entry identifier
    :param expires: dict
        a dictionary that can be given to datetime.timedelta(\*\*expires).
        If the cache entry is older than this interval, it is updated.
    :param update_policy: callable
        A callable taking two arguments: (filename, result), which returns True if the cache
        entry should be updated.
    :return:
    """
    import inspect
    from functools import wraps

    directory = os.path.normpath(os.path.join(CONFIG["data_dir"], directory))
    if not ignore:
        ignore = []
    if expires:
        expires = datetime.timedelta(**expires)

    def decorator_cache(func, ignore_args=ignore, expiration=expires, name=name):
        signature = inspect.signature(func)
        name += "::"

        @wraps(func)
        def wrapper(*args, update=False, **kwargs):
            s_args = signature.bind(*args, **kwargs).arguments
            arg_str = "-".join(
                [
                    "{a}:{v}".format(a=a, v=s_args[a])
                    for a in s_args
                    if a not in ignore_args
                ]
            )
            filename = "{name}{arg_str}.json".format(name=name, arg_str=arg_str)
            # in an ideal world, filename would be completely sanitized...
            # this world is not ideal.
            filename = filename.replace(os.sep, "-")
            filename = os.path.join(directory, filename)
            if expiration is not None and os.path.exists(filename):
                m_time = datetime.datetime.fromtimestamp(os.path.getmtime(filename))
                update = update or (datetime.datetime.now() - m_time > expiration)
            result = None
            if os.path.exists(filename):
                with open(filename) as file:
                    result = json.load(file)
            if update_policy is not None and result is not None:
                update = update or update_policy(filename, result)
            if not dir_access_ok(filename):
                if result is None:
                    raise Exception(
                        f"No write access to cache file {filename} and no cached value"
                    )
                logging.getLogger("skare3").debug(
                    f"No write access to cache file {filename}"
                )
                update = False
            if result is None or update:
                result = func(*args, **kwargs)
                directory_out = os.path.dirname(filename)
                if not os.path.exists(directory_out):
                    os.makedirs(directory_out)
                with open(filename, "w") as file:
                    json.dump(result, file)
            return result

        def clear_cache():
            files = os.path.join(directory, "{name}*.json".format(name=name))
            files = glob.glob(files)
            if files:
                subprocess.run(["rm"] + files, check=False)

        wrapper.clear_cache = clear_cache

        sig = inspect.signature(func)

        def rm_cache_entry(*args, s=sig, **kwargs):
            s_args = s.bind(*args, **kwargs).arguments
            arg_str = "-".join(
                [
                    "{a}:{v}".format(a=a, v=s_args[a])
                    for a in s_args
                    if a not in ignore_args
                ]
            )
            filename = os.path.join(
                directory, "{name}{arg_str}.json".format(name=name, arg_str=arg_str)
            )
            if os.path.exists(filename):
                os.remove(filename)

        wrapper.rm_cache_entry = rm_cache_entry
        return wrapper

    return decorator_cache

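# A minimal usage sketch for json_cache (illustrative only; "my_cache" and slow_lookup are
# hypothetical names). The decorated function gains an `update` keyword argument and
# `clear_cache`/`rm_cache_entry` helpers, as defined above:
#
#     @json_cache("my_cache", expires={"days": 1})
#     def slow_lookup(key):
#         return {"key": key}
#
#     slow_lookup("a")               # computed and written to <data_dir>/my_cache::key:a.json
#     slow_lookup("a")               # read back from the cache
#     slow_lookup("a", update=True)  # force-update the cache entry
#     slow_lookup.rm_cache_entry("a")

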
def _ensure_skare3_local_repo(update=True):
    repo_dir = os.path.join(CONFIG["data_dir"], "skare3")
    parent = os.path.dirname(repo_dir)
    if not os.path.exists(parent):
        os.makedirs(parent)
    if not os.path.exists(repo_dir):
        _ = subprocess.run(
            ["git", "clone", "https://github.com/sot/skare3", repo_dir],
            cwd=CONFIG["data_dir"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    elif update:
        _ = subprocess.run(
            ["git", "pull"],
            cwd=repo_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    assert os.path.exists(repo_dir)


def _conda_package_list(update=True):
    _ensure_skare3_local_repo(update)
    all_meta = glob.glob(
        os.path.join(CONFIG["data_dir"], "skare3", "pkg_defs", "*", "meta.yaml")
    )
    all_info = []
    for f in all_meta:
        macro = "{% macro compiler(arg) %}{% endmacro %}\n"
        info = yaml.load(
            jinja2.Template(macro + open(f).read()).render(), Loader=yaml.FullLoader
        )
        pkg_info = {
            "name": os.path.basename(os.path.dirname(f)),
            "package": info["package"]["name"],
            "repository": None,
            "owner": None,
        }
        if "about" in info and "home" in info["about"]:
            home = info["about"]["home"].strip()
            matches = [
                re.match(r"git@github.com:(?P<org>[^/]+)/(?P<repo>\S+)\.git$", home),
                re.match(r"git@github.com:(?P<org>[^/]+)/(?P<repo>\S+)$", home),
                re.match(r"https?://github.com/(?P<org>[^/]+)/(?P<repo>[^/]+)/?", home),
            ]
            for match in matches:
                if match:
                    org_repo = match.groupdict()
                    pkg_info["owner"] = org_repo["org"]
                    pkg_info["repository"] = "{org}/{repo}".format(**org_repo)
                    pkg_info["home"] = info["about"]["home"]
                    break
            # else:
            #     pkg_info['home'] = ''
            # print(f, pkg_info['repository'])
        all_info.append(pkg_info)
    return all_info

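# Each entry returned by _conda_package_list looks roughly like this (an illustrative sketch
# based on the keys assembled above; "home" is present only when meta.yaml declares one):
#
#     {
#         "name": "Quaternion",
#         "package": "quaternion",
#         "repository": "sot/Quaternion",
#         "owner": "sot",
#         "home": "https://github.com/sot/Quaternion",
#     }

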
@json_cache("pkg_name_map", expires={"days": 1})
def get_package_list():
    """
    Return a list of dictionaries, one per package.

    :return: list of dict
        Each dictionary contains only basic information
    """
    all_packages = _conda_package_list()
    full_names = [p["repository"] for p in all_packages]
    organizations = [github.Organization(org) for org in CONFIG["organizations"]]
    repositories = [r for org in organizations for r in org.repositories()]
    for r in repositories:
        if r["full_name"] in full_names:
            continue
        all_packages.append(
            {
                "name": r["full_name"],
                "package": None,
                "repository": r["full_name"],
                "owner": r["owner"]["login"],
            }
        )
    all_packages = sorted(
        all_packages,
        key=lambda p: (str(p["repository"]) if p["repository"] else "", p["name"]),
    )
    return all_packages

def _get_tag_target(tag):
    if "target" in tag:
        return _get_tag_target(tag["target"])
    else:
        return tag["oid"], tag["committedDate"]


# I did not assemble these queries in my mind.
# If you need to change one of these queries,
# go to https://docs.github.com/en/graphql/overview/explorer
# copy the query into the dialog, edit the template parameters
# (you can remove the 'before: "{{ cursor }}"' part)
# run it to see it works, then click where it says "explorer"
# and that should bring up a tree view where you can click to edit the query.
_PR_QUERY = """
{
  repository(name: "{{ name }}", owner: "{{ owner }}") {
    name
    owner {
      login
    }
    pullRequests(last: 100, before: "{{ cursor }}") {
      nodes {
        number
        title
        url
        mergeCommit {
          oid
        }
        commits(last: 100) {
          totalCount
          nodes {
            commit {
              committedDate
              pushedDate
              message
            }
          }
        }
        baseRefName
        headRefName
        author {
          ... on User {
            name
          }
        }
        state
      }
      pageInfo {
        hasPreviousPage
        hasNextPage
        startCursor
        endCursor
      }
    }
  }
}
"""

_COMPARE_COMMITS_QUERY = """
{
  repository(name: "{{ name }}", owner: "{{ owner }}") {
    ref(qualifiedName: "{{ base }}") {
      compare(headRef: "{{ head }}") {
        aheadBy
        behindBy
        commits(first: 100, after: "{{ cursor }}") {
          nodes {
            oid
            message
            pushedDate
            author {
              user {
                login
              }
            }
          }
          pageInfo {
            hasPreviousPage
            hasNextPage
            startCursor
            endCursor
          }
        }
      }
    }
  }
}
"""

_COMMIT_QUERY = """
{
  repository(name: "{{ name }}", owner: "{{ owner }}") {
    name
    owner {
      login
    }
    defaultBranchRef {
      target {
        ... on Commit {
          history(first: 100, after: "{{ cursor }}") {
            pageInfo {
              hasNextPage
              endCursor
            }
            nodes {
              oid
              message
              pushedDate
              author {
                user {
                  login
                }
              }
            }
          }
        }
      }
    }
  }
}
"""


class Dict(dict):
    def __getitem__(self, i):
        if i in self.keys():
            return super().__getitem__(i)
        return self.node(self, i)

    @staticmethod
    def _node(root, path):
        if path:
            return Dict._node(root[path[0]], path[1:])
        return root

    @staticmethod
    def node(root, path):
        path = path.split("/")
        return Dict._node(root, path)


def get_all_nodes(
    owner, name, path, query, query_2=None, at="", reverse=False, **kwargs
):
    if reverse:
        cursor = "startCursor"
        has_more = "hasPreviousPage"
    else:
        cursor = "endCursor"
        has_more = "hasNextPage"
    data = Dict(
        github.GITHUB_API_V4(
            jinja2.Template(query).render(name=name, owner=owner, cursor=at, **kwargs)
        )
    )
    check_api_errors(data)
    commits = data[path]["nodes"]
    if query_2 is None:
        query_2 = query
    while data[path]["pageInfo"][has_more]:
        if at == data[path]["pageInfo"][cursor]:
            raise RuntimeError("Cursor did not change and will cause an infinite loop")
        at = data[path]["pageInfo"][cursor]
        data = Dict(
            github.GITHUB_API_V4(
                jinja2.Template(query_2).render(
                    name=name, owner=owner, cursor=at, **kwargs
                )
            )
        )
        check_api_errors(data)
        commits += data[path]["nodes"]
    return commits


def check_api_errors(data):
    if "errors" in data:
        try:
            msg = "\n".join([e["message"] for e in data["errors"]])
        except Exception:
            raise Exception(str(data["errors"])) from None
        raise Exception(msg)


def _pr_commits(commits, all_pull_requests):
    merges = []
    pulls_v_hash = {
        pr["mergeCommit"]["oid"]: pr
        for pr in all_pull_requests.values()
        if pr["mergeCommit"] is not None
    }
    for commit in commits:
        match = re.match(
            r"Merge pull request #(?P<pr_number>.+) from (?P<branch>\S+)(\n\n(?P<title>.+))?",
            commit["message"],
        )
        if commit["oid"] in pulls_v_hash:
            merge = {
                "pr_number": pulls_v_hash[commit["oid"]]["number"],
                "title": pulls_v_hash[commit["oid"]]["title"],
                "branch": pulls_v_hash[commit["oid"]]["headRefName"],
                "author": pulls_v_hash[commit["oid"]]["author"]["name"],
            }
            merges.append(merge)
        elif match:
            # I don't think it will ever enter this branch.
            # This would be recognizable in the dashboard because the PR author is unknown.
            merge = match.groupdict()
            merge["pr_number"] = int(merge["pr_number"])
            merge["author"] = "Unknown"
            merges.append(merge)
    return merges


def _get_repository_info_v4(
    owner_repo,
    since=7,
    include_unreleased_commits=False,
    include_commits=False,
):
    owner, name = owner_repo.split("/")
    api = github.GITHUB_API_V4
    data_v4 = Dict(
        api(jinja2.Template(github.graphql.REPO_QUERY).render(name=name, owner=owner))
    )
    if "errors" in data_v4:
        try:
            msg = "\n".join([e["message"] for e in data_v4["errors"]])
        except Exception:
            raise Exception(str(data_v4["errors"])) from None
        raise Exception(msg)
    branches = [
        n
        for n in data_v4["data/repository/refs/nodes"]
        if re.match("heads/", n["name"])
    ]
    releases = data_v4["data/repository/releases/nodes"]
    issues = data_v4["data/repository/issues/nodes"]
    default_branch = data_v4["data/repository/defaultBranchRef/name"]

    commits_path = "data/repository/defaultBranchRef/target/history"
    commits = data_v4[commits_path]["nodes"]
    if data_v4[commits_path]["pageInfo"]["endCursor"] is not None:
        # append the rest of the commits only if there were commits to begin with
        commits += get_all_nodes(
            owner,
            name,
            commits_path,
            _COMMIT_QUERY,
            reverse=False,
            at=data_v4[commits_path]["pageInfo"]["endCursor"],
        )

    pull_requests_path = "data/repository/pullRequests"
    pull_requests = data_v4[pull_requests_path]["nodes"]
    if data_v4[pull_requests_path]["pageInfo"]["startCursor"] is not None:
        # append the rest of the PRs only if there were commits to begin with
        pull_requests += get_all_nodes(
            owner,
            name,
            pull_requests_path,
            _PR_QUERY,
            reverse=True,
            at=data_v4[pull_requests_path]["pageInfo"]["startCursor"],
        )

    # from now on, keep a list of the open pull requests on the main branch
    all_pull_requests = {pr["number"]: pr for pr in pull_requests}
    pull_requests = [
        pr
        for pr in pull_requests
        if pr["state"] not in ["CLOSED", "MERGED"]
        and pr["baseRefName"] == default_branch
    ]
    pull_requests = [
        {
            "number": pr["number"],
            "author": pr["author"]["name"],
            "url": pr["url"],
            "title": pr["title"],
            "n_commits": pr["commits"]["totalCount"],
            "last_commit_date": pr["commits"]["nodes"][-1]["commit"]["pushedDate"],
        }
        for pr in pull_requests
    ]
    pull_requests = sorted(pull_requests, key=lambda pr: pr["number"], reverse=True)

    # get release info since "since", excluding drafts, pre-releases, invalid versions
    releases = [r for r in releases if not r["isPrerelease"] and not r["isDraft"]]
    exclude = []
    for rel in releases:
        rel["tag_oid"], rel["committed_date"] = _get_tag_target(rel["tag"])
        try:
            Version(rel["tagName"])
        except InvalidVersion:
            logging.debug(
                f"{owner_repo} release {rel['tagName']} does not conform to PEP 440. "
                "It will be ignored"
            )
            exclude += [rel["tagName"]]
    releases = [r for r in releases if r["tagName"] not in exclude]
    releases = sorted(releases, key=lambda r: Version(r["tagName"]), reverse=True)
    release_tags = [r["tagName"] for r in releases]
    if isinstance(since, int):
        # keeping the last "since" releases, plus the current main branch
        releases = releases[: since + 1]
    elif since in release_tags:
        # keeping up to the "since" tag (inclusive), plus the current main branch
        releases = releases[: release_tags.index(since) + 2]
    elif since is not None:
        raise Exception(
            "Requested repository info with since={since}, ".format(since=since)
            + "which is not an integer and is not one of the known releases "
            + "({release_tags})".format(release_tags=release_tags)
        )

    if len(releases) == 0:
        # if there are no releases, look for merge messages in all commits
        rel_prs = _pr_commits(commits, all_pull_requests)
    else:
        # if there are releases, look for merge messages in the commits since the last release
        rel_commits = get_all_nodes(
            owner,
            name,
            "data/repository/ref/compare/commits",
            _COMPARE_COMMITS_QUERY,
            reverse=False,
            base=releases[0]["tagName"],
            head=default_branch,
        )
        rel_prs = _pr_commits(rel_commits, all_pull_requests)

    # the first entry in release_info does not correspond to a release.
    # It is the list of PRs (and commits) waiting to be released.
    release_info = [
        {
            "release_tag": "",
            "release_tag_date": "",
            "release_commit_date": datetime.datetime.now().isoformat(),
            "commits": [],
            "merges": rel_prs,
        }
    ]
    for base, head in zip(releases[1:], releases[:-1], strict=True):
        rel_commits = get_all_nodes(
            owner,
            name,
            "data/repository/ref/compare/commits",
            _COMPARE_COMMITS_QUERY,
            reverse=False,
            base=base["tagName"],
            head=head["tagName"],
        )
        rel_prs = _pr_commits(rel_commits, all_pull_requests)
        release = {
            "release_sha": head["tag_oid"],
            "release_commit_date": head["committed_date"],
            "release_tag": head["tagName"],
            "release_tag_date": head["publishedAt"],
            "commits": [],
            "merges": rel_prs,
        }
        release_info.append(release)

    # the first entry in the list is not a release, but the current main branch
    release_info = release_info[:1] + sorted(
        release_info[1:], key=lambda r: Version(r["release_tag"]), reverse=True
    )

    if len(release_info) > 1:
        last_tag = release_info[1]["release_tag"]
        last_tag_date = release_info[1]["release_tag_date"]
    else:
        last_tag = ""
        last_tag_date = ""

    # workflows are only in v3
    headers = {"Accept": "application/vnd.github.antiope-preview+json"}
    workflows = github.GITHUB_API_V3.get(
        "/repos/{owner}/{name}/actions/workflows".format(owner=owner, name=name),
        headers=headers,
    ).json()
    workflows = [
        {k: w[k] for k in ["name", "badge_url"]} for w in workflows["workflows"]
    ]

    repo_info = {
        "owner": owner,
        "name": name,
        "pushed_at": data_v4["data"]["repository"]["pushedAt"],
        "updated_at": data_v4["data"]["repository"]["updatedAt"],
        "last_tag": last_tag,
        "last_tag_date": last_tag_date,
        "commits": len(release_info[0]["commits"]),
        "merges": len(release_info[0]["merges"]),
        "merge_info": release_info[0]["merges"],
        "release_info": release_info,
        "issues": len(issues),
        "n_pull_requests": len(pull_requests),
        "branches": len(branches),
        "pull_requests": pull_requests,
        "workflows": workflows,
    }
    if not include_commits:
        for r in repo_info["release_info"]:
            del r["commits"]
    if not include_unreleased_commits and len(repo_info["release_info"]) == 1:
        repo_info["commits"] = 0
        repo_info["merges"] = 0
        repo_info["merge_info"] = []
    return repo_info

def get_conda_pkg_info(conda_package, conda_channel=None):
    """
    Get information on a conda package.

    :param conda_package: str
        Name of conda package
    :param conda_channel: str
        url of the channel
    :return: dict
    """
    if sys.version_info >= (3, 7):
        kwargs = {"capture_output": True}
    else:
        kwargs = {"stdout": subprocess.PIPE}
    cmd = ["conda", "search", conda_package, "--override-channels", "--json"]
    if conda_channel is None:
        conda_channels = CONFIG["conda_channels"]["main"]
    elif isinstance(conda_channel, list):
        conda_channels = conda_channel
    elif conda_channel in CONFIG["conda_channels"]:
        conda_channels = CONFIG["conda_channels"][conda_channel]
    else:
        conda_channels = [conda_channel]
    unreachable = []
    for c in conda_channels:
        try:
            requests.get(c.format(**os.environ), timeout=2)
        except KeyError as e:
            # this clears the exception we just caught and raises another one
            raise Exception(
                "Missing expected environmental variable: {e}".format(e=str(e))
            ) from None
        except requests.ConnectTimeout:
            c2 = urllib.parse.urlparse(c)
            c2 = urllib.parse.urlunparse(
                (
                    c2.scheme,
                    c2.netloc.split("@")[-1],
                    c2.path,
                    c2.params,
                    c2.query,
                    c2.fragment,
                )
            )
            unreachable.append(c2)
        cmd += ["--channel", c.format(**os.environ)]
    if unreachable:
        msg = "The following conda channels are not reachable:\n -"
        msg += " -".join(unreachable)
        raise NetworkException(msg)
    check = kwargs.pop("check", True)
    p = subprocess.run(cmd, check=check, **kwargs)
    out = json.loads(p.stdout.decode())
    if (
        "error" in out
        and "exception_name" in out
        and out["exception_name"] == "PackagesNotFoundError"
    ):
        out = {}
    if "error" in out:
        if "message" in out:
            raise Exception(out["message"])
        else:
            raise Exception(str(out))
    for key in out:
        for pkg in out[key]:
            pkg["depends"] = _split_versions(pkg["depends"])
    return out

def _split_versions(depends):
    """
    Convert a list of package dependencies into a dictionary of the form {name: version}.

    Typically, "depends" comes from calling `conda search ska3-flight --info --json`.
    This function expects each row to be of the form "name==version" or "name version".
    If the version is not given, it is set to ''.
    """
    result = {}
    for depend in depends:
        if "==" in depend:
            name_version = depend.split("==", maxsplit=1)
        else:
            name_version = depend.split(maxsplit=1)
        if len(name_version) == 2:
            name, version = name_version
        else:
            name, version = name_version[0], ""
        result[name.strip()] = version.strip()
    return result

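# For example (a sketch of the behavior above):
#
#     _split_versions(["python ==3.10", "numpy >=1.20", "pip"])
#     -> {"python": "3.10", "numpy": ">=1.20", "pip": ""}

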
def get_conda_pkg_dependencies(conda_package, conda_channel=None):
    """
    Get dependencies of a conda package.

    :param conda_package: str
        Name of conda package
    :param conda_channel: str
        url of the channel
    :return: dict
    """
    out = get_conda_pkg_info(conda_package, conda_channel)
    if not out:
        raise Exception(
            "{conda_package} not found.".format(conda_package=conda_package)
        )
    return out[conda_package][-1]["depends"]

def _get_release_commit(repository, release_name):
    """
    Get release commit. Quaternion releases 3.4.1 and 3.5.1 give different results.

    :param repository:
    :param release_name:
    :return:
    """
    obj = repository.tags(name=release_name)["object"]
    if obj["type"] == "tag":
        obj = repository.tags(tag_sha=obj["sha"])["object"]
    if obj["type"] != "commit":
        raise Exception("Object is not a commit, but a {t}".format(t=obj["type"]))
    return obj


def _get_repository_info_v3(
    owner_repo,
    since=7,
    include_unreleased_commits=False,
    include_commits=False,
):
    """
    Get information about a Github repository. This uses Github API v3.

    This function is DEPRECATED, use v4 instead.

    :param owner_repo: str
        the name of the repository, including owner, something like 'sot/skare3'.
    :param since: int or str
        the maximum number of releases to look back, or the release tag to look back to
        (not inclusive).
    :param include_unreleased_commits: bool
        whether to include commits and merges for repositories that have no release.
        This affects only top-level entries 'commits', 'merges', 'merge_info'.
        It is for backward compatibility with the dashboard.
    :param include_commits: bool
        whether to include commits in release_info.
    :return:
    """
    api = github.GITHUB_API_V3
    owner, repo = owner_repo.split("/")
    repository = github.Repository(owner_repo)
    releases = [
        release
        for release in repository.releases()
        if not release["prerelease"] and not release["draft"]
    ]
    # get the actual commit sha and date for each release
    release_commits = [
        _get_release_commit(repository, r["tag_name"]) for r in releases
    ]
    release_commits = [repository.commits(ref=c["sha"]) for c in release_commits]
    release_dates = {
        r["tag_name"]: c["commit"]["committer"]["date"]
        for r, c in zip(releases, release_commits, strict=True)
    }

    date_since = None
    if isinstance(since, int):
        # only the latest 'since' releases (at most) will be included in summary
        if len(releases) > since:
            date_since = sorted(release_dates.values(), reverse=True)[since]
    elif since in release_dates:
        # only releases _after_ 'since' will be included in summary
        date_since = release_dates[since]
    else:
        raise Exception(
            "Requested repository info with since={since}, ".format(since=since)
            + "which is not an integer and is not one of the known releases "
            + "({releases})".format(releases=sorted(release_dates.keys()))
        )

    release_info = [
        {"release_tag": "", "release_tag_date": "", "commits": [], "merges": []}
    ]
    all_pull_requests = repository.pull_requests(state="all")
    all_pull_requests = {pr["number"]: pr for pr in all_pull_requests}
    commits = repository.commits(
        sha=repository.info["default_branch"], since=date_since
    )
    if date_since is not None:
        # remove first commit, which was just the starting point
        commits = commits[:-1]
    for commit in commits:
        sha = commit["sha"]
        releases_at_commit = [
            {
                "release_tag": release["tag_name"],
                "release_tag_date": release["published_at"],
                "commits": [],
                "merges": [],
            }
            for release in [
                r
                for r, c in zip(releases, release_commits, strict=True)
                if c["sha"] == sha
            ]
        ]
        release_info += releases_at_commit
        release_info[-1]["commits"].append(
            {
                "sha": commit["sha"],
                "message": commit["commit"]["message"],
                "date": commit["commit"]["committer"]["date"],
                "author": commit["commit"]["author"]["name"],
            }
        )
        match = re.match(
            r"Merge pull request #(?P<pr_number>.+) from (?P<branch>\S+)\n\n(?P<title>.+)",
            commit["commit"]["message"],
        )
        if match:
            merge = match.groupdict()
            merge["pr_number"] = int(merge["pr_number"])
            if merge["pr_number"] in all_pull_requests:
                merge["title"] = all_pull_requests[merge["pr_number"]]["title"].strip()
            release_info[-1]["merges"].append(merge)

    if len(release_info) > 1:
        last_tag = release_info[1]["release_tag"]
        last_tag_date = release_info[1]["release_tag_date"]
    else:
        last_tag = ""
        last_tag_date = ""

    branches = repository.branches()
    issues = [i for i in repository.issues() if "pull_request" not in i]
    pull_requests = []
    for pr in repository.pull_requests():
        pr_commits = api.get(pr["commits_url"]).json()
        date = pr_commits[-1]["commit"]["committer"]["date"]
        pull_requests.append(
            {
                "number": pr["number"],
                "url": pr["_links"]["html"]["href"],
                "title": pr["title"],
                "n_commits": len(pr_commits),
                "last_commit_date": date,
            }
        )

    headers = {"Accept": "application/vnd.github.antiope-preview+json"}
    workflows = api.get(
        "/repos/{owner}/{repo}/actions/workflows".format(owner=owner, repo=repo),
        headers=headers,
    ).json()
    workflows = [
        {k: w[k] for k in ["name", "badge_url"]} for w in workflows["workflows"]
    ]
    repo_info = {
        "owner": owner,
        "name": repo,
        "last_tag": last_tag,
        "last_tag_date": last_tag_date,
        "commits": len(release_info[0]["commits"]),
        "merges": len(release_info[0]["merges"]),
        "merge_info": release_info[0]["merges"],
        "release_info": release_info,
        "issues": len(issues),
        "n_pull_requests": len(pull_requests),
        "branches": len(branches),
        "pull_requests": pull_requests,
        "workflows": workflows,
    }
    if not include_commits:
        for r in repo_info["release_info"]:
            del r["commits"]
    if not include_unreleased_commits and len(repo_info["release_info"]) == 1:
        repo_info["commits"] = 0
        repo_info["merges"] = 0
        repo_info["merge_info"] = []
    return repo_info


_LAST_UPDATED_QUERY = jinja2.Template(
    """
{
  repository(name: "{{ name }}", owner: "{{ owner }}") {
    pushedAt
    updatedAt
    name
    owner {
      id
    }
  }
}
"""
)

def repository_info_is_outdated(_, pkg_info):
    """
    Cache update policy that returns True if the Github repository has been updated or pushed into.

    If the calling user does not have write access to the cache directory, this function returns
    False, unless SKARE3_REPO_INFO_LATEST is set to "True".

    :param _:
    :param pkg_info: dict.
        As returned from :func:`~skare3_tools.packages.get_repository_info`.
    :return:
    """
    update = os.environ.get("SKARE3_REPO_INFO_LATEST", "").lower() in ["true", "1"]
    if not dir_access_ok(CONFIG["data_dir"]) and not update:
        return False
    result = github.GITHUB_API_V4(_LAST_UPDATED_QUERY.render(**pkg_info))
    result = result["data"]["repository"]
    outdated = (
        pkg_info["pushed_at"] < result["pushedAt"]
        or pkg_info["updated_at"] < result["updatedAt"]
    )
    return outdated

def get_repository_info(owner_repo, version="v4", **kwargs):
    """
    Get information about a Github repository.

    :param owner_repo: str
        the name of the repository, including owner, something like 'sot/skare3'.
    :param since: int or str
        the maximum number of releases to look back, or the release tag to look back to
        (not inclusive).
    :param include_unreleased_commits: bool
        whether to include commits and merges for repositories that have no release.
        This affects only top-level entries 'commits', 'merges', 'merge_info'.
        It is for backward compatibility with the dashboard.
    :param include_commits: bool
        whether to include commits in release_info.
    :param version: str
        Github API version to use.
    :param update: bool
        Force update of the cached info. By default, the info is updated only if pushed_at or
        updated_at have changed.
    :return:
    """
    # the indirect call is to make sure the version argument is set at this point.
    # Otherwise, there would be two cache entries if the version is explicitly set to the default
    # value (one where it is set and one where it is not).
    return _get_repository_info(owner_repo, version, **kwargs)

@json_cache(
    "pkg_repository_info",
    directory="pkg_info",
    update_policy=repository_info_is_outdated,
)
def _get_repository_info(owner_repo, version, **kwargs):
    owner, name = owner_repo.split("/")
    if version == "v4":
        info = _get_repository_info_v4(owner_repo, **kwargs)
    else:
        info = _get_repository_info_v3(owner_repo, **kwargs)
    info["master_version"] = ""
    conda_info = get_conda_pkg_info(name, conda_channel="masters")
    if name.lower() in conda_info:
        info["master_version"] = conda_info[name.lower()][-1]["version"]
    return info


get_repository_info.clear_cache = _get_repository_info.clear_cache
get_repository_info.rm_cache_entry = _get_repository_info.rm_cache_entry

def get_repositories_info(repositories=None, version="v4", update=False):
    if repositories is None:
        repositories = [
            p["repository"]
            for p in get_package_list()
            if p["owner"] in CONFIG["organizations"]
        ]
    repo_package_map = {
        p["repository"]: p["package"] for p in get_package_list() if p["repository"]
    }
    info = {"packages": []}
    meta_pkg_versions = {
        pkg: {r: "" for r in repositories} for pkg in ["ska3-flight", "ska3-matlab"]
    }
    for pkg in ["ska3-flight", "ska3-matlab"]:
        try:
            assert pkg in meta_pkg_versions
            conda_info = get_conda_pkg_info(pkg, conda_channel="main")
            if pkg not in conda_info:
                raise Exception(f"{pkg} package not found")
            conda_info = conda_info[pkg][-1]
            info[pkg] = conda_info["version"]
            versions = conda_info["depends"]
            for owner_repo in repositories:
                assert (
                    owner_repo in repo_package_map
                ), "Package {owner_repo} not in package map".format(
                    owner_repo=owner_repo
                )
                conda_pkg = repo_package_map[owner_repo]
                if conda_pkg in versions:
                    assert owner_repo in meta_pkg_versions[pkg]
                    meta_pkg_versions[pkg][owner_repo] = versions[conda_pkg]
        except NetworkException as e:
            logging.error(e)
            raise
        except Exception as e:
            logging.warning("Empty {pkg}: {t}: {e}".format(pkg=pkg, t=type(e), e=e))
    for owner_repo in repositories:
        # print(owner_repo)
        try:
            repo_info = get_repository_info(owner_repo, version=version, update=update)
            repo_info["matlab"] = meta_pkg_versions["ska3-matlab"][owner_repo]
            repo_info["flight"] = meta_pkg_versions["ska3-flight"][owner_repo]
            info["packages"].append(repo_info)
        except Exception as e:
            logging.warning("Failed to get info on %s: %s", owner_repo, e)
            continue
    info.update({"time": datetime.datetime.now().isoformat()})
    return info

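# The returned dictionary has roughly this shape (a sketch; "ska3-flight"/"ska3-matlab" are
# present only when the corresponding conda info could be fetched):
#
#     {
#         "ska3-flight": "<version>",
#         "ska3-matlab": "<version>",
#         "time": "<ISO timestamp>",
#         "packages": [
#             {... repository info ..., "flight": "<version>", "matlab": "<version>"},
#             ...
#         ],
#     }

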
def get_parser():
    description = """
    SkaRE3 Github information tool.

    This script queries Github and a few other sources to determine the status of all packages.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "-o",
        default="repository_info.json",
        help="Output file (default=repository_info.json)",
    )
    parser.add_argument(
        "--token", help="Github token, or name of file that contains token"
    )
    return parser


def main():
    args = get_parser().parse_args()
    github.init(token=args.token)

    info = get_repositories_info()
    if info:
        with open(args.o, "w") as f:
            json.dump(info, f, indent=2)


if __name__ == "__main__":
    main()

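# Command-line usage sketch (assumes the module is run as a script, e.g. via
# ``python -m skare3_tools.packages``; the token file path below is a placeholder):
#
#     python -m skare3_tools.packages -o repository_info.json --token ~/.github-token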