• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python os.rename函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中portage.os.rename函数的典型用法代码示例。如果您正苦于以下问题:Python rename函数的具体用法?Python rename怎么用?Python rename使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了rename函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _setitem

	def _setitem(self, cpv, values):
		"""
		Write the cache entry for cpv and move it into place with os.rename().

		The entry is first written to a hidden ".update.<pid>.<name>" file in
		the directory of the final entry, one line per key listed in
		self.auxdbkey_order (missing keys are written as empty lines), and
		then renamed to its final path below self.location.

		@param cpv: cache key; everything after the last "/" is used as the
			file name
		@param values: mapping with the values to store; it must contain
			the "_mtime_" key
		@raise cache_errors.CacheCorruption: if the temporary file can not
			be opened or the final rename fails
		"""
		s = cpv.rfind("/")
		fp = os.path.join(self.location, cpv[:s],
			".update.%i.%s" % (os.getpid(), cpv[s+1:]))
		try:
			myf = codecs.open(_unicode_encode(fp,
				encoding=_encodings['fs'], errors='strict'),
				mode='w', encoding=_encodings['repo.content'],
				errors='backslashreplace')
		except (OSError, IOError) as e:
			if errno.ENOENT == e.errno:
				# The parent directory is missing: create it and retry once.
				try:
					self._ensure_dirs(cpv)
					myf = codecs.open(_unicode_encode(fp,
						encoding=_encodings['fs'], errors='strict'),
						mode='w', encoding=_encodings['repo.content'],
						errors='backslashreplace')
				except (OSError, IOError) as e:
					raise cache_errors.CacheCorruption(cpv, e)
			else:
				raise cache_errors.CacheCorruption(cpv, e)

		try:
			for x in self.auxdbkey_order:
				myf.write(values.get(x, "") + "\n")
		finally:
			# Close the handle even when a write fails, so that the file
			# descriptor is not leaked.
			myf.close()

		self._ensure_access(fp, mtime=values["_mtime_"])
		#update written.  now we move it.
		new_fp = os.path.join(self.location, cpv)
		try:
			os.rename(fp, new_fp)
		except (OSError, IOError) as e:
			os.remove(fp)
			raise cache_errors.CacheCorruption(cpv, e)
开发者ID:TommyD,项目名称:gentoo-portage-multilib,代码行数:34,代码来源:flat_list.py


示例2: recompose_mem

	def recompose_mem(self, xpdata, break_hardlinks=True):
		"""
		Update the xpak segment.
		@param xpdata: A new xpak segment to be written, like that returned
			from the xpak_mem() function.
		@param break_hardlinks: If hardlinks exist, create a copy in order
			to break them. This makes it safe to use hardlinks to create
			cheap snapshots of the repository, which is useful for solving
			race conditions on binhosts as described here:
			https://crbug.com/185031
			Default is True.
		@return: 1 after the segment has been rewritten
		"""
		self.scan() # Don't care about condition... We'll rewrite the data anyway.

		if break_hardlinks and self.filestat and self.filestat.st_nlink > 1:
			tmp_fname = "%s.%d" % (self.file, os.getpid())
			shutil.copyfile(self.file, tmp_fname)
			try:
				# copyfile() copies only the content, so the original
				# permissions have to be applied to the copy that is about
				# to replace self.file (not to self.file itself).
				portage.util.apply_stat_permissions(tmp_fname, self.filestat)
			except portage.exception.OperationNotPermitted:
				pass
			os.rename(tmp_fname, self.file)

		# The context manager closes the file even if seek/truncate/write fail.
		with open(_unicode_encode(self.file,
			encoding=_encodings['fs'], errors='strict'), 'ab+') as myfile:
			myfile.seek(-self.xpaksize, 2) # 0,2 or -0,2 just mean EOF.
			myfile.truncate()
			myfile.write(xpdata + encodeint(len(xpdata)) + b'STOP')
			myfile.flush()
		return 1
开发者ID:amadio,项目名称:portage,代码行数:33,代码来源:xpak.py


