本文整理汇总了Python中posixpath.join函数的典型用法代码示例。如果您正苦于以下问题：Python join函数的具体用法？Python join怎么用？Python join使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了join函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: upload
def upload(self, source, target):
    """Copy a local file or directory tree to ``target`` through ``self._sftp``.

    When ``self.isdir(target)`` is true the upload lands inside ``target``
    under the base name of ``source``.  A ``source`` that is not a local
    directory is sent with a single ``put``.  A directory is walked with
    ``os.walk``; each sub-directory is created with ``self.mkdir`` and every
    file is sent, unlinking a destination file first when ``self.exists``
    reports it.
    """
    logger.debug("Copying '%s' -> '%s'", source, target)

    if self.isdir(target):
        target = posixpath.join(target, os.path.basename(source))

    source = os.path.expanduser(source)
    if not os.path.isdir(source):
        # Plain file: nothing to walk.
        self._sftp.put(source, target)
        return

    for local_root, _, filenames in os.walk(source):
        # Mirror the local sub-path under the target, then force forward
        # slashes on the result.
        relative_root = os.path.relpath(local_root, source)
        joined_root = os.path.join(target, relative_root)
        remote_root = os.path.normpath(joined_root).replace("\\", "/")

        self.mkdir(remote_root)

        for filename in filenames:
            src_file = os.path.join(local_root, filename)
            dst_file = posixpath.join(remote_root, filename)
            if self.exists(dst_file):
                self._sftp.unlink(dst_file)
            self._sftp.put(src_file, dst_file)
开发者ID:githubofhuo,项目名称:fuel-devops,代码行数:25,代码来源:ssh_client.py
示例2: _hadoop_log_dirs
def _hadoop_log_dirs(self, output_dir=None):
    """Yield every place worth searching for hadoop logs, in priority order.

    A non-empty ``hadoop_log_dirs`` option is yielded on its own.  Otherwise
    the order is: ``$HADOOP_LOG_DIR``, the YARN locations (only when the
    hadoop version uses YARN), ``<output_dir>/_logs``, ``<dir>/logs`` for
    each entry of ``self._hadoop_dirs()``, and finally the hard-coded
    fallback lists.
    """
    # the hadoop_log_dirs opt overrides everything else
    configured_dirs = self._opts['hadoop_log_dirs']
    if configured_dirs:
        for configured in configured_dirs:
            yield configured
        return

    env_log_dir = os.environ.get('HADOOP_LOG_DIR')
    if env_log_dir:
        yield env_log_dir

    on_yarn = uses_yarn(self.get_hadoop_version())

    if on_yarn:
        env_yarn_dir = os.environ.get('YARN_LOG_DIR')
        if env_yarn_dir:
            yield env_yarn_dir

        yield _DEFAULT_YARN_HDFS_LOG_DIR

    if output_dir:
        # Cloudera style of logging
        yield posixpath.join(output_dir, '_logs')

    for base_dir in self._hadoop_dirs():
        yield posixpath.join(base_dir, 'logs')

    # hard-coded fallback paths, YARN-specific ones first
    fallback_groups = []
    if on_yarn:
        fallback_groups.append(_FALLBACK_HADOOP_YARN_LOG_DIRS)
    fallback_groups.append(_FALLBACK_HADOOP_LOG_DIRS)

    for group in fallback_groups:
        for fallback in group:
            yield fallback
开发者ID:okomestudio,项目名称:mrjob,代码行数:35,代码来源:hadoop.py
示例3: test_folder
def test_folder(self):
    """Walking '' reports the root entry first, then the ``foo`` folder."""
    self.client.write('hello', 'hello, world!')
    self.client.write('foo/hey', 'hey, world!')

    walked = list(self.client.walk(''))
    eq_(len(walked), 2)

    # Each entry is a (path, directory names, file names) triple.
    root_entry = (psp.join(self.client.root), ['foo'], ['hello'])
    eq_(walked[0], root_entry)

    foo_entry = (psp.join(self.client.root, 'foo'), [], ['hey'])
    eq_(walked[1], foo_entry)
