本文整理汇总了Python中portage.os.stat函数的典型用法代码示例。如果您正苦于以下问题：Python stat函数的具体用法？Python stat怎么用？Python stat使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了stat函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: updateprotect
def updateprotect(self):
    """Rebuild the path lists consulted by isprotected().

    Every entry of ``self.protect_list`` and ``self.mask_list`` is joined
    onto ``self.myroot`` and normalized.  Paths that cannot be stat'ed are
    silently skipped.  Any path found to be a directory is additionally
    recorded in ``self._dirs``.
    """
    os = _os_merge
    sep = os.path.sep

    self.protect = []
    self._dirs = set()
    for entry in self.protect_list:
        target = normalize_path(
            os.path.join(self.myroot, entry.lstrip(sep)))
        try:
            target_is_dir = stat.S_ISDIR(os.stat(target).st_mode)
        except OSError:
            # If it doesn't exist, there's no need to protect it.
            continue
        if target_is_dir:
            self._dirs.add(target)
        self.protect.append(target)

    self.protectmask = []
    for entry in self.mask_list:
        target = normalize_path(
            os.path.join(self.myroot, entry.lstrip(sep)))
        try:
            # Use lstat so that anything, even a broken symlink, can be
            # masked.
            if stat.S_ISDIR(os.lstat(target).st_mode):
                self._dirs.add(target)
            self.protectmask.append(target)
            # Now use stat in case this is a symlink to a directory.
            if stat.S_ISDIR(os.stat(target).st_mode):
                self._dirs.add(target)
        except OSError:
            # If it doesn't exist, there's no need to mask it.
            pass
开发者ID:fastinetserver,项目名称:portage-idfetch,代码行数:35,代码来源:__init__.py
示例2: _same_device
def _same_device(path1, path2):
try:
st1 = os.stat(path1)
st2 = os.stat(path2)
except OSError:
return False
else:
return st1.st_dev == st2.st_dev
开发者ID:amadio,项目名称:portage,代码行数:8,代码来源:FetchTask.py
示例3: exists_raise_eaccess
def exists_raise_eaccess(path):
    """Report whether os.stat(path) succeeds.

    Returns True on success and False when stat fails with an OSError,
    except that an error whose errno equals ``PermissionDenied.errno`` is
    re-raised as PermissionDenied.
    """
    try:
        os.stat(path)
    except OSError as e:
        if e.errno == PermissionDenied.errno:
            raise PermissionDenied("stat('%s')" % path)
        return False
    return True
开发者ID:Spencerx,项目名称:portage,代码行数:9,代码来源:_path.py
示例4: cacheddir
def cacheddir(my_original_path, ignorecvs, ignorelist, EmptyOnError, followSymlinks=True):
    """List a directory and classify each of its entries.

    @param my_original_path: directory to list (normalized before use)
    @param ignorecvs: when true, drop names starting with ".#" and
        directories whose name is in VCS_DIRS
    @param ignorelist: names to drop from the result (may be empty or None)
    @param EmptyOnError: not referenced by this function; kept so the
        signature stays compatible with existing callers
    @param followSymlinks: classify entries with os.stat() when true,
        os.lstat() otherwise
    @rtype: tuple
    @return: (names, types) as two parallel lists.  Type codes are
        0 = regular file, 1 = directory, 2 = symlink, 3 = anything else or
        an entry that could not be stat'ed.  ([], []) is returned when the
        path cannot be stat'ed or is not a directory.
    @raise PermissionDenied: if stat'ing or listing the directory is denied
    """
    mypath = normalize_path(my_original_path)
    try:
        pathstat = os.stat(mypath)
        if not stat.S_ISDIR(pathstat.st_mode):
            raise DirectoryNotFound(mypath)
    except EnvironmentError as e:
        if e.errno == PermissionDenied.errno:
            raise PermissionDenied(mypath)
        del e
        return [], []
    except PortageException:
        # Covers the DirectoryNotFound raised above (assuming it derives
        # from PortageException -- TODO confirm).
        return [], []

    try:
        fpaths = os.listdir(mypath)
    except EnvironmentError as e:
        if e.errno != errno.EACCES:
            raise
        del e
        raise PermissionDenied(mypath)

    stat_func = os.stat if followSymlinks else os.lstat
    ftype = []
    for x in fpaths:
        try:
            mode = stat_func(mypath + "/" + x)[stat.ST_MODE]
        except (IOError, OSError):
            ftype.append(3)
            continue
        if stat.S_ISREG(mode):
            ftype.append(0)
        elif stat.S_ISDIR(mode):
            ftype.append(1)
        elif stat.S_ISLNK(mode):
            ftype.append(2)
        else:
            ftype.append(3)

    if not (ignorelist or ignorecvs):
        return fpaths, ftype

    # Tolerate ignorelist=None when only ignorecvs is requested.
    ignored = ignorelist or ()
    ret_list = []
    ret_ftype = []
    for file_path, file_type in zip(fpaths, ftype):
        if file_path in ignored:
            continue
        # The VCS filter applies only when ignorecvs is set.  Previously,
        # entries were appended only under ignorecvs, so a non-empty
        # ignorelist with ignorecvs unset discarded every entry.
        if ignorecvs and (file_path[:2] == ".#" or
                (file_type == 1 and file_path in VCS_DIRS)):
            continue
        ret_list.append(file_path)
        ret_ftype.append(file_type)
    return ret_list, ret_ftype
