Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

import re 

import mock 

import os 

import StringIO 

import fnmatch 

import string 

import random 

import textwrap 

import errno 

 

 

PATHSEP = '/' 

 

 

class ContextSetupError(Exception): 

    pass 

 

 

def get_error_codes(): 

    this_dir = os.path.dirname(__file__) 

    drivers_dir = os.path.join(this_dir, '..', 'drivers') 

    error_codes_path = os.path.join(drivers_dir, 'XE_SR_ERRORCODES.xml') 

    error_code_catalog = open(error_codes_path, 'r') 

    contents = error_code_catalog.read() 

    error_code_catalog.close() 

    return contents 

 

 

class SCSIDisk(object): 

    def __init__(self, adapter): 

        self.adapter = adapter 

        self.long_id = ''.join( 

            random.choice(string.digits) for _ in range(33)) 

 

    def disk_device_paths(self, host_id, disk_id, actual_disk_letter): 

        yield '/sys/class/scsi_disk/%s:0:%s:0' % (host_id, disk_id) 

        yield '/sys/class/scsi_disk/%s:0:%s:0/device/block/sd%s' % ( 

            host_id, disk_id, actual_disk_letter) 

        yield '/dev/disk/by-scsibus/%s-%s:0:%s:0' % ( 

            self.adapter.long_id, host_id, disk_id) 

        yield '/dev/disk/by-id/%s' % (self.long_id) 

 

 

class SCSIAdapter(object): 

    def __init__(self): 

        self.disks = [] 

        self.long_id = ''.join( 

            random.choice(string.digits) for _ in range(33)) 

        self.parameters = [] 

 

    def add_disk(self): 

        disk = SCSIDisk(self) 

        self.disks.append(disk) 

        return disk 

 

    def add_parameter(self, host_class, values): 

        self.parameters.append((host_class, values)) 

 

    def adapter_device_paths(self, host_id): 

        yield '/sys/class/scsi_host/host%s' % host_id 

 

 

class AdapterWithNonBlockDevice(SCSIAdapter): 

    def adapter_device_paths(self, host_id): 

        for adapter_device_path in super(AdapterWithNonBlockDevice, 

                                         self).adapter_device_paths(host_id): 

            yield adapter_device_path 

        yield '/sys/class/fc_transport/target7:0:0/device/7:0:0:0' 

 

 

class Executable(object): 

    def __init__(self, function_to_call): 

        self.function_to_call = function_to_call 

 

    def run(self, args, stdin): 

        (return_code, stdout, stderr) = self.function_to_call(args, stdin) 

        return (return_code, stdout, stderr) 

 

 

class Subprocess(object): 

    def __init__(self, executable, args): 

        self.executable = executable 

        self.args = args 

 

    def communicate(self, data): 

        self.returncode, out, err = self.executable.run(self.args, data) 

        return out, err 

 

 