示例3: _hardlink_atomic

	def _hardlink_atomic(self, src, dest, dir_info):

		head, tail = os.path.split(dest)
		hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \
			(tail, os.getpid()))

		try:
			try:
				os.link(src, hardlink_tmp)
			except OSError as e:
				if e.errno != errno.EXDEV:
					msg = "hardlink %s from %s failed: %s" % \
						(self.distfile, dir_info, e)
					self.scheduler.output(msg + '\n', background=True,
						log_path=self._log_path)
					logging.error(msg)
				return False

			try:
				os.rename(hardlink_tmp, dest)
			except OSError as e:
				msg = "hardlink rename '%s' from %s failed: %s" % \
					(self.distfile, dir_info, e)
				self.scheduler.output(msg + '\n', background=True,
					log_path=self._log_path)
				logging.error(msg)
				return False
		finally:
			try:
				os.unlink(hardlink_tmp)
			except OSError:
				pass

		return True
开发者ID:amadio,项目名称:portage,代码行数:34,代码来源:FetchTask.py


示例4: _migrate

	def _migrate(self, update_location):
		"""
		When repo.user_location is a normal directory, migrate it to
		storage so that it can be replaced with a symlink. After migration,
		commit the content as the latest snapshot.
		"""
		try:
			os.rename(self._user_location, update_location)
		except OSError:
			portage.util.ensure_dirs(update_location)
			portage.util.apply_stat_permissions(update_location,
				os.stat(self._user_location))
			# It's probably on a different device, so copy it.
			yield self._check_call(['rsync', '-a',
				self._user_location + '/', update_location + '/'])

			# Remove the old copy so that symlink can be created. Run with
			# maximum privileges, since removal requires write access to
			# the parent directory.
			yield self._check_call(['rm', '-rf', user_location], privileged=True)

		self._update_location = update_location

		# Make this copy the latest snapshot
		yield self.commit_update()
开发者ID:gentoo,项目名称:portage,代码行数:25,代码来源:hardlink_rcu.py


示例5: file_archive_post_process

def file_archive_post_process(archive):
	"""
	Promote archive + '.dist.new' to archive + '.dist'.

	Nothing happens when the '.dist.new' file does not exist. A real
	directory (not a symlink) found at the '.dist' path is rotated away
	first.
	"""
	pending = archive + '.dist.new'
	if not os.path.lexists(pending):
		return

	dest = "%s.dist" % archive
	dest_is_real_dir = os.path.isdir(dest) and not os.path.islink(dest)
	if dest_is_real_dir:
		_file_archive_rotate(dest)
	os.rename(pending, dest)
开发者ID:aeroniero33,项目名称:portage,代码行数:7,代码来源:dispatch_conf.py


示例6: _chpathtool_exit

	def _chpathtool_exit(self, chpathtool):
		if self._final_exit(chpathtool) != os.EX_OK:
			self._unlock_builddir()
			self._writemsg_level("!!! Error Adjusting Prefix to %s\n" %
				(self.settings["EPREFIX"],),
				noiselevel=-1, level=logging.ERROR)
			self.wait()
			return

		# We want to install in "our" prefix, not the binary one
		with io.open(_unicode_encode(os.path.join(self._infloc, "EPREFIX"),
			encoding=_encodings['fs'], errors='strict'), mode='w',
			encoding=_encodings['repo.content'], errors='strict') as f:
			f.write(self.settings["EPREFIX"] + "\n")

		# Move the files to the correct location for merge.
		image_tmp_dir = os.path.join(
			self.settings["PORTAGE_BUILDDIR"], "image_tmp")
		build_d = os.path.join(self.settings["D"],
			self._build_prefix.lstrip(os.sep))
		if not os.path.isdir(build_d):
			# Assume this is a virtual package or something.
			shutil.rmtree(self._image_dir)
			ensure_dirs(self.settings["ED"])
		else:
			os.rename(build_d, image_tmp_dir)
			shutil.rmtree(self._image_dir)
			ensure_dirs(os.path.dirname(self.settings["ED"].rstrip(os.sep)))
			os.rename(image_tmp_dir, self.settings["ED"])

		self.wait()
开发者ID:aeroniero33,项目名称:portage,代码行数:31,代码来源:Binpkg.py


