本文整理汇总了Python中assembl.models.Post类的典型用法代码示例。如果您正苦于以下问题:Python Post类的具体用法?Python Post怎么用?Python Post使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Post类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_mutation_add_extract_comment
def test_mutation_add_extract_comment(admin_user, graphql_request, idea_in_thread_phase, top_post_in_thread_phase, extract_post_1_to_subidea_1_1):
    """Check that ``createPost`` echoes the extract the new post comments on.

    The extract fixture is first attached to the top post of the thread
    phase, then a ``createPost`` mutation is executed with that extract's
    graphene id; the returned ``parentExtractId`` must equal it.
    """
    from graphene.relay import Node
    from assembl.models import Post

    # The database id is the second element of the decoded global id.
    decoded_global_id = Node.from_global_id(top_post_in_thread_phase)
    commented_post = Post.get(int(decoded_global_id[1]))
    commented_post.extracts.append(extract_post_1_to_subidea_1_1)
    commented_post.db.flush()

    extract_id = extract_post_1_to_subidea_1_1.graphene_id()
    mutation = u"""
mutation createPost {
createPost(
ideaId:"%s",
extractId:"%s",
subject:"Manger des choux à la crème",
body:"Je recommande de manger des choux à la crème, c'est très bon, et ça permet de maintenir l'industrie de la patisserie française."
) {
post {
... on Post {
parentExtractId
}
}
}
}
""" % (idea_in_thread_phase, extract_id)
    res = schema.execute(mutation, context_value=graphql_request)
    created_post = res.data['createPost']['post']
    assert created_post['parentExtractId'] == extract_id
开发者ID:assembl,项目名称:assembl,代码行数:29,代码来源:test_graphql_posts.py
示例2: get_post
def get_post(request):
    """Return the JSON form of the post named by the ``id`` route segment.

    When the ``view`` query parameter is set, the post is rendered through
    ``generic_json`` with that view; otherwise ``serializable()`` is used.

    Raises HTTPNotFound when no post matches the id.
    """
    post_id = request.matchdict["id"]
    post = Post.get_instance(post_id)
    requested_view = request.GET.get("view")
    if not post:
        raise HTTPNotFound("Post with id '%s' not found." % post_id)
    if not requested_view:
        return post.serializable()
    return post.generic_json(requested_view)
开发者ID:hellsingblack,项目名称:assembl,代码行数:12,代码来源:post.py
示例3: reply_post_3
def reply_post_3(request, participant2_user, discussion,
                 root_post_1, test_session):
    """
    From participant2_user, in reply to root_post_1

    The post is added to ``test_session`` and flushed before its parent is
    set, then flushed again. A finalizer registered on ``request`` deletes
    the post from the session.
    """
    from assembl.models import Post
    p = Post(
        discussion=discussion, creator=participant2_user,
        subject=u"re2: root post", body=u"post body",
        type="post", message_id="msg4")
    test_session.add(p)
    test_session.flush()
    p.set_parent(root_post_1)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_post_3")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:iilab,项目名称:assembl,代码行数:21,代码来源:pytest_fixtures.py
示例4: get_post
def get_post(request):
    """Render one post as JSON for the current user.

    The view name comes from the ``view`` query parameter and falls back to
    ``'default'``. The user id falls back to ``Everyone`` when nobody is
    authenticated, and the permissions passed to ``generic_json`` are looked
    up for that user in the discussion named by the route.

    Raises HTTPNotFound when no post matches the ``id`` route segment.
    """
    route_args = request.matchdict
    post_id = route_args['id']
    post = Post.get_instance(post_id)
    view_name = request.GET.get('view') or 'default'
    if not post:
        raise HTTPNotFound("Post with id '%s' not found." % post_id)
    discussion_id = int(route_args['discussion_id'])
    viewer_id = authenticated_userid(request) or Everyone
    return post.generic_json(
        view_name, viewer_id, get_permissions(viewer_id, discussion_id))
开发者ID:Lornz-,项目名称:assembl,代码行数:12,代码来源:post.py
示例5: reply_post_2
def reply_post_2(request, participant1_user, discussion,
                 reply_post_1, test_session):
    """
    From participant1_user, in reply to reply_post_1

    The post is created with a fixed creation date (2000-01-05), added to
    ``test_session`` and flushed before its parent is set, then flushed
    again. A finalizer registered on ``request`` deletes it from the session.
    """
    from assembl.models import Post
    p = Post(
        discussion=discussion, creator=participant1_user,
        subject=u"re2: root post", body=u"post body",
        creation_date=datetime(year=2000, month=1, day=5),
        type="post", message_id="msg3")
    test_session.add(p)
    test_session.flush()
    p.set_parent(reply_post_1)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_post_2")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:mydigilife,项目名称:assembl,代码行数:22,代码来源:pytest_fixtures.py
