Indexing multiple CSV files into one index with nested fields/objects

Question

I want to load data from multiple CSV files (Users, Scores, Messages) into one index via Logstash.

All CSV files share the same "userId" field, which links their data together.

My goal is to end up with a User index that holds the data from the Users CSV file as simple fields, and the data from the Scores and Messages files as nested fields.

Is there a way to achieve this?

One user can have multiple messages and scores.

I am not sure I understood the merging approach correctly; here is the Logstash config I tried.

input {
    file {
        path => "C:/resources/files/users.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
    file {
        path => "C:/resources/files/scores.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
    file {
        path => "C:/resources/files/messages.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
}

filter {
    if [log][path] == "C:/resources/files/users.csv" {
        csv {
            separator => ","
            columns => ["userId", "username", "email"]
        }
        mutate {
            remove_field => ["[event][original]", "[log][path]", "[log][path][keyword]", "[message]", "[message][keyword]"]
        }
    }

    if [log][path] == "C:/resources/files/scores.csv" {
        csv{
            separator => ","
            columns => ["userId", "field", "score"]
        }
        translate { 
            destination => "[@metadata][scores]" 
            dictionary_path => "C:/resources/files/scores.csv"
            field => "userId" 
        }
        dissect { 
            mapping => { 
                "[@metadata][scores]" => "%{field};%{score}" 
            } 
        }
    }

    if [log][path] == "C:/resources/files/messages.csv" {
        csv {
            separator => ","
            columns => ["userId", "message", "tag"]
        }
        translate { 
            destination => "[@metadata][messages]" 
            dictionary_path => "C:/resources/files/messages.csv"
            field => "userId" 
        }
        dissect { 
            mapping => { 
                "[@metadata][messages]" => "%{message};%{tag}" 
            } 
        }
    }
}

output {
    elasticsearch{
        action => "create"
        hosts => "localhost:9200"
        index => "users-index"
    }
}
Answer 1

Score: 0

It's not going to work the way you expect. You first need to load the scores and messages data into separate indexes.
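
That first step could be sketched by splitting the Logstash output by source file, reusing the `[log][path]` condition from the question's config (the index names `scores-index` and `messages-index` here are placeholders, not prescribed names):

```
output {
    if [log][path] == "C:/resources/files/scores.csv" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "scores-index"
        }
    } else if [log][path] == "C:/resources/files/messages.csv" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "messages-index"
        }
    } else {
        elasticsearch {
            hosts => "localhost:9200"
            index => "users-index"
        }
    }
}
```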

Then, from those two indexes, you can build an enrich policy for each of them.
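
For example, a `match`-type enrich policy per index might look like the following (the policy names are assumptions; the `userId`, `field`, `score`, `message`, and `tag` columns come from the question). Note that a policy must be executed before it can be used in a pipeline:

```
PUT /_enrich/policy/scores-policy
{
  "match": {
    "indices": "scores-index",
    "match_field": "userId",
    "enrich_fields": ["field", "score"]
  }
}
POST /_enrich/policy/scores-policy/_execute

PUT /_enrich/policy/messages-policy
{
  "match": {
    "indices": "messages-index",
    "match_field": "userId",
    "enrich_fields": ["message", "tag"]
  }
}
POST /_enrich/policy/messages-policy/_execute
```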

Next, you can create an ingest pipeline that leverages the two enrich policies built during the previous step.
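
A sketch of such a pipeline with two `enrich` processors (the pipeline and policy names are placeholders). Since one user can have multiple scores and messages, `max_matches` is raised from its default of 1 to its maximum of 128 so that the matching documents are collected into an array:

```
PUT /_ingest/pipeline/users-enrich-pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "scores-policy",
        "field": "userId",
        "target_field": "scores",
        "max_matches": 128
      }
    },
    {
      "enrich": {
        "policy_name": "messages-policy",
        "field": "userId",
        "target_field": "messages",
        "max_matches": 128
      }
    }
  ]
}
```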

Finally, you can modify your Logstash configuration to use the ingest pipeline created during the previous step in order to enrich each user record with the appropriate scores and messages for that user.
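
In the Logstash elasticsearch output, this is done with the `pipeline` option, e.g. (assuming the pipeline name from the previous sketch):

```
output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "users-index"
        pipeline => "users-enrich-pipeline"
    }
}
```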

Published by huangapple on 2023-01-09 19:14:44. Please retain this link when reposting: https://go.coder-hub.com/75056470.html