开发者ID:Spencerx,项目名称:portage,代码行数:56,代码来源:listdir.py
示例5: getfetchsizes
def getfetchsizes(self, mypkg, useflags=None, debug=0, myrepo=None):
    """Return a {filename: size} dictionary of the remaining downloads.

    The expected size of each file in the package's fetch map is read from
    the Manifest digests.  Whatever already exists in DISTDIR is
    subtracted; a complete copy found in one of the PORTAGE_RO_DISTDIRS
    directories counts as fully fetched.  Files that are already complete
    are omitted from the result.
    """
    myebuild, mytree = self.findname2(mypkg, myrepo=myrepo)
    if myebuild is None:
        raise AssertionError(_("ebuild not found for '%s'") % mypkg)
    pkgdir = os.path.dirname(myebuild)
    repo = self.repositories.get_repo_for_location(
        os.path.dirname(os.path.dirname(pkgdir)))
    mf = repo.load_manifest(pkgdir, self.settings["DISTDIR"])
    checksums = mf.getDigests()
    if not checksums:
        if debug:
            writemsg(_("[empty/missing/bad digest]: %s\n") % (mypkg,))
        return {}

    remaining = {}
    fetch_map = self.getFetchMap(mypkg, useflags=useflags, mytree=mytree)
    # XXX: maybe this should be improved: take partial downloads
    # into account? check checksums?
    for myfile in fetch_map:
        try:
            fetch_size = int(checksums[myfile]["size"])
        except (KeyError, ValueError):
            if debug:
                writemsg(_("[bad digest]: missing %(file)s for %(pkg)s\n") % {"file":myfile, "pkg":mypkg})
            continue

        file_path = os.path.join(self.settings["DISTDIR"], myfile)
        try:
            mystat = os.stat(file_path)
        except OSError:
            mystat = None

        if mystat is not None:
            existing_size = mystat.st_size
        else:
            existing_size = 0
            ro_distdirs = self.settings.get("PORTAGE_RO_DISTDIRS")
            if ro_distdirs is not None:
                for ro_dir in shlex_split(ro_distdirs):
                    try:
                        mystat = os.stat(os.path.join(ro_dir, myfile))
                    except OSError:
                        continue
                    if mystat.st_size == fetch_size:
                        existing_size = fetch_size
                        break

        remaining_size = fetch_size - existing_size
        if remaining_size > 0:
            # Assume the download is resumable.
            remaining[myfile] = remaining_size
        elif remaining_size < 0:
            # The existing file is too large and therefore corrupt.
            remaining[myfile] = int(checksums[myfile]["size"])
    return remaining