示例6: reply_deleted_post_4
def reply_deleted_post_4(request, participant2_user, discussion,
                         reply_post_1, test_session):
    """
    From participant2_user, in reply to reply_post_1

    The post is created with publication state ``DELETED_BY_ADMIN``, added
    to ``test_session`` and flushed before its parent is set, then flushed
    again. A finalizer registered on ``request`` deletes it from the session.
    """
    from assembl.models import Post, LangString, PublicationStates
    p = Post(
        discussion=discussion, creator=participant2_user,
        subject=LangString.create(u"re2: root post"),
        body=LangString.create(u"post body"),
        publication_state=PublicationStates.DELETED_BY_ADMIN,
        type="post", message_id="[email protected]")
    test_session.add(p)
    test_session.flush()
    p.set_parent(reply_post_1)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_deleted_post_4")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:assembl,项目名称:assembl,代码行数:23,代码来源:posts.py
示例7: reply_to_deleted_post_5
def reply_to_deleted_post_5(
        request, participant1_user, discussion,
        reply_deleted_post_4, test_session):
    """
    From participant1_user, in reply to reply_deleted_post_4

    The post is added to ``test_session`` and flushed before its parent is
    set, then flushed again. A finalizer registered on ``request`` deletes
    it from the session.
    """
    from assembl.models import Post, LangString
    p = Post(
        discussion=discussion, creator=participant1_user,
        subject=LangString.create(u"re3: root post"),
        body=LangString.create(u"post body"),
        type="post", message_id="[email protected]")
    test_session.add(p)
    test_session.flush()
    p.set_parent(reply_deleted_post_4)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_to_deleted_post_5")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:assembl,项目名称:assembl,代码行数:23,代码来源:posts.py
示例8: reply_post_1
def reply_post_1(request, participant2_user, discussion,
                 root_post_1, test_session):
    """
    From participant2_user, in reply to root_post_1

    The body is a long paragraph of text (its own wording says it exists to
    test harvesting features). The post gets a fixed creation date
    (2000-01-04), is added to ``test_session`` and flushed before its parent
    is set, then flushed again. A finalizer registered on ``request``
    deletes it from the session.
    """
    from assembl.models import Post, LangString
    p = Post(
        discussion=discussion, creator=participant2_user,
        subject=LangString.create(u"re1: root post"),
        body=LangString.create(u"post body with some text so we can test harvesting features. I'm writing a very topical comment with an unrelated source, hoping it would make people angry and make them write answers. I have read in '17O Solid-State NMR Spectroscopy of Functional Oxides for Energy Conversion' thesis by Halat, D. M. (2018) that variable-temperature spectra indicate the onset of oxide-ion motion involving the interstitials at 130 °C, which is linked to an orthorhombic−tetragonal phase transition. For the V-doped phases, an oxide-ion conduction mechanism is observed that involves oxygen exchange between the Bi-O sublattice and rapidly rotating VO4 tetrahedral units. The more poorly conducting P-doped phase exhibits only vacancy conduction with no evidence of sublattice exchange, a result ascribed to the differing propensities of the dopants to undergo variable oxygen coordination. So I think it would be a very bad idea to allow hot beverages in coworking spaces. But it looks like people don't really care about scientific evidence around here."),
        creation_date=datetime(year=2000, month=1, day=4),
        type="post", message_id="[email protected]")
    test_session.add(p)
    test_session.flush()
    p.set_parent(root_post_1)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_post_1")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:assembl,项目名称:assembl,代码行数:23,代码来源:posts.py
示例9: reply_post_1
def reply_post_1(request, participant2_user, discussion,
                 root_post_1, test_session):
    """
    From participant2_user, in reply to root_post_1

    The post gets a fixed creation date (2000-01-04), is added to
    ``test_session`` and flushed before its parent is set, then flushed
    again. A finalizer registered on ``request`` deletes it from the session.
    """
    from assembl.models import Post, LangString
    p = Post(
        discussion=discussion, creator=participant2_user,
        subject=LangString.create(u"re1: root post"),
        body=LangString.create(u"post body"),
        creation_date=datetime(year=2000, month=1, day=4),
        type="post", message_id="[email protected]")
    test_session.add(p)
    test_session.flush()
    p.set_parent(root_post_1)
    test_session.flush()

    def fin():
        # Parenthesised form prints the same text on Python 2 and 3.
        print("finalizer reply_post_1")
        test_session.delete(p)
        test_session.flush()
    request.addfinalizer(fin)
    return p
开发者ID:Lornz-,项目名称:assembl,代码行数:23,代码来源:pytest_fixtures.py
示例10: upgrade
def upgrade(pyramid_env):
    """Rewrite extract XPaths so message anchors carry post URIs.

    Inside one transaction this first deletes every TextFragmentIdentifier
    with no extract, then visits each remaining one reachable through the
    Extract/Content/Post join. When ``xpath_start`` has the form
    ``//div[@id='message-<digits>']<rest>``, the digits are replaced by
    ``Post.uri_generic(<digits>)``. Both the old and the new XPath are
    printed as progress output.
    """
    from assembl.models import Extract, TextFragmentIdentifier, Content, Post
    db = Extract.db()
    reg = re.compile(r"^//div\[@id='message-([0-9]+)'\](.*)")
    with transaction.manager:
        db.query(TextFragmentIdentifier).filter_by(extract=None).delete()
        for tfi in db.query(TextFragmentIdentifier).join(
                Extract, Content, Post).all():
            xpo = tfi.xpath_start
            print(xpo)
            match = reg.match(xpo)
            if match:
                # Named post_id rather than id to avoid shadowing the builtin.
                post_id, remainder = match.groups()
                uri = Post.uri_generic(post_id)
                xp = "//div[@id='message-%s']%s" % (
                    uri, remainder)
                print(xp)
                # NOTE(review): xpath_end is overwritten with the value
                # derived from xpath_start, as in the original code.
                tfi.xpath_start = tfi.xpath_end = xp
