MySQL到JSON的提取不一致

huangapple go评论162阅读模式
英文:

MySQL to JSON inconsistent extraction

问题

我有一个包含6个表和大约200万行数据的MySQL数据库。

我想将所有数据迁移到MongoDB。

我决定通过将SQL表转换为JSON并导入到MongoDB来实现这一目标。

我使用Golang编写了一个程序来提取数据并将其输出为JSON。

以下是程序的主要函数:

func main() {
    // 打开数据库连接
    var err error
    db, err = sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/employees")
    checkErr(err)
    // 检查是否可达
    if err = db.Ping(); err != nil {
        log.Fatal("无法连接到数据库:", err)
    }
    // 填充变量数据
    err = populateVars()
    checkErr(err)
    // 将变量转换为JSON
    binaryJSON, err := json.Marshal(collection)
    checkErr(err)
    // 将JSON写入文件
    err = writeStringToFile("/home/user01/Temporary/sql2data.json", string(binaryJSON))
    checkErr(err)
}

问题是输出结果不一致。

每次运行程序时,生成的文件大小都不同,并且一些随机字段丢失了。

这可能是什么原因?

看起来不像是程序逻辑的问题,因为所有操作都没有报错,并且大多数字段都被正确填充。

我是否读取信息过快,导致有时会丢失一些内容?

还是我漏掉了其他什么东西?

编辑:

大部分工作都在populateVars()函数调用中完成。

它有多个代码块,执行给定的SQL查询并根据模式填充结构变量。

以下是其中一个代码块:

rows, err = db.Query("SELECT emp_no, dept_emp.dept_no, dept_name, from_date, to_date FROM dept_emp JOIN departments ON departments.dept_no = dept_emp.dept_no;")
checkErr(err)
i := 0
for rows.Next() {
    var id int
    var depNumber string
    var depName string
    var fromDate string
    var toDate string
    var position = "Employee"
    err = rows.Scan(&id, &depNumber, &depName, &fromDate, &toDate,)
    // 用于调试:
    fmt.Println(id, depNumber, depName, fromDate, toDate, position, i)
    if err != nil {
        return err
    }
    for i := range collection {
        if collection[i].ID == id {
            collection[i].Departments = append(collection[i].Departments, Department{DepartmentNumber: depNumber, DepartmentName: depName, FromDate: fromDate, ToDate: toDate, Position: position})
            // 用于调试:
            fmt.Println(collection[i].Departments)
        }
    }
    i++
}

这是整个程序的GitHub链接:
https://github.com/dchmie01/mysql_to_json/blob/master/main.go

编辑2:

问题似乎与查询超时有关。

每个查询大约需要10分钟执行,但在大约6分钟后,我收到以下错误,并且程序停止执行查询:

[mysql] 2017/04/29 17:35:16 packets.go:66: unexpected EOF
[mysql] 2017/04/29 17:35:16 packets.go:412: busy buffer
2017/04/29 17:35:16 driver: bad connection

在MySQL日志文件中,显示如下:

2017-04-29T16:28:49.975805Z 102 [Note] Aborted connection 102 to db: 'employees' user: 'root' host: 'localhost' (Got timeout writing communication packets)

到目前为止,我尝试调整MySQL变量以禁用可能存在的任何超时,但没有成功。

我认为问题可能出在Go的mysql驱动程序上。

英文:

I have a MySQL database with 6 tables and about 2 million rows all together.

I want to migrate all the data into MongoDB.

I decided to do this by converting the SQL tables into JSON and importing it to MongoDB.

I wrote a program in Golang to extract the data and output it as JSON.

This is the main function of the program:

func main() {
	// Open a database connection
	var err error
	db, err = sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/employees")
	checkErr(err)
	// Check if reachable
	if err = db.Ping(); err != nil {
		log.Fatal("Database is unreachable:", err)
	}
	// Populate variables with data
	err = populateVars()
	checkErr(err)
	// Marshal variables into JSON
	binaryJSON, err := json.Marshal(collection)
	checkErr(err)
	// Write JSON to a file
	err = writeStringToFile("/home/user01/Temporary/sql2data.json", string(binaryJSON))
    checkErr(err)
}

The problem is that the output is inconsistent.

Every time I run the program, the resulting file has a different size and some random fields are missing.

What could be causing this?

It doesn't seem like it's a problem with the logic of the program since everything executes without errors, and most fields are populated just fine.

Could I be reading the information too fast, so that some things get lost occasionally?

Or is there something else that I'm missing?

Edit:

Most of the work happens inside the populateVars() function call.

It has multiple blocks of code that execute a given SQL query and populate struct variables according to the schema.

