• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python ast.literal_eval函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中ast.literal_eval函数的典型用法代码示例。如果您正苦于以下问题:Python literal_eval函数的具体用法?Python literal_eval怎么用?Python literal_eval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了literal_eval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _signup_create_user

    def _signup_create_user(self, cr, uid, values, context=None):
        """ create a new user from the template user """
        ir_config_parameter = self.pool.get("ir.config_parameter")
        template_user_id = literal_eval(ir_config_parameter.get_param(cr, uid, "auth_signup.template_user_id", "False"))
        assert template_user_id and self.exists(
            cr, uid, template_user_id, context=context
        ), "Signup: invalid template user"

        # check that uninvited users may sign up
        if "partner_id" not in values:
            if not literal_eval(ir_config_parameter.get_param(cr, uid, "auth_signup.allow_uninvited", "False")):
                raise SignupError("Signup is not allowed for uninvited users")

        assert values.get("login"), "Signup: no login given for new user"
        assert values.get("partner_id") or values.get("name"), "Signup: no name or partner given for new user"

        # create a copy of the template user (attached to a specific partner_id if given)
        values["active"] = True
        context = dict(context or {}, no_reset_password=True)
        try:
            with cr.savepoint():
                return self.copy(cr, uid, template_user_id, values, context=context)
        except Exception, e:
            # copy may failed if asked login is not available.
            raise SignupError(ustr(e))
开发者ID:SCSSG,项目名称:Odoo-SCS,代码行数:25,代码来源:res_users.py


示例2: deferred_add_new_album_details

def deferred_add_new_album_details(album_id, added, album, artist, channel, img, tags, url, users):
    try:
        if album_id not in list_model.get_list():
            list_model.add_to_list(album_id)
        albums_model.add_to_albums(album_id, artist=artist, name=album, url=url, img=img, channel=channel)
        if added:
            albums_model.update_album_added(album_id, added)
        if not img:
            deferred_process_album_cover.delay(album_id)
        if tags is not None:
            if isinstance(tags, str):
                tags = ast.literal_eval(tags)
            deferred_process_tags.delay(album_id, tags)
        else:
            deferred_process_album_tags.delay(album_id)
        if users is not None:
            if isinstance(users, str):
                users = ast.literal_eval(users)
            deferred_process_users.delay(album_id, users)
        deferred_check_album_url.delay(album_id)
    except DatabaseError as e:
        print(f'[db]: failed to add new album details for [{album_id}] {album} by {artist}')
        print(f'[db]: {e}')
    else:
        print(f'[db]: added new album details for [{album_id}] {album} by {artist}')
开发者ID:Ogreman,项目名称:albumlist,代码行数:25,代码来源:queued.py


示例3: test_get

    def test_get(self):
        username = "other"
        email = "mailto:[email protected]"
        password = "test"
        auth = "Basic %s" % base64.b64encode("%s:%s" % (username, password))
        form = {'username':username,'email': email,'password':password,'password2':password}
        response = self.client.post(reverse(views.register),form, X_Experience_API_Version="1.0.0")        

        r = self.client.get(self.url, self.testparams1, X_Experience_API_Version="1.0.0", Authorization=self.auth)
        self.assertEqual(r.status_code, 200)
        robj = ast.literal_eval(r.content)
        self.assertEqual(robj['test'], self.teststate1['test'])
        self.assertEqual(robj['obj']['agent'], self.teststate1['obj']['agent'])
        self.assertEqual(r['etag'], '"%s"' % hashlib.sha1(r.content).hexdigest())

        r2 = self.client.get(self.url, self.testparams2, X_Experience_API_Version="1.0.0", Authorization=self.auth)
        self.assertEqual(r2.status_code, 200)
        robj2 = ast.literal_eval(r2.content)
        self.assertEqual(robj2['test'], self.teststate2['test'])
        self.assertEqual(robj2['obj']['agent'], self.teststate2['obj']['agent'])
        self.assertEqual(r2['etag'], '"%s"' % hashlib.sha1(r2.content).hexdigest())
        
        r3 = self.client.get(self.url, self.testparams3, X_Experience_API_Version="1.0.0", Authorization=self.auth)
        self.assertEqual(r3.status_code, 200)
        robj3 = ast.literal_eval(r3.content)
        self.assertEqual(robj3['test'], self.teststate3['test'])
        self.assertEqual(robj3['obj']['agent'], self.teststate3['obj']['agent'])
        self.assertEqual(r3['etag'], '"%s"' % hashlib.sha1(r3.content).hexdigest())

        r4 = self.client.get(self.url, self.testparams4, X_Experience_API_Version="1.0.0", Authorization=auth)
        self.assertEqual(r4.status_code, 200)
        robj4 = ast.literal_eval(r4.content)
        self.assertEqual(robj4['test'], self.teststate4['test'])
        self.assertEqual(robj4['obj']['agent'], self.teststate4['obj']['agent'])
        self.assertEqual(r4['etag'], '"%s"' % hashlib.sha1(r4.content).hexdigest())