class TestContext(object): 

    def __init__(self): 

        self.patchers = [] 

        self.error_codes = get_error_codes() 

        self.inventory = { 

            'PRIMARY_DISK': '/dev/disk/by-id/primary' 

        } 

        self.scsi_adapters = [] 

        self.kernel_version = '3.1' 

        self.executables = {} 

        self._created_directories = [] 

        self._path_content = {} 

        self._next_fileno = 0 

 

    def _get_inc_fileno(self): 

        result = self._next_fileno 

        self._next_fileno += 1 

        return result 

 

    def add_executable(self, fpath, funct): 

        self.executables[fpath] = Executable(funct) 

 

    def generate_inventory_contents(self): 

        return '\n'.join( 

            [ 

                '='.join( 

                    [k, v.join(2 * ["'"])]) for k, v in self.inventory.items() 

            ] 

        ) 

 

    def start(self): 

        self.patchers = [ 

            mock.patch('__builtin__.open', new=self.fake_open), 

            mock.patch('__builtin__.file', new=self.fake_open), 

            mock.patch('os.path.exists', new=self.fake_exists), 

            mock.patch('os.makedirs', new=self.fake_makedirs), 

            mock.patch('os.listdir', new=self.fake_listdir), 

            mock.patch('glob.glob', new=self.fake_glob), 

            mock.patch('os.uname', new=self.fake_uname), 

            mock.patch('subprocess.Popen', new=self.fake_popen), 

            mock.patch('os.rmdir', new=self.fake_rmdir), 

            mock.patch('os.stat', new=self.fake_stat), 

        ] 

        map(lambda patcher: patcher.start(), self.patchers) 

        self.setup_modinfo() 

 

    def fake_rmdir(self, path): 

        if path not in self.get_filesystem(): 

            raise OSError(errno.ENOENT, 'No such file %s' % path) 

 

        if self.fake_glob(os.path.join(path, '*')): 

            raise OSError(errno.ENOTEMPTY, 'Directory is not empty %s' % path) 

 

        assert path in self._created_directories 

        self._created_directories = [ 

            d for d in self._created_directories if d != path] 

 

    def fake_stat(self, path): 

        if not self.fake_exists(path): 

            raise OSError() 

 

    def fake_makedirs(self, path): 

        if path in self.get_filesystem(): 

            raise OSError(path + " Already exists") 

        self._created_directories.append(path) 

        self.log("Recursively created directory", path) 

 

    def setup_modinfo(self): 

        self.add_executable('/sbin/modinfo', self.fake_modinfo) 

 

    def setup_error_codes(self): 

        self._path_content['/opt/xensource/sm/XE_SR_ERRORCODES.xml'] = ( 

            self.error_codes 

        ) 

 

    def fake_modinfo(self, args, stdin_data): 

        assert len(args) == 3 

        assert args[1] == '-d' 

        return (0, args[2] + '-description', '') 

 

    def fake_popen(self, args, stdin, stdout, stderr, close_fds): 

        import subprocess 

        assert stdin == subprocess.PIPE 

        assert stdout == subprocess.PIPE 

        assert stderr == subprocess.PIPE 

        assert close_fds is True 

 

        path_to_executable = args[0] 

 

        if path_to_executable not in self.executables: 

            raise ContextSetupError( 

                path_to_executable 

                + ' was not found. Set it up using add_executable.' 

                + ' was called with: ' + str(args)) 

 

        executable = self.executables[path_to_executable] 

        return Subprocess(executable, args) 

 

    def fake_uname(self): 

        return ( 

            'Linux', 

            'testbox', 

            self.kernel_version, 

            '#1 SMP Thu May 8 09:50:50 EDT 2014', 

            'x86_64' 

        ) 

 

    def fake_open(self, fname, mode='r'): 

        if fname == '/etc/xensource-inventory': 

            return StringIO.StringIO(self.generate_inventory_contents()) 

 

        for fpath, contents in self.generate_path_content(): 

            if fpath == fname: 

                return StringIO.StringIO(contents) 

 

        if 'w' in mode: 

            if os.path.dirname(fname) in self.get_created_directories(): 

                self._path_content[fname] = '' 

                return WriteableFile(self, fname, self._get_inc_fileno()) 

            error = IOError('No such file %s' % fname) 

            error.errno = errno.ENOENT 

            raise error 

 

        self.log('tried to open file', fname) 

        raise IOError(fname) 

 

    def fake_exists(self, fname): 

        for existing_fname in self.get_filesystem(): 

            if fname == existing_fname: 

                return True 

 

        self.log('not exists', fname) 

        return False 

 

    def fake_listdir(self, path): 

        assert '*' not in path 

        glob_pattern = path + '/*' 

        glob_matches = self.fake_glob(glob_pattern) 

        return [match[len(path)+1:] for match in glob_matches] 

 

    def get_filesystem(self): 

        result = set(['/']) 

        for devpath in self.generate_device_paths(): 

            for path in filesystem_for(devpath): 

                result.add(path) 

 

        for executable_path in self.executables: 

            for path in filesystem_for(executable_path): 

                result.add(path) 

 

        for directory in self.get_created_directories(): 

            result.add(directory) 

 

        return sorted(result) 

 

    def get_created_directories(self): 

        result = set(['/']) 

        for created_directory in self._created_directories: 

            for path in filesystem_for(created_directory): 

                result.add(path) 

        return sorted(result) 

 

    def generate_path_content(self): 

        for host_id, adapter in enumerate(self.scsi_adapters): 

            for host_class, values in adapter.parameters: 

                for key, value in values.iteritems(): 

                    path = '/sys/class/%s/host%s/%s' % ( 

                        host_class, host_id, key) 

                    yield (path, value) 

 

        for path, value in self._path_content.iteritems(): 

            yield (path, value) 

 

    def generate_device_paths(self): 

        actual_disk_letter = 'a' 

        for host_id, adapter in enumerate(self.scsi_adapters): 

            for adapter_device_path in adapter.adapter_device_paths(host_id): 

                yield adapter_device_path 

            for disk_id, disk in enumerate(adapter.disks): 

                for path in disk.disk_device_paths(host_id, disk_id, 

                                                   actual_disk_letter): 

                    yield path 

                actual_disk_letter = chr(ord(actual_disk_letter) + 1) 

 

        for path, _content in self.generate_path_content(): 

            yield path 

 

    def fake_glob(self, pattern): 

        result = [] 

        pattern_parts = pattern.split(PATHSEP) 

        for fname in self.get_filesystem(): 

            fname_parts = fname.split(PATHSEP) 

            if len(fname_parts) != len(pattern_parts): 

                continue 

 

            found = True 

            for pattern_part, fname_part in zip(pattern_parts, fname_parts): 

                if not fnmatch.fnmatch(fname_part, pattern_part): 

                    found = False 

            if found: 

                result.append(fname) 

 

        if not result: 

            self.log('no glob', pattern) 

        return list(set(result)) 

 

    def log(self, *args): 

        WARNING = '\033[93m' 

        ENDC = '\033[0m' 

        import sys 

        sys.stdout.write( 

            WARNING 

            + ' '.join(str(arg) for arg in args) 

            + ENDC 

            + '\n') 

 

    def stop(self): 

        map(lambda patcher: patcher.stop(), self.patchers) 

 

    def add_adapter(self, adapter): 

        self.scsi_adapters.append(adapter) 

        return adapter 

 

 