开发者ID:bastnic,项目名称:assembl,代码行数:18,代码来源:30b910cff69d_uris_in_extract_ids.py
示例11: mark_post_read
def mark_post_read(request):
    """Mark a post as read or unread for the authenticated user.

    The JSON request body is checked for a ``read`` key. An explicit
    ``false`` deletes the user's existing ViewPost row for the post, if any;
    any other value adds a ViewPost row when the user has none. When a row
    was added or removed, ``Idea.idea_counts`` is queried and its results
    are reported per idea; otherwise the ``ideas`` list is empty.

    Raises HTTPNotFound when the post cannot be resolved and
    HTTPUnauthorized when no user is authenticated.
    """
    # NOTE(review): SOURCE has lost its indentation; the nesting below
    # (the read/unread branches inside the transaction, counts computed
    # after it) is the reading assumed here. Confirm against upstream.
    discussion_id = int(request.matchdict["discussion_id"])
    discussion = Discussion.get_instance(discussion_id)
    requested_post_id = request.matchdict["id"]
    post = Post.get_instance(requested_post_id)
    if not post:
        raise HTTPNotFound("Post with id '%s' not found." % requested_post_id)
    post_id = post.id
    user_id = authenticated_userid(request)
    if not user_id:
        raise HTTPUnauthorized()
    read_data = json.loads(request.body)
    db = Discussion.db()
    mark_unread = read_data.get("read", None) is False
    change = False
    with transaction.manager:
        user_views = db.query(ViewPost).filter(
            ViewPost.post_id == post_id).filter(Action.actor_id == user_id)
        if mark_unread:
            view = user_views.first()
            if view:
                change = True
                db.delete(view)
        elif not user_views.count():
            change = True
            db.add(ViewPost(post=post, actor_id=user_id))
    if change:
        new_counts = Idea.idea_counts(discussion_id, post_id, user_id)
    else:
        new_counts = []
    ideas = []
    for idea_id, total_posts, read_posts in new_counts:
        ideas.append({
            "@id": Idea.uri_generic(idea_id),
            "@type": db.query(Idea).get(idea_id).external_typename(),
            "num_posts": total_posts,
            "num_read_posts": read_posts,
        })
    return {"ok": True, "ideas": ideas}
开发者ID:hellsingblack,项目名称:assembl,代码行数:42,代码来源:post.py
示例12: create_post
def create_post(request):
    """
    We use post, not put, because we don't know the id of the post

    Reads ``message``, ``html``, ``reply_id`` and ``subject`` from the JSON
    request body. With a ``reply_id`` the message is sent as a reply through
    the target post's content; otherwise it is sent through every source of
    the discussion named by the route, using the given subject or, failing
    that, the discussion topic.

    Returns ``{"ok": True}``.

    Raises HTTPUnauthorized when no user is authenticated or the message is
    empty, and HTTPNotFound when the reply target or the discussion does
    not exist.
    """
    request_body = json.loads(request.body)
    user_id = authenticated_userid(request)
    # Reject anonymous callers before querying: .one() with a missing user id
    # would fail inside the query instead of producing the intended 401.
    if not user_id:
        raise HTTPUnauthorized()
    message = request_body.get('message', None)
    html = request_body.get('html', None)
    reply_id = request_body.get('reply_id', None)
    subject = request_body.get('subject', None)
    if not message:
        # NOTE(review): an empty message is reported as "unauthorized"; the
        # exception type is kept so existing callers see the same status.
        raise HTTPUnauthorized()
    user = Post.db.query(User).filter_by(id=user_id).one()
    if reply_id:
        post = Post.get_instance(reply_id)
        if not post:
            # Previously a missing target crashed with AttributeError.
            raise HTTPNotFound("Post with id '%s' not found." % reply_id)
        post.content.reply(user, message)
        return {"ok": True}
    discussion_id = request.matchdict['discussion_id']
    discussion = Discussion.get(id=int(discussion_id))
    # The existence check must come before discussion.topic is read.
    if not discussion:
        raise HTTPNotFound(
            _("No discussion found with id=%s" % discussion_id)
        )
    subject = subject or discussion.topic
    for source in discussion.sources:
        source.send(user, message, subject=subject, html_body=html)
    return {"ok": True}