开发者ID:Lindy21,项目名称:CSE498-LRS,代码行数:35,代码来源:ActivityStateTests.py


示例4: incrementclientip

def incrementclientip():
    netparts = []
    iplist = []
    startip = 2
    
    #p = subprocess.Popen(['/usr/local/openvpn_as/scripts/confdba', '-s'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    #tmpdba, err = p.communicate()
    #confdba = ast.literal_eval(tmpdba)
    
    p = subprocess.Popen(['cat', '/root/bin/confdbas.txt'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    tmpdba, err = p.communicate()
    confdba = ast.literal_eval(tmpdba)
    
    for key1,val1 in confdba.items():
        if key1 == 'Default':
            if isinstance(val1, dict):
                for key2,val2 in val1.items():
                    if key2 == 'vpn.server.static.0.network':
                        network = val1.get('vpn.server.static.0.network', None)
                        netmask = val1.get('vpn.server.static.0.netmask_bits', None)
                        
    startipdec = ip2int(network)                        #decimal form of the network address
    maxclients = pow(2,(32 - int(netmask))) - startip   #decimal form of the number of hosts with given mask
    print maxclients
    minipdec = startipdec + startip + 8192                        #lowest ip in decimal format
    maxipdec = startipdec + maxclients + startip - 2 + 8192      #highest ip in decimal format
    minip = int2ip(minipdec)                                #lowest ip in dotted notation
    maxip = int2ip(maxipdec)                                #highest ip in dotted notation

    #p = subprocess.Popen(['/usr/local/openvpn_as/scripts/confdba', '-us'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    #tmpdba, err = p.communicate()
    #userdba = ast.literal_eval(tmpdba)

    p = subprocess.Popen(['cat', '/root/bin/confdbaus.txt'], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    tmpdba, err = p.communicate()
    userdba = ast.literal_eval(tmpdba)

    for key1,val1 in userdba.items():
        if isinstance(val1, dict):
            for key2,val2 in val1.items():
                usertype = val1.get(key2, val2)
                if usertype == 'user_connect' or usertype == 'user_compile':
                    userip = val1.get('conn_ip', None)
                    if userip:
                        try:
                            socket.inet_aton(userip)  #Makes sure the groups are in IPv4 network form for sorted below
                            iplist.append(userip)
                        except:
                            pass
    
    for seqdec in range(minipdec, maxipdec):
        seqip = int2ip(seqdec)
        if checkip(seqip, iplist):
            pass
        else:
            newclientip = seqip
            break
    
    print newclientip
    return newclientip
开发者ID:grantmcwilliams,项目名称:VPNMagic,代码行数:60,代码来源:vpnmagic-as.py


示例5: run

    def run(self):
        """Run the directive."""
        if pygal is None:
            msg = req_missing(['pygal'], 'use the Chart directive', optional=True)
            return [nodes.raw('', '<div class="text-error">{0}</div>'.format(msg), format='html')]
        options = {}
        if 'style' in self.options:
            style_name = self.options.pop('style')
        else:
            style_name = 'BlueStyle'
        if '(' in style_name:  # Parametric style
            style = eval('pygal.style.' + style_name)
        else:
            style = getattr(pygal.style, style_name)
        for k, v in self.options.items():
            options[k] = literal_eval(v)

        chart = getattr(pygal, self.arguments[0])(style=style)
        chart.config(**options)
        for line in self.content:
            label, series = literal_eval('({0})'.format(line))
            chart.add(label, series)
        data = chart.render().decode('utf8')
        if _site and _site.invariant:
            import re
            data = re.sub('id="chart-[a-f0-9\-]+"', 'id="chart-foobar"', data)
            data = re.sub('#chart-[a-f0-9\-]+', '#chart-foobar', data)
        return [nodes.raw('', data, format='html')]
开发者ID:GetsDrawn,项目名称:nikola,代码行数:28,代码来源:chart.py


示例6: syncfiles

def syncfiles(args):
    if len(args) < 3:
        printtofile ('args not enough, need: files src_host dest_hosts')
        return
    dest_hosts = []
    src_host = ''
    files = ast.literal_eval(args[2])
    if len(args) >= 4:
        src_host = args[3]
    if len(args) >= 5:
        dest_hosts = ast.literal_eval(args[4])
    if len(dest_hosts) == 0:
        if len(src_host) == 0:
            dest_hosts = primary_host + replicas_host
        else:
            dest_hosts = replicas_host

    for file in files:
        fullfilepath = ''
        if (src_host == ''):
            fullfilepath = base_dir + '/' + path.abspath(file).replace(path.abspath(local_base), '')
            printtofile ('local to remote : ' + fullfilepath)
        else:
            fullfilepath = base_dir + '/' + file
        for dest_host in dest_hosts:
            dest_host_path = loginuser + '@' + dest_host + ':' + fullfilepath
            if src_host == '':
                # using local file
                os.system(scp_local + file + ' ' + dest_host_path)
            else:
                os.system(scp_remote + src_host + ':' + fullfilepath + ' ' + dest_host_path)
    printtofile ('finished.')
开发者ID:izenecloud,项目名称:driver-ruby,代码行数:32,代码来源:easy_tool.py


示例7: rulesets

def rulesets():
    if request.method == 'POST':
        topic = ast.literal_eval(request.form['topic'])
        name = request.form['name']

        app.logger.info('POST rulesets: topic(%s), name(%s)' % (topic, name))

        db = g.db
        cur = db.cursor()
        cur.execute(queries['add_ruleset'], (topic, name, topic))
        db.commit()
        cur.execute(queries['get_ruleset'], (topic, ))
        rulesets = [dict(category_seq=int(row[0]), name=row[1]) for row in cur.fetchall()]
        return jsonify(rulesets=rulesets)

    if request.method == 'DELETE':
        topic = ast.literal_eval(request.form['topic'])
        category_seq = ast.literal_eval(request.form['category_seq'])

        app.logger.info('DELETE rulesets: topic(%s), category_seq(%s)' % (topic, category_seq))

        db = g.db
        cur = db.cursor()
        cur.execute(queries['get_rules'], (topic, category_seq))
        rd = g.rd
        for row in cur.fetchall():
            rd.delete(int(row[0]))
        cur.execute(queries['del_rules_word_relations'], (topic, category_seq))
        cur.execute(queries['del_rules_sentence_relations'], (topic, category_seq))
        cur.execute(queries['del_rules'], (topic, category_seq))
        cur.execute(queries['del_ruleset'], (topic, category_seq))
        db.commit()
        return jsonify(deleted=dict(topic=topic, category_seq=category_seq))
开发者ID:sungminoh,项目名称:Sentences-analysis,代码行数:33,代码来源:morpheme.py


示例8: custom_tecov2_nee

def custom_tecov2_nee(task_id,site,filename,years,forecast,upload):#,modWeather):
    # Header row
    header='Year DOY hour     T_air q1    precip q2       PAR q3     R_net q4    ustar         NEE filled_NEE         LE  filled_LE\n'
    head =['Year','DOY','hour','T_air','q1','precip','q2','PAR','q3','R_net','q4','ustar','NEE','filled_NEE','LE','filled_LE']
    #fixed width list of values
    wd=[4,4,5,11,2,11,2,11,2,11,2,11,11,11,11,11]
    #set working diectory
    wkdir = basedir + "celery_data/" + str(task_id)
    #wkdir = "/home/mstacy/test"
    os.chdir(wkdir)
    #open file and set header
    outfile = open(filename,"w")
    outfile.write(header)
    #open mongo connection
    db = Connection(mongoHost).teco
    #safe eval to get start and end dates
    yr=ast.literal_eval(years)
    start = yr[0]
    end = yr[1] 
    #figure time step currrently only working for Hourly and half hourly
    #stepRes=db.forcing.find({"Site":site}).sort([('observed_date',1),('hour',1)]).limit(2)
    #step=stepRes[1]['hour']-stepRes[0]['hour']
    #if step == 0.5:
    #    stepdenom=2
    #else:
    #stepdenom=1
    #safe eval forecast to list of tuples
    forc = ast.literal_eval(forecast)
    set_nee_data(db,site,head,wd,outfile,start,end,forc,upload)#,stepdenom,modWeather)
开发者ID:ouinformatics,项目名称:cybercomq,代码行数:29,代码来源:task.py


示例9: __init__

	def __init__(self, request, is_celery):
		self.privacy_level = request['privacy_level']
		self.epsilon = float(request['epsilon'])
		self.min_freq = float(request['min_freq']) if 'min_freq' in request.keys() else 0.
		self.exp_round = request['exp_round'] if 'exp_round' in request.keys() else None

		self.dp_id = request['dp_id']
		task = get_object_or_404(Task, pk = request['task_id']) if is_celery else request['task_id']
		self.task_id = task.task_id
		self.eps1_level = task.eps1_level
		self.data_path = task.data_path

		self.jtree_strct = ast.literal_eval(str(task.jtree_strct))
		self.opted_cluster = ast.literal_eval(str(task.opted_cluster))
		
		self.edges = ast.literal_eval(str(task.dep_graph))
		self.domain = task.domain if isinstance(task.domain, dict) else collections.OrderedDict(ast.literal_eval(task.domain)) # This is the corsed domain
		self.valbin_map = ast.literal_eval(str(task.valbin_map))
		self.selected_attrs = task.selected_attrs if isinstance(task.selected_attrs, dict) else self.convert_selected_attrs(task.selected_attrs)
		self.white_list = ast.literal_eval(str(task.white_list))
		self.nodes = self.domain.keys()

		self.histogramdds = None
		self.data = None
		self.sim_df = None
		self.statistics_err = None
		self.generalized_dataframe = None
		self.synthetic_path = None
开发者ID:JiaMingLin,项目名称:de-identification,代码行数:28,代码来源:celery_tasks.py


示例10: custom_tecov2_setup

def custom_tecov2_setup(task_id,site,filename,years,forecast,modWeather,upload):
    # Header row
    header='Year  DOY  hour  T_air q1   Q_air  q2   Wind_speed q3     Precip   q4   Pressure   q5  R_global_in q6   R_longwave_in q7   CO2'
    head =['Year','DOY','hour','T_air','q1','Q_air','q2','Wind_speed','q3','Precip','q4','Pressure','q5',
            'R_global_in','q6','R_longwave_in','q7','CO2']
    #fixed width list of values
    wd=[4,5,7,14,2,14,2,14,2,14,2,14,2,14,2,14,2,11]
    #set working diectory
    wkdir = basedir + "celery_data/" + str(task_id)
    #wkdir = "/home/mstacy/test"
    os.chdir(wkdir)
    #open file and set header
    outfile = open(filename,"w")
    outfile.write(header + '\n\n')
    #open mongo connection
    db = Connection(mongoHost).teco
    #safe eval to get start and end dates
    yr=ast.literal_eval(years)
    start = datetime(yr[0],1,1)
    end = datetime(yr[1] + 1,1,1)
    #figure time step currrently only working for Hourly and half hourly
    if upload:
        stepRes=db.uploaded_data.find({"Site":site}).sort([('observed_date',1),('hour',1)]).limit(2)
    else:
        stepRes=db.forcing.find({"Site":site}).sort([('observed_date',1),('hour',1)]).limit(2)
    step=stepRes[1]['hour']-stepRes[0]['hour']
    if step == 0.5:
        stepdenom=2
    else:
        stepdenom=1
    #safe eval forecast to list of tuples
    forc = ast.literal_eval(forecast)
    set_input_data(db,site,head,wd,outfile,start,end,forc,stepdenom,modWeather,upload)
开发者ID:ouinformatics,项目名称:cybercomq,代码行数:33,代码来源:task.py


示例11: get_os

    def get_os(self):
        cmd = 'python -c "import platform; raw = list(platform.dist());raw.append(platform.release());print raw"'
        data_out, data_err = self.execute(cmd)
        get_os_info = {}
        if not data_err:
            if 'command not found' not in data_out[0]:  # because some distros sport python3 by default!
                self.os, ver, release, kernel_version = ast.literal_eval(data_out[0])
                get_os_info['os'] = self.os
                get_os_info['osver'] = ver
                get_os_info['osverno'] =  kernel_version
                self.devargs.update({'os_info': get_os_info})
            else:
                cmd = 'python3 -c "import platform; raw = list(platform.dist());' \
                      'raw.append(platform.release());print (raw)"'
                data_out, data_err = self.execute(cmd)
                if not data_err:
                    self.os, ver, release, kernel_version = ast.literal_eval(data_out[0])
                    get_os_info['os'] = self.os
                    get_os_info['osver'] = ver
                    get_os_info['osverno'] =  kernel_version
                    self.devargs.update({'os_info': get_os_info})
                else:
                    if self.debug:
                        print '\t[-] Could not get OS info from host %s. Message was: %s' % (
                            self.machine_name, str(data_err))

        else:
            if self.debug:
                print '\t[-] Could not get OS info from host %s. Message was: %s' % (self.machine_name, str(data_err))
开发者ID:bugcy013,项目名称:Inventory,代码行数:29,代码来源:module_linux.py


示例12: merge_pieces

def merge_pieces(files, ip):
    ip = ip.replace("/","_")
    merge = {}
    if len(files) > 1:
        for index in range(len(files)):
            if index == len(files) - 1:
                filename = "fast_scaning_reports_"+ str(ip)
                report = file(filename, 'w')
                report.write(str(merge))
                report.close()
                return merge
            if index == 0:  
                input_1 = file(files[index]).read()
                os.remove(files[index])
                input_2 = file(files[index + 1]).read()
                os.remove(files[index + 1])
                input_1_dict = ast.literal_eval(input_1)
                input_2_dict = ast.literal_eval(input_2)
                merge = combineDicts(input_1_dict, input_2_dict)
            if index > 0:
                input_2 = file(files[index + 1]).read()
                os.remove(files[index + 1])
                input_2_dict= ast.literal_eval(input_2)
                merge = combineDicts(merge, input_2_dict)
    if len(files) == 1:
        os.rename(files[0], 'fast_scaning_reports_' + str(ip))
开发者ID:dikien,项目名称:penetration_testing,代码行数:26,代码来源:port_scanner.py


示例13: start

  def start(self):
            fp=open(self.fileis)
            error=" "
            r=1
            for i, line in enumerate(fp):
              try:
                #print i
                if i==1000:
                    break

                
                if len(line)>10:
                    line=line.replace("\"\"","\"")
                    line=line.replace("\"{","{")
                    line=line.replace("}\"","}")
                    if len(self.keycol)==0:
                      self.makeColumn(ast.literal_eval(line))
                    self.addTosheet(ast.literal_eval(line),r)
                r +=1    
              except:
                  print "Exception acr"
                  pass
            print "saving file ..........."      
            self.workbook.close()
            print "file saved"
开发者ID:haikentcode,项目名称:Delhivery,代码行数:25,代码来源:Dixcl(json+to+excel).py


示例14: service_create

    def service_create(self, context, service):
        LOG.debug("service_create")
        self.k8s_api = k8s.create_k8s_api(context, service.bay_uuid)
        manifest = k8s_manifest.parse(service.manifest)
        try:
            resp = self.k8s_api.create_namespaced_service(body=manifest,
                                                          namespace='default')
        except rest.ApiException as err:
            raise exception.KubernetesAPIFailed(err=err)

        if resp is None:
            raise exception.ServiceCreationFailed(bay_uuid=service.bay_uuid)

        service['uuid'] = resp.metadata.uid
        service['name'] = resp.metadata.name
        service['labels'] = ast.literal_eval(resp.metadata.labels)
        service['selector'] = ast.literal_eval(resp.spec.selector)
        service['ip'] = resp.spec.cluster_ip
        service_value = []
        for p in resp.spec.ports:
            ports = p.to_dict()
            if not ports['name']:
                ports['name'] = 'k8s-service'
            service_value.append(ports)

        service['ports'] = service_value

        return service
开发者ID:baikai,项目名称:magnum,代码行数:28,代码来源:k8s_conductor.py


示例15: data

def data(request):
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'choice.settings')
    django.setup()
    if request.method == "POST":
        compType = request.POST["compType"]
        if compType[-1] == ' ':
            compType = compType[:len(compType)-1]
        # Create the HttpResponse object with the appropriate CSV header.
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename="{0}.csv"'.format(compType)
        writer = csv.writer(response)
        # retrieve the correct dataset
        samples = choiceData.objects.filter(compType=compType)
        writer.writerow(['ID', 'Username', 'Type', 'Choice1', 'Choice2', 'Selection', 'TTChoose', 'Familiar', 'Time of Survey Completion', 'IP'])
        count = 0
        for sample in samples:
            count += 1
            username = str(sample.username).replace('"','')
            CType = str(sample.compType).replace('"','')
            matchups = ast.literal_eval(sample.matchupNames)
            selection = ast.literal_eval(sample.choices)
            TTChoose = ast.literal_eval(sample.TTChoose)
            familiarity = sample.familiarity
            CTime = ast.literal_eval(sample.CTime)
            ip = sample.ip
            for i in range(len(selection)):
                row = [count, username, CType, matchups[i][0], matchups[i][1], selection[i], TTChoose[i], familiarity, CTime, ip]
                writer.writerow(row)
        return response
开发者ID:zmikaya,项目名称:rankbomb,代码行数:29,代码来源:views.py


示例16: test_get

    def test_get(self):
        r = self.client.get(reverse(agent_profile), self.testparams1, Authorization=self.auth, X_Experience_API_Version=settings.XAPI_VERSION)
        self.assertEqual(r.status_code, 200)
        
        robj = ast.literal_eval(r.content)
        self.assertEqual(robj['test'], self.testprofile1['test'])
        self.assertEqual(robj['obj']['agent'], self.testprofile1['obj']['agent'])
        self.assertEqual(r['etag'], '"%s"' % hashlib.sha1('%s' % self.testprofile1).hexdigest())

        r2 = self.client.get(reverse(agent_profile), self.testparams2, Authorization=self.auth, X_Experience_API_Version=settings.XAPI_VERSION)
        self.assertEqual(r2.status_code, 200)
        robj2 = ast.literal_eval(r2.content)
        self.assertEqual(robj2['test'], self.testprofile2['test'])
        self.assertEqual(robj2['obj']['agent'], self.testprofile2['obj']['agent'])
        self.assertEqual(r2['etag'], '"%s"' % hashlib.sha1('%s' % self.testprofile2).hexdigest())
        
        r3 = self.client.get(reverse(agent_profile), self.testparams3, Authorization=self.auth, X_Experience_API_Version=settings.XAPI_VERSION)
        self.assertEqual(r3.status_code, 200)
        robj3 = ast.literal_eval(r3.content)
        self.assertEqual(robj3['test'], self.testprofile3['test'])
        self.assertEqual(robj3['obj']['agent'], self.testprofile3['obj']['agent'])
        self.assertEqual(r3['etag'], '"%s"' % hashlib.sha1('%s' % self.testprofile3).hexdigest())

        r4 = self.client.get(reverse(agent_profile), self.testparams4, Authorization=self.auth, X_Experience_API_Version=settings.XAPI_VERSION)
        self.assertEqual(r4.status_code, 200)
        robj4 = ast.literal_eval(r4.content)
        self.assertEqual(robj4['test'], self.otherprofile1['test'])
        self.assertEqual(robj4['obj']['agent'], self.otherprofile1['obj']['agent'])
        self.assertEqual(r4['etag'], '"%s"' % hashlib.sha1('%s' % self.otherprofile1).hexdigest())
开发者ID:roadlit,项目名称:lrs,代码行数:29,代码来源:AgentProfileTests.py


示例17: getMatch

def getMatch(startup):
    """Find top three matches for the given startup based on both Euclidean distance
    and Cosine similarity.
    
    Returns a tuple of grand total top matches, and top matches for each metric.
    Also returns basic site info for the matches.
    """

    vcList = loadVCList()
    scoresEuc, scoresCos = list(), list()

    for vc in vcList:
        # euclidean
        scoreEuc = eucDist(startup.fingerprint, ast.literal_eval(vc[1]))
        scoresEuc.append(tuple([vc[0], scoreEuc]))

        # cosine
        scoreCos = cosSim(startup.fingerprint, ast.literal_eval(vc[1]))
        scoresCos.append(tuple([vc[0], scoreCos]))

    scoresEuc = sorted(scoresEuc, key = itemgetter(1))[:3]
    scoresCos = sorted(scoresCos, key = itemgetter(1), reverse = True)[:3]

    scoresNetList = getNet(list((scoresEuc, scoresCos)))
    
    scoresNet = [vc for vc in vcList if vc[0] in scoresNetList[::-1]]
    
    return tuple((scoresNet, scoresEuc, scoresCos))
开发者ID:aviyashchin,项目名称:boxer.ai,代码行数:28,代码来源:__init__.py


示例18: sentences_by_rule

def sentences_by_rule():
    if request.method == 'GET':
        topic = ast.literal_eval(request.args.get('topic'))
        sources_ids = ast.literal_eval(request.args.get('sources'))
        format_string = formatstring(sources_ids)
        isRuleset = True if request.args.get('isRuleset') == 'true' else False
        rule_id = ast.literal_eval(request.args.get('rule_id'))
        fromDate = format_fromDate(ast.literal_eval(request.args.get('fromDate')))
        toDate = format_toDate(ast.literal_eval(request.args.get('toDate')))

        app.logger.info('GET sentences_by_rule: topic(%s), sources_ids(%s), isRuleset(%s), ruleset_or_rule_id(%s)'
                        % ('%s', format_string, '%s', '%s')
                        % tuple([topic] + sources_ids + [isRuleset, rule_id]))

        db = g.db
        cur = db.cursor()

        sentences = []
        if isRuleset:
            cur.execute(queries['get_sentences_by_ruleset'].format(fromDate, toDate)
                        % ('%s', '%s', format_string), [rule_id, topic] + sources_ids)
            sentences.extend([dict(sentence_id  = row[0],
                                   full_text    = row[1],
                                   rules        = map(int, row[2].split(',')))
                              for row in cur.fetchall()])
        else:
            cur.execute(queries['get_sentences_by_rule'].format(fromDate, toDate)
                        % ('%s', '%s', format_string), [rule_id, topic] + sources_ids)
            sentences.extend([dict(sentence_id  = row[0],
                                   full_text    = row[1],
                                   rules        = [])
                              for row in cur.fetchall()])

        return jsonify(sentences=sentences)
开发者ID:sungminoh,项目名称:Sentences-analysis,代码行数:34,代码来源:morpheme.py


示例19: get

def get(function, duration, *args):
    # type: (function, int, object) -> object or None
    """
    Gets cached value for provided function with optional arguments, or executes and stores the result
    :param function: Function to be executed
    :param duration: Duration of validity of cache in hours
    :param args: Optional arguments for the provided function
    """

    try:
        key = _hash_function(function, args)
        cache_result = cache_get(key)
        if cache_result:
            if _is_cache_valid(cache_result['date'], duration):
                return ast.literal_eval(cache_result['value'].encode('utf-8'))

        fresh_result = repr(function(*args))
        if not fresh_result:
            # If the cache is old, but we didn't get fresh result, return the old cache
            if cache_result:
                return cache_result
            return None

        cache_insert(key, fresh_result)
        return ast.literal_eval(fresh_result.encode('utf-8'))
    except Exception:
        return None
开发者ID:CYBERxNUKE,项目名称:xbmc-addon,代码行数:27,代码来源:cache.py


示例20: on_privmsg

 def on_privmsg(self, nick, chan, msg):
     if chan == "#osp" and msg.startswith("!"):
         p = r"^([a-zA-Z]+)\((.+)*\)"
         r = re.search(p, msg[1:])
         if r is None:
             return
         else:
             cmd, args = r.groups()
         if cmd in methodList:
             scribus.setRedraw(False)
             try:
                 if args is None:
                     r = getattr(scribus, cmd)()
                 elif type(ast.literal_eval(args)) is tuple:
                     r = getattr(scribus, cmd)(*ast.literal_eval(args))
                 else:
                     r = getattr(scribus, cmd)(ast.literal_eval(args))
                 self.privmsg(chan, "called %s" % cmd)
                 self.privmsg(chan, "returned %s" % r if r is not None else "nothing")
                 # Ugly workaround to force scribus refreshing
                 scribus.zoomDocument(101)
                 scribus.zoomDocument(100)
             except TypeError:
                 self.privmsg(chan, "%s" % getattr(scribus, cmd).__doc__)
             scribus.setRedraw(True)
         elif cmd == "help":
             for i in info(scribus):
                 self.privmsg(chan, "%s" % i)
         else:
             self.privmsg(chan, "No such a command: %s" % cmd)
开发者ID:osp,项目名称:osp.workshop.rca,代码行数:30,代码来源:qtbot.py



注:本文中的ast.literal_eval函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ast.make_tuple函数代码示例发布时间:2022-05-24
下一篇:
Python ast.keyword函数代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap