Avro
Apache Avro is a language-neutral data serialization system. The project was created by Doug Cutting, the creator of Hadoop, to address the major downside of Hadoop Writables: their lack of language portability.
Avro data is described using a language-independent schema. Schemas are usually written in JSON, and data is usually encoded using a binary format.
Getting started¶
This section is based on Apache Avro™ Getting Started (Java).
Defining a schema¶
Avro schemas are defined using JSON. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).
Let's start with a simple schema example, user.avsc:
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
This schema defines a record representing a hypothetical user. (Note that a schema file can contain only a single schema definition.) At minimum, a record definition must include its type, a name, and fields. Fields are defined via an array of objects, each of which defines a name and a type; the type attribute of a field is itself a schema object, which can be either a primitive or a complex type.
Serializing and deserializing¶
Let's create some users, serialize them to a data file on disk, and then read the file back and deserialize the user objects.
First, we use a Parser to read our schema definition and create a Schema object:
Schema schema = new Schema.Parser().parse(new File("user.avsc"));
Using this schema, let's create some users.
GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
// Leave favorite color null
GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
Since we are not using code generation, we use GenericRecords to represent users; GenericRecord uses the schema to verify that we only specify valid fields.
Note that we do not set user1's favorite color. Since that field is of type ["string", "null"], we can either set it to a string or leave it null; it is essentially optional.
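As an aside, the schema check happens at runtime: GenericData.Record rejects field names that do not appear in the schema. A small illustration (favorite_animal is a made-up field name, not part of the User schema):
// Fails with an AvroRuntimeException, because "favorite_animal"
// is not a field of the User schema
user1.put("favorite_animal", "cat");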
Now that we've created our user objects, we use generic readers and writers to serialize and deserialize them.
First we'll serialize our users to a data file on disk.
// Serialize user1 and user2 to disk
File file = new File("users.avro");
DatumWriter<GenericRecord> datumWriter =
new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.close();
We create a GenericDatumWriter, which converts the GenericRecords into an in-memory serialized format; it requires the schema both to determine how to write the records and to verify that all non-nullable fields are present.
We also create a DataFileWriter, which writes the serialized records, as well as the schema, to the file specified in the dataFileWriter.create call. We write our users to the file via calls to the dataFileWriter.append method, and close the file when we are done.
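Since DataFileWriter implements Closeable, the serialization step can also be written with try-with-resources, which guarantees the file is closed even if an append fails; a sketch of the same code in that style:
File file = new File("users.avro");
DatumWriter<GenericRecord> datumWriter =
    new GenericDatumWriter<GenericRecord>(schema);
try (DataFileWriter<GenericRecord> dataFileWriter =
        new DataFileWriter<GenericRecord>(datumWriter)) {
    dataFileWriter.create(schema, file);
    dataFileWriter.append(user1);
    dataFileWriter.append(user2);
} // close() is called automatically here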
Finally, we'll deserialize the data file we just created.
// Deserialize users from disk
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
GenericRecord user = null;
while (dataFileReader.hasNext()) {
// Reuse user object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
user = dataFileReader.next(user);
System.out.println(user);
}
This outputs:
{"name": "Alyssa", "favorite_number": 256, "favorite_color": null}
{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
Deserializing is very similar to serializing. We create a GenericDatumReader, analogous to the GenericDatumWriter, which converts in-memory serialized items into GenericRecords, and we pass it along with the file to a DataFileReader, which reads both the schema and the data.
Next, we use the DataFileReader to iterate through the serialized users and print each deserialized object to stdout. We pass the same record object to every call of dataFileReader.next, which lets the reader reuse it instead of allocating a new GenericRecord on each iteration. Since DataFileReader is also iterable, we could instead write for (GenericRecord user : dataFileReader) if performance is not a concern.
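A sketch of that simpler for-each variant, again using try-with-resources (DataFileReader implements Iterable and Closeable):
DatumReader<GenericRecord> datumReader =
    new GenericDatumReader<GenericRecord>(schema);
try (DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<GenericRecord>(file, datumReader)) {
    // Note: this allocates a fresh GenericRecord on every iteration
    for (GenericRecord u : dataFileReader) {
        System.out.println(u);
    }
}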
Sort Order¶
Avro defines a sort order for objects. All types except record have a preordained order that is prescribed in the Avro specification and cannot be overridden. For records, however, you can control the sort order by specifying the order attribute on a field; it takes one of three values: ascending (the default), descending, or ignore.
For example, the following schema (SortedStringPair.avsc) defines an ordering of StringPair records in which the right field is sorted in descending order and the left field is ignored for sorting purposes:
{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings, sorted by right field descending.",
    "fields": [
        {"name": "left", "type": "string", "order": "ignore"},
        {"name": "right", "type": "string", "order": "descending"}
    ]
}
Avro implements efficient binary comparisons. That is to say, Avro does not have to deserialize binary data into objects to perform the comparison, because it can instead work directly on the byte streams. Avro provides the comparator for us.
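For example, here is a sketch of comparing two serialized StringPair records as raw bytes with org.apache.avro.io.BinaryData; the serializeToBytes helper is our own, not part of Avro:
// Serialize a record to a byte array using Avro's binary encoding
static byte[] serializeToBytes(GenericRecord record, Schema schema)
        throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer =
        new GenericDatumWriter<GenericRecord>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
}

// Compare the two encodings without deserializing them; the result is
// negative, zero, or positive, like an ordinary Comparator
byte[] b1 = serializeToBytes(pair1, schema);
byte[] b2 = serializeToBytes(pair2, schema);
int cmp = BinaryData.compare(b1, 0, b2, 0, schema);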
Avro MapReduce¶
Avro provides a number of classes for making it easy to run MapReduce programs on Avro data.
Let’s rework the MapReduce program for finding the maximum temperature for each year in the weather dataset, this time using the Avro MapReduce API. We will represent weather records using the following schema:
{
    "type": "record",
    "name": "WeatherRecord",
    "doc": "A weather reading.",
    "fields": [
        {"name": "year", "type": "int"},
        {"name": "temperature", "type": "int"},
        {"name": "stationId", "type": "string"}
    ]
}
There are a couple of differences from the regular Hadoop MapReduce API. The first is the use of wrappers around Avro Java types: the map and reduce functions operate on AvroKey and AvroValue objects that wrap the underlying Avro data. The second major difference from regular MapReduce is the use of AvroJob for configuring the job; AvroJob is a convenience class for specifying the Avro schemas for the input, map output, and final output data.
Avro MapReduce example:
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroGenericMaxTemperature extends Configured implements Tool {
private static final Schema SCHEMA = new Schema.Parser().parse(
"{" +
" \"type\": \"record\"," +
" \"name\": \"WeatherRecord\"," +
" \"doc\": \"A weather reading.\"," +
" \"fields\": [" +
" {\"name\": \"year\", \"type\": \"int\"}," +
" {\"name\": \"temperature\", \"type\": \"int\"}," +
" {\"name\": \"stationId\", \"type\": \"string\"}" +
" ]" +
"}"
);
public static class MaxTemperatureMapper
extends Mapper<LongWritable, Text, AvroKey<Integer>,
AvroValue<GenericRecord>> {
// NcdcRecordParser is a helper from the book's example code that parses NCDC weather lines
private NcdcRecordParser parser = new NcdcRecordParser();
private GenericRecord record = new GenericData.Record(SCHEMA);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value.toString());
if (parser.isValidTemperature()) {
record.put("year", parser.getYearInt());
record.put("temperature", parser.getAirTemperature());
record.put("stationId", parser.getStationId());
context.write(new AvroKey<Integer>(parser.getYearInt()),
new AvroValue<GenericRecord>(record));
}
}
}
public static class MaxTemperatureReducer
extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
AvroKey<GenericRecord>, NullWritable> {
@Override
protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>>
values, Context context) throws IOException, InterruptedException {
GenericRecord max = null;
for (AvroValue<GenericRecord> value : values) {
GenericRecord record = value.datum();
if (max == null ||
(Integer) record.get("temperature") > (Integer) max.get("temperature")) {
max = newWeatherRecord(record);
}
}
context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
}
private GenericRecord newWeatherRecord(GenericRecord value) {
GenericRecord record = new GenericData.Record(SCHEMA);
record.put("year", value.get("year"));
record.put("temperature", value.get("temperature"));
record.put("stationId", value.get("stationId"));
return record;
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(getConf(), "Max temperature");
job.setJarByClass(getClass());
job.getConfiguration().setBoolean(
Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setMapOutputValueSchema(job, SCHEMA);
AvroJob.setOutputKeySchema(job, SCHEMA);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
System.exit(exitCode);
}
}
Sorting Using Avro MapReduce¶
Sorting an Avro datafile is simple. The mapper emits each input record wrapped in both an AvroKey and an AvroValue, and the reducer acts as an identity, passing the values through as output keys, which get written to an Avro datafile.
The sorting happens in the MapReduce shuffle, and the sort function is determined by the Avro schema that is passed to the program. See Chapter 7, section Shuffle and Sort for details.
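For reference, a minimal sketch of such a generic sort's mapper and reducer, following the shape of the max-temperature program above (the class names and the type parameter K are illustrative; K stands for the Avro type being sorted):
public static class SortMapper<K> extends
        Mapper<AvroKey<K>, NullWritable, AvroKey<K>, AvroValue<K>> {
    @Override
    protected void map(AvroKey<K> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        // Emit the record as both key (to be sorted) and value (to carry the data)
        context.write(key, new AvroValue<K>(key.datum()));
    }
}

public static class SortReducer<K> extends
        Reducer<AvroKey<K>, AvroValue<K>, AvroKey<K>, NullWritable> {
    @Override
    protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
            Context context) throws IOException, InterruptedException {
        // Identity reduce: pass each value through as an output key;
        // the MapReduce shuffle has already sorted the keys
        for (AvroValue<K> value : values) {
            context.write(new AvroKey<K>(value.datum()), NullWritable.get());
        }
    }
}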