一、简述
ORC File,它的全名是Optimized Row Columnar (ORC) file。官网: https://orc.apache.org。
ORC是列式存储格式,为了降低存储空间和加速查询速度。主要特点:
1> 列式存储,多种文件压缩方式,高压缩比
2> 文件是可切分(Split)节省HDFS存储,查询任务的输入数据量减少,使用的MapTask也就减少了
3> 提供了多种索引,row group index、bloom filter index
4> 可以支持复杂的数据结构(比如Map等)
二、列式存储
OLAP特点,列式存储可以提升其性能。对比关系型数据库中的行式存储,列式存储每一列的所有元素都是顺序存储的。由此有以下优化:
1> 不需要全量扫描,只需涉及的列,将I/O降低了N倍。可以保存每一列的统计信息(min、max、sum等),实现部分的谓词下推
2> 每一列成员同构,可以有针对的使用压缩算法,进一步减少I/O
3> 同构性,使用更加合适CPU pipeline的编码方式,减少CPU缓存失效
三、文件结构
ORC文件以二进制方式存储,是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。ORC涉及到如下的概念:
1> ORC文件:普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。
2> 文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。
3> stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。
4> stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置
5> row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。
6> stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,
具体需要哪几种是由该列类型和编码方式决定。
在ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的,他们都可以用来根据Search ARGuments(谓词下推条件)
判断是否可以跳过某些数据,在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。
1> file level
在ORC文件的末尾会记录文件级别的统计信息,会记录整个文件中columns的统计信息。这些信息主要用于查询的优化,也可以为一些简单的聚合查询比如max, min, sum输出结果。
2> stripe level
ORC文件会保存每个字段stripe级别的统计信息,ORC reader使用这些统计信息来确定对于一个查询语句来说,需要读入哪些stripe中的记录。
比如说某个stripe的字段max(a)=10,min(a)=3,那么当where条件为a >10或者a <3时,那么这个stripe中的所有记录在查询语句执行时不会被读入。
3> row level
为了进一步的避免读入不必要的数据,在逻辑上将一个column的index以一个给定的值(默认为10000,可由参数配置)分割为多个index组。
以10000条记录为一个组,对数据进行统计。Hive查询引擎会将where条件中的约束传递给ORC reader,这些reader根据组级别的统计信息,过滤掉不必要的数据。
如果该值设置的太小,就会保存更多的统计信息,用户需要根据自己数据的特点权衡一个合理的值。
四、Go语言实现
目前有一个比较完整的开源实现: https://github.com/scritchley/orc, 已经很久没有更新了,估计是没有维护了。这个开源库在索引处理上存在严重的BUG,导致大数据系统无法正确的读取, 之前一个Go项目涉及到大数据ORC文件的读写,为此修复了这个BUG。
1> 读取示例
定义表结构:
struct<string1:string,timestamp1:timestamp,int1:int,boolean1:boolean,double1:double,nested:struct<double2:double,nested:struct<int2:int>>>
示例代码:
func TestWriter(t *testing.T) {
f, err := ioutil.TempFile("/w/tmp/orc", "testorc")
if err != nil {
t.Fatal(err)
}
filename := f.Name()
defer func() {
_ = f.Close()
//defer os.Remove(filename)
}()
schema, err := ParseSchema("struct<string1:string,timestamp1:timestamp,int1:int,boolean1:boolean,double1:double,nested:struct<double2:double,nested:struct<int2:int>>>")
if err != nil {
t.Fatal(err)
}
w, err := NewWriter(f, SetSchema(schema))
if err != nil {
t.Fatal(err)
}
now := time.Unix(1478123411, 99).UTC()
timeIncrease := 5*time.Second + 10001*time.Nanosecond
length := 1000
var intSum int64
for i := 0; i < length; i++ {
string1 := fmt.Sprintf("%x", rand.Int63n(1000))
timestamp1 := now.Add(time.Duration(i) * timeIncrease)
int1 := rand.Int63n(10000)
intSum += int1
boolean1 := int1 > 4444
double1 := rand.Float64()
nested := []interface{}{
rand.Float64(),
[]interface{}{
rand.Int63n(10000),
},
}
err = w.Write(string1, timestamp1, int1, boolean1, double1, nested)
if err != nil {
t.Fatal(err)
}
}
err = w.Close()
if err != nil {
t.Fatal(err)
}
fmt.Printf("Write file: %s len: %d intSum: %d\n", filename, length, intSum)
}
输出结果:
> Write file: /tmp/orc/testorc3535413322 len: 1000 intSum: 4990095
2> 读取文件:
读取列 [ "int1", "timestamp1" ]
示例代码:
func TestReader(t *testing.T) {
filename := "/w/tmp/orc/testorc3535413322"
r, err := Open(filename)
if err != nil {
t.Fatal(err)
}
fmt.Printf("Total Rows: %d \n", r.NumRows())
var compareIntSum int64
var previousTimestamp time.Time
var now = time.Unix(1478123411, 99).UTC()
var timeIncrease = 5*time.Second + 10001*time.Nanosecond
c := r.Select("int1", "timestamp1")
row := 0
for c.Stripes() {
for c.Next() {
compareIntSum += c.Row()[0].(int64)
timestamp, ok := c.Row()[1].(time.Time)
if !ok {
t.Fatalf("Row %d: Expected a time.Time but got %T", row, c.Row()[1])
}
if row == 0 {
if timestamp != now {
t.Fatalf("Row %d: Expected a timestamp %s got %s. Difference: %s", row, now, timestamp, now.Sub(timestamp))
}
} else {
d := timestamp.Sub(previousTimestamp)
if d != timeIncrease {
t.Fatalf("Row %d: Expected a time increase of %s but got %s", row, timeIncrease, d)
}
}
previousTimestamp = timestamp
row++
fmt.Println(ToJson(c.Row()))
}
}
if err = c.Err(); err != nil && err != io.EOF {
t.Fatal(err)
}
}
输出结果:
Total Rows: 1000
[8528,"2016-11-02T21:50:11.000000099Z"]
[6749,"2016-11-02T21:50:16.0000101Z"]
[2937,"2016-11-02T21:50:21.000020101Z"]
[7744,"2016-11-02T21:50:26.000030102Z"]
[6762,"2016-11-02T21:50:31.000040103Z"]
[5589,"2016-11-02T21:50:36.000050104Z"]
[8851,"2016-11-02T21:50:41.000060105Z"]
[5159,"2016-11-02T21:50:46.000070106Z"]
[3010,"2016-11-02T21:50:51.000080107Z"]
[2515,"2016-11-02T21:50:56.000090108Z"]
[7865,"2016-11-02T21:51:01.000100109Z"]
... 略
3> 打印统计数据
示例代码:
func TestDescribe(t *testing.T) {
srcFile := "/w/tmp/orc/testorc3535413322"
fmt.Printf("开始读取: %s \n", srcFile)
r, err := Open(srcFile)
if err != nil {
t.Fatal(err)
}
fmt.Printf("字段信息: \n")
fmt.Printf("\t fields: %s \n", ToJson(r.schema.fieldNames))
num, _ := r.NumStripes()
fmt.Printf("\t numStripes: %d \n", num)
stripes, err := r.getStripes()
if err != nil {
t.Fatal(err)
}
fmt.Printf("StripeInfo\n")
for i, item := range stripes {
fmt.Printf("\t stripe: %d 统计数据: %s \n", i, item.String())
}
fmt.Printf("单个Stripe统计数据\n")
for i := 0; i < num; i++ {
stripe, err := r.getStripe(i, 0, 1)
if err != nil {
t.Fatal(err)
}
fmt.Printf("\t num: %d info: %s \n", i, stripe.String())
fmt.Printf("\t info2: %s \n", stripe.StripeInformation.String())
stripe.print()
}
fmt.Printf("Column统计数据\n")
for _, item := range r.metadata.StripeStats {
for _, colStat := range item.ColStats {
fmt.Printf("\t 统计数据: %s \n", colStat.String())
}
}
fmt.Printf("Footer统计数据:\n")
for _, item := range r.footer.Stripes {
fmt.Printf("\t %s \n", item.String())
}
}
输出结果:
开始读取: /w/tmp/orc/testorc3535413322
字段信息:
fields: ["string1","timestamp1","int1","boolean1","double1","nested"]
numStripes: 1
StripeInfo
stripe: 0 统计数据: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000
单个Stripe统计数据
num: 0 info: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000
info2: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000
name: {0 6} rowIndex: {"entry":[{"statistics":{"numberOfValues":1000,"hasNull":false}}]}
name: {1 6} rowIndex: {"entry":[{"positions":[0,0,0],"statistics":{"numberOfValues":1000,"stringStatistics":{"minimum":"0","maximum":"ff","sum":2738},"hasNull":false}}]}
Column统计数据
统计数据: numberOfValues:1000 hasNull:false
统计数据: numberOfValues:1000 stringStatistics:<minimum:"0" maximum:"ff" sum:2738 > hasNull:false
统计数据: numberOfValues:1000 timestampStatistics:<minimum:1478123411 maximum:1478128406 minimumUtc:1478123411 maximumUtc:1478128406 > hasNull:false
统计数据: numberOfValues:1000 intStatistics:<minimum:2 maximum:9997 sum:4990095 > hasNull:false
统计数据: numberOfValues:1000 bucketStatistics:<> hasNull:false
统计数据: numberOfValues:1000 hasNull:false
统计数据: numberOfValues:1000 hasNull:false
统计数据: numberOfValues:1000 hasNull:false
统计数据: numberOfValues:1000 hasNull:false
统计数据: numberOfValues:1000 intStatistics:<minimum:8 maximum:9934 sum:4902367 > hasNull:false
Footer统计数据:
offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000
参考说明:
1> 部分ORC描述参考: https://www.cnblogs.com/yylsjlove/p/14225875.html
2> 代码为个人实践总结
本文暂时没有评论,来添加一个吧(●'◡'●)