def with_custom_context(context_class): 

    def _with_context(func): 

        def decorated(self, *args, **kwargs): 

            context = context_class() 

            context.start() 

            try: 

                result = func(self, context, *args, **kwargs) 

                context.stop() 

                return result 

            except: 

                context.stop() 

                raise 

 

        decorated.__name__ = func.__name__ 

        return decorated 

    return _with_context 

 

 

def with_context(func): 

    decorator = with_custom_context(TestContext) 

    return decorator(func) 

 

 

def xml_string(text): 

    dedented = textwrap.dedent(text).strip() 

    lines = [] 

    for line in dedented.split('\n'): 

        lines.append(re.sub(r'^ *', '', line)) 

 

    return ''.join(lines) 

 

 

def marshalled(dom): 

    text = dom.toxml() 

    result = text.replace('\n', '') 

    result = result.replace('\t', '') 

    return result 

 

 

def filesystem_for(path): 

    result = [PATHSEP] 

    assert path.startswith(PATHSEP) 

    segments = [seg for seg in path.split(PATHSEP) if seg] 

    for i in range(len(segments)): 

        result.append(PATHSEP + PATHSEP.join(segments[:i+1])) 

    return result 

 

 

class XmlMixIn(object): 

    def assertXML(self, expected, actual): 

        import xml 

 

        expected_dom = xml.dom.minidom.parseString( 

            xml_string(expected)) 

 

        actual_dom = xml.dom.minidom.parseString(actual) 

 

        self.assertEquals( 

            marshalled(expected_dom), 

            marshalled(actual_dom) 

        ) 

 

 

class WriteableFile(object): 

    def __init__(self, context, fname, fileno, data=None): 

        self._context = context 

        self._fname = fname 

        self._file = StringIO.StringIO(data) 

        self._fileno = fileno 

 

    def fileno(self): 

        return self._fileno 

 

    def write(self, data): 

        return self._file.write(data) 

 

    def close(self): 

        self._context._path_content[self._fname] = self._file.getvalue() 

        self._file.close()