Skip to content

Commit 42faf99

Browse files
committed
introduce --watch capability for oc rsync
1 parent f254c9b commit 42faf99

File tree

8 files changed

+315
-4
lines changed

8 files changed

+315
-4
lines changed

contrib/completions/bash/oc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2622,6 +2622,8 @@ _oc_rsync()
26222622
flags+=("--quiet")
26232623
flags+=("-q")
26242624
flags+=("--strategy=")
2625+
flags+=("--watch")
2626+
flags+=("-w")
26252627
flags+=("--api-version=")
26262628
flags+=("--certificate-authority=")
26272629
flags_with_completion+=("--certificate-authority")

contrib/completions/bash/openshift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6171,6 +6171,8 @@ _openshift_cli_rsync()
61716171
flags+=("--quiet")
61726172
flags+=("-q")
61736173
flags+=("--strategy=")
6174+
flags+=("--watch")
6175+
flags+=("-w")
61746176
flags+=("--api-version=")
61756177
flags+=("--certificate-authority=")
61766178
flags_with_completion+=("--certificate-authority")

pkg/cmd/cli/cmd/rsync/copyrsync.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ type rsyncStrategy struct {
2828
podChecker podChecker
2929
}
3030

31-
var rshExcludeFlags = sets.NewString("delete", "strategy", "quiet", "include", "exclude", "progress", "no-perms")
31+
// rshExcludeFlags are flags that are passed to oc rsync, and should not be passed on to the underlying command being invoked via oc rsh.
32+
var rshExcludeFlags = sets.NewString("delete", "strategy", "quiet", "include", "exclude", "progress", "no-perms", "watch")
3233

3334
func newRsyncStrategy(f *clientcmd.Factory, c *cobra.Command, o *RsyncOptions) (copyStrategy, error) {
3435
// Determine the rsh command to pass to the local rsync command

pkg/cmd/cli/cmd/rsync/rsync.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@ import (
44
"errors"
55
"fmt"
66
"io"
7+
"sync"
8+
"time"
79

10+
"github.com/fsnotify/fsnotify"
11+
"github.com/golang/glog"
812
"github.com/spf13/cobra"
913
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
1014

1115
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
16+
"github.com/openshift/origin/pkg/util/fsnotification"
1217
)
1318

1419
const (
@@ -70,6 +75,7 @@ type RsyncOptions struct {
7075
StrategyName string
7176
Quiet bool
7277
Delete bool
78+
Watch bool
7379

7480
RsyncInclude string
7581
RsyncExclude string
@@ -111,6 +117,7 @@ func NewCmdRsync(name, parent string, f *clientcmd.Factory, out, errOut io.Write
111117
cmd.Flags().StringVar(&o.RsyncInclude, "include", "", "rsync - include files matching specified pattern")
112118
cmd.Flags().BoolVar(&o.RsyncProgress, "progress", false, "rsync - show progress during transfer")
113119
cmd.Flags().BoolVar(&o.RsyncNoPerms, "no-perms", false, "rsync - do not transfer permissions")
120+
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", false, "Watch directory for changes and resync automatically")
114121
return cmd
115122
}
116123

@@ -229,7 +236,92 @@ func (o *RsyncOptions) Validate() error {
229236

230237
// RunRsync copies files from source to destination
231238
func (o *RsyncOptions) RunRsync() error {
232-
return o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut)
239+
if err := o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut); err != nil {
240+
return err
241+
}
242+
243+
if !o.Watch {
244+
return nil
245+
}
246+
return o.WatchAndSync()
247+
}
248+
249+
// WatchAndSync sets up a recursive filesystem watch on the sync path
250+
// and invokes rsync each time the path changes.
251+
func (o *RsyncOptions) WatchAndSync() error {
252+
253+
// these variables must be accessed while holding the changeLock
254+
// mutex as they are shared between goroutines to communicate
255+
// sync state/events.
256+
var (
257+
changeLock sync.Mutex
258+
dirty bool
259+
lastChange time.Time
260+
watchError error
261+
)
262+
263+
watcher, err := fsnotify.NewWatcher()
264+
if err != nil {
265+
return fmt.Errorf("error setting up filesystem watcher: %v", err)
266+
}
267+
defer watcher.Close()
268+
269+
go func() {
270+
for {
271+
select {
272+
case event := <-watcher.Events:
273+
changeLock.Lock()
274+
glog.V(5).Infof("filesystem watch event: %s", event)
275+
lastChange = time.Now()
276+
dirty = true
277+
if event.Op&fsnotify.Remove == fsnotify.Remove {
278+
if e := watcher.Remove(event.Name); e != nil {
279+
glog.V(5).Infof("error removing watch for %s: %v", event.Name, e)
280+
}
281+
} else {
282+
if e := fsnotification.AddRecursiveWatch(watcher, event.Name); e != nil && watchError == nil {
283+
watchError = e
284+
}
285+
}
286+
changeLock.Unlock()
287+
case err := <-watcher.Errors:
288+
changeLock.Lock()
289+
watchError = fmt.Errorf("error watching filesystem for changes: %v", err)
290+
changeLock.Unlock()
291+
}
292+
}
293+
}()
294+
295+
err = fsnotification.AddRecursiveWatch(watcher, o.Source.Path)
296+
if err != nil {
297+
return fmt.Errorf("error watching source path %s: %v", o.Source.Path, err)
298+
}
299+
300+
delay := 2 * time.Second
301+
ticker := time.NewTicker(delay)
302+
defer ticker.Stop()
303+
for {
304+
changeLock.Lock()
305+
if watchError != nil {
306+
return watchError
307+
}
308+
// if a change happened more than 'delay' seconds ago, sync it now.
309+
// if a change happened less than 'delay' seconds ago, sleep for 'delay' seconds
310+
// and see if more changes happen, we don't want to sync when
311+
// the filesystem is in the middle of changing due to a massive
312+
// set of changes (such as a local build in progress).
313+
if dirty && time.Now().After(lastChange.Add(delay)) {
314+
glog.V(1).Info("Synchronizing filesystem changes...")
315+
err = o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut)
316+
if err != nil {
317+
return err
318+
}
319+
glog.V(1).Info("Done.")
320+
dirty = false
321+
}
322+
changeLock.Unlock()
323+
<-ticker.C
324+
}
233325
}
234326

235327
// PodName returns the name of the pod as specified in either the

pkg/util/fsnotification/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Package fsnotification provides helper functions that wrap the fsnotify filesystem
2+
// notification package.
3+
package fsnotification
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package fsnotification
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
8+
"github.com/golang/glog"
9+
10+
"github.com/fsnotify/fsnotify"
11+
)
12+
13+
// AddRecursiveWatch handles adding watches recursively for the path provided
14+
// and its subdirectories. If a non-directory is specified, this call is a no-op.
15+
// Recursive logic from https://github.com/bronze1man/kmg/blob/master/fsnotify/Watcher.go
16+
func AddRecursiveWatch(watcher *fsnotify.Watcher, path string) error {
17+
file, err := os.Stat(path)
18+
if err != nil {
19+
if os.IsNotExist(err) {
20+
return nil
21+
}
22+
return fmt.Errorf("error introspecting path %s: %v", path, err)
23+
}
24+
if !file.IsDir() {
25+
return nil
26+
}
27+
28+
folders, err := getSubFolders(path)
29+
for _, v := range folders {
30+
glog.V(5).Infof("adding watch on path %s", v)
31+
err = watcher.Add(v)
32+
if err != nil {
33+
return fmt.Errorf("error adding watcher for path %s: %v", v, err)
34+
}
35+
}
36+
return nil
37+
}
38+
39+
// getSubFolders recursively retrieves all subfolders of the specified path.
40+
func getSubFolders(path string) (paths []string, err error) {
41+
err = filepath.Walk(path, func(newPath string, info os.FileInfo, err error) error {
42+
if err != nil {
43+
return err
44+
}
45+
46+
if info.IsDir() {
47+
paths = append(paths, newPath)
48+
}
49+
return nil
50+
})
51+
return paths, err
52+
}