示例7: close

	def close(self):
		"""Close the temporary file and, unless abort() has been called,
		copy the permissions of the real file (if possible) and replace the
		real file via os.rename(). The temporary file is removed whenever
		it is still around afterwards. Calling this on an already closed
		file does nothing."""
		read_attr = object.__getattribute__
		tmp_file = read_attr(self, '_file')
		target = read_attr(self, '_real_name')
		if tmp_file.closed:
			return

		try:
			tmp_file.close()
			if not read_attr(self, '_aborted'):
				try:
					apply_stat_permissions(tmp_file.name, os.stat(target))
				except (OperationNotPermitted, FileNotFound):
					pass
				except OSError as stat_err: # from the above os.stat call
					if stat_err.errno not in (errno.ENOENT, errno.EPERM):
						raise
				os.rename(tmp_file.name, target)
		finally:
			# Make sure we cleanup the temp file
			# even if an exception is raised.
			try:
				os.unlink(tmp_file.name)
			except OSError:
				pass
开发者ID:fastinetserver,项目名称:portage-idfetch,代码行数:29,代码来源:__init__.py


示例8: _start

	def _start(self):

		distfile_path = os.path.join(
			self.config.options.distfiles, self.distfile)

		if self.config.options.recycle_dir is not None:
			distfile_path = os.path.join(self.config.options.distfiles, self.distfile)
			recycle_path = os.path.join(
				self.config.options.recycle_dir, self.distfile)
			if self.config.options.dry_run:
				logging.info(("dry-run: move '%s' from "
					"distfiles to recycle") % self.distfile)
			else:
				logging.debug(("move '%s' from "
					"distfiles to recycle") % self.distfile)
				try:
					os.rename(distfile_path, recycle_path)
				except OSError as e:
					if e.errno != errno.EXDEV:
						logging.error(("rename %s from distfiles to "
							"recycle failed: %s") % (self.distfile, e))
				else:
					self.returncode = os.EX_OK
					self._async_wait()
					return

				self._start_task(
					FileCopier(src_path=distfile_path,
						dest_path=recycle_path,
						background=False),
					self._recycle_copier_exit)
				return

		success = True

		if self.config.options.dry_run:
			logging.info(("dry-run: delete '%s' from "
				"distfiles") % self.distfile)
		else:
			logging.debug(("delete '%s' from "
				"distfiles") % self.distfile)
			try:
				os.unlink(distfile_path)
			except OSError as e:
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					logging.error("%s unlink failed in distfiles: %s" %
						(self.distfile, e))
					success = False

		if success:
			self._success()
			self.returncode = os.EX_OK
		else:
			self.returncode = 1

		self._async_wait()
开发者ID:Spencerx,项目名称:portage,代码行数:56,代码来源:DeletionTask.py


示例9: rcs_archive

def rcs_archive(archive, curconf, newconf, mrgconf):
	"""Archive existing config in rcs (on trunk). Then, if mrgconf is
	specified and an old branch version exists, merge the user's changes
	and the distributed changes and put the result into mrgconf.  Lastly,
	if newconf was specified, leave it in the archive dir with a .dist.new
	suffix along with the last 1.1.1 branch version with a .dist suffix.

	@param archive: path of the archive file; its parent directory is
		created when missing
	@param curconf: path of the current config file
	@param newconf: path of the new config file, may be empty
	@param mrgconf: path that receives the merge result, may be empty
	@return: 0, or the os.system() result of the merge command when a
		merge has been run
	"""

	# Best effort: the directory usually exists already.
	try:
		os.makedirs(os.path.dirname(archive))
	except OSError:
		pass

	try:
		curconf_st = os.lstat(curconf)
	except OSError:
		curconf_st = None

	# Only regular files and symlinks are copied into the archive.
	if curconf_st is not None and \
		(stat.S_ISREG(curconf_st.st_mode) or
		stat.S_ISLNK(curconf_st.st_mode)):
		_archive_copy(curconf_st, curconf, archive)

	# The lock command is only run when the rcs file (archive + ',v')
	# already exists; the put command is always run.
	if os.path.lexists(archive + ',v'):
		os.system(RCS_LOCK + ' ' + archive)
	os.system(RCS_PUT + ' ' + archive)

	ret = 0
	mystat = None
	if newconf:
		try:
			mystat = os.lstat(newconf)
		except OSError:
			pass

	if mystat is not None and \
		(stat.S_ISREG(mystat.st_mode) or
		stat.S_ISLNK(mystat.st_mode)):
		# Check out the branch version; if that produced a file, keep it
		# with a .dist suffix.
		os.system(RCS_GET + ' -r' + RCS_BRANCH + ' ' + archive)
		has_branch = os.path.lexists(archive)
		if has_branch:
			os.rename(archive, archive + '.dist')

		_archive_copy(mystat, newconf, archive)

		if has_branch:
			if mrgconf and os.path.isfile(archive) and \
				os.path.isfile(mrgconf):
				# This puts the results of the merge into mrgconf.
				ret = os.system(RCS_MERGE % (archive, mrgconf))
				# Give mrgconf the mode and ownership of newconf.
				os.chmod(mrgconf, mystat.st_mode)
				os.chown(mrgconf, mystat.st_uid, mystat.st_gid)
		# Leave the copy of newconf behind with a .dist.new suffix.
		os.rename(archive, archive + '.dist.new')

	return ret
