本文整理汇总了Python中ast.literal_eval函数的典型用法代码示例。如果您正苦于以下问题：Python literal_eval函数的具体用法？Python literal_eval怎么用？Python literal_eval使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了literal_eval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _signup_create_user
def _signup_create_user(self, cr, uid, values, context=None):
    """Create a new user by copying the configured signup template user.

    The template user id ("auth_signup.template_user_id") and the switch
    allowing uninvited signups ("auth_signup.allow_uninvited") are read from
    ``ir.config_parameter`` and parsed with ``literal_eval``.

    :param cr: database cursor; also used to open a savepoint around the copy
    :param uid: id of the user performing the operation
    :param values: field values for the new user; must contain "login" and
        either "partner_id" or "name". Mutated: "active" is set to True.
    :param context: optional context dict; a copy with
        ``no_reset_password=True`` is passed to ``copy``
    :return: whatever ``self.copy`` returns for the new user
    :raises SignupError: if uninvited signup is disabled, or if the copy fails
    :raises AssertionError: if the template user or required values are missing
    """
    ir_config_parameter = self.pool.get("ir.config_parameter")
    template_user_id = literal_eval(
        ir_config_parameter.get_param(cr, uid, "auth_signup.template_user_id", "False")
    )
    assert template_user_id and self.exists(
        cr, uid, template_user_id, context=context
    ), "Signup: invalid template user"

    # check that uninvited users may sign up
    if "partner_id" not in values:
        allow_uninvited = literal_eval(
            ir_config_parameter.get_param(cr, uid, "auth_signup.allow_uninvited", "False")
        )
        if not allow_uninvited:
            raise SignupError("Signup is not allowed for uninvited users")

    assert values.get("login"), "Signup: no login given for new user"
    assert values.get("partner_id") or values.get("name"), "Signup: no name or partner given for new user"

    # create a copy of the template user (attached to a specific partner_id if given)
    values["active"] = True
    context = dict(context or {}, no_reset_password=True)
    try:
        with cr.savepoint():
            return self.copy(cr, uid, template_user_id, values, context=context)
    except Exception as e:
        # "as" form is valid on Python 2.6+ and Python 3, unlike "except X, e".
        # The copy may fail if the requested login is not available.
        raise SignupError(ustr(e))
开发者ID:SCSSG,项目名称:Odoo-SCS,代码行数:25,代码来源:res_users.py
示例2: deferred_add_new_album_details
def deferred_add_new_album_details(album_id, added, album, artist, channel, img, tags, url, users):
    """Store a new album and queue the follow-up processing jobs.

    ``tags`` and ``users`` may each be ``None``, an already-parsed value, or a
    string holding a Python literal (parsed with ``ast.literal_eval``).
    A ``DatabaseError`` is reported on stdout and not re-raised.
    """
    try:
        known_albums = list_model.get_list()
        if album_id not in known_albums:
            list_model.add_to_list(album_id)
        albums_model.add_to_albums(album_id, artist=artist, name=album, url=url, img=img, channel=channel)
        if added:
            albums_model.update_album_added(album_id, added)
        if not img:
            # no cover supplied: queue the cover-processing job
            deferred_process_album_cover.delay(album_id)
        if tags is None:
            # no tags supplied: queue the album-tags job instead
            deferred_process_album_tags.delay(album_id)
        else:
            parsed_tags = ast.literal_eval(tags) if isinstance(tags, str) else tags
            deferred_process_tags.delay(album_id, parsed_tags)
        if users is not None:
            parsed_users = ast.literal_eval(users) if isinstance(users, str) else users
            deferred_process_users.delay(album_id, parsed_users)
        deferred_check_album_url.delay(album_id)
    except DatabaseError as e:
        print(f'[db]: failed to add new album details for [{album_id}] {album} by {artist}')
        print(f'[db]: {e}')
    else:
        print(f'[db]: added new album details for [{album_id}] {album} by {artist}')
