Skip to content

GitHub

Repositories

examples/github/repositories.py
import itertools
import operator
import pathlib
from typing import Optional

from _client import github_commits, github_files, github_repositories, github_user  # see _client.py

import graphinate


def repo_graph_model():  # noqa: C901
    """
    Create a graph model for GitHub repositories.

    Returns:
        GraphModel: A graph model representing GitHub repositories with nodes and edges.
    """

    graph_model = graphinate.model(name='GitHub Repository Graph')

    @graph_model.edge()
    def github(user_id: Optional[str] = None,
               repository_id: Optional[str] = None,
               commit_id: Optional[str] = None,
               file_id: Optional[str] = None,
               **kwargs):
        user = github_user(user_id)
        for repo in github_repositories(user_id, repository_id):
            yield {'source': (user.login,), 'target': (user.login, repo.name)}
            for commit in github_commits(repo, commit_id):
                yield {
                    'source': (user.login, repo.name),
                    'target': (user.login, repo.name, commit.sha)
                }
                for file in github_files(commit, file_id):
                    yield {
                        'source': (user.login, repo.name, commit.sha),
                        'target': (user.login, repo.name, commit.sha, file.filename)
                    }

    user_node = graph_model.node(key=operator.attrgetter('login'),
                                 value=operator.attrgetter('raw_data'),
                                 label=operator.itemgetter('name'))

    repository_node = graph_model.node(parent_type='user',
                                       key=operator.attrgetter('name'),
                                       value=operator.attrgetter('raw_data'),
                                       label=operator.itemgetter('name'))

    def commit_label(commit):
        return commit['sha'][-7:]

    commit_node = graph_model.node(parent_type='repository',
                                   key=operator.attrgetter('sha'),
                                   value=operator.attrgetter('raw_data'),
                                   label=commit_label)

    file_node = graph_model.node(parent_type='commit',
                                 unique=True,
                                 key=operator.attrgetter('filename'),
                                 value=operator.attrgetter('raw_data'),
                                 label=operator.itemgetter('filename'))

    @user_node
    def user(user_id: Optional[str] = None, **kwargs):
        yield github_user(user_id)

    @repository_node
    def repository(user_id: Optional[str] = None,
                   repository_id: Optional[str] = None,
                   **kwargs):
        repos = github_repositories(user_id, repository_id)
        yield from repos

    @commit_node
    def commit(user_id: Optional[str] = None,
               repository_id: Optional[str] = None,
               commit_id: Optional[str] = None,
               **kwargs):
        for repo in github_repositories(user_id, repository_id):
            yield from github_commits(repo, commit_id)

    def file_type(user_id: Optional[str] = None,
                  repository_id: Optional[str] = None,
                  commit_id: Optional[str] = None,
                  file_type_id: Optional[str] = None,
                  **kwargs):
        def group_key(file):
            return pathlib.PurePath(file).suffix

        for repo in github_repositories(user_id, repository_id):
            for commit in github_commits(repo, commit_id):
                yield from ((k, list(g)) for k, g in
                            itertools.groupby(
                                sorted(github_files(commit),
                                       key=group_key), group_key
                            ))

    @file_node
    def file(user_id: Optional[str] = None,
             repository_id: Optional[str] = None,
             commit_id: Optional[str] = None,
             file_id: Optional[str] = None,
             **kwargs):
        for repo in github_repositories(user_id, repository_id):
            for commit in github_commits(repo, commit_id):
                yield from github_files(commit, file_id)

    return graph_model


if __name__ == '__main__':
    repo_model = repo_graph_model()

    params = {
        'user_id': 'erivlis',
        'repository_id': 'graphinate',
        # 'user_id': 'andybrewer',
        # 'repository_id': 'operation-go',
        # 'commit_id': None,
        # 'file_id': 'README.md',
        # 'user_id' "strawberry-graphql"
    }

    schema = graphinate.builders.GraphQLBuilder(repo_model).build(**params)
    graphinate.graphql(schema)
examples/github/requirements.txt
graphinate
PyGithub
examples/github/_client.py
import functools
import os
from collections.abc import Iterable
from typing import Optional, Union

# see requirements.txt
from github import Auth, Github
from github.AuthenticatedUser import AuthenticatedUser
from github.Commit import Commit
from github.File import File
from github.NamedUser import NamedUser
from github.Repository import Repository

