我对Go真的很陌生,su只在这里忍受我。我正在尝试使用以下代码编写将mysql数据加载到Redis集群的代码:redis-go- cluster, load2redis
这是代码。它有点长,请在这里忍受。
package main import ( "bytes" "database/sql" "flag" // "github.com/garyburd/redigo/redis" _ "github.com/go-sql-driver/mysql" //"gopkg.in/redis.v4" "github.com/chasex/redis-go-cluster" "log" "runtime" // "strings" "sync" "time" ) var client *redis.Cluster type Task interface { Execute() } type Pool struct { mu sync.Mutex size int tasks chan Task kill chan struct{} wg sync.WaitGroup } func NewPool(size int) *Pool { pool := &Pool{ tasks: make(chan Task, 128), kill: make(chan struct{}), } pool.Resize(size) return pool } func (p *Pool) worker() { defer p.wg.Done() for { select { case task, ok := <-p.tasks: if !ok { return } task.Execute() case <-p.kill: return } } } func (p *Pool) Resize(n int) { p.mu.Lock() defer p.mu.Unlock() for p.size < n { p.size++ p.wg.Add(1) go p.worker() } for p.size > n { p.size-- p.kill <- struct{}{} } } func (p *Pool) Close() { close(p.tasks) } func (p *Pool) Wait() { p.wg.Wait() } func (p *Pool) Exec(task Task) { p.tasks <- task } type RedisTask struct { Index int Command string Key string Value string MapData map[string]string } func (e RedisTask) Execute() { log.Println("executing:", e.Key, ",", e.Index) if e.Command == "SET" { _,err := redis.String(client.Do("SET", e.Key, e.Value)) checkErr(err, "set error:") } else if e.Command == "SADD" { _,err := redis.Strings(client.Do("SADD", e.Key, e.Value)) checkErr(err, "sadd error:") } else if e.Command == "HMSET" { _,err := redis.StringMap(client.Do("HMSET", e.Key, e.MapData)) checkErr(err, "hmset error:") } // TODO: clean data } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) startTime := time.Now().UnixNano() / int64(time.Millisecond) host := flag.String("s", "localhost:3306", "mysql server host and port ,eg localhost:3306") username := flag.String("u", "test", "username to login mysql") password := flag.String("p", "test", "password for mysql") database := flag.String("d", "test", "database you want to execute query") query := flag.String("q", "select 1;", "your query sql") ds := flag.String("ds", "key", "redis structure") PK := flag.String("pk", "Rkey", "the redis Key in the fields of mysql query result") //redisHost := flag.String("rs", "localhost:6379", "redis host and port ,eg localhost:6379") //redisPassword := flag.String("rp", "test", "redis password") poolSize := flag.Int("size", 10000, "redis pool size") flag.Parse() var buf bytes.Buffer = bytes.Buffer{} buf.WriteString(*username) buf.WriteString(":") buf.WriteString(*password) buf.WriteString("@tcp(") buf.WriteString(*host) buf.WriteString(")/") buf.WriteString(*database) db, err := sql.Open("mysql", buf.String()) checkErr(err, "connect to mysql error !") defer db.Close() poolWorker := NewPool(*poolSize) // Execute the query rows, err := db.Query(*query) checkErr(err, "execute sql error!") // pool = newPool(*redisHost, *redisPassword, *poolSize) //client = redis.NewClient(&redis.Options{ // Addr: *redisHost, // Password: *redisPassword, // no password set // DB: 0, // use default DB //}) client,_ = redis.NewCluster(&redis.Options{ StartNodes: []string{"10.x.x.x:6000", "10.x.x.x:6001", "10.x.x.x:6002"}, ConnTimeout: 50 * time.Millisecond, ReadTimeout: 50 * time.Millisecond, WriteTimeout: 50 * time.Millisecond, KeepAlive: 16, AliveTime: 60 * time.Second, }) //checkErr(err, "client error:") //pong, err := client.Ping().Result() //checkErr(err, "redis client error:") //log.Println(pong) columns, err := rows.Columns() checkErr(err, "get columns error!") length := len(columns) values := make([]sql.RawBytes, length) scanArgs := make([]interface{}, len(values)) for i := range values { scanArgs[i] = &values[i] } count := 0 for rows.Next() { count += 1 err = rows.Scan(scanArgs...) checkErr(err, "scan error") var value string var key string var task RedisTask if *ds == "key" { key = getStringData(values[0]) value = getStringData(values[1]) if value != "" { task = RedisTask{ Index: count, Command: "SET", Key: key, Value: value, } } } else if *ds == "set" { key = getStringData(values[0]) value = getStringData(values[1]) if value != "" { task = RedisTask{ Index: count, Command: "SADD", Key: key, Value: value, } } } else if *ds == "hash" { key = getStringData(values[0]) // args := redis.Args{}.Add(key) m := make(map[string]string) for i, col := range values { if col != nil && columns[i] != *PK { value = getStringData(col) m[columns[i]] = value } } task = RedisTask{ Index: count, Command: "HMSET", Key: key, MapData: m, } } poolWorker.Exec(task) } if err = rows.Err(); err != nil { panic(err.Error()) // proper error handling instead of panic in your app } poolWorker.Close() poolWorker.Wait() EndTime := time.Now().UnixNano() / int64(time.Millisecond) log.Println("======================================== executing time:", EndTime-startTime, " ms, total:", count) } func getStringData(data sql.RawBytes) string { if data == nil { return "" } value := string(data) return clearBad(value) } func clearBad(str string) string { // str = strings.Trim(str, "`") // str = strings.Trim(str, "ï½€") // str = strings.Trim(str, "-") // str = strings.Trim(str, ".") // str = strings.Trim(str, " ") // str = strings.Trim(str, ";") // str = strings.Trim(str, ",") // str = strings.Trim(str, ":") // str = strings.Trim(str, ";") // str = strings.Trim(str, "'") // str = strings.Trim(str, "!") return str } func checkErr(err error, msg string) { if err != nil { log.Fatalln(msg, err) } }
当我执行它时,出现以下异常:
./rak -u user -p user -s 10.X.X.X:8080 -d test -q "SELECT CONCAT( 'student:', c.sid ) Rkey, c.sname SNAME, c.saddress SADDRESS, c.sage SAGE FROM STUDENT c WHERE c.sid > 0;" -ds hash -size 1200 2017/07/21 10:29:09 rak.go:93: executing: student:2 , 2 2017/07/21 10:29:09 rak.go:93: executing: student:1 , 1 2017/07/21 10:29:09 rak.go:93: executing: student:3 , 3 2017/07/21 10:29:09 rak.go:268: hmset error: Do: unknown type map[string]string $
有人可以向我解释我在做什么错吗?我将不胜感激。
如前所述,Do不适用于地图。这是您可以解决它的一种方法。
Do
} else if e.Command == "HMSET" { // Build up a string slice to hold the key value pairs args := make([]string, 0, len(e.MapData) * 2) for k, v := range e.MapData { args = append(args, k, v) } _,err := redis.StringMap(client.Do("HMSET", e.Key, args...)) checkErr(err, "hmset error:") }
Do方法映射到Redis命令集,并且期望参数的方式相同。例如。
127.0.0.1:6379> HMSET myKey foo bar baz boff OK 127.0.0.1:6379> HGETALL myKey 1) "foo" 2) "bar" 3) "baz" 4) "boff" 127.0.0.1:6379>
在代码中使用redis客户端进行相同的地图设置操作将是
client.Do("HMSET", "myKey", "foo", "bar", "baz", "boff")
当映射的键和值的参数是动态的时,最直接的方法是
client.Do("HMSET", "myKey", []string{"foo", "bar", "baz", "boff"}...)
这正是上面的第一个代码块所做的。