开发者ID:entoo,项目名称:portage-src,代码行数:54,代码来源:porttree.py
示例6: load
def load(self):
    """Reload atoms and non-atoms from their two backing files as needed.

    Each file is re-read through its loader when the set has not been
    loaded yet or the file's mtime differs from the cached one; otherwise
    the cached entries are reused.  ``self._setAtoms`` is called with the
    combined list only when at least one file was re-read.
    """
    def _read(loader):
        # Collect loader errors and treat a missing file as empty.
        try:
            data, errors = loader.load()
            for fname in errors:
                for msg in errors[fname]:
                    self.errors.append(fname+": "+msg)
        except EnvironmentError as e:
            if e.errno != errno.ENOENT:
                raise
            data = {}
        return list(data)

    def _mtime_of(path):
        try:
            return os.stat(path).st_mtime
        except (OSError, IOError):
            return None

    atoms = []
    nonatoms = []
    atoms_changed = False

    # Atoms and non-atoms live in different files so the world file stays
    # backwards-compatible with older versions and other package managers,
    # even though it's supposed to be private state data.
    mtime = _mtime_of(self._filename)
    if self._loaded and self._mtime == mtime:
        atoms.extend(self._atoms)
    else:
        atoms = _read(self.loader)
        self._mtime = mtime
        atoms_changed = True

    mtime = _mtime_of(self._filename2)
    if self._loaded and self._mtime2 == mtime:
        nonatoms.extend(self._nonatoms)
    else:
        nonatoms = _read(self.loader2)
        self._mtime2 = mtime
        atoms_changed = True

    if atoms_changed:
        self._setAtoms(atoms+nonatoms)
开发者ID:fastinetserver,项目名称:portage-idfetch,代码行数:49,代码来源:files.py
示例7: grab_updates
def grab_updates(updpath, prev_mtimes=None):
    """Return all updates found in a directory.

    @param updpath: directory holding update files named like "1Q-2010"
        (7 characters with "Q-" at positions 1-2); other names are ignored
    @param prev_mtimes: optional mapping of file path -> previously seen
        mtime; files whose mtime is unchanged are skipped
    @rtype: list
    @return: list of (file_path, statobj, content) tuples ordered by the
        trailing part of the name and then by the leading part
        (e.g. "4Q-2009" before "1Q-2010")
    @raise DirectoryNotFound: if updpath does not exist
    """
    try:
        mylist = os.listdir(updpath)
    except OSError as oe:
        if oe.errno == errno.ENOENT:
            raise DirectoryNotFound(updpath)
        raise
    if prev_mtimes is None:
        prev_mtimes = {}
    # validate the file name (filter out CVS directory, etc...)
    mylist = [myfile for myfile in mylist
        if len(myfile) == 7 and myfile[1:3] == "Q-"]
    if not mylist:
        return []
    # Sort on (suffix, prefix).  Because every name has the same fixed
    # layout, this gives the same order as mangling the names into
    # "<suffix>-<prefix>", sorting, and mangling them back.
    mylist.sort(key=lambda myfile: (myfile[3:], myfile[:2]))

    update_data = []
    for myfile in mylist:
        file_path = os.path.join(updpath, myfile)
        mystat = os.stat(file_path)
        # NOTE(review): `long` is not defined in this block; presumably the
        # module aliases it to int on Python 3 -- confirm.
        if file_path not in prev_mtimes or \
            long(prev_mtimes[file_path]) != mystat[stat.ST_MTIME]:
            # Close the handle deterministically instead of leaking it
            # until garbage collection.
            with codecs.open(_unicode_encode(file_path,
                encoding=_encodings['fs'], errors='strict'),
                mode='r', encoding=_encodings['repo.content'],
                errors='replace') as f:
                content = f.read()
            update_data.append((file_path, mystat, content))
    return update_data
开发者ID:Neuvoo,项目名称:legacy-portage,代码行数:34,代码来源:update.py
示例8: load
def load(self):
    """Reload the set from its backing file when it is stale.

    Nothing happens if the set is already loaded and the file's mtime
    matches the cached one.  Otherwise the loader is consulted (a missing
    file counts as empty), loader errors are appended to ``self.errors``,
    and the resulting atoms are passed to ``self._setAtoms``.  In greedy
    mode each atom is preceded by a "key:SLOT" atom for every package that
    ``self.dbapi`` matches for it.
    """
    try:
        mtime = os.stat(self._filename).st_mtime
    except (OSError, IOError):
        mtime = None
    if self._loaded and self._mtime == mtime:
        return

    try:
        data, errors = self.loader.load()
        for fname in errors:
            for msg in errors[fname]:
                self.errors.append(fname + ": " + msg)
    except EnvironmentError as e:
        if e.errno != errno.ENOENT:
            raise
        data = {}

    if not self.greedy:
        atoms = iter(data)
    else:
        atoms = []
        for a in data:
            for cpv in self.dbapi.match(a):
                atoms.append("%s:%s" % (cpv_getkey(cpv), self.dbapi.aux_get(cpv, ["SLOT"])[0]))
            # In addition to any installed slots, also try to pull
            # in the latest new slot that may be available.
            atoms.append(a)
    self._setAtoms(atoms)
    self._mtime = mtime
