专业的编程技术博客社区

网站首页 > 博客文章 正文

大数据存储之ORC格式文件及Go实现

baijin 2024-08-23 10:56:35 博客文章 7 ℃ 0 评论

一、简述

ORC File,它的全名是Optimized Row Columnar (ORC) file。官网: https://orc.apache.org。

ORC是列式存储格式,为了降低存储空间和加速查询速度。主要特点:

1> 列式存储,多种文件压缩方式,高压缩比

2> 文件是可切分(Split)节省HDFS存储,查询任务的输入数据量减少,使用的MapTask也就减少了

3> 提供了多种索引,row group index、bloom filter index

4> 可以支持复杂的数据结构(比如Map等)

二、列式存储

OLAP特点,列式存储可以提升其性能。对比关系型数据库中的行式存储,列式存储每一列的所有元素都是顺序存储的。由此有以下优化:

1> 不需要全量扫描,只需涉及的列,将I/O降低了N倍。可以保存每一列的统计信息(min、max、sum等),实现部分的谓词下推
2> 每一列成员同构,可以有针对的使用压缩算法,进一步减少I/O
3> 同构性,使用更加合适CPU pipeline的编码方式,减少CPU缓存失效

三、文件结构

ORC文件以二进制方式存储,是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。ORC涉及到如下的概念:
1> ORC文件:普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。
2> 文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。
3> stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。
4> stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置
5> row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。
6> stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,
具体需要哪几种是由该列类型和编码方式决定。

在ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的,他们都可以用来根据Search ARGuments(谓词下推条件)
判断是否可以跳过某些数据,在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。

1> file level
在ORC文件的末尾会记录文件级别的统计信息,会记录整个文件中columns的统计信息。这些信息主要用于查询的优化,也可以为一些简单的聚合查询比如max, min, sum输出结果。
2> stripe level
ORC文件会保存每个字段stripe级别的统计信息,ORC reader使用这些统计信息来确定对于一个查询语句来说,需要读入哪些stripe中的记录。
比如说某个stripe的字段max(a)=10,min(a)=3,那么当where条件为a >10或者a <3时,那么这个stripe中的所有记录在查询语句执行时不会被读入。
3> row level
为了进一步的避免读入不必要的数据,在逻辑上将一个column的index以一个给定的值(默认为10000,可由参数配置)分割为多个index组。
以10000条记录为一个组,对数据进行统计。Hive查询引擎会将where条件中的约束传递给ORC reader,这些reader根据组级别的统计信息,过滤掉不必要的数据。
如果该值设置的太小,就会保存更多的统计信息,用户需要根据自己数据的特点权衡一个合理的值。

四、Go语言实现

目前有一个比较完整的开源实现: https://github.com/scritchley/orc, 已经很久没有更新了,估计是没有维护了。这个开源库在索引处理上存在严重的BUG,导致大数据系统无法正确的读取, 之前一个Go项目涉及到大数据ORC文件的读写,为此修复了这个BUG。

1> 读取示例

定义表结构:

struct<string1:string,timestamp1:timestamp,int1:int,boolean1:boolean,double1:double,nested:struct<double2:double,nested:struct<int2:int>>>

示例代码:

func TestWriter(t *testing.T) {
	f, err := ioutil.TempFile("/w/tmp/orc", "testorc")
	if err != nil {
		t.Fatal(err)
	}
	filename := f.Name()
	defer func() {
		_ = f.Close()
		//defer os.Remove(filename)
	}()

	schema, err := ParseSchema("struct<string1:string,timestamp1:timestamp,int1:int,boolean1:boolean,double1:double,nested:struct<double2:double,nested:struct<int2:int>>>")
	if err != nil {
		t.Fatal(err)
	}

	w, err := NewWriter(f, SetSchema(schema))
	if err != nil {
		t.Fatal(err)
	}

	now := time.Unix(1478123411, 99).UTC()
	timeIncrease := 5*time.Second + 10001*time.Nanosecond
	length := 1000
	var intSum int64
	for i := 0; i < length; i++ {
		string1 := fmt.Sprintf("%x", rand.Int63n(1000))
		timestamp1 := now.Add(time.Duration(i) * timeIncrease)
		int1 := rand.Int63n(10000)
		intSum += int1
		boolean1 := int1 > 4444
		double1 := rand.Float64()
		nested := []interface{}{
			rand.Float64(),
			[]interface{}{
				rand.Int63n(10000),
			},
		}
		err = w.Write(string1, timestamp1, int1, boolean1, double1, nested)
		if err != nil {
			t.Fatal(err)
		}
	}

	err = w.Close()
	if err != nil {
		t.Fatal(err)
	}
	fmt.Printf("Write file: %s len: %d intSum: %d\n", filename, length, intSum)
}

输出结果:

> Write file: /tmp/orc/testorc3535413322 len: 1000 intSum: 4990095


2> 读取文件:

读取列 [ "int1", "timestamp1" ]

示例代码:

func TestReader(t *testing.T) {
	filename := "/w/tmp/orc/testorc3535413322"
	r, err := Open(filename)
	if err != nil {
		t.Fatal(err)
	}

	fmt.Printf("Total Rows: %d \n", r.NumRows())

	var compareIntSum int64
	var previousTimestamp time.Time
	var now = time.Unix(1478123411, 99).UTC()
	var timeIncrease = 5*time.Second + 10001*time.Nanosecond
	c := r.Select("int1", "timestamp1")
	row := 0
	for c.Stripes() {
		for c.Next() {
			compareIntSum += c.Row()[0].(int64)
			timestamp, ok := c.Row()[1].(time.Time)
			if !ok {
				t.Fatalf("Row %d: Expected a time.Time but got %T", row, c.Row()[1])
			}
			if row == 0 {
				if timestamp != now {
					t.Fatalf("Row %d: Expected a timestamp %s got %s. Difference: %s", row, now, timestamp, now.Sub(timestamp))
				}
			} else {
				d := timestamp.Sub(previousTimestamp)
				if d != timeIncrease {
					t.Fatalf("Row %d: Expected a time increase of %s but got %s", row, timeIncrease, d)
				}
			}
			previousTimestamp = timestamp
			row++
			fmt.Println(ToJson(c.Row()))
		}
	}

	if err = c.Err(); err != nil && err != io.EOF {
		t.Fatal(err)
	}
}

输出结果:

Total Rows: 1000 
[8528,"2016-11-02T21:50:11.000000099Z"]
[6749,"2016-11-02T21:50:16.0000101Z"]
[2937,"2016-11-02T21:50:21.000020101Z"]
[7744,"2016-11-02T21:50:26.000030102Z"]
[6762,"2016-11-02T21:50:31.000040103Z"]
[5589,"2016-11-02T21:50:36.000050104Z"]
[8851,"2016-11-02T21:50:41.000060105Z"]
[5159,"2016-11-02T21:50:46.000070106Z"]
[3010,"2016-11-02T21:50:51.000080107Z"]
[2515,"2016-11-02T21:50:56.000090108Z"]
[7865,"2016-11-02T21:51:01.000100109Z"]
... 略

3> 打印统计数据

示例代码:

func TestDescribe(t *testing.T) {
	srcFile := "/w/tmp/orc/testorc3535413322"
	fmt.Printf("开始读取: %s \n", srcFile)
	r, err := Open(srcFile)
	if err != nil {
		t.Fatal(err)
	}
	fmt.Printf("字段信息: \n")
	fmt.Printf("\t fields: %s \n", ToJson(r.schema.fieldNames))
	num, _ := r.NumStripes()
	fmt.Printf("\t numStripes: %d \n", num)
	stripes, err := r.getStripes()
	if err != nil {
		t.Fatal(err)
	}
	fmt.Printf("StripeInfo\n")
	for i, item := range stripes {
		fmt.Printf("\t stripe: %d  统计数据: %s \n", i, item.String())
	}

	fmt.Printf("单个Stripe统计数据\n")
	for i := 0; i < num; i++ {
		stripe, err := r.getStripe(i, 0, 1)
		if err != nil {
			t.Fatal(err)
		}
		fmt.Printf("\t num: %d info: %s \n", i, stripe.String())
		fmt.Printf("\t info2: %s \n", stripe.StripeInformation.String())
		stripe.print()
	}

	fmt.Printf("Column统计数据\n")
	for _, item := range r.metadata.StripeStats {
		for _, colStat := range item.ColStats {
			fmt.Printf("\t 统计数据: %s \n", colStat.String())
		}
	}

	fmt.Printf("Footer统计数据:\n")
	for _, item := range r.footer.Stripes {
		fmt.Printf("\t %s \n", item.String())
	}
}

输出结果:

开始读取: /w/tmp/orc/testorc3535413322 
字段信息: 
	 fields: ["string1","timestamp1","int1","boolean1","double1","nested"] 
	 numStripes: 1 
StripeInfo
	 stripe: 0  统计数据: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000  
单个Stripe统计数据
	 num: 0 info: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000  
	 info2: offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000  
name: {0 6} rowIndex: {"entry":[{"statistics":{"numberOfValues":1000,"hasNull":false}}]} 
name: {1 6} rowIndex: {"entry":[{"positions":[0,0,0],"statistics":{"numberOfValues":1000,"stringStatistics":{"minimum":"0","maximum":"ff","sum":2738},"hasNull":false}}]} 
Column统计数据
	 统计数据: numberOfValues:1000 hasNull:false  
	 统计数据: numberOfValues:1000 stringStatistics:<minimum:"0" maximum:"ff" sum:2738 > hasNull:false  
	 统计数据: numberOfValues:1000 timestampStatistics:<minimum:1478123411 maximum:1478128406 minimumUtc:1478123411 maximumUtc:1478128406 > hasNull:false  
	 统计数据: numberOfValues:1000 intStatistics:<minimum:2 maximum:9997 sum:4990095 > hasNull:false  
	 统计数据: numberOfValues:1000 bucketStatistics:<> hasNull:false  
	 统计数据: numberOfValues:1000 hasNull:false  
	 统计数据: numberOfValues:1000 hasNull:false  
	 统计数据: numberOfValues:1000 hasNull:false  
	 统计数据: numberOfValues:1000 hasNull:false  
	 统计数据: numberOfValues:1000 intStatistics:<minimum:8 maximum:9934 sum:4902367 > hasNull:false  
Footer统计数据:
	 offset:3 indexLength:180 dataLength:27496 footerLength:204 numberOfRows:1000  


参考说明:

1> 部分ORC描述参考: https://www.cnblogs.com/yylsjlove/p/14225875.html

2> 代码为个人实践总结

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表