开发者ID:e-heller,项目名称:hdfs,代码行数:7,代码来源:test_client.py
示例4: _GetAPISchemaFilename
def _GetAPISchemaFilename(api_name, file_system, version):
'''Gets the name of the file which may contain the schema for |api_name| in
|file_system|, or None if the API is not found. Note that this may be the
single _EXTENSION_API file which all APIs share in older versions of Chrome,
in which case it is unknown whether the API actually exists there.

|version| is either the string 'trunk' or a value that is compared against
_ORIGINAL_FEATURES_MIN_VERSION. The returned path is |api_path|, the
directory reported by |file_system.Walk| and the file name joined with
posixpath.join.
'''
# NOTE(review): this snippet lost its indentation when it was extracted, so
# the nesting below is not visible. In particular, confirm against upstream
# whether the _EXTENSION_API check sits inside or after the extension loop;
# that decides whether a matching .idl file can win over _EXTENSION_API.
if version == 'trunk' or version > _ORIGINAL_FEATURES_MIN_VERSION:
# API schema filenames switch format to unix_hacker_style.
api_name = UnixName(api_name)
# Devtools API names have 'devtools.' prepended to them.
# The corresponding filenames do not.
# NOTE(review): the code strips 'devtools_' (underscore), presumably the
# form UnixName produces -- confirm.
if 'devtools_' in api_name:
api_name = api_name.replace('devtools_', '')
# Search each configured API directory in order; the first hit wins.
for api_path in API_PATHS:
try:
for base, _, filenames in file_system.Walk(api_path):
# Schema files are looked up as <api_name>.json, then <api_name>.idl.
for ext in ('json', 'idl'):
filename = '%s.%s' % (api_name, ext)
if filename in filenames:
return posixpath.join(api_path, base, filename)
# Fall back to the shared _EXTENSION_API file when it is present.
if _EXTENSION_API in filenames:
return posixpath.join(api_path, base, _EXTENSION_API)
except FileNotFoundError:
# This API directory is not available in |file_system|; try the next one.
continue
# No schema file was found under any of API_PATHS.
return None
开发者ID:lafezhang,项目名称:chromium-1,代码行数:27,代码来源:availability_finder.py
示例5: runTest
def runTest(self):
    """Test copying a directory structure to the device.

    Makes sure ``test-files/push1/sub.1/sub.2/testfile`` exists locally,
    pushes ``test-files/push1`` to ``<deviceRoot>/infratest/push1`` with
    ``self.dm.pushDir`` and asserts that ``self.dm.dirExists`` reports both
    nested directories.
    """
    dvroot = self.dm.deviceRoot
    dvpath = posixpath.join(dvroot, 'infratest')
    self.dm.removeDir(dvpath)
    self.dm.mkDir(dvpath)

    p1 = os.path.join('test-files', 'push1')
    nested_dir = os.path.join(p1, 'sub.1', 'sub.2')
    testfile = os.path.join(nested_dir, 'testfile')

    # Set up local stuff.  rmdir only removes an empty leftover directory;
    # a missing or populated one raises OSError, which is expected here.
    try:
        os.rmdir(p1)
    except OSError:
        pass
    # Check the leaf directory rather than p1 so that a partially created
    # tree is completed instead of making the open() below fail.
    if not os.path.isdir(nested_dir):
        os.makedirs(nested_dir)
    if not os.path.exists(testfile):
        # open() replaces the Python 2-only file() builtin; the context
        # manager closes the handle even if creation is interrupted.
        with open(testfile, 'w'):
            pass

    self.dm.pushDir(p1, posixpath.join(dvpath, 'push1'))

    self.assertTrue(
        self.dm.dirExists(posixpath.join(dvpath, 'push1', 'sub.1')))
    self.assertTrue(self.dm.dirExists(
        posixpath.join(dvpath, 'push1', 'sub.1', 'sub.2')))
