想想咱们遇到以下问题一般怎么解决?
- 新建了一个Pod, 想把另外一个Pod中的文件拷贝到新Pod中进行分析, 怎么实现呢?
- 如何在项目中, 如何像kubectl cp拷贝文件一样, 实现Pod间文件拷贝呢?
- 新Pod与实例Pod共享pvc? 或者封装一个带认证上下文的kubectl执行命令行?
本文通过K8S的exec websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.
关于exec接口请参考:https://www.cnblogs.com/a00ium/p/10905279.html
请看图流程说明
- 首先初始化信号通道, 用于协程间的信号通知, 收到信号的协程执行暂停/退出循环/关闭通道等操作
- 初始化数据通道srcStdOutCh, 类型为字节数组[]byte, 用于将源Pod的标准输出放入通道, 发送给目的Pod标准输入的数据就是从该数据通道中读取
- 拼接exec接口的访问地址(集群连接,token), tar压缩命令, 标准输入/输出,tty, pod名,容器名等参数. tar czf - /var/log/xxx.log 表示将该文件树结构压缩为数据流
- 调用websocket的Dialer方法与源Pod容器建立websocket连接, 并开启协程将标准输出写入数据通道srcStdOutCh
- 参考源pod exec接口, 拼接目的Pod exec访问连接, tar xzf - -C /tmp表示从标准输入读取数据流, 并解压成文件树结构(注意:解压后包含文件目录树结构)
- 与目的Pod建立wss连接, 开启协程从数据通道srcStdOutCh中读取源Pod标准输出, 并写入目的Pod的标准输入, 如果从数据通道读取超时,则表示数据已经传输完毕, 此时停止向目的容器输入数据, 并发送通知信号, 通知主协程可以退出,关闭源Pod的wss连接
- wesocket连上源Pod时, 标准输出中会输出空数据, tar命令输出等干扰数据, 所以接收数据的时候需要传入一个过滤器回调函数, 用于数据过滤
- 向目的容器发送数据时, 需要将源容器收到的第一个字节删除, 一般为1, 表示标准输出标识, 发送给目的容器是不需要该字节的
- 发送数据时, 需要设置第一个字节为0, 表示发送到标准输入
cp.go
package cpFilePod2Podimport ( "crypto/tls" "errors" "fmt" "log" "net/url" "regexp" "strings" "sync" "time" "github.com/gorilla/websocket")// 定义过滤器回调函数type filterCallback func(input string) bool// 带有互斥锁的Websocket连接对象type WsConn struct { Conn *websocket.Conn mu sync.Mutex}// 发送字符串, 自动添加换行符func (self *WsConn) Send(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // 利用k8s exec websocket接口发送数据时, 第一个字节需要设置为0, 表示将数据发送到标准输入 data:= []byte{0} data = append(data, []byte(str "n")...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //发送二进制数据类型 if err != nil { log.Printf("发送错误, %s", err.Error()) } log.Printf("%s, 数据:%s, 字节:% v", sender, str, []byte(str "n"))}//发送字符串, 不添加换行符, 内部做字节过滤,等操作func (self *WsConn) SendWithFilter(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // log.Printf("向目的容器发送数据:%s", str) str = strings.ReplaceAll(str, "rn", "n") // /r=13, /n=10, windows换行符转Linux换行符 //去掉第一个字节(标准输出1, byte:[0 1 ...]), 因为从源容器输出的字节中, 第一位标识了标准输出1, 给目的容器发送字节时, 需要去除该标志 //当WebSocket建立连接后,发送数据时需要在字节Buffer第一个字节设置为stdin(buf[0] = 0),而接受数据时, 需要判断第一个字节, stdout(buf[0] = 1)或stderr(buf[0] = 2) strByte := append([]byte(str)[:0], []byte(str)[1:]...) data:= []byte{0} data = append(data, strByte...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) log.Printf("向目的容器标准输入发送数据:n%s, 字节数:%d, 字节:% v", string(data), len(data), data) if err != nil { log.Printf("发送错误, %s", err.Error()) }}//从连接中获取数据流, 并写入字节数组通道中, 内部执行过滤器(回调函数)func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error { self.mu.Lock() defer self.mu.Unlock() msgType, msgByte, err := self.Conn.ReadMessage() //阻塞读取, 类型为2表示二进制数据, 1表示文本, -1表示连接已关闭:websocket: Close 1000 (normal) log.Printf("%s, 读取到数据:%s, 类型:%d, 字节数:%d, 字节:% v", receiver, string(msgByte), msgType, len(msgByte), msgByte) if err != nil { log.Printf("%s, 读取出错, %s", receiver, err.Error()) return err } if filter(string(msgByte)) && len(msgByte) > 1 { ch <- msgByte } else { log.Printf("%s, 数据不满足, 直接丢弃数据, 字符:%s, 字节数:%d, 字节:%v", receiver, string(msgByte), len(msgByte), msgByte) } return nil}func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) { paramArray := []string{} for k, v := range params { paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v)) } u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")} log.Printf("API:%s", u.String()) dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}} conn, _, err := dialer.Dial(u.String(), headers) if err != nil { return nil, errors.New(fmt.Sprintf("连接错误:%s", err.Error())) } return conn, nil}//核心: tar -cf - 将具有文件夹结构的数据转换成数据流, 通过 tar -xf - 将数据流转换成 linux 文件系统func CpPod2Pod() { //通知主函数可以退出的信号通道 signalExit := make(chan bool, 1) defer close(signalExit) //下发不要给目的容器发送数据的信号 signalStopDstSend := make(chan bool, 1) defer close(signalStopDstSend) //下发不要从源容器读取数据的信号 signalStopSrcRead := make(chan bool, 1) defer close(signalStopSrcRead) //下发不要从目的容器读取数据的信号 signalStopDstRead := make(chan bool, 1) defer close(signalStopDstRead) //下发不要打印目的容器的输出数据 signalStopPrintDstStdout := make(chan bool, 1) defer close(signalStopPrintDstStdout) //连接pod host := "172.16.xxx.xxx:6443" token := "xxx" headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}} pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec" commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc} srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers) if err != nil { log.Printf("源Pod连接出错, %s", err.Error()) } pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec" commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst} paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //关闭目的Pod标准输出和错误输出 dstConn, err := NewWsConn(host, pathDst, paraDst, headers) if err != nil { log.Printf("目的Pod连接出错, %s", err.Error()) } wsSrc := WsConn{ Conn: srcConn, } wsDst := WsConn{ Conn: dstConn, } defer srcConn.Close() defer dstConn.close() srcStdOutCh := make(chan []byte, 2048) dstStdOutCh := make(chan []byte) defer close(srcStdOutCh) defer close(dstStdOutCh) // 接收源容器标准输出到数据通道中 go func() { i := 1 for { log.Printf("第%d次, 从源容器读取标准输出", i) i //定义匿名过滤器回调方法, 对源容器标准输出中不需要的数据进行过滤 err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool { if input == "cat /var/log/mysql/slow.log" { return false // } else if match, _ := regexp.MatchString("root@(. )#", input); match { // return false // } else if match, _ := regexp.MatchString("cat /(. ).log", input); match { // return false // } else if match, _ := regexp.MatchString("cat /tmp/(. )", input); match { // return false } else if match, _ := regexp.MatchString("tar: Removing leading(. )", input); match { return false } else if len(input) == 0 { //过滤空消息 // log.Printf("读取到标准错误输出") return false } return true }) if err != nil { log.Printf("读取源容器标准输出失败") // signalExit <- true break } // time.Sleep(time.MicroSecond * 100) } }() // // 从目的容器获取标准输出到数据通道中 // go func() { // // i := 0 // for { // // 该过滤器直接返回true, 仅占位 // err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool { // return true // }) // if err != nil { // log.Printf("从目的容器读取数据失败") // break // } // // wsDst.Send() // time.Sleep(time.Microsecond * 100000) // } // // log.Printf("从目的容器读取数据, 第%d次循环", i) // // i // }() // //从数据通道中读取, 目的容器的标准输出, 并打印 // go func() { // BreakPrintDstPodStdout: // for { // select { // case data:= <-dstStdOutCh: // log.Printf("目的容器标准输出:%s", string(data)) // // time.Sleep(time.Microsecond * 200) // case <-signalStopPrintDstStdout: // log.Printf("收到信号, 停止打印目的容器标准输出") // // close(dataOutput) // // close(dataCh) // // signalStopRead <- true // // log.Printf("发送停止读信号") // // close(dataOutput) // // close(dataCh) // break BreakPrintDstPodStdout // } // // time.Sleep(time.Microsecond * 100) // } // }() //从源容器标准输出的数据通道获取数据, 然后发送给目的容器标准输入 //定义超时时间 timeOutSecond := 3 timer := time.NewTimer(time.Second * time.Duration(timeOutSecond))Break2Main: for { select { case data:= <-srcStdOutCh: wsDst.SendWithFilter("向目的容器发送", string(data)) // time.Sleep(time.Millisecond * 200) timer.Reset(time.Second * time.Duration(timeOutSecond)) case <-timer.C: // time.Sleep(time.Second * 5) log.Printf("================ 源容器标准输出,没有新的数据,获取超时,停止向目的容器发送数据 ================") // log.Printf("发送信号:停止打印目的容器标准输出") // signalStopPrintDstStdout <- true log.Printf("发送信号:停止从源容器读取数据") wsSrc.Conn.Close() // log.Printf("发送信号:停止从目的容器读取数据") // wsDst.Conn.Close() log.Printf("发送信号:主函数可以退出了") signalExit <- true log.Printf("所有信号发送完毕") log.Printf("================== 跳出循环 =================") break Break2Main } // time.Sleep(time.Microsecond * 1000) } // signalStopRead <- true <-signalExit //阻塞通道, 直到收到一个信号 // signalStopRead <- true log.Printf("主函数收到信号, 准备退出") // close(dataCh) // time.Sleep(time.Second) // close(dataOutput) // time.Sleep(time.Second) // select {}}
cp_test.go
package cpFilePod2Podimport ( "log" "testing")// go test -race -test.run TestCpPod2Pod 切到该目录执行该测试func TestCpPod2Pod(t *testing.T) { log.Printf("开始测试") CpPod2Pod()}
参考结果:源容器:root@xxx-mysql-0:/var/log/mysql# md5sum slow.log16577613b6ea957ecb5d9d5e976d9c50 slow.log目的容器:root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log16577613b6ea957ecb5d9d5e976d9c50 slow.log
Kubernetes exec API串接分析:https://www.cnblogs.com/a00ium/p/10905279.html
kubernetes-client-go-实现-kubectl-copy:https://ica10888.com/2019/08/31/kubernetes-client-go-实现-kubectl-copy.html
END已结束
欢迎大家留言, 订阅, 交流哦!
往期回顾
Golang的23种设计模式之代码示例 示例图解 设计模式资料
[翻译自官方]什么是RDB和AOF? 一文了解Redis持久化!
Golang GinWeb框架9-编译模板/自定义结构体绑定/http2/操作cookie/完结
Golang GinWeb框架8-重定向/自定义中间件/认证/HTTPS支持/优雅重启等
Golang GinWeb框架7-静态文件/模板渲染
Golang GinWeb框架6-XML/JSON/YAML/ProtoBuf等渲染
Golang GinWeb框架5-绑定请求字符串/URI/请求头/复选框/表单类型
Golang GinWeb框架4-请求参数绑定和验证
Golang GinWeb框架3-自定义日志格式和输出方式/启禁日志颜色
Golang GinWeb框架2-文件上传/程序panic崩溃后自定义处理方式
Golang GinWeb框架-快速入门/参数解析
Golang与亚马逊对象存储服务AmazonS3快速入门
Golang Vue实现Websocket全双工通信入门
GolangWeb编程之控制器方法HandlerFunc与中间件Middleware
Golang连接MySQL执行查询并解析-告别结构体
Golang的一种发布订阅模式实现
Golang 并发数据冲突检测器(Data Race Detector)与并发安全
Golang"驱动"MongoDB-快速入门("快码加鞭")
欢迎点击[在看], 推荐给其他小伙伴哦!
,