kafka-python official docs: a brief introduction to the kafka-python API

The previous post covered the kafka-python API at the conceptual level; this post demonstrates the API in practice. For the producer we focus on the send() method; other methods will be explained as they come up. The official documentation describes many configurable parameters, and the previous post covers them as well.

The send() method publishes a message to a topic:

```python
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
```

- topic (str): the topic the message will be published to.
- value (optional): the message value. Must be of type bytes, or be serializable to bytes via the configured value_serializer. If value is None, key is required and the message acts as a 'delete' (a tombstone).
- key (optional): a key to associate with the message, of type bytes or serializable to bytes via the configured key_serializer. Kafka uses the key to determine which partition the message is sent to: if partition is None (and the producer's partitioner config is left as default), messages with the same key are delivered to the same partition, and if key is also None a partition is chosen randomly. If a partition is specified explicitly, the message goes to that partition.
- headers (optional): a list of header key/value pairs; list items are tuples of a str key and a bytes value.
- timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to the current time.

On success, send() returns a future that resolves to a RecordMetadata object; otherwise a KafkaTimeoutError is raised.

Before running an actual test, create a topic from the console:

```
[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --create --topic kafkatest --replication-factor 3 --partitions 3
Created topic "kafkatest".
[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --list --topic kafkatest
kafkatest
[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --describe --topic kafkatest
Topic:kafkatest    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: kafkatest    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: kafkatest    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: kafkatest    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
[root@test3 bin]#
```

The topic kafkatest has 3 partitions and a replication factor of 3.

A producer demo:

```python
from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers=["10.0.102.214:9092"])
i = 20
while True:
    i += 1
    msg = "producer1+%d" % i
    print(msg)
    producer.send('kafkatest',
                  key=bytes(str(i), encoding="utf-8"),
                  value=bytes(msg, encoding="utf-8"))
    time.sleep(1)
producer.close()   # not reached in this endless demo loop
```

This is just a simple while loop that keeps sending messages to Kafka. Note that the key and value passed to send() must both be bytes.
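The value_serializer and key_serializer parameters mentioned above let the producer accept non-bytes objects. Below is a minimal sketch (not from the original post) that assumes the same broker address and topic; it serializes str keys and dict values to bytes, and also shows how to read the RecordMetadata out of the future that send() returns:

```python
import json
from kafka import KafkaProducer

# serialize str keys as UTF-8 and dict values as JSON before sending
producer = KafkaProducer(
    bootstrap_servers=["10.0.102.214:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# plain str keys and dict values are now accepted; the serializers
# convert them to bytes before the message goes on the wire
future = producer.send("kafkatest", key="42", value={"n": 42})
record_metadata = future.get(timeout=10)   # blocks until acked; raises on error
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
producer.flush()
```

Calling future.get() blocks until the broker acknowledges the message, which is convenient in a demo but defeats batching in high-throughput code.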
A consumer demo that receives the data sent by the producer above:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer("kafkatest",
                         bootstrap_servers=["10.0.102.204:9092"],
                         auto_offset_reset='latest')
for msg in consumer:
    key = msg.key.decode(encoding="utf-8")      # received data is bytes, so it must be decoded
    value = msg.value.decode(encoding="utf-8")
    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))
```

Iterating over the consumer blocks: when the producer sends a message it is read, otherwise the loop waits. The auto_offset_reset parameter controls where to start reading when there is no valid committed offset: 'latest' reads only the newest messages in the queue, while the other value, 'earliest', reads from the oldest messages. Run the two demos together and the consumer prints each message the producer sends.

Consumer groups

The previous post explained the concepts of consumer groups and consumers; here we define a consumer group. Consumers in a group subscribe to the same topic, and each consumer receives messages from a subset of the topic's partitions. A consumer group is created as follows:

```python
from kafka import KafkaConsumer

# the group_id parameter places the consumer in a consumer group
consumer = KafkaConsumer("kafkatest", group_id="test1",
                         bootstrap_servers=["10.0.102.204:9092"],
                         auto_offset_reset='latest')
for msg in consumer:
    key = msg.key.decode(encoding="utf-8")
    value = msg.value.decode(encoding="utf-8")
    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))
```

Each consumer in a group always consumes only part of the subscribed topic's data. Copy the code above in PyCharm so that the test1 group contains two consumers, and run both at the same time.

Analysis: the kafkatest topic has 3 partitions, which are distributed between the two consumers in the test1 group. As mentioned in the previous post, the default assignment strategy is range, so one consumer gets two partitions and the other gets only one.

The following examples demonstrate several of the consumer's methods:

```
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer("kafkatest", group_id="test1", bootstrap_servers=["10.0.102.204:9092"])
>>> consumer.topics()                           # topics the consumer can see, returned as a set
{'kafkatest', 'lianxi', 'science'}
>>> consumer.partitions_for_topic("kafkatest")  # partition ids of the topic
{0, 1, 2}
>>> consumer.subscription()                     # topics this consumer is subscribed to
{'kafkatest'}
>>> consumer.position((0,))                     # offset of the next record
TypeError: partition must be a TopicPartition namedtuple
```

Note that position() must be passed kafka-python's own TopicPartition namedtuple, which is defined as follows and must be imported before use:

```python
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
```

```
>>> from kafka import TopicPartition
>>> consumer.position(TopicPartition(topic='kafkatest', partition=1))
17580
```

The poll(timeout_ms=0, max_records=None) method fetches data from the subscribed topics/partitions. Records are fetched and returned in batches by topic-partition. On each poll, the consumer tries to use the last consumed offset as the starting offset and fetches sequentially. The last consumed offset can be set manually through seek(), or automatically as the last committed offset for the subscribed list of partitions. poll() is incompatible with the iterator interface: use one or the other, not both.

- timeout_ms (int, optional): milliseconds spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records currently in the buffer, else returns empty. Must not be negative. Default: 0.
- max_records (int, optional): the maximum number of records returned in a single call to poll(). Default: inherited from max_poll_records.

A simple example that pulls data from Kafka:

```python
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer("kafkatest", bootstrap_servers=['10.0.102.204:9092'])
while True:
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
```

The output looks like the following: poll() returns a dict keyed by TopicPartition, and each ConsumerRecord carries the message's metadata:

```
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21929, timestamp=92, timestamp_type=0, key=b'138', value=b'producer1+138', checksum=-660348132, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22064, timestamp=93, timestamp_type=0, key=b'141', value=b'producer1+141', checksum=-1803506349, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21930, timestamp=92, timestamp_type=0, key=b'139', value=b'producer1+139', checksum=-1863433503, serialized_key_size=3, serialized_value_size=13), ConsumerRecord(topic='kafkatest', partition=2, offset=21931, timestamp=93, timestamp_type=0, key=b'140', value=b'producer1+140', checksum=-280146643, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21932, timestamp=94, timestamp_type=0, key=b'143', value=b'producer1+143', checksum=1459018748, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=1): [ConsumerRecord(topic='kafkatest', partition=1, offset=22046, timestamp=94, timestamp_type=0, key=b'142', value=b'producer1+142', checksum=-2023137030, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22065, timestamp=94, timestamp_type=0, key=b'144', value=b'producer1+144', checksum=1999922748, serialized_key_size=3, serialized_value_size=13)]}
```

The seek(partition, offset) method manually specifies the fetch offset for a TopicPartition. It overrides the fetch offset that the consumer will use on the next poll(); if this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note: you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.

```
>>> consumer.position(TopicPartition(topic="kafkatest", partition=1))
22103
>>> consumer.seek(partition=TopicPartition("kafkatest", 1), offset=22222)   # set the offset with seek()
>>> consumer.position(TopicPartition(topic="kafkatest", partition=1))
22222
```

Note that seek() has a partition parameter, and it must be of type TopicPartition.

Two related methods: seek_to_beginning(*partitions) seeks to the earliest available offset of the given partitions, and seek_to_end(*partitions) seeks to the latest available offset:

```
>>> consumer.seek_to_beginning(TopicPartition("kafkatest", 1))
>>> consumer.seek_to_end(TopicPartition("kafkatest", 1))
```

Both of these methods also take TopicPartition arguments.

The subscribe() method subscribes the current consumer to a list of topics, or to a topic regex pattern. Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: the list replaces the current assignment, if there is one. This method is incompatible with assign().

About the listener parameter: it is a rebalance callback that will be invoked before and after each rebalance operation. As part of group management, the consumer tracks the list of consumers that belong to a particular group, and a rebalance is triggered when any of the following events occurs: the number of partitions of any subscribed topic changes; a topic is created or deleted; an existing member of the consumer group dies; a new member is added to the consumer group. When any of these events is triggered, the provided listener is first invoked to indicate that the consumer's assignment has been revoked, and then invoked again when the new assignment is received. Note that this listener immediately overrides any listener set in a previous call to subscribe. It is, however, guaranteed that the partitions revoked/assigned through this interface come from the topics subscribed in this call. A sketch of a listener follows the session below.

```
>>> consumer.subscription()                      # topics the consumer is currently subscribed to
{'lianxi'}
>>> consumer.subscribe(("kafkatest", "lianxi"))  # subscribe, replacing the previous subscription
>>> consumer.subscription()                      # the subscription has indeed been replaced
{'lianxi', 'kafkatest'}
```
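As a concrete illustration of the listener parameter, here is a minimal sketch (not from the original post) using kafka-python's ConsumerRebalanceListener base class; the group and broker address are the same assumptions as above:

```python
from kafka import KafkaConsumer, ConsumerRebalanceListener

class LoggingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        # called before a rebalance: these partitions are being taken away
        print("revoked:", revoked)

    def on_partitions_assigned(self, assigned):
        # called after a rebalance: these partitions now belong to this consumer
        print("assigned:", assigned)

consumer = KafkaConsumer(group_id="test1",
                         bootstrap_servers=["10.0.102.204:9092"])
consumer.subscribe(["kafkatest"], listener=LoggingRebalanceListener())
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset)
```

Start a second consumer in the same group and the listener fires on both: first on_partitions_revoked, then on_partitions_assigned with the new range assignment.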
unsubscribe(): unsubscribe from all topics and clear all assigned partitions.

assign(partitions): manually assign a list of TopicPartitions to this consumer. This function cannot be used together with subscribe():

```
>>> consumer.assign([TopicPartition("kafkatest", 1)])
```

assignment(): get the TopicPartitions currently assigned to this consumer. If partitions were assigned directly with assign(), this simply returns the same partitions that were previously assigned. If topics were subscribed with subscribe(), this gives the set of topic partitions currently assigned to the consumer (which may be None if the assignment hasn't happened yet, or if the partitions are in the process of being reassigned).

beginning_offsets(partitions): get the first offset for the given partitions. This method does not change the current consumer position of the partitions, and it may block indefinitely if a given partition does not exist. The partitions parameter is again of type TopicPartition:

```
>>> consumer.beginning_offsets([TopicPartition("kafkatest", 1)])
```

(This method does not exist in kafka-python 1.3.1.)

close(autocommit=True): close the consumer, waiting indefinitely for any needed cleanup. Keyword argument autocommit (bool): if auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True.

commit(offsets=None): commit offsets to Kafka, blocking until success or error. This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup; as such, if you need to store offsets anywhere other than Kafka, this API should not be used. To avoid re-processing the last message read when the consumer restarts, the committed offset should be the next message your application should consume, i.e.: last_offset + 1.

Parameters: offsets (dict, optional) – a {TopicPartition: OffsetAndMetadata} dict of offsets to commit with the configured group_id. Defaults to the currently consumed offsets for all subscribed partitions.

commit_async(offsets=None, callback=None): commit offsets to Kafka asynchronously, optionally firing a callback; otherwise the same as commit() above.

committed(partition): get the last committed offset for the given partition. This offset will be used as the position for the consumer in the event of a failure. This call may block and perform a remote call if the partition in question isn't assigned to this consumer, or if the consumer hasn't yet initialized its cache of committed offsets:
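To make the last_offset + 1 rule concrete, here is a minimal manual-commit sketch (not from the original post). It assumes auto-commit is disabled via enable_auto_commit=False and uses kafka-python's OffsetAndMetadata namedtuple in its classic two-field (offset, metadata) form:

```python
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer("kafkatest", group_id="test1",
                         bootstrap_servers=["10.0.102.204:9092"],
                         enable_auto_commit=False)   # we commit by hand below
for msg in consumer:
    print(msg.offset, msg.value)
    tp = TopicPartition(msg.topic, msg.partition)
    # commit the offset of the *next* message to consume: last_offset + 1
    consumer.commit({tp: OffsetAndMetadata(msg.offset + 1, "")})
```

Committing after every message is the simplest correct pattern but also the slowest; batching commits, or using commit_async() with a periodic synchronous commit, trades a little re-processing risk for throughput.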
pause(), paused() and resume():

- pause(*partitions): suspend fetching from the given partitions; fetching must be restarted with resume().
- paused(): get the partitions that are currently paused with pause().
- resume(*partitions): resume fetching from partitions paused with pause().

Except for paused(), these methods take TopicPartition arguments.

Besides the consumer and producer, kafka-python also has a client; the client API is described below.

The client API

A brief demonstration of using the client API to work with topics:

```
>>> from kafka.client_async import KafkaClient
>>> kc = KafkaClient(bootstrap_servers="10.0.102.204:9092")
>>> kc.config    # quite a lot of configuration options
{'bootstrap_servers': '10.0.102.204:9092', 'client_id': 'kafka-python-1.3.1', 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_password': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'selector': , 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None}
>>> kc.add_topic("clent-1")    # add a topic
```

The meaning of these parameters can be found in the official documentation. Note that, strictly speaking, add_topic() adds the topic to the client's metadata fetch list; the topic is only created on the broker if automatic topic creation is enabled there.

kafka-python also provides two further APIs: the broker connection API and the cluster connection API.
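As a small taste of the cluster side, here is a sketch (not from the original post) that assumes the async KafkaClient shown above; its cluster attribute is a ClusterMetadata cache whose brokers() and topics() accessors return whatever the client has learned so far (the cache may be empty until the client has polled at least once):

```python
from kafka.client_async import KafkaClient

kc = KafkaClient(bootstrap_servers="10.0.102.204:9092")
kc.poll(timeout_ms=1000)       # give the client a chance to fetch metadata
print(kc.cluster.brokers())    # set of BrokerMetadata(nodeId, host, port, rack)
print(kc.cluster.topics())     # topic names currently in the metadata cache
```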