开发者ID:Andrel322,项目名称:gecko-dev,代码行数:26,代码来源:test_push1.py
示例6: _Install
def _Install(vm):
    """Install YCSB and HBase on 'vm'.

    After installing the packages, each jar URL is fetched with curl into
    YCSB_HBASE_LIB and copied into the HBase lib directory, JAVA_HOME is
    appended to hbase-env.sh, and every file in HBASE_CONF_FILES is placed
    in the HBase conf directory ('.j2' files are rendered as templates with
    the '.j2' suffix dropped, all others are copied as-is).
    """
    for package in ('hbase', 'ycsb', 'curl'):
        vm.Install(package)

    cluster_name = FLAGS.google_bigtable_cluster_name
    if not cluster_name:
        cluster_name = 'pkb-bigtable-{0}'.format(FLAGS.run_uri)

    hbase_lib = posixpath.join(hbase.HBASE_DIR, 'lib')
    jar_urls = [FLAGS.google_bigtable_hbase_jar_url, TCNATIVE_BORINGSSL_URL]
    for jar_url in jar_urls:
        downloaded_jar = posixpath.join(YCSB_HBASE_LIB,
                                        os.path.basename(jar_url))
        vm.RemoteCommand('curl -Lo {0} {1}'.format(downloaded_jar, jar_url))
        vm.RemoteCommand('cp {0} {1}'.format(downloaded_jar, hbase_lib))

    java_home_cmd = 'echo "export JAVA_HOME=/usr" >> {0}/hbase-env.sh'.format(
        hbase.HBASE_CONF_DIR)
    vm.RemoteCommand(java_home_cmd)

    # Values made available to the '.j2' templates below.
    context = {
        'google_bigtable_endpoint': FLAGS.google_bigtable_endpoint,
        'google_bigtable_admin_endpoint': FLAGS.google_bigtable_admin_endpoint,
        'project': FLAGS.project or _GetDefaultProject(),
        'cluster': cluster_name,
        'zone': FLAGS.google_bigtable_zone_name,
        'hbase_version': HBASE_VERSION.replace('.', '_')
    }

    for conf_name in HBASE_CONF_FILES:
        local_conf = data.ResourcePath(conf_name)
        remote_conf = posixpath.join(hbase.HBASE_CONF_DIR,
                                     os.path.basename(conf_name))
        if not conf_name.endswith('.j2'):
            vm.RemoteCopy(local_conf, remote_conf)
            continue
        # Rendered templates drop the '.j2' suffix on the remote side.
        rendered_conf = os.path.splitext(remote_conf)[0]
        vm.RenderTemplate(local_conf, rendered_conf, context)
开发者ID:MHBauer,项目名称:PerfKitBenchmarker,代码行数:35,代码来源:cloud_bigtable_ycsb_benchmark.py
示例7: run
def run(self):
    """Build the summary table and, if requested, its toctree node.

    Item names are the first whitespace-separated token of every non-blank
    content line whose first character is ``~``, a letter or ``_``.  When
    the ``toctree`` option is present, one document name per item is
    resolved relative to the current document, unknown documents are
    reported through ``self.warn``, and a toctree node wrapped by
    ``autosummary_toc`` is appended to the table nodes.

    Returns ``self.warnings + nodes``.
    """
    self.env = env = self.state.document.settings.env
    self.genopt = {}
    self.warnings = []

    names = [x.strip().split()[0] for x in self.content
             if x.strip() and re.search(r'^[~a-zA-Z_]', x.strip()[0])]
    items = self.get_items(names)
    nodes = self.get_table(items)

    if 'toctree' in self.options:
        # The unused ``suffix`` local of the original was dropped here.
        dirname = posixpath.dirname(env.docname)

        tree_prefix = self.options['toctree'].strip()
        docnames = []
        for _name, _sig, _summary, real_name in items:
            docname = posixpath.join(tree_prefix, real_name)
            docname = posixpath.normpath(posixpath.join(dirname, docname))
            if docname not in env.found_docs:
                self.warn('toctree references unknown document %r'
                          % docname)
            # Unknown documents are still listed; the warning is advisory.
            docnames.append(docname)

        tocnode = addnodes.toctree()
        tocnode['includefiles'] = docnames
        tocnode['entries'] = [(None, docname) for docname in docnames]
        tocnode['maxdepth'] = -1
        tocnode['glob'] = None

        tocnode = autosummary_toc('', '', tocnode)
        nodes.append(tocnode)

    return self.warnings + nodes
