How to Implement Controlled Parallelism with Azure PowerShell Durable Functions to Work Within Exchange Module Session Limits?

I am working on a solution that uses Azure Durable Functions to run multiple tasks in parallel with the ExchangeOnlineManagement module, collecting the Unified Audit Log, in order to speed up some long-running operations. Here is my current setup and challenge:

Overall Goal:

  1. I want to use Azure Durable Functions to run activities in parallel, leveraging the ExchangeOnlineManagement module.
  2. However, the ExchangeOnlineManagement module has a limitation of allowing only 3-5 concurrent sessions at a time.
  3. Therefore, I need a way to ensure that I do not exceed this session limit while still running as many activities concurrently as possible to speed up processing.

Current Workflow:

  1. HTTP Starter: I trigger the workflow with an HTTP request.
  2. Orchestrator: This function is designed to:
    • Create "chunks" of tasks that can run concurrently.
    • Limit the number of concurrent sessions to respect the Connect-ExchangeOnline restriction.
    • Wait for a task to complete before starting a new one if the maximum concurrent limit is reached.
  3. Activities that perform the actual Unified Audit Log collection. (This part works; a simplified sketch is shown below.)
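
For context, each activity does roughly the following (heavily simplified; the app registration details and the blob upload are omitted, and the environment variable names are just placeholders):

param($ActivityInput)

# Connect with app-only authentication (placeholder environment variables)
Connect-ExchangeOnline -AppId $env:EXO_APP_ID -CertificateThumbprint $env:EXO_CERT_THUMBPRINT `
    -Organization $env:EXO_ORGANIZATION -ShowBanner:$false

# Collect the Unified Audit Log for this chunk's time window
$records = Search-UnifiedAuditLog -StartDate $ActivityInput.startDate -EndDate $ActivityInput.endDate `
    -ResultSize $ActivityInput.maxResults -SessionCommand ReturnLargeSet

Disconnect-ExchangeOnline -Confirm:$false

# Return a summary object (the real function also writes the records to blob storage)
@{
    Status           = "Success"
    ChunkNumber      = $ActivityInput.chunkNumber
    StartDate        = $ActivityInput.startDate
    EndDate          = $ActivityInput.endDate
    RecordsRetrieved = @($records).Count
    HasData          = @($records).Count -gt 0
}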

The challenge I am facing is how to effectively manage the parallelism in the orchestrator, such that it runs a maximum of 3-5 activities concurrently, waits for any of them to complete, and then starts a new one while maintaining an efficient workflow.

My issue

Where I'm stuck is trying to run tasks in parallel, but only 2-3 at a time, ensuring that I wait for one task to complete before starting the next. To track progress, I store the number of chunks to be processed, along with the counts for completed and remaining chunks, in variables. However, every time the orchestrator function restarts, these variables reset, causing all progress to be lost.

I have tried several approaches, such as using Set-DurableCustomStatus and creating script-level or global variables (only setting them when the code is not replaying), but none of them maintain state across orchestrator replays.
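
For example, one attempt looked roughly like this (simplified; the replay check reflects my understanding of the context object rather than code I am confident in):

# At the top of the orchestrator
if (-not $Context.IsReplaying) {
    # Only initialize the tracking variables on the first pass -- this did not survive replays
    $script:processedChunks = @()
}

# After each completed chunk, record and publish progress
$script:processedChunks += $completedChunkNumber
Set-DurableCustomStatus -CustomStatus @{
    ProcessedChunks = $script:processedChunks
    RemainingChunks = $remainingChunks.Count
}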

How should I keep track of progress in my orchestrator, or is this not something you should do in an orchestrator at all? Are there better solutions available, like storing state in an Azure queue?

How my code currently works

My HTTP starter is triggered with a start date, end date, and interval. Based on this information, several time chunks are calculated. Currently, for testing purposes, I'm using a 1 day period with an interval of 240 minutes, resulting in 6 chunks. The idea is to trigger an activity for each of the 6 chunks, but only run 2 activities at the same time in my testing setup.

$timeChunks = @()
$currentStart = [DateTime]::Parse($input.startDate)
$endDate = [DateTime]::Parse($input.endDate)

while ($currentStart -lt $endDate) {
    $currentEnd = $currentStart.AddMinutes($input.interval)
    if ($currentEnd -gt $endDate) {
        $currentEnd = $endDate
    }
    
    $timeChunks += @{
        startDate = $currentStart.ToString("yyyy-MM-ddTHH:mm:ss")
        endDate = $currentEnd.ToString("yyyy-MM-ddTHH:mm:ss")
        tenantId = $input.tenantId
        maxResults = 5000
        chunkNumber = $timeChunks.Count
    }
    $currentStart = $currentEnd
}

In the next section, I initiate two parallel tasks and then wait for one to complete. Once a task completes, it is recorded in $completedChunkNumber, which I use to track which chunks are finished and which are still pending. However, every time the orchestrator restarts, the state resets to an empty array, resulting in all progress being lost.
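
The variables used below are initialized at the top of the orchestrator, roughly like this (simplified):

$maxParallelTasks = 2
$remainingChunks  = [System.Collections.Generic.Queue[object]]::new()
$timeChunks | ForEach-Object { $remainingChunks.Enqueue($_) }

$activeTasks = @{}    # chunkNumber -> @{ Task = ...; Chunk = ... }
$finalOutput = @{ Success = @(); Failed = @(); ProcessedChunks = @() }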

# Process chunks while maintaining 2 parallel tasks
while ($activeTasks.Count -gt 0 -or $remainingChunks.Count -gt 0) {
    Write-Host "Active tasks: $($activeTasks.Count), Remaining chunks: $($remainingChunks.Count)"

    # Start new tasks if needed
    while ($activeTasks.Count -lt $maxParallelTasks -and $remainingChunks.Count -gt 0) {
        $chunk = $remainingChunks.Dequeue()
        Write-Host "Starting new task for chunk $($chunk.chunkNumber)"
        $task = Invoke-DurableActivity -FunctionName "Get-UAL-Activity" -Input $chunk -NoWait
        $activeTasks[$chunk.chunkNumber] = @{
            Task = $task
            Chunk = $chunk
        }
    }
    
    if ($activeTasks.Count -gt 0) {
        $tasks = @($activeTasks.Values | ForEach-Object { $_.Task })
        $completedTask = Wait-DurableTask -Task $tasks -Any
        
        # Find completed chunk
        $completedEntry = $activeTasks.GetEnumerator() | 
            Where-Object { $_.Value.Task.TaskId -eq $completedTask.TaskId } | 
            Select-Object -First 1
    
        $completedChunkNumber = $completedEntry.Key
        $originalChunk = $completedEntry.Value.Chunk
            
        Write-Host "Processing completion for chunk $completedChunkNumber"
            
        # Process result
        $result = Get-DurableTaskResult -Task $completedTask
        Write-Host "Got result: $($result | ConvertTo-Json)"
                        
        # Add to appropriate output collection
        if ($result.Status -eq "Success") {
            $finalOutput.Success += $result
            Write-Host "Added successful result for chunk $($result.ChunkNumber)"
        }
        else {
            $finalOutput.Failed += $result 
            Write-Host "Added failed result for chunk $($result.ChunkNumber)"
        }
        
        $finalOutput.ProcessedChunks += $completedChunkNumber

        # Remove completed task
        $activeTasks.Remove($completedChunkNumber)
    }
}

I attempted to keep track of progress by using Set-DurableCustomStatus, but I'm unsure how to retrieve and use this status within the orchestrator itself, as its primary purpose seems to be for external status checking. How can I track progress within the orchestrator without losing it on every replay?

Azure Function Logs

My HTTP trigger calls the Orchestrator with the following output:

2024-11-18T10:15:21Z   [Information]   INFORMATION: Sending input: {
  "endDate": "2024-11-18T10:14:59",
  "timeoutMinutes": 6000,
  "tenantId": "blabla-ebfa12e7a31c",
  "startDate": "2024-11-17T10:14:59",
  "interval": 240
}

Then the Orchestrator will calculate the chunks and start tasks 0 and 1. It then gets the results for both of them before starting over and initiating tasks 0 and 1 again:

2024-11-18T10:16:23Z   [Information]   INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:23Z   [Information]   INFORMATION: Created 6 chunks to process
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:23Z   [Information]   INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:23Z   [Information]   INFORMATION: Added successful result for chunk 2
2024-11-18T10:16:23Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 3
2024-11-18T10:16:23Z   [Information]   INFORMATION: Starting new task for chunk 3
2024-11-18T10:16:23Z   [Information]   cacb3bf3-6042-4050-94af-dd28682f9f4e: Function 'Get-UAL-Activity (Activity)' scheduled. Reason: Get-UAL-Orchestrator. IsReplay: False. State: Scheduled. RuntimeStatus: Pending. HubName: AzureLogs. AppName: AzureLogs. SlotName: Production. ExtensionVersion: 2.13.4. SequenceNumber: 15.
2024-11-18T10:16:23Z   [Information]   Executed 'Functions.Get-UAL-Orchestrator' (Succeeded, Id=a989d96f-a98b-47ea-a9c2-6d407fdc5ee8, Duration=35ms)
2024-11-18T10:16:29Z   [Information]   Executing 'Functions.Get-UAL-Orchestrator' (Reason='(null)', Id=892258ce-29f7-4ded-9be1-5060c7fad9fb)
2024-11-18T10:16:29Z   [Verbose]   Sending invocation id: '892258ce-29f7-4ded-9be1-5060c7fad9fb
2024-11-18T10:16:29Z   [Verbose]   Posting invocation id:892258ce-29f7-4ded-9be1-5060c7fad9fb on workerId:b273c3af-20e0-4365-b70d-218b4c6e49ea
2024-11-18T10:16:29Z   [Information]   INFORMATION: ================ ORCHESTRATOR START ================
2024-11-18T10:16:29Z   [Information]   INFORMATION: Created 6 chunks to process
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing chunks with max 2 parallel tasks
2024-11-18T10:16:29Z   [Information]   INFORMATION: Active tasks: 0, Remaining chunks: 6
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 0
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing completion for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
2024-11-18T10:16:29Z   [Information]   INFORMATION: Added successful result for chunk 1
2024-11-18T10:16:29Z   [Information]   INFORMATION: Active tasks: 1, Remaining chunks: 4
2024-11-18T10:16:29Z   [Information]   INFORMATION: Starting new task for chunk 2
2024-11-18T10:16:29Z   [Information]   INFORMATION: Processing completion for chunk 2
2024-11-18T10:16:29Z   [Information]   INFORMATION: Got result: {
  "Status": "Success",
  "EndDate": "2024-11-17T14:14:59",
  "HasData": true,
  "StartDate": "2024-11-17T10:14:59",
  "BlobName": "Chuck-0.csv",
  "ChunkNumber": 0,
  "RecordsRetrieved": 586
}