This is one such block:

rows, err = db.Query("SELECT emp_no, dept_emp.dept_no, dept_name, from_date, to_date FROM dept_emp JOIN departments ON departments.dept_no = dept_emp.dept_no;")
checkErr(err)
i := 0
for rows.Next() {
	var id int
	var depNumber string
	var depName string
	var fromDate string
	var toDate string
	var position = "Employee"
	err = rows.Scan(&id, &depNumber, &depName, &fromDate, &toDate,)
    // For debugging purposes:
	fmt.Println(id, depNumber, depName, fromDate, toDate, position, i)
	if err != nil {
		return err
	}
	for i := range collection {
		if collection[i].ID == id {
			collection[i].Departments = append(collection[i].Departments, Department{DepartmentNumber: depNumber, DepartmentName: depName, FromDate: fromDate, ToDate: toDate, Position: position})
            // For debugging purposes:
			fmt.Println(collection[i].Departments)
		}
	}
	i++
}

Here's a GitHub link to the whole program:
https://github.com/dchmie01/mysql_to_json/blob/master/main.go

Edit 2:

It seems like the issue has to do with query timeout.

Each query takes about 10 min to execute but at about 6 minutes in, I get this error, and the program stops executing the query:

[mysql] 2017/04/29 17:35:16 packets.go:66: unexpected EOF
[mysql] 2017/04/29 17:35:16 packets.go:412: busy buffer
2017/04/29 17:35:16 driver: bad connection

And in the MySQL log file it says:

2017-04-29T16:28:49.975805Z 102 [Note] Aborted connection 102 to db: 'employees' user: 'root' host: 'localhost' (Got timeout writing communication packets)

So far I tried playing around with MySQL variables to disable any timeouts that might be present, but no luck.

I think the issue might be with the mysql driver for Go.

答案1

得分: 1

考虑使用Mysql SELECT INTO OUTFILEmongoimport --type csv来替代。

该程序的唯一功能是嵌入一对多和多对多的文档,这可以通过聚合框架轻松完成。

