MySQL to JSON inconsistent extraction

Question

I have a MySQL database with 6 tables and about 2 million rows in total.

I want to migrate all the data into MongoDB.

I decided to do this by converting the SQL tables into JSON and importing it to MongoDB.

I wrote a program in Golang to extract the data and output it as JSON.

This is the main function of the program:

    func main() {
        // Open a database connection
        var err error
        db, err = sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/employees")
        checkErr(err)
        // Check if reachable
        if err = db.Ping(); err != nil {
            log.Fatal("Database is unreachable:", err)
        }
        // Populate variables with data
        err = populateVars()
        checkErr(err)
        // Marshal variables into JSON
        binaryJSON, err := json.Marshal(collection)
        checkErr(err)
        // Write JSON to a file
        err = writeStringToFile("/home/user01/Temporary/sql2data.json", string(binaryJSON))
        checkErr(err)
    }

The problem is that the output is inconsistent.

Every time I run the program, the resulting file has a different size and some random fields are missing.

What could be causing this?

It doesn't seem like it's a problem with the logic of the program since everything executes without errors, and most fields are populated just fine.

Could I be reading the information too fast, so that some things get lost occasionally?

Or is there something else that I'm missing?

Edit:

Most of the work happens inside the populateVars() function call.

It has multiple blocks of code that execute a given SQL query and populate struct variables according to the schema.

This is one such block:

    rows, err = db.Query("SELECT emp_no, dept_emp.dept_no, dept_name, from_date, to_date FROM dept_emp JOIN departments ON departments.dept_no = dept_emp.dept_no;")
    checkErr(err)
    i := 0
    for rows.Next() {
        var id int
        var depNumber string
        var depName string
        var fromDate string
        var toDate string
        var position = "Employee"
        err = rows.Scan(&id, &depNumber, &depName, &fromDate, &toDate)
        // For debugging purposes:
        fmt.Println(id, depNumber, depName, fromDate, toDate, position, i)
        if err != nil {
            return err
        }
        for i := range collection {
            if collection[i].ID == id {
                collection[i].Departments = append(collection[i].Departments, Department{DepartmentNumber: depNumber, DepartmentName: depName, FromDate: fromDate, ToDate: toDate, Position: position})
                // For debugging purposes:
                fmt.Println(collection[i].Departments)
            }
        }
        i++
    }
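
The inner loop above scans the whole collection for every row the query returns, which is likely a large part of why each query runs for minutes. A minimal sketch of an alternative, assuming employee IDs are unique: build a map from ID to slice index once and use it for each row. The struct definitions and the helper names buildIndex and attach are hypothetical, chosen only for illustration.

```go
package main

import "fmt"

// Department and Employee approximate the shapes implied by the question;
// the exact struct definitions are assumptions.
type Department struct {
	DepartmentNumber string
	DepartmentName   string
}

type Employee struct {
	ID          int
	Departments []Department
}

// buildIndex maps each employee ID to its position in the slice, so a
// later lookup is a single map access instead of a scan of the slice.
// It assumes IDs are unique; with duplicates the last one wins.
func buildIndex(collection []Employee) map[int]int {
	index := make(map[int]int, len(collection))
	for i, e := range collection {
		index[e.ID] = i
	}
	return index
}

// attach appends dep to the employee with the given ID and reports
// whether such an employee was found.
func attach(collection []Employee, index map[int]int, id int, dep Department) bool {
	i, ok := index[id]
	if !ok {
		return false
	}
	collection[i].Departments = append(collection[i].Departments, dep)
	return true
}

func main() {
	// Illustrative data, not taken from the real database.
	collection := []Employee{{ID: 10001}, {ID: 10002}}
	index := buildIndex(collection)

	dep := Department{DepartmentNumber: "d005", DepartmentName: "Development"}
	found := attach(collection, index, 10002, dep)
	fmt.Println(found, len(collection[1].Departments))
}
```

In the real program the call to attach would sit inside the rows.Next() loop, with the index built once before the query runs.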

Here's a GitHub link to the whole program:
https://github.com/dchmie01/mysql_to_json/blob/master/main.go

Edit 2:

The issue seems to be related to a query timeout.

Each query takes about 10 minutes to execute, but about 6 minutes in I get this error and the program stops executing the query:

    [mysql] 2017/04/29 17:35:16 packets.go:66: unexpected EOF
    [mysql] 2017/04/29 17:35:16 packets.go:412: busy buffer
    2017/04/29 17:35:16 driver: bad connection

And in the MySQL log file it says:

    2017-04-29T16:28:49.975805Z 102 [Note] Aborted connection 102 to db: 'employees' user: 'root' host: 'localhost' (Got timeout writing communication packets)

So far I have tried adjusting MySQL variables to disable any timeouts that might be in effect, but with no luck.

I think the issue might be with the MySQL driver for Go.

Answer 1

Score: 1

Consider using MySQL's SELECT ... INTO OUTFILE and mongoimport --type csv instead.

The only thing the program does is embed one-to-many and many-to-many documents, which can be done easily with the aggregation framework.

A step-by-step example:

  1. Export CSV files from MySQL

      SELECT * FROM employees INTO OUTFILE '/tmp/employees.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
      SELECT * FROM salaries INTO OUTFILE '/tmp/salaries.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
      SELECT * FROM titles INTO OUTFILE '/tmp/titles.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
      SELECT * FROM departments INTO OUTFILE '/tmp/departments.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
      SELECT * FROM dept_emp INTO OUTFILE '/tmp/dept_emp.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
      SELECT * FROM dept_manager INTO OUTFILE '/tmp/dept_manager.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  2. Import the CSV files into MongoDB (define each 'field spec' according to your schema; the employees command shows an example)

      mongoimport -d <dbname> -c tmp_employees -f 'id.int32(),birth_date.date(2006-01-02),first_name.string(),last_name.string(),gender.string(),hire_date.date(2006-01-02)' --columnsHaveTypes --type csv --file /tmp/employees.csv --drop
      mongoimport -d <dbname> -c tmp_salaries -f 'field spec' --columnsHaveTypes --type csv --file /tmp/salaries.csv --drop
      mongoimport -d <dbname> -c tmp_titles -f 'field spec' --columnsHaveTypes --type csv --file /tmp/titles.csv --drop
      mongoimport -d <dbname> -c tmp_departments -f 'field spec' --columnsHaveTypes --type csv --file /tmp/departments.csv --drop
      mongoimport -d <dbname> -c tmp_dept_emp -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_emp.csv --drop
      mongoimport -d <dbname> -c tmp_dept_manager -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_manager.csv --drop
  3. Embed the data from the mongo shell

      db.tmp_employees.aggregate([
          // 1-to-many joins
          {$lookup: {
              from: 'tmp_salaries',
              localField: 'id',
              foreignField: 'emp_no',
              as: 'salaries'
          }},
          {$lookup: {
              from: 'tmp_titles',
              localField: 'id',
              foreignField: 'emp_no',
              as: 'titles'
          }},
          // many-to-many joins
          {$lookup: {
              from: 'tmp_dept_emp',
              localField: 'id',
              foreignField: 'emp_no',
              as: 'dept_emp'
          }},
          {$lookup: {
              from: 'tmp_dept_manager',
              localField: 'id',
              foreignField: 'emp_no',
              as: 'dept_manager'
          }},
          {$unwind: {path: '$dept_emp', preserveNullAndEmptyArrays: true}},
          {$lookup: {
              from: 'tmp_departments',
              localField: 'dept_emp.dept_no',
              foreignField: 'dept_no',
              as: 'dept_emp_deps'
          }},
          {$unwind: {path: '$dept_emp_deps', preserveNullAndEmptyArrays: true}},
          {$group: {
              _id: '$_id',
              root: {$first: '$$ROOT'},
              dept_manager: {$first: '$dept_manager'},
              departments_emp: {$push: {
                  department_number: '$dept_emp.dept_no',
                  department_name: '$dept_emp_deps.dept_name',
                  from_date: '$dept_emp.from_date',
                  to_date: '$dept_emp.to_date',
                  position: '$dept_emp.position'
              }}
          }},
          {$unwind: {path: '$dept_manager', preserveNullAndEmptyArrays: true}},
          {$lookup: {
              from: 'tmp_departments',
              localField: 'dept_manager.dept_no',
              foreignField: 'dept_no',
              as: 'dept_manager_deps'
          }},
          {$unwind: {path: '$dept_manager_deps', preserveNullAndEmptyArrays: true}},
          {$group: {
              _id: '$_id',
              root: {$first: '$root'},
              departments_emp: {$first: '$departments_emp'},
              departments_manager: {$push: {
                  department_number: '$dept_manager.dept_no',
                  department_name: '$dept_manager_deps.dept_name',
                  from_date: '$dept_manager.from_date',
                  to_date: '$dept_manager.to_date',
                  position: '$dept_manager.position'
              }}
          }},
          // combine departments into a single array
          {$project: {
              root: 1,
              departments_all: {$concatArrays: ['$departments_emp', '$departments_manager']}
          }},
          // final reshape
          {$project: {
              id: '$root.id',
              birth_date: '$root.birth_date',
              first_name: '$root.first_name',
              last_name: '$root.last_name',
              gender: '$root.gender',
              hire_date: '$root.hire_date',
              salaries: '$root.salaries',
              titles: '$root.titles',
              departments: {$filter: {
                  input: '$departments_all',
                  as: 'departments',
                  cond: {$ne: ['$$departments', {}]}
              }}
          }},
          {$out: 'employees'}
      ])
  4. Delete the imported collections from the mongo shell

      db.tmp_employees.drop();
      db.tmp_salaries.drop();
      db.tmp_titles.drop();
      db.tmp_departments.drop();
      db.tmp_dept_emp.drop();
      db.tmp_dept_manager.drop();

huangapple
  • Published on April 29, 2017, 22:05:35
  • When reposting, please keep the link to this article: https://go.coder-hub.com/43696607.html