开发者ID:Fematich,项目名称:article_browser,代码行数:34,代码来源:__init__.py
示例8: update_virtualenv
def update_virtualenv(preindex=False):
    """
    Update external dependencies on the remote host.

    Assumes a code update has already been done.  With ``preindex`` true the
    preindex code root and virtualenv are used instead of the regular ones.
    """
    _require_target()

    if preindex:
        code_root = env.code_root_preindex
        venv_root = env.virtualenv_root_preindex
    else:
        code_root = env.code_root
        venv_root = env.virtualenv_root

    requirements_dir = posixpath.join(code_root, 'requirements')

    with cd(code_root):
        # Every command runs with HOME set and the virtualenv activated.
        activate = 'export HOME=/home/%s && source %s/bin/activate && ' % (
            env.sudo_user, venv_root)

        # Uninstall what uninstall-requirements.txt lists, but only the
        # packages that are actually installed (the script checks pip freeze).
        uninstall_cmd = "%s bash scripts/uninstall-requirements.sh" % activate
        sudo(uninstall_cmd, user=env.sudo_user)

        prod_requirements = posixpath.join(
            requirements_dir, 'prod-requirements.txt')
        base_requirements = posixpath.join(
            requirements_dir, 'requirements.txt')
        install_cmd = '%s pip install --requirement %s --requirement %s' % (
            activate, prod_requirements, base_requirements)
        sudo(install_cmd, user=env.sudo_user)
开发者ID:amonkeykong81,项目名称:commcare-hq,代码行数:27,代码来源:fabfile.py
示例9: __init__
def __init__(self, **kwargs):
    """:py:class:`~mrjob.hadoop.HadoopJobRunner` accepts every argument of
    :py:class:`~mrjob.runner.MRJobRunner` plus a few extra options, which
    may be given defaults in :ref:`mrjob.conf <mrjob.conf>`.
    """
    super(HadoopJobRunner, self).__init__(**kwargs)

    # per-job scratch space: <hdfs_scratch_dir>/<job name>
    scratch_path = posixpath.join(
        self._opts['hdfs_scratch_dir'], self._job_name)
    self._hdfs_tmp_dir = fully_qualify_hdfs_path(scratch_path)

    # fall back to <tmp dir>/output when no output dir was given
    output_path = self._output_dir
    if not output_path:
        output_path = posixpath.join(self._hdfs_tmp_dir, 'output')
    self._output_dir = fully_qualify_hdfs_path(output_path)

    # input files and their temp dir are filled in later
    self._hdfs_input_files = None
    self._hdfs_input_dir = None

    self._hadoop_log_dir = hadoop_log_dir()

    # hadoop gives each job a new timestamp while mrjob only adds steps;
    # both values are kept so logs can be parsed
    self._job_timestamp = None
    self._start_step_num = 0

    # cache for the hadoop version
    self._hadoop_version = None
开发者ID:AnthonyNystrom,项目名称:mrjob,代码行数:31,代码来源:hadoop.py
示例10: main
def main():
    """Apply command-line overrides to the module settings and write themes.

    Every option that is not ``None`` replaces the matching module-level
    global.  Each theme returned by ``lookup_themes(INPUT_DIR)`` is
    initialized and written to ``OUTPUT_DIR`` as ``<name>.iby`` when
    ``theme.hidden()`` is true, otherwise as ``<name>.thx``.

    Returns the module-level ``EXIT_STATUS``.
    """
    global VERBOSE, INPUT_DIR, OUTPUT_DIR, INCLUDE, EXCLUDE, SOURCE_PREFIX, TARGET_PREFIX
    # NOTE(review): no add_option() calls are visible in this snippet; the
    # attributes read below presumably come from options registered by the
    # OptionParser in scope -- confirm against the full file.
    parser = OptionParser()
    (options, _args) = parser.parse_args()

    # ``is not None`` is the identity test; ``!= None`` can be fooled by
    # objects that override comparison.
    if options.verbose is not None:
        VERBOSE = options.verbose
    if options.input is not None:
        INPUT_DIR = options.input
    if options.output is not None:
        OUTPUT_DIR = options.output
    if options.include is not None:
        INCLUDE = options.include
    if options.exclude is not None:
        EXCLUDE = options.exclude
    if options.sourceprefix is not None:
        SOURCE_PREFIX = options.sourceprefix
    if options.targetprefix is not None:
        TARGET_PREFIX = options.targetprefix

    themes = lookup_themes(INPUT_DIR)
    # items() and the single-argument print(...) form behave the same on
    # Python 2 and Python 3, unlike iteritems() and the print statement.
    for name, theme in themes.items():
        theme.initialize()
        if theme.hidden():
            print("Generating: %s.iby" % name)
            theme.write_iby(posixpath.join(OUTPUT_DIR, "%s.iby" % name))
        else:
            print("Generating: %s.thx" % name)
            theme.write_thx(posixpath.join(OUTPUT_DIR, "%s.thx" % name))

    return EXIT_STATUS
