Distributed Task Processing With Google Pub/Sub

What is Pub/Sub?

Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events. You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines. Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale. Pub/Sub servers run in all Google Cloud regions around the world.

Core concepts

  • Topic: A named resource to which messages are sent by publishers.
  • Subscription: A named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
  • Message: The combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
  • Message attribute: A key-value pair that a publisher can define for a message. For example, key iana.org/language_tag and value en could be added to messages to mark them as readable by an English-speaking subscriber.
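To make the relationship between these concepts concrete, here is a toy in-memory model. It is not the Pub/Sub API: the `Topic`, `Subscription`, and `Message` classes below are hypothetical stand-ins, and the sketch ignores acknowledgements, redelivery, and ordering. It only illustrates that every subscription receives its own copy of each message, while consumers pulling from the same subscription share the messages between them.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    data: bytes                                     # payload, always bytes
    attributes: dict = field(default_factory=dict)  # optional key-value metadata


class Subscription:
    """Toy subscription: holds its own queue of messages from one topic."""

    def __init__(self, name):
        self.name = name
        self._queue = deque()

    def deliver(self, message):
        self._queue.append(message)

    def pull(self):
        # Return the next pending message, or None when the queue is empty.
        return self._queue.popleft() if self._queue else None


class Topic:
    """Toy topic: copies every published message to each subscription."""

    def __init__(self, name):
        self.name = name
        self.subscriptions = []

    def subscribe(self, name):
        subscription = Subscription(name)
        self.subscriptions.append(subscription)
        return subscription

    def publish(self, data, **attributes):
        message = Message(data, attributes)
        for subscription in self.subscriptions:
            subscription.deliver(message)


topic = Topic("tasks")
workers = topic.subscribe("workers")
audit = topic.subscribe("audit-log")

# The attribute key is not a valid Python identifier, so it is passed via **.
topic.publish(b"resize image 1", **{"iana.org/language_tag": "en"})
topic.publish(b"resize image 2")

# Two workers sharing the "workers" subscription each get a different message.
worker_a = workers.pull()
worker_b = workers.pull()

# The "audit-log" subscription independently receives its own copy.
audit_first = audit.pull()
```

This is why the task workers later in this article all use a single subscription: sharing one subscription splits the tasks across workers instead of sending every task to every worker.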

Install

pip install google-cloud-pubsub

Task Dispatcher (Publisher)

In this example, the publisher acts as a task distributor.

import os
from google.cloud import pubsub_v1

project_id = os.environ.get('GCP_PROJECT_ID')
topic_id = os.environ.get('GCP_PUBSUB_TOPIC_ID')

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data = "Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")
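Real tasks usually carry structured data rather than a plain string. Since the message data must be a bytestring, one option is to serialize each task to JSON before publishing. The sketch below assumes tasks are JSON-serializable dicts. The helper names `encode_task`, `decode_task`, and `publish_task` are hypothetical and not part of the client library; only `publisher.publish(topic, data, **attrs)` and `future.result()` come from `google-cloud-pubsub`.

```python
import json


def encode_task(task):
    """Serialize a task dict to the UTF-8 bytestring that Pub/Sub expects."""
    return json.dumps(task, sort_keys=True).encode("utf-8")


def decode_task(data):
    """Inverse of encode_task, for use on the worker side."""
    return json.loads(data.decode("utf-8"))


def publish_task(publisher, topic_path, task, **attributes):
    """Publish one task and block until the server returns its message ID.

    Extra keyword arguments are sent as message attributes; their values
    must be strings.
    """
    future = publisher.publish(topic_path, encode_task(task), **attributes)
    return future.result()
```

With the `publisher` and `topic_path` defined above, a call could look like `publish_task(publisher, topic_path, {"task": "resize", "id": 7}, priority="low")`, where the task fields and the `priority` attribute are made up for illustration.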

Task Workers (Subscribers)

Multiple workers pull from the same subscription, so each worker receives different tasks from the same publisher.

import os
import time

from google.cloud import pubsub_v1

project_id = os.environ.get("GCP_PROJECT_ID")
subscription_id = os.environ.get("GCP_PUBSUB_SUBSCRIPTION_ID")

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 1

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    while True:
        # The subscriber pulls a specific number of messages.
        print("Waiting for a message...")
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
        )

        # Nothing was delivered this time; wait briefly before pulling again.
        if not response.received_messages:
            time.sleep(1)
            continue

        ack_ids = []
        for received_message in response.received_messages:
            print(f"Received: {received_message.message.data}.")
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )
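The worker above acknowledges every message it receives, even if the task itself were to fail. A minimal sketch of one alternative is to acknowledge only the tasks that were processed successfully, so that Pub/Sub redelivers the rest once their acknowledgement deadline expires. The `process_batch` helper and the `handler` callable are hypothetical names introduced for illustration; the sketch relies only on the `message.data` and `ack_id` fields already used above.

```python
def process_batch(received_messages, handler):
    """Run handler on each message and return the ack IDs that succeeded.

    `handler` is any callable that takes the message data (bytes) and
    raises an exception when the task fails.
    """
    ack_ids = []
    for received_message in received_messages:
        try:
            handler(received_message.message.data)
        except Exception as exc:
            # Leave the message unacknowledged so it is redelivered
            # after the acknowledgement deadline expires.
            print(f"Task failed, leaving it for redelivery: {exc}")
        else:
            ack_ids.append(received_message.ack_id)
    return ack_ids
```

Inside the pull loop, the list returned by `process_batch(response.received_messages, handler)` would replace the hand-built `ack_ids`. It seems safest to skip the `acknowledge` call when that list is empty.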