开发者ID:aeroniero33,项目名称:portage,代码行数:54,代码来源:dispatch_conf.py


示例10: rcs_archive_post_process

def rcs_archive_post_process(archive):
	"""
	Rename archive + '.dist.new' to archive and check it in on the
	branch. A leftover archive + '.dist' file is removed afterwards; when
	there is none, the check-in is forced.
	"""
	dist_path = archive + '.dist'
	os.rename(archive + '.dist.new', archive)

	if not os.path.lexists(dist_path):
		# Forcefully commit the last-distributed version onto the branch.
		os.system(RCS_PUT + ' -f -r' + RCS_BRANCH + ' ' + archive)
		return

	# Commit the last-distributed version onto the branch.
	os.system(RCS_LOCK + RCS_BRANCH + ' ' + archive)
	os.system(RCS_PUT + ' -r' + RCS_BRANCH + ' ' + archive)
	os.unlink(dist_path)
开发者ID:aeroniero33,项目名称:portage,代码行数:12,代码来源:dispatch_conf.py


示例11: gpgsign

def gpgsign(filename, repoman_settings, options):
	"""
	Sign filename with the command from PORTAGE_GPG_SIGNING_COMMAND.

	With options.pretend set, the expanded command is only printed.
	Otherwise the command is run and, on success, filename + ".asc" is
	renamed to filename.

	@param filename: path of the file to sign
	@param repoman_settings: settings mapping; PORTAGE_GPG_DIR may be
		added or expanded in place
	@param options: object with a "pretend" attribute
	@raise MissingParameter: if the signing command or a key that it
		references is unset
	@raise portage.exception.InvalidLocation: if PORTAGE_GPG_DIR can not
		be accessed
	@raise portage.exception.CommandNotFound: if the executable can not
		be found (Python 3.0/3.1 only)
	@raise portage.exception.PortageException: if the command fails
	"""
	gpgcmd = repoman_settings.get("PORTAGE_GPG_SIGNING_COMMAND")
	if gpgcmd in (None, ''):
		raise MissingParameter("PORTAGE_GPG_SIGNING_COMMAND is unset!"
			" Is make.globals missing?")

	needs_key = "${PORTAGE_GPG_KEY}" in gpgcmd
	if needs_key and "PORTAGE_GPG_KEY" not in repoman_settings:
		raise MissingParameter("PORTAGE_GPG_KEY is unset!")

	if "${PORTAGE_GPG_DIR}" in gpgcmd:
		if "PORTAGE_GPG_DIR" in repoman_settings:
			repoman_settings["PORTAGE_GPG_DIR"] = \
				os.path.expanduser(repoman_settings["PORTAGE_GPG_DIR"])
		else:
			repoman_settings["PORTAGE_GPG_DIR"] = \
				os.path.expanduser("~/.gnupg")
			logging.info(
				"Automatically setting PORTAGE_GPG_DIR to '%s'" %
				repoman_settings["PORTAGE_GPG_DIR"])
		if not os.access(repoman_settings["PORTAGE_GPG_DIR"], os.X_OK):
			raise portage.exception.InvalidLocation(
				"Unable to access directory: PORTAGE_GPG_DIR='%s'" %
				repoman_settings["PORTAGE_GPG_DIR"])

	# Variables that may be referenced by the command template.
	gpgvars = {"FILE": filename}
	for name in ("PORTAGE_GPG_DIR", "PORTAGE_GPG_KEY"):
		value = repoman_settings.get(name)
		if value is None:
			continue
		gpgvars[name] = value
	gpgcmd = portage.util.varexpand(gpgcmd, mydict=gpgvars)

	if options.pretend:
		print("(" + gpgcmd + ")")
		return

	# Encode unicode manually for bug #310789.
	gpgcmd = portage.util.shlex_split(gpgcmd)

	if 0x3000000 <= sys.hexversion < 0x3020000 and \
		not os.path.isabs(gpgcmd[0]):
		# Python 3.1 _execvp throws TypeError for non-absolute executable
		# path passed as bytes (see http://bugs.python.org/issue8513).
		fullname = find_binary(gpgcmd[0])
		if fullname is None:
			raise portage.exception.CommandNotFound(gpgcmd[0])
		gpgcmd[0] = fullname

	encoded_cmd = []
	for arg in gpgcmd:
		encoded_cmd.append(
			_unicode_encode(arg, encoding=_encodings['fs'], errors='strict'))

	rValue = subprocess.call(encoded_cmd)
	if rValue != os.EX_OK:
		raise portage.exception.PortageException(
			"!!! gpg exited with '" + str(rValue) + "' status")
	os.rename(filename + ".asc", filename)