以下是一个逐步示例:

  1. 从MySQL导出CSV文件

    SELECT * FROM employees INTO OUTFILE '/tmp/employees.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    SELECT * FROM salaries INTO OUTFILE '/tmp/salaries.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    SELECT * FROM titles INTO OUTFILE '/tmp/titles.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    SELECT * FROM departments INTO OUTFILE '/tmp/departments.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    SELECT * FROM dept_emp INTO OUTFILE '/tmp/dept_emp.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    SELECT * FROM dept_manager INTO OUTFILE '/tmp/dept_manager.csv' FIELDS TERMINATED BY ',' ENCLOSED BY '"';
    
  2. 将CSV文件导入MongoDB(根据你的模式定义字段规范,以下是employees字段规范的示例)

    mongoimport -d <dbname> -c tmp_employees -f 'id.int32(),birth.date(2006-01-02),first_name.string(),last_name.string(),gender.string(),hire_date.date(2006-01-02)' --columnsHaveTypes --type csv --file /tmp/employees.csv --drop
    mongoimport -d <dbname> -c tmp_salaries -f 'field spec' --columnsHaveTypes --type csv --file /tmp/salaries.csv --drop
    mongoimport -d <dbname> -c tmp_titles -f 'field spec' --columnsHaveTypes --type csv --file /tmp/titles.csv --drop
    mongoimport -d <dbname> -c tmp_departments -f 'field spec' --columnsHaveTypes --type csv --file /tmp/departments.csv --drop
    mongoimport -d <dbname> -c tmp_dept_emp -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_emp.csv --drop
    mongoimport -d <dbname> -c tmp_dept_manager -f 'field spec' --columnsHaveTypes --type csv --file /tmp/dept_manager.csv --drop
    
  3. 从MongoDB Shell嵌入数据

    db.tmp_employees.aggregate([
        // 一对多连接
        {
            $lookup: {
                from: 'tmp_salaries',
                localField: 'id',
                foreignField: 'emp_no',
                as: 'salaries'
            }
        },
        {
            $lookup: {
                from: 'tmp_titles',
                localField: 'id',
                foreignField: 'emp_no',
                as: 'titles'
            }
        },
        // 多对多连接
        {
            $lookup: {
                from: 'tmp_dept_emp',
                localField: 'id',
                foreignField: 'emp_no',
                as: 'dept_emp'
            }
        },
        {
            $lookup: {
                from: 'tmp_dept_manager',
                localField: 'id',
                foreignField: 'emp_no',
                as: 'dept_manager'
            }
        },
        {
            $unwind: {
                path: '$dept_emp',
                preserveNullAndEmptyArrays: true
            }
        },
        {
            $lookup: {
                from: 'tmp_departments',
                localField: 'dept_emp.dept_no',
                foreignField: 'dept_no',
                as: 'dept_emp_deps'
            }
        },
        {
            $unwind: {
                path: '$dept_emp_deps',
                preserveNullAndEmptyArrays: true
            }
        },
        {
            $group: {
                _id: '$_id',
                root: {
                    $first: '$$ROOT'
                },
                dept_manager: {
                    $first: '$dept_manager'
                },
                departments_emp: {
                    $push: {
                        department_number: '$dept_emp.emp_no',
                        department_name: '$dept_emp_deps.dept_name',
                        from_date: '$dept_emp.from_date',
                        to_date: '$dept_emp.to_date',
                        position: '$dept_emp.position'
                    }
                }
            }
        },
        {
            $unwind: {
                path: '$dept_manager',
                preserveNullAndEmptyArrays: true
            }
        },
        {
            $lookup: {
                from: 'tmp_departments',
                localField: 'dept_manager.dept_no',
                foreignField: 'dept_no',
                as: 'dept_manager_deps'
            }
        },
        {
            $unwind: {
                path: '$dept_manager_deps',
                preserveNullAndEmptyArrays: true
            }
        },
        {
            $group: {
                _id: '$_id',
                root: {
                    $first: '$root'
                },
                departments_emp: {
                    $first: '$departments_emp'
                },
                departments_manager: {
                    $push: {
                        department_number: '$dept_manager.emp_no',
                        department_name: '$dept_manager_deps.dept_name',
                        from_date: '$dept_manager.from_date',
                        to_date: '$dept_manager.to_date',
                        position: '$dept_manager.position'
                    }
                }
            }
        },
        // 将部门组合为单个数组
        {
            $project: {
                root: 1,
                departments_all: {
                    $concatArrays: ['$departments_emp', '$departments_manager']
                }
            }
        },
        // 最终重塑
        {
            $project: {
                id: '$root.id',
                birth_date: '$root.birth_date',
                first_name: '$root.first_name',
                last_name: '$root.last_name',
                gender: '$root.gender',
                hire_date: '$root.hire_date',
                salaries: '$root.salaries',
                titles: '$root.titles',
                departments: {
                    $filter: {
                        input: '$departments_all',
                        as: 'departments',
                        cond: {
                            $ne: ['$$departments', {}]
                        }
                    }
                }
            }
        },
        {
            $out: 'employees'
        }
    ])
    
  4. 从MongoDB Shell删除导入的集合

    db.tmp_employees.drop();
    db.tmp_salaries.drop();
    db.tmp_titles.drop();
    db.tmp_departments.drop();
    db.tmp_dept_emp.drop();
    db.tmp_dept_manager.drop();
    
英文:

Consider to use Mysql SELECT INTO OUTFILE and mongoiport --type csv instead.

The only thing that the program does is embedding 1-to-many and many-to-many documents, which can be easily done with aggregation framework.