开发者ID:Ogreman,项目名称:albumlist,代码行数:25,代码来源:queued.py
示例3: test_get
def test_get(self):
    """GET each stored state document and check its body and ETag header.

    The first three requests use ``self.auth``; the fourth uses the
    credentials of the user registered at the start of this test.
    """
    username = "other"
    email = "mailto:[email protected]"
    password = "test"
    credentials = "%s:%s" % (username, password)
    auth = "Basic %s" % base64.b64encode(credentials)
    form = {'username':username,'email': email,'password':password,'password2':password}
    # register the second user; the response itself is not inspected
    self.client.post(reverse(views.register),form, X_Experience_API_Version="1.0.0")

    cases = (
        (self.testparams1, self.teststate1, self.auth),
        (self.testparams2, self.teststate2, self.auth),
        (self.testparams3, self.teststate3, self.auth),
        (self.testparams4, self.teststate4, auth),
    )
    for params, expected, authorization in cases:
        reply = self.client.get(self.url, params, X_Experience_API_Version="1.0.0", Authorization=authorization)
        self.assertEqual(reply.status_code, 200)
        body = ast.literal_eval(reply.content)
        self.assertEqual(body['test'], expected['test'])
        self.assertEqual(body['obj']['agent'], expected['obj']['agent'])
        # the ETag is the quoted SHA-1 hex digest of the response body
        self.assertEqual(reply['etag'], '"%s"' % hashlib.sha1(reply.content).hexdigest())
