Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
903 views
in Technique[技术] by (71.8m points)

azure - How to set the EventProcessorHost to read events from now on (UTC)?

We are using the EventProcessorHost to receive events from Azure EventHubs. I've been unsuccessfully trying to configure it (through the EventProcessorOptions.InitialOffsetProvider) to read events from UTC now on but it always reads from the start of the feed. I am not saving checkpoints (and I even deleted the BLOB container created). This is how I am setting it:

DateTime startDate = DateTime.UtcNow;

var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100, 
                PrefetchCount = 100, 
                ReceiveTimeOut = TimeSpan.FromSeconds(120),  
                InitialOffsetProvider = (name) => startDate  
            };

Any guidance would be appreciated.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Think this changed in version 2.0.0 - Rajiv's code would now be:

var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};

Here is an example block with fully qualified classnames:

    private static async Task MainAsync(string[] args)
    {
        try{
            Console.WriteLine("Registering EventProcessor...");

            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];

            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);

            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);

            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };

            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);

            Thread.Sleep(Timeout.Infinite);

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }

}

And here are my general settings with secrets/exact addresses obscured to help work things out, as I found working this out to be less pleasurable than extracting teeth:

"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName":  "<consumer group name e.g $Default>",
"AISStorageContainerName":  "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...