test/extended/cli/rsync.go

Lines changed: 140 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
package cli
22

33
import (
4+
"bufio"
45
"fmt"
56
"io/ioutil"
67
"os"
78
"path/filepath"
89
"strings"
10+
"time"
911

1012
g "github.com/onsi/ginkgo"
1113
o "github.com/onsi/gomega"
1214

1315
kapi "k8s.io/kubernetes/pkg/api"
1416
"k8s.io/kubernetes/pkg/labels"
17+
"k8s.io/kubernetes/test/e2e"
1518

1619
exutil "github.com/openshift/origin/test/extended/util"
1720
)
@@ -47,8 +50,144 @@ var _ = g.Describe("[cli][Slow] can use rsync to upload files to pods", func() {
4750
podName = pods.Items[0].Name
4851
})
4952

50-
g.Describe("copy by strategy", func() {
53+
g.Describe("using a watch", func() {
54+
g.It("should watch for changes and rsync them", func() {
55+
g.By("Creating a local temporary directory")
56+
tempDir, err := ioutil.TempDir("", "rsync")
57+
o.Expect(err).NotTo(o.HaveOccurred())
58+
59+
g.By("creating a subdirectory in that temp directory")
60+
subdir1 := filepath.Join(tempDir, "subdir1")
61+
err = os.Mkdir(subdir1, 0777)
62+
o.Expect(err).NotTo(o.HaveOccurred())
63+
64+
g.By("creating a file in the subdirectory")
65+
subdir1file1 := filepath.Join(subdir1, "file1")
66+
_, err = os.Create(subdir1file1)
67+
o.Expect(err).NotTo(o.HaveOccurred())
68+
69+
g.By("Creating a scratch directory in the pod")
70+
_, err = oc.Run("rsh").Args(podName, "mkdir", "/tmp/rsync").Output()
71+
o.Expect(err).NotTo(o.HaveOccurred())
72+
73+
g.By(fmt.Sprintf("Calling oc rsync %s/ %s:/tmp/rsync --delete --watch", tempDir, podName))
74+
cmd, stdout, stderr, err := oc.Run("rsync").Args(
75+
fmt.Sprintf("%s/", tempDir),
76+
fmt.Sprintf("%s:/tmp/rsync", podName),
77+
"--loglevel=5",
78+
"--delete",
79+
"--watch").Background()
80+
81+
failed := true
82+
defer cmd.Process.Kill()
83+
defer func() {
84+
if failed {
85+
writer := cmd.Stdout.(*bufio.Writer)
86+
writer.Flush()
87+
writer2 := cmd.Stderr.(*bufio.Writer)
88+
writer2.Flush()
89+
fmt.Fprintf(g.GinkgoWriter, "Dumping rsync output: \n%s\n%s\n", stdout.String(), stderr.String())
90+
}
91+
}()
92+
o.Expect(err).NotTo(o.HaveOccurred())
93+
94+
var result string
95+
found := false
96+
for i := 0; i < 12; i++ {
97+
g.By("Verifying that files are copied to the container")
98+
result, _ = oc.Run("rsh").Args(podName, "ls", "/tmp/rsync/subdir1").Output()
99+
if strings.Contains(result, "file1") {
100+
found = true
101+
break
102+
}
103+
time.Sleep(5 * time.Second)
104+
}
105+
if !found {
106+
e2e.Failf("Directory does not contain expected files: \n%s", result)
107+
}
108+
109+
g.By("renaming file1 to file2")
110+
subdir1file2 := filepath.Join(subdir1, "file2")
111+
err = os.Rename(subdir1file1, subdir1file2)
112+
o.Expect(err).NotTo(o.HaveOccurred())
113+
114+
found = false
115+
for i := 0; i < 12; i++ {
116+
g.By("Verifying that files are copied to the container")
117+
result, _ = oc.Run("rsh").Args(podName, "ls", "/tmp/rsync/subdir1").Output()
118+
if strings.Contains(result, "file2") && !strings.Contains(result, "file1") {
119+
found = true
120+
break
121+
}
122+
time.Sleep(5 * time.Second)
123+
}
124+
if !found {
125+
e2e.Failf("Directory does not contain expected files: \n%s", result)
126+
}
127+
128+
g.By("removing file2")
129+
err = os.Remove(subdir1file2)
130+
o.Expect(err).NotTo(o.HaveOccurred())
51131

132+
found = false
133+
for i := 0; i < 12; i++ {
134+
g.By("Verifying that files are copied to the container")
135+
result, _ = oc.Run("rsh").Args(podName, "ls", "/tmp/rsync/subdir1").Output()
136+
if !strings.Contains(result, "file2") {
137+
found = true
138+
break
139+
}
140+
time.Sleep(5 * time.Second)
141+
}
142+
if !found {
143+
e2e.Failf("Directory does not contain expected files: \n%s", result)
144+
}
145+
146+
g.By("renaming subdir1 to subdir2")
147+
subdir2 := filepath.Join(tempDir, "subdir2")
148+
err = os.Rename(subdir1, subdir2)
149+
o.Expect(err).NotTo(o.HaveOccurred())
150+
151+
g.By("creating a file in the subdir2")
152+
subdir2file1 := filepath.Join(subdir2, "file1")
153+
_, err = os.Create(subdir2file1)
154+
o.Expect(err).NotTo(o.HaveOccurred())
155+
156+
found = false
157+
for i := 0; i < 12; i++ {
158+
g.By("Verifying that files are copied to the container")
159+
result, _ = oc.Run("rsh").Args(podName, "ls", "/tmp/rsync/subdir2").Output()
160+
if !strings.Contains(result, "file1") {
161+
found = true
162+
break
163+
}
164+
time.Sleep(5 * time.Second)
165+
}
166+
if !found {
167+
e2e.Failf("Directory does not contain expected files: \n%s", result)
168+
}
169+
170+
g.By("removing subdir2")
171+
err = os.RemoveAll(subdir2)
172+
o.Expect(err).NotTo(o.HaveOccurred())
173+
174+
found = false
175+
for i := 0; i < 12; i++ {
176+
g.By("Verifying that files are copied to the container")
177+
result, _ = oc.Run("rsh").Args(podName, "ls", "/tmp/rsync").Output()
178+
if !strings.Contains(result, "subdir2") {
179+
found = true
180+
break
181+
}
182+
time.Sleep(5 * time.Second)
183+
}
184+
if !found {
185+
e2e.Failf("Directory does not contain expected files: \n%s", result)
186+
}
187+
failed = false
188+
})
189+
})
190+
g.Describe("copy by strategy", func() {
52191
testRsyncFn := func(strategy string) func() {
53192
return func() {
54193
g.By(fmt.Sprintf("Calling oc rsync %s %s:/tmp --strategy=%s", sourcePath1, podName, strategy))
@@ -219,5 +358,4 @@ var _ = g.Describe("[cli][Slow] can use rsync to upload files to pods", func() {
219358
o.Expect(err).NotTo(o.HaveOccurred())
220359
})
221360
})
222-
223361
})

0 commit comments

Comments
 (0)