Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
226 views
in Technique[技术] by (71.8m points)

Azure Durable Functions in Python: why is function 2 not triggered?

My use case is the following:

  1. F1: generate some data and write them to a CosmosDB (using a time trigger)
  2. F2: read the data that have just been written and add a username
  3. Orchestrator: control the workflow and call F2 after F1 is done

My problem: only F1 works, but F2 is not triggered at all. Why? Does F1 have to return a trigger or something?

That's how I know that only F1 is executed: enter image description here

F1

import logging
import hashlib
import time
import datetime
from azure.cosmos import CosmosClient
import azure.functions as func

def generate_id(string=None, length=10):
    '''This function generates a hash id to be attached to each new row'''

    ts = time.time()
    guid = hashlib.shake_128((str(string) + str(ts)).encode()).hexdigest(10)
    return guid

def main(mytimer: func.TimerRequest, outputDocument: func.Out[func.Document]) -> None:

    utc_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    result1 = {
    "first_letter": "A",
    "second_letter": "B",
    "third_letter": "C",
    "score": 0.001,
    }

    result1['id'] = generate_id()

    outputDocument.set(func.Document.from_dict(result1))

    return

F1 function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "*/30 * * * * *"
    },
    {
      "type": "cosmosDB",
      "direction": "out",
      "name": "outputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"
    }
  ]
}

F2

import logging
import azure.functions as func
from azure.cosmos import CosmosClient

def add_username(string=None):
    '''Generate username'''

    name = "MyName"
    surname = "MySurname"
    username = name+" "+surname

    return username


def main(F1activitytrigger, inputDocument: func.DocumentList) -> str:

    if inputDocument:
        logging.info('Document id: %s', inputDocument[0]['id'])

    result2 = inputDocument[0].data

    result2['username'] = add_username() 

    return result2

F2 function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "F1activitytrigger",
      "type": "activityTrigger",
      "direction": "in"
    },
    {
      "type": "cosmosDB",
      "direction": "in",
      "name": "inputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"     
    }
  ]
}

Orchestrator

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('Test-F1')
    result2 = yield context.call_activity('Test-F2')
    return [result1, result2]

main = df.Orchestrator.create(orchestrator_function)

Orchestrator function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

From your description, F1 is a function trigger by schedule(the type is 'timerTrigger').

F1 doesn't need Orchestrator to call it, but F2 needs.

You need to call F2 in Orchestrator, because the type of F2 is 'activityTrigger'.

So, your durable function should be like below:

F1

import logging

import azure.functions as func
import azure.durable_functions as df


async def main(mytimer: func.TimerRequest, starter: str) -> None:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("YourOrchestratorName", None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "* * * * * *"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

YourOrchestratorName

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('F2', "Tokyo")
    result2 = yield context.call_activity('F2', "Seattle")
    result3 = yield context.call_activity('F2', "London")
    return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

F2

import logging


def main(name: str) -> str:
    logging.info(f"Hello {name}!")
    return f"Hello {name}!"

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "name",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...