开发者ID:kuailexs,项目名称:symbiandump-mw2,代码行数:32,代码来源:rom.py
示例11: india
def india():
    """Our production server in India.

    Fills the fabric ``env`` with the host, user, port, virtualenv and role
    settings for this single-machine deployment.
    """
    host = '220.226.209.82'

    env.home = '/home/commcarehq/'
    env.environment = 'india'
    env.sudo_user = 'commcarehq'
    env.hosts = [host]
    env.user = prompt("Username: ", default=env.user)
    env.django_port = '8001'
    env.should_migrate = True

    _setup_path()

    venv_dir = '.virtualenvs/commcarehq27'
    env.virtualenv_root = posixpath.join(env.home, venv_dir)
    env.virtualenv_root_preindex = posixpath.join(
        env.home, venv_dir + '_preindex')

    # Every role except the monolith has no hosts of its own.
    roledefs = {}
    for unassigned_role in (
            'couch', 'pg', 'rabbitmq', 'django_celery', 'sms_queue',
            'pillow_retry_queue', 'django_app', 'django_pillowtop',
            'formsplayer', 'staticfiles', 'lb', 'deploy'):
        roledefs[unassigned_role] = []
    roledefs['django_monolith'] = [host]
    env.roledefs = roledefs

    env.roles = ['django_monolith']
    env.es_endpoint = 'localhost'
    env.flower_port = 5555
开发者ID:amonkeykong81,项目名称:commcare-hq,代码行数:35,代码来源:fabfile.py
示例12: generate_code
def generate_code(self):
    """Render the union-type header and implementation for this component.

    Returns an empty tuple when the info provider has no union types,
    otherwise a pair of ``(path, text)`` tuples: the header first, then the
    .cpp file, both located in ``self.output_dir``.
    """
    union_types = self.info_provider.union_types
    if not union_types:
        return ()

    header_template = self.jinja_env.get_template('union.h')
    cpp_template = self.jinja_env.get_template('union.cpp')

    component = self.target_component.capitalize()
    context = v8_union.union_context(
        union_types, self.info_provider.interfaces_info)
    context['code_generator'] = module_pyname
    context['header_filename'] = 'bindings/%s/v8/UnionTypes%s.h' % (
        self.target_component, component)
    context['macro_guard'] = 'UnionType%s_h' % component

    # Modules union types depend on UnionTypesCore.h, because union type
    # containers used by both core and modules are only generated there.
    # FIXME: This is an ad hoc workaround and we need a general way to
    # handle core <-> modules dependency.
    if self.target_component == 'modules':
        core_header = 'bindings/core/v8/UnionTypesCore.h'
        context['header_includes'] = sorted(
            context['header_includes'] + [core_header])

    header_text = header_template.render(context)
    cpp_text = cpp_template.render(context)

    header_path = posixpath.join(
        self.output_dir, 'UnionTypes%s.h' % component)
    cpp_path = posixpath.join(
        self.output_dir, 'UnionTypes%s.cpp' % component)

    header_output = (header_path, header_text)
    cpp_output = (cpp_path, cpp_text)
    return (header_output, cpp_output)
