Stacktape

Sign up



Upstash Kafka Topics

Overview

  • Kafka is event streaming platform used for high-performance data pipelines, streaming analytics and any other types of messaging.

  • Upstash provides Kafka as a serverless service. This enables users to leverage Kafka without the need for paying and managing their own servers and cluster.

  • Upstash uses Per-Request pricing so you do NOT need to pay for the servers/instances and you only pay for request you make.

  • Upstash provides out of the box REST API for communicating with your Kafka topics. In combination with pay-per-request pricing makes Upstash a perfect choice for Serverless world.

When to use

Advantages

  • REST API - access your Kafka topic using simple REST API
  • Secure - SSL out of the box
  • High availability - with option to enable multi-zone replication
  • Pay-per-request - pay only for what you use. Refer to Upstash pricing
  • Scalable

Disadvantages

  • Separate billing - Even though Stacktape seamlessly integrates Upstash Kafka to your stacks, you still need to manage your billing separately.

Provider configuration

  • You must have an Upstash account. To create one, head over to Upstash registration page.

  • You need following to configure the provider:

    • Email address of your Upstash account (accountEmail).
    • API Key associated with the account (apiKey).
  • You can obtain API Key in Upstash account management console.

  • The recommended way to store your credentials is to use secrets.

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic

Basic usage

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
# produces messages into kafka topic
myProducer:
type: function
properties:
packaging:
type: stacktape-lambda-buildpack
properties:
entryfilePath: producer.ts
environment:
- name: BROKER
value: $ResourceParam('myTopic', 'tcpEndpoint')
- name: TOPIC
value: $ResourceParam('myTopic', 'topicName')
- name: USERNAME
value: $ResourceParam('myTopic', 'username')
- name: PASSWORD
value: $ResourceParam('myTopic', 'password')

Copy

import { Kafka } from 'kafkajs';
// kafka client
const kafka = new Kafka({
clientId: 'my-lambda',
brokers: [process.env.BROKER],
sasl: { mechanism: 'scram-sha-256', username: process.env.USERNAME, password: process.env.PASSWORD },
ssl: true
});
// producer
const producer = kafka.producer();
export default async (event, context) => {
// connect producer
await producer.connect();
// send message to kafka topic
await producer.send({
topic: process.env.TOPIC,
messages: [{ value: 'Hello KafkaJS user!' }]
});
// disconnect
await producer.disconnect();
};

Code of myProducer function

UpstashKafkaTopic  API reference
type
Required
properties.clusterId
properties.partitions
Default: 1
properties.retentionTime
Default: 604800000
properties.retentionSize
Default: 1073741824
properties.maxMessageSize
Default: 1048576
properties.cleanupPolicy
Default: delete
overrides

Consumer Integration

Stacktape allows you to easily setup integration which triggers lambda when there are messages (records) in the upstash kafka topic.

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
# is triggered when there are records in upstash kafka topic
myConsumer:
type: function
properties:
packaging:
type: stacktape-lambda-buildpack
properties:
entryfilePath: consumer.ts
events:
- type: kafka-topic
properties:
upstashKafkaTopic: myTopic

Copy

import type { MSKHandler as KafkaTriggerHandler } from 'aws-lambda';
const handler: KafkaTriggerHandler = async (event, context) => {
const { records } = event;
// actual records are encapsulated in Object (each key is topicName)
// full event example here https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
const [topicName] = Object.keys(records);
records[topicName].forEach((record) => {
// Each array item contains details of the Kafka topic and Kafka partition identifier,
// together with a timestamp and a base64-encoded message.
// EXAMPLE RECORD
// {
// "topic": "upstash-connectivity-myKafkaUpstash",
// "partition": 0,
// "offset": 21,
// "timestamp": 1647415103909,
// "timestampType": "CREATE_TIME",
// "value": "SGVsbG8gS2Fma2FKUyB1c2VyIQ==",
// "headers": []
// }
console.log(record.partition);
console.log(Buffer.from(record.value, 'base64').toString('utf8'));
});
};
export default handler;

Code of myConsumer function

Using REST API

Alongside Kafka clients using TCP connections, Upstash provides a REST API to access Kafka topics over HTTP. REST API is essential for more restricted environments (mobile, edge etc) but also significantly lightweight when compared to native Kafka clients. With the REST API you don't need to manage Kafka clients and connections yourself.