A step-by step example:

  1. export csv from mysql

    SELECT * from employees INTO OUTFILE &#39;/tmp/employees.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    SELECT * from salaries INTO OUTFILE &#39;/tmp/salaries.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    SELECT * from titles INTO OUTFILE &#39;/tmp/titles.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    SELECT * from departments INTO OUTFILE &#39;/tmp.departments.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    SELECT * from dept_emp INTO OUTFILE &#39;/tmp/dept_emp.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    SELECT * from dept_manager INTO OUTFILE &#39;/tmp/dept_manager.csv&#39; FIELDS TERMINATED BY &#39;,&#39; ENCLOSED BY &#39;&quot;&#39;;
    
  2. import csv into mongo (define 'field spec' according to your schema, see example for employees field spec)

    mongoimport -d &lt;dbname&gt; -c tmp_employees -f &#39;id.int32(),birth.date(2006-01-02),first_name.string(),last_name.string(),gender.string(),hire_date.date(2006-01-02)&#39; --columnsHaveTypes --type csv --file /tmp/employees.csv --drop 
    mongoimport -d &lt;dbname&gt; -c tmp_salaries -f &#39;field spec&#39; --columnsHaveTypes --type csv --file /tmp/salaries.csv --drop 
    mongoimport -d &lt;dbname&gt; -c tmp_titles -f &#39;field spec&#39; --columnsHaveTypes --type csv --file /tmp/titles.csv --drop 
    mongoimport -d &lt;dbname&gt; -c tmp_departments -f &#39;field spec&#39; --columnsHaveTypes --type csv --file /tmp/departments.csv --drop 
    mongoimport -d &lt;dbname&gt; -c tmp_dept_emp -f &#39;field spec&#39; --columnsHaveTypes --type csv --file /tmp/dept_emp.csv --drop 
    mongoimport -d &lt;dbname&gt; -c tmp_dept_manager -f &#39;field spec&#39; --columnsHaveTypes --type csv --file /tmp/dept_manager.csv --drop 
    
  3. embed data from mongo shell

    db.tmp_employees.aggregate([
    // 1-to-many joins
    {$lookup: {
    from: &#39;tmp_salaries&#39;,
    localField: &#39;id&#39;,
    foreignField: &#39;emp_no&#39;,
    as: &#39;salaries&#39;
    }},
    {$lookup: {
    from: &#39;tmp_titles&#39;,
    localField: &#39;id&#39;,
    foreignField: &#39;emp_no&#39;,
    as: &#39;titles&#39;
    }},
    // many-to-many joins
    {$lookup: {
    from: &#39;tmp_dept_emp&#39;,
    localField: &#39;id&#39;,
    foreignField: &#39;emp_no&#39;,
    as: &#39;dept_emp&#39;
    }},
    {$lookup: {
    from: &#39;tmp_dept_manager&#39;,
    localField: &#39;id&#39;,
    foreignField: &#39;emp_no&#39;,
    as: &#39;dept_manager&#39;
    }},
    {$unwind: { path: &#39;$dept_emp&#39;, preserveNullAndEmptyArrays: true }},
    {$lookup: {
    from: &#39;tmp_departments&#39;,
    localField: &#39;dept_emp.dept_no&#39;,
    foreignField: &#39;dept_no&#39;,
    as: &#39;dept_emp_deps&#39;
    }},    
    {$unwind: { path: &#39;$dept_emp_deps&#39;, preserveNullAndEmptyArrays: true }},
    {$group: {
    _id: &#39;$_id&#39;,
    root: {$first: &#39;$$ROOT&#39;},
    dept_manager: {$first: &#39;$dept_manager&#39;},
    departments_emp: {$push: {
    department_number: &#39;$dept_emp.emp_no&#39;,
    department_name: &#39;$dept_emp_deps.dept_name&#39;,
    from_date: &#39;$dept_emp.from_date&#39;,
    to_date: &#39;$dept_emp.to_date&#39;,
    position: &#39;$dept_emp.position&#39;
    }},
    }},
    {$unwind: { path: &#39;$dept_manager&#39;, preserveNullAndEmptyArrays: true }},
    {$lookup: {
    from: &#39;tmp_departments&#39;,
    localField: &#39;dept_manager.dept_no&#39;,
    foreignField: &#39;dept_no&#39;,
    as: &#39;dept_manager_deps&#39;
    }},    
    {$unwind: { path: &#39;$dept_manager_deps&#39;, preserveNullAndEmptyArrays: true }},
    {$group: {
    _id: &#39;$_id&#39;,
    root: {$first: &#39;$root&#39;},
    departments_emp: {$first: &#39;$departments_emp&#39;},
    departments_manager: {$push: {
    department_number: &#39;$dept_manager.emp_no&#39;,
    department_name: &#39;$dept_manager_deps.dept_name&#39;,
    from_date: &#39;$dept_manager.from_date&#39;,
    to_date: &#39;$dept_manager.to_date&#39;,
    position: &#39;$dept_manager.position&#39;
    }},
    }},
    // combine departments to a single array
    {$project: {
    root: 1,
    departments_all: {$concatArrays: [ &quot;$departments_emp&quot;, &quot;$departments_manager&quot; ] }
    }},
    //final reshape
    {$project: {
    id: &#39;$root.id&#39;,
    birth_date: &#39;$root.birth_date&#39;,
    first_name: &#39;$root.first_name&#39;,
    last_name: &#39;$root.last_name&#39;,
    gender: &#39;$root.gender&#39;,
    hire_date: &#39;$root.hire_date&#39;,
    salaries: &#39;$root.salaries&#39;,
    titles: &#39;$root.titles&#39;,
    departments: {$filter: {
    input: &quot;$departments_all&quot;,
    as: &quot;departments&quot;,
    cond: { $ne: [ &quot;$$departments&quot;, {} ] }}}
    }},
    { $out : &quot;employees&quot; }
    ])
    
  4. delete imported collections from mongo shell

    db.tmp_employees.drop();
    db.tmp_salaries.drop();
    db.tmp_titles.drop();
    db.tmp_departments.drop();
    db.tmp_dept_emp.drop();
    db.tmp_dept_manager.drop();
    

huangapple
  • 本文由 发表于 2017年4月29日 22:05:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/43696607.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定