Kafka library

Functions:

kafka-consumer

Synopsis
Creates and registers locally a Kafka consumer object
Usage
(kafka-consumer
  servers
  "key-deserializer" => key_deserializer
  "value-deserializer" => value_deserializer
  "group-id" => group_id
)
Returns
consumer_id
Where
  • servers is list: String URL of the Kafka server, or list of strings of some of the servers of the cluster.
  • key_deserializer is string[0..1]: Class name of the message key deserializer.
  • value_deserializer is string[0..1]: Class name of the message value deserializer.
  • group_id is string[0..1]: Name of the consumer group to join. Useful for distributing the processing of a topic's messages among several consumers. To receive all messages from the subscribed topics at this consumer, leave it blank; the consumer then joins a random, unique consumer group.
  • consumer_id is string: The application id to be used with kafka-poll and kafka-delete-app.
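The effect of group_id can be illustrated with a small Python model. This is only a sketch, not this library's implementation, and it does not talk to Kafka; it mimics the general Kafka rule that, within a group, each partition of a topic is read by exactly one member. The names effective_group and assign_partitions, the "anon-" prefix, and the round-robin assignment are assumptions made for illustration.

```python
import uuid
from collections import defaultdict

def effective_group(group_id=None):
    """Return the given group id, or a random unique one when it is blank."""
    return group_id if group_id else "anon-" + uuid.uuid4().hex

def assign_partitions(consumers, num_partitions):
    """Model which consumer reads which partition of one topic.

    consumers: dict mapping consumer name -> group id.
    Returns a dict mapping consumer name -> list of partition numbers.
    Within a group every partition goes to exactly one member (round-robin
    here, as an assumption); separate groups each see all partitions.
    """
    by_group = defaultdict(list)
    for name, group in consumers.items():
        by_group[group].append(name)

    assignment = {name: [] for name in consumers}
    for members in by_group.values():
        members.sort()
        for partition in range(num_partitions):
            owner = members[partition % len(members)]
            assignment[owner].append(partition)
    return assignment

workers = {
    "w1": "pollution-data-workers",
    "w2": "pollution-data-workers",
    "monitor": effective_group(),  # blank group id: its own unique group
}
print(assign_partitions(workers, num_partitions=4))
# {'w1': [0, 2], 'w2': [1, 3], 'monitor': [0, 1, 2, 3]}
```

In this model the two workers split the partitions while the monitor, alone in its own group, reads all of them. With a single-partition topic, the model hands the whole topic to one member of each group.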
Description
Instantiates a Kafka consumer client without communicating with the Kafka server.
Examples
(kafka-consumer "localhost:9092")
"c1a3de99eb26446d0965a044ac89632f8"
(kafka-consumer
  ("10.0.0.1.90:9092" "10.0.0.2.90:9092")
)
"c1a3de99eb26446d0965a044ac89632f8"
(kafka-consumer
  "localhost:9092"
  "value-deserializer" => "DoubleDeserializer"
)
"c1a3de99eb26446d0965a044ac89632f8"
(kafka-consumer
  "localhost:9092"
  "group-id" => "pollution-data-workers"
)
"c1a3de99eb26446d0965a044ac89632f8"
kafka-create-topics

Synopsis
Creates topics to send to or receive messages from
Usage
(kafka-create-topics servers topics)
Returns
result
Where
  • servers is list: String URL of the Kafka server, or list of strings of some of the servers of the cluster.
  • topics is list: Name of the topic or topics to create. String or list of strings.
  • result is list: Named list with one element per topic: true if the topic was created, false if it was already present or could not be created for any other reason.
Description
Creates the topics named in topics, if they do not already exist, in the Kafka cluster reachable through servers. The number of partitions and the replication factor are both set to 1, values suited to a single-broker cluster.
Examples
(kafka-create-topics
  "localhost:9092"
  ("traffic" "pollution")
)
("pollution" => true "traffic" => true)
(kafka-create-topics
  ("10.0.0.1.90:9092" "10.0.0.2.90:9092")
  "pollution"
)
("pollution" => false)
kafka-delete-app

Synopsis
Closes resources and forgets a consumer or producer
Usage
(kafka-delete-app app_id)
Returns
deleted
Where
  • app_id is string: The application id provided by kafka-producer or kafka-consumer.
  • deleted is boolean: True when no errors were found during deletion. False if some exception prevented finishing the operation.
Description
If the app is a consumer, it is unsubscribed from every topic it was subscribed to. Using an app id after it has been deleted raises an error.
Examples
(kafka-delete-app
  "pbf8114e907f045a093f53abce9564553"
)
true
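The register-then-forget lifecycle described above can be pictured with a small Python model. It is a sketch under stated assumptions, not the library's code: AppRegistry and FakeConsumer are hypothetical names, the id format (a one-letter prefix plus 32 hexadecimal characters) is only inferred from the example ids, and the choice to forget an id even when closing fails is an assumption.

```python
import uuid

class AppRegistry:
    """In-memory stand-in for the local registry of consumers and producers."""

    def __init__(self):
        self._apps = {}

    def register(self, kind, app):
        # Assumption: "c..." for consumers, "p..." for producers,
        # followed by 32 hex characters, as the example ids suggest.
        app_id = kind[0] + uuid.uuid4().hex
        self._apps[app_id] = app
        return app_id

    def get(self, app_id):
        if app_id not in self._apps:
            raise KeyError(f"unknown or deleted app id: {app_id}")
        return self._apps[app_id]

    def delete(self, app_id):
        """Close and forget an app; True on success, False if closing failed."""
        app = self.get(app_id)
        del self._apps[app_id]  # assumption: forgotten even if close() fails
        try:
            app.close()
        except Exception:
            return False
        return True

class FakeConsumer:
    """Hypothetical app object; only close() matters for this sketch."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

registry = AppRegistry()
consumer_id = registry.register("consumer", FakeConsumer())
print(registry.delete(consumer_id))  # True
try:
    registry.get(consumer_id)        # the id is no longer known
except KeyError as error:
    print("error:", error)
```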
kafka-delete-topics

Synopsis
Deletes topics from the Kafka cluster
Usage
(kafka-delete-topics servers topics)
Returns
result
Where
  • servers is list: String URL of the Kafka server, or list of strings of some of the servers of the cluster.
  • topics is list: Name of the topic or topics to delete. String or list of strings.
  • result is list: Named list with one element per topic: true if the topic was deleted, false if it did not exist or some error occurred.
Description
Deletes the topics named in topics, if they exist, from the Kafka cluster reachable through servers.
Examples
(kafka-delete-topics
  "localhost:9092"
  ("traffic" "pollution")
)
("pollution" => true "traffic" => true)
(kafka-delete-topics
  ("10.0.0.1.90:9092" "10.0.0.2.90:9092")
  "pollution"
)
("pollution" => false)
kafka-list-topics

Synopsis
Lists all topics in the Kafka cluster
Usage
(kafka-list-topics servers)
Returns
topics
Where
  • servers is list: String URL of the Kafka server, or list of strings of some of the servers of the cluster.
  • topics is list: List of string names of all current topics.
Description
Lists all existing topics in the Kafka cluster. Useful, for instance, to double-check that a topic was created or deleted, or to get an overview of which applications are running.
Examples
(kafka-list-topics "localhost:9092")
("pollution" "traffic")
(kafka-list-topics
  ("10.0.0.1.90:9092" "10.0.0.2.90:9092")
)
("pollution")
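The result conventions of kafka-create-topics, kafka-delete-topics and kafka-list-topics can be summarized with a small in-memory Python model. This is only a sketch: TopicModel and its methods are hypothetical names, no Kafka cluster is involved, and the error case (false for any other failure) is not modeled.

```python
class TopicModel:
    """In-memory stand-in for a cluster's topic list (no Kafka involved)."""

    def __init__(self):
        self._topics = {}

    @staticmethod
    def _as_list(topics):
        # The documented functions accept one name or a list of names.
        return [topics] if isinstance(topics, str) else list(topics)

    def create(self, topics):
        """True if the topic was created, False if it already existed."""
        result = {}
        for name in sorted(self._as_list(topics)):
            result[name] = name not in self._topics
            # One partition and replication factor 1, as the text describes.
            self._topics.setdefault(name, {"partitions": 1, "replication": 1})
        return result

    def delete(self, topics):
        """True if the topic was deleted, False if it did not exist."""
        result = {}
        for name in sorted(self._as_list(topics)):
            result[name] = self._topics.pop(name, None) is not None
        return result

    def list_topics(self):
        return sorted(self._topics)

cluster = TopicModel()
print(cluster.create(["traffic", "pollution"]))  # {'pollution': True, 'traffic': True}
print(cluster.create("pollution"))               # {'pollution': False}
print(cluster.delete("traffic"))                 # {'traffic': True}
print(cluster.list_topics())                     # ['pollution']
```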
kafka-poll

Synopsis
Receives messages through a consumer
Usage
(kafka-poll consumer_id topics timeout_ms)
Returns
messages
Where
  • consumer_id is string: Identifier of the consumer to use, provided by kafka-consumer.
  • timeout_ms is int: Time, in milliseconds, to wait without receiving any message before the function returns.
  • messages is list: List of elements named by each subscribed topic, where each value is a sublist of messages received from that topic.
Description
Subscribes the consumer to the specified topics and performs a long poll against the server of duration timeout_ms.
Examples
(kafka-poll
  "c1a3de99eb26446d0965a044ac89632f8"
  2000
)
(
  "temperature" =>
  (
    "{\"ts\": 1614605655, \"id\": \"town_center_temp\", \"value\": 16}"
  )
  "traffic" => ()
)
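The shape of the returned messages list can be mimicked in Python to show how a caller might walk through it. This is a sketch, not the library's code: group_by_topic is a hypothetical helper, and parsing the value as JSON is an assumption that holds only because the message in the example above happens to be a JSON string.

```python
import json

def group_by_topic(subscribed_topics, records):
    """Arrange (topic, value) records the way the poll result is described.

    Every subscribed topic gets an entry, even when it received nothing.
    """
    messages = {topic: [] for topic in subscribed_topics}
    for topic, value in records:
        if topic in messages:
            messages[topic].append(value)
    return messages

received = [
    ("temperature", '{"ts": 1614605655, "id": "town_center_temp", "value": 16}'),
]
messages = group_by_topic(["temperature", "traffic"], received)

print(messages["traffic"])                              # []
print(json.loads(messages["temperature"][0])["value"])  # 16
```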
kafka-producer

Synopsis
Creates and registers locally a Kafka producer object
Usage
(kafka-producer
  servers
  "key-serializer" => key_serializer
  "value-serializer" => value_serializer
)
Returns
producer_id
Where
  • servers is list: String URL of the Kafka server, or list of strings of some of the servers of the cluster.
  • key_serializer is string[0..1]: Class name of the message key serializer.
  • value_serializer is string[0..1]: Class name of the message value serializer.
  • producer_id is string: The application id to be used with kafka-send and kafka-delete-app.
Description
Instantiates a Kafka producer client without communicating with the Kafka server.
Examples
(kafka-producer "localhost:9092")
"pbf8114e907f045a093f53abce9564553"
(kafka-producer
  ("10.0.0.1.90:9092" "10.0.0.2.90:9092")
)
"pbf8114e907f045a093f53abce9564553"
(kafka-producer
  "localhost:9092"
  "value-serializer" => "DoubleSerializer"
)
"pbf8114e907f045a093f53abce9564553"
kafka-send

Synopsis
Sends messages to topics through a producer
Usage
(kafka-send producer_id topics messages)
Returns
null
Where
  • producer_id is string: Identifier of the producer to use, provided by kafka-producer.
  • topics is list: String name of the topic, or list of topics to send the messages to. All topics receive the same message(s).
  • messages is list: Object according to the producer's value-serializer, or list of such objects. If a list element is named, its name is used as the Kafka message key; otherwise the key is left unset (null). The message value is always the list element.
Description
Sends all given messages to all given topics, using the producer identified by producer_id.
Examples
(kafka-send
  "pbf8114e907f045a093f53abce9564553"
  "sensors"
  "town_square_temp=16"
)
null
(kafka-send
  prod-id
  ("sensors" "temperature")
  "town_square_temp=16"
)
null
(kafka-send
  prod-id
  "sensors"
  ("town_square_temp=16" "train_sta_temp=18")
)
null
(kafka-send
  prod-id
  "sensors"
  ("town_square" => "town_square_temp=16")
)
null
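How topics and messages combine can be spelled out with a short Python model of the records that one kafka-send call stands for. It is a sketch only: expand_records is a hypothetical name, a (name, value) tuple stands in for a named list element, and the topic-by-topic ordering is an assumption, since the text does not state the order in which messages are sent.

```python
def expand_records(topics, messages):
    """Return the (topic, key, value) records one send call stands for.

    topics: a topic name or a list of names.
    messages: a single value, or a list whose items are either plain values
    (key None) or (name, value) tuples standing in for named list elements.
    """
    if isinstance(topics, str):
        topics = [topics]
    if not isinstance(messages, list):
        messages = [messages]

    records = []
    for topic in topics:
        for item in messages:
            if isinstance(item, tuple):
                key, value = item       # named element: the name becomes the key
            else:
                key, value = None, item  # unnamed element: no key
            records.append((topic, key, value))
    return records

# One message to two topics: every topic receives the same message.
print(expand_records(["sensors", "temperature"], "town_square_temp=16"))
# [('sensors', None, 'town_square_temp=16'), ('temperature', None, 'town_square_temp=16')]

# A named element: its name is used as the message key.
print(expand_records("sensors", [("town_square", "town_square_temp=16")]))
# [('sensors', 'town_square', 'town_square_temp=16')]
```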
kafka-subscribe

Synopsis
Resets consumer subscriptions to the given topics
Usage
(kafka-subscribe consumer_id desired-topics)
Returns
subscribed-topics
Where
  • consumer_id is string: Identifier of the consumer to use, provided by kafka-consumer.
  • desired-topics is list: String name of the topic, or list of topics to subscribe the consumer to. An empty list () unsubscribes from all currently subscribed topics.
  • subscribed-topics is list: The topics the consumer is subscribed to after the call.
Description
Replaces the current subscriptions of the given consumer with the provided topic list.
Examples
(kafka-subscribe
  "c1a3de99eb26446d0965a044ac89632f8"
  "pollution"
)
("pollution")
(kafka-subscribe
  "c1a3de99eb26446d0965a044ac89632f8"
  ("pollution" "traffic")
)
("pollution" "traffic")
(kafka-subscribe
  "c1a3de99eb26446d0965a044ac89632f8"
  ()
)
()
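The replace-not-merge behavior shown in the examples above can be captured in a few lines of Python. This is a minimal sketch, not the library's implementation; SubscriptionModel is a hypothetical name and no Kafka consumer is involved.

```python
class SubscriptionModel:
    """Tracks one consumer's subscriptions; each call replaces the old set."""

    def __init__(self):
        self.topics = []

    def subscribe(self, desired_topics):
        # Accept a single topic name or a list of names.
        if isinstance(desired_topics, str):
            desired_topics = [desired_topics]
        self.topics = list(desired_topics)  # replace, do not merge
        return self.topics

consumer = SubscriptionModel()
print(consumer.subscribe("pollution"))  # ['pollution']
print(consumer.subscribe(["traffic"]))  # ['traffic']
print(consumer.subscribe([]))           # []
```

After the second call the consumer is subscribed to traffic only; the earlier pollution subscription is gone, which is the difference between resetting and adding.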