How to Run Apache Pulsar Locally with Python

Hi everyone,

In this post I will walk you step by step through building a simple message send/receive setup with Apache Pulsar.

Step 1: Pull the Apache Pulsar Docker image with the command docker pull apachepulsar/pulsar

Step 2: Start the container in standalone mode with the command

docker run -it \
    -p 6650:6650 \
    -p 8080:8080 \
    -v $PWD/data:/pulsar/data \
    apachepulsar/pulsar:latest \
    bin/pulsar standalone

Once the container has started successfully, it prints log lines like the ones below.

13:07:44.190 [pulsar-web-68-8] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [22/May/2021:13:07:44 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.2" 16
13:08:14.165 [pulsar-web-68-6] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [22/May/2021:13:08:14 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.2" 24
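
Before moving on, it can help to confirm from Python that the broker's HTTP port is reachable. The sketch below is a minimal check, assuming the 8080 port mapping from the docker run command above and the admin REST path /admin/v2/clusters. The helper name broker_is_up is hypothetical and is not part of any Pulsar library.

```python
import urllib.error
import urllib.request


def broker_is_up(url="http://localhost:8080/admin/v2/clusters", timeout=3.0):
    """Return True if the admin endpoint answers with HTTP 200, else False."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or HTTP error: treat as "not ready"
        return False


if __name__ == "__main__":
    print("Pulsar admin endpoint reachable:", broker_is_up())
```

If this prints False right after starting the container, the broker may simply still be booting, so retrying after a few seconds is reasonable.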

Step 3: Install the pulsar-client library for Python with the command pip3 install pulsar-client==2.7.1

Step 4: Python program for the producer (the sending side)

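import time

import pulsar

# Connect to the standalone broker started in Step 2
client = pulsar.Client('pulsar://localhost:6650')

# Create a producer on the topic named 'pulsar'
producer = client.create_producer('pulsar')

try:
    # Send 30 messages, one every 3 seconds
    for i in range(30):
        producer.send(('Hello-%d' % i).encode('utf-8'))
        time.sleep(3)
finally:
    # Always release the connection, even if sending fails
    client.close()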

If the program runs successfully, it prints output like the lines below.

2021-05-22 19:58:52.009 INFO  [0x109681e00] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2021-05-22 19:58:52.015 INFO  [0x700009a6e000] ClientConnection:356 | [[::1]:51936 -> [::1]:6650] Connected to broker
2021-05-22 19:58:52.102 INFO  [0x700009a6e000] HandlerBase:54 | [persistent://public/default/pulsar, ] Getting connection from pool
2021-05-22 19:58:52.497 INFO  [0x700009a6e000] ProducerImpl:170 | [persistent://public/default/pulsar, ] Created producer on broker [[::1]:51936 -> [::1]:6650]
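
The log above shows the short topic name pulsar expanded to persistent://public/default/pulsar. Here is a rough sketch of that expansion, assuming the public tenant and default namespace that appear in the log. The helper full_topic_name is hypothetical and written only for illustration; the client library performs this step internally.

```python
def full_topic_name(topic, tenant="public", namespace="default"):
    """Expand a short topic name to its fully qualified form."""
    if "://" in topic:
        # Already fully qualified, e.g. persistent://tenant/namespace/topic
        return topic
    return "persistent://{}/{}/{}".format(tenant, namespace, topic)


print(full_topic_name("pulsar"))  # persistent://public/default/pulsar
```

This is why the producer and the consumer can both pass just 'pulsar' and still end up on the same topic.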

Step 5: Python program for the consumer (the receiving side)

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('pulsar', 'my-subscription')

try:
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed; ask the broker to redeliver it
            consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
    # Ctrl+C stops the loop so that the client can be closed cleanly
    pass
finally:
    client.close()

If the program runs successfully, it prints output like the lines below.

2021-05-22 19:59:16.941 INFO  [0x114c79e00] Client:88 | Subscribing on Topic :pulsar
2021-05-22 19:59:16.942 INFO  [0x114c79e00] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2021-05-22 19:59:16.951 INFO  [0x70000d72c000] ClientConnection:356 | [[::1]:51959 -> [::1]:6650] Connected to broker
2021-05-22 19:59:17.010 INFO  [0x70000d72c000] HandlerBase:54 | [persistent://public/default/pulsar, my-subscription, 0] Getting connection from pool
2021-05-22 19:59:17.074 INFO  [0x70000d72c000] ConsumerImpl:216 | [persistent://public/default/pulsar, my-subscription, 0] Created consumer on broker [[::1]:51959 -> [::1]:6650] 
Received message 'b'Hello-0'' id='(1103,0,-1,-1)'
Received message 'b'Hello-1'' id='(1103,1,-1,-1)'
Received message 'b'Hello-2'' id='(1103,2,-1,-1)'
Received message 'b'Hello-3'' id='(1103,3,-1,-1)'
Received message 'b'Hello-4'' id='(1103,4,-1,-1)'
Received message 'b'Hello-5'' id='(1103,5,-1,-1)'
Received message 'b'Hello-6'' id='(1103,6,-1,-1)'
Received message 'b'Hello-7'' id='(1103,7,-1,-1)'
Received message 'b'Hello-8'' id='(1103,8,-1,-1)'
Received message 'b'Hello-9'' id='(1103,9,-1,-1)'
Received message 'b'Hello-10'' id='(1103,10,-1,-1)'
Received message 'b'Hello-11'' id='(1103,11,-1,-1)'
Received message 'b'Hello-12'' id='(1103,12,-1,-1)'
Received message 'b'Hello-13'' id='(1103,13,-1,-1)'
Received message 'b'Hello-14'' id='(1103,14,-1,-1)'
Received message 'b'Hello-15'' id='(1103,15,-1,-1)'
Received message 'b'Hello-16'' id='(1103,16,-1,-1)'
Received message 'b'Hello-17'' id='(1103,17,-1,-1)'
Received message 'b'Hello-18'' id='(1103,18,-1,-1)'
Received message 'b'Hello-19'' id='(1103,19,-1,-1)'
Received message 'b'Hello-20'' id='(1103,20,-1,-1)'
Received message 'b'Hello-21'' id='(1103,21,-1,-1)'
Received message 'b'Hello-22'' id='(1103,22,-1,-1)'
Received message 'b'Hello-23'' id='(1103,23,-1,-1)'
Received message 'b'Hello-24'' id='(1103,24,-1,-1)'
Received message 'b'Hello-25'' id='(1103,25,-1,-1)'
Received message 'b'Hello-26'' id='(1103,26,-1,-1)'
Received message 'b'Hello-27'' id='(1103,27,-1,-1)'
Received message 'b'Hello-28'' id='(1103,28,-1,-1)'
Received message 'b'Hello-29'' id='(1103,29,-1,-1)'
2021-05-22 20:09:17.010 INFO  [0x70000d72c000] ConsumerStatsImpl:70 | Consumer [persistent://public/default/pulsar, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 230, totalNumBytesRecieved_ = 230, receivedMsgMap_ = {[Key: Ok, Value: 30], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 30], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 30], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 30], })
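
The payloads above print as b'Hello-0' because msg.data() returns bytes. Below is a minimal sketch of turning such a payload back into text, assuming the UTF-8 encoding used by the producer above. The helper format_message is hypothetical; it takes plain values rather than a Pulsar message object so that it can be tried without a running broker.

```python
def format_message(data, message_id):
    """Decode a UTF-8 payload and format it like the consumer's print line."""
    text = data.decode("utf-8")
    return "Received message '{}' id='{}'".format(text, message_id)


print(format_message(b"Hello-0", "(1103,0,-1,-1)"))
# Received message 'Hello-0' id='(1103,0,-1,-1)'
```

Inside the consumer loop, the equivalent call would be format_message(msg.data(), msg.message_id()).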

With that, we have built a simple system that sends and receives messages through Apache Pulsar using Python.
