Partition events by time interval using Apache Arrow in Go
Question
I'm trying to split some events that I'm collecting from Kafka based on a time interval. Basically, the goal is to read the value in the datetime column and run a simple formula to check whether the current event falls in the current interval. If it does, append the event to the RecordBuilder; otherwise, flush the group of events (a segment) to a parquet file.
Here is the code I have so far:
type Segment struct {
	mu             sync.Mutex
	schema         *arrow.Schema
	evtStruct      *arrow.StructType
	builder        *array.RecordBuilder
	writer         *pqarrow.FileWriter
	timestampIndex int
}

func NewSegment(dir string, datetimeFieldName string, schema *arrow.Schema) (*Segment, error) {
	// other inits here
	// ...

	// create a parquet file
	pFile, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}
	w, err := pqarrow.NewFileWriter(schema, pFile, props, pqarrow.DefaultWriterProps())
	if err != nil {
		return nil, err
	}

	// create the new record builder for inserting data to arrow
	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
	b := array.NewRecordBuilder(mem, schema)

	// build a struct type from the schema so the datetime field can be located
	evtStruct := arrow.StructOf(schema.Fields()...)
	idx, ok := evtStruct.FieldIdx(datetimeFieldName)
	if !ok {
		return nil, fmt.Errorf("couldn't find datetime column")
	}

	return &Segment{
		schema:         schema,
		evtStruct:      evtStruct,
		mu:             sync.Mutex{},
		builder:        b,
		writer:         w,
		timestampIndex: idx,
	}, nil
}
// data comes from Kafka and it represents a single event
func (s *Segment) InsertData(data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO: do the partitioning by interval here
	// extract the datetime value --> dtVal (unix epoch, seconds)
	// assume for now an interval of 5 minutes
	// dtPartition = floor(dtVal / (5*60))
	/*
		if dtPartition > oldDtPartition {
			return s.Flush()
		}
	*/

	// append the data to the current builder
	if err := s.builder.UnmarshalJSON(data); err != nil {
		return err
	}
	return nil
}
// Flush persists the segment on disk
func (s *Segment) Flush() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	rec := s.builder.NewRecord()
	// release/close resources once the record has been written
	defer s.builder.Release()
	defer s.writer.Close()
	defer rec.Release()

	// write to the parquet file
	if err := s.writer.WriteBuffered(rec); err != nil {
		return err
	}
	return nil
}
The problem is that I'm not able to "Unmarshal" the data input parameter of the InsertData function, because there is no "struct" it can be unmarshaled to. I'm able to create an arrow.Schema and an arrow.StructType because the service I'm making allows a user to define the schema of the event. Hence I'm trying to find a way to read the datetime value in the event and decide which interval it falls in.
In the InsertData function I added some silly pseudocode of what I'd like to achieve; a runnable sketch of that bucketing formula follows below. Perhaps Apache Arrow has some functions that can help in doing what I'm trying to do. Thank you in advance.
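To make the pseudocode concrete, here is a minimal sketch of the bucketing formula, assuming the datetime value is a Unix timestamp in seconds and a fixed interval of 5 minutes (both are assumptions, since the schema and the interval are user-defined):

package main

import "fmt"

// intervalSeconds is a hypothetical fixed 5-minute partition interval.
const intervalSeconds int64 = 5 * 60

// bucketOf maps a Unix timestamp (in seconds) to its interval index;
// integer division floors the result for non-negative timestamps.
func bucketOf(dtVal int64) int64 {
	return dtVal / intervalSeconds
}

func main() {
	// two events 10 seconds apart land in the same 5-minute bucket
	fmt.Println(bucketOf(1685861011), bucketOf(1685861021))
}

An event whose bucket index is greater than the previous one would trigger the Flush from the pseudocode.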
Answer 1
Score: 1
If you can do this: s.builder.UnmarshalJSON(data), then data is a JSON value. You can print data with fmt.Printf("%s", data) to confirm that.
If you're sure that every event contains a datetime column, then you can define a struct like this so that you can unmarshal the data to it:
type event struct {
	Datetime int64 `json:"datetime"`
}
Here is a small demo:
package main

import (
	"encoding/json"
	"fmt"
)

// getDatetime decodes only the datetime field; all other keys are ignored.
func getDatetime(data []byte) (int64, error) {
	var e struct {
		Datetime int64 `json:"datetime"`
	}
	if err := json.Unmarshal(data, &e); err != nil {
		return 0, err
	}
	return e.Datetime, nil
}

func main() {
	data := []byte(`{"datetime": 1685861011, "region": "NY", "model": "3", "sales": 742.0, "extra": 1234}`)
	fmt.Println(getDatetime(data))
}
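Since the schema (and therefore the name of the datetime field) is user-defined in the question, the same idea can be generalized by decoding into a map and looking the field up by name. Below is a sketch under that assumption; extractDatetime is a hypothetical helper (not part of the code above), and the field is assumed to hold a numeric Unix timestamp in seconds:

package main

import (
	"encoding/json"
	"fmt"
)

// extractDatetime decodes just enough of the event to read the
// user-configured datetime field by name.
func extractDatetime(data []byte, fieldName string) (int64, error) {
	var evt map[string]json.RawMessage
	if err := json.Unmarshal(data, &evt); err != nil {
		return 0, err
	}
	raw, ok := evt[fieldName]
	if !ok {
		return 0, fmt.Errorf("field %q not found in event", fieldName)
	}
	var dt int64
	if err := json.Unmarshal(raw, &dt); err != nil {
		return 0, err
	}
	return dt, nil
}

func main() {
	data := []byte(`{"ts": 1685861011, "region": "NY"}`)
	dt, err := extractDatetime(data, "ts")
	if err != nil {
		panic(err)
	}
	// bucket it exactly as in the question's pseudocode (5-minute intervals)
	fmt.Println(dt / (5 * 60))
}

The resulting bucket index can then be compared against the previous one to decide whether to flush the segment before appending.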