开发者ID:janRucka,项目名称:blink,代码行数:34,代码来源:code_generator_v8.py
示例13: run_tests_remote
def run_tests_remote(tests, prefix, options):
    """Push the JS shell and the jit-tests to a device and run ``tests`` there.

    ``options.remote_test_root`` is rewritten to
    ``<root>/jit-tests/jit-tests`` and ``prefix[0]`` is replaced in place
    with the ``js`` path under that directory.  With a transport other than
    ``'adb'`` a device IP is mandatory; the process exits with status 1 when
    it is missing.

    Returns the value of ``process_test_results``.
    """
    # Setup device with everything needed to run our tests.
    from mozdevice import devicemanager, devicemanagerADB, devicemanagerSUT

    if options.device_transport == 'adb':
        if options.device_ip:
            dm = devicemanagerADB.DeviceManagerADB(options.device_ip, options.device_port, deviceSerial=options.device_serial, packageName=None, deviceRoot=options.remote_test_root)
        else:
            dm = devicemanagerADB.DeviceManagerADB(deviceSerial=options.device_serial, packageName=None, deviceRoot=options.remote_test_root)
    else:
        # Validate before constructing the device manager, so a missing IP
        # produces this message rather than whatever the constructor does
        # with a ``None`` host.
        if options.device_ip is None:
            print('Error: you must provide a device IP to connect to via the --device option')
            sys.exit(1)
        dm = devicemanagerSUT.DeviceManagerSUT(options.device_ip, options.device_port, deviceRoot=options.remote_test_root)

    # Update the test root to point to our test directory.
    jit_tests_dir = posixpath.join(options.remote_test_root, 'jit-tests')
    options.remote_test_root = posixpath.join(jit_tests_dir, 'jit-tests')

    # Push js shell and libraries.
    if dm.dirExists(jit_tests_dir):
        dm.removeDir(jit_tests_dir)
    dm.mkDirs(options.remote_test_root)
    push_libs(options, dm)
    push_progs(options, dm, [prefix[0]])
    dm.chmodDir(options.remote_test_root)

    dm.pushDir(ECMA6_DIR, posixpath.join(jit_tests_dir, 'tests', 'ecma_6'), timeout=600)
    dm.pushDir(os.path.dirname(TEST_DIR), options.remote_test_root, timeout=600)
    # This is a path on the device, so it must be joined with posixpath;
    # os.path.join would insert backslashes when the host runs Windows.
    prefix[0] = posixpath.join(options.remote_test_root, 'js')

    # Run all tests.
    gen = get_remote_results(tests, dm, prefix, options)
    ok = process_test_results(gen, len(tests), options)
    return ok
开发者ID:jonathanmarvens,项目名称:mozilla-central,代码行数:35,代码来源:jittests.py
示例14: create_board_post
def create_board_post(self, board_name, board_id, current_uid = -1):
    """Handle the submitted "create board" form for a child of ``board_id``.

    Renders an error page when the current user lacks the ``create``
    permission, the requested owner does not exist, the name is blank, a
    board already exists at the new path, or ``board.create_board`` reports
    failure.  On success, raises ``web.seeother`` to redirect to the new
    board's path.
    """
    board_info = board.get_board_info(board_id)
    if not acl.is_allowed('board', board_id, current_uid, 'create'):
        return util.render().error(error_message = _('NO_PERMISSION'), help_context='error')

    user_data = web.input()
    # Each flag is 1 when the field was submitted with the form, else 0.
    # ``in`` replaces dict.has_key(), which no longer exists in Python 3.
    comment = 1 if 'commentable' in user_data else 0
    write_by_other = 1 if 'writable' in user_data else 0
    indexable = 1 if 'indexable' in user_data else 0
    show_avatar = 1 if 'show_avatar' in user_data else 0

    owner_uid = user._get_uid_from_username(user_data.owner)
    if owner_uid < 0:
        return util.render().error(error_message=_('NO_SUCH_USER_FOR_BOARD_ADMIN'), help_context='error')
    if user_data.name.strip() == '':
        return util.render().error(error_message = _('NO_NAME_SPECIFIED'), help_context='error')

    # '^root' stands for the top-level board, so the new board sits
    # directly under '/'.
    if board_name == '^root':
        new_path = posixpath.join('/', user_data.name)
    else:
        new_path = posixpath.join('/', board_name, user_data.name)
    if board._get_board_id_from_path(new_path) > 0:
        return util.render().error(error_message = _('BOARD_EXISTS'), help_context='error')

    settings = dict(path=new_path, board_owner = owner_uid,
                    cover = user_data.information,
                    description = user_data.description,
                    type = int(user_data.type),
                    guest_write = write_by_other,
                    can_comment = comment,
                    indexable = indexable, show_avatar = show_avatar,
                    current_uid = current_uid)
    ret = board.create_board(board_id, settings)
    # ret[0] is compared with False and ret[1] is used as the error message.
    if ret[0] == False:
        return util.render().error(error_message = ret[1] ,help_context = 'error')
    raise web.seeother(util.link('%s') % (new_path))