For more information about REST API refer to Upstash docs.

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
# function using Upstash Kafka REST API
myRestKafkaTest:
type: function
properties:
packaging:
type: stacktape-lambda-buildpack
properties:
entryfilePath: rest-kafka.ts
environment:
- name: KAFKA_REST_URL
value: $ResourceParam('myTopic', 'restUrl')
- name: TOPIC
value: $ResourceParam('myTopic', 'topicName')
- name: USERNAME
value: $ResourceParam('myTopic', 'username')
- name: PASSWORD
value: $ResourceParam('myTopic', 'password')

Copy

import fetch from 'node-fetch';
export default async (event, context) => {
const message = 'hello from kafka producer';
// for authorization we will need base64 encoded credentials (<username>:<password>)
const base64EncodedCredentials = Buffer.from(`${process.env.USERNAME}:${process.env.PASSWORD}`).toString('base64');
// send message to kafka topic
await fetch(`${process.env.KAFKA_REST_URL}/produce/${process.env.TOPIC}/${message}`, {
headers: {
Authorization: `Basic ${base64EncodedCredentials}`
}
});
// consume message from topic
const result = await fetch(
`${process.env.KAFKA_REST_URL}/consume/mygroup/myconsumer/${process.env.TOPIC}?timeout=5000`,
{
headers: {
Authorization: `Basic ${base64EncodedCredentials}`
}
}
);
// result in a form of json
const jsonResult = await result.json();
console.log(jsonResult);
};

Code of myRestKafkaTest function

Specifying Upstash cluster

By default (if you do not specify clusterId property) a new Upstash Kafka cluster is created when the topic is created (topic cannot exist without a cluster).

Currently, Upstash Kafka cluster can be created only in the following regions:

  • us-east-1
  • eu-west-1

If you are deploying your stack in some other region, Stacktape will try to create the cluster in the region geographically closest to the region of your stack.


If you specify clusterId, the topic will be created in the specified Upstash Cluster.

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
properties:
clusterId: 166380a2-3eb0-4574-a22c-c71dc20bfb9c

Topic with specified clusterId

Partitions

  • Partitions are a way to scale topic throughput.
  • Each topic is divided into 1 or more partitions.
  • If you do not specify a number a single partition is created.
  • To understand topic partitioning, refer to Kafka docs.

You cannot change number of partitions once the topic is created.

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
properties:
partitions: 20

Retention time

  • Records(messages) in topic are by default retained for 1 week (604800000 ms)
  • Retention time can affect your billing. Refer to Upstash docs

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
properties:
retentionTime: 86400000 # 1 day

Retention size

  • By default storage space is set to 1GB (1073741824 bytes)
  • After the storage space is full old records will be cleaned-up according to the cleanup policy
  • Retention size can affect your billing. Refer to Upstash docs

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
properties:
retentionSize: 536870912 # 0,5 GB

Maximum message size

  • By default maximum message size is 1 MB (1048576 bytes)

Copy

providerConfig:
upstash:
accountEmail: xxxxx.yyyy@example.com
apiKey: $Secret('upstash-api-key')
resources:
myTopic:
type: upstash-kafka-topic
properties:
maxMessageSize: 8388608 # 8 MB

Referenceable parameters

The following parameters can be easily referenced using $ResourceParam directive directive.

To learn more about referencing parameters, refer to referencing parameters.

topicName
  • Name of the topic in the Kafka cluster. Name is constructed in the following way: <>-<>

  • Usage: $ResourceParam('<<resource-name>>', 'topicName')
clusterId
  • Id of the cluster in which the topic is created.

  • Usage: $ResourceParam('<<resource-name>>', 'clusterId')
password
  • Password that can be used to authenticate (using SASL_SCRAM_256_AUTH protocol).

  • Usage: $ResourceParam('<<resource-name>>', 'password')
username
  • Username that can be used to authenticate (using SASL_SCRAM_256_AUTH protocol).

  • Usage: $ResourceParam('<<resource-name>>', 'username')
tcpEndpoint
  • Standard endpoint of the Kafka cluster in which the topic is created.

  • Usage: $ResourceParam('<<resource-name>>', 'tcpEndpoint')
restUrl
  • Rest URL for communicating with the cluster using REST API.

  • Usage: $ResourceParam('<<resource-name>>', 'restUrl')

Need help? Ask a question on Discord or info@stacktape.com.