fix subject f-string

This commit is contained in:
root
2025-01-10 21:40:35 +00:00
parent 1431837e47
commit 42c6d7a0db
46610 changed files with 4096513 additions and 148 deletions

View File

@@ -0,0 +1,25 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
"""Import all submodules' main classes into the package space."""
__all__ = [
"IndexObject",
"Object",
"Blob",
"Commit",
"Submodule",
"UpdateProgress",
"RootModule",
"RootUpdateProgress",
"TagObject",
"Tree",
"TreeModifier",
]
from .base import IndexObject, Object
from .blob import Blob
from .commit import Commit
from .submodule import RootModule, RootUpdateProgress, Submodule, UpdateProgress
from .tag import TagObject
from .tree import Tree, TreeModifier

View File

@@ -0,0 +1,301 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["Object", "IndexObject"]
import os.path as osp
import gitdb.typ as dbtyp
from git.exc import WorkTreeRepositoryUnsupported
from git.util import LazyMixin, bin_to_hex, join_path_native, stream_copy
from .util import get_object_type_by_name
# typing ------------------------------------------------------------------
from typing import Any, TYPE_CHECKING, Union
from git.types import AnyGitObject, GitObjectTypeString, PathLike
if TYPE_CHECKING:
from gitdb.base import OStream
from git.refs.reference import Reference
from git.repo import Repo
from .blob import Blob
from .submodule.base import Submodule
from .tree import Tree
IndexObjUnion = Union["Tree", "Blob", "Submodule"]
# --------------------------------------------------------------------------
class Object(LazyMixin):
    """Common base class for the classes that model git object types.

    The concrete leaf classes are :class:`Blob <git.objects.blob.Blob>`,
    :class:`Tree <git.objects.tree.Tree>`, :class:`Commit <git.objects.commit.Commit>`
    and :class:`TagObject <git.objects.tag.TagObject>`; together they make up the
    :class:`~git.types.AnyGitObject` union of actual git object types.

    See :manpage:`gitglossary(7)` on:

    * "object": https://git-scm.com/docs/gitglossary#def_object
    * "object type": https://git-scm.com/docs/gitglossary#def_object_type
    * "blob": https://git-scm.com/docs/gitglossary#def_blob_object
    * "tree object": https://git-scm.com/docs/gitglossary#def_tree_object
    * "commit object": https://git-scm.com/docs/gitglossary#def_commit_object
    * "tag object": https://git-scm.com/docs/gitglossary#def_tag_object

    :note:
        :class:`~git.objects.submodule.base.Submodule` (and its
        :class:`~git.objects.submodule.root.RootModule` subclass) also belong to this
        hierarchy, even though submodules are not really a type of git object.

    :note:
        This :class:`Object` class should not be confused with :class:`object`, the
        root of the Python class hierarchy.
    """

    NULL_HEX_SHA = "0" * 40
    NULL_BIN_SHA = b"\0" * 20

    TYPES = (
        dbtyp.str_blob_type,
        dbtyp.str_tree_type,
        dbtyp.str_commit_type,
        dbtyp.str_tag_type,
    )

    __slots__ = ("repo", "binsha", "size")

    type: Union[GitObjectTypeString, None] = None
    """String identifying (a concrete :class:`Object` subtype for) a git object type.

    The subtypes that this may name correspond to the kinds of git objects that exist,
    i.e., the objects that may be present in a git repository.

    :note:
        ``None`` in this :class:`Object` base class and in the :class:`IndexObject`
        intermediate subclass, but never ``None`` in the concrete leaf subclasses that
        represent specific git object types.

    :note:
        See also :class:`~git.types.GitObjectTypeString`.
    """

    def __init__(self, repo: "Repo", binsha: bytes) -> None:
        """Initialize an object, identifying it by its binary sha.

        All keyword arguments will be set on demand if ``None``.

        :param repo:
            Repository this object is located in.

        :param binsha:
            20 byte SHA1.
        """
        super().__init__()
        self.repo = repo
        self.binsha = binsha
        assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (binsha, len(binsha))

    @classmethod
    def new(cls, repo: "Repo", id: Union[str, "Reference"]) -> AnyGitObject:
        """
        :return:
            New :class:`Object` instance of a type appropriate to the object type
            behind `id`. The id of the newly created object will be a binsha even
            though the input id may have been a :class:`~git.refs.reference.Reference`
            or rev-spec.

        :param id:
            :class:`~git.refs.reference.Reference`, rev-spec, or hexsha.

        :note:
            This cannot be a ``__new__`` method as it would always call
            :meth:`__init__` with the input id, which is not necessarily a binsha.
        """
        return repo.rev_parse(str(id))

    @classmethod
    def new_from_sha(cls, repo: "Repo", sha1: bytes) -> AnyGitObject:
        """
        :return:
            New object instance of a type appropriate to represent the given binary
            sha1

        :param sha1:
            20 byte binary sha1.
        """
        if sha1 == cls.NULL_BIN_SHA:
            # The NULL binsha is always the root commit.
            return get_object_type_by_name(b"commit")(repo, sha1)
        # END handle special case
        info = repo.odb.info(sha1)
        new_object = get_object_type_by_name(info.type)(repo, info.binsha)
        new_object.size = info.size
        return new_object

    def _set_cache_(self, attr: str) -> None:
        """Retrieve object information on demand (lazy-attribute protocol)."""
        if attr != "size":
            super()._set_cache_(attr)
            return
        self.size = self.repo.odb.info(self.binsha).size  # type: int

    def __eq__(self, other: Any) -> bool:
        """:return: ``True`` if the objects have the same SHA1"""
        return hasattr(other, "binsha") and self.binsha == other.binsha

    def __ne__(self, other: Any) -> bool:
        """:return: ``True`` if the objects do not have the same SHA1"""
        return not hasattr(other, "binsha") or self.binsha != other.binsha

    def __hash__(self) -> int:
        """:return: Hash of our id allowing objects to be used in dicts and sets"""
        return hash(self.binsha)

    def __str__(self) -> str:
        """:return: String of our SHA1 as understood by all git commands"""
        return self.hexsha

    def __repr__(self) -> str:
        """:return: String with pythonic representation of our object"""
        return '<git.%s "%s">' % (type(self).__name__, self.hexsha)

    @property
    def hexsha(self) -> str:
        """:return: 40 byte hex version of our 20 byte binary sha"""
        # b2a_hex produces bytes; decode to a plain str.
        return bin_to_hex(self.binsha).decode("ascii")

    @property
    def data_stream(self) -> "OStream":
        """
        :return:
            File-object compatible stream to the uncompressed raw data of the object

        :note:
            Returned streams must be read in order.
        """
        return self.repo.odb.stream(self.binsha)

    def stream_data(self, ostream: "OStream") -> "Object":
        """Write our data directly to the given output stream.

        :param ostream:
            File-object compatible stream object.

        :return:
            self
        """
        stream_copy(self.repo.odb.stream(self.binsha), ostream)
        return self
class IndexObject(Object):
    """Base for all objects that can be part of the index file.

    The classes representing git object types that can be part of the index file are
    :class:`~git.objects.tree.Tree` and :class:`~git.objects.blob.Blob`. In addition,
    :class:`~git.objects.submodule.base.Submodule`, which is not really a git object
    type but can be part of an index file, is also a subclass.
    """

    __slots__ = ("path", "mode")

    # For compatibility with iterable lists.
    _id_attribute_ = "path"

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: Union[None, int] = None,
        path: Union[None, PathLike] = None,
    ) -> None:
        """Initialize a newly instanced :class:`IndexObject`.

        :param repo:
            The :class:`~git.repo.base.Repo` we are located in.

        :param binsha:
            20 byte sha1.

        :param mode:
            The stat-compatible file mode as :class:`int`. Use the :mod:`stat` module
            to evaluate the information.

        :param path:
            The path to the file in the file system, relative to the git repository
            root, like ``file.ext`` or ``folder/other.ext``.

        :note:
            Path may not be set if the index object has been created directly, as it
            cannot be retrieved without knowing the parent tree.
        """
        super().__init__(repo, binsha)
        # Leave unset attributes unset so the lazy machinery can report them missing.
        for slot, value in (("mode", mode), ("path", path)):
            if value is not None:
                setattr(self, slot, value)

    def __hash__(self) -> int:
        """
        :return:
            Hash of our path, as index items are uniquely identifiable by path, not by
            their data!
        """
        return hash(self.path)

    def _set_cache_(self, attr: str) -> None:
        if attr not in IndexObject.__slots__:
            super()._set_cache_(attr)
            return
        # END handle slot attribute
        # path and mode cannot be retrieved later on (not without searching for them),
        # so they must have been provided at construction time.
        raise AttributeError(
            "Attribute '%s' unset: path and mode attributes must have been set during %s object creation"
            % (attr, type(self).__name__)
        )

    @property
    def name(self) -> str:
        """:return: Name portion of the path, effectively being the basename"""
        return osp.basename(self.path)

    @property
    def abspath(self) -> PathLike:
        R"""
        :return:
            Absolute path to this index object in the file system (as opposed to the
            :attr:`path` field which is a path relative to the git repository).

            The returned path will be native to the system and contains ``\`` on
            Windows.
        """
        work_dir = self.repo.working_tree_dir
        if work_dir is None:
            raise WorkTreeRepositoryUnsupported("working_tree_dir was None or empty")
        return join_path_native(work_dir, self.path)

View File

@@ -0,0 +1,48 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["Blob"]
from mimetypes import guess_type
import sys
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from . import base
class Blob(base.IndexObject):
    """A Blob encapsulates a git blob object.

    See :manpage:`gitglossary(7)` on "blob":
    https://git-scm.com/docs/gitglossary#def_blob_object
    """

    DEFAULT_MIME_TYPE = "text/plain"
    type: Literal["blob"] = "blob"

    # Valid blob modes
    executable_mode = 0o100755
    file_mode = 0o100644
    link_mode = 0o120000

    __slots__ = ()

    @property
    def mime_type(self) -> str:
        """
        :return:
            String describing the mime type of this file (based on the filename)

        :note:
            Defaults to ``text/plain`` in case the actual file type is unknown.
        """
        if self.path:
            # guess_type returns a (type, encoding) pair; type is None when the
            # extension is unrecognized, in which case we fall back to the default.
            guessed, _encoding = guess_type(str(self.path))
            if guessed:
                return guessed
        return self.DEFAULT_MIME_TYPE

View File