开发者ID:aeroniero33,项目名称:portage,代码行数:52,代码来源:gpg.py


示例12: _setitem

    def _setitem(self, cpv, values):
        """
        Write the cache entry for cpv as "key=value" lines and move it
        into place with os.rename().

        Only keys from self._write_keys with a non-empty value are written.
        The entry is first written to a hidden ".update.<pid>.<name>" file
        in the directory of the final entry.

        @param cpv: cache key; everything after the last "/" is used as
            the file name
        @param values: mapping with the values to store
        @raise cache_errors.CacheCorruption: if the temporary file can not
            be opened or the final rename fails
        """
        slash = cpv.rfind("/")
        tmp_name = ".update.%i.%s" % (os.getpid(), cpv[slash + 1 :])
        fp = os.path.join(self.location, cpv[:slash], tmp_name)

        def open_tmp():
            # Open the temporary file for writing text.
            return io.open(
                _unicode_encode(fp, encoding=_encodings["fs"], errors="strict"),
                mode="w",
                encoding=_encodings["repo.content"],
                errors="backslashreplace",
            )

        try:
            myf = open_tmp()
        except (IOError, OSError) as e:
            if errno.ENOENT != e.errno:
                raise cache_errors.CacheCorruption(cpv, e)
            # The parent directory is missing: create it and retry once.
            try:
                self._ensure_dirs(cpv)
                myf = open_tmp()
            except (OSError, IOError) as retry_err:
                raise cache_errors.CacheCorruption(cpv, retry_err)

        try:
            for key in self._write_keys:
                value = values.get(key)
                if value:
                    # NOTE: This format string requires unicode_literals, so
                    # that key and value are coerced to unicode, in order to
                    # prevent TypeError when writing raw bytes to
                    # TextIOWrapper with Python 2.
                    myf.write("%s=%s\n" % (key, value))
        finally:
            myf.close()
        self._ensure_access(fp)

        # update written.  now we move it.
        new_fp = os.path.join(self.location, cpv)
        try:
            os.rename(fp, new_fp)
        except (OSError, IOError) as e:
            os.remove(fp)
            raise cache_errors.CacheCorruption(cpv, e)
开发者ID:TriadicTek,项目名称:Portage,代码行数:46,代码来源:flat_hash.py


示例13: _fetch_digester_exit

	def _fetch_digester_exit(self, digester):

		self._assert_current(digester)
		if self._was_cancelled():
			self.wait()
			return

		if digester.returncode != os.EX_OK:
			msg = "%s %s digester failed unexpectedly" % \
			(self.distfile, self._fetch_tmp_dir_info)
			self.scheduler.output(msg + '\n', background=True,
				log_path=self._log_path)
			logging.error(msg)
		else:
			bad_digest = self._find_bad_digest(digester.digests)
			if bad_digest is not None:
				msg = "%s has bad %s digest: expected %s, got %s" % \
					(self.distfile, bad_digest,
					self.digests[bad_digest], digester.digests[bad_digest])
				self.scheduler.output(msg + '\n', background=True,
					log_path=self._log_path)
				try:
					os.unlink(self._fetch_tmp_file)
				except OSError:
					pass
			else:
				dest = os.path.join(self.config.options.distfiles, self.distfile)
				try:
					os.rename(self._fetch_tmp_file, dest)
				except OSError:
					self._start_task(
						FileCopier(src_path=self._fetch_tmp_file,
							dest_path=dest,
							background=(self.background and
								self._log_path is not None),
							logfile=self._log_path),
						self._fetch_copier_exit)
					return
				else:
					self._success()
					self.returncode = os.EX_OK
					self.wait()
					return

		self._try_next_mirror()
