javascript - Rate-limiting and count-limiting events in RxJS v5, but also allowing pass-through - Stack Overflow

I have a bunch of events to send up to a service. But the requests are rate limited and each request ha

I have a bunch of events to send up to a service. But the requests are rate limited and each request has a count limit:

  • 1 request per second: bufferTime(1000)
  • 100 event items per request: bufferCount(100)

The problem is, I am not sure how to bine them in a way that makes sense.

Allowing pass-through

Complicating this further, I need to make sure that events go through instantaneously if we don't hit either limit.

For example, I don't want it to actually wait for 100 event items before letting it go through if it's only one single event during a non-busy time.

Legacy API

I also found that there was a bufferWithTimeOrCount that existed in RxJS v4, although I am not sure how I'd use that even if I had it.

Test playground

Here is a JSBin I made for you to test your solution:

,console,output

Any help would be greatly appreciated.

I have a bunch of events to send up to a service. But the requests are rate limited and each request has a count limit:

  • 1 request per second: bufferTime(1000)
  • 100 event items per request: bufferCount(100)

The problem is, I am not sure how to bine them in a way that makes sense.

Allowing pass-through

Complicating this further, I need to make sure that events go through instantaneously if we don't hit either limit.

For example, I don't want it to actually wait for 100 event items before letting it go through if it's only one single event during a non-busy time.

Legacy API

I also found that there was a bufferWithTimeOrCount that existed in RxJS v4, although I am not sure how I'd use that even if I had it.

Test playground

Here is a JSBin I made for you to test your solution:

http://jsbin./fozexehiba/1/edit?js,console,output

Any help would be greatly appreciated.

Share Improve this question asked Mar 30, 2017 at 23:29 adrianmcliadrianmcli 2,0063 gold badges24 silver badges49 bronze badges
Add a ment  | 

3 Answers 3

Reset to default 5

The bufferTime() operator takes three parameters which bines the functionality of bufferTime and bufferCount. See http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-bufferTime.

With .bufferTime(1000, null, 3) you can make a buffer every 1000ms or when it reaches 3 items. However, this means that it doesn't guarantee 1000ms delay between each buffer.

So you could use something like this which is pretty easy to use (buffers only 3 items for max 1000ms):

click$
  .scan((a, b) => a + 1, 0)
  .bufferTime(1000, null, 3)
  .filter(buffer => buffer.length > 0)
  .concatMap(buffer => Rx.Observable.of(buffer).delay(1000))
  .timestamp()
  .subscribe(console.log);

See live demo: http://jsbin./libazer/7/edit?js,console,output

The only difference to what you probably wanted is that the first emission might be delayed by more than 1000ms. This is because both bufferTime() and delay(1000) operators make a delay to ensure that there's always at least 1000ms gap.

I hope this works for you.

Operator

events$
  .windowCount(10)
  .mergeMap(m => m.bufferTime(100))
  .concatMap(val => Rx.Observable.of(val).delay(100))
  .filter(f => f.length > 0)

Doc

  • .windowCount(number) : [ Rx Doc ]
  • .bufferTime(number) : [ Rx Doc ]

Demo

// test case
const mock = [8, 0, 2, 3, 30, 5, 6, 2, 2, 0, 0, 0, 1]

const tInterval = 100
const tCount = 10

Rx.Observable.interval(tInterval)
  .take(mock.length)
  .mergeMap(mm => Rx.Observable.range(0, mock[mm]))
  
  // start
  .windowCount(tCount)
  .mergeMap(m => m.bufferTime(tInterval))
  .concatMap(val => Rx.Observable.of(val).delay(tInterval))
  .filter(f => f.length > 0)
  // end

  .subscribe({
    next: (n) => console.log('Next: ', n),
    error: (e) => console.log('Error: ', e),
    plete: (c) => console.log('Completed'),
  })
<script src="https://unpkg./rxjs/bundles/Rx.min.js"></script>


Updated

After more testing. I found the answer above has some problem in extreme condition. I think they are caused by .window() and .concat(), and then I find a warning in the doc#concatMap.

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can plete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

However, I thought the right way to limit the request rate possibly is, that we could limit the cycle time of requests. In your case, just limit there is only 1 request per 10 milliseconds. It is simpler and may be more efficient to control the requests.

Operator

const tInterval = 100
const tCount = 10
const tCircle = tInterval / tCount

const rxTimer = Rx.Observable.timer(tCircle).ignoreElements()

events$
  .concatMap(m => Rx.Observable.of(m).merge(rxTimer)) // more accurate than `.delay()`
  // .concatMap(m => Rx.Observable.of(m).delay(tCircle))

or

events$
  .zip(Rx.Observable.interval(tCircle), (x,y) => x)

