Working with Schema Registry and Avro in Apache Kafka
In a distributed system built on Apache Kafka, microservices communicate by publishing and consuming events. As these systems grow, a major challenge emerges: data governance. What happens if a producer changes the structure of a message (e.g., renaming a field or deleting a required one) without notifying the consumers? Consumers that still expect the old structure fail to deserialize the message, and the failure propagates downstream.
To solve this, we need a formal contract between producers and consumers. This is where Apache Avro and the Confluent Schema Registry come into play. Together, they ensure that data remains consistent, structured, and backward-compatible over time.
Why Do We Need Schema Management?
By default, Kafka treats all messages as raw byte arrays. It does not inspect the payload or validate its structure. If a producer sends a corrupted JSON message or changes a field type from an Integer to a String, Kafka will happily write it to the log. The consumer, expecting an Integer, will fail at runtime.
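A minimal sketch of that failure mode, using only the JDK and no Kafka client. The class and method names are hypothetical; the point is simply that a consumer decoding bytes under an outdated assumption only finds out at runtime.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RawBytesSketch {
    // Old producer contract: the value is a 4-byte big-endian integer.
    static byte[] encodeAsInt(int count) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(count).array();
    }

    // New producer contract, changed without notice: the value is a UTF-8 string.
    static byte[] encodeAsString(int count) {
        return Integer.toString(count).getBytes(StandardCharsets.UTF_8);
    }

    // Consumer still written against the old contract.
    static int decodeAsInt(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }
}
```

Calling decodeAsInt(encodeAsString(42)) throws a BufferUnderflowException because "42" is only two bytes long. With a four-character value the call would not throw at all and would silently return a wrong number, which is arguably worse.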
Using Schema Registry and Avro provides several key advantages:
- Strict Data Contracts: Producers and consumers must adhere to a predefined schema.
- Payload Optimization: Avro is a binary serialization format. Unlike JSON or XML, it does not send field names with every message. This significantly reduces network bandwidth and storage costs.
- Schema Evolution: You can safely update schemas over time (e.g., adding new fields) without breaking existing consumers.
Understanding the Architecture
The Confluent Schema Registry runs as a separate service outside the Kafka brokers. It serves as a centralized repository for storing and managing schemas.
+------------------+       Registers / Fetches schema       +-----------------+
|                  | <------------------------------------> |                 |
|  Kafka Producer  |                                        | Schema Registry |
|                  |                                        |                 |
+------------------+                                        +-----------------+
         |                                                           ^
         | Writes binary data                                        |
         | (with Schema ID) to topic                                 |
         v                                                           | Fetches schema
+------------------+                                                 | by ID
|   Kafka Broker   |                                                 |
+------------------+                                                 |
         |                                                           |
         | Reads from topic                                          |
         v                                                           |
+------------------+                                                 |
|  Kafka Consumer  | <-----------------------------------------------+
+------------------+
Here is how the end-to-end workflow operates:
- Publishing: When a producer sends an Avro record, the Avro Serializer contacts the Schema Registry. It checks if the schema is already registered. If not, it registers it and receives a unique Schema ID. The serializer caches the ID, so later records with the same schema do not trigger another registry call.
- Writing: The producer prepends a 5-byte header (a magic byte of 0 followed by the 4-byte Schema ID) to the serialized binary payload and writes it to the Kafka broker.
- Consuming: When the consumer reads the message, the Avro Deserializer extracts the Schema ID from the payload, fetches the corresponding schema from the Schema Registry, and deserializes the binary data back into a Java object.
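The framing used in the Writing and Consuming steps can be sketched with the JDK alone. In Confluent's wire format the 4-byte ID is stored big-endian and is preceded by a single magic byte (currently 0). The class and method names below are hypothetical; the real work is done inside KafkaAvroSerializer and KafkaAvroDeserializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Integer.BYTES; // magic byte + schema ID

    // Producer side: header first, then the Avro-encoded record body.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Consumer side: read the ID so the matching schema can be looked up.
    static int schemaId(byte[] framed) {
        if (framed.length < HEADER_LENGTH || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Schema Registry framed message");
        }
        return ByteBuffer.wrap(framed, 1, Integer.BYTES).getInt();
    }

    // Consumer side: everything after the header is the Avro body.
    static byte[] payload(byte[] framed) {
        return Arrays.copyOfRange(framed, HEADER_LENGTH, framed.length);
    }
}
```

A message that was not produced with a Schema Registry serializer will usually fail the magic-byte check, which is why mixing plain JSON and Avro producers on one topic tends to surface as a deserialization error on the consumer.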
Defining an Avro Schema
Avro schemas are written in JSON format with a .avsc file extension. Let us define a schema for a user signup event. Save this file as user-signup.avsc:
{
  "type": "record",
  "name": "UserSignup",
  "namespace": "com.example.kafka.avro",
  "fields": [
    {
      "name": "userId",
      "type": "string"
    },
    {
      "name": "emailAddress",
      "type": "string"
    },
    {
      "name": "signupTimestamp",
      "type": "long"
    },
    {
      "name": "referralCode",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
In this schema, we define a record with four fields. Notice the referralCode field: it uses a union type ["null", "string"] with a default value of null, making it an optional field. This is crucial for maintaining schema compatibility.
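To make the compactness of the binary format concrete, here is a hand-written sketch of how a UserSignup record is laid out according to the Avro binary encoding rules: fields in schema order, longs as zig-zag variable-length integers, strings as a length followed by UTF-8 bytes, and a union as a branch index followed by the value. This is an illustration written against the JDK only, not the Avro library, and the class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class AvroEncodingSketch {
    // Avro writes int and long values as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    // A string is its UTF-8 byte length followed by the bytes themselves.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // One UserSignup record: values only, in declaration order, no field names.
    static byte[] encodeUserSignup(String userId, String email, long timestamp, String referralCode) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, userId);
        writeString(out, email);
        writeLong(out, timestamp);
        if (referralCode == null) {
            writeLong(out, 0); // union branch 0 is "null", which has no body
        } else {
            writeLong(out, 1); // union branch 1 is "string"
            writeString(out, referralCode);
        }
        return out.toByteArray();
    }
}
```

Encoding the values "usr-9923", "developer@example.com", a timestamp of 1700000000000 and "WELCOME2023" yields 50 bytes. The same data written as a JSON object with its four field names comes to roughly 120 bytes.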
Setting Up Your Java Project
To work with Avro in Java, you need to add the appropriate dependencies to your build file. For Maven, include the following in your pom.xml:
<dependencies>
    <!-- Apache Kafka Clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <!-- Confluent Avro Serializer -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.3.0</version>
    </dependency>
    <!-- Apache Avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.1</version>
    </dependency>
</dependencies>
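You should also configure the avro-maven-plugin (its schema goal, bound to the generate-sources phase) to generate Java classes from your .avsc schema files during the build. By default the generated getters for string fields return CharSequence; setting the plugin's stringType option to String makes them return java.lang.String. Because the io.confluent artifacts are not published to Maven Central, the build also needs the Confluent repository (https://packages.confluent.io/maven/) declared in the repositories section of your pom.xml.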
Writing a Java Producer with Avro
Once your Maven build runs and generates the UserSignup Java class, you can write a producer to publish Avro-serialized messages to Kafka.
package com.example.kafka.producer;

import com.example.kafka.avro.UserSignup;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AvroProducerApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Standard Kafka bootstrap settings
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Configure KafkaAvroSerializer for values
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        // Point to the running Schema Registry instance
        config.put("schema.registry.url", "http://localhost:8081");

        // try-with-resources closes the producer even if sending fails
        try (KafkaProducer<String, UserSignup> producer = new KafkaProducer<>(config)) {
            // Instantiate the generated Avro class
            UserSignup userEvent = UserSignup.newBuilder()
                    .setUserId("usr-9923")
                    .setEmailAddress("developer@example.com")
                    .setSignupTimestamp(System.currentTimeMillis())
                    .setReferralCode("WELCOME2023")
                    .build();

            // toString() is needed because generated getters return CharSequence
            // unless the Avro plugin is configured with stringType=String
            ProducerRecord<String, UserSignup> record = new ProducerRecord<>(
                    "user-signups",
                    userEvent.getUserId().toString(),
                    userEvent
            );

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Successfully sent message to partition: " + metadata.partition());
                } else {
                    exception.printStackTrace();
                }
            });
            // Block until the record has actually been sent before the producer closes
            producer.flush();
        }
    }
}
Notice that we set VALUE_SERIALIZER_CLASS_CONFIG to KafkaAvroSerializer and provided the schema.registry.url configuration property. The producer handles schema registration behind the scenes.
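Automatic registration is convenient in development, but some teams prefer that only a deployment pipeline registers schemas. The Confluent serializer exposes the auto.register.schemas and use.latest.version settings for this. A minimal sketch, assuming a hypothetical helper that layers these two settings on top of an existing producer configuration:

```java
import java.util.Properties;

final class ProducerRegistrationSettings {
    // Returns a copy of the given producer config with schema auto-registration turned off.
    static Properties withoutAutoRegistration(Properties base) {
        Properties config = new Properties();
        config.putAll(base);
        // The serializer will no longer register schemas it has not seen before
        config.put("auto.register.schemas", "false");
        // Serialize with the latest schema already registered for the subject
        config.put("use.latest.version", "true");
        return config;
    }
}
```

With these settings a producer whose schema has not been registered in advance fails when it tries to send, rather than silently adding a new schema version.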
Writing a Java Consumer with Avro
Reading Avro data requires configuring the consumer with the KafkaAvroDeserializer and enabling the specific Avro reader.
package com.example.kafka.consumer;

import com.example.kafka.avro.UserSignup;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumerApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "email-service-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure KafkaAvroDeserializer for values
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        config.put("schema.registry.url", "http://localhost:8081");
        // Crucial: instructs the deserializer to return the specific generated class (UserSignup)
        // instead of a GenericRecord
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // try-with-resources closes the consumer if the poll loop exits with an exception
        try (KafkaConsumer<String, UserSignup> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("user-signups"));
            System.out.println("Waiting for events...");
            while (true) {
                ConsumerRecords<String, UserSignup> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, UserSignup> record : records) {
                    UserSignup user = record.value();
                    System.out.println("Processing user signup: " + user.getEmailAddress());
                    System.out.println("Referral Code used: " + user.getReferralCode());
                }
            }
        }
    }
}
Setting SPECIFIC_AVRO_READER_CONFIG to true is essential. Without it, the deserializer returns a GenericRecord, and assigning that value to the generated UserSignup class fails at runtime with a ClassCastException.
Schema Evolution and Compatibility Rules
As business requirements change, schemas must evolve. Schema Registry supports several compatibility modes to ensure updates do not disrupt production systems:
- BACKWARD (Default): Consumers using the new schema can read data written with the old schema. This allows you to upgrade consumers first. Rule: You can delete fields or add optional fields (fields with default values).
- FORWARD: Consumers using the old schema can read data written with the new schema. This allows you to upgrade producers first. Rule: You can add new fields or delete optional fields.
- FULL: The schema is both backward and forward compatible. You can upgrade producers and consumers in any order. Rule: You can only add or remove optional fields.
- NONE: No compatibility checks are performed. This is not recommended for production environments.
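The rules above follow from one question: can a reader using schema A decode data written with schema B? The sketch below models that question for record fields only. It is a deliberately simplified, hypothetical model (a schema is just a map from field name to "has a default value"); type changes, aliases, and nested records, which the real Schema Registry checks as well, are ignored.

```java
import java.util.Map;

final class CompatibilitySketch {
    // A reader can decode a writer's data if every field the reader expects is
    // either present in the writer's schema or has a default to fall back on.
    // Fields that only the writer knows about are simply skipped by the reader.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }

    // BACKWARD: the new schema reads data written with the old schema.
    static boolean isBackward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: the old schema reads data written with the new schema.
    static boolean isForward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean isFull(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return isBackward(oldSchema, newSchema) && isForward(oldSchema, newSchema);
    }
}
```

For example, adding a field without a default is forward compatible but not backward compatible, because a consumer on the new schema has nothing to put in that field when it reads an old record.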
Real-World Use Cases
Schema Registry and Avro are standard components in enterprise-grade event-driven architectures:
- Financial Transactions: Financial institutions use strictly validated schemas to ensure payment instructions, ledger entries, and audit logs conform to regulatory standards.
- Data Lake Ingestion: When streaming data from Kafka to cloud storage (like Amazon S3 or Google Cloud Storage) or data warehouses (like Snowflake), Avro schemas make it easy to dynamically build and update table structures.
- Microservices Coordination: Decoupled teams can modify their services independently, confident that the Schema Registry will block any breaking schema changes before they reach production.
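One way teams catch a breaking change before deployment is to ask the registry whether a candidate schema is compatible with the latest registered version, using the REST endpoint POST /compatibility/subjects/{subject}/versions/latest. The sketch below only builds that request with the JDK 11+ HTTP client. The class name is hypothetical, the JSON escaping is minimal, and the subject name user-signups-value assumes the default topic-based subject naming.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CompatibilityCheckRequest {
    // Minimal escaping so the schema can be embedded as a JSON string value.
    static String toJsonString(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default: sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // The registry expects the Avro schema wrapped as {"schema": "<escaped schema>"}.
    static String body(String avroSchemaJson) {
        return "{\"schema\": " + toJsonString(avroSchemaJson) + "}";
    }

    static HttpRequest build(String registryUrl, String subject, String avroSchemaJson) {
        URI uri = URI.create(registryUrl + "/compatibility/subjects/" + subject + "/versions/latest");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body(avroSchemaJson)))
                .build();
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) returns a small JSON document containing an is_compatible flag, which a build step can use to fail early.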
Common Mistakes and How to Avoid Them
- Forgetting Default Values: When adding a new field to a schema, always specify a default value. If you do not, your change will violate backward compatibility, and the Schema Registry will reject it.
- Missing Specific Reader Config: Forgetting to set specific.avro.reader=true in the consumer configuration is a common reason developers encounter ClassCastException errors when trying to read Avro payloads in Java.
- Schema Registry Downtime: If your Schema Registry goes offline, producers cannot register new schemas, and consumers cannot fetch unknown schemas. Ensure high availability for your Schema Registry instances in production.
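The word "unknown" in the downtime item matters: the Confluent client keeps a local cache of schemas it has already seen, so an outage mainly affects schemas a client has not encountered yet. The sketch below is a simplified model of that idea, not the client's actual implementation. The class name and the registryLookup function, which stands in for the HTTP call, are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

final class SchemaCacheSketch {
    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    private final IntFunction<String> registryLookup; // stand-in for a call to the registry

    SchemaCacheSketch(IntFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    // Returns the cached schema if present; otherwise asks the registry once.
    // If the lookup throws, nothing is cached and the exception reaches the caller.
    String schemaFor(int schemaId) {
        return schemasById.computeIfAbsent(schemaId, registryLookup::apply);
    }
}
```

A consumer that has already processed records with a given Schema ID keeps working through an outage, while the first record carrying a new ID fails until the registry is reachable again.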
Interview Notes
- How does Schema Registry prevent payload bloat in Kafka? It stores the schema definitions centrally. Instead of attaching the full schema to every Kafka message, or repeating field names in every message as JSON does, the producer attaches only a 5-byte header: a magic byte followed by the 4-byte Schema ID. This keeps the message size minimal.
- What is the difference between GenericRecord and SpecificRecord? SpecificRecord refers to classes generated from your schema file by the Avro compiler (e.g., the UserSignup class). GenericRecord is a generic key-value map used when you do not have the pre-compiled Java classes available at runtime.
- Where is the schema metadata actually stored? Confluent Schema Registry stores all schemas in a special, compacted internal Kafka topic named _schemas.
Summary
Integrating Schema Registry and Apache Avro into your Kafka pipeline establishes robust data governance. It ensures that producers and consumers can communicate safely through strict, versioned contracts. By utilizing binary serialization, you reduce network overhead and storage footprints while enabling seamless schema evolution. Always remember to configure your consumer for specific readers and define default values for your schema fields to ensure smooth compatibility transitions.