PyFlink -> Elastic converts Varchar to Long?


Question

I started working with PyFlink last week and have hit a roadblock. Basically, I import data from Source A and sink it to Elasticsearch, which works great, but there is one particular field that's not working properly.

The field is a 10-character string that my PyFlink job parses, runs through an encryption routine, and converts to hex, which makes the string 128 characters long.
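
For illustration only (the actual encryption code is not shown in this question), a routine of this shape could look like the following sketch; a SHA-512 digest rendered as hex happens to turn a 10-character input into exactly 128 hex characters:

import hashlib

def encrypt_to_hex(value: str) -> str:
    # Placeholder for the real encryption routine described above;
    # here simply a one-way SHA-512 hash rendered as a hex string.
    return hashlib.sha512(value.encode("utf-8")).hexdigest()

print(len(encrypt_to_hex("ABC1234567")))  # 128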

While sinking to Elasticsearch, somewhere along the way the system seems to treat my string as a "long" type.

The following error is thrown when trying to import:

Caused by: ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [some_encrypted_id] of type [long] in document with id '10'. Preview of field's value: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=For input string: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"]];

My sink definition:

CREATE TABLE es_sink (
    some_encrypted_id VARCHAR
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'x',  -- normally not x
    'index' = 'x',  -- normally not x
    'document-id.key-delimiter' = '$',
    'sink.bulk-flush.max-size' = '42mb',
    'sink.bulk-flush.max-actions' = '32',
    'sink.bulk-flush.interval' = '1000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'format' = 'json'
)
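
For context, this is roughly how such a DDL is registered from PyFlink. This is only a sketch, not my actual job: the host, index, and source table names are placeholders, and it assumes the elasticsearch-7 connector jar is on the classpath.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch only: placeholder host and index, not the real configuration.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE es_sink (
        some_encrypted_id VARCHAR
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://localhost:9200',
        'index' = 'my_index',
        'format' = 'json'
    )
""")

# The already-encrypted 128-char hex value is then written with a normal INSERT, e.g.:
# t_env.execute_sql("INSERT INTO es_sink SELECT encrypted_id FROM source_table").wait()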

I tried replacing VARCHAR with TEXT, but while creating the job I get the following error:

java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: TEXT

I am honestly out of ideas here. I tried multiple different fields and everything else works as expected; it's just this one that does not.

I also don't see why the system tries to sink it as a "long" type; I never defined anything as "long".

Hopefully, someone can figure out what I am doing wrong here and point me in the right direction. If more info is needed, please let me know!

What I have tried so far:

  • Tried replacing the data fields; nothing worked
  • Tried using the value of the encryption routine in a "lit" field; same error
  • Rewrote the entire script

Answer 1

Score: 0

Did you specify a data type for your fields in your Elasticsearch index? ES will guess a type for the field from the first value you insert, and sometimes it might not be the one you expect.

E.g. suppose you have an index AA with a field aa that does not have a type mapping yet, and your program inserts 57, 'abc', ... sequentially. When ES first sees 57 (a numeric value), it will guess that the field is numeric and map it as something like integer or long, and your subsequent insertions of non-numeric values may fail.

You can try putting an explicit mapping on the index before writing to it (see the Elasticsearch PUT mapping API).
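
A minimal sketch of that approach, assuming an Elasticsearch 7 cluster and placeholder host/index names (adjust to your real ones): create the index with an explicit keyword mapping for the encrypted field before the Flink job writes to it, so ES never has to guess the type.

import requests

# Sketch only: "localhost:9200" and "my_index" are placeholders.
mapping = {
    "mappings": {
        "properties": {
            # Map the 128-char hex string explicitly so ES does not guess "long".
            "some_encrypted_id": {"type": "keyword"}
        }
    }
}

resp = requests.put(
    "http://localhost:9200/my_index",
    json=mapping,
    headers={"Content-Type": "application/json"},
    timeout=10,
)
resp.raise_for_status()
print(resp.json())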