开发者ID:devurandom,项目名称:portage,代码行数:29,代码来源:files.py
示例9: sync_timestamp
def sync_timestamp(self):
    """Best-effort: set the local package mtime to the remote "_mtime_".

    Does nothing when the binary tree has no remote index, when the remote
    entry carries no "_mtime_", when that value is not an integer, or when
    the local file cannot be stat'ed.  Failures of os.utime() are ignored.
    """
    # If possible, update the mtime to match the remote package if
    # the fetcher didn't already do it automatically.
    bintree = self.pkg.root_config.trees["bintree"]
    if not bintree._remote_has_index:
        return

    remote_pkgs = bintree._remotepkgs
    remote_entry = remote_pkgs[bintree.dbapi._instance_key(self.pkg.cpv)]
    remote_mtime = remote_entry.get("_mtime_")
    if remote_mtime is None:
        return

    try:
        remote_mtime = long(remote_mtime)
    except ValueError:
        return

    try:
        local_mtime = os.stat(self.pkg_path)[stat.ST_MTIME]
    except OSError:
        return

    if remote_mtime == local_mtime:
        return
    try:
        os.utime(self.pkg_path, (remote_mtime, remote_mtime))
    except OSError:
        pass
开发者ID:gentoo,项目名称:portage,代码行数:25,代码来源:BinpkgFetcher.py
示例10: __getattr__
def __getattr__(self, attr):
    """Lazily compute and cache ``mtime`` or a checksum attribute.

    ``mtime`` is the integer mtime of ``self.location``.  Any other
    all-lowercase name whose upper-case form is a key of
    ``checksum.hashfunc_map`` yields that checksum of ``self.location``.
    The computed value is stored on the instance before being returned.

    @raise FileNotFound: stat failed with ENOENT or ESTALE
    @raise PermissionDenied: stat failed with PermissionDenied.errno
    @raise AttributeError: for any other attribute name
    """
    if attr == 'mtime':
        # Use stat.ST_MTIME; accessing .st_mtime gets you a float
        # depending on the python version, and long(float) introduces
        # some rounding issues that aren't present for people using
        # the straight c api.  Index access is the de facto python
        # compatibility workaround and guarantees the raw integer.
        try:
            obj = os.stat(self.location)[stat.ST_MTIME]
            self.mtime = obj
        except OSError as e:
            if e.errno in (errno.ENOENT, errno.ESTALE):
                raise FileNotFound(self.location)
            elif e.errno == PermissionDenied.errno:
                raise PermissionDenied(self.location)
            raise
        return obj

    # We don't care to allow .mD5 as an alias for .md5, so mixed-case
    # names are rejected before the hash lookup.
    if not attr.islower() or attr.upper() not in checksum.hashfunc_map:
        raise AttributeError(attr)
    val = checksum.perform_checksum(self.location, attr.upper())[0]
    setattr(self, attr, val)
    return val