@@ -0,0 +1,899 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["Commit"]
from collections import defaultdict
import datetime
from io import BytesIO
import logging
import os
import re
from subprocess import Popen, PIPE
import sys
from time import altzone, daylight, localtime, time, timezone
import warnings
from gitdb import IStream
from git.cmd import Git
from git.diff import Diffable
from git.util import Actor, Stats, finalize_process, hex_to_bin
from . import base
from .tree import Tree
from .util import (
Serializable,
TraversableIterableObj,
altz_to_utctz_str,
from_timestamp,
parse_actor_and_date,
parse_date,
)
# typing ------------------------------------------------------------------
from typing import (
Any,
Dict,
IO,
Iterator,
List,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
cast,
)
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from git.types import PathLike
if TYPE_CHECKING:
from git.refs import SymbolicReference
from git.repo import Repo
# ------------------------------------------------------------------------
_logger = logging.getLogger(__name__)
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
    """Wraps a git commit object.

    See :manpage:`gitglossary(7)` on "commit object":
    https://git-scm.com/docs/gitglossary#def_commit_object

    :note:
        This class will act lazily on some of its attributes and will query the value on
        demand only if it involves calling the git binary.
    """

    # ENVIRONMENT VARIABLES
    # Read when creating new commits.
    env_author_date = "GIT_AUTHOR_DATE"
    env_committer_date = "GIT_COMMITTER_DATE"

    # CONFIGURATION KEYS
    conf_encoding = "i18n.commitencoding"

    # INVARIANTS
    default_encoding = "UTF-8"

    type: Literal["commit"] = "commit"

    __slots__ = (
        "tree",
        "author",
        "authored_date",
        "author_tz_offset",
        "committer",
        "committed_date",
        "committer_tz_offset",
        "message",
        "parents",
        "encoding",
        "gpgsig",
    )

    _id_attribute_ = "hexsha"

    parents: Sequence["Commit"]

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        tree: Union[Tree, None] = None,
        author: Union[Actor, None] = None,
        authored_date: Union[int, None] = None,
        author_tz_offset: Union[None, float] = None,
        committer: Union[Actor, None] = None,
        committed_date: Union[int, None] = None,
        committer_tz_offset: Union[None, float] = None,
        message: Union[str, bytes, None] = None,
        parents: Union[Sequence["Commit"], None] = None,
        encoding: Union[str, None] = None,
        gpgsig: Union[str, None] = None,
    ) -> None:
        """Instantiate a new :class:`Commit`. All keyword arguments taking ``None`` as
        default will be implicitly set on first query.

        :param binsha:
            20 byte sha1.

        :param tree:
            A :class:`~git.objects.tree.Tree` object.

        :param author:
            The author :class:`~git.util.Actor` object.

        :param authored_date: int_seconds_since_epoch
            The authored DateTime - use :func:`time.gmtime` to convert it into a
            different format.

        :param author_tz_offset: int_seconds_west_of_utc
            The timezone that the `authored_date` is in.

        :param committer:
            The committer string, as an :class:`~git.util.Actor` object.

        :param committed_date: int_seconds_since_epoch
            The committed DateTime - use :func:`time.gmtime` to convert it into a
            different format.

        :param committer_tz_offset: int_seconds_west_of_utc
            The timezone that the `committed_date` is in.

        :param message: string
            The commit message.

        :param encoding: string
            Encoding of the message, defaults to UTF-8.

        :param parents:
            List or tuple of :class:`Commit` objects which are our parent(s) in the
            commit dependency graph.

        :return:
            :class:`Commit`

        :note:
            Timezone information is in the same format and in the same sign as what
            :func:`time.altzone` returns. The sign is inverted compared to git's UTC
            timezone.
        """
        super().__init__(repo, binsha)  # Stores and validates binsha.
        if tree is not None:
            assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
            self.tree = tree
        # Attributes left unset here are lazily deserialized on first access via
        # _set_cache_.
        if author is not None:
            self.author = author
        if authored_date is not None:
            self.authored_date = authored_date
        if author_tz_offset is not None:
            self.author_tz_offset = author_tz_offset
        if committer is not None:
            self.committer = committer
        if committed_date is not None:
            self.committed_date = committed_date
        if committer_tz_offset is not None:
            self.committer_tz_offset = committer_tz_offset
        if message is not None:
            self.message = message
        if parents is not None:
            self.parents = parents
        if encoding is not None:
            self.encoding = encoding
        if gpgsig is not None:
            self.gpgsig = gpgsig

    @classmethod
    def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
        return tuple(commit.parents)

    @classmethod
    def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
        """Calculate the sha of a commit.

        :param repo:
            :class:`~git.repo.base.Repo` object the commit should be part of.

        :param commit:
            :class:`Commit` object for which to generate the sha.
        """
        stream = BytesIO()
        commit._serialize(stream)
        streamlen = stream.tell()
        stream.seek(0)
        istream = repo.odb.store(IStream(cls.type, streamlen, stream))
        return istream.binsha

    def replace(self, **kwargs: Any) -> "Commit":
        """Create new commit object from an existing commit object.

        Any values provided as keyword arguments will replace the corresponding
        attribute in the new object.

        :raise ValueError:
            If a keyword argument does not name a commit attribute.
        """
        attrs = {k: getattr(self, k) for k in self.__slots__}

        for attrname in kwargs:
            if attrname not in self.__slots__:
                raise ValueError("invalid attribute name: %s" % attrname)

        attrs.update(kwargs)
        # Build with a placeholder sha, then compute the real one from the content.
        new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs)
        new_commit.binsha = self._calculate_sha_(self.repo, new_commit)

        return new_commit

    def _set_cache_(self, attr: str) -> None:
        if attr in Commit.__slots__:
            # Read the data in a chunk, its faster - then provide a file wrapper.
            _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
            self._deserialize(BytesIO(stream.read()))
        else:
            super()._set_cache_(attr)
        # END handle attrs

    @property
    def authored_datetime(self) -> datetime.datetime:
        return from_timestamp(self.authored_date, self.author_tz_offset)

    @property
    def committed_datetime(self) -> datetime.datetime:
        return from_timestamp(self.committed_date, self.committer_tz_offset)

    @property
    def summary(self) -> Union[str, bytes]:
        """:return: First line of the commit message"""
        if isinstance(self.message, str):
            return self.message.split("\n", 1)[0]
        else:
            return self.message.split(b"\n", 1)[0]

    def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
        """Count the number of commits reachable from this commit.

        :param paths:
            An optional path or a list of paths restricting the return value to commits
            actually containing the paths.

        :param kwargs:
            Additional options to be passed to :manpage:`git-rev-list(1)`. They must not
            alter the output style of the command, or parsing will yield incorrect
            results.

        :return:
            An int defining the number of reachable commits
        """
        # Yes, it makes a difference whether empty paths are given or not in our case as
        # the empty paths version will ignore merge commits for some reason.
        if paths:
            return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines())
        return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())

    @property
    def name_rev(self) -> str:
        """
        :return:
            String describing the commits hex sha based on the closest
            :class:`~git.refs.reference.Reference`.

        :note:
            Mostly useful for UI purposes.
        """
        return self.repo.git.name_rev(self)

    @classmethod
    def iter_items(
        cls,
        repo: "Repo",
        rev: Union[str, "Commit", "SymbolicReference"],
        paths: Union[PathLike, Sequence[PathLike]] = "",
        **kwargs: Any,
    ) -> Iterator["Commit"]:
        R"""Find all commits matching the given criteria.

        :param repo:
            The :class:`~git.repo.base.Repo`.

        :param rev:
            Revision specifier. See :manpage:`git-rev-parse(1)` for viable options.

        :param paths:
            An optional path or list of paths. If set only :class:`Commit`\s that
            include the path or paths will be considered.

        :param kwargs:
            Optional keyword arguments to :manpage:`git-rev-list(1)` where:

            * ``max_count`` is the maximum number of commits to fetch.
            * ``skip`` is the number of commits to skip.
            * ``since`` selects all commits since some date, e.g. ``"1970-01-01"``.

        :return:
            Iterator yielding :class:`Commit` items.
        """
        if "pretty" in kwargs:
            raise ValueError("--pretty cannot be used as parsing expects single sha's only")
        # END handle pretty

        # Use -- in all cases, to prevent possibility of ambiguous arguments.
        # See https://github.com/gitpython-developers/GitPython/issues/264.
        args_list: List[PathLike] = ["--"]

        if paths:
            paths_tup: Tuple[PathLike, ...]
            if isinstance(paths, (str, os.PathLike)):
                paths_tup = (paths,)
            else:
                paths_tup = tuple(paths)

            args_list.extend(paths_tup)
        # END if paths

        proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
        return cls._iter_from_process_or_stream(repo, proc)

    def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
        R"""Iterate _all_ parents of this commit.

        :param paths:
            Optional path or list of paths limiting the :class:`Commit`\s to those that
            contain at least one of the paths.

        :param kwargs:
            All arguments allowed by :manpage:`git-rev-list(1)`.

        :return:
            Iterator yielding :class:`Commit` objects which are parents of ``self``
        """
        # rev-list would include this commit itself; skip at least one entry.
        skip = kwargs.get("skip", 1)
        if skip == 0:  # skip ourselves
            skip = 1
        kwargs["skip"] = skip

        return self.iter_items(self.repo, self, paths, **kwargs)

    @property
    def stats(self) -> Stats:
        """Create a git stat from changes between this commit and its first parent
        or from all changes done if this is the very first commit.

        :return:
            :class:`Stats`
        """
        if not self.parents:
            # Root commit: diff against the empty tree. Drop the first line (the
            # commit sha echoed by diff-tree --root) and renormalize the rest.
            text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True)
            text2 = ""
            for line in text.splitlines()[1:]:
                (insertions, deletions, filename) = line.split("\t")
                text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename)
            text = text2
        else:
            text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True)
        return Stats._list_from_string(self.repo, text)

    @property
    def trailers(self) -> Dict[str, str]:
        """Deprecated. Get the trailers of the message as a dictionary.

        :note:
            This property is deprecated, please use either :attr:`trailers_list` or
            :attr:`trailers_dict`.

        :return:
            Dictionary containing whitespace stripped trailer information.
            Only contains the latest instance of each trailer key.
        """
        warnings.warn(
            "Commit.trailers is deprecated, use Commit.trailers_list or Commit.trailers_dict instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return {k: v[0] for k, v in self.trailers_dict.items()}

    @property
    def trailers_list(self) -> List[Tuple[str, str]]:
        """Get the trailers of the message as a list.

        Git messages can contain trailer information that are similar to :rfc:`822`
        e-mail headers. See :manpage:`git-interpret-trailers(1)`.

        This function calls ``git interpret-trailers --parse`` onto the message to
        extract the trailer information, returns the raw trailer data as a list.

        Valid message with trailer::

            Subject line

            some body information

            another information

            key1: value1.1
            key1: value1.2
            key2 : value 2 with inner spaces

        Returned list will look like this::

            [
                ("key1", "value1.1"),
                ("key1", "value1.2"),
                ("key2", "value 2 with inner spaces"),
            ]

        :return:
            List containing key-value tuples of whitespace stripped trailer information.
        """
        cmd = ["git", "interpret-trailers", "--parse"]
        proc: Git.AutoInterrupt = self.repo.git.execute(  # type: ignore[call-overload]
            cmd,
            as_process=True,
            istream=PIPE,
        )
        trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8")
        trailer = trailer.strip()

        if not trailer:
            return []

        trailer_list = []
        for t in trailer.split("\n"):
            key, val = t.split(":", 1)
            trailer_list.append((key.strip(), val.strip()))

        return trailer_list

    @property
    def trailers_dict(self) -> Dict[str, List[str]]:
        """Get the trailers of the message as a dictionary.

        Git messages can contain trailer information that are similar to :rfc:`822`
        e-mail headers. See :manpage:`git-interpret-trailers(1)`.

        This function calls ``git interpret-trailers --parse`` onto the message to
        extract the trailer information. The key value pairs are stripped of leading and
        trailing whitespaces before they get saved into a dictionary.

        Valid message with trailer::

            Subject line

            some body information

            another information

            key1: value1.1
            key1: value1.2
            key2 : value 2 with inner spaces

        Returned dictionary will look like this::

            {
                "key1": ["value1.1", "value1.2"],
                "key2": ["value 2 with inner spaces"],
            }

        :return:
            Dictionary containing whitespace stripped trailer information, mapping
            trailer keys to a list of their corresponding values.
        """
        d = defaultdict(list)
        for key, val in self.trailers_list:
            d[key].append(val)
        return dict(d)

    @classmethod
    def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]:
        """Parse out commit information into a list of :class:`Commit` objects.

        We expect one line per commit, and parse the actual commit information directly
        from our lightning-fast object database.

        :param proc:
            :manpage:`git-rev-list(1)` process instance - one sha per line.

        :return:
            Iterator supplying :class:`Commit` objects
        """
        # NOTE(review): if proc_or_stream is a Popen whose stdout is None, `stream`
        # below is left unbound and a NameError follows — presumably rev_list always
        # provides a stdout pipe; confirm before tightening.
        if hasattr(proc_or_stream, "wait"):
            proc_or_stream = cast(Popen, proc_or_stream)
            if proc_or_stream.stdout is not None:
                stream = proc_or_stream.stdout
        elif hasattr(proc_or_stream, "readline"):
            proc_or_stream = cast(IO, proc_or_stream)  # type: ignore[redundant-cast]
            stream = proc_or_stream

        readline = stream.readline
        while True:
            line = readline()
            if not line:
                break
            hexsha = line.strip()
            if len(hexsha) > 40:
                # Split additional information, as returned by bisect for instance.
                hexsha, _ = line.split(None, 1)
            # END handle extra info

            assert len(hexsha) == 40, "Invalid line: %s" % hexsha
            yield cls(repo, hex_to_bin(hexsha))
        # END for each line in stream

        # TODO: Review this - it seems process handling got a bit out of control due to
        # many developers trying to fix the open file handles issue.
        if hasattr(proc_or_stream, "wait"):
            proc_or_stream = cast(Popen, proc_or_stream)
            finalize_process(proc_or_stream)

    @classmethod
    def create_from_tree(
        cls,
        repo: "Repo",
        tree: Union[Tree, str],
        message: str,
        parent_commits: Union[None, List["Commit"]] = None,
        head: bool = False,
        author: Union[None, Actor] = None,
        committer: Union[None, Actor] = None,
        author_date: Union[None, str, datetime.datetime] = None,
        commit_date: Union[None, str, datetime.datetime] = None,
    ) -> "Commit":
        """Commit the given tree, creating a :class:`Commit` object.

        :param repo:
            :class:`~git.repo.base.Repo` object the commit should be part of.

        :param tree:
            :class:`~git.objects.tree.Tree` object or hex or bin sha.
            The tree of the new commit.

        :param message:
            Commit message. It may be an empty string if no message is provided. It will
            be converted to a string, in any case.

        :param parent_commits:
            Optional :class:`Commit` objects to use as parents for the new commit. If
            empty list, the commit will have no parents at all and become a root commit.
            If ``None``, the current head commit will be the parent of the new commit
            object.

        :param head:
            If ``True``, the HEAD will be advanced to the new commit automatically.
            Otherwise the HEAD will remain pointing on the previous commit. This could
            lead to undesired results when diffing files.

        :param author:
            The name of the author, optional.
            If unset, the repository configuration is used to obtain this value.

        :param committer:
            The name of the committer, optional.
            If unset, the repository configuration is used to obtain this value.

        :param author_date:
            The timestamp for the author field.

        :param commit_date:
            The timestamp for the committer field.

        :return:
            :class:`Commit` object representing the new commit.

        :note:
            Additional information about the committer and author are taken from the
            environment or from the git configuration. See :manpage:`git-commit-tree(1)`
            for more information.
        """
        if parent_commits is None:
            try:
                parent_commits = [repo.head.commit]
            except ValueError:
                # Empty repositories have no head commit.
                parent_commits = []
            # END handle parent commits
        else:
            for p in parent_commits:
                if not isinstance(p, cls):
                    raise ValueError(f"Parent commit '{p!r}' must be of type {cls}")
            # END check parent commit types
        # END if parent commits are unset

        # Retrieve all additional information, create a commit object, and serialize it.
        # Generally:
        # * Environment variables override configuration values.
        # * Sensible defaults are set according to the git documentation.

        # COMMITTER AND AUTHOR INFO
        cr = repo.config_reader()
        env = os.environ

        committer = committer or Actor.committer(cr)
        author = author or Actor.author(cr)

        # PARSE THE DATES
        unix_time = int(time())
        is_dst = daylight and localtime().tm_isdst > 0
        offset = altzone if is_dst else timezone

        author_date_str = env.get(cls.env_author_date, "")
        if author_date:
            author_time, author_offset = parse_date(author_date)
        elif author_date_str:
            author_time, author_offset = parse_date(author_date_str)
        else:
            author_time, author_offset = unix_time, offset
        # END set author time

        committer_date_str = env.get(cls.env_committer_date, "")
        if commit_date:
            committer_time, committer_offset = parse_date(commit_date)
        elif committer_date_str:
            committer_time, committer_offset = parse_date(committer_date_str)
        else:
            committer_time, committer_offset = unix_time, offset
        # END set committer time

        # Assume UTF-8 encoding.
        enc_section, enc_option = cls.conf_encoding.split(".")
        conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
        if not isinstance(conf_encoding, str):
            raise TypeError("conf_encoding could not be coerced to str")

        # If the tree is no object, make sure we create one - otherwise the created
        # commit object is invalid.
        if isinstance(tree, str):
            tree = repo.tree(tree)
        # END tree conversion

        # CREATE NEW COMMIT
        new_commit = cls(
            repo,
            cls.NULL_BIN_SHA,
            tree,
            author,
            author_time,
            author_offset,
            committer,
            committer_time,
            committer_offset,
            message,
            parent_commits,
            conf_encoding,
        )

        new_commit.binsha = cls._calculate_sha_(repo, new_commit)

        if head:
            # Need late import here, importing git at the very beginning throws as
            # well...
            import git.refs

            try:
                repo.head.set_commit(new_commit, logmsg=message)
            except ValueError:
                # head is not yet set to the ref our HEAD points to.
                # Happens on first commit.
                master = git.refs.Head.create(
                    repo,
                    repo.head.ref,
                    new_commit,
                    logmsg="commit (initial): %s" % message,
                )
                repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)
            # END handle empty repositories
        # END advance head handling

        return new_commit

    # { Serializable Implementation

    def _serialize(self, stream: BytesIO) -> "Commit":
        """Write this commit in raw git object format into the given stream.

        :return:
            self
        """
        write = stream.write
        write(("tree %s\n" % self.tree).encode("ascii"))
        for p in self.parents:
            write(("parent %s\n" % p).encode("ascii"))
        a = self.author
        aname = a.name
        c = self.committer
        fmt = "%s %s <%s> %s %s\n"
        write(
            (
                fmt
                % (
                    "author",
                    aname,
                    a.email,
                    self.authored_date,
                    altz_to_utctz_str(self.author_tz_offset),
                )
            ).encode(self.encoding)
        )

        # Encode committer.
        aname = c.name
        write(
            (
                fmt
                % (
                    "committer",
                    aname,
                    c.email,
                    self.committed_date,
                    altz_to_utctz_str(self.committer_tz_offset),
                )
            ).encode(self.encoding)
        )

        if self.encoding != self.default_encoding:
            write(("encoding %s\n" % self.encoding).encode("ascii"))

        try:
            if self.__getattribute__("gpgsig"):
                write(b"gpgsig")
                for sigline in self.gpgsig.rstrip("\n").split("\n"):
                    write((" " + sigline + "\n").encode("ascii"))
        except AttributeError:
            pass

        write(b"\n")

        # Write plain bytes, be sure its encoded according to our encoding.
        if isinstance(self.message, str):
            write(self.message.encode(self.encoding))
        else:
            write(self.message)
        # END handle encoding
        return self

    def _deserialize(self, stream: BytesIO) -> "Commit":
        """Populate this commit from the raw git object data in the given stream.

        :return:
            self
        """
        readline = stream.readline
        self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")

        self.parents = []
        next_line = None
        while True:
            parent_line = readline()
            if not parent_line.startswith(b"parent"):
                next_line = parent_line
                break
            # END abort reading parents
            self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
        # END for each parent line
        self.parents = tuple(self.parents)

        # We don't know actual author encoding before we have parsed it, so keep the
        # lines around.
        author_line = next_line
        committer_line = readline()

        # We might run into one or more mergetag blocks, skip those for now.
        next_line = readline()
        while next_line.startswith(b"mergetag "):
            next_line = readline()
            while next_line.startswith(b" "):
                next_line = readline()
        # END skip mergetags

        # Now we can have the encoding line, or an empty line followed by the optional
        # message.
        self.encoding = self.default_encoding
        self.gpgsig = ""

        # Read headers.
        enc = next_line
        buf = enc.strip()
        while buf:
            if buf.startswith(b"encoding "):
                # BUGFIX: this previously compared buf[0:10] (a 10-byte slice) against
                # the 9-byte literal b"encoding ", which can never be equal (buf is
                # stripped and thus never ends with a space), so the encoding header
                # was silently ignored. Use startswith so it actually matches.
                self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore")
            elif buf.startswith(b"gpgsig "):
                sig = buf[buf.find(b" ") + 1 :] + b"\n"
                is_next_header = False
                while True:
                    sigbuf = readline()
                    if not sigbuf:
                        break
                    if sigbuf[0:1] != b" ":
                        buf = sigbuf.strip()
                        is_next_header = True
                        break
                    sig += sigbuf[1:]
                # END read all signature
                self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
                if is_next_header:
                    continue
            buf = readline().strip()

        # Decode the author's name.
        try:
            (
                self.author,
                self.authored_date,
                self.author_tz_offset,
            ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
        except UnicodeDecodeError:
            _logger.error(
                "Failed to decode author line '%s' using encoding %s",
                author_line,
                self.encoding,
                exc_info=True,
            )

        try:
            (
                self.committer,
                self.committed_date,
                self.committer_tz_offset,
            ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
        except UnicodeDecodeError:
            _logger.error(
                "Failed to decode committer line '%s' using encoding %s",
                committer_line,
                self.encoding,
                exc_info=True,
            )
        # END handle author's encoding

        # A stream from our data simply gives us the plain message.
        # The end of our message stream is marked with a newline that we strip.
        self.message = stream.read()
        try:
            self.message = self.message.decode(self.encoding, "replace")
        except UnicodeDecodeError:
            _logger.error(
                "Failed to decode message '%s' using encoding %s",
                self.message,
                self.encoding,
                exc_info=True,
            )
        # END exception handling

        return self

    # } END serializable implementation

    @property
    def co_authors(self) -> List[Actor]:
        """Search the commit message for any co-authors of this commit.

        Details on co-authors:
        https://github.blog/2018-01-29-commit-together-with-co-authors/

        :return:
            List of co-authors for this commit (as :class:`~git.util.Actor` objects).
        """
        co_authors = []

        if self.message:
            results = re.findall(
                r"^Co-authored-by: (.*) <(.*?)>$",
                self.message,
                re.MULTILINE,
            )
            for author in results:
                co_authors.append(Actor(*author))

        return co_authors

View File

@@ -0,0 +1,281 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
"""Functions that are supposed to be as fast as possible."""
__all__ = [
"tree_to_stream",
"tree_entries_from_data",
"traverse_trees_recursive",
"traverse_tree_recursive",
]
from stat import S_ISDIR
from git.compat import safe_decode, defenc
# typing ----------------------------------------------
from typing import (
Callable,
List,
MutableSequence,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
overload,
)
if TYPE_CHECKING:
from _typeshed import ReadableBuffer
from git import GitCmdObjectDB
EntryTup = Tuple[bytes, int, str] # Same as TreeCacheTup in tree.py.
EntryTupOrNone = Union[EntryTup, None]
# ---------------------------------------------------
def tree_to_stream(entries: "Sequence[EntryTup]", write: "Callable[[ReadableBuffer], Union[int, None]]") -> None:
    """Write the given list of entries into a stream using its ``write`` method.

    :param entries:
        **Sorted** list of tuples with (binsha, mode, name).

    :param write:
        A ``write`` method which takes a data string.
    """
    zero = ord("0")
    for binsha, mode, name in entries:
        # Render the mode as six octal ASCII digits, most significant first.
        octal_digits = [((mode >> (3 * shift)) & 0b111) + zero for shift in range(5, -1, -1)]
        mode_bytes = bytes(octal_digits)
        # git slices away the first octal digit if it's zero.
        if mode_bytes.startswith(b"0"):
            mode_bytes = mode_bytes[1:]
        # Here it comes: If the name is actually unicode, the replacement below will not
        # work as the binsha is not part of the ascii unicode encoding - hence we must
        # convert to an UTF-8 string for it to work properly. According to my tests,
        # this is exactly what git does, that is it just takes the input literally,
        # which appears to be UTF-8 on linux.
        if isinstance(name, str):
            name_bytes = name.encode(defenc)
        else:
            name_bytes = name  # check runtime types - is always str?
        write(mode_bytes + b" " + name_bytes + b"\0" + binsha)
def tree_entries_from_data(data: bytes) -> "List[EntryTup]":
    """Read the binary representation of a tree and return tuples of
    :class:`~git.objects.tree.Tree` items.

    :param data:
        Data block with tree data (as bytes).

    :return:
        list(tuple(binsha, mode, tree_relative_path), ...)
    """
    zero = ord("0")
    space = ord(" ")
    size = len(data)
    entries: "List[EntryTup]" = []
    pos = 0
    while pos < size:
        # Accumulate the ASCII-octal mode digit by digit. Some git versions
        # truncate the leading 0, some don't; the object type is extracted
        # from the mode later, so that does not matter here.
        mode = 0
        while data[pos] != space:
            mode = (mode << 3) + (data[pos] - zero)
            pos += 1
        # Skip the separating space.
        pos += 1
        # The path component runs up to a NUL terminator. Default encoding for
        # strings in git is UTF-8; only use the respective unicode object if
        # the byte stream was encoded.
        name_start = pos
        while data[pos] != 0:
            pos += 1
        name = safe_decode(data[name_start:pos])
        # Skip the NUL; a raw 20-byte binary sha follows immediately.
        pos += 1
        entries.append((data[pos : pos + 20], mode, name))
        pos += 20
    return entries
def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
"""Return data entry matching the given name and tree mode or ``None``.
Before the item is returned, the respective data item is set None in the `tree_data`
list to mark it done.
"""
try:
item = tree_data[start_at]
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[start_at] = None
return item
except IndexError:
pass
# END exception handling
for index, item in enumerate(tree_data):
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[index] = None
return item
# END if item matches
# END for each item
return None
@overload
def _to_full_path(item: None, path_prefix: str) -> None: ...
@overload
def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: ...
def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
"""Rebuild entry with given path prefix."""
if not item:
return item
return (item[0], item[1], path_prefix + item[2])
def traverse_trees_recursive(
    odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
) -> List[Tuple[EntryTupOrNone, ...]]:
    """Walk several trees in lockstep and pair up entries with the same path.

    :return:
        List of list with entries according to the given binary tree-shas.

        The result is encoded in a list
        of n tuple|None per blob/commit, (n == len(tree_shas)), where:

        * [0] == 20 byte sha
        * [1] == mode as int
        * [2] == path relative to working tree root

        The entry tuple is ``None`` if the respective blob/commit did not exist in the
        given tree.

    :param tree_shas:
        Iterable of shas pointing to trees. All trees must be on the same level.
        A tree-sha may be ``None``, in which case ``None`` entries stand in for
        every path that only exists in the other trees.

    :param path_prefix:
        A prefix to be added to the returned paths on this level.
        Set it ``""`` for the first iteration.

    :note:
        The ordering of the returned items will be partially lost.
    """
    trees_data: List[List[EntryTupOrNone]] = []
    nt = len(tree_shas)
    for tree_sha in tree_shas:
        if tree_sha is None:
            data: List[EntryTupOrNone] = []
        else:
            # Make new list for typing as list invariant.
            data = list(tree_entries_from_data(odb.stream(tree_sha).read()))
        # END handle muted trees
        trees_data.append(data)
    # END for each sha to get data for
    out: List[Tuple[EntryTupOrNone, ...]] = []
    # Find all matching entries and recursively process them together if the match is a
    # tree. If the match is a non-tree item, put it into the result.
    # Processed items will be set None.
    for ti, tree_data in enumerate(trees_data):
        for ii, item in enumerate(tree_data):
            if not item:
                continue
            # END skip already done items
            entries: List[EntryTupOrNone]
            entries = [None for _ in range(nt)]
            entries[ti] = item
            _sha, mode, name = item
            is_dir = S_ISDIR(mode)  # Type mode bits
            # Find this item in all other tree data items.
            # Wrap around, but stop one before our current index, hence ti+nt, not
            # ti+1+nt.
            for tio in range(ti + 1, ti + nt):
                tio = tio % nt
                # _find_by_name marks the matched entry None in trees_data[tio],
                # so it is not revisited when its own tree is iterated later.
                entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
            # END for each other item data
            # If we are a directory, enter recursion.
            if is_dir:
                out.extend(
                    traverse_trees_recursive(
                        odb,
                        [((ei and ei[0]) or None) for ei in entries],
                        path_prefix + name + "/",
                    )
                )
            else:
                out.append(tuple(_to_full_path(e, path_prefix) for e in entries))
            # END handle recursion
            # Finally mark it done.
            tree_data[ii] = None
        # END for each item
        # We are done with one tree, set all its data empty.
        del tree_data[:]
    # END for each tree_data chunk
    return out
def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> "List[EntryTup]":
    """
    :return:
        List of entries of the tree pointed to by the binary `tree_sha`.

        An entry has the following format:

        * [0] 20 byte sha
        * [1] mode as int
        * [2] path relative to the repository

    :param path_prefix:
        Prefix to prepend to the front of all returned paths.
    """
    collected: "List[EntryTup]" = []
    # Unpacking/packing is faster than accessing individual items.
    for sha, mode, name in tree_entries_from_data(odb.stream(tree_sha).read()):
        if S_ISDIR(mode):
            # Descend into subtrees, extending the prefix with this directory.
            collected += traverse_tree_recursive(odb, sha, path_prefix + name + "/")
        else:
            collected.append((sha, mode, path_prefix + name))
    return collected

View File

@@ -0,0 +1,7 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["Submodule", "UpdateProgress", "RootModule", "RootUpdateProgress"]
from .base import Submodule, UpdateProgress
from .root import RootModule, RootUpdateProgress

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,467 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["RootModule", "RootUpdateProgress"]
import logging
import git
from git.exc import InvalidGitRepositoryError
from .base import Submodule, UpdateProgress
from .util import find_first_remote_branch
# typing -------------------------------------------------------------------
from typing import TYPE_CHECKING, Union
from git.types import Commit_ish
if TYPE_CHECKING:
from git.repo import Repo
from git.util import IterableList
# ----------------------------------------------------------------------------
_logger = logging.getLogger(__name__)
class RootUpdateProgress(UpdateProgress):
    """Utility class which adds more opcodes to
    :class:`~git.objects.submodule.base.UpdateProgress`."""

    # Allocate four new operation bit-masks directly above the base class's
    # highest opcode bit.
    REMOVE, PATHCHANGE, BRANCHCHANGE, URLCHANGE = [
        1 << x for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)
    ]
    # Total opcode count, including those inherited from the base class.
    _num_op_codes = UpdateProgress._num_op_codes + 4

    # No instance attributes beyond those of the base class.
    __slots__ = ()
# Re-export the progress opcodes at module level for convenient access within
# this module (used by RootModule.update below).
BEGIN = RootUpdateProgress.BEGIN
END = RootUpdateProgress.END
REMOVE = RootUpdateProgress.REMOVE
BRANCHCHANGE = RootUpdateProgress.BRANCHCHANGE
URLCHANGE = RootUpdateProgress.URLCHANGE
PATHCHANGE = RootUpdateProgress.PATHCHANGE
class RootModule(Submodule):
    """A (virtual) root of all submodules in the given repository.

    This can be used to more easily traverse all submodules of the
    superproject (master repository).
    """

    __slots__ = ()

    # Reserved name for the virtual root submodule; never matches a real one.
    k_root_name = "__ROOT__"

    def __init__(self, repo: "Repo") -> None:
        # repo, binsha, mode=None, path=None, name = None, parent_commit=None, url=None, ref=None)
        super().__init__(
            repo,
            binsha=self.NULL_BIN_SHA,
            mode=self.k_default_mode,
            path="",
            name=self.k_root_name,
            parent_commit=repo.head.commit,
            url="",
            branch_path=git.Head.to_full_path(self.k_head_default),
        )

    def _clear_cache(self) -> None:
        """May not do anything."""
        pass

    # { Interface

    def update(  # type: ignore[override]
        self,
        previous_commit: Union[Commit_ish, str, None] = None,
        recursive: bool = True,
        force_remove: bool = False,
        init: bool = True,
        to_latest_revision: bool = False,
        progress: Union[None, "RootUpdateProgress"] = None,
        dry_run: bool = False,
        force_reset: bool = False,
        keep_going: bool = False,
    ) -> "RootModule":
        """Update the submodules of this repository to the current HEAD commit.

        This method behaves smartly by determining changes of the path of a submodule's
        repository, next to changes to the to-be-checked-out commit or the branch to be
        checked out. This works if the submodule's ID does not change.

        Additionally it will detect addition and removal of submodules, which will be
        handled gracefully.

        :param previous_commit:
            If set to a commit-ish, the commit we should use as the previous commit the
            HEAD pointed to before it was set to the commit it points to now.
            If ``None``, it defaults to ``HEAD@{1}`` otherwise.

        :param recursive:
            If ``True``, the children of submodules will be updated as well using the
            same technique.

        :param force_remove:
            If submodules have been deleted, they will be forcibly removed. Otherwise
            the update may fail if a submodule's repository cannot be deleted as changes
            have been made to it.
            (See :meth:`Submodule.update <git.objects.submodule.base.Submodule.update>`
            for more information.)

        :param init:
            If we encounter a new module which would need to be initialized, then do it.

        :param to_latest_revision:
            If ``True``, instead of checking out the revision pointed to by this
            submodule's sha, the checked out tracking branch will be merged with the
            latest remote branch fetched from the repository's origin.

            Unless `force_reset` is specified, a local tracking branch will never be
            reset into its past, therefore the remote branch must be in the future for
            this to have an effect.

        :param force_reset:
            If ``True``, submodules may checkout or reset their branch even if the
            repository has pending changes that would be overwritten, or if the local
            tracking branch is in the future of the remote tracking branch and would be
            reset into its past.

        :param progress:
            :class:`RootUpdateProgress` instance, or ``None`` if no progress should be
            sent.

        :param dry_run:
            If ``True``, operations will not actually be performed. Progress messages
            will change accordingly to indicate the WOULD DO state of the operation.

        :param keep_going:
            If ``True``, we will ignore but log all errors, and keep going recursively.
            Unless `dry_run` is set as well, `keep_going` could cause
            subsequent/inherited errors you wouldn't see otherwise.
            In conjunction with `dry_run`, this can be useful to anticipate all errors
            when updating submodules.

        :return:
            self
        """
        if self.repo.bare:
            raise InvalidGitRepositoryError("Cannot update submodules in bare repositories")
        # END handle bare

        if progress is None:
            progress = RootUpdateProgress()
        # END ensure progress is set

        prefix = ""
        if dry_run:
            prefix = "DRY-RUN: "

        repo = self.repo

        try:
            # SETUP BASE COMMIT
            ###################
            cur_commit = repo.head.commit
            if previous_commit is None:
                try:
                    previous_commit = repo.commit(repo.head.log_entry(-1).oldhexsha)
                    if previous_commit.binsha == previous_commit.NULL_BIN_SHA:
                        raise IndexError
                    # END handle initial commit
                except IndexError:
                    # In new repositories, there is no previous commit.
                    previous_commit = cur_commit
                # END exception handling
            else:
                previous_commit = repo.commit(previous_commit)  # Obtain commit object.
            # END handle previous commit

            # Submodules as recorded at the previous commit vs. now.
            psms: "IterableList[Submodule]" = self.list_items(repo, parent_commit=previous_commit)
            sms: "IterableList[Submodule]" = self.list_items(repo)
            spsms = set(psms)
            ssms = set(sms)

            # HANDLE REMOVALS
            ###################
            rrsm = spsms - ssms
            len_rrsm = len(rrsm)

            for i, rsm in enumerate(rrsm):
                op = REMOVE
                if i == 0:
                    op |= BEGIN
                # END handle begin

                # Fake it into thinking its at the current commit to allow deletion
                # of previous module. Trigger the cache to be updated before that.
                progress.update(
                    op,
                    i,
                    len_rrsm,
                    prefix + "Removing submodule %r at %s" % (rsm.name, rsm.abspath),
                )
                rsm._parent_commit = repo.head.commit
                rsm.remove(
                    configuration=False,
                    module=True,
                    force=force_remove,
                    dry_run=dry_run,
                )

                if i == len_rrsm - 1:
                    op |= END
                # END handle end
                progress.update(op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name)
            # END for each removed submodule

            # HANDLE PATH RENAMES
            #####################
            # URL changes + branch changes.
            csms = spsms & ssms
            len_csms = len(csms)
            for i, csm in enumerate(csms):
                psm: "Submodule" = psms[csm.name]
                sm: "Submodule" = sms[csm.name]

                # PATH CHANGES
                ##############
                if sm.path != psm.path and psm.module_exists():
                    progress.update(
                        BEGIN | PATHCHANGE,
                        i,
                        len_csms,
                        prefix + "Moving repository of submodule %r from %s to %s" % (sm.name, psm.abspath, sm.abspath),
                    )
                    # Move the module to the new path.
                    if not dry_run:
                        psm.move(sm.path, module=True, configuration=False)
                    # END handle dry_run
                    progress.update(
                        END | PATHCHANGE,
                        i,
                        len_csms,
                        prefix + "Done moving repository of submodule %r" % sm.name,
                    )
                # END handle path changes

                if sm.module_exists():
                    # HANDLE URL CHANGE
                    ###################
                    if sm.url != psm.url:
                        # Add the new remote, remove the old one.
                        # This way, if the url just changes, the commits will not have
                        # to be re-retrieved.
                        nn = "__new_origin__"
                        smm = sm.module()
                        rmts = smm.remotes

                        # Don't do anything if we already have the url we search in
                        # place.
                        if len([r for r in rmts if r.url == sm.url]) == 0:
                            progress.update(
                                BEGIN | URLCHANGE,
                                i,
                                len_csms,
                                prefix + "Changing url of submodule %r from %s to %s" % (sm.name, psm.url, sm.url),
                            )

                            if not dry_run:
                                assert nn not in [r.name for r in rmts]
                                smr = smm.create_remote(nn, sm.url)
                                smr.fetch(progress=progress)

                                # If we have a tracking branch, it should be available
                                # in the new remote as well.
                                if len([r for r in smr.refs if r.remote_head == sm.branch_name]) == 0:
                                    raise ValueError(
                                        "Submodule branch named %r was not available in new submodule remote at %r"
                                        % (sm.branch_name, sm.url)
                                    )
                                # END head is not detached

                                # Now delete the changed one.
                                rmt_for_deletion = None
                                for remote in rmts:
                                    if remote.url == psm.url:
                                        rmt_for_deletion = remote
                                        break
                                    # END if urls match
                                # END for each remote

                                # If we didn't find a matching remote, but have exactly
                                # one, we can safely use this one.
                                if rmt_for_deletion is None:
                                    if len(rmts) == 1:
                                        rmt_for_deletion = rmts[0]
                                    else:
                                        # If we have not found any remote with the
                                        # original URL we may not have a name. This is a
                                        # special case, and its okay to fail here.
                                        # Alternatively we could just generate a unique
                                        # name and leave all existing ones in place.
                                        raise InvalidGitRepositoryError(
                                            "Couldn't find original remote-repo at url %r" % psm.url
                                        )
                                    # END handle one single remote
                                # END handle check we found a remote

                                orig_name = rmt_for_deletion.name
                                smm.delete_remote(rmt_for_deletion)
                                # NOTE: Currently we leave tags from the deleted remotes
                                # as well as separate tracking branches in the possibly
                                # totally changed repository (someone could have changed
                                # the url to another project). At some point, one might
                                # want to clean it up, but the danger is high to remove
                                # stuff the user has added explicitly.

                                # Rename the new remote back to what it was.
                                smr.rename(orig_name)

                                # Early on, we verified that the our current tracking
                                # branch exists in the remote. Now we have to ensure
                                # that the sha we point to is still contained in the new
                                # remote tracking branch.
                                smsha = sm.binsha
                                found = False
                                # NOTE(review): this reads self.branch_name (the root
                                # module's branch) rather than sm.branch_name, unlike
                                # the availability check above — confirm intended.
                                rref = smr.refs[self.branch_name]
                                for c in rref.commit.traverse():
                                    if c.binsha == smsha:
                                        found = True
                                        break
                                    # END traverse all commits in search for sha
                                # END for each commit

                                if not found:
                                    # Adjust our internal binsha to use the one of the
                                    # remote this way, it will be checked out in the
                                    # next step. This will change the submodule relative
                                    # to us, so the user will be able to commit the
                                    # change easily.
                                    # NOTE(review): message below contains a "the the"
                                    # typo; left as-is since it is a runtime string.
                                    _logger.warning(
                                        "Current sha %s was not contained in the tracking\
                branch at the new remote, setting it the the remote's tracking branch",
                                        sm.hexsha,
                                    )
                                    sm.binsha = rref.commit.binsha
                                # END reset binsha

                                # NOTE: All checkout is performed by the base
                                # implementation of update.
                            # END handle dry_run
                            progress.update(
                                END | URLCHANGE,
                                i,
                                len_csms,
                                prefix + "Done adjusting url of submodule %r" % (sm.name),
                            )
                        # END skip remote handling if new url already exists in module
                    # END handle url

                    # HANDLE PATH CHANGES
                    #####################
                    if sm.branch_path != psm.branch_path:
                        # Finally, create a new tracking branch which tracks the new
                        # remote branch.
                        progress.update(
                            BEGIN | BRANCHCHANGE,
                            i,
                            len_csms,
                            prefix
                            + "Changing branch of submodule %r from %s to %s"
                            % (sm.name, psm.branch_path, sm.branch_path),
                        )
                        if not dry_run:
                            smm = sm.module()
                            smmr = smm.remotes

                            # As the branch might not exist yet, we will have to fetch
                            # all remotes to be sure...
                            for remote in smmr:
                                remote.fetch(progress=progress)
                            # END for each remote

                            try:
                                tbr = git.Head.create(
                                    smm,
                                    sm.branch_name,
                                    logmsg="branch: Created from HEAD",
                                )
                            except OSError:
                                # ...or reuse the existing one.
                                tbr = git.Head(smm, sm.branch_path)
                            # END ensure tracking branch exists

                            tbr.set_tracking_branch(find_first_remote_branch(smmr, sm.branch_name))
                            # NOTE: All head-resetting is done in the base
                            # implementation of update but we will have to checkout the
                            # new branch here. As it still points to the currently
                            # checked out commit, we don't do any harm.
                            # As we don't want to update working-tree or index, changing
                            # the ref is all there is to do.
                            smm.head.reference = tbr
                        # END handle dry_run

                        progress.update(
                            END | BRANCHCHANGE,
                            i,
                            len_csms,
                            prefix + "Done changing branch of submodule %r" % sm.name,
                        )
                    # END handle branch
                # END handle
            # END for each common submodule
        except Exception as err:
            if not keep_going:
                raise
            _logger.error(str(err))
        # END handle keep_going

        # FINALLY UPDATE ALL ACTUAL SUBMODULES
        ######################################
        for sm in sms:
            # Update the submodule using the default method.
            sm.update(
                recursive=False,
                init=init,
                to_latest_revision=to_latest_revision,
                progress=progress,
                dry_run=dry_run,
                force=force_reset,
                keep_going=keep_going,
            )

            # Update recursively depth first - question is which inconsistent state will
            # be better in case it fails somewhere. Defective branch or defective depth.
            # The RootSubmodule type will never process itself, which was done in the
            # previous expression.
            if recursive:
                # The module would exist by now if we are not in dry_run mode.
                if sm.module_exists():
                    type(self)(sm.module()).update(
                        recursive=True,
                        force_remove=force_remove,
                        init=init,
                        to_latest_revision=to_latest_revision,
                        progress=progress,
                        dry_run=dry_run,
                        force_reset=force_reset,
                        keep_going=keep_going,
                    )
                # END handle dry_run
            # END handle recursive
        # END for each submodule to update
        return self

    def module(self) -> "Repo":
        """:return: The actual repository containing the submodules"""
        return self.repo

    # } END interface
# } END classes

View File

@@ -0,0 +1,121 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = [
"sm_section",
"sm_name",
"mkhead",
"find_first_remote_branch",
"SubmoduleConfigParser",
]
from io import BytesIO
import weakref
import git
from git.config import GitConfigParser
from git.exc import InvalidGitRepositoryError
# typing -----------------------------------------------------------------------
from typing import Any, Sequence, TYPE_CHECKING, Union
from git.types import PathLike
if TYPE_CHECKING:
from weakref import ReferenceType
from git.refs import Head, RemoteReference
from git.remote import Remote
from git.repo import Repo
from .base import Submodule
# { Utilities
def sm_section(name: str) -> str:
    """:return: Section title used in ``.gitmodules`` configuration file"""
    return 'submodule "' + name + '"'
def sm_name(section: str) -> str:
    """:return: Name of the submodule as parsed from the section name"""
    # Drop the 'submodule "' prefix (11 characters) and the trailing quote.
    return section.strip()[11:-1]
def mkhead(repo: "Repo", path: "PathLike") -> "Head":
    """:return: New branch/head instance"""
    full_ref_path = git.Head.to_full_path(path)
    return git.Head(repo, full_ref_path)
def find_first_remote_branch(remotes: "Sequence[Remote]", branch_name: str) -> "RemoteReference":
    """Find the remote branch matching the name of the given branch or raise
    :exc:`~git.exc.InvalidGitRepositoryError`.

    :param remotes:
        Remotes to search, in order.

    :param branch_name:
        Short name of the sought remote branch (e.g. ``main``).

    :return:
        The first matching remote reference.

    :raise git.exc.InvalidGitRepositoryError:
        If no remote carries a branch by that name.
    """
    for remote in remotes:
        try:
            return remote.refs[branch_name]
        except IndexError:
            continue
        # END exception handling
    # END for remote
    # BUGFIX: the old message wrapped %r in extra literal quotes, producing
    # doubled quoting like '"main"' around the already-repr'd name.
    raise InvalidGitRepositoryError(f"Didn't find remote branch {branch_name!r} in any of the given remotes")
# } END utilities
# { Classes
class SubmoduleConfigParser(GitConfigParser):
    """Catches calls to :meth:`~git.config.GitConfigParser.write`, and updates the
    ``.gitmodules`` blob in the index with the new data, if we have written into a
    stream.

    Otherwise it would add the local file to the index to make it correspond with the
    working tree. Additionally, the cache must be cleared.

    Please note that no mutating method will work in bare mode.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Weak reference to the Submodule whose configuration we edit; must be
        # populated via set_submodule() before the first write.
        self._smref: Union["ReferenceType[Submodule]", None] = None
        # Index the modules file is staged into; falls back to sm.repo.index.
        self._index = None
        # Whether flush_to_index() writes the index file immediately.
        self._auto_write = True
        super().__init__(*args, **kwargs)

    # { Interface
    def set_submodule(self, submodule: "Submodule") -> None:
        """Set this instance's submodule. It must be called before the first write
        operation begins."""
        self._smref = weakref.ref(submodule)

    def flush_to_index(self) -> None:
        """Flush changes in our configuration file to the index."""
        assert self._smref is not None
        # Should always have a file here.
        assert not isinstance(self._file_or_files, BytesIO)
        sm = self._smref()
        if sm is not None:
            index = self._index
            if index is None:
                index = sm.repo.index
            # END handle index
            index.add([sm.k_modules_file], write=self._auto_write)
            # Drop cached submodule state so it is re-read from the new blob.
            sm._clear_cache()
        # END handle weakref

    # } END interface

    # { Overridden Methods
    def write(self) -> None:  # type: ignore[override]
        # Write the configuration as usual, then stage the updated
        # .gitmodules file so the index matches the working tree.
        rval: None = super().write()
        self.flush_to_index()
        return rval

    # END overridden methods
    # } END classes

View File

@@ -0,0 +1,141 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
"""Provides an :class:`~git.objects.base.Object`-based type for annotated tags.
This defines the :class:`TagObject` class, which represents annotated tags.
For lightweight tags, see the :mod:`git.refs.tag` module.
"""
__all__ = ["TagObject"]
import sys
from git.compat import defenc
from git.util import hex_to_bin
from . import base
from .util import get_object_type_by_name, parse_actor_and_date
# typing ----------------------------------------------
from typing import List, TYPE_CHECKING, Union
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
if TYPE_CHECKING:
from git.repo import Repo
from git.util import Actor
from .blob import Blob
from .commit import Commit
from .tree import Tree
# ---------------------------------------------------
class TagObject(base.Object):
    """Annotated (i.e. non-lightweight) tag carrying additional information about an
    object we are pointing to.

    See :manpage:`gitglossary(7)` on "tag object":
    https://git-scm.com/docs/gitglossary#def_tag_object
    """

    type: Literal["tag"] = "tag"

    __slots__ = (
        "object",
        "tag",
        "tagger",
        "tagged_date",
        "tagger_tz_offset",
        "message",
    )

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        object: Union[None, base.Object] = None,
        tag: Union[None, str] = None,
        tagger: Union[None, "Actor"] = None,
        tagged_date: Union[int, None] = None,
        tagger_tz_offset: Union[int, None] = None,
        message: Union[str, None] = None,
    ) -> None:  # @ReservedAssignment
        """Initialize a tag object with additional data.

        :param repo:
            Repository this object is located in.

        :param binsha:
            20 byte SHA1.

        :param object:
            :class:`~git.objects.base.Object` instance of object we are pointing to.

        :param tag:
            Name of this tag.

        :param tagger:
            :class:`~git.util.Actor` identifying the tagger.

        :param tagged_date: int_seconds_since_epoch
            The DateTime of the tag creation.
            Use :func:`time.gmtime` to convert it into a different format.

        :param tagger_tz_offset: int_seconds_west_of_utc
            The timezone that the `tagged_date` is in, in a format similar to
            :attr:`time.altzone`.

        :param message:
            Message associated with the tag.
        """
        super().__init__(repo, binsha)
        # Only assign attributes that were supplied; others remain unset so
        # _set_cache_ can lazily parse them from the object database.
        if object is not None:
            self.object: Union["Commit", "Blob", "Tree", "TagObject"] = object
        if tag is not None:
            self.tag = tag
        if tagger is not None:
            self.tagger = tagger
        if tagged_date is not None:
            self.tagged_date = tagged_date
        if tagger_tz_offset is not None:
            self.tagger_tz_offset = tagger_tz_offset
        if message is not None:
            self.message = message

    def _set_cache_(self, attr: str) -> None:
        """Cache all our attributes at once."""
        if attr in TagObject.__slots__:
            # All tag attributes are parsed from the raw object in one pass, so
            # requesting any one of them populates them all.
            ostream = self.repo.odb.stream(self.binsha)
            lines: List[str] = ostream.read().decode(defenc, "replace").splitlines()

            # Line 0: "object <hexsha>" - sha of the tagged object.
            _obj, hexsha = lines[0].split(" ")
            # Line 1: "type <name>" - selects the concrete object class.
            _type_token, type_name = lines[1].split(" ")
            object_type = get_object_type_by_name(type_name.encode("ascii"))
            self.object = object_type(self.repo, hex_to_bin(hexsha))

            self.tag = lines[2][4:]  # tag <tag name>

            if len(lines) > 3:
                tagger_info = lines[3]  # tagger <actor> <date>
                (
                    self.tagger,
                    self.tagged_date,
                    self.tagger_tz_offset,
                ) = parse_actor_and_date(tagger_info)

            # Line 4 empty - it could mark the beginning of the next header.
            # In case there really is no message, it would not exist.
            # Otherwise a newline separates header from message.
            if len(lines) > 5:
                self.message = "\n".join(lines[5:])
            else:
                self.message = ""
        # END check our attributes
        else:
            super()._set_cache_(attr)

View File

@@ -0,0 +1,414 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
__all__ = ["TreeModifier", "Tree"]
import sys
import git.diff as git_diff
from git.util import IterableList, join_path, to_bin_sha
from . import util
from .base import IndexObjUnion, IndexObject
from .blob import Blob
from .fun import tree_entries_from_data, tree_to_stream
from .submodule.base import Submodule
# typing -------------------------------------------------
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Tuple,
TYPE_CHECKING,
Type,
Union,
cast,
)
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from git.types import PathLike
if TYPE_CHECKING:
from io import BytesIO
from git.repo import Repo
TreeCacheTup = Tuple[bytes, int, str]
TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
# --------------------------------------------------------
# Compatibility shim for Python 2's ``cmp``: returns -1, 0, or 1 by comparison.
cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
class TreeModifier:
    """A utility class providing methods to alter the underlying cache in a list-like
    fashion.

    Once all adjustments are complete, the :attr:`_cache`, which really is a reference
    to the cache of a tree, will be sorted. This ensures it will be in a serializable
    state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: "List[TreeCacheTup]") -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: index of an item with name, or -1 if not found"""
        for index, entry in enumerate(self._cache):
            if entry[2] == name:
                return index
        return -1

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Call this method once you are done modifying the tree information.

        This may be called several times, but be aware that each call will cause a sort
        operation.

        :return:
            self
        """

        def sort_key(entry: "TreeCacheTup") -> str:
            # Trees sort as if their name carried a trailing slash.
            name = entry[2]
            return name + "/" if entry[1] == Tree.tree_id << 12 else name

        self._cache.sort(key=sort_key)
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Add the given item to the tree.

        If an item with the given name already exists, nothing will be done, but a
        :exc:`ValueError` will be raised if the sha and mode of the existing item do not
        match the one you add, unless `force` is ``True``.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            The name of the item, without any path separators.

        :param force:
            If ``True``, an item with your name and information will overwrite any
            existing item with the same name, no matter which information it has.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        new_entry = (to_bin_sha(sha), mode, name)
        index = self._index_by_name(name)
        if index < 0:
            self._cache.append(new_entry)
            return self
        if force:
            self._cache[index] = new_entry
            return self
        existing = self._cache[index]
        if existing[0] != new_entry[0] or existing[1] != mode:
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Add the given item to the tree. Its correctness is assumed, so it is the
        caller's responsibility to ensure that the input is correct.

        For more information on the parameters, see :meth:`add`.

        :param binsha:
            20 byte binary sha.
        """
        assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Delete an item with the given name if it exists."""
        index = self._index_by_name(name)
        if index >= 0:
            del self._cache[index]

    # } END mutators
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Actual integer IDs for comparison - these are the object-type nibbles
    # (``mode >> 12``) of tree entry modes.
    commit_id = 0o16  # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps an object-type nibble to the class used to represent such an entry.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
        # Tree ID added once Tree is defined.
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        # Only trees have children to descend into during traversal.
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
        # END for each item

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Safety assertion - blobs are at the end of the path.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: str) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.
        """
        # NOTE(review): `as_edge` is accepted but not forwarded to _traverse(), so it
        # currently has no effect here - confirm intent before changing.
        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        # NOTE(review): Python 2 slicing hook, never invoked by Python 3 slicing
        # (``tree[0:2]`` goes through __getitem__, which raises TypeError for slices).
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        if isinstance(item, IndexObject):
            for info in self._cache:
                if item.binsha == info[0]:
                    return True
                # END compare sha
            # END for each entry
        # END handle item is index object
        # compatibility

        # Treat item as repo-relative path.
        else:
            path = self.path
            for info in self._cache:
                if item == join_path(path, info[2]):
                    return True
            # END for each item
        return False

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        # Fix: convert the raw cache entries in reverse order. Applying the builtin
        # reversed() to the generator returned by _iter_convert_to_object - as was
        # done before - raises TypeError, since generators support neither
        # __reversed__ nor the sequence protocol.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as
            these are assumed to be sorted by algorithms.
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        self._cache = tree_entries_from_data(stream.read())
        return self


# END tree

# Finalize map definition.
Tree._map_id_to_type[Tree.tree_id] = Tree

View File

@@ -0,0 +1,700 @@
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
"""Utility functions for working with git objects."""
# Explicit public API of this module, re-exported via the ``git.objects`` package.
__all__ = [
    "get_object_type_by_name",
    "parse_date",
    "parse_actor_and_date",
    "ProcessStreamAdapter",
    "Traversable",
    "altz_to_utctz_str",
    "utctz_to_altz",
    "verify_utctz",
    "Actor",
    "tzoffset",
    "utc",
]
from abc import ABC, abstractmethod
import calendar
from collections import deque
from datetime import datetime, timedelta, tzinfo
import re
from string import digits
import time
import warnings
from git.util import Actor, IterableList, IterableObj
# typing ------------------------------------------------------------
from typing import (
Any,
Callable,
Deque,
Iterator,
NamedTuple,
Sequence,
TYPE_CHECKING,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
from git.types import Has_id_attribute, Literal
if TYPE_CHECKING:
    from io import BytesIO, StringIO
    from subprocess import Popen

    from git.types import Protocol, runtime_checkable

    from .blob import Blob
    from .commit import Commit
    from .submodule.base import Submodule
    from .tag import TagObject
    from .tree import TraversedTreeTup, Tree
else:
    # At runtime, fall back to a plain ABC and a no-op decorator so the names
    # ``Protocol`` and ``runtime_checkable`` always resolve without importing
    # typing helpers.
    Protocol = ABC

    def runtime_checkable(f):
        return f
class TraverseNT(NamedTuple):
    # One work-stack frame of Traversable._traverse(): the depth of `item`, the item
    # itself, and the item it was reached from (None for the traversal root).
    depth: int
    item: Union["Traversable", "Blob"]
    src: Union["Traversable", None]
T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj")  # For TraversableIterableObj.traverse()

# Edge tuple yielded by traversals when ``as_edge=True``: (source or None, destination).
TraversedTup = Union[
    Tuple[Union["Traversable", None], "Traversable"],  # For Commit, Submodule.
    "TraversedTreeTup",  # For Tree.traverse().
]

# --------------------------------------------------------------------

ZERO = timedelta(0)  # Zero offset, returned by tzoffset.dst() below.
# { Functions
def mode_str_to_int(modestr: Union[bytes, str]) -> int:
    """Convert mode bits from an octal mode string to an integer mode for git.

    :param modestr:
        String like ``755`` or ``644`` or ``100644`` - only the last 6 chars will be
        used.

    :return:
        An :class:`int` mode compatible to the mode method ids of the :mod:`stat`
        module regarding the rwx permissions for user, group and other, special flags
        and file system flags, such as whether it is a symlink.
    """
    mode = 0
    for iteration, char in enumerate(reversed(modestr[-6:])):
        # Iterating bytes yields ints (ASCII codes) while iterating str yields
        # characters; normalize both to the numeric digit value. Previously, bytes
        # input fed the raw ASCII code (e.g. 52 for "4") into the shift, producing
        # garbage modes.
        digit = char - 48 if isinstance(char, int) else int(char)
        # Each octal digit occupies 3 bits, least significant digit first.
        mode += digit << iteration * 3
    # END for each char
    return mode
def get_object_type_by_name(
    object_type_name: bytes,
) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
    """Retrieve the Python class GitPython uses to represent a kind of Git object.

    :return:
        A type suitable to handle the given as `object_type_name`.
        This type can be called create new instances.

    :param object_type_name:
        Member of :attr:`Object.TYPES <git.objects.base.Object.TYPES>`.

    :raise ValueError:
        If `object_type_name` is unknown.
    """
    # Imports happen lazily per branch to avoid circular imports at module load.
    if object_type_name == b"blob":
        from . import blob

        return blob.Blob
    if object_type_name == b"tree":
        from . import tree

        return tree.Tree
    if object_type_name == b"commit":
        from . import commit

        return commit.Commit
    if object_type_name == b"tag":
        from . import tag

        return tag.TagObject
    raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
def utctz_to_altz(utctz: str) -> int:
    """Convert a git timezone offset into a timezone offset west of UTC in seconds
    (compatible with :attr:`time.altzone`).

    :param utctz:
        git utc timezone string, e.g. +0200
    """
    value = int(utctz)
    hours, minutes = divmod(abs(value), 100)
    west_seconds = hours * 3600 + minutes * 60
    # time.altzone is positive *west* of UTC, so the sign flips relative to git's
    # east-positive convention.
    return -west_seconds if value > 0 else west_seconds
def altz_to_utctz_str(altz: float) -> str:
    """Convert a timezone offset west of UTC in seconds into a Git timezone offset
    string.

    :param altz:
        Timezone offset in seconds west of UTC.
    """
    # Truncate to whole seconds first. With float input (the annotated type), the
    # previous implementation fed floats into the "{:02}" fields, yielding malformed
    # output such as "-1.00.0" instead of "-0100".
    total_seconds = int(abs(altz))
    hours, remainder = divmod(total_seconds, 3600)
    minutes = remainder // 60
    # Git offsets are east-positive while altz is west-positive, so the sign flips;
    # offsets smaller than one minute render as "+0000".
    sign = "-" if altz >= 60 else "+"
    return f"{sign}{hours:02d}{minutes:02d}"
def verify_utctz(offset: str) -> str:
    """
    :raise ValueError:
        If `offset` is incorrect.

    :return:
        `offset`
    """
    fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
    # Expect exactly "+HHMM" or "-HHMM": a sign followed by four decimal digits.
    if len(offset) != 5:
        raise fmt_exc
    if offset[0] not in "+-":
        raise fmt_exc
    if any(char not in digits for char in offset[1:]):
        raise fmt_exc
    return offset
class tzoffset(tzinfo):
    """Fixed-offset timezone, constructed from seconds *west* of UTC."""

    def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
        # datetime offsets are east-positive, hence the negation.
        self._offset = timedelta(seconds=-secs_west_of_utc)
        self._name = name or "fixed"

    def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
        return self._offset

    def dst(self, dt: Union[datetime, None]) -> timedelta:
        # A fixed offset never observes daylight saving time.
        return ZERO

    def tzname(self, dt: Union[datetime, None]) -> str:
        return self._name

    def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]:
        # Pickle via the constructor arguments: (seconds west of UTC, name).
        return tzoffset, (-self._offset.total_seconds(), self._name)


utc = tzoffset(0, "UTC")  # The UTC timezone instance used throughout this module.
def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
    """Convert a `timestamp` + `tz_offset` into an aware :class:`~datetime.datetime`
    instance."""
    aware_utc = datetime.fromtimestamp(timestamp, utc)
    try:
        # Shift into the requested local timezone when the offset is representable.
        return aware_utc.astimezone(tzoffset(tz_offset))
    except ValueError:
        # Fall back to UTC for offsets datetime cannot represent.
        return aware_utc
def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
    """Parse the given date as one of the following:

    * Aware datetime instance
    * Git internal format: timestamp offset
    * :rfc:`2822`: ``Thu, 07 Apr 2005 22:13:13 +0200``
    * ISO 8601: ``2005-04-07T22:13:13`` - The ``T`` can be a space as well.

    :return:
        Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch

    :raise ValueError:
        If the format could not be understood.

    :note:
        Date can also be ``YYYY.MM.DD``, ``MM/DD/YYYY`` and ``DD.MM.YYYY``.
    """
    if isinstance(string_date, datetime):
        if string_date.tzinfo:
            utcoffset = cast(timedelta, string_date.utcoffset())  # typeguard, as tzinfo is not None
            offset = -int(utcoffset.total_seconds())
            return int(string_date.astimezone(utc).timestamp()), offset
        else:
            # Require aware datetimes: a naive one has no well-defined epoch value.
            raise ValueError(f"string_date datetime object without tzinfo, {string_date}")

    # Git time
    try:
        if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
            # "<seconds-since-epoch> <offset>", optionally with a leading "@".
            timestamp, offset_str = string_date.split()
            if timestamp.startswith("@"):
                timestamp = timestamp[1:]
            timestamp_int = int(timestamp)
            return timestamp_int, utctz_to_altz(verify_utctz(offset_str))
        else:
            offset_str = "+0000"  # Local time by default.
            if string_date[-5] in "-+":
                # Trailing "+HHMM"/"-HHMM" present - strip it off before date parsing.
                offset_str = verify_utctz(string_date[-5:])
                string_date = string_date[:-6]  # skip space as well
            # END split timezone info
            offset = utctz_to_altz(offset_str)

            # Now figure out the date and time portion - split time.
            date_formats = []
            splitter = -1
            if "," in string_date:
                # RFC 2822 style, e.g. "Thu, 07 Apr 2005 22:13:13".
                date_formats.append("%a, %d %b %Y")
                splitter = string_date.rfind(" ")
            else:
                # ISO plus additional
                date_formats.append("%Y-%m-%d")
                date_formats.append("%Y.%m.%d")
                date_formats.append("%m/%d/%Y")
                date_formats.append("%d.%m.%Y")

                splitter = string_date.rfind("T")
                if splitter == -1:
                    splitter = string_date.rfind(" ")
                # END handle 'T' and ' '
            # END handle RFC or ISO

            assert splitter > -1

            # Split date and time.
            time_part = string_date[splitter + 1 :]  # Skip space.
            date_part = string_date[:splitter]

            # Parse time.
            tstruct = time.strptime(time_part, "%H:%M:%S")

            # Try each candidate date format until one fits.
            for fmt in date_formats:
                try:
                    dtstruct = time.strptime(date_part, fmt)
                    utctime = calendar.timegm(
                        (
                            dtstruct.tm_year,
                            dtstruct.tm_mon,
                            dtstruct.tm_mday,
                            tstruct.tm_hour,
                            tstruct.tm_min,
                            tstruct.tm_sec,
                            dtstruct.tm_wday,
                            dtstruct.tm_yday,
                            tstruct.tm_isdst,
                        )
                    )
                    return int(utctime), offset
                except ValueError:
                    continue
                # END exception handling
            # END for each fmt

            # Still here ? fail.
            raise ValueError("no format matched")
        # END handle format
    except Exception as e:
        # Normalize any parsing failure into a single ValueError for callers.
        raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e
    # END handle exceptions
# Precompiled regexes
# Matches "<keyword> <actor> <epoch-seconds> <+/-HHMM offset>" lines.
_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$")
# Fallback for lines carrying only "<keyword> <actor>" without a timestamp.
_re_only_actor = re.compile(r"^.+? (.*)$")
def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
    """Parse out the actor (author or committer) info from a line like::

        author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700

    :return:
        [Actor, int_seconds_since_epoch, int_timezone_offset]
    """
    match = _re_actor_epoch.search(line)
    if match is not None:
        actor, epoch, offset = match.groups()
    else:
        # No timestamp on the line - default epoch/offset to zero and try to
        # recover at least the actor portion.
        epoch, offset = "0", "0"
        fallback = _re_only_actor.search(line)
        actor = fallback.group(1) if fallback else line or ""
    return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))
# } END functions
# { Classes
class ProcessStreamAdapter:
    """Class wiring all calls to the contained Process instance.

    Use this type to hide the underlying process to provide access only to a specified
    stream. The process is usually wrapped into an :class:`~git.cmd.Git.AutoInterrupt`
    class to kill it if the instance goes out of scope.
    """

    __slots__ = ("_proc", "_stream")

    def __init__(self, process: "Popen", stream_name: str) -> None:
        # The named stream is what we actually expose to callers.
        self._stream: StringIO = getattr(process, stream_name)  # guessed type
        # Hold on to the process so it stays alive as long as this adapter does.
        self._proc = process

    def __getattr__(self, attr: str) -> Any:
        # Delegate everything we do not define ourselves to the wrapped stream.
        return getattr(self._stream, attr)
@runtime_checkable
class Traversable(Protocol):
    """Simple interface to perform depth-first or breadth-first traversals in one
    direction.

    Subclasses only need to implement one function.

    Instances of the subclass must be hashable.

    Defined subclasses:

    * :class:`Commit <git.objects.Commit>`
    * :class:`Tree <git.objects.tree.Tree>`
    * :class:`Submodule <git.objects.submodule.base.Submodule>`
    """

    __slots__ = ()

    @classmethod
    @abstractmethod
    def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
        """
        :return:
            Tuple of items connected to the given item.
            Must be implemented in subclass.

            class Commit:: (cls, Commit) -> Tuple[Commit, ...]

            class Submodule:: (cls, Submodule) -> Iterablelist[Submodule]

            class Tree:: (cls, Tree) -> Tuple[Tree, ...]
        """
        raise NotImplementedError("To be implemented in subclass")

    @abstractmethod
    def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Traverse self and collect all items found.

        Calling this directly on the abstract base class, including via a ``super()``
        proxy, is deprecated. Only overridden implementations should be called.
        """
        warnings.warn(
            "list_traverse() method should only be called from subclasses."
            " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
            " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._list_traverse(*args, **kwargs)

    def _list_traverse(
        self, as_edge: bool = False, *args: Any, **kwargs: Any
    ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
        """Traverse self and collect all items found.

        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`::

                Commit -> IterableList[Commit]
                Submodule -> IterableList[Submodule]
                Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        # Commit and Submodule have id.__attribute__ as IterableObj.
        # Tree has id.__attribute__ inherited from IndexObject.
        if isinstance(self, Has_id_attribute):
            id = self._id_attribute_
        else:
            # Shouldn't reach here, unless Traversable subclass created with no
            # _id_attribute_.
            id = ""
            # Could add _id_attribute_ to Traversable, or make all Traversable also
            # Iterable?

        if not as_edge:
            out: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id)
            out.extend(self.traverse(as_edge=as_edge, *args, **kwargs))  # noqa: B026
            return out
            # Overloads in subclasses (mypy doesn't allow typing self: subclass).
            # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]]
        else:
            # Raise DeprecationWarning, it doesn't make sense to use this.
            out_list: IterableList = IterableList(self.traverse(*args, **kwargs))
            return out_list

    @abstractmethod
    def traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Iterator yielding items found when traversing self.

        Calling this directly on the abstract base class, including via a ``super()``
        proxy, is deprecated. Only overridden implementations should be called.
        """
        warnings.warn(
            "traverse() method should only be called from subclasses."
            " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
            " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._traverse(*args, **kwargs)

    def _traverse(
        self,
        predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
        """Iterator yielding items found when traversing `self`.

        :param predicate:
            A function ``f(i,d)`` that returns ``False`` if item i at depth ``d`` should
            not be included in the result.

        :param prune:
            A function ``f(i,d)`` that returns ``True`` if the search should stop at
            item ``i`` at depth ``d``. Item ``i`` will not be returned.

        :param depth:
            Defines at which level the iteration should not go deeper. If -1, there is
            no limit. If 0, you would effectively only get `self`, the root of the
            iteration. If 1, you would only get the first level of
            predecessors/successors.

        :param branch_first:
            If ``True``, items will be returned branch first, otherwise depth first.

        :param visit_once:
            If ``True``, items will only be returned once, although they might be
            encountered several times. Loops are prevented that way.

        :param ignore_self:
            If ``True``, `self` will be ignored and automatically pruned from the
            result. Otherwise it will be the first item to be returned. If `as_edge` is
            ``True``, the source of the first edge is ``None``.

        :param as_edge:
            If ``True``, return a pair of items, first being the source, second the
            destination, i.e. tuple(src, dest) with the edge spanning from source to
            destination.

        :return:
            Iterator yielding items found when traversing `self`::

                Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]]]
                Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]]
                Tree -> Iterator[Union[Blob, Tree, Submodule,
                        Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]]

            ignore_self=True is_edge=True -> Iterator[item]
            ignore_self=True is_edge=False --> Iterator[item]
            ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
            ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]
        """
        visited = set()
        stack: Deque[TraverseNT] = deque()
        stack.append(TraverseNT(0, self, None))  # self is always depth level 0.

        def addToStack(
            stack: Deque[TraverseNT],
            src_item: "Traversable",
            branch_first: bool,
            depth: int,
        ) -> None:
            # NOTE(review): reads `item` from the enclosing loop rather than the
            # `src_item` parameter; both refer to the same object at the single call
            # site below, so behavior is unaffected - confirm before refactoring.
            lst = self._get_intermediate_items(item)
            if not lst:  # Empty list
                return

            if branch_first:
                stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
            else:
                # Push in reverse so that pop() yields children in original order.
                reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
                stack.extend(reviter)

        # END addToStack local method

        while stack:
            d, item, src = stack.pop()  # Depth of item, item, item_source

            if visit_once and item in visited:
                continue

            if visit_once:
                visited.add(item)

            rval: Union[TraversedTup, "Traversable", "Blob"]
            if as_edge:
                # If as_edge return (src, item) unless src is None
                # (e.g. for first item).
                rval = (src, item)
            else:
                rval = item

            if prune(rval, d):
                continue

            skipStartItem = ignore_self and (item is self)
            if not skipStartItem and predicate(rval, d):
                yield rval

            # Only continue to next level if this is appropriate!
            nd = d + 1
            if depth > -1 and nd > depth:
                continue

            addToStack(stack, item, branch_first, nd)
        # END for each item on work stack
@runtime_checkable
class Serializable(Protocol):
    """Defines methods to serialize and deserialize objects from and into a data
    stream."""

    __slots__ = ()

    # @abstractmethod
    def _serialize(self, stream: "BytesIO") -> "Serializable":
        """Write this object's data into the given data stream.

        :note:
            A serialized object would :meth:`_deserialize` into the same object.

        :param stream:
            A file-like object.

        :return:
            self
        """
        raise NotImplementedError("To be implemented in subclass")

    # @abstractmethod
    def _deserialize(self, stream: "BytesIO") -> "Serializable":
        """Load all information regarding this object from the stream.

        :param stream:
            A file-like object.

        :return:
            self
        """
        raise NotImplementedError("To be implemented in subclass")
class TraversableIterableObj(IterableObj, Traversable):
    """Abstract base combining :class:`~git.util.IterableObj` with
    :class:`Traversable`, giving subclasses typed ``traverse``/``list_traverse``."""

    __slots__ = ()

    # Edge tuple type yielded by traverse() when as_edge=True.
    TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]

    def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
        """Traverse self and collect all items found, as an
        :class:`~git.util.IterableList`."""
        return super()._list_traverse(*args, **kwargs)

    @overload
    def traverse(self: T_TIobj) -> Iterator[T_TIobj]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[False],
    ) -> Iterator[T_TIobj]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[False],
        as_edge: Literal[True],
    ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[True],
    ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: ...

    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
        """For documentation, see :meth:`Traversable._traverse`."""

        ## To typecheck instead of using cast:
        #
        # import itertools
        # from git.types import TypeGuard
        # def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]:
        #     for x in inp[1]:
        #         if not isinstance(x, tuple) and len(x) != 2:
        #             if all(isinstance(inner, Commit) for inner in x):
        #                 continue
        #     return True
        #
        # ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge)
        # ret_tup = itertools.tee(ret, 2)
        # assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
        # return ret_tup[0]

        # The cast narrows the untyped base implementation to this class's typing.
        return cast(
            Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
                as_edge,
            ),
        )