开发者ID:amadio,项目名称:portage,代码行数:45,代码来源:FetchTask.py


示例14: _gpg_proc_exit

	def _gpg_proc_exit(self, gpg_proc):
		if self._default_exit(gpg_proc) != os.EX_OK:
			self.wait()
			return

		rename_args = (self._manifest_path + ".asc", self._manifest_path)
		try:
			os.rename(*rename_args)
		except OSError as e:
			writemsg("!!! rename('%s', '%s'): %s\n" % rename_args + (e,),
				noiselevel=-1)
			try:
				os.unlink(self._manifest_path + ".asc")
			except OSError:
				pass
			self.returncode = 1
		else:
			self.returncode = os.EX_OK

		self._current_task = None
		self.wait()
开发者ID:clickbeetle,项目名称:portage-cb,代码行数:21,代码来源:ManifestTask.py


示例15: _setitem

	def _setitem(self, cpv, values):
		"""
		Write the cache entry for cpv as "key=value" lines to a temporary
		file in self.location and move it into place with os.rename().

		Only keys from self._write_keys with a non-empty value are written.
		When the rename fails because a directory is missing, the
		directories are created and the rename is tried once more.

		@param cpv: cache key, relative to self.location
		@param values: mapping with the values to store
		@raise cache_errors.CacheCorruption: if the temporary file can not
			be created or the entry can not be moved into place
		"""
		try:
			fd, fp = tempfile.mkstemp(dir=self.location)
		except EnvironmentError as e:
			raise cache_errors.CacheCorruption(cpv, e)

		with io.open(fd, mode='w',
			encoding=_encodings['repo.content'],
			errors='backslashreplace') as myf:
			for key in self._write_keys:
				value = values.get(key)
				if value:
					# NOTE: This format string requires unicode_literals,
					# so that key and value are coerced to unicode, in
					# order to prevent TypeError when writing raw bytes to
					# TextIOWrapper with Python 2.
					myf.write("%s=%s\n" % (key, value))

		self._ensure_access(fp)

		#update written.  now we move it.

		new_fp = os.path.join(self.location, cpv)
		try:
			os.rename(fp, new_fp)
		except EnvironmentError as e:
			moved = False
			try:
				if errno.ENOENT != e.errno:
					raise cache_errors.CacheCorruption(cpv, e)
				try:
					self._ensure_dirs(cpv)
					os.rename(fp, new_fp)
				except EnvironmentError as retry_err:
					raise cache_errors.CacheCorruption(cpv, retry_err)
				moved = True
			finally:
				# Do not leave the temporary file behind on failure.
				if not moved:
					os.remove(fp)
开发者ID:amadio,项目名称:portage,代码行数:40,代码来源:flat_hash.py


示例16: _setitem

	def _setitem(self, cpv, values):
		"""
		Write the cache entry for cpv as "key=value" lines and move it
		into place with os.rename().

		Only keys from self._write_keys with a non-empty value are written.
		The entry is first written to a hidden ".update.<pid>.<name>" file
		in the directory of the final entry.

		@param cpv: cache key; everything after the last "/" is used as
			the file name
		@param values: mapping with the values to store
		@raise cache_errors.CacheCorruption: if the temporary file can not
			be opened or the final rename fails
		"""
		slash = cpv.rfind("/")
		tmp_name = ".update.%i.%s" % (os.getpid(), cpv[slash+1:])
		fp = os.path.join(self.location, cpv[:slash], tmp_name)

		def open_tmp():
			# Open the temporary file for writing text.
			return codecs.open(_unicode_encode(fp,
				encoding=_encodings['fs'], errors='strict'),
				mode='w', encoding=_encodings['repo.content'],
				errors='backslashreplace')

		try:
			myf = open_tmp()
		except (IOError, OSError) as e:
			if errno.ENOENT != e.errno:
				raise cache_errors.CacheCorruption(cpv, e)
			# The parent directory is missing: create it and retry once.
			try:
				self._ensure_dirs(cpv)
				myf = open_tmp()
			except (OSError, IOError) as retry_err:
				raise cache_errors.CacheCorruption(cpv, retry_err)

		try:
			for key in self._write_keys:
				value = values.get(key)
				if value:
					myf.write("%s=%s\n" % (key, value))
		finally:
			myf.close()
		self._ensure_access(fp)

		#update written.  now we move it.

		new_fp = os.path.join(self.location, cpv)
		try:
			os.rename(fp, new_fp)
		except (OSError, IOError) as e:
			os.remove(fp)
			raise cache_errors.CacheCorruption(cpv, e)