开发者ID:Spencerx,项目名称:portage,代码行数:26,代码来源:eclass_cache.py
示例11: _apply_max_mtime
def _apply_max_mtime(self, existing_st, entries):
"""
Set the Manifest mtime to the max mtime of all relevant files
(the existing Manifest mtime is included in order to account for
eclass modifications that change DIST entries). This results in a
stable/predictable mtime, which is useful when converting thin
manifests to thick manifests for distribution via rsync. For
portability, the mtime is set with 1 second resolution.
@param existing_st: stat result for existing Manifest
@type existing_st: posix.stat_result
@param entries: list of current Manifest2Entry instances
@type entries: list
"""
# Use stat_result[stat.ST_MTIME] for 1 second resolution, since
# it always rounds down. Note that stat_result.st_mtime will round
# up from 0.999999999 to 1.0 when precision is lost during conversion
# from nanosecond resolution to float.
max_mtime = None if existing_st is None else existing_st[stat.ST_MTIME]
for entry in entries:
if entry.type == 'DIST':
continue
abs_path = (os.path.join(self.pkgdir, 'files', entry.name) if
entry.type == 'AUX' else os.path.join(self.pkgdir, entry.name))
mtime = os.stat(abs_path)[stat.ST_MTIME]
if max_mtime is None or mtime > max_mtime:
max_mtime = mtime
if max_mtime is not None:
os.utime(self.getFullname(), (max_mtime, max_mtime))
开发者ID:gmt,项目名称:portage,代码行数:30,代码来源:manifest.py
示例12: close
def close(self):
    """Close the temporary file and atomically move it into place.

    Unless abort() has been called, the permissions of the real file are
    copied onto the temp file where possible, and the temp file then
    replaces the real file via os.rename().  Whatever happens, a final
    attempt is made to remove the temp file.  Calling this on an already
    closed file does nothing.
    """
    f = object.__getattribute__(self, '_file')
    real_name = object.__getattribute__(self, '_real_name')
    if f.closed:
        return
    try:
        f.close()
        if not object.__getattribute__(self, '_aborted'):
            try:
                apply_stat_permissions(f.name, os.stat(real_name))
            except (OperationNotPermitted, FileNotFound):
                pass
            except OSError as oe:  # from the above os.stat call
                if oe.errno not in (errno.ENOENT, errno.EPERM):
                    raise
            os.rename(f.name, real_name)
    finally:
        # Make sure we cleanup the temp file
        # even if an exception is raised.
        try:
            os.unlink(f.name)
        except OSError:
            pass
开发者ID:fastinetserver,项目名称:portage-idfetch,代码行数:29,代码来源:__init__.py
示例13: _check_distfile
def _check_distfile(filename, digests, eout, show_errors=1):
"""
@return a tuple of (match, stat_obj) where match is True if filename
matches all given digests (if any) and stat_obj is a stat result, or
None if the file does not exist.
"""
if digests is None:
digests = {}
size = digests.get("size")
if size is not None and len(digests) == 1:
digests = None
try:
st = os.stat(filename)
except OSError:
return (False, None)
if size is not None and size != st.st_size:
return (False, st)
if not digests:
if size is not None:
eout.ebegin(_("%s size ;-)") % os.path.basename(filename))
eout.eend(0)
elif st.st_size == 0:
# Zero-byte distfiles are always invalid.
return (False, st)
else:
if _check_digests(filename, digests, show_errors=show_errors):
eout.ebegin("%s %s ;-)" % (os.path.basename(filename),
" ".join(sorted(digests))))
eout.eend(0)
else:
return (False, st)
return (True, st)
开发者ID:TommyD,项目名称:gentoo-portage-multilib,代码行数:33,代码来源:fetch.py
示例14: verify_all
def verify_all(filename, mydict, calc_prelink=0, strict=0):
    """
    Verify all checksums against a file.

    @param filename: File to run the checksums against
    @type filename: String
    @param mydict: Recorded digests for this single file; must contain a
        "size" entry, plus hash-name -> hash-value entries
    @type mydict: dict
    @param calc_prelink: Whether or not to reverse prelink before running the checksum
    @type calc_prelink: Integer
    @param strict: Enable/Disable strict checking (which stops exactly at a checksum failure and throws an exception)
    @type strict: Integer
    @rtype: Tuple
    @return: Result of the checks and possible message:
        1) If size fails, False, and a tuple containing a message, the actual size, and the recorded size
        2) If there is an os error, False, and a tuple containing the system error followed by 2 nulls
        3) If no recorded hash type is supported, False, and a tuple containing a message, the given types, and the supported types
        4) If a checksum fails, False and a tuple containing a message, the actual hash, and the recorded hash
        5) If all checks succeed, return True and a fake reason
    @raise portage.exception.FileNotFound: if the file does not exist
    @raise portage.exception.DigestException: if strict is set and a
        checksum does not match
    """
    try:
        mysize = os.stat(filename)[stat.ST_SIZE]
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise portage.exception.FileNotFound(filename)
        return False, (str(e), None, None)
    if mydict["size"] != mysize:
        return False, (_("Filesize does not match recorded size"), mysize, mydict["size"])

    verifiable_hash_types = set(mydict).intersection(hashfunc_map)
    verifiable_hash_types.discard("size")
    if not verifiable_hash_types:
        expected = set(hashfunc_map)
        expected.discard("size")
        expected = " ".join(sorted(expected))
        got = set(mydict)
        got.discard("size")
        got = " ".join(sorted(got))
        return False, (_("Insufficient data for checksum verification"), got, expected)

    for x in sorted(mydict):
        if x == "size" or x not in hashfunc_map:
            continue
        myhash = perform_checksum(filename, x, calc_prelink=calc_prelink)[0]
        if mydict[x] == myhash:
            continue
        if strict:
            # The original placeholder was "$(file)s", which was never
            # interpolated; "%(file)s" puts the file name in the message.
            raise portage.exception.DigestException(
                ("Failed to verify '%(file)s' on " + "checksum type '%(type)s'") % {"file": filename, "type": x}
            )
        # Stop at the first mismatching hash.
        return False, (("Failed on %s verification" % x), myhash, mydict[x])
    return True, "Reason unknown"
