This article collects typical usage examples of the Python function util.get_webmention_target. If you have been wondering what util.get_webmention_target does and how to use it, the hand-picked examples below should help.
Twenty code examples of the get_webmention_target function are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
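Across these examples, util.get_webmention_target is called with a URL (and sometimes a resolve keyword argument) and returns a three-element tuple: the resolved/cleaned URL, its domain, and a boolean saying whether it looks worth sending a webmention to. The snippet below is a minimal usage sketch inferred from the examples, not bridgy's documented API; the example.com URL and the printed messages are illustrative assumptions.

# Minimal usage sketch (assumes bridgy's util module is importable and that
# get_webmention_target(url, resolve=True) returns a (url, domain, send) tuple).
import util

url, domain, send = util.get_webmention_target('http://example.com/post', resolve=True)
if send:
  print('would send a webmention to %s (domain %s)' % (url, domain))
else:
  print('skipping %s: blacklisted, non-HTML, or otherwise unsuitable' % url)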
Example 1: get_webmention_targets
def get_webmention_targets(source, activity):
  """Returns a set of string target URLs to attempt to send webmentions to.

  Side effect: runs the original post discovery algorithm on the activity and
  adds the resulting URLs to the activity as tags, in place.

  Args:
    source: models.Source subclass
    activity: activity dict
  """
  original_post_discovery.discover(source, activity)

  targets = set()
  obj = activity.get('object') or activity

  for tag in obj.get('tags', []):
    url = tag.get('url')
    if url and tag.get('objectType') == 'article':
      url, domain, send = util.get_webmention_target(url)
      tag['url'] = url
      if send:
        targets.add(url)

  for url in obj.get('upstreamDuplicates', []):
    url, domain, send = util.get_webmention_target(url)
    if send:
      targets.add(url)

  return targets
Author: notenoughneon, Project: bridgy, Lines: 29, Source: tasks.py
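For context, here is a self-contained toy version of Example 1's collection logic with a stand-in for util.get_webmention_target, so the data flow can be run without bridgy's original_post_discovery side effects. The fake_get_webmention_target stub and the example URLs are assumptions for illustration only.

# Toy illustration of Example 1's flow; fake_get_webmention_target is a stub
# that returns the URL unchanged and always allows sending.
def fake_get_webmention_target(url):
  return url, 'example.com', True

def collect_targets(activity):
  obj = activity.get('object') or activity
  targets = set()
  for tag in obj.get('tags', []):
    url = tag.get('url')
    if url and tag.get('objectType') == 'article':
      url, _, send = fake_get_webmention_target(url)
      if send:
        targets.add(url)
  for url in obj.get('upstreamDuplicates', []):
    url, _, send = fake_get_webmention_target(url)
    if send:
      targets.add(url)
  return targets

print(collect_targets({'object': {
  'tags': [{'objectType': 'article', 'url': 'http://example.com/post'}],
  'upstreamDuplicates': ['http://example.com/copy'],
}}))  # -> set(['http://example.com/post', 'http://example.com/copy'])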
Example 2: test_get_webmention_target_blacklisted_urls
def test_get_webmention_target_blacklisted_urls(self):
  gwt = util.get_webmention_target
  for bad in ('http://facebook.com/x', 'https://www.facebook.com/y',
              'http://sub.dom.ain.facebook.com/z'):
    self.assertFalse(util.get_webmention_target(bad)[2], bad)

  self.assertTrue(util.get_webmention_target('http://good.com/a')[2])
Author: sanduhrs, Project: bridgy, Lines: 7, Source: util_test.py
Example 3: test_get_webmention_target_too_big
def test_get_webmention_target_too_big(self):
  self.expect_requests_head('http://orig', response_headers={
    'Content-Length': str(util.MAX_HTTP_RESPONSE_SIZE + 1),
  })
  self.mox.ReplayAll()
  self.assert_equals(('http://orig', 'orig', False),
                     util.get_webmention_target('http://orig'))
Author: snarfed, Project: bridgy, Lines: 7, Source: test_util.py
Example 4: resolve_profile_url
def resolve_profile_url(url, resolve=True):
  """Resolves a profile URL to be added to a source.

  Args:
    url: string
    resolve: boolean, whether to make HTTP requests to follow redirects, etc.

  Returns: string, resolved URL, or None
  """
  final, _, ok = util.get_webmention_target(url, resolve=resolve)
  if not ok:
    return None

  final = final.lower()
  if util.schemeless(final).startswith(util.schemeless(url.lower())):
    # redirected to a deeper path. use the original higher level URL. #652
    final = url

  # If final has a path segment check if root has a matching rel=me.
  match = re.match(r'^(https?://[^/]+)/.+', final)
  if match and resolve:
    root = match.group(1)
    try:
      resp = util.requests_get(root)
      resp.raise_for_status()
      data = util.mf2py_parse(resp.text, root)
      me_urls = data.get('rels', {}).get('me', [])
      if final in me_urls:
        final = root
    except requests.RequestException:
      logging.warning("Couldn't fetch %s, preserving path in %s",
                      root, final, exc_info=True)

  return final
Author: snarfed, Project: bridgy, Lines: 34, Source: models.py
Example 5: _urls_and_domains
def _urls_and_domains(self, auth_entity, user_url):
  """Returns this user's valid (not webmention-blacklisted) URLs and domains.

  Converts the auth entity's user_json to an ActivityStreams actor and uses
  its 'urls' and 'url' fields. May be overridden by subclasses.

  Args:
    auth_entity: oauth_dropins.models.BaseAuth
    user_url: string, optional URL passed in when authorizing

  Returns: ([string url, ...], [string domain, ...])
  """
  actor = self.gr_source.user_to_actor(json.loads(auth_entity.user_json))
  logging.debug('Converted to actor: %s', json.dumps(actor, indent=2))

  candidates = util.trim_nulls(util.uniquify(
      [user_url] + microformats2.object_urls(actor)))

  if len(candidates) > MAX_AUTHOR_URLS:
    logging.warning('Too many profile links! Only resolving the first %s: %s',
                    MAX_AUTHOR_URLS, candidates)

  urls = []
  for i, url in enumerate(candidates):
    url, domain, send = util.get_webmention_target(url, resolve=i < MAX_AUTHOR_URLS)
    if send:
      urls.append(url)

  urls = util.dedupe_urls(urls)  # normalizes domains to lower case
  domains = [util.domain_from_link(url) for url in urls]
  return urls, domains
Author: chrisaldrich, Project: bridgy, Lines: 31, Source: models.py
Example 6: _url_and_domain
def _url_and_domain(self, auth_entity):
  """Returns this source's URL and domain.

  Uses the auth entity user_json 'url' field by default. May be overridden
  by subclasses.

  Args:
    auth_entity: oauth_dropins.models.BaseAuth

  Returns: (string url, string domain, boolean ok) tuple
  """
  user_json = json.loads(auth_entity.user_json)
  actor = self.as_source.user_to_actor(user_json)
  urls = util.trim_nulls([actor.get('url')] +
                         # also look at G+'s urls field
                         [u.get('value') for u in user_json.get('urls', [])])

  first_url = first_domain = None
  for url in urls:
    # TODO: fully support multiple urls
    for url in url.split():
      url, domain, ok = util.get_webmention_target(url)
      if ok:
        domain = domain.lower()
        return url, domain, True
      elif not first_url:
        first_url = url
        first_domain = domain

  return first_url, first_domain, False
Author: notenoughneon, Project: bridgy, Lines: 30, Source: models.py
Example 7: test_get_webmention_middle_redirect_blacklisted
def test_get_webmention_middle_redirect_blacklisted(self):
    """We should allow blacklisted domains in the middle of a redirect chain.

    ...e.g. Google's redirector https://www.google.com/url?...
    """
    self.expect_requests_head("http://orig", redirected_url=["https://www.google.com/url?xyz", "https://end"])
    self.mox.ReplayAll()
    self.assert_equals(("https://end", "end", True), util.get_webmention_target("http://orig", resolve=True))
Author: singpolyma, Project: bridgy, Lines: 8, Source: test_util.py
Example 8: resolve
def resolve(urls):
  resolved = set()
  for url in urls:
    final, _, send = util.get_webmention_target(url)
    if send:
      resolved.add(final)
      if include_redirect_sources:
        resolved.add(url)
  return resolved
Author: kylewm, Project: bridgy, Lines: 9, Source: original_post_discovery.py
Example 9: add_original_post_urls
def add_original_post_urls(self, post_id, obj, prop):
  """Extracts original post URLs and adds them to an object, in place.

  If the post object has upstreamDuplicates, *only* they are considered
  original post URLs and added as tags with objectType 'article', and the
  post's own links and 'article' tags are added with objectType 'mention'.

  Args:
    post_id: string post id
    obj: ActivityStreams post object
    prop: string property name in obj to add the original post URLs to
  """
  post = None
  try:
    post = self.source.get_post(post_id)
  except:
    logging.warning('Error fetching source post %s', post_id, exc_info=True)
    return
  if not post:
    logging.warning('Source post %s not found', post_id)
    return

  original_post_discovery.discover(self.source, post, fetch_hfeed=False)
  tags = [tag for tag in post['object'].get('tags', [])
          if 'url' in tag and tag['objectType'] == 'article']
  upstreams = post['object'].get('upstreamDuplicates', [])

  if not isinstance(obj.setdefault(prop, []), list):
    obj[prop] = [obj[prop]]

  if upstreams:
    obj[prop] += [{'url': url, 'objectType': 'article'} for url in upstreams]
    obj.setdefault('tags', []).extend(
      [{'url': tag.get('url'), 'objectType': 'mention'} for tag in tags])
  else:
    obj[prop] += tags

  # check for redirects, and if there are any follow them and add final urls
  # in addition to the initial urls.
  seen = set()
  for url_list in obj[prop], obj.get('tags', []):
    for url_obj in url_list:
      url = util.clean_webmention_url(url_obj.get('url', ''))
      if not url or url in seen:
        continue
      seen.add(url)
      # when debugging locally, replace my (snarfed.org) URLs with localhost
      url_obj['url'] = url = util.replace_test_domains_with_localhost(url)
      resolved, _, send = util.get_webmention_target(url)
      if send and resolved != url and resolved not in seen:
        seen.add(resolved)
        url_list.append({'url': resolved, 'objectType': url_obj.get('objectType')})

  logging.info('After original post discovery, urls are: %s', seen)
Author: sanduhrs, Project: bridgy, Lines: 53, Source: handlers.py
Example 10: post
def post(self):
  logging.debug('Params: %s', self.request.params)
  if self.lease(ndb.Key(urlsafe=self.request.params['key'])):
    source_domains = self.entity.source.get().domains
    to_send = set()
    for url in self.entity.unsent:
      url, domain, ok = util.get_webmention_target(url)
      # skip "self" links to this blog's domain
      if ok and domain not in source_domains:
        to_send.add(url)

    self.entity.unsent = list(to_send)
    self.send_webmentions()
Author: tantek, Project: bridgy, Lines: 14, Source: tasks.py
Example 11: _urls_and_domains
def _urls_and_domains(self, auth_entity, user_url):
  """Returns this user's valid (not webmention-blacklisted) URLs and domains.

  Converts the auth entity's user_json to an ActivityStreams actor and uses
  its 'urls' and 'url' fields. May be overridden by subclasses.

  Args:
    auth_entity: :class:`oauth_dropins.models.BaseAuth`
    user_url: string, optional URL passed in when authorizing

  Returns:
    ([string url, ...], [string domain, ...])
  """
  actor = self.gr_source.user_to_actor(json.loads(auth_entity.user_json))
  logging.debug('Converted to actor: %s', json.dumps(actor, indent=2))

  candidates = util.trim_nulls(util.uniquify(
      [user_url] + microformats2.object_urls(actor)))

  if len(candidates) > MAX_AUTHOR_URLS:
    logging.info('Too many profile links! Only resolving the first %s: %s',
                 MAX_AUTHOR_URLS, candidates)

  urls = []
  for i, url in enumerate(candidates):
    final, domain, ok = util.get_webmention_target(url, resolve=i < MAX_AUTHOR_URLS)
    if ok:
      final = final.lower()
      if util.schemeless(final).startswith(util.schemeless(url.lower())):
        # redirected to a deeper path. use the original higher level URL. #652
        final = url

      # If final has a path segment check if root has a matching rel=me.
      match = re.match(r'^(https?://[^/]+)/.+', final)
      if match and i < MAX_AUTHOR_URLS:
        root = match.group(1)
        resp = util.requests_get(root)
        resp.raise_for_status()
        data = util.mf2py_parse(resp.text, root)
        me_urls = data.get('rels', {}).get('me', [])
        if final in me_urls:
          final = root

      urls.append(final)

  urls = util.dedupe_urls(urls)  # normalizes domains to lower case
  domains = [util.domain_from_link(url) for url in urls]
  return urls, domains
Author: mblaney, Project: bridgy, Lines: 46, Source: models.py
Example 12: do_send_webmentions
def do_send_webmentions(self):
  urls = self.entity.unsent + self.entity.error + self.entity.failed
  unsent = set()
  self.entity.error = []
  self.entity.failed = []

  for orig_url in urls:
    # recheck the url here since the checks may have failed during the poll
    # or streaming add.
    url, domain, ok = util.get_webmention_target(orig_url)
    if ok:
      if len(url) <= _MAX_STRING_LENGTH:
        unsent.add(url)
      else:
        logging.warning('Giving up on target URL over %s chars! %s',
                        _MAX_STRING_LENGTH, url)
        self.entity.failed.append(orig_url)
  self.entity.unsent = sorted(unsent)

  while self.entity.unsent:
    target = self.entity.unsent.pop(0)
    source_url = self.source_url(target)
    logging.info('Webmention from %s to %s', source_url, target)

    # see if we've cached webmention discovery for this domain. the cache
    # value is a string URL endpoint if discovery succeeded, a
    # WebmentionSend error dict if it failed (semi-)permanently, or None.
    cache_key = util.webmention_endpoint_cache_key(target)
    cached = memcache.get(cache_key)
    if cached:
      logging.info('Using cached webmention endpoint %r: %s', cache_key, cached)

    # send! and handle response or error
    error = None
    if isinstance(cached, dict):
      error = cached
    else:
      mention = send.WebmentionSend(source_url, target, endpoint=cached)
      logging.info('Sending...')
      try:
        if not mention.send(timeout=999, headers=util.USER_AGENT_HEADER):
          error = mention.error
      except BaseException, e:
        logging.warning('', exc_info=True)
        error = getattr(mention, 'error')
        if not error:
          error = ({'code': 'BAD_TARGET_URL', 'http_status': 499}
                   if 'DNS lookup failed for URL:' in str(e)
                   else {'code': 'EXCEPTION'})

    if not cached:
      val = (error if error and error['code'] in ('NO_ENDPOINT', 'BAD_TARGET_URL')
             else mention.receiver_endpoint)
      memcache.set(cache_key, val, time=WEBMENTION_DISCOVERY_CACHE_TIME)

    if error is None:
      logging.info('Sent! %s', mention.response)
      self.record_source_webmention(mention)
      self.entity.sent.append(target)
    else:
      code = error['code']
      status = error.get('http_status', 0)
      if (code == 'NO_ENDPOINT' or
          (code == 'BAD_TARGET_URL' and status == 204)):  # 204 is No Content
        logging.info('Giving up this target. %s', error)
        self.entity.skipped.append(target)
      elif status // 100 == 4:
        # Give up on 4XX errors; we don't expect later retries to succeed.
        logging.info('Giving up this target. %s', error)
        self.entity.failed.append(target)
      else:
        self.fail('Error sending to endpoint: %s' % error)
        self.entity.error.append(target)

    if target in self.entity.unsent:
      self.entity.unsent.remove(target)
Author: tantek, Project: bridgy, Lines: 76, Source: tasks.py
Example 13: post
def post(self):
  logging.info('Params: %s', self.request.params.items())
  self.source_url = util.get_required_param(self, 'source')
  self.target_url = util.get_required_param(self, 'target')
  assert self.PREVIEW in (True, False)

  # parse and validate target URL
  try:
    parsed = urlparse.urlparse(self.target_url)
  except BaseException:
    return self.error('Could not parse target URL %s' % self.target_url)

  domain = parsed.netloc
  path_parts = parsed.path.rsplit('/', 1)
  source_cls = SOURCE_NAMES.get(path_parts[-1])
  if (domain not in ('brid.gy', 'www.brid.gy', 'localhost:8080') or
      len(path_parts) != 2 or path_parts[0] != '/publish' or not source_cls):
    return self.error('Target must be brid.gy/publish/{facebook,twitter}')
  elif source_cls in (Instagram, GooglePlusPage):
    return self.error('Sorry, %s is not yet supported.' %
                      source_cls.AS_CLASS.NAME)

  # resolve source URL
  url, domain, ok = util.get_webmention_target(self.source_url)
  # show nice error message if they're trying to publish a silo post
  if domain in SOURCE_DOMAINS:
    return self.error(
      "Looks like that's a %s URL. Try one from your web site instead!" %
      SOURCE_DOMAINS[domain].AS_CLASS.NAME)
  elif not ok:
    return self.error('Unsupported source URL %s' % url)
  elif not domain:
    return self.error('Could not parse source URL %s' % url)

  # When debugging locally, use snarfed.org for localhost webmentions
  if appengine_config.DEBUG and domain == 'localhost':
    domain = 'snarfed.org'

  # look up source by domain
  domain = domain.lower()
  sources = source_cls.query().filter(source_cls.domains == domain).fetch(100)
  if not sources:
    return self.error("Could not find <b>%(type)s</b> account for <b>%(domain)s</b>. Check that your %(type)s profile has %(domain)s in its <em>web site</em> or <em>link</em> field, then try signing up again." %
                      {'type': source_cls.AS_CLASS.NAME, 'domain': domain})

  for source in sources:
    logging.info('Source: %s , features %s, status %s' %
                 (source.bridgy_url(self), source.features, source.status))
    if source.status == 'enabled' and 'publish' in source.features:
      self.source = source
      break
  else:
    return self.error(
      'Publish is not enabled for your account(s). Please visit %s and sign up!' %
      ' or '.join(s.bridgy_url(self) for s in sources))

  # show nice error message if they're trying to publish their home page
  for domain_url in self.source.domain_urls:
    domain_url_parts = urlparse.urlparse(domain_url)
    source_url_parts = urlparse.urlparse(self.source_url)
    if (source_url_parts.netloc == domain_url_parts.netloc and
        source_url_parts.path.strip('/') == domain_url_parts.path.strip('/') and
        not source_url_parts.query):
      return self.error(
        "Looks like that's your home page. Try one of your posts instead!")

  # done with the sanity checks, ready to fetch the source url. create the
  # Publish entity so we can store the result.
  entity = self.get_or_add_publish_entity(url)
  if (entity.status == 'complete' and entity.type != 'preview' and
      not self.PREVIEW and not appengine_config.DEBUG):
    return self.error("Sorry, you've already published that page, and Bridgy Publish doesn't yet support updating or deleting existing posts. Ping Ryan if you want that feature!")
  self.entity = entity

  # fetch source page
  resp = self.fetch_mf2(url)
  if not resp:
    return
  self.fetched, data = resp

  # loop through each item and its children and try to preview/create it. if
  # it fails, try the next one. break after the first one that works.
  resp = None
  types = set()
  queue = collections.deque(data.get('items', []))
  while queue:
    item = queue.popleft()
    item_types = set(item.get('type'))
    if 'h-feed' in item_types and 'h-entry' not in item_types:
      queue.extend(item.get('children', []))
      continue

    try:
      resp = self.attempt_single_item(item)
      if self.entity.published:
        break
      if resp.abort:
        return self.error(resp.error_plain, html=resp.error_html, data=item)
      # try the next item
      for embedded in ('rsvp', 'invitee', 'repost', 'repost-of', 'like',
# ......... (remainder of this code omitted) .........
Author: sanduhrs, Project: bridgy, Lines: 101, Source: publish.py
Example 14: do_send_webmentions
def do_send_webmentions(self):
    urls = self.entity.unsent + self.entity.error + self.entity.failed
    unsent = set()
    self.entity.error = []
    self.entity.failed = []

    for orig_url in urls:
        # recheck the url here since the checks may have failed during the poll
        # or streaming add.
        url, domain, ok = util.get_webmention_target(orig_url)
        if ok:
            if len(url) <= _MAX_STRING_LENGTH:
                unsent.add(url)
            else:
                logging.warning("Giving up on target URL over %s chars! %s", _MAX_STRING_LENGTH, url)
                self.entity.failed.append(orig_url)
    self.entity.unsent = sorted(unsent)

    while self.entity.unsent:
        target = self.entity.unsent.pop(0)
        source_url = self.source_url(target)
        logging.info("Webmention from %s to %s", source_url, target)

        # see if we've cached webmention discovery for this domain. the cache
        # value is a string URL endpoint if discovery succeeded, a
        # WebmentionSend error dict if it failed (semi-)permanently, or None.
        cache_key = util.webmention_endpoint_cache_key(target)
        cached = memcache.get(cache_key)
        if cached:
            logging.info("Using cached webmention endpoint %r: %s", cache_key, cached)

        # send! and handle response or error
        error = None
        if isinstance(cached, dict):
            error = cached
        else:
            mention = send.WebmentionSend(source_url, target, endpoint=cached)
            logging.info("Sending...")
            try:
                if not mention.send(timeout=999, headers=util.REQUEST_HEADERS):
                    error = mention.error
            except BaseException, e:
                logging.warning("", exc_info=True)
                error = getattr(mention, "error")
                if not error:
                    error = (
                        {"code": "BAD_TARGET_URL", "http_status": 499}
                        if "DNS lookup failed for URL:" in str(e)
                        else {"code": "EXCEPTION"}
                    )

        error_code = error["code"] if error else None
        if error_code != "BAD_TARGET_URL" and not cached:
            val = error if error_code == "NO_ENDPOINT" else mention.receiver_endpoint
            memcache.set(cache_key, val, time=WEBMENTION_DISCOVERY_CACHE_TIME)

        if error is None:
            logging.info("Sent! %s", mention.response)
            self.record_source_webmention(mention)
            self.entity.sent.append(target)
        else:
            status = error.get("http_status", 0)
            if error_code == "NO_ENDPOINT" or (error_code == "BAD_TARGET_URL" and status == 204):  # No Content
                logging.info("Giving up this target. %s", error)
                self.entity.skipped.append(target)
            elif status // 100 == 4:
                # Give up on 4XX errors; we don't expect later retries to succeed.
                logging.info("Giving up this target. %s", error)
                self.entity.failed.append(target)
            else:
                self.fail("Error sending to endpoint: %s" % error)
                self.entity.error.append(target)

        if target in self.entity.unsent:
            self.entity.unsent.remove(target)
Author: snarfed, Project: bridgy, Lines: 75, Source: tasks.py
Example 15: do_send_webmentions
def do_send_webmentions(self):
  unsent = set()
  for url in self.entity.unsent + self.entity.error:
    # recheck the url here since the checks may have failed during the poll
    # or streaming add.
    url, domain, ok = util.get_webmention_target(url)
    if ok:
      # When debugging locally, redirect our own webmentions to localhost
      if appengine_config.DEBUG and domain in util.LOCALHOST_TEST_DOMAINS:
        url = url.replace(domain, 'localhost')
      unsent.add(url)
  self.entity.unsent = sorted(unsent)
  self.entity.error = []

  while self.entity.unsent:
    target = self.entity.unsent.pop(0)
    source_url = self.source_url(target)
    logging.info('Webmention from %s to %s', source_url, target)

    # see if we've cached webmention discovery for this domain. the cache
    # value is a string URL endpoint if discovery succeeded, a
    # WebmentionSend error dict if it failed (semi-)permanently, or None.
    domain = util.domain_from_link(target)
    cache_key = 'W ' + domain
    cached = memcache.get(cache_key)
    if cached:
      logging.info('Using cached webmention endpoint for %s: %s',
                   domain, cached)

    # send! and handle response or error
    error = None
    if isinstance(cached, dict):
      error = cached
    else:
      mention = send.WebmentionSend(source_url, target, endpoint=cached)
      logging.info('Sending...')
      try:
        if not mention.send(timeout=999):
          error = mention.error
      except:
        logging.warning('', exc_info=True)
        error = getattr(mention, 'error', None)
        if not error:
          error = {'code': 'EXCEPTION'}

    if error is None:
      logging.info('Sent! %s', mention.response)
      if not self.entity.sent:
        self.set_last_webmention_sent()
      self.entity.sent.append(target)
      memcache.set(cache_key, mention.receiver_endpoint,
                   time=WEBMENTION_DISCOVERY_CACHE_TIME)
    else:
      if error['code'] == 'NO_ENDPOINT':
        logging.info('Giving up this target. %s', error)
        self.entity.skipped.append(target)
        memcache.set(cache_key, error, time=WEBMENTION_DISCOVERY_CACHE_TIME)
      elif (error['code'] == 'BAD_TARGET_URL' and
            error['http_status'] / 100 == 4):
        # Give up on 4XX errors; we don't expect later retries to succeed.
        logging.info('Giving up this target. %s', error)
        self.entity.failed.append(target)
      else:
        self.fail('Error sending to endpoint: %s' % error)
        self.entity.error.append(target)

    if target in self.entity.unsent:
      self.entity.unsent.remove(target)

  if self.entity.error:
    logging.warning('Propagate task failed')
    self.release('error')
  else:
    self.complete()
Author: notenoughneon, Project: bridgy, Lines: 74, Source: tasks.py
Example 16: test_get_webmention_second_redirect_not_text_html
def test_get_webmention_second_redirect_not_text_html(self):
    self.expect_requests_head(
        "http://orig", redirected_url=["http://middle", "https://end"], content_type="application/pdf"
    )
    self.mox.ReplayAll()
    self.assert_equals(("https://end", "end", False), util.get_webmention_target("http://orig", resolve=True))
Author: singpolyma, Project: bridgy, Lines: 6, Source: test_util.py
Example 17: _process_author
def _process_author(source, author_url, refetch_blanks=False):
  """Fetch the author's domain URL, and look for syndicated posts.

  Args:
    source: a subclass of models.Source
    author_url: the author's homepage URL
    refetch_blanks: boolean, if true, refetch SyndicatedPosts that have
      previously been marked as not having a rel=syndication link

  Return:
    a dict of syndicated_url to models.SyndicatedPost
  """
  # for now use whether the url is a valid webmention target
  # as a proxy for whether it's worth searching it.
  # TODO skip sites we know don't have microformats2 markup
  author_url, _, ok = util.get_webmention_target(author_url)
  if not ok:
    return {}

  try:
    logging.debug('fetching author domain %s', author_url)
    author_resp = requests.get(author_url, timeout=HTTP_TIMEOUT)
    # TODO for error codes that indicate a temporary error, should we make
    # a certain number of retries before giving up forever?
    author_resp.raise_for_status()
  except AssertionError:
    raise  # for unit tests
  except BaseException:
    # TODO limit allowed failures, cache the author's h-feed url
    # or the # of times we've failed to fetch it
    logging.warning('Could not fetch author url %s', author_url, exc_info=True)
    return {}

  author_dom = BeautifulSoup(author_resp.text)
  author_parser = mf2py.Parser(url=author_url, doc=author_dom)
  author_parsed = author_parser.to_dict()

  # look for canonical feed url (if it isn't this one) using
  # rel='feed', type='text/html'
  for rel_feed_node in (author_dom.find_all('link', rel='feed')
                        + author_dom.find_all('a', rel='feed')):
    feed_url = rel_feed_node.get('href')
    if not feed_url:
      continue

    feed_url = urlparse.urljoin(author_url, feed_url)
    feed_type = rel_feed_node.get('type')
    if not feed_type:
      # type is not specified, use this to confirm that it's text/html
      feed_url, _, feed_type_ok = util.get_webmention_target(feed_url)
    else:
      feed_type_ok = feed_type == 'text/html'

    if feed_url == author_url:
      logging.debug('author url is the feed url, proceeding')
      break
    elif not feed_type_ok:
      logging.debug('skipping feed of type %s', feed_type)
      continue

    try:
      logging.debug("fetching author's h-feed %s", feed_url)
      feed_resp = requests.get(feed_url, timeout=HTTP_TIMEOUT)
      feed_resp.raise_for_status()
      logging.debug("author's h-feed fetched successfully %s", feed_url)
      author_parsed = mf2py.Parser(
        url=feed_url, doc=feed_resp.text).to_dict()
      break
    except AssertionError:
      raise  # reraise assertions for unit tests
    except BaseException:
      logging.warning('Could not fetch h-feed url %s.', feed_url, exc_info=True)

  feeditems = author_parsed['items']
  hfeed = next((item for item in feeditems
                if 'h-feed' in item['type']), None)
  if hfeed:
    feeditems = hfeed.get('children', [])
  else:
    logging.info('No h-feed found, fallback to top-level h-entrys.')

  permalinks = set()
  for child in feeditems:
    if 'h-entry' in child['type']:
      # TODO if this h-entry in the h-feed has u-syndication links, we
      # can just use it without fetching its permalink page
      # TODO maybe limit to first ~30 entries? (do that here rather than,
      # below because we want the *first* n entries)
      for permalink in child['properties'].get('url', []):
        permalinks.add(permalink)

  # query all preexisting permalinks at once, instead of once per link
  preexisting = {r.original: r for r in
                 SyndicatedPost.query_by_originals(source, permalinks)}

  results = {}
  for permalink in permalinks:
    logging.debug('processing permalink: %s', permalink)
    results.update(_process_entry(source, permalink, refetch_blanks,
                                  preexisting))
# ......... (remainder of this code omitted) .........
Author: notenoughneon, Project: bridgy, Lines: 101, Source: original_post_discovery.py
Example 18: _process_author
def _process_author(source, author_url, refetch=False, store_blanks=True):
  """Fetch the author's domain URL, and look for syndicated posts.

  Args:
    source: a subclass of :class:`models.Source`
    author_url: the author's homepage URL
    refetch: boolean, whether to refetch and process entries we've seen before
    store_blanks: boolean, whether we should store blank
      :class:`models.SyndicatedPost`\ s when we don't find a relationship

  Return:
    a dict of syndicated_url to a list of new :class:`models.SyndicatedPost`\ s
  """
  # for now use whether the url is a valid webmention target
  # as a proxy for whether it's worth searching it.
  author_url, _, ok = util.get_webmention_target(author_url)
  if not ok:
    return {}

  try:
    logging.debug('fetching author url %s', author_url)
    author_resp = util.requests_get(author_url)
    # TODO for error codes that indicate a temporary error, should we make
    # a certain number of retries before giving up forever?
    author_resp.raise_for_status()
    author_dom = util.beautifulsoup_parse(author_resp.text)
  except AssertionError:
    raise  # for unit tests
  except BaseException:
    # TODO limit allowed failures, cache the author's h-feed url
    # or the # of times we've failed to fetch it
    logging.info('Could not fetch author url %s', author_url, exc_info=True)
    return {}

  feeditems = _find_feed_items(author_url, author_dom)

  # look for all other feed urls using rel='feed', type='text/html'
  feed_urls = set()
  for rel_feed_node in (author_dom.find_all('link', rel='feed')
                        + author_dom.find_all('a', rel='feed')):
    feed_url = rel_feed_node.get('href')
    if not feed_url:
      continue
    feed_url = urlparse.urljoin(author_url, feed_url)
    feed_type = rel_feed_node.get('type')
    if feed_type and feed_type != 'text/html':
      feed_ok = False
    else:
      # double check that it's text/html, not too big, etc
      feed_url, _, feed_ok = util.get_webmention_target(feed_url)

    if feed_url == author_url:
      logging.debug('author url is the feed url, ignoring')
    elif not feed_ok:
      logging.debug('skipping feed of type %s', feed_type)
    else:
      feed_urls.add(feed_url)

  for feed_url in feed_urls:
    try:
      logging.debug("fetching author's rel-feed %s", feed_url)
      feed_resp = util.requests_get(feed_url)
      feed_resp.raise_for_status()
      logging.debug("author's rel-feed fetched successfully %s", feed_url)
      feeditems = _merge_hfeeds(feeditems,
                                _find_feed_items(feed_url, feed_resp.text))

      domain = util.domain_from_link(feed_url)
      if source.updates is not None and domain not in source.domains:
        domains = source.updates.setdefault('domains', source.domains)
        if domain not in domains:
          logging.info('rel-feed found new domain %s! adding to source', domain)
          domains.append(domain)

    except AssertionError:
      raise  # reraise assertions for unit tests
    except BaseException:
      logging.info('Could not fetch h-feed url %s.', feed_url, exc_info=True)

  # sort by dt-updated/dt-published
  def updated_or_published(item):
    props = microformats2.first_props(item.get('properties'))
    return props.get('updated') or props.get('published')

  feeditems.sort(key=updated_or_published, reverse=True)

  permalink_to_entry = collections.OrderedDict()
  for child in feeditems:
    if 'h-entry' in child['type']:
      permalinks = child['properties'].get('url', [])
      if not permalinks:
        logging.debug('ignoring h-entry with no u-url!')
      for permalink in permalinks:
        if isinstance(permalink, basestring):
          permalink_to_entry[permalink] = child
        else:
          logging.warn('unexpected non-string "url" property: %s', permalink)

  max = (MAX_PERMALINK_FETCHES_BETA if source.is_beta_user()
# ......... (remainder of this code omitted) .........
Author: snarfed, Project: bridgy, Lines: 101, Source: original_post_discovery.py
Example 19: expand_target_urls
def expand_target_urls(self, activity):
  """Expand the inReplyTo or object fields of an ActivityStreams object
  by fetching the original and looking for rel=syndication URLs.

  This method modifies the dict in place.

  Args:
    activity: an ActivityStreams dict of the activity being published
  """
  for field in ('inReplyTo', 'object'):
    # microformats2.json_to_object de-dupes, no need to do it here
    objs = activity.get(field)
    if not objs:
      continue

    if isinstance(objs, dict):
      objs = [objs]

    augmented = list(objs)
    for obj in objs:
      url = obj.get('url')
      if not url:
        continue

      # get_webmention_target weeds out silos and non-HTML targets
      # that we wouldn't want to download and parse
      url, _, ok = util.get_webmention_target(url)
      if not ok:
        continue

      # fetch_mf2 raises a fuss if it can't fetch a mf2 document;
      # easier to just grab this ourselves than add a bunch of
      # special-cases to that method
      logging.debug('expand_target_urls fetching field=%s, url=%s', field, url)
      try:
        resp = util.requests_get(url)
        resp.raise_for_status()
        data = mf2py.Parser(url=url, doc=resp.text).to_dict()
      except AssertionError:
        raise  # for unit tests
      except BaseException:
        # it's not a big deal if we can't fetch an in-reply-to url
        logging.warning('expand_target_urls could not fetch field=%s, url=%s',
                        field, url, exc_info=True)
        continue

      synd_urls = data.get('rels', {}).get('syndication', [])

      # look for syndication urls in the first h-entry
      queue = collections.deque(data.get('items', []))
      while queue:
        item = queue.popleft()
        item_types = set(item.get('type', []))
        if 'h-feed' in item_types and 'h-entry' not in item_types:
          queue.extend(item.get('children', []))
          continue

        # these can be urls or h-cites
        synd_urls += microformats2.get_string_urls(
          item.get('properties', {}).get('syndication', []))

      logging.debug('expand_target_urls found rel=syndication for url=%s: %r', url, synd_urls)
      augmented += [{'url': u} for u in synd_urls]

    activity[field] = augmented
Author: lcorbasson, Project: bridgy, Lines: 65, Source: publish.py
Example 20: test_get_webmention_cleans_redirected_urls
def test_get_webmention_cleans_redirected_urls(self):
    self.expect_requests_head("http://foo/bar", redirected_url="http://final?utm_source=x")
    self.mox.ReplayAll()

    self.assert_equals(("http://final", "final", True), util.get_webmention_target("http://foo/bar", resolve=True))
    self.assert_equals(("http://foo/bar", "foo", True), util.get_webmention_target("http://foo/bar", resolve=False))
Author: singpolyma, Project: bridgy, Lines: 6, Source: test_util.py
Note: the util.get_webmention_target examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The code snippets are drawn from open source projects contributed by various developers; copyright in the source code remains with the original authors.