# define a 'GITHUB_TOKEN' Env Var.
token = os.getenv('GITHUB_TOKEN')

# using an access token
auth = Auth.Token(token)

# Public Web GitHub
client = Github(auth=auth)


# or GitHub Enterprise with custom hostname
# g = Github(auth=auth, base_url='https://{hostname}/api/v3')


@functools.lru_cache
def github_user(user_id: Optional[str] = None) -> Union[NamedUser, AuthenticatedUser]:
    """
    Get the GitHub user object for the specified user ID or the authenticated user.

    Parameters:
        user_id (Optional[str]): The ID of the user to retrieve.
                                 If not provided, retrieve the authenticated user.

    Returns:
        Union[NamedUser, AuthenticatedUser]: The GitHub user object corresponding to the user ID provided,
                                             or the authenticated user if no user ID is specified.
    Note:
        This function requires authentication with a valid GitHub token.
    """
    user = client.get_user(user_id) if user_id else client.get_user()
    return user


@functools.lru_cache
def github_repositories(
        user_id: Optional[str] = None,
        repo_id: Optional[str] = None) -> Iterable[Repository]:
    """
    Get the GitHub repositories for the specified user ID or the authenticated user.

    Parameters:
        user_id (Optional[str]): The ID of the user whose repositories to retrieve.
                                 If not provided, retrieve repositories of the authenticated user.
        repo_id (Optional[str]): The ID of the repository to retrieve.
                                 If provided, only that repository will be returned.

    Returns:
        Iterable[Repository]:
        A list of GitHub repository objects corresponding to the user ID and/or repository ID provided.

    Note:
        This function requires authentication with a valid GitHub token.
    """

    user = github_user(user_id)
    if repo_id and (repo := user.get_repo(name=repo_id)):
        return [repo]
    else:
        return user.get_repos()


def github_commits(
        repo: Repository,
        commit_id: Optional[str] = None) -> Iterable[Commit]:
    """
    Retrieve commits from a GitHub repository.

    Parameters:
        repo (Repository): The GitHub repository object from which to retrieve commits.
        commit_id (str, optional): The ID of the commit to retrieve.
                                   If provided, only that commit will be returned.
                                   Defaults to None.

    Returns:
        Iterable[Commit]: An Iterable of Commit objects representing the commits in the repository.

    Example:
        To retrieve all commits from a repository:
        ```
        for commit in github_commits(repo):
            print(commit)
        ```

        To retrieve a specific commit by ID:
        ```
        for commit in github_commits(repo, commit_id='abcdef123456'):
            print(commit)
        ```

    Note:
        This function requires authentication with a valid GitHub token.
    """
    if commit_id and (commit := repo.get_commit(sha=commit_id)):
        yield commit
    else:
        yield from repo.get_commits()


def github_files(
        commit: Commit,
        file_id: Optional[str] = None) -> Iterable[File]:
    """
    Retrieves Files from a GitHub Commit

    Parameters:
        commit (Commit): A Commit object from the GitHub API.
        file_id (Optional[str]): An optional parameter specifying the filename to filter the files. Default is None.

    Returns:
        Iterable[File]: An Iterable of File objects based on the filtering criteria.

    Note:
        This function requires authentication with a valid GitHub token.
    """
    files: list[File] = commit.files
    if file_id:
        yield from [file for file in files if file.filename == file_id]
    else:
        yield from files

repo_graph

Followers

examples/github/followers.py
"""
Defines a function `followers_graph_model` that creates a graph model representing GitHub followers.
It recursively fetches followers of a given user up to a specified maximum depth.
The function yields edges between users in the graph.
"""

from typing import Optional

from _client import github_user  # see _client.py

import graphinate

DEPTH = 0


def followers_graph_model(max_depth: int = DEPTH):
    """
    Create a graph model representing GitHub followers.

    Args:
        max_depth (int): The maximum depth to fetch followers recursively (default is 0).

    Returns:
        GraphModel: A graph model representing GitHub followers.
    """

    graph_model = graphinate.model(name='Github Followers Graph')

    def _followers(user_id: Optional[str] = None, depth: int = 0, **kwargs):
        user = github_user(user_id)
        for follower in user.get_followers():
            yield {'source': user.login, 'target': follower.login}
            if depth < max_depth:
                yield from _followers(follower.login, depth=depth + 1, **kwargs)

    @graph_model.edge()
    def followed_by(user_id: Optional[str] = None, **kwargs):
        yield from _followers(user_id, **kwargs)

    return graph_model


if __name__ == '__main__':
    followers_model = followers_graph_model(max_depth=1)

    params = {
        'user_id': 'erivlis'
        # 'user_id': 'andybrewer'
        # 'user_id' "strawberry-graphql"
    }

    builder = graphinate.builders.GraphQLBuilder(followers_model, graph_type=graphinate.GraphType.DiGraph)
    schema = builder.build(default_node_attributes={'type': 'user'}, **params)
    graphinate.graphql(schema)
examples/github/requirements.txt
graphinate
PyGithub
examples/github/_client.py
import functools
import os
from collections.abc import Iterable
from typing import Optional, Union

# see requirements.txt
from github import Auth, Github
from github.AuthenticatedUser import AuthenticatedUser
from github.Commit import Commit
from github.File import File
from github.NamedUser import NamedUser
from github.Repository import Repository

# define a 'GITHUB_TOKEN' Env Var.
token = os.getenv('GITHUB_TOKEN')

# using an access token
auth = Auth.Token(token)

# Public Web GitHub
client = Github(auth=auth)


# or GitHub Enterprise with custom hostname
# g = Github(auth=auth, base_url='https://{hostname}/api/v3')


@functools.lru_cache
def github_user(user_id: Optional[str] = None) -> Union[NamedUser, AuthenticatedUser]:
    """
    Get the GitHub user object for the specified user ID or the authenticated user.

    Parameters:
        user_id (Optional[str]): The ID of the user to retrieve.
                                 If not provided, retrieve the authenticated user.

    Returns:
        Union[NamedUser, AuthenticatedUser]: The GitHub user object corresponding to the user ID provided,
                                             or the authenticated user if no user ID is specified.
    Note:
        This function requires authentication with a valid GitHub token.
    """
    user = client.get_user(user_id) if user_id else client.get_user()
    return user


@functools.lru_cache
def github_repositories(
        user_id: Optional[str] = None,
        repo_id: Optional[str] = None) -> Iterable[Repository]:
    """
    Get the GitHub repositories for the specified user ID or the authenticated user.

    Parameters:
        user_id (Optional[str]): The ID of the user whose repositories to retrieve.
                                 If not provided, retrieve repositories of the authenticated user.
        repo_id (Optional[str]): The ID of the repository to retrieve.
                                 If provided, only that repository will be returned.

    Returns:
        Iterable[Repository]:
        A list of GitHub repository objects corresponding to the user ID and/or repository ID provided.

    Note:
        This function requires authentication with a valid GitHub token.
    """

    user = github_user(user_id)
    if repo_id and (repo := user.get_repo(name=repo_id)):
        return [repo]
    else:
        return user.get_repos()


def github_commits(
        repo: Repository,
        commit_id: Optional[str] = None) -> Iterable[Commit]:
    """
    Retrieve commits from a GitHub repository.

    Parameters:
        repo (Repository): The GitHub repository object from which to retrieve commits.
        commit_id (str, optional): The ID of the commit to retrieve.
                                   If provided, only that commit will be returned.
                                   Defaults to None.

    Returns:
        Iterable[Commit]: An Iterable of Commit objects representing the commits in the repository.

    Example:
        To retrieve all commits from a repository:
        ```
        for commit in github_commits(repo):
            print(commit)
        ```

        To retrieve a specific commit by ID:
        ```
        for commit in github_commits(repo, commit_id='abcdef123456'):
            print(commit)
        ```

    Note:
        This function requires authentication with a valid GitHub token.
    """
    if commit_id and (commit := repo.get_commit(sha=commit_id)):
        yield commit
    else:
        yield from repo.get_commits()


def github_files(
        commit: Commit,
        file_id: Optional[str] = None) -> Iterable[File]:
    """
    Retrieves Files from a GitHub Commit

    Parameters:
        commit (Commit): A Commit object from the GitHub API.
        file_id (Optional[str]): An optional parameter specifying the filename to filter the files. Default is None.

    Returns:
        Iterable[File]: An Iterable of File objects based on the filtering criteria.

    Note:
        This function requires authentication with a valid GitHub token.
    """
    files: list[File] = commit.files
    if file_id:
        yield from [file for file in files if file.filename == file_id]
    else:
        yield from files