MySQL to JSON inconsistent extraction
Question
I have a MySQL database with 6 tables and about 2 million rows all together.
I want to migrate all the data into MongoDB.
I decided to do this by converting the SQL tables into JSON and importing it to MongoDB.
I wrote a program in Golang to extract the data and output it as JSON.
This is the main function of the program:
func main() {
	// Open a database connection
	var err error
	db, err = sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/employees")
	checkErr(err)

	// Check if reachable
	if err = db.Ping(); err != nil {
		log.Fatal("Database is unreachable:", err)
	}

	// Populate variables with data
	err = populateVars()
	checkErr(err)

	// Marshal variables into JSON
	binaryJSON, err := json.Marshal(collection)
	checkErr(err)

	// Write JSON to a file
	err = writeStringToFile("/home/user01/Temporary/sql2data.json", string(binaryJSON))
	checkErr(err)
}
The problem is that the output is inconsistent.
Every time I run the program, the resulting file has a different size and some random fields are missing.
What could be causing this?
It doesn't seem like it's a problem with the logic of the program since everything executes without errors, and most fields are populated just fine.
Could I be reading the information too fast, so that some things get lost occasionally?
Or is there something else that I'm missing?
Edit:
Most of the work happens inside the populateVars() function call.
It has multiple blocks of code that execute a given SQL query and populate struct variables according to the schema.
This is one such block:
rows, err = db.Query("SELECT emp_no, dept_emp.dept_no, dept_name, from_date, to_date FROM dept_emp JOIN departments ON departments.dept_no = dept_emp.dept_no;")
checkErr(err)
i := 0
for rows.Next() {
	var id int
	var depNumber string
	var depName string
	var fromDate string
	var toDate string
	var position = "Employee"
	err = rows.Scan(&id, &depNumber, &depName, &fromDate, &toDate)
	if err != nil {
		return err
	}
	// For debugging purposes:
	fmt.Println(id, depNumber, depName, fromDate, toDate, position, i)
	for i := range collection {
		if collection[i].ID == id {
			collection[i].Departments = append(collection[i].Departments, Department{DepartmentNumber: depNumber, DepartmentName: depName, FromDate: fromDate, ToDate: toDate, Position: position})
			// For debugging purposes:
			fmt.Println(collection[i].Departments)
		}
	}
	i++
}
Here's a GitHub link to the whole program:
https://github.com/dchmie01/mysql_to_json/blob/master/main.go
Edit 2:
It seems like the issue has to do with query timeout.
Each query takes about 10 min to execute but at about 6 minutes in, I get this error, and the program stops executing the query:
[mysql] 2017/04/29 17:35:16 packets.go:66: unexpected EOF
[mysql] 2017/04/29 17:35:16 packets.go:412: busy buffer
2017/04/29 17:35:16 driver: bad connection
And in the MySQL log file it says:
2017-04-29T16:28:49.975805Z 102 [Note] Aborted connection 102 to db: 'employees' user: 'root' host: 'localhost' (Got timeout writing communication packets)
So far I tried playing around with MySQL variables to disable any timeouts that might be present, but no luck.
I think the issue might be with the Go mysql driver.
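Two hedged observations on this. First, rows.Next() simply returns false once the connection is dropped, so the loop ends as if the result set were complete; checking rows.Err() after the loop would turn the silent truncation into a visible error. Second, besides the server-side net_write_timeout named in the MySQL log line, go-sql-driver/mysql accepts readTimeout and writeTimeout DSN parameters. A sketch of building such a DSN (the 30m values are illustrative, not a recommendation):

```go
package main

import "fmt"

// buildDSN appends go-sql-driver/mysql timeout parameters to the DSN.
// readTimeout/writeTimeout bound per-packet I/O on the client side;
// a server-side counterpart would be e.g. SET GLOBAL net_write_timeout.
func buildDSN(user, pass, addr, db string) string {
	return fmt.Sprintf("%s:%s@tcp(%s)/%s?readTimeout=30m&writeTimeout=30m",
		user, pass, addr, db)
}

func main() {
	fmt.Println(buildDSN("root", "password", "127.0.0.1:3306", "employees"))
}
```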
Answer 1
Score: 1
Consider using MySQL's SELECT INTO OUTFILE and mongoimport --type csv instead.
The only thing the program does is embed one-to-many and many-to-many documents, which can easily be done with the aggregation framework.
A step-by-step example:
- Export CSV files from MySQL (note: if the server's secure_file_priv variable is set, OUTFILE paths must be under that directory):

  SELECT * FROM employees INTO OUTFILE '/tmp/employees.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  SELECT * FROM salaries INTO OUTFILE '/tmp/salaries.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  SELECT * FROM titles INTO OUTFILE '/tmp/titles.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  SELECT * FROM departments INTO OUTFILE '/tmp/departments.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  SELECT * FROM dept_emp INTO OUTFILE '/tmp/dept_emp.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
  SELECT * FROM dept_manager INTO OUTFILE '/tmp/dept_manager.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
- Import the CSV files into mongo (define 'field spec' according to your schema; the employees line below shows an example field spec):

  mongoimport -d <dbname> -c tmp_employees -f 'id.int32(),birth.date(2006-01-02),first_name.string(),last_name.string(),gender.string(),hire_date.date(2006-01-02)' --columnsHaveTypes --type csv --file /tmp/employees.csv --drop
  mongoimport -d <dbname> -c tmp_salaries -f 'field spec' --columnsHaveTypes --type csv --file /tmp/salaries.csv --drop
  mongoimport -d <dbname> -c tmp_titles -f 'field spec' --columnsHaveTypes --type csv --file /tmp/titles.csv --drop
  mongoimport -d <dbname> -c tmp_departments -f 'field spec' --columnsHaveTypes --type csv --file /tmp/departments.csv --drop
  mongoimport -d <dbname> -c tmp_dept_emp -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_emp.csv --drop
  mongoimport -d <dbname> -c tmp_dept_manager -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_manager.csv --drop
- Embed the data from the mongo shell:

  db.tmp_employees.aggregate([
      // 1-to-many joins
      {$lookup: { from: 'tmp_salaries', localField: 'id', foreignField: 'emp_no', as: 'salaries' }},
      {$lookup: { from: 'tmp_titles', localField: 'id', foreignField: 'emp_no', as: 'titles' }},
      // many-to-many joins
      {$lookup: { from: 'tmp_dept_emp', localField: 'id', foreignField: 'emp_no', as: 'dept_emp' }},
      {$lookup: { from: 'tmp_dept_manager', localField: 'id', foreignField: 'emp_no', as: 'dept_manager' }},
      {$unwind: { path: '$dept_emp', preserveNullAndEmptyArrays: true }},
      {$lookup: { from: 'tmp_departments', localField: 'dept_emp.dept_no', foreignField: 'dept_no', as: 'dept_emp_deps' }},
      {$unwind: { path: '$dept_emp_deps', preserveNullAndEmptyArrays: true }},
      {$group: {
          _id: '$_id',
          root: {$first: '$$ROOT'},
          dept_manager: {$first: '$dept_manager'},
          departments_emp: {$push: {
              department_number: '$dept_emp.emp_no',
              department_name: '$dept_emp_deps.dept_name',
              from_date: '$dept_emp.from_date',
              to_date: '$dept_emp.to_date',
              position: '$dept_emp.position'
          }}
      }},
      {$unwind: { path: '$dept_manager', preserveNullAndEmptyArrays: true }},
      {$lookup: { from: 'tmp_departments', localField: 'dept_manager.dept_no', foreignField: 'dept_no', as: 'dept_manager_deps' }},
      {$unwind: { path: '$dept_manager_deps', preserveNullAndEmptyArrays: true }},
      {$group: {
          _id: '$_id',
          root: {$first: '$root'},
          departments_emp: {$first: '$departments_emp'},
          departments_manager: {$push: {
              department_number: '$dept_manager.emp_no',
              department_name: '$dept_manager_deps.dept_name',
              from_date: '$dept_manager.from_date',
              to_date: '$dept_manager.to_date',
              position: '$dept_manager.position'
          }}
      }},
      // combine departments to a single array
      {$project: {
          root: 1,
          departments_all: {$concatArrays: [ "$departments_emp", "$departments_manager" ]}
      }},
      // final reshape
      {$project: {
          id: '$root.id',
          birth_date: '$root.birth_date',
          first_name: '$root.first_name',
          last_name: '$root.last_name',
          gender: '$root.gender',
          hire_date: '$root.hire_date',
          salaries: '$root.salaries',
          titles: '$root.titles',
          departments: {$filter: {
              input: "$departments_all",
              as: "departments",
              cond: { $ne: [ "$$departments", {} ] }
          }}
      }},
      { $out : "employees" }
  ])
- Drop the imported collections from the mongo shell:

  db.tmp_employees.drop();
  db.tmp_salaries.drop();
  db.tmp_titles.drop();
  db.tmp_departments.drop();
  db.tmp_dept_emp.drop();
  db.tmp_dept_manager.drop();