Upstash Kafka Topics
Overview
Kafka is an event streaming platform used for high-performance data pipelines, streaming analytics, and many other types of messaging.
Upstash provides Kafka as a serverless service. This enables users to leverage Kafka without having to pay for and manage their own servers and clusters.
Upstash uses per-request pricing, so you do NOT pay for servers or instances; you pay only for the requests you make.
Upstash provides an out-of-the-box REST API for communicating with your Kafka topics. Combined with pay-per-request pricing, this makes Upstash a great fit for the serverless world.
When to use
Advantages
- REST API - access your Kafka topic using a simple REST API
- Secure - SSL out of the box
- High availability - with option to enable multi-zone replication
- Pay-per-request - pay only for what you use. Refer to Upstash pricing
- Scalable
Disadvantages
- Separate billing - Even though Stacktape seamlessly integrates Upstash Kafka into your stacks, you still need to manage your billing separately.
Provider configuration
You must have an Upstash account. To create one, head over to the Upstash registration page.
You need the following to configure the provider:
- Email address of your Upstash account (accountEmail).
- API key associated with the account (apiKey).
You can obtain the API key in the Upstash account management console.
The recommended way to store your credentials is to use secrets.
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
Basic usage
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic

  # produces messages into kafka topic
  myProducer:
    type: function
    properties:
      packaging:
        type: stacktape-lambda-buildpack
        properties:
          entryfilePath: producer.ts
      environment:
        - name: BROKER
          value: $ResourceParam('myTopic', 'tcpEndpoint')
        - name: TOPIC
          value: $ResourceParam('myTopic', 'topicName')
        - name: USERNAME
          value: $ResourceParam('myTopic', 'username')
        - name: PASSWORD
          value: $ResourceParam('myTopic', 'password')
import { Kafka } from 'kafkajs';

// kafka client
const kafka = new Kafka({
  clientId: 'my-lambda',
  brokers: [process.env.BROKER],
  sasl: { mechanism: 'scram-sha-256', username: process.env.USERNAME, password: process.env.PASSWORD },
  ssl: true
});

// producer
const producer = kafka.producer();

export default async (event, context) => {
  // connect producer
  await producer.connect();
  // send message to kafka topic
  await producer.send({
    topic: process.env.TOPIC,
    messages: [{ value: 'Hello KafkaJS user!' }]
  });
  // disconnect
  await producer.disconnect();
};
Code of myProducer function
Consumer Integration
Stacktape allows you to easily set up an integration that triggers a lambda function when there are messages (records) in the Upstash Kafka topic.
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic

  # is triggered when there are records in upstash kafka topic
  myConsumer:
    type: function
    properties:
      packaging:
        type: stacktape-lambda-buildpack
        properties:
          entryfilePath: consumer.ts
      events:
        - type: kafka-topic
          properties:
            upstashKafkaTopic: myTopic
import type { MSKHandler as KafkaTriggerHandler } from 'aws-lambda';

const handler: KafkaTriggerHandler = async (event, context) => {
  const { records } = event;
  // actual records are encapsulated in Object (each key is topicName)
  // full event example here https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
  const [topicName] = Object.keys(records);
  records[topicName].forEach((record) => {
    // Each array item contains details of the Kafka topic and Kafka partition identifier,
    // together with a timestamp and a base64-encoded message.
    // EXAMPLE RECORD
    // {
    //   "topic": "upstash-connectivity-myKafkaUpstash",
    //   "partition": 0,
    //   "offset": 21,
    //   "timestamp": 1647415103909,
    //   "timestampType": "CREATE_TIME",
    //   "value": "SGVsbG8gS2Fma2FKUyB1c2VyIQ==",
    //   "headers": []
    // }
    console.log(record.partition);
    console.log(Buffer.from(record.value, 'base64').toString('utf8'));
  });
};

export default handler;
Code of myConsumer function
Using REST API
Alongside Kafka clients using TCP connections, Upstash provides a REST API to access Kafka topics over HTTP. The REST API is essential in more restricted environments (mobile, edge, etc.), but it is also significantly more lightweight than native Kafka clients. With the REST API, you don't need to manage Kafka clients and connections yourself.
For more information about the REST API, refer to the Upstash docs.
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic

  # function using Upstash Kafka REST API
  myRestKafkaTest:
    type: function
    properties:
      packaging:
        type: stacktape-lambda-buildpack
        properties:
          entryfilePath: rest-kafka.ts
      environment:
        - name: KAFKA_REST_URL
          value: $ResourceParam('myTopic', 'restUrl')
        - name: TOPIC
          value: $ResourceParam('myTopic', 'topicName')
        - name: USERNAME
          value: $ResourceParam('myTopic', 'username')
        - name: PASSWORD
          value: $ResourceParam('myTopic', 'password')
import fetch from 'node-fetch';

export default async (event, context) => {
  const message = 'hello from kafka producer';

  // for authorization we will need base64 encoded credentials (<username>:<password>)
  const base64EncodedCredentials = Buffer.from(`${process.env.USERNAME}:${process.env.PASSWORD}`).toString('base64');

  // send message to kafka topic
  await fetch(`${process.env.KAFKA_REST_URL}/produce/${process.env.TOPIC}/${message}`, {
    headers: { Authorization: `Basic ${base64EncodedCredentials}` }
  });

  // consume message from topic
  const result = await fetch(
    `${process.env.KAFKA_REST_URL}/consume/mygroup/myconsumer/${process.env.TOPIC}?timeout=5000`,
    { headers: { Authorization: `Basic ${base64EncodedCredentials}` } }
  );

  // result in a form of json
  const jsonResult = await result.json();
  console.log(jsonResult);
};
Code of myRestKafkaTest function
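If you prefer a typed client over hand-written fetch calls, Upstash also publishes the @upstash/kafka package, which wraps the same REST API. The snippet below is only a minimal sketch and is not part of the Stacktape configuration above; the package and its producer/consumer methods are assumptions based on Upstash's own documentation, so verify them before use. It reuses the same environment variables (KAFKA_REST_URL, TOPIC, USERNAME, PASSWORD) as myRestKafkaTest.

import { Kafka } from '@upstash/kafka';

// REST-based client configured from the same environment variables as the fetch example above
const kafka = new Kafka({
  url: process.env.KAFKA_REST_URL,
  username: process.env.USERNAME,
  password: process.env.PASSWORD
});

export default async (event, context) => {
  // produce a message over HTTP
  const producer = kafka.producer();
  await producer.produce(process.env.TOPIC, 'hello from @upstash/kafka');

  // consume messages over HTTP (consumer group and instance names are arbitrary)
  const consumer = kafka.consumer();
  const messages = await consumer.consume({
    consumerGroupId: 'mygroup',
    instanceId: 'myconsumer',
    topics: [process.env.TOPIC],
    autoOffsetReset: 'earliest'
  });
  console.log(messages);
};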
Specifying Upstash cluster
By default (if you do not specify the clusterId property), a new Upstash Kafka cluster is created when the topic is created (a topic cannot exist without a cluster).
Currently, Upstash Kafka cluster can be created only in the following regions:
- us-east-1
- eu-west-1
If you are deploying your stack in some other region, Stacktape will try to create the cluster in the region geographically closest to the region of your stack.
If you specify clusterId, the topic will be created in the specified Upstash cluster.
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
    properties:
      clusterId: 166380a2-3eb0-4574-a22c-c71dc20bfb9c
Topic with specified clusterId
Partitions
- Partitions are a way to scale topic throughput.
- Each topic is divided into 1 or more partitions.
- If you do not specify a number of partitions, a single partition is created.
- To understand topic partitioning, refer to Kafka docs.
You cannot change the number of partitions once the topic is created.
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
    properties:
      partitions: 20
Retention time
- Records (messages) in a topic are retained for 1 week by default (604800000 ms)
- Retention time can affect your billing. Refer to Upstash docs
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
    properties:
      retentionTime: 86400000 # 1 day
Retention size
- By default, storage space is set to 1 GB (1073741824 bytes)
- After the storage space is full, old records are cleaned up according to the cleanup policy
- Retention size can affect your billing. Refer to Upstash docs
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
    properties:
      retentionSize: 536870912 # 0.5 GB
Maximum message size
- By default, the maximum message size is 1 MB (1048576 bytes)
providerConfig:
  upstash:
    accountEmail: xxxxx.yyyy@example.com
    apiKey: $Secret('upstash-api-key')

resources:
  myTopic:
    type: upstash-kafka-topic
    properties:
      maxMessageSize: 8388608 # 8 MB
Referenceable parameters
The following parameters can be easily referenced using the $ResourceParam directive.
To learn more about referencing parameters, refer to referencing parameters.
topicName
- Name of the topic in the Kafka cluster. The name is constructed in the following way: <<stack-name>>-<<resource-name>>
- Usage: $ResourceParam('<<resource-name>>', 'topicName')

clusterId
- Id of the cluster in which the topic is created.
- Usage: $ResourceParam('<<resource-name>>', 'clusterId')

password
- Password that can be used to authenticate (using the SASL_SCRAM_256_AUTH protocol).
- Usage: $ResourceParam('<<resource-name>>', 'password')

username
- Username that can be used to authenticate (using the SASL_SCRAM_256_AUTH protocol).
- Usage: $ResourceParam('<<resource-name>>', 'username')

tcpEndpoint
- Standard (TCP) endpoint of the Kafka cluster in which the topic is created.
- Usage: $ResourceParam('<<resource-name>>', 'tcpEndpoint')

restUrl
- REST URL for communicating with the cluster using the REST API.
- Usage: $ResourceParam('<<resource-name>>', 'restUrl')
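The tcpEndpoint, username and password parameters are also sufficient to connect a standard Kafka client outside of Lambda event integrations, for example from a container workload or a local script. Below is a minimal kafkajs consumer sketch, assuming the same BROKER, TOPIC, USERNAME and PASSWORD environment variables as in the producer example above; the client id and consumer group name are arbitrary placeholders.

import { Kafka } from 'kafkajs';

// kafka client configured from environment variables populated using $ResourceParam
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: [process.env.BROKER],
  sasl: { mechanism: 'scram-sha-256', username: process.env.USERNAME, password: process.env.PASSWORD },
  ssl: true
});

const consumer = kafka.consumer({ groupId: 'my-consumer-group' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topics: [process.env.TOPIC], fromBeginning: true });
  // long-running poll loop - better suited to containers or scripts than to short-lived lambda functions
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}]: ${message.value?.toString()}`);
    }
  });
};

run().catch(console.error);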