I've modified the answer I gave to this question to support your use case of adding a limited number of values (i.e. events) to pending requests.

The ments within should explain how it works.

Because you need to keep a record of the requests that have been made within the rate limit period, I don't believe that it's possible to use the bufferTime and bufferCount operators to do what you want - a scan is required so that you can maintain that state within the observable.

function rateLimit(source, period, valuesPerRequest, requestsPerPeriod = 1) {

  return source
    .scan((requests, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all requests made within the last period. If the
      // number of requests made is below the limit, the value can be
      // included in an immediate request. Otherwise, it will need to be
      // included in a delayed request.

      requests = requests.filter((request) => request.until > since);
      if (requests.length >= requestsPerPeriod) {

        const leastRecentRequest = requests[0];
        const mostRecentRequest = requests[requests.length - 1];

        // If there is a request that has not yet been made, append the
        // value to that request if the number of values in that request's
        // is below the limit. Otherwise, another delayed request will be
        // required.

        if (
          (mostRecentRequest.until > now) &&
          (mostRecentRequest.values.length < valuesPerRequest)
        ) {

          mostRecentRequest.values.push(value);

        } else {

          // until is the time until which the value should be delayed.

          const until = leastRecentRequest.until + (
            period * Math.floor(requests.length / requestsPerPeriod)
          );

          // concatMap is used below to guarantee the values are emitted
          // in the same order in which they are received, so the delays
          // are cumulative. That means the actual delay is the difference
          // between the until times.

          requests.push({
            delay: (mostRecentRequest.until < now) ?
              (until - now) :
              (until - mostRecentRequest.until),
            until,
            values: [value]
          });
        }

      } else {

        requests.push({
          delay: 0,
          until: now,
          values: [value]
        });
      }
      return requests;

    }, [])

    // Emit only the most recent request.

    .map((requests) => requests[requests.length - 1])

    // If multiple values are added to the request, it will be emitted
    // mulitple times. Use distinctUntilChanged so that concatMap receives
    // the request only once.

    .distinctUntilChanged()
    .concatMap((request) => {

      const observable = Rx.Observable.of(request.values);
      return request.delay ? observable.delay(request.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 250),
  1000,
  100,
  1
).subscribe((values) => console.log(
  `Request with ${values.length} value(s) at T+${Date.now() - start}`
));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg./rxjs@5/bundles/Rx.min.js"></script>

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745639707a4637613.html

相关推荐

  • 我用AI监控了奥特曼,当他一发推特AI就会自动给我打电话。

    上周我真的扛不住了。奥特曼这个孙贼,发了个X说,“要发一个礼拜的好东西”。我信了他的邪,明明出差1周,每天早上9点不到就要起来参加活动,但是晚上根本不敢睡觉,天天蹲到凌晨3点半,蹲到他们那边时间中午12点多,我才敢去睡觉。真的,那一整周,我

    1小时前
    00
  • 如何打造高效AI智能体?

    作者|Barry Zhang, Anthropic地址|出品|码个蛋(ID:codeegg)整理|陈宇明最近看到了 Anthropic 那篇著名的《Building effective agents》作者之一 Barry Zhang 在 2

    1小时前
    00
  • 开源在线考试系统

    看到调问已经开始扩展在线考试的场景,试了一下,发现在线考试的基本能力都已经支持了。主要是考试中的各种计分功能,包括对每道题的选项设置分值计算、考试时间限制等,和官方了解了一下,考试中的其他各项能力也在逐步完善,有需求可以随时

    51分钟前
    00
  • Go 语言 Mock 实践

    Mock 是软件测试中的一项关键技术,尤其在单元测试领域,可谓是“顶梁柱”般的存在,几乎不可或缺。它通过模拟真实对象的行为,使我们能在不依赖外部系统的情况下,专注测试代码的核心逻辑。对于测试开发、自动化测试,乃至性能测试中的某些场景,合理使

    47分钟前
    00
  • 深度学习在DOM解析中的应用:自动识别页面关键内容区块

    爬虫代理摘要本文介绍了如何在爬取东方财富吧()财经新闻时,利用深度学习模型对 DOM 树中的内容区块进行自动识别和过滤,并将新闻标题、时间、正文等关键信息分类存储。文章聚焦爬虫整体性能瓶颈,通过指标对比、优化策略、压测数据及改进结果,展示了

    41分钟前
    10
  • 什么是docker?它是如何工作的?

    想象一个场景,你要部署一个服务,然后它对环境有很多依赖,不同的操作系统又是不同的需求,而且还可能遇到有些源不能使用,又得一番折腾,折腾完上线后,假设要在新的环境再来一套,又得再来一遍。那么有没有什么办法可以解决呢?有办法,docker就是干

    39分钟前
    00
  • 推荐一个轻量级的监控平台并且支持移动端

    简介XUGOU 是基于Cloudflare构建的轻量化监控平台,专精于系统资源监控与可视化状态页面服务。该平台提供英文简体中文双语支持,满足全球化部署需求。面向开发者及中小团队,项目致力于提供高可用性的监控解决方案。核心功能与实现平台功能

    36分钟前
    00
  • module &#x27;torch.

    踩坑Ascend, 安装 pytorch 2.5.1 和 pytorch_npu 2.5.1, import torch 报错.执行 python -c "import torch;import torch_npu;"时

    35分钟前
    10
  • 【Docker项目实战】使用Docker部署IT工具箱Team·IDE

    一、Team·IDE介绍1.1 Team·IDE简介Team IDE 是一款集成多种数据库(如 MySQL、Oracle、金仓、达梦、神通等)与分布式系统组件(如 Redis、Zookeeper、Kafka、Elasticsearch)管理

    33分钟前
    00
  • 大模型驱动金融数据应用的实战探索

    近年来,人工智能技术的飞速发展正在重塑全球各行各业的生态格局,金融行业作为数据密集型领域,更是首当其冲。大模型凭借其强大的自然语言处理、逻辑推理和生成能力,逐渐成为金融数据应用的核心驱动力。本文将从行业背景与趋势、核心场景重构、产品能力提升

    31分钟前
    00
  • Nat. Mater.

    大家好,今天给大家分享一篇近期发表在Nat. Mater.上的研究进展,题为:De novo design of self-assembling peptides with antimicrobial activity guided

    23分钟前
    00
  • 雨晨 26200.5516 Windows 11 IoT 企业版 LTSC 2024 轻装版

    简述&#xff1a;以下为YCDISM (雨晨作品自2025年03月25日起通用介绍&#xff0c;若无重大更改不再额外敖述) 全程由最新YCDISM2025脱机装载26100.1742_zh-cn_windows_11_

    16分钟前
    00
  • Java&amp;Activiti7实战:轻松构建你的第一个工作流

    本文已收录在Github,关注我,紧跟本系列专栏文章,咱们下篇再续!

    14分钟前
    00
  • windows切换系统版本

    powershell 管理员身份打开 输入 irm massgrave.devget | iex 输入数字 对应后面写着 change windows edition新的会话框中选择想要的版本即可 获取windows 密钥 官方提供的

    11分钟前
    00
  • 【赵渝强老师】创建PostgreSQL的数据库

    在PostgreSQL中,创建数据库主要通过SQL命令“create database”完成,视频讲解如下:下面是具体的操作步骤。(1)查询现有数据库的集合,可以检查系统目录pg_database。代码语言:sql复制postgres=#

    9分钟前
    00
  • 在Windows上使用MetaMCP的完整指南

    在当今AI助手工具快速发展的时代&#xff0c;如何有效管理各种MCP&#xff08;Model Control Protocol&#xff09;服务成为了一个挑战。MetaMCP应运而生&#xff0c;它是

    8分钟前
    00
  • 设计模式:工厂方法模式(Factory Method)(2)

    当年做一个项目时,还不懂什么是设计模式,仅仅是按照经验完成了需求。回头看看,就是暗合桥接模式。但是,在整个需求实现过程中,甲方需要我在已经设计好的标准业务逻辑中添加非标的需求,因为,在他们眼里,从业务角度来看,是自然的拓展。如果当年我知道还

    7分钟前
    00
  • VoidZero 的野心,开发者的福音!

    前言昨天分享了尤雨溪公司 VoidZero 最新的产品 TSDown,我相信肯定有同学和我一样好奇,尤雨溪为什么要推出这么多工具,来增加大家的学习压力!今天我们从整体上分析下,这些产品的功能和目的!正文VoidZero 是尤雨溪于 2020

    6分钟前
    00
  • 解决Windows 10家庭单语言版语言限制:升级专业版全攻略

    解决Windows 10家庭单语言版语言限制:升级专业版全攻略 在日常使用Windows 10系统时,部分用户可能会遇到系统提示“当前许可证仅支持单一显示语言”的困扰。这一问题通常出现在预装或激活了Windows 10家庭单语言版的设备上

    5分钟前
    00
  • 电子产品设计与电源优化实用策略

    产品降成本是商业活动中的常见行为,可贯穿于产品设计、研发、生产、运输、销售及维护的各个环节。然而,降成本策略必须建立在对产品品质要求不降低的基础上,确保设计参数满足要求并通过相关测试。以下是具体优化与深度分析。研发工程师通常从设计入手,选择

    4分钟前
    00

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信