开发者ID:Lindy21,项目名称:CSE498-LRS,代码行数:35,代码来源:ActivityStateTests.py
示例4: incrementclientip
def incrementclientip():
    """Return the first candidate client IP for which ``checkip`` is falsy.

    The candidate range is derived from the "Default" profile's
    'vpn.server.static.0.network' and 'vpn.server.static.0.netmask_bits'
    entries; the addresses already assigned are collected from the user
    database ('conn_ip' of profiles holding a 'user_connect' or
    'user_compile' value). The chosen address is printed and returned.

    NOTE(review): if the "Default" profile lacks the network entry, or every
    candidate is rejected by ``checkip``, ``minipdec``/``newclientip`` are never
    bound and an UnboundLocalError is raised, as in the original code.
    """
    iplist = []
    startip = 2

    # Reads a dump file; the live equivalent would be the output of
    # /usr/local/openvpn_as/scripts/confdba -s
    p = subprocess.Popen(['cat', '/root/bin/confdbas.txt'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    tmpdba, _ = p.communicate()
    confdba = ast.literal_eval(tmpdba)

    default = confdba.get('Default')
    if isinstance(default, dict) and 'vpn.server.static.0.network' in default:
        network = default.get('vpn.server.static.0.network', None)
        netmask = default.get('vpn.server.static.0.netmask_bits', None)
        startipdec = ip2int(network)  # decimal form of the network address
        maxclients = pow(2, (32 - int(netmask))) - startip  # number of hosts for the given mask
        print(maxclients)
        minipdec = startipdec + startip + 8192  # lowest ip in decimal format
        maxipdec = startipdec + maxclients + startip - 2 + 8192  # highest ip in decimal format

    # Reads a dump file; the live equivalent would be the output of
    # /usr/local/openvpn_as/scripts/confdba -us
    p = subprocess.Popen(['cat', '/root/bin/confdbaus.txt'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    tmpdba, _ = p.communicate()
    userdba = ast.literal_eval(tmpdba)

    for profile in userdba.values():
        if not isinstance(profile, dict):
            continue
        # One append per matching value, exactly as the original nested loop did.
        for value in profile.values():
            if value == 'user_connect' or value == 'user_compile':
                userip = profile.get('conn_ip', None)
                if userip:
                    try:
                        # only keep values that parse as IPv4 addresses
                        socket.inet_aton(userip)
                    except (socket.error, TypeError):
                        # malformed address: skip it (previously a bare except)
                        continue
                    iplist.append(userip)

    for seqdec in range(minipdec, maxipdec):
        seqip = int2ip(seqdec)
        if not checkip(seqip, iplist):
            newclientip = seqip
            break
    print(newclientip)
    return newclientip
开发者ID:grantmcwilliams,项目名称:VPNMagic,代码行数:60,代码来源:vpnmagic-as.py
示例5: run
def run(self):
    """Run the directive and return the rendered chart as a raw HTML node.

    Directive options (other than 'style') are parsed with ``literal_eval``
    and passed to ``chart.config``; each content line is parsed as a
    ``label, series`` pair and added to the chart. If pygal is not
    available, a single raw node carrying the ``req_missing`` message is
    returned instead.
    """
    if pygal is None:
        msg = req_missing(['pygal'], 'use the Chart directive', optional=True)
        return [nodes.raw('', '<div class="text-error">{0}</div>'.format(msg), format='html')]

    style_name = self.options.pop('style', 'BlueStyle')
    if '(' in style_name:  # Parametric style
        # NOTE(review): eval() executes arbitrary code taken from the directive
        # option; only safe when the document source is trusted.
        style = eval('pygal.style.' + style_name)
    else:
        style = getattr(pygal.style, style_name)

    options = {}
    for k, v in self.options.items():
        options[k] = literal_eval(v)

    chart = getattr(pygal, self.arguments[0])(style=style)
    chart.config(**options)
    for line in self.content:
        label, series = literal_eval('({0})'.format(line))
        chart.add(label, series)
    data = chart.render().decode('utf8')

    if _site and _site.invariant:
        import re
        # Replace the generated chart ids with a fixed one. Raw strings keep
        # the same pattern while avoiding the invalid "\-" string escape.
        data = re.sub(r'id="chart-[a-f0-9\-]+"', 'id="chart-foobar"', data)
        data = re.sub(r'#chart-[a-f0-9\-]+', '#chart-foobar', data)
    return [nodes.raw('', data, format='html')]
开发者ID:GetsDrawn,项目名称:nikola,代码行数:28,代码来源:chart.py
示例6: syncfiles
def syncfiles(args):
    """Copy files to the destination hosts by running scp commands.

    ``args[2]`` is a Python literal listing the files, ``args[3]`` an optional
    source host, and ``args[4]`` an optional Python literal listing the
    destination hosts. Without explicit destinations, the replicas are used
    (plus the primary hosts when copying from the local machine).

    NOTE(review): the scp commands are assembled by string concatenation and
    run through ``os.system``; arguments are not quoted.
    """
    if len(args) < 3:
        printtofile ('args not enough, need: files src_host dest_hosts')
        return

    files = ast.literal_eval(args[2])
    src_host = args[3] if len(args) >= 4 else ''
    dest_hosts = ast.literal_eval(args[4]) if len(args) >= 5 else []
    if len(dest_hosts) == 0:
        if len(src_host) == 0:
            dest_hosts = primary_host + replicas_host
        else:
            dest_hosts = replicas_host

    from_local = src_host == ''
    for name in files:
        if from_local:
            # map the local path below local_base onto the remote base_dir
            fullfilepath = base_dir + '/' + path.abspath(name).replace(path.abspath(local_base), '')
            printtofile ('local to remote : ' + fullfilepath)
        else:
            fullfilepath = base_dir + '/' + name
        for dest_host in dest_hosts:
            dest_host_path = loginuser + '@' + dest_host + ':' + fullfilepath
            if from_local:
                # using local file
                command = scp_local + name + ' ' + dest_host_path
            else:
                command = scp_remote + src_host + ':' + fullfilepath + ' ' + dest_host_path
            os.system(command)
    printtofile ('finished.')
开发者ID:izenecloud,项目名称:driver-ruby,代码行数:32,代码来源:easy_tool.py
示例7: rulesets
def rulesets():
    """Create (POST) or delete (DELETE) a ruleset for a topic.

    POST adds a ruleset and responds with the topic's rulesets as JSON.
    DELETE removes the ruleset's rules from ``g.rd`` and the database and
    responds with the deleted topic/category pair. Other methods fall
    through and return ``None``.
    """
    if request.method == 'POST':
        form = request.form
        topic = ast.literal_eval(form['topic'])
        name = form['name']
        app.logger.info('POST rulesets: topic(%s), name(%s)' % (topic, name))
        conn = g.db
        cursor = conn.cursor()
        cursor.execute(queries['add_ruleset'], (topic, name, topic))
        conn.commit()
        cursor.execute(queries['get_ruleset'], (topic, ))
        found = []
        for row in cursor.fetchall():
            found.append(dict(category_seq=int(row[0]), name=row[1]))
        return jsonify(rulesets=found)

    if request.method == 'DELETE':
        form = request.form
        topic = ast.literal_eval(form['topic'])
        category_seq = ast.literal_eval(form['category_seq'])
        app.logger.info('DELETE rulesets: topic(%s), category_seq(%s)' % (topic, category_seq))
        conn = g.db
        cursor = conn.cursor()
        key = (topic, category_seq)
        cursor.execute(queries['get_rules'], key)
        store = g.rd
        for row in cursor.fetchall():
            store.delete(int(row[0]))
        # relations first, then the rules, then the ruleset itself
        for query_name in ('del_rules_word_relations',
                           'del_rules_sentence_relations',
                           'del_rules',
                           'del_ruleset'):
            cursor.execute(queries[query_name], key)
        conn.commit()
        return jsonify(deleted=dict(topic=topic, category_seq=category_seq))
开发者ID:sungminoh,项目名称:Sentences-analysis,代码行数:33,代码来源:morpheme.py
示例8: custom_tecov2_nee
def custom_tecov2_nee(task_id,site,filename,years,forecast,upload):#,modWeather):
    """Create the NEE output file for a task and let ``set_nee_data`` fill it.

    ``years`` and ``forecast`` are strings holding Python literals; the first
    two items of ``years`` are passed on as the start and end values.
    The process working directory is changed to the task's data directory.
    """
    # column names, the matching header line, and the fixed column widths
    head =['Year','DOY','hour','T_air','q1','precip','q2','PAR','q3','R_net','q4','ustar','NEE','filled_NEE','LE','filled_LE']
    header='Year DOY hour T_air q1 precip q2 PAR q3 R_net q4 ustar NEE filled_NEE LE filled_LE\n'
    wd=[4,4,5,11,2,11,2,11,2,11,2,11,11,11,11,11]

    # switch to the task's working directory
    os.chdir(basedir + "celery_data/" + str(task_id))

    # open the output file and write the header line
    outfile = open(filename,"w")
    outfile.write(header)

    # open mongo connection
    db = Connection(mongoHost).teco

    # safe eval to get start and end dates
    year_range = ast.literal_eval(years)
    start, end = year_range[0], year_range[1]

    # safe eval forecast to list of tuples
    forc = ast.literal_eval(forecast)
    set_nee_data(db,site,head,wd,outfile,start,end,forc,upload)#,stepdenom,modWeather)
开发者ID:ouinformatics,项目名称:cybercomq,代码行数:29,代码来源:task.py
示例9: __init__
def __init__(self, request, is_celery):
    """Populate the instance from a request mapping and its task record.

    When ``is_celery`` is true the task is looked up by primary key from
    ``request['task_id']``; otherwise ``request['task_id']`` is used as the
    task object itself. Task fields stored as text are parsed with
    ``ast.literal_eval``.
    """
    def parse(value):
        # task fields are parsed from their string form
        return ast.literal_eval(str(value))

    self.privacy_level = request['privacy_level']
    self.epsilon = float(request['epsilon'])
    if 'min_freq' in request:
        self.min_freq = float(request['min_freq'])
    else:
        self.min_freq = 0.
    if 'exp_round' in request:
        self.exp_round = request['exp_round']
    else:
        self.exp_round = None
    self.dp_id = request['dp_id']

    if is_celery:
        task = get_object_or_404(Task, pk = request['task_id'])
    else:
        task = request['task_id']

    self.task_id = task.task_id
    self.eps1_level = task.eps1_level
    self.data_path = task.data_path
    self.jtree_strct = parse(task.jtree_strct)
    self.opted_cluster = parse(task.opted_cluster)
    self.edges = parse(task.dep_graph)
    # This is the coarsened domain
    if isinstance(task.domain, dict):
        self.domain = task.domain
    else:
        self.domain = collections.OrderedDict(ast.literal_eval(task.domain))
    self.valbin_map = parse(task.valbin_map)
    if isinstance(task.selected_attrs, dict):
        self.selected_attrs = task.selected_attrs
    else:
        self.selected_attrs = self.convert_selected_attrs(task.selected_attrs)
    self.white_list = parse(task.white_list)
    self.nodes = self.domain.keys()

    # results, all unset at construction time
    self.histogramdds = None
    self.data = None
    self.sim_df = None
    self.statistics_err = None
    self.generalized_dataframe = None
    self.synthetic_path = None
开发者ID:JiaMingLin,项目名称:de-identification,代码行数:28,代码来源:celery_tasks.py
示例10: custom_tecov2_setup
def custom_tecov2_setup(task_id,site,filename,years,forecast,modWeather,upload):
    """Create the forcing input file for a task and let ``set_input_data`` fill it.

    ``years`` and ``forecast`` are strings holding Python literals. The date
    window runs from January 1st of the first year to January 1st of the year
    after the second one. The process working directory is changed to the
    task's data directory.
    """
    # column names, the matching header line, and the fixed column widths
    head =['Year','DOY','hour','T_air','q1','Q_air','q2','Wind_speed','q3','Precip','q4','Pressure','q5',
           'R_global_in','q6','R_longwave_in','q7','CO2']
    header='Year DOY hour T_air q1 Q_air q2 Wind_speed q3 Precip q4 Pressure q5 R_global_in q6 R_longwave_in q7 CO2'
    wd=[4,5,7,14,2,14,2,14,2,14,2,14,2,14,2,14,2,11]

    # switch to the task's working directory
    os.chdir(basedir + "celery_data/" + str(task_id))

    # open the output file and write the header followed by a blank line
    outfile = open(filename,"w")
    outfile.write(header + '\n\n')

    # open mongo connection
    db = Connection(mongoHost).teco

    # safe eval to get start and end dates
    year_range = ast.literal_eval(years)
    start = datetime(year_range[0],1,1)
    end = datetime(year_range[1] + 1,1,1)

    # figure out the time step; currently only hourly and half hourly work
    collection = db.uploaded_data if upload else db.forcing
    stepRes = collection.find({"Site":site}).sort([('observed_date',1),('hour',1)]).limit(2)
    step = stepRes[1]['hour']-stepRes[0]['hour']
    stepdenom = 2 if step == 0.5 else 1

    # safe eval forecast to list of tuples
    forc = ast.literal_eval(forecast)
    set_input_data(db,site,head,wd,outfile,start,end,forc,stepdenom,modWeather,upload)
开发者ID:ouinformatics,项目名称:cybercomq,代码行数:33,代码来源:task.py
示例11: get_os
def get_os(self):
    """Query the remote host for its OS details and store them in ``devargs``.

    A ``python`` one-liner printing ``platform.dist()`` plus
    ``platform.release()`` is executed through ``self.execute``. If its output
    contains 'command not found', the same one-liner is retried with
    ``python3``. On success ``self.os`` is set and
    ``self.devargs['os_info']`` receives the keys 'os', 'osver' and 'osverno'.
    If a command reports an error, nothing is stored and (when ``self.debug``
    is set) a message is printed.

    NOTE(review): ``platform.dist()`` was removed in Python 3.8, so the
    python3 fallback is expected to fail on hosts with a newer interpreter.
    """
    def record(raw):
        # raw is the printed list: platform.dist() items followed by platform.release()
        self.os, ver, release, kernel_version = ast.literal_eval(raw)
        get_os_info = {'os': self.os, 'osver': ver, 'osverno': kernel_version}
        self.devargs.update({'os_info': get_os_info})

    def report(error):
        if self.debug:
            print('\t[-] Could not get OS info from host %s. Message was: %s' % (
                self.machine_name, str(error)))

    cmd = 'python -c "import platform; raw = list(platform.dist());raw.append(platform.release());print raw"'
    data_out, data_err = self.execute(cmd)
    if data_err:
        report(data_err)
        return

    if 'command not found' in data_out[0]:  # because some distros sport python3 by default!
        cmd = 'python3 -c "import platform; raw = list(platform.dist());' \
              'raw.append(platform.release());print (raw)"'
        data_out, data_err = self.execute(cmd)
        if data_err:
            report(data_err)
            return

    record(data_out[0])
开发者ID:bugcy013,项目名称:Inventory,代码行数:29,代码来源:module_linux.py
示例12: merge_pieces
def merge_pieces(files, ip):
    """Merge partial report files into 'fast_scaning_reports_<ip>'.

    Each file holds a Python dict literal. With more than one file, every
    piece is read, deleted, parsed and folded into one dict with
    ``combineDicts``; the result is written to the report file and returned.
    With exactly one file, that file is simply renamed to the report name.

    :param files: list of piece file paths
    :param ip: target identifier; any "/" is replaced by "_" in the file name
    :return: the merged dict when more than one file was given, else ``None``
    """
    ip = ip.replace("/", "_")
    report_name = "fast_scaning_reports_" + str(ip)

    if len(files) == 1:
        os.rename(files[0], report_name)
        return None
    if not files:
        return None

    def consume(piece):
        # read the piece, delete it, then parse its contents
        with open(piece) as handle:
            text = handle.read()
        os.remove(piece)
        return ast.literal_eval(text)

    merge = consume(files[0])
    for piece in files[1:]:
        merge = combineDicts(merge, consume(piece))

    with open(report_name, 'w') as report:
        report.write(str(merge))
    return merge
开发者ID:dikien,项目名称:penetration_testing,代码行数:26,代码来源:port_scanner.py
示例13: start
def start(self):
    """Convert up to the first 1000 lines of ``self.fileis`` into sheet rows.

    Lines longer than 10 characters have their doubled quotes and the quotes
    around the outer braces removed, and are then parsed with
    ``ast.literal_eval``. The first parsed record is handed to
    ``makeColumn`` while ``self.keycol`` is empty; every record is passed to
    ``addTosheet`` with a row number starting at 1. Lines that fail are
    reported on stdout and skipped. The workbook is closed at the end.
    """
    r = 1
    # "with" guarantees the input file is closed (it was previously leaked)
    with open(self.fileis) as fp:
        for i, line in enumerate(fp):
            if i == 1000:
                break
            try:
                if len(line) > 10:
                    line = line.replace("\"\"", "\"")
                    line = line.replace("\"{", "{")
                    line = line.replace("}\"", "}")
                    if len(self.keycol) == 0:
                        self.makeColumn(ast.literal_eval(line))
                    self.addTosheet(ast.literal_eval(line), r)
                    r += 1
            except Exception:
                # best effort: skip the bad line, but let KeyboardInterrupt and
                # SystemExit through (the bare "except:" swallowed them too)
                print("Exception acr")
    print("saving file ...........")
    self.workbook.close()
    print("file saved")
开发者ID:haikentcode,项目名称:Delhivery,代码行数:25,代码来源:Dixcl(json+to+excel).py
示例14: service_create
def service_create(self, context, service):
    """Create a service in the 'default' namespace and copy the result back.

    The API response's uid, name, labels, selector, cluster IP and ports are
    written into ``service``, which is then returned. Ports without a name
    are given the name 'k8s-service'.

    Raises ``KubernetesAPIFailed`` when the API call raises ``ApiException``
    and ``ServiceCreationFailed`` when the API returns ``None``.
    """
    LOG.debug("service_create")
    self.k8s_api = k8s.create_k8s_api(context, service.bay_uuid)
    manifest = k8s_manifest.parse(service.manifest)
    try:
        resp = self.k8s_api.create_namespaced_service(body=manifest,
                                                      namespace='default')
    except rest.ApiException as err:
        raise exception.KubernetesAPIFailed(err=err)

    if resp is None:
        raise exception.ServiceCreationFailed(bay_uuid=service.bay_uuid)

    service['uuid'] = resp.metadata.uid
    service['name'] = resp.metadata.name
    # labels and selector are parsed with literal_eval before being stored
    service['labels'] = ast.literal_eval(resp.metadata.labels)
    service['selector'] = ast.literal_eval(resp.spec.selector)
    service['ip'] = resp.spec.cluster_ip

    port_entries = []
    for port in resp.spec.ports:
        entry = port.to_dict()
        if not entry['name']:
            entry['name'] = 'k8s-service'
        port_entries.append(entry)
    service['ports'] = port_entries

    return service
开发者ID:baikai,项目名称:magnum,代码行数:28,代码来源:k8s_conductor.py
示例15: data
def data(request):
    """Export the stored choice data of one comparison type as a CSV download.

    On POST, reads "compType" from the form (dropping one trailing space),
    selects the matching ``choiceData`` rows and writes one CSV row per
    recorded selection. Other request methods fall through and return
    ``None``.
    """
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'choice.settings')
    django.setup()
    if request.method == "POST":
        compType = request.POST["compType"]
        # endswith() instead of compType[-1]: an empty value no longer raises IndexError
        if compType.endswith(' '):
            compType = compType[:-1]
        # Create the HttpResponse object with the appropriate CSV header.
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="{0}.csv"'.format(compType)
        writer = csv.writer(response)
        # retrieve the correct dataset
        samples = choiceData.objects.filter(compType=compType)
        writer.writerow(['ID', 'Username', 'Type', 'Choice1', 'Choice2', 'Selection', 'TTChoose', 'Familiar', 'Time of Survey Completion', 'IP'])
        for count, sample in enumerate(samples, 1):
            username = str(sample.username).replace('"','')
            CType = str(sample.compType).replace('"','')
            # these fields are stored as Python literals
            matchups = ast.literal_eval(sample.matchupNames)
            selection = ast.literal_eval(sample.choices)
            TTChoose = ast.literal_eval(sample.TTChoose)
            familiarity = sample.familiarity
            CTime = ast.literal_eval(sample.CTime)
            ip = sample.ip
            # index loop kept on purpose: a length mismatch between the lists
            # still raises instead of being silently truncated
            for i in range(len(selection)):
                row = [count, username, CType, matchups[i][0], matchups[i][1], selection[i], TTChoose[i], familiarity, CTime, ip]
                writer.writerow(row)
        return response
开发者ID:zmikaya,项目名称:rankbomb,代码行数:29,代码来源:views.py
示例16: test_get
def test_get(self):
    """GET each stored agent profile and check its body and ETag header."""
    cases = (
        (self.testparams1, self.testprofile1),
        (self.testparams2, self.testprofile2),
        (self.testparams3, self.testprofile3),
        (self.testparams4, self.otherprofile1),
    )
    for params, profile in cases:
        reply = self.client.get(reverse(agent_profile), params, Authorization=self.auth, X_Experience_API_Version=settings.XAPI_VERSION)
        self.assertEqual(reply.status_code, 200)
        body = ast.literal_eval(reply.content)
        self.assertEqual(body['test'], profile['test'])
        self.assertEqual(body['obj']['agent'], profile['obj']['agent'])
        # the ETag is the quoted SHA-1 hex digest of the profile's string form
        self.assertEqual(reply['etag'], '"%s"' % hashlib.sha1('%s' % profile).hexdigest())
开发者ID:roadlit,项目名称:lrs,代码行数:29,代码来源:AgentProfileTests.py
示例17: getMatch
def getMatch(startup):
    """Find top three matches for the given startup based on both Euclidean distance
    and Cosine similarity.

    Returns a tuple ``(scoresNet, scoresEuc, scoresCos)``: the VC entries
    selected by ``getNet`` from both rankings, the three lowest Euclidean
    scores and the three highest cosine scores. Each score entry is a
    ``(vc[0], score)`` tuple.
    """
    vcList = loadVCList()
    scoresEuc, scoresCos = [], []
    for vc in vcList:
        # parse the stored fingerprint once and reuse it for both metrics
        # (assumes eucDist/cosSim do not mutate their arguments — TODO confirm)
        fingerprint = ast.literal_eval(vc[1])
        scoresEuc.append((vc[0], eucDist(startup.fingerprint, fingerprint)))
        scoresCos.append((vc[0], cosSim(startup.fingerprint, fingerprint)))

    # smallest distance is best; largest similarity is best
    scoresEuc = sorted(scoresEuc, key=itemgetter(1))[:3]
    scoresCos = sorted(scoresCos, key=itemgetter(1), reverse=True)[:3]

    scoresNetList = getNet([scoresEuc, scoresCos])
    # membership does not depend on order, so no reversed copy is needed
    scoresNet = [vc for vc in vcList if vc[0] in scoresNetList]
    return (scoresNet, scoresEuc, scoresCos)
开发者ID:aviyashchin,项目名称:boxer.ai,代码行数:28,代码来源:__init__.py
示例18: sentences_by_rule
def sentences_by_rule():
    """Return the sentences matching a rule or a ruleset as JSON (GET only).

    Query arguments 'topic', 'sources', 'rule_id', 'fromDate' and 'toDate'
    are parsed with ``ast.literal_eval``; 'isRuleset' is true only for the
    exact string 'true'. For a ruleset, each sentence carries the list of
    rule ids parsed from the comma-separated third column; for a single
    rule, that list is empty. Other request methods return ``None``.
    """
    if request.method == 'GET':
        topic = ast.literal_eval(request.args.get('topic'))
        sources_ids = ast.literal_eval(request.args.get('sources'))
        format_string = formatstring(sources_ids)
        isRuleset = request.args.get('isRuleset') == 'true'
        rule_id = ast.literal_eval(request.args.get('rule_id'))
        fromDate = format_fromDate(ast.literal_eval(request.args.get('fromDate')))
        toDate = format_toDate(ast.literal_eval(request.args.get('toDate')))
        app.logger.info('GET sentences_by_rule: topic(%s), sources_ids(%s), isRuleset(%s), ruleset_or_rule_id(%s)'
                        % ('%s', format_string, '%s', '%s')
                        % tuple([topic] + sources_ids + [isRuleset, rule_id]))
        db = g.db
        cur = db.cursor()

        query_name = 'get_sentences_by_ruleset' if isRuleset else 'get_sentences_by_rule'
        # NOTE(review): the date bounds are interpolated into the SQL text with
        # str.format; this is only safe if format_fromDate/format_toDate
        # sanitize their input — confirm.
        cur.execute(queries[query_name].format(fromDate, toDate)
                    % ('%s', '%s', format_string), [rule_id, topic] + sources_ids)

        sentences = []
        for row in cur.fetchall():
            if isRuleset:
                # a list, not map(): map() is a lazy iterator on Python 3 and
                # cannot be serialized to JSON
                rules = [int(rule) for rule in row[2].split(',')]
            else:
                rules = []
            sentences.append(dict(sentence_id=row[0], full_text=row[1], rules=rules))
        return jsonify(sentences=sentences)
开发者ID:sungminoh,项目名称:Sentences-analysis,代码行数:34,代码来源:morpheme.py
示例19: get
def get(function, duration, *args):
    # type: (function, int, object) -> object or None
    """
    Return the cached value for ``function(*args)``, refreshing it when needed.

    A cache entry that is still valid is parsed and returned. Otherwise the
    function is called, the ``repr`` of its result is stored, and the parsed
    result is returned. Any exception yields ``None``.

    :param function: Function to be executed
    :param duration: Duration of validity of cache in hours
    :param args: Optional arguments for the provided function
    """
    try:
        key = _hash_function(function, args)
        cached = cache_get(key)
        if cached and _is_cache_valid(cached['date'], duration):
            return ast.literal_eval(cached['value'].encode('utf-8'))

        fresh = repr(function(*args))
        if fresh:
            cache_insert(key, fresh)
            return ast.literal_eval(fresh.encode('utf-8'))

        # The cache is old, but there is no fresh result: fall back to the old
        # cache record.
        # NOTE(review): repr() normally returns a non-empty string, so this
        # fallback looks unreachable, and it returns the raw cache record
        # rather than the parsed value — TODO confirm the intent.
        return cached if cached else None
    except Exception:
        return None
开发者ID:CYBERxNUKE,项目名称:xbmc-addon,代码行数:27,代码来源:cache.py
示例20: on_privmsg
def on_privmsg(self, nick, chan, msg):
    """Handle "!command(args)" messages sent to the #osp channel.

    A command listed in ``methodList`` is looked up on ``scribus`` and called
    with the arguments parsed by ``ast.literal_eval`` (a tuple is unpacked
    into positional arguments). The outcome is reported back to the channel.
    "help" lists ``info(scribus)``; anything else is answered with an error
    message. Messages from other channels, without the "!" prefix, or not
    shaped like a call are ignored.
    """
    if chan != "#osp" or not msg.startswith("!"):
        return

    # "(.+)?" instead of "(.+)*": captures the same text, but the nested
    # quantifier backtracked exponentially on input such as "!foo(aaaa…"
    # with no closing parenthesis.
    found = re.search(r"^([a-zA-Z]+)\((.+)?\)", msg[1:])
    if found is None:
        return
    cmd, args = found.groups()

    if cmd in methodList:
        scribus.setRedraw(False)
        try:
            method = getattr(scribus, cmd)
            if args is None:
                r = method()
            else:
                # parse once; a tuple means several positional arguments
                parsed = ast.literal_eval(args)
                if isinstance(parsed, tuple):
                    r = method(*parsed)
                else:
                    r = method(parsed)
            self.privmsg(chan, "called %s" % cmd)
            self.privmsg(chan, "returned %s" % r if r is not None else "nothing")
            # Ugly workaround to force scribus refreshing
            scribus.zoomDocument(101)
            scribus.zoomDocument(100)
        except (TypeError, ValueError, SyntaxError):
            # wrong or unparsable arguments: answer with the method's documentation
            self.privmsg(chan, "%s" % getattr(scribus, cmd).__doc__)
        finally:
            # always re-enable redraw, even if the call raised something else
            scribus.setRedraw(True)
    elif cmd == "help":
        for i in info(scribus):
            self.privmsg(chan, "%s" % i)
    else:
        self.privmsg(chan, "No such a command: %s" % cmd)
开发者ID:osp,项目名称:osp.workshop.rca,代码行数:30,代码来源:qtbot.py
注：本文中的ast.literal_eval函数示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论