How to get columns data from golang apache-arrow?

Question

I am using apache-arrow/go to read parquet data.

I can parse the data into a table using apache-arrow.

	reader, err := ipc.NewReader(buf, ipc.WithAllocator(alloc))
	if err != nil {
		log.Println(err.Error())
		return nil
	}
	defer reader.Release()
	records := make([]array.Record, 0)
	for reader.Next() {
		rec := reader.Record()
		rec.Retain()
		defer rec.Release()
		records = append(records, rec)
	}
	table := array.NewTableFromRecords(reader.Schema(), records)

Here, I can get the column info from table.Column(index), for example:

for i := range table.Schema().Fields() {
	a := table.Column(i)
	log.Println(a)
}

But the Column struct is defined as:

type Column struct {
	field arrow.Field
	data  *Chunked
}

and the log.Println output looks like this:

["WARN" "WARN" "WARN" "WARN" "WARN" "WARN" "WARN" "WARN" "WARN" "WARN"]

However, this is not a string or a slice. Is there any way I can get the data of each column as strings or as a []interface{}?

Update:

I found that I can use a type assertion to get an element from col:

log.Println(col.(*array.Int64).Value(0))

But I am not sure if this is the recommended way to use it.

Answer 1

Score: 2

When working with Arrow data, there are a few concepts to understand:

Array: Metadata + contiguous buffers of data

Record Batch: A schema + a collection of Arrays that are all the same length.

Chunked Array: A group of Arrays of varying lengths but all the same data type. This allows you to treat multiple Arrays as one single column of data without having to copy them all into a contiguous buffer.

Column: Is just a Field + a Chunked Array

Table: A collection of Columns allowing you to treat multiple non-contiguous arrays as a single large table without having to copy them all into contiguous buffers.
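To make these relationships concrete, here is a small sketch that uses only the Go standard library. The Chunked and Column types below are hypothetical stand-ins that mimic the shape of the Arrow concepts; they are not the Arrow library's own types. The sketch shows how a logical row index can be resolved to a chunk and an offset, so the chunks never need to be copied into one contiguous buffer.

```go
package main

import "fmt"

// Chunked is a hypothetical stand-in for a chunked array: several
// separately allocated chunks of one element type, viewed as one column.
type Chunked struct {
	chunks [][]string
}

// Len returns the total number of values across all chunks.
func (c *Chunked) Len() int {
	n := 0
	for _, ch := range c.chunks {
		n += len(ch)
	}
	return n
}

// Locate maps a logical row index to (chunk index, offset within chunk).
func (c *Chunked) Locate(row int) (chunk, offset int, ok bool) {
	if row < 0 {
		return 0, 0, false
	}
	for i, ch := range c.chunks {
		if row < len(ch) {
			return i, row, true
		}
		row -= len(ch)
	}
	return 0, 0, false
}

// Column is a hypothetical stand-in for a column: a field name plus its data.
type Column struct {
	Name string
	Data *Chunked
}

// At returns the value at a logical row, reading it in place from its chunk.
func (c *Column) At(row int) (string, bool) {
	ci, off, ok := c.Data.Locate(row)
	if !ok {
		return "", false
	}
	return c.Data.chunks[ci][off], true
}

func main() {
	// Two record batches contributed chunks of different lengths.
	col := &Column{
		Name: "level",
		Data: &Chunked{chunks: [][]string{{"WARN", "INFO"}, {"ERROR"}}},
	}
	fmt.Println(col.Data.Len()) // total rows across both chunks
	v, ok := col.At(2)          // row 2 lives in the second chunk
	fmt.Println(v, ok)
}
```

The lookup walks the chunks linearly, which is fine for a handful of record batches; iterating chunk by chunk, as the loops in this answer do, avoids the lookup entirely.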

In your case, you're reading multiple record batches (groups of contiguous Arrays) and treating them as a single large table. There are a few different ways you can work with the data:

One way is to use a TableReader:

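// The second argument is the chunk size: at most 5 rows per record.
tr := array.NewTableReader(table, 5)
defer tr.Release()

for tr.Next() {
	rec := tr.Record()
	for i, col := range rec.Columns() {
		// do something with the Array (col); i is the column index
	}
}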

Another way would be to interact with the columns directly as you were in your example:

for i := 0; i < table.NumCols(); i++ {
    col := table.Column(i)
    for _, chunk := range col.Data().Chunks() {
        // do something with chunk (an arrow.Array)
    }
}

Either way, you eventually have an arrow.Array to deal with (a column of a record, or a chunk of a table column), which is an interface wrapping one of the typed Array types. At this point you have to switch on something. You could type switch on the type of the Array itself:

switch arr := col.(type) {
case *array.Int64:
    // do stuff with arr
case *array.Int32:
    // do stuff with arr
case *array.String:
    // do stuff with arr
...
}

Alternatively, you could switch on the ID of the data type:

switch col.DataType().ID() {
case arrow.INT64:
    // type assertion needed col.(*array.Int64)
case arrow.INT32:
    // type assertion needed col.(*array.Int32)
...
}

For getting the data out of the array, primitive types which are stored contiguously tend to have a *Values method which will return a slice of the type. For example array.Int64 has Int64Values() which returns []int64. Otherwise, all of the types have .Value(int) methods which return the value at a particular index as you showed in your example.
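Putting the type switch and the accessors together, a conversion to []any (the modern spelling of []interface{}) could look like the sketch below. To keep it runnable with the standard library alone, Array, Int64 and String are hypothetical stand-ins that copy only the method names mentioned above (Len, IsNull, Value, Int64Values); with the real library you would switch on *array.Int64, *array.String and so on. The ToAny and ColumnToAny helpers are also illustrative names, not part of Arrow.

```go
package main

import "fmt"

// Array is a hypothetical stand-in for the arrow.Array interface,
// reduced to the two methods this sketch needs.
type Array interface {
	Len() int
	IsNull(i int) bool
}

// Int64 is a stand-in for *array.Int64: raw values plus a validity mask.
type Int64 struct {
	values []int64
	valid  []bool
}

func (a *Int64) Len() int             { return len(a.values) }
func (a *Int64) IsNull(i int) bool    { return !a.valid[i] }
func (a *Int64) Value(i int) int64    { return a.values[i] }
func (a *Int64) Int64Values() []int64 { return a.values }

// String is a stand-in for *array.String.
type String struct {
	values []string
	valid  []bool
}

func (a *String) Len() int           { return len(a.values) }
func (a *String) IsNull(i int) bool  { return !a.valid[i] }
func (a *String) Value(i int) string { return a.values[i] }

// ToAny converts one array into []any, mapping null slots to nil.
func ToAny(arr Array) ([]any, error) {
	out := make([]any, arr.Len())
	switch a := arr.(type) {
	case *Int64:
		for i := range out {
			if !a.IsNull(i) {
				out[i] = a.Value(i)
			}
		}
	case *String:
		for i := range out {
			if !a.IsNull(i) {
				out[i] = a.Value(i)
			}
		}
	default:
		return nil, fmt.Errorf("unsupported array type %T", arr)
	}
	return out, nil
}

// ColumnToAny flattens all chunks of one column into a single slice.
func ColumnToAny(chunks []Array) ([]any, error) {
	var out []any
	for _, c := range chunks {
		vals, err := ToAny(c)
		if err != nil {
			return nil, err
		}
		out = append(out, vals...)
	}
	return out, nil
}

func main() {
	chunks := []Array{
		&String{values: []string{"WARN", ""}, valid: []bool{true, false}},
		&String{values: []string{"ERROR"}, valid: []bool{true}},
	}
	vals, err := ColumnToAny(chunks)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(vals), vals)
}
```

The null check matters because a raw values slice such as the one returned by Int64Values() carries no validity information: the slot of a null entry still holds some value, so IsNull(i) has to be consulted separately.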

Hope this helps!

Answer 2

Score: -1

  1. Make sure you use v9 (import "github.com/apache/arrow/go/v9/arrow"), because it implements json.Marshaler (from go-json).
  2. Use "github.com/goccy/go-json" for marshaling.

Then you can use a TableReader to marshal each column to JSON and unmarshal the result into a []any.

In your example, it might look like this:

import (
	"fmt"

	"github.com/apache/arrow/go/v9/arrow"
	"github.com/apache/arrow/go/v9/arrow/array"
	"github.com/apache/arrow/go/v9/arrow/memory"
	"github.com/goccy/go-json"
)

	...
	tr := array.NewTableReader(table, 6)
	defer tr.Release()
	// fmt.Printf("table.NumRows() = %+v\n", table.NumRows())
	// fmt.Printf("table.NumCols() = %+v\n", table.NumCols())

	// keySlice keeps the column names in the same order as the data source
	keySlice := make([]string, 0, table.NumCols())

	// res maps each column name to all of its values across records
	res := make(map[string][]any)
	var key string
	for tr.Next() {
		rec := tr.Record()

		for i, col := range rec.Columns() {
			key = rec.ColumnName(i)
			if res[key] == nil {
				res[key] = make([]any, 0)
				keySlice = append(keySlice, key)
			}
			// Round-trip the column through JSON to get generic values.
			var tmp []any
			b2, err := json.Marshal(col)
			if err != nil {
				panic(err)
			}
			err = json.Unmarshal(b2, &tmp)
			if err != nil {
				panic(err)
			}
			// fmt.Printf("key = %s\n", key)
			// fmt.Printf("tmp = %+v\n", tmp)
			res[key] = append(res[key], tmp...)
		}
	}

	fmt.Println("res", res)
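One caveat of the JSON round trip is worth a sketch. The example below uses the standard library's encoding/json as a stand-in, on the assumption that go-json follows the same decoding rules as the package it replaces: when the target is []any, JSON numbers are decoded as float64, so large int64 values can lose precision. DecodeLossy, DecodeExact and AsInt64 are hypothetical helper names introduced for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// DecodeLossy mirrors the round trip above: numbers land in []any as float64.
func DecodeLossy(data []byte) ([]any, error) {
	var out []any
	err := json.Unmarshal(data, &out)
	return out, err
}

// DecodeExact keeps numbers as json.Number so integers survive unchanged.
func DecodeExact(data []byte) ([]any, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber()
	var out []any
	err := dec.Decode(&out)
	return out, err
}

// AsInt64 extracts an int64 from a value decoded with UseNumber.
func AsInt64(v any) (int64, bool) {
	n, ok := v.(json.Number)
	if !ok {
		return 0, false
	}
	i, err := n.Int64()
	return i, err == nil
}

func main() {
	// 9007199254740993 is 2^53 + 1, which float64 cannot represent exactly.
	data := []byte(`[9007199254740993, null, 3]`)

	lossy, err := DecodeLossy(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("lossy: %T\n", lossy[0])

	exact, err := DecodeExact(data)
	if err != nil {
		panic(err)
	}
	if n, ok := AsInt64(exact[0]); ok {
		fmt.Println("exact:", n)
	}
}
```

If the column types are known in advance, the type switch from the first answer avoids both this precision issue and the cost of serializing every column.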

huangapple
  • Published on August 25, 2022, 18:39:35
  • When reposting, please keep the link to this article: https://go.coder-hub.com/73486082.html