开发者ID:Neuvoo,项目名称:legacy-portage,代码行数:60,代码来源:checksum.py
示例15: _ensure_log_subdirs
def _ensure_log_subdirs(logdir, subdir):
"""
This assumes that logdir exists, and creates subdirectories down
to subdir as necessary. The gid of logdir is copied to all
subdirectories, along with 0x2070 mode bits if present. Both logdir
and subdir are assumed to be normalized absolute paths.
"""
st = os.stat(logdir)
uid = -1
gid = st.st_gid
grp_mode = 0o2070 & st.st_mode
# If logdir is writable by the portage group but its uid
# is not portage_uid, then set the uid to portage_uid if
# we have privileges to do so, for compatibility with our
# default logrotate config (see bug 378451). With the
# "su portage portage" directive and logrotate-3.8.0,
# logrotate's chown call during the compression phase will
# only succeed if the log file's uid is portage_uid.
if grp_mode and gid == portage_gid and \
portage.data.secpass >= 2:
uid = portage_uid
if st.st_uid != portage_uid:
ensure_dirs(logdir, uid=uid)
logdir_split_len = len(logdir.split(os.sep))
subdir_split = subdir.split(os.sep)[logdir_split_len:]
subdir_split.reverse()
current = logdir
while subdir_split:
current = os.path.join(current, subdir_split.pop())
ensure_dirs(current, uid=uid, gid=gid, mode=grp_mode, mask=0)
开发者ID:gentoo,项目名称:portage,代码行数:32,代码来源:prepare_build_dirs.py
示例16: _migrate
def _migrate(self, update_location):
"""
When repo.user_location is a normal directory, migrate it to
storage so that it can be replaced with a symlink. After migration,
commit the content as the latest snapshot.
"""
try:
os.rename(self._user_location, update_location)
except OSError:
portage.util.ensure_dirs(update_location)
portage.util.apply_stat_permissions(update_location,
os.stat(self._user_location))
# It's probably on a different device, so copy it.
yield self._check_call(['rsync', '-a',
self._user_location + '/', update_location + '/'])
# Remove the old copy so that symlink can be created. Run with
# maximum privileges, since removal requires write access to
# the parent directory.
yield self._check_call(['rm', '-rf', user_location], privileged=True)
self._update_location = update_location
# Make this copy the latest snapshot
yield self.commit_update()
开发者ID:gentoo,项目名称:portage,代码行数:25,代码来源:hardlink_rcu.py
示例17: RecursiveFileLoader
def RecursiveFileLoader(filename):
    """
    Generate the files to process for a file or directory name.

    If filename is a directory, yield the path of every file found while
    walking it; otherwise yield filename itself.  Nothing is yielded when
    filename cannot be stat'ed.
    Files beginning with . or ending in ~ are ignored, as are files whose
    names cannot be decoded with the filesystem encoding.
    Directories beginning with . and CVS directories are pruned.

    @param filename: name of a file/directory to traverse
    @rtype: generator
    @returns: Paths of the files to process
    """
    try:
        st = os.stat(filename)
    except OSError:
        return

    if not stat.S_ISDIR(st.st_mode):
        yield filename
        return

    for root, dirs, files in os.walk(filename):
        # Prune in place so that os.walk() does not descend into them.
        dirs[:] = [d for d in dirs if d[:1] != "." and d != "CVS"]
        for f in files:
            try:
                f = _unicode_decode(f, encoding=_encodings["fs"], errors="strict")
            except UnicodeDecodeError:
                continue
            if f[:1] == "." or f[-1:] == "~":
                continue
            yield os.path.join(root, f)