开发者ID:Neuvoo,项目名称:legacy-portage,代码行数:40,代码来源:flat_hash.py


示例17: _checksum_failure_temp_file

def _checksum_failure_temp_file(distdir, basename):
	"""
	First try to find a duplicate temp file with the same checksum and return
	that filename if available. Otherwise, use mkstemp to create a new unique
	filename._checksum_failure_.$RANDOM, rename the given file, and return the
	new filename. In any case, filename will be renamed or removed before this
	function returns a temp filename.
	"""

	filename = os.path.join(distdir, basename)
	size = os.stat(filename).st_size
	checksum = None
	tempfile_re = re.compile(re.escape(basename) + r'\._checksum_failure_\..*')
	for temp_filename in os.listdir(distdir):
		if not tempfile_re.match(temp_filename):
			continue
		temp_filename = os.path.join(distdir, temp_filename)
		try:
			if size != os.stat(temp_filename).st_size:
				continue
		except OSError:
			continue
		try:
			temp_checksum = perform_md5(temp_filename)
		except FileNotFound:
			# Apparently the temp file disappeared. Let it go.
			continue
		if checksum is None:
			checksum = perform_md5(filename)
		if checksum == temp_checksum:
			os.unlink(filename)
			return temp_filename

	fd, temp_filename = \
		tempfile.mkstemp("", basename + "._checksum_failure_.", distdir)
	os.close(fd)
	os.rename(filename, temp_filename)
	return temp_filename
开发者ID:clickbeetle,项目名称:portage-cb,代码行数:38,代码来源:fetch.py


示例18: _file_archive_rotate

def _file_archive_rotate(archive):
	"""
	Rename archive to archive + '.1', and perform similar rotation
	for the numbered files, up to the suffix _ARCHIVE_ROTATE_MAX.

	@param archive: file path to archive
	@type archive: str
	"""

	max_suf = 0
	try:
		# Walk the suffixes in ascending order. The generator calls
		# os.lstat() lazily, so the first missing path raises OSError and
		# leaves max_suf at the last suffix that exists.
		for max_suf, max_st, max_path in (
			(suf, os.lstat(path), path) for suf, path in (
			(suf, "%s.%s" % (archive, suf)) for suf in range(
			1, _ARCHIVE_ROTATE_MAX + 1))):
			pass
	except OSError as e:
		if e.errno not in (errno.ENOENT, errno.ESTALE):
			raise
		# There's already an unused suffix.
	else:
		# Every suffix is in use.
		# Free the max suffix in order to avoid possible problems
		# when we rename another file or directory to the same
		# location (see bug 256376).
		if stat.S_ISDIR(max_st.st_mode):
			# Removing a directory might destroy something important,
			# so rename it instead.
			head, tail = os.path.split(archive)
			# The temporary file is only created to obtain a unique
			# name; it is closed before the directory takes that name.
			placeholder = tempfile.NamedTemporaryFile(
				prefix="%s." % tail,
				dir=head)
			placeholder.close()
			os.rename(max_path, placeholder.name)
		else:
			os.unlink(max_path)

		# The max suffix is now unused.
		max_suf -= 1

	# Shift the numbered files up by one, highest suffix first, so that
	# no rename hits a name that is still in use.
	for suf in range(max_suf + 1, 1, -1):
		os.rename("%s.%s" % (archive, suf - 1), "%s.%s" % (archive, suf))

	os.rename(archive, "%s.1" % (archive,))
开发者ID:aeroniero33,项目名称:portage,代码行数:43,代码来源:dispatch_conf.py