开发者ID:bastnic,项目名称:assembl,代码行数:39,代码来源:post.py
示例13: mark_post_read
def mark_post_read(request):
    """Mark this post as un/read. Return the read post count for all affected ideas."""
    # NOTE(review): SOURCE has lost its indentation; the nesting below
    # (the read/unread branches inside the transaction, counts computed
    # after it) is the reading assumed here. Confirm against upstream.
    discussion_id = int(request.matchdict['discussion_id'])
    discussion = Discussion.get_instance(discussion_id)
    requested_post_id = request.matchdict['id']
    post = Post.get_instance(requested_post_id)
    if not post:
        raise HTTPNotFound("Post with id '%s' not found." % requested_post_id)
    post_id = post.id
    user_id = request.authenticated_userid
    if not user_id:
        raise HTTPUnauthorized()
    read_data = json.loads(request.body)
    db = discussion.db
    mark_unread = read_data.get('read', None) is False
    change = False
    with transaction.manager:
        # Only views that have not been tombstoned count as "read".
        live_views = db.query(ViewPost).filter_by(
            post_id=post_id, actor_id=user_id,
            tombstone_date=None)
        if mark_unread:
            view = live_views.first()
            if view:
                change = True
                view.is_tombstone = True
        elif not live_views.count():
            change = True
            db.add(ViewPost(post=post, actor_id=user_id))
    if change:
        new_counts = Idea.idea_read_counts(discussion_id, post_id, user_id)
    else:
        new_counts = []
    ideas = []
    for idea_id, read_posts in new_counts:
        ideas.append({
            "@id": Idea.uri_generic(idea_id),
            "num_read_posts": read_posts,
        })
    return {"ok": True, "ideas": ideas}
开发者ID:assembl,项目名称:assembl,代码行数:39,代码来源:post.py
示例14: get_posts
def get_posts(request):
"""
Query interface on posts
Filters have two forms:
only_*, is for filters that cannot be reversed (ex: only_synthesis, only_orphan)
is_*, is for filters that can be reversed (ex:is_unread=true returns only unread message, is_unread=false returns only read messages)
order: can be chronological, reverse_chronological, popularity
root_post_id: all posts below the one specified.
family_post_id: all posts below the one specified, and all its ancestors.
post_reply_to: replies to a given post
root_idea_id: all posts associated with the given idea
ids: explicit message ids.
posted_after_date, posted_before_date: date selection (ISO format)
post_author: filter by author
classifier: filter on message_classifier, or absence thereof (classifier=null). Can be negated with "!"
"""
localizer = request.localizer
discussion_id = int(request.matchdict['discussion_id'])
discussion = Discussion.get(int(discussion_id))
if not discussion:
raise HTTPNotFound(localizer.translate(
_("No discussion found with id=%s")) % discussion_id)
discussion.import_from_sources()
user_id = request.authenticated_userid or Everyone
permissions = get_permissions(user_id, discussion_id)
DEFAULT_PAGE_SIZE = 25
page_size = DEFAULT_PAGE_SIZE
filter_names = [
filter_name for filter_name
in request.GET.getone('filters').split(',')
if filter_name
] if request.GET.get('filters') else []
try:
page = int(request.GET.getone('page'))
except (ValueError, KeyError):
page = 1
text_search = request.GET.get('text_search', None)
order = request.GET.get('order')
if order is None:
order = 'chronological'
assert order in ('chronological', 'reverse_chronological', 'score', 'popularity')
if order == 'score':
assert text_search is not None
if page < 1:
page = 1
root_post_id = request.GET.getall('root_post_id')
if root_post_id:
root_post_id = get_database_id("Post", root_post_id[0])
family_post_id = request.GET.getall('family_post_id')
if family_post_id:
family_post_id = get_database_id("Post", family_post_id[0])
root_idea_id = request.GET.getall('root_idea_id')
if root_idea_id:
root_idea_id = get_database_id("Idea", root_idea_id[0])
ids = request.GET.getall('ids[]')
if ids:
ids = [get_database_id("Post", id) for id in ids]
view_def = request.GET.get('view') or 'default'
only_synthesis = request.GET.get('only_synthesis')
post_author_id = request.GET.get('post_author')
if post_author_id:
post_author_id = get_database_id("AgentProfile", post_author_id)
assert AgentProfile.get(post_author_id), "Unable to find agent profile with id " + post_author_id
post_replies_to = request.GET.get('post_replies_to')
if post_replies_to:
post_replies_to = get_database_id("AgentProfile", post_replies_to)
assert AgentProfile.get(post_replies_to), "Unable to find agent profile with id " + post_replies_to
posted_after_date = request.GET.get('posted_after_date')
posted_before_date = request.GET.get('posted_before_date')
message_classifiers = request.GET.getall('classifier')
PostClass = SynthesisPost if only_synthesis == "true" else Post
if order == 'score':
posts = discussion.db.query(PostClass, Content.body_text_index.score_name)
else:
posts = discussion.db.query(PostClass)
posts = posts.filter(
PostClass.discussion_id == discussion_id,
).filter(PostClass.type != 'proposition_post')
##no_of_posts_to_discussion = posts.count()
post_data = []
#.........这里部分代码省略.........
开发者ID:assembl,项目名称:assembl,代码行数:101,代码来源:post.py
示例15: create_post
def create_post(request):
    """
    We use post, not put, because we don't know the id of the post

    Reads ``message``, ``html``, ``reply_id``, ``idea_id``, ``subject`` and
    ``publishes_synthesis_id`` from the JSON request body and creates a
    SynthesisPost (when a synthesis id is given) or an AssemblPost in the
    discussion named by the route. The new post is attached to the post
    and/or idea it replies to, affected ideas are pushed to the change
    feed, and sources that expose ``send_post`` receive the post.

    Returns the new post rendered with the ``'default'`` view.

    Raises HTTPUnauthorized when no user is authenticated, HTTPBadRequest
    when the message is empty, and HTTPNotFound when the discussion does
    not exist.
    """
    localizer = request.localizer
    request_body = json.loads(request.body)
    user_id = authenticated_userid(request)
    if not user_id:
        raise HTTPUnauthorized()

    # The result is not used below; .one() still raises when no such user
    # row exists, so the lookup is kept.
    user = Post.default_db.query(User).filter_by(id=user_id).one()

    message = request_body.get('message', None)
    html = request_body.get('html', None)
    reply_id = request_body.get('reply_id', None)
    idea_id = request_body.get('idea_id', None)
    subject = request_body.get('subject', None)
    publishes_synthesis_id = request_body.get('publishes_synthesis_id', None)

    if not message:
        raise HTTPBadRequest(localizer.translate(
            _("Your message is empty")))

    in_reply_to_post = Post.get_instance(reply_id) if reply_id else None
    in_reply_to_idea = Idea.get_instance(idea_id) if idea_id else None

    discussion_id = int(request.matchdict['discussion_id'])
    discussion = Discussion.get_instance(discussion_id)
    if not discussion:
        # Translate the constant message id first and interpolate afterwards,
        # as the get_posts views do; interpolating first yields a string
        # that cannot match any catalog entry.
        raise HTTPNotFound(
            localizer.translate(_("No discussion found with id=%s")) % discussion_id
        )

    if not subject:
        # NOTE(review): SOURCE has lost its indentation. This assumes the
        # "Re: " prefix applies only to derived subjects, not to one the
        # client supplied. Confirm against upstream.
        if in_reply_to_post:
            subject = in_reply_to_post.get_title() or ''
        elif in_reply_to_idea:
            # TODO: This should use a cascade like the frontend
            subject = in_reply_to_idea.short_title or ''
        else:
            subject = discussion.topic or ''
        subject = "Re: " + restrip_pat.sub('', subject)

    post_constructor_args = {
        'discussion': discussion,
        'creator_id': user_id,
        'subject': subject,
        'body': html if html else message
    }

    if publishes_synthesis_id:
        published_synthesis = Synthesis.get_instance(publishes_synthesis_id)
        post_constructor_args['publishes_synthesis'] = published_synthesis
        new_post = SynthesisPost(**post_constructor_args)
    else:
        new_post = AssemblPost(**post_constructor_args)

    discussion.db.add(new_post)
    discussion.db.flush()

    if in_reply_to_post:
        new_post.set_parent(in_reply_to_post)
    if in_reply_to_idea:
        idea_post_link = IdeaRelatedPostLink(
            creator_id=user_id,
            content=new_post,
            idea=in_reply_to_idea
        )
        discussion.db.add(idea_post_link)
        # Notify the idea and then, at each step, the first parent returned
        # by get_parents(), until an idea without parents is reached.
        idea = in_reply_to_idea
        while idea:
            idea.send_to_changes()
            parents = idea.get_parents()
            idea = next(iter(parents)) if parents else None
    else:
        discussion.root_idea.send_to_changes()

    for source in discussion.sources:
        if hasattr(source, 'send_post'):
            source.send_post(new_post)

    permissions = get_permissions(user_id, discussion_id)
    return new_post.generic_json('default', user_id, permissions)