开发者ID:sysrqb,项目名称:portage-funtoo,代码行数:33,代码来源:loaders.py
示例18: have_ebuild_dir
def have_ebuild_dir(path, maxdepth=3):
    """
    Try to figure out if 'path' or a subdirectory contains one or more
    ebuild files named appropriately for their parent directory.

    Directories are searched down to maxdepth levels, 'path' being level
    1.  For the first regular file named "<dirname>-*.ebuild" that is
    found, the grandparent of the directory containing it is returned.
    None is returned if no such file is found.
    """
    pending = [(normalize_path(path), 1)]
    while pending:
        current, depth = pending.pop()
        ebuild_prefix = os.path.basename(current) + "-"
        try:
            names = os.listdir(current)
        except OSError:
            continue
        for name in names:
            child = os.path.join(current, name)
            try:
                mode = os.stat(child).st_mode
            except OSError:
                continue
            if stat.S_ISDIR(mode):
                if depth < maxdepth:
                    pending.append((child, depth + 1))
            elif stat.S_ISREG(mode) and name.endswith(".ebuild") and \
                name.startswith(ebuild_prefix):
                return os.path.dirname(os.path.dirname(current))
开发者ID:pombredanne,项目名称:portage-3,代码行数:26,代码来源:utilities.py
示例19: _pkgindex_entry
def _pkgindex_entry(self, cpv):
    """
    Build the package index entry for one package.

    Performs checksums and evaluates USE flag conditionals.
    Raises InvalidDependString if necessary.

    @rtype: dict
    @return: a dict containing the entry for the given cpv.
    """
    pkg_path = self.getname(cpv)

    aux_values = self.dbapi.aux_get(cpv, self._pkgindex_aux_keys)
    entry = dict(zip(self._pkgindex_aux_keys, aux_values))
    checksums = perform_multiple_checksums(
        pkg_path, hashes=self._pkgindex_hashes)
    entry.update(checksums)
    entry["CPV"] = cpv

    st = os.stat(pkg_path)
    # Index access gives the mtime as a whole number of seconds.
    entry["MTIME"] = str(st[stat.ST_MTIME])
    entry["SIZE"] = str(st.st_size)

    # record location if it's non-default
    rel_path = self._pkg_paths[cpv]
    if rel_path != cpv + ".tbz2":
        entry["PATH"] = rel_path

    self._eval_use_flags(cpv, entry)
    return entry
开发者ID:devurandom,项目名称:portage,代码行数:28,代码来源:bintree.py
示例20: load
def load(self):
    """Reload the set's entries from the backing file when it is stale.

    The loader is consulted when the set has not been loaded yet or the
    file's mtime differs from the cached one.  A missing file counts as
    empty, and loader errors are appended to ``self.errors``.
    ``self._setAtoms`` is called only when the file was re-read.
    """
    try:
        mtime = os.stat(self._filename).st_mtime
    except (OSError, IOError):
        mtime = None

    if self._loaded and self._mtime == mtime:
        nonatoms = list(self._nonatoms)
        atoms_changed = False
    else:
        try:
            data, errors = self.loader.load()
            for fname in errors:
                for msg in errors[fname]:
                    self.errors.append(fname+": "+msg)
        except EnvironmentError as e:
            if e.errno != errno.ENOENT:
                raise
            data = {}
        nonatoms = list(data)
        self._mtime = mtime
        atoms_changed = True

    if atoms_changed:
        self._setAtoms(nonatoms)
开发者ID:jonasstein,项目名称:portage,代码行数:25,代码来源:files.py
注:本文中的portage.os.stat函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论