开发者ID:peremen,项目名称:noah3k,代码行数:34,代码来源:web_board.py
示例15: run
def run(self):
    """Install the data files, then substitute the version into examples.

    Does nothing more when no installed file has "examples" in its path.
    Otherwise creates ``etc/checkbox.d`` under the install base, links the
    "configs" directory to it if that path does not exist yet, and writes
    every examples file, with the version substituted, to both its
    installed location and ``etc/checkbox.d``.
    """
    super(checkbox_install_data, self).run()

    installed_examples = [path for path in self.outfiles
                          if "examples" in path]
    if not installed_examples:
        return

    # Create etc directory; a "/usr" install keeps its config under "/"
    basedir = posixpath.sep if self.install_dir == "/usr" else self.install_dir
    etcdir = posixpath.join(basedir, "etc", "checkbox.d")
    self.mkpath(etcdir)

    # Create configs symbolic link
    examples_dir = posixpath.dirname(installed_examples[0])
    dstdir = examples_dir.replace("examples", "configs")
    if not os.path.exists(dstdir):
        os.symlink(etcdir, dstdir)

    # Substitute version in examplesfiles and etcfiles
    substitutions = {"version = dev": "version = %s" % changelog_version()}
    for installed in installed_examples:
        filename = posixpath.basename(installed)
        etcfile = posixpath.join(etcdir, filename)
        infile = posixpath.join("examples", filename)
        for outfile in (installed, etcfile):
            substitute_variables(infile, outfile, substitutions)
开发者ID:jds2001,项目名称:ocp-checkbox,代码行数:32,代码来源:setup.py
示例16: retrieve_keyring
def retrieve_keyring(self, keyring_path):
    """Download and store the repository keyring when it needs refreshing.

    Local repositories and unsigned repositories without a stored keyring
    are skipped.  The keyring is downloaded when none is stored yet or when
    ``self.key_update`` is greater than the stored version; an existing
    keyring must validate the new one before it is saved.

    Raises ``RepositoryUnavailable`` when a previously signed repository is
    now unsigned or the download fails, and ``InvalidSignature`` when the
    stored keyring rejects the downloaded one.
    """
    # ignore local
    if self.local:
        return

    keyring = Keyring(keyring_path)
    # prevent previously signed repos from going unsigned
    if not self.signed and keyring.exists():
        raise RepositoryUnavailable('Previously signed repository can not go unsigned')
    if not self.signed:
        return

    if not keyring.exists() or self.key_update > keyring.version:
        # This is a remote repository, download file
        browser = WeboobBrowser()
        try:
            keyring_data = browser.readurl(posixpath.join(self.url, self.KEYRING))
            sig_data = browser.readurl(posixpath.join(self.url, self.KEYRING + '.sig'))
        except BrowserUnavailable as e:
            # "except X as e" is valid from Python 2.6 on; the older
            # "except X, e" form is a syntax error in Python 3.
            raise RepositoryUnavailable(unicode(e))
        if keyring.exists():
            if not keyring.is_valid(keyring_data, sig_data):
                raise InvalidSignature('the keyring itself')
            print('The keyring was updated (and validated by the previous one).')
        else:
            # There is nothing to validate against on first download.
            print('First time saving the keyring, blindly accepted.')
        keyring.save(keyring_data, self.key_update)
        print(keyring)
开发者ID:eirmag,项目名称:weboob,代码行数:28,代码来源:repositories.py
示例17: cleanup
def cleanup(self):
    """Undo the profile changes made on the device, if ``self.restore`` is set.

    Removes the files listed in ``self.added_files``, copies each existing
    ``<file>.orig`` backup over its file with ``dd`` and deletes the backup,
    removes the bundle directory for every entry staged under the remote
    profile, and finally removes the remote profile itself.
    """
    if not self.restore:
        return

    Runner.cleanup(self)

    device = self.dm
    device.remount()

    # Restore the original profile
    for added in self.added_files:
        device.removeFile(added)

    for original in self.backup_files:
        backup = '%s.orig' % original
        if not device.fileExists(backup):
            continue
        device.shellCheckOutput(['dd', 'if=%s' % backup, 'of=%s' % original])
        device.removeFile(backup)

    # Delete any bundled extensions
    staged_dir = posixpath.join(self.remote_profile, 'extensions', 'staged')
    if device.dirExists(staged_dir):
        for staged_name in device.listFiles(staged_dir):
            bundle_dir = posixpath.join(self.bundles_dir, staged_name)
            try:
                device.removeDir(bundle_dir)
            except DMError:
                # A DMError while removing a bundle directory is ignored.
                pass

    # Remove the test profile
    device.removeDir(self.remote_profile)