开发者ID:uniteddiversity,项目名称:assembl,代码行数:94,代码来源:post.py
示例16: get_posts
def get_posts(request):
"""
Query interface on posts
Filters have two forms:
only_*, is for filters that cannot be reversed (ex: only_synthesis)
is_*, is for filters that can be reversed (ex:is_unread=true returns only unread
order can be chronological, reverse_chronological
message, is_unread=false returns only read messages)
"""
localizer = request.localizer
discussion_id = int(request.matchdict['discussion_id'])
discussion = Discussion.get(int(discussion_id))
if not discussion:
raise HTTPNotFound(localizer.translate(
_("No discussion found with id=%s")) % discussion_id)
discussion.import_from_sources()
user_id = authenticated_userid(request)
permissions = get_permissions(user_id, discussion_id)
DEFAULT_PAGE_SIZE = 25
page_size = DEFAULT_PAGE_SIZE
filter_names = [
filter_name for filter_name \
in request.GET.getone('filters').split(',') \
if filter_name
] if request.GET.get('filters') else []
try:
page = int(request.GET.getone('page'))
except (ValueError, KeyError):
page = 1
order = request.GET.get('order')
if order == None:
order = 'chronological'
assert order in ('chronological', 'reverse_chronological')
if page < 1:
page = 1
root_post_id = request.GET.getall('root_post_id')
if root_post_id:
root_post_id = get_database_id("Post", root_post_id[0])
root_idea_id = request.GET.getall('root_idea_id')
if root_idea_id:
root_idea_id = get_database_id("Idea", root_idea_id[0])
ids = request.GET.getall('ids[]')
if ids:
ids = [get_database_id("Post", id) for id in ids]
view_def = request.GET.get('view') or 'default'
only_synthesis = request.GET.get('only_synthesis')
PostClass = SynthesisPost if only_synthesis == "true" else Post
posts = Post.db.query(PostClass)
posts = posts.filter(
PostClass.discussion_id == discussion_id,
)
##no_of_posts_to_discussion = posts.count()
post_data = []
only_orphan = request.GET.get('only_orphan')
if only_orphan == "true":
if root_idea_id:
raise HTTPBadRequest(localizer.translate(
_("Getting orphan posts of a specific idea isn't supported.")))
orphans = text(Idea._get_orphan_posts_statement(),
bindparams=[bindparam('discussion_id', discussion_id)]
).columns(column('post_id')).alias('orphans')
posts = posts.join(orphans, PostClass.id==orphans.c.post_id)
elif only_orphan == "false":
raise HTTPBadRequest(localizer.translate(
_("Getting non-orphan posts isn't supported.")))
if root_idea_id:
related = text(Idea._get_related_posts_statement(),
bindparams=[bindparam('root_idea_id', root_idea_id),
bindparam('discussion_id', discussion_id)]
).columns(column('post_id')).alias('related')
#Virtuoso bug: This should work...
#posts = posts.join(related, PostClass.id==related.c.post_id)
posts = posts.filter(PostClass.id.in_(related))
if root_post_id:
root_post = Post.get(root_post_id)
posts = posts.filter(
(Post.ancestry.like(
root_post.ancestry + cast(root_post.id, String) + ',%'
))
|
(PostClass.id==root_post.id)
)
#.........这里部分代码省略.........
开发者ID:iilab,项目名称:assembl,代码行数:101,代码来源:post.py
示例17: post_extract
def post_extract(request):
    """
    Create a new extract.

    The acting user is the authenticated user or, failing that, the
    ``userId`` from a decodable ``X-Annotator-Auth-Token`` header, and
    ``Everyone`` as a last resort. The extract's content is the targeted
    post (target type ``email``) or a Webpage looked up by URI, created
    when none exists. One TextFragmentIdentifier is added per entry of the
    ``ranges`` list in the request body.

    Returns ``{'ok': True, 'id': <extract uri>}``, or an HTTPForbidden
    response when the user lacks P_ADD_EXTRACT.

    Raises HTTPServerError for anonymous users, HTTPClientError when the
    body has neither ``uri`` nor ``target``, and HTTPNotFound when the
    targeted post does not exist.
    """
    # NOTE(review): SOURCE has lost its indentation; the nesting below is
    # the reading assumed here. Confirm against upstream.
    extract_data = json.loads(request.body)
    discussion_id = int(request.matchdict['discussion_id'])
    user_id = authenticated_userid(request)
    if not user_id:
        # Straight from annotator
        token = request.headers.get('X-Annotator-Auth-Token')
        if token:
            token = decode_token(
                token, request.registry.settings['session.secret'])
            if token:
                user_id = token['userId']
    if not user_id:
        user_id = Everyone
    if not user_has_permission(discussion_id, user_id, P_ADD_EXTRACT):
        # TODO: maparent: restore this code once it works:
        # return HTTPForbidden(result=ACLDenied(permission=P_ADD_EXTRACT))
        return HTTPForbidden()
    if user_id == Everyone:
        # TODO: Create an anonymous user.
        raise HTTPServerError("Anonymous extracts are not implemeted yet.")
    content = None
    uri = extract_data.get('uri')
    annotation_text = None
    if uri:
        # Straight from annotator
        annotation_text = extract_data.get('text')
    else:
        target = extract_data.get('target')
        # uri is known to be falsy in this branch, so only target matters.
        if not target:
            raise HTTPClientError("No target")
        target_type = target.get('@type')
        if target_type == 'email':
            post_id = target.get('@id')
            post = Post.get_instance(post_id)
            if not post:
                raise HTTPNotFound(
                    "Post with id '%s' not found." % post_id)
            content = post
        elif target_type == 'webpage':
            uri = target.get('url')
    if uri and not content:
        content = Webpage.get_instance(uri)
        if not content:
            # TODO: maparent: This is actually a singleton pattern, should be
            # handled by the AnnotatorSource now that it exists...
            source = AnnotatorSource.db.query(AnnotatorSource).filter_by(
                discussion_id=discussion_id).filter(
                cast(AnnotatorSource.name, Unicode) == 'Annotator').first()
            if not source:
                # NOTE(review): the new source is never referenced again in
                # this block; whether it gets persisted is not visible here.
                source = AnnotatorSource(
                    name='Annotator', discussion_id=discussion_id,
                    type='source')
            content = Webpage(url=uri, discussion_id=discussion_id)
    extract_body = extract_data.get('quote', '')
    new_extract = Extract(
        creator_id=user_id,
        owner_id=user_id,
        discussion_id=discussion_id,
        body=extract_body,
        annotation_text=annotation_text,
        content=content
    )
    Extract.db.add(new_extract)
    for range_data in extract_data.get('ranges', []):
        # Named fragment rather than range to avoid shadowing the builtin.
        fragment = TextFragmentIdentifier(
            extract=new_extract,
            xpath_start=range_data['start'],
            offset_start=range_data['startOffset'],
            xpath_end=range_data['end'],
            offset_end=range_data['endOffset'])
        TextFragmentIdentifier.db.add(fragment)
    Extract.db.flush()
    return {'ok': True, 'id': new_extract.uri()}