示例19: commit_update

	def commit_update(self):
		"""
		Turn the current update into the newest numbered snapshot and
		point the "latest" symlink at it. The user location is replaced
		with a symlink to the "latest" symlink unless both already refer
		to the same file.

		This is a generator that finishes via coroutine_return().
		"""
		def replace_symlink(target, link_path):
			# Create the link under a ".new" name (removing a stale one
			# first) and rename it onto link_path.
			staging = link_path + '.new'
			try:
				os.unlink(staging)
			except OSError:
				pass
			os.symlink(target, staging)
			os.rename(staging, link_path)

		update_location = self.current_update
		self._update_location = None
		try:
			snapshots = [int(name) for name in os.listdir(self._snapshots_dir)]
		except OSError:
			# The snapshots directory can not be listed: create it with
			# the permissions of the storage location.
			snapshots = []
			portage.util.ensure_dirs(self._snapshots_dir)
			portage.util.apply_stat_permissions(self._snapshots_dir,
				os.stat(self._storage_location))

		new_id = max(snapshots) + 1 if snapshots else 1
		snapshot_path = os.path.join(self._snapshots_dir, str(new_id))
		os.rename(update_location, snapshot_path)
		replace_symlink('snapshots/{}'.format(new_id), self._latest_symlink)

		try:
			user_location_correct = os.path.samefile(
				self._user_location, self._latest_symlink)
		except OSError:
			user_location_correct = False

		if not user_location_correct:
			replace_symlink(self._latest_symlink, self._user_location)

		coroutine_return()
		yield None
开发者ID:gentoo,项目名称:portage,代码行数:39,代码来源:hardlink_rcu.py


示例20: testEbuildFetch


#.........这里部分代码省略.........
						with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
							self.assertEqual(f.read(), distfiles[k])

					# Test empty files in DISTDIR
					for k in settings['AA'].split():
						file_path = os.path.join(settings['DISTDIR'], k)
						with open(file_path, 'wb') as f:
							pass
						self.assertEqual(os.stat(file_path).st_size, 0)
					self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
					for k in settings['AA'].split():
						with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
							self.assertEqual(f.read(), distfiles[k])

					# Test non-empty files containing null bytes in DISTDIR
					for k in settings['AA'].split():
						file_path = os.path.join(settings['DISTDIR'], k)
						with open(file_path, 'wb') as f:
							f.write(len(distfiles[k]) * b'\0')
						self.assertEqual(os.stat(file_path).st_size, len(distfiles[k]))
					self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
					for k in settings['AA'].split():
						with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
							self.assertEqual(f.read(), distfiles[k])

					# Test PORTAGE_RO_DISTDIRS
					settings['PORTAGE_RO_DISTDIRS'] = '"{}"'.format(ro_distdir)
					orig_fetchcommand = settings['FETCHCOMMAND']
					orig_resumecommand = settings['RESUMECOMMAND']
					try:
						settings['FETCHCOMMAND'] = settings['RESUMECOMMAND'] = ''
						for k in settings['AA'].split():
							file_path = os.path.join(settings['DISTDIR'], k)
							os.rename(file_path, os.path.join(ro_distdir, k))
						self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
						for k in settings['AA'].split():
							file_path = os.path.join(settings['DISTDIR'], k)
							self.assertTrue(os.path.islink(file_path))
							with open(file_path, 'rb') as f:
								self.assertEqual(f.read(), distfiles[k])
							os.unlink(file_path)
					finally:
						settings.pop('PORTAGE_RO_DISTDIRS')
						settings['FETCHCOMMAND'] = orig_fetchcommand
						settings['RESUMECOMMAND'] = orig_resumecommand

					# Test local filesystem in GENTOO_MIRRORS
					orig_mirrors = settings['GENTOO_MIRRORS']
					orig_fetchcommand = settings['FETCHCOMMAND']
					try:
						settings['GENTOO_MIRRORS'] = ro_distdir
						settings['FETCHCOMMAND'] = settings['RESUMECOMMAND'] = ''
						self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
						for k in settings['AA'].split():
							with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
								self.assertEqual(f.read(), distfiles[k])
					finally:
						settings['GENTOO_MIRRORS'] = orig_mirrors
						settings['FETCHCOMMAND'] = orig_fetchcommand
						settings['RESUMECOMMAND'] = orig_resumecommand

					# Test readonly DISTDIR
					orig_distdir_mode = os.stat(settings['DISTDIR']).st_mode
					try:
						os.chmod(settings['DISTDIR'], 0o555)
						self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
开发者ID:gentoo,项目名称:portage,代码行数:67,代码来源:test_fetch.py



注:本文中的portage.os.rename函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python os.stat函数代码示例发布时间:2022-05-25
下一篇:
Python os.pipe函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap