Partition events by time interval using Apache Arrow in Go

Question

I'm trying to split some events that I'm collecting from Kafka based on a time interval. Basically, the goal is to read the value in the datetime column and run a simple formula to check whether the current event falls in the current interval. If it does, append the event to the RecordBuilder; otherwise, flush the group of events (a segment) to a Parquet file.

Here is the code I have so far:

type Segment struct {
    mu             sync.Mutex
    schema         *arrow.Schema
    evtStruct      *arrow.StructType
    builder        *array.RecordBuilder
    writer         *pqarrow.FileWriter
    timestampIndex int
}

func NewSegment(dir string, datetimeFieldName string, schema *arrow.Schema) (*Segment, error) {
	// other inits here
	// ...

	// create a parquet file
	pFile, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}
	w, err := pqarrow.NewFileWriter(schema, pFile, props, pqarrow.DefaultWriterProps())
	if err != nil {
		return nil, err
	}

	// create the new record builder for inserting data to arrow
	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
	b := array.NewRecordBuilder(mem, schema)

	evtStruct := arrow.StructOf(schema.Fields()...)
	idx, ok := evtStruct.FieldIdx(datetimeFieldName)
	if !ok {
	    return nil, fmt.Errorf("couldn't find datetime column")
	}

	return &Segment{
		schema:         schema,
		evtStruct:      evtStruct,
		mu:             sync.Mutex{},
		builder:        b,
		writer:         w,
		timestampIndex: idx,
	}, nil
}

// data comes from Kafka and represents a single event
func (s *Segment) InsertData(data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO: do partition by interval here
	// extract the datetime value --> dtVal (unix epoch)
	// assume for now an interval of 5 minutes
	// dtPartition = math.floor(dtVal, 5*60)
	/* if dtPartition > oldDtPartition {
		return s.Flush()
	   }
	*/

	// append the data to the current builder
	if err := s.builder.UnmarshalJSON(data); err != nil {
		return err
	}

	return nil
}

// Flush persists the segment to disk
func (s *Segment) Flush() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	rec := s.builder.NewRecord()

	// closable
	defer s.builder.Release()
	defer s.writer.Close()
	defer rec.Release()

	// write parquet file
	if err := s.writer.WriteBuffered(rec); err != nil {
		return err
	}

	return nil
}

The problem is that I can't unmarshal the data input parameter of the InsertData function, because there is no struct to unmarshal it into. I can create an arrow.Schema and an arrow.StructType because the service I'm building lets users define the schema of the event. So I'm trying to find a way to read the datetime value in the event and decide which interval it falls in.

In the function InsertData I added some silly pseudocode of what I'd like to achieve. Perhaps Apache Arrow has some functions that can help in doing what I'm trying to do. Thank you in advance.

Answer 1

Score: 1

If you can do this: s.builder.UnmarshalJSON(data), then data is a JSON value. You can print data with fmt.Printf("%s", data) to confirm that.

If you're sure that every event contains a datetime column, then you can define a struct like this so that you can unmarshal the data to it:

type event struct {
	Datetime int64 `json:"datetime"`
}
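
The struct tag above fixes the column name at compile time, whereas the question says the schema (and the datetime field name) is user-defined. One possible workaround, sketched here under the assumption that the timestamp is a JSON integer holding Unix seconds, is to decode only the top level of the event into a map[string]json.RawMessage and look the field up by name at runtime. The helper name extractUnixSeconds and the field name created_at are hypothetical, chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractUnixSeconds is a hypothetical helper. It looks up a top-level JSON
// field by name and decodes it as an int64 (assumed to be Unix seconds).
func extractUnixSeconds(data []byte, field string) (int64, error) {
	// Decode only the top level; each value is kept as raw, undecoded JSON.
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(data, &obj); err != nil {
		return 0, err
	}

	raw, ok := obj[field]
	if !ok {
		return 0, fmt.Errorf("field %q not found in event", field)
	}

	// Decode just the one value we care about.
	var ts int64
	if err := json.Unmarshal(raw, &ts); err != nil {
		return 0, fmt.Errorf("field %q is not an integer: %w", field, err)
	}
	return ts, nil
}

func main() {
	// "created_at" stands in for whatever name the user-defined schema uses.
	data := []byte(`{"created_at": 1685861011, "region": "NY"}`)

	ts, err := extractUnixSeconds(data, "created_at")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ts)
}
```

In the question's Segment, the field name passed in would be the same datetimeFieldName that NewSegment already receives.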

Here is a small demo of the struct approach:

package main

import (
	"encoding/json"
	"fmt"
)

// getDatetime extracts the "datetime" field from a JSON-encoded event.
// Fields that are not declared in the struct are ignored by json.Unmarshal.
func getDatetime(data []byte) (int64, error) {
	var e struct {
		Datetime int64 `json:"datetime"`
	}
	if err := json.Unmarshal(data, &e); err != nil {
		return 0, err
	}

	return e.Datetime, nil
}

func main() {
	data := []byte(`{"datetime": 1685861011, "region": "NY", "model": "3", "sales": 742.0, "extra": 1234}`)

	fmt.Println(getDatetime(data))
}
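
Once the timestamp is available, the interval check from the question's pseudocode can be done with integer arithmetic. The following is only a sketch under a few assumptions: timestamps are non-negative Unix seconds, the interval is the 5 minutes mentioned in the question, and events that arrive late (belonging to an older interval) simply stay in the current segment. The names bucketStart and tracker are hypothetical and are not part of Apache Arrow.

```go
package main

import "fmt"

// intervalSeconds is the bucket width: 5 minutes, as assumed in the question.
const intervalSeconds int64 = 5 * 60

// bucketStart returns the Unix timestamp at which the interval containing ts
// begins. It assumes ts >= 0.
func bucketStart(ts, interval int64) int64 {
	return ts - ts%interval
}

// tracker remembers the bucket of the events that are currently buffered.
type tracker struct {
	current int64
	started bool
}

// observe reports whether the buffered events should be flushed before the
// event with timestamp ts is appended. Late events (older bucket) return
// false and therefore land in the current segment.
func (t *tracker) observe(ts int64) bool {
	b := bucketStart(ts, intervalSeconds)
	if !t.started {
		// First event: open the first bucket, nothing to flush yet.
		t.current, t.started = b, true
		return false
	}
	if b > t.current {
		// The event belongs to a newer interval: flush, then move on.
		t.current = b
		return true
	}
	return false
}

func main() {
	var t tracker
	for _, ts := range []int64{1685861011, 1685861099, 1685861100} {
		fmt.Println(ts, bucketStart(ts, intervalSeconds), t.observe(ts))
	}
}
```

Two details matter when wiring this into InsertData. First, InsertData already holds s.mu, and sync.Mutex is not reentrant, so calling a Flush that locks s.mu again would deadlock; the flush logic would need to live in an unexported method that expects the lock to be held. Second, the event that triggered the flush still has to be appended to the builder afterwards, whereas the pseudocode returns right after flushing and would drop it.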

huangapple
  • Published on June 4, 2023, 04:00:44
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76397832.html