开发者ID:hellsingblack,项目名称:assembl,代码行数:80,代码来源:extract.py
示例18: get_data
def get_data(content):
"""Return uid, dict of fields we want to index,
return None if we don't index."""
from assembl.models import Idea, Post, SynthesisPost, AgentProfile, LangString, Extract, Question
if type(content) == Idea: # only index Idea, not Thematic or Question
data = {}
for attr in ('creation_date', 'id', 'discussion_id'):
data[attr] = getattr(content, attr)
populate_from_langstring_prop(content, data, 'title')
populate_from_langstring_prop(content, data, 'synthesis_title')
populate_from_langstring_prop(content, data, 'description')
announcement = content.get_applicable_announcement()
if announcement:
populate_from_langstring_prop(announcement, data, 'title', 'announcement_title')
populate_from_langstring_prop(announcement, data, 'body', 'announcement_body')
phase = content.get_associated_phase()
if phase:
data['phase_id'] = phase.id
data['phase_identifier'] = phase.identifier
data['message_view_override'] = content.message_view_override
return get_uid(content), data
elif isinstance(content, AgentProfile):
data = {}
for attr in ('creation_date', 'id', 'name'):
data[attr] = getattr(content, attr, None)
# AgentProfile doesn't have creation_date, User does.
# get all discussions that the user is in via AgentStatusInDiscussion
data['discussion_id'] = set([s.discussion_id
for s in content.agent_status_in_discussion])
# get discussion_id for all posts of this agent
data['discussion_id'] = list(
data['discussion_id'].union(
[post.discussion_id for post in content.posts_created]
)
)
return get_uid(content), data
elif isinstance(content, Post):
data = {}
data['_parent'] = 'user:{}'.format(content.creator_id)
if content.parent_id is not None:
data['parent_creator_id'] = content.parent.creator_id
for attr in ('discussion_id', 'creation_date', 'id', 'parent_id',
'creator_id', 'sentiment_counts'):
data[attr] = getattr(content, attr)
data['creator_display_name'] = AgentProfile.get(content.creator_id).display_name()
data['sentiment_tags'] = [key for key in data['sentiment_counts']
if data['sentiment_counts'][key] > 0]
like = data['sentiment_counts']['like']
disagree = data['sentiment_counts']['disagree']
dont_understand = data['sentiment_counts']['dont_understand']
more_info = data['sentiment_counts']['more_info']
all_sentiments = [like, disagree, dont_understand, more_info]
data['sentiment_counts']['total'] = sum(all_sentiments)
data['sentiment_counts']['popularity'] = like - disagree
data['sentiment_counts']['consensus'] = max(all_sentiments) / ((sum(all_sentiments) / len(all_sentiments)) or 1)
data['sentiment_counts']['controversy'] = max(like, disagree, 1) / min(like or 1, disagree or 1)
data['type'] = content.type # this is the subtype (assembl_post, email...)
# data['publishes_synthesis_id'] = getattr(
# content, 'publishes_synthesis_id', None)
phase = content.get_created_phase()
if phase:
data['phase_id'] = phase.id
data['phase_identifier'] = phase.identifier
if isinstance(content, SynthesisPost):
populate_from_langstring_prop(content.publishes_synthesis,
data, 'subject')
populate_from_langstring_prop(content.publishes_synthesis,
data, 'introduction')
populate_from_langstring_prop(content.publishes_synthesis,
data, 'conclusion')
long_titles = [idea.synthesis_title for idea in content.publishes_synthesis.ideas
if idea.synthesis_title]
long_titles_c = defaultdict(list)
for ls in long_titles:
for e in ls.entries:
if e.value:
long_titles_c[strip_country(e.base_locale)].append(e.value)
ls = LangString()
for locale, values in long_titles_c.iteritems():
ls.add_value(' '.join(values), locale)
populate_from_langstring(ls, data, 'ideas')
else:
idea_id = get_idea_id_for_post(content)
if not idea_id:
return None, None
data['idea_id'] = idea_id
related_idea = Idea.get(idea_id[0])
data['message_view_override'] = related_idea.message_view_override
if isinstance(related_idea, Question):
related_idea = related_idea.parents[0]
#.........这里部分代码省略.........
开发者ID:assembl,项目名称:assembl,代码行数:101,代码来源:utils.py
示例19: create_post
def create_post(request):
    """
    We use post, not put, because we don't know the id of the post

    Reads ``message``, ``html``, ``reply_id``, ``idea_id``, ``subject`` and
    ``publishes_synthesis_id`` from the JSON request body and creates a
    SynthesisPost (when a synthesis id is given) or an AssemblPost in the
    discussion named by the route, with a fresh UUID URN as message id. The
    new post is attached to the post and/or idea it replies to and handed
    to every source of the discussion.

    Returns ``{"ok": True}``.

    Raises HTTPUnauthorized when no user is authenticated or the message is
    empty, and HTTPNotFound when the discussion does not exist.
    """
    localizer = get_localizer(request)
    request_body = json.loads(request.body)
    user_id = authenticated_userid(request)
    # Reject anonymous callers before querying: .one() with a missing user id
    # would fail inside the query instead of producing the intended 401.
    if not user_id:
        raise HTTPUnauthorized()
    # The result is not used below; .one() still raises when no such user
    # row exists, so the lookup is kept.
    user = Post.db.query(User).filter_by(id=user_id).one()
    message = request_body.get("message", None)
    html = request_body.get("html", None)
    reply_id = request_body.get("reply_id", None)
    idea_id = request_body.get("idea_id", None)
    subject = request_body.get("subject", None)
    publishes_synthesis_id = request_body.get("publishes_synthesis_id", None)
    if not message:
        # NOTE(review): an empty message is reported as "unauthorized"; the
        # exception type is kept so existing callers see the same status.
        raise HTTPUnauthorized()
    in_reply_to_post = Post.get_instance(reply_id) if reply_id else None
    in_reply_to_idea = Idea.get_instance(idea_id) if idea_id else None
    discussion_id = request.matchdict["discussion_id"]
    discussion = Discussion.get_instance(discussion_id)
    if not discussion:
        # Translate the constant message id first and interpolate afterwards,
        # as the get_posts views do.
        raise HTTPNotFound(localizer.translate(_("No discussion found with id=%s")) % discussion_id)
    # NOTE(review): SOURCE has lost its indentation. This assumes the "Re: "
    # prefix is applied to every subject, supplied or derived. Confirm
    # against upstream.
    if not subject:
        if in_reply_to_post:
            subject = in_reply_to_post.subject
        elif in_reply_to_idea:
            subject = in_reply_to_idea.short_title
        else:
            subject = discussion.topic
    # A fallback subject may be None; substitute an empty string so the
    # regex substitution does not fail with a TypeError.
    subject = "Re: " + restrip_pat.sub("", subject or "")
    post_constructor_args = {
        "discussion": discussion,
        "message_id": uuid.uuid1().urn,
        "creator_id": user_id,
        "subject": subject,
        "body": html if html else message,
    }
    if publishes_synthesis_id:
        published_synthesis = Synthesis.get_instance(publishes_synthesis_id)
        post_constructor_args["publishes_synthesis"] = published_synthesis
        new_post = SynthesisPost(**post_constructor_args)
    else:
        new_post = AssemblPost(**post_constructor_args)
    new_post.db.add(new_post)
    new_post.db.flush()
    if in_reply_to_post:
        new_post.set_parent(in_reply_to_post)
    if in_reply_to_idea:
        idea_post_link = IdeaRelatedPostLink(creator_id=user_id, content=new_post, idea=in_reply_to_idea)
        IdeaRelatedPostLink.db.add(idea_post_link)
    for source in discussion.sources:
        source.send_post(new_post)
    return {"ok": True}