开发者ID:afabbro,项目名称:gecko-dev,代码行数:26,代码来源:remote.py
示例18: prepare_site_db_and_overrides
def prepare_site_db_and_overrides():
    '''Fill in default overrides for every subsite and rebuild _SITE_DB

    _SITE_DB.keys() need to be ready for filter_translations
    '''
    _SITE_DB.clear()
    _SITE_DB[_MAIN_LANG] = _MAIN_SITEURL

    # an empty main SITEURL means root-relative, so join against '/'
    if _MAIN_SITEURL == '':
        main_siteurl = '/'
    else:
        main_siteurl = _MAIN_SITEURL

    for lang, overrides in _SUBSITE_QUEUE.items():
        if 'SITEURL' not in overrides:
            overrides['SITEURL'] = posixpath.join(main_siteurl, lang)
        _SITE_DB[lang] = overrides['SITEURL']

        # default subsite hierarchy: one sub-directory per language
        for path_setting in ('OUTPUT_PATH', 'CACHE_PATH'):
            if path_setting not in overrides:
                overrides[path_setting] = os.path.join(
                    _MAIN_SETTINGS[path_setting], lang)

        overrides.setdefault('STATIC_PATHS', [])

        # only point at the main site's theme files when the subsite
        # configures nothing about its theme
        theme_keys = ('THEME', 'THEME_STATIC_DIR', 'THEME_STATIC_PATHS')
        if not any(key in overrides for key in theme_keys):
            to_main_site = relpath_to_site(lang, _MAIN_LANG)
            overrides['THEME_STATIC_DIR'] = posixpath.join(
                to_main_site, _MAIN_SETTINGS['THEME_STATIC_DIR'])
            overrides['THEME_STATIC_PATHS'] = []

        # to change what is perceived as translations
        overrides['DEFAULT_LANG'] = lang
开发者ID:DjalelBBZ,项目名称:pelican-plugins,代码行数:30,代码来源:i18n_subsites.py
示例19: __init__
def __init__(self, **kwargs):
    """:py:class:`~mrjob.hadoop.HadoopJobRunner` accepts every argument of
    :py:class:`~mrjob.runner.MRJobRunner` plus a few extra options, which
    may be given defaults in :ref:`mrjob.conf <mrjob.conf>`.
    """
    super(HadoopJobRunner, self).__init__(**kwargs)

    # per-job temp space: <hadoop_tmp_dir>/<job key>
    tmp_path = posixpath.join(self._opts['hadoop_tmp_dir'], self._job_key)
    self._hadoop_tmp_dir = fully_qualify_hdfs_path(tmp_path)

    # Local files to upload to HDFS are registered with this manager
    # just before they are needed. The trailing '' keeps a final slash
    # on the directory path.
    upload_dir = posixpath.join(self._hadoop_tmp_dir, 'files', '')
    self._upload_mgr = UploadDirManager(upload_dir)

    # fall back to <tmp dir>/output when no output dir was given
    output_path = self._output_dir
    if not output_path:
        output_path = posixpath.join(self._hadoop_tmp_dir, 'output')
    self._output_dir = fully_qualify_hdfs_path(output_path)

    # job and (YARN) application IDs, tracked for log parsing
    self._application_id = None
    self._job_id = None

    # location of the hadoop streaming jar, and whether we looked for it
    self._hadoop_streaming_jar = self._opts['hadoop_streaming_jar']
    self._searched_for_hadoop_streaming_jar = False

    # One dict per step, potentially holding the keys 'history', 'step'
    # and 'task'. 'step' is always filled because it comes from the
    # hadoop jar command output; the others are filled as needed.
    self._log_interpretations = []
开发者ID:mmontagna,项目名称:mrjob,代码行数:34,代码来源:hadoop.py
示例20: get_icon_url
def get_icon_url(self):
    """Return the media URL of the first icon found for this file's extension.

    Candidates are ``<ICON_PATH>/<ext>.<icon extension>`` for each entry of
    ``ICON_EXTENSIONS``, checked under ``settings.MEDIA_ROOT`` in order.
    Returns ``None`` when no candidate exists.
    """
    stem = join(self.ICON_PATH, self.get_ext())
    for icon_extension in self.ICON_EXTENSIONS:
        candidate = "%s.%s" % (stem, icon_extension)
        on_disk = join(settings.MEDIA_ROOT, candidate)
        if not exists(on_disk):
            continue
        return join(settings.MEDIA_URL, candidate)
    return None
开发者ID:psmithelectric,项目名称:django-cms-2.0,代码行数:7,代码来源:models.py
注：本文中的posixpath.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论