Hello everyone,
In this post I will walk step by step through building a simple message send/receive setup with Apache Pulsar.
Step 1: Pull the Apache Pulsar Docker image with the command docker pull apachepulsar/pulsar
Step 2: Start the container in standalone mode with the command
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v $PWD/data:/pulsar/data \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone
Once the container is running, it prints log output like the following.
13:07:44.190 [pulsar-web-68-8] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [22/May/2021:13:07:44 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.2" 16
13:08:14.165 [pulsar-web-68-6] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [22/May/2021:13:08:14 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.2" 24
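Before running the Python programs, it can help to confirm that the broker is accepting connections on port 6650, the port published by the docker run command. The sketch below is one way to do that with only the standard library. The helper name wait_for_port and its timeout values are my own choices for illustration and are not part of Pulsar. It only checks that the TCP port is open, which suggests, but does not prove, that the broker is fully ready.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, interval_s=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper for illustration; not part of the Pulsar client.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


# Example call (assumes the container from the docker run command is up):
# wait_for_port("localhost", 6650)
```

If the call returns False, the container is probably still starting or the port mapping is missing.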
Step 3: Install the pulsar-client library for Python with the command $ pip3 install pulsar-client==2.7.1
Step 4: Python program for the producer (the sending side)
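import time

import pulsar

# Connect to the standalone broker started in the container
client = pulsar.Client('pulsar://localhost:6650')
# Create a producer on the topic named 'pulsar'
producer = client.create_producer('pulsar')

# Send 30 messages, one every 3 seconds
for i in range(30):
    producer.send(('Hello-%d' % i).encode('utf-8'))
    time.sleep(3)

client.close()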
When the program runs successfully, it prints output like the following.
2021-05-22 19:58:52.009 INFO [0x109681e00] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2021-05-22 19:58:52.015 INFO [0x700009a6e000] ClientConnection:356 | [[::1]:51936 -> [::1]:6650] Connected to broker
2021-05-22 19:58:52.102 INFO [0x700009a6e000] HandlerBase:54 | [persistent://public/default/pulsar, ] Getting connection from pool
2021-05-22 19:58:52.497 INFO [0x700009a6e000] ProducerImpl:170 | [persistent://public/default/pulsar, ] Created producer on broker [[::1]:51936 -> [::1]:6650]
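Note that the log reports the topic as persistent://public/default/pulsar even though the program only passed 'pulsar'. Pulsar expands a short topic name with the default tenant (public) and namespace (default). The function below is a rough sketch of that expansion so the naming is easier to follow. The name full_topic_name is hypothetical, and the real client handles more cases than this.

```python
def full_topic_name(topic, domain="persistent"):
    """Expand a short topic name to domain://tenant/namespace/topic.

    Illustrative only; this mimics the common cases, not the client's full logic.
    """
    if "://" in topic:
        return topic  # already fully qualified
    parts = topic.split("/")
    if len(parts) == 1:
        # Bare name: fall back to the default tenant and namespace.
        return f"{domain}://public/default/{topic}"
    if len(parts) == 3:
        # tenant/namespace/topic: only the domain prefix is missing.
        return f"{domain}://{topic}"
    raise ValueError(f"unsupported topic name: {topic!r}")


print(full_topic_name("pulsar"))
```

This is why a producer and a consumer that both use the short name 'pulsar' end up on the same topic.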
Step 5: Python program for the consumer (the receiving side)
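import pulsar

# Connect to the broker and subscribe to the same topic the producer uses
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('pulsar', 'my-subscription')

try:
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed; ask the broker to redeliver it
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Stop with Ctrl+C; the loop has no other exit condition
    pass
finally:
    client.close()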
When the program runs successfully, it prints output like the following.
2021-05-22 19:59:16.941 INFO [0x114c79e00] Client:88 | Subscribing on Topic :pulsar
2021-05-22 19:59:16.942 INFO [0x114c79e00] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2021-05-22 19:59:16.951 INFO [0x70000d72c000] ClientConnection:356 | [[::1]:51959 -> [::1]:6650] Connected to broker
2021-05-22 19:59:17.010 INFO [0x70000d72c000] HandlerBase:54 | [persistent://public/default/pulsar, my-subscription, 0] Getting connection from pool
2021-05-22 19:59:17.074 INFO [0x70000d72c000] ConsumerImpl:216 | [persistent://public/default/pulsar, my-subscription, 0] Created consumer on broker [[::1]:51959 -> [::1]:6650]
Received message 'b'Hello-0'' id='(1103,0,-1,-1)'
Received message 'b'Hello-1'' id='(1103,1,-1,-1)'
Received message 'b'Hello-2'' id='(1103,2,-1,-1)'
Received message 'b'Hello-3'' id='(1103,3,-1,-1)'
Received message 'b'Hello-4'' id='(1103,4,-1,-1)'
Received message 'b'Hello-5'' id='(1103,5,-1,-1)'
Received message 'b'Hello-6'' id='(1103,6,-1,-1)'
Received message 'b'Hello-7'' id='(1103,7,-1,-1)'
Received message 'b'Hello-8'' id='(1103,8,-1,-1)'
Received message 'b'Hello-9'' id='(1103,9,-1,-1)'
Received message 'b'Hello-10'' id='(1103,10,-1,-1)'
Received message 'b'Hello-11'' id='(1103,11,-1,-1)'
Received message 'b'Hello-12'' id='(1103,12,-1,-1)'
Received message 'b'Hello-13'' id='(1103,13,-1,-1)'
Received message 'b'Hello-14'' id='(1103,14,-1,-1)'
Received message 'b'Hello-15'' id='(1103,15,-1,-1)'
Received message 'b'Hello-16'' id='(1103,16,-1,-1)'
Received message 'b'Hello-17'' id='(1103,17,-1,-1)'
Received message 'b'Hello-18'' id='(1103,18,-1,-1)'
Received message 'b'Hello-19'' id='(1103,19,-1,-1)'
Received message 'b'Hello-20'' id='(1103,20,-1,-1)'
Received message 'b'Hello-21'' id='(1103,21,-1,-1)'
Received message 'b'Hello-22'' id='(1103,22,-1,-1)'
Received message 'b'Hello-23'' id='(1103,23,-1,-1)'
Received message 'b'Hello-24'' id='(1103,24,-1,-1)'
Received message 'b'Hello-25'' id='(1103,25,-1,-1)'
Received message 'b'Hello-26'' id='(1103,26,-1,-1)'
Received message 'b'Hello-27'' id='(1103,27,-1,-1)'
Received message 'b'Hello-28'' id='(1103,28,-1,-1)'
Received message 'b'Hello-29'' id='(1103,29,-1,-1)'
2021-05-22 20:09:17.010 INFO [0x70000d72c000] ConsumerStatsImpl:70 | Consumer [persistent://public/default/pulsar, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 230, totalNumBytesRecieved_ = 230, receivedMsgMap_ = {[Key: Ok, Value: 30], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 30], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 30], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 30], })
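The payloads print as b'Hello-0' because msg.data() returns bytes, so they usually need decoding before further use. The sketch below shows one possible way to wrap the receive, decode, and acknowledge steps in a single function. The name handle_one and the process callback are my own additions for illustration. The function only assumes that consumer offers receive, acknowledge, and negative_acknowledge, as the object returned by client.subscribe does.

```python
def handle_one(consumer, process):
    """Receive one message, pass its decoded text to process, then ack or nack.

    Hypothetical helper; `process` is any callable that takes a str.
    Returns True if the message was acknowledged, False if it was nacked.
    """
    msg = consumer.receive()
    try:
        # bytes -> str, e.g. b'Hello-0' becomes 'Hello-0'
        text = msg.data().decode("utf-8")
        process(text)
    except Exception:
        # Decoding or processing failed: request redelivery.
        consumer.negative_acknowledge(msg)
        return False
    consumer.acknowledge(msg)
    return True


# Example call with a real consumer (requires a running broker):
# handle_one(consumer, print)
```

Keeping the acknowledge call outside the try block means a message is only nacked when decoding or processing fails, not when the acknowledgement itself raises.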
With that, we have built a simple system that sends and receives messages with Apache Pulsar using Python.
About Author

I’m Viet, the founder of this website, with 8+ years of experience in data analytics. My posts focus on data, covering Analytics and Business Intelligence platforms as well as Data Science and Machine Learning platforms.