开发者ID:hellsingblack,项目名称:assembl,代码行数:76,代码来源:post.py
示例20: get_posts
def get_posts(request):
"""
Query interface on posts
Filters have two forms:
only_*, is for filters that cannot be reversed (ex: only_synthesis)
is_*, is for filters that can be reversed (ex:is_unread=true returns only unread
order can be chronological, reverse_chronological
message, is_unread=false returns only read messages)
"""
localizer = get_localizer(request)
discussion_id = int(request.matchdict["discussion_id"])
discussion = Discussion.get(id=int(discussion_id))
if not discussion:
raise HTTPNotFound(localizer.translate(_("No discussion found with id=%s")) % discussion_id)
discussion.import_from_sources()
user_id = authenticated_userid(request)
DEFAULT_PAGE_SIZE = 25
page_size = DEFAULT_PAGE_SIZE
filter_names = (
[filter_name for filter_name in request.GET.getone("filters").split(",") if filter_name]
if request.GET.get("filters")
else []
)
try:
page = int(request.GET.getone("page"))
except (ValueError, KeyError):
page = 1
order = request.GET.get("order")
if order == None:
order = "chronological"
assert order in ("chronological", "reverse_chronological")
if page < 1:
page = 1
root_post_id = request.GET.getall("root_post_id")
if root_post_id:
root_post_id = get_database_id("Post", root_post_id[0])
root_idea_id = request.GET.getall("root_idea_id")
if root_idea_id:
root_idea_id = get_database_id("Idea", root_idea_id[0])
ids = request.GET.getall("ids")
if ids:
ids = [get_database_id("Post", id) for id in ids]
view_def = request.GET.get("view")
only_synthesis = request.GET.get("only_synthesis")
if only_synthesis == "true":
posts = Post.db.query(SynthesisPost)
else:
posts = Post.db.query(Post)
posts = posts.filter(Post.discussion_id == discussion_id)
##no_of_posts_to_disc
|
请发表评论