1
2
3
4
5
6
7
8
9 """
10 Services Twisted de collection et de publication de données.
11 """
12
13 import locale, gettext, os, pwd, shutil, random
14 from glob import glob
15
16
17 from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR
# Gettext domain name and catalogue directory used for translated messages.
APP = 'zephir-agents'
DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n')
# gettext.install() publishes _() in the builtins namespace, which is why the
# rest of this module calls _() without importing it.
gettext.install(APP, DIR, unicode=False)
21
22
23 from twisted.python import log
24 from twisted.application import internet, service
25 from twisted.internet import utils, reactor
26 from twisted.web import resource, server, static, util, xmlrpc
27
28 from zephir.monitor.agentmanager import config as cfg
29 from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files
30 from zephir.monitor.agentmanager.web_resources import ZephirServerResource
31 from zephir.monitor.agentmanager.clientmanager import ClientManager
32
# Optional Zephir client libraries. `registered` records whether they could be
# imported; startService only runs the uucp setup when it is non-zero.
try:
    import zephir.zephir_conf.zephir_conf as conf_zeph
    from zephir.lib_zephir import zephir, convert, zephir_dir
    from zephir.lib_zephir import log as zeph_log
    registered = 1
except:
    # Any failure while importing (bare except) leaves the flag at 0.
    # NOTE(review): presumably this means "server not registered" - confirm.
    registered = 0
41
43 """Main Twisted service for Zephir apps"""
44
def __init__(self, config, root_resource=None, serve_static=False):
    """Create the top-level service.

    config -- mapping of settings; it is layered over a copy of
        cfg.DEFAULT_CONFIG, so missing keys fall back to the defaults.
    root_resource -- web resource to attach children to. When None, a
        fresh resource is created and served by a TCP server listening on
        config['webserver_port'].
    serve_static -- when true, config['static_web_dir'] is published
        under the 'static' child of the root resource.
    """
    service.MultiService.__init__(self)

    merged = cfg.DEFAULT_CONFIG.copy()
    merged.update(config)
    self.config = merged

    # Neither sub-service exists until one of the builder methods is called.
    self.updater = None
    self.publisher = None

    if root_resource is not None:
        # The caller owns the web server; only reuse its resource tree.
        self.root_resource = root_resource
    else:
        self.root_resource = resource.Resource()
        port = self.config['webserver_port']
        site = server.Site(self.root_resource)
        webserver = internet.TCPServer(port, site)
        webserver.setServiceParent(service.IServiceCollection(self))

    if serve_static:
        static_dir = static.File(self.config['static_web_dir'])
        self.root_resource.putChild('static', static_dir)
63
64
65
66
# Attach an UpdaterService to this service. The assert rejects a second
# updater being created on the same instance.
assert self.updater is None
self.updater = UpdaterService(self.config, self, self.root_resource)
# Returns the service itself (presumably so the call can be chained).
return self
71
# Attach a PublisherService to this service. The assert rejects a second
# publisher being created on the same instance.
assert self.publisher is None
self.publisher = PublisherService(self.config, self, self.root_resource)
# Returns the service itself (presumably so the call can be chained).
return self
76
# Attach both an updater and a publisher; neither may already exist.
assert self.updater is None
assert self.publisher is None
self.updater = UpdaterService(self.config, self, self.root_resource)
# The publisher is handed the updater's in-memory agents, keyed by this
# host's reference, and is created with the clients page disabled.
self.publisher = PublisherService(self.config, self, self.root_resource,
                                  show_clients_page = False,
                                  live_agents={self.config['host_ref']: self.updater.agents})
return self
85
86
87
88
90 """Schedules measures, data serialisation and upload."""
91
def __init__(self, config, parent, root_resource):
    """Set up the updater and register it with its parent and the web tree.

    config -- complete configuration mapping (no defaults are applied here).
    parent -- service this updater is attached to as a child.
    root_resource -- web resource under which this object is published
        as the 'xmlrpc' child.
    """
    service.MultiService.__init__(self)
    xmlrpc.XMLRPC.__init__(self)
    self.config = config
    # First line of /etc/eole/version, or "" when the file does not exist.
    # load_agents() uses it as the name of an extra agent sub-directory.
    self.module = ""

    if os.path.exists('/etc/eole/version'):
        f = open('/etc/eole/version')
        try:
            self.module = f.read().split('\n')[0]
        finally:
            # Close the handle even when the read fails.
            f.close()

    self.update_static_data()

    loc, enc = locale.getdefaultlocale()
    log.msg(_('default locale: %s encoding: %s') % (loc, enc))
    if enc == 'utf':
        log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc)
    self.agents = self.load_agents()

    self.setServiceParent(service.IServiceCollection(parent))
    root_resource.putChild('xmlrpc', self)
114
"""Start the child services and initialize the zephir services."""
service.MultiService.startService(self)
# Agent scheduling is deferred by two seconds after start-up.
reactor.callLater(2,self.schedule_all)
if registered != 0:
    # The zephir libraries were imported, so uploads can be set up.
    self.setup_uucp()

    # A leftover reboot.lck is reported through zeph_log and then removed.
    if os.path.isfile(os.path.join(zephir_dir,'reboot.lck')):
        try:
            zeph_log('REBOOT',0,'redémarrage du serveur terminé')
            os.unlink(os.path.join(zephir_dir,'reboot.lck'))
        except:
            # Best effort: failures while logging or unlinking are ignored.
            pass
130
"""Load every agent declared in the configuration directory.

@return: dictionary mapping each agent name to its agent object
"""
log.msg(_("Loading agents from %s...") % self.config['config_dir'])
loaded_agents = {}
# Agent files come from the configuration directory itself and from its
# sub-directory named after self.module.
list_agents = glob(os.path.join(self.config['config_dir'], "*.agent"))
list_agents.extend(glob(os.path.join(self.config['config_dir'],self.module,"*.agent")))
for f in list_agents:
    log.msg(_(" from %s:") % os.path.basename(f))
    # Each file is executed as Python code and is expected to bind AGENTS
    # to an iterable of agent objects.
    h = { 'AGENTS': None }
    execfile(f, globals(), h)
    assert h.has_key('AGENTS')
    for a in h['AGENTS']:
        # Agent names must be unique across all loaded files.
        assert not loaded_agents.has_key(a.name)

        a.init_data(os.path.join(self.config['state_dir'],
                                 self.config['host_ref'],
                                 a.name))
        a.manager = self
        a.archive()
        loaded_agents[a.name] = a
        log.msg(_(" %s, period %d") % (a.name, a.period))
log.msg(_("Loaded."))
return loaded_agents
154
155
156
157
"""Schedule the periodic measures of one agent."""
assert self.agents.has_key(agent_name)
# Agents whose period is not strictly positive get no timer.
if self.agents[agent_name].period > 0:
    timer = internet.TimerService(self.agents[agent_name].period,
                                  self.wakeup_for_measure, agent_name)
    # The timer is registered under the agent's name so that it can be
    # retrieved later with getServiceNamed(agent_name).
    timer.setName(agent_name)
    timer.setServiceParent(service.IServiceCollection(self))
166
167
173
174
"""Schedule every loaded agent.

Starts the periodic measurement cycle of each loaded agent. The
first measure is taken immediately.
"""
for agent_name in self.agents.keys():

    self.schedule(agent_name)
183
184
# Return the child service registered under the agent's name.
assert self.agents.has_key(agent_name)
return self.getServiceNamed(agent_name)
188
189
190
191
# Prepare the upload directory and schedule the first upload.
ensure_dirs(self.config['uucp_dir'])
self.update_static_data()

try:
    reload(conf_zeph)
    # Drop state directories that belong to any other server id. Only
    # directories are removed: state_dir also holds plain files (such as
    # 'ignore_list'), and shutil.rmtree() raises on a plain file, which
    # used to abort this block before the server timeout was read.
    own_dir = str(conf_zeph.id_serveur)
    for entry in os.listdir(self.config['state_dir']):
        entry_path = os.path.join(self.config['state_dir'], entry)
        if entry != own_dir and os.path.isdir(entry_path):
            shutil.rmtree(entry_path)
    # Upload period as configured on the Zephir server.
    period = convert(zephir.serveurs.get_timeout(conf_zeph.id_serveur)[1])
except:
    # Best effort: any failure falls back to the local default below.
    period = 0

if period == 0:
    period = self.config['upload_period']

# Random delay of at least 30 seconds before the first upload. The upper
# bound is clamped because randrange(30, period) raises ValueError when
# period <= 30; for larger periods the range is unchanged.
delay = random.randrange(30, max(period, 31))
reactor.callLater(delay,self.wakeup_for_upload)
214
# Copy config_dir/site.cfg into this host's client data directory when the
# copy is missing or older than the original.
original = os.path.join(self.config['config_dir'], 'site.cfg')
if os.path.isfile(original):
    destination = cfg.client_data_dir(self.config, self.config['host_ref'])
    ensure_dirs(destination)
    need_copy = False
    try:
        org_mtime = os.path.getmtime(original)
        dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg'))
    except OSError:
        # Either mtime could not be read (e.g. no copy yet): force the copy.
        need_copy = True
    # need_copy is tested first, so the mtimes are only compared when both
    # were read successfully.
    if need_copy or (org_mtime > dest_mtime):
        shutil.copy(original, destination)
228
230
# Re-read the upload period from the Zephir server; any failure, or a
# period of 0, falls back to config['upload_period'].
try:
    reload(conf_zeph)
    period = convert(zephir.serveurs.get_timeout(conf_zeph.id_serveur)[1])
except:
    period = 0

if period == 0:
    period = self.config['upload_period']

# Re-arm the next upload before doing any work for this one.
if recall:
    reactor.callLater(period,self.wakeup_for_upload)

# Have every agent archive its data.
for agent in self.agents.values():
    agent.archive()

self.update_static_data()

try:
    # Start from an empty per-host staging directory under tmp_data_dir.
    client_dir = os.path.join(self.config['tmp_data_dir'],self.config['host_ref'])
    if os.path.isdir(client_dir):
        shutil.rmtree(client_dir)
    os.makedirs(client_dir)
except:
    # Best effort: errors while resetting the staging directory are ignored.
    pass
# Build the /bin/cp command line: flags, then the sources (site.cfg, the
# optional ignore_list, one data directory per agent), then the destination.
args = ['-Rf',os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']),'site.cfg'))]
ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],'ignore_list'))
if os.path.exists(ignore_file):
    args.append(ignore_file)

for agent_name in self.agents.keys():
    args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'],agent_name)))
args.append(os.path.abspath(client_dir))
# The copy runs as a subprocess; on success the archive step is chained,
# on failure the error is only logged.
res = utils.getProcessOutput('/bin/cp', args = args)
res.addCallbacks(self._make_archive,
                 lambda x: log.msg(_("/!\ copy failed (%s)\n"
                                     "data: %s")
                                   % (x, self.config['state_dir'])))
272
274
# Write a checksum list for the files described by md5files (relative to
# /etc/eole) into tmp_data_dir/config<host_ref>.md5.
rep_src = "/etc/eole"
data = []
# Each md5files entry is unpacked as (source, destination, suffix filter).
for src, dst, pattern in md5files:
    if os.path.isdir(os.path.join(rep_src,src)):
        # Directory entry: one (source, destination) pair per file it lists.
        fics = os.listdir(os.path.join(rep_src,src))
        fics = [(os.path.join(src,fic),os.path.join(dst,fic)) for fic in fics]
    else:
        fics = [(src,dst)]
    for fic, fic_dst in fics:
        # Missing paths and sub-directories are skipped silently.
        if os.path.isfile(os.path.join(rep_src,fic)):
            # A pattern of None accepts every file; otherwise it is a suffix.
            if (pattern is None) or fic.endswith(pattern):
                md5res = md5file(os.path.join(rep_src,fic))
                data.append("%s %s\n" % (md5res, fic_dst))
outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % self.config['host_ref']), "w")
outf.writelines(data)
outf.close()
291
# Refresh the checksum file, then archive tmp_data_dir for upload.
self._check_md5()

# The archive is written to uucp_dir/site<host_ref>.tar; the 'z' flag means
# tar compresses it with gzip despite the .tar suffix.
tarball = os.path.join(self.config['uucp_dir'],
                       'site%s.tar' % self.config['host_ref'])
res = utils.getProcessOutput('/bin/tar',
                             args = ('czf', tarball,
                                     '--exclude', 'private',
                                     self.config['tmp_data_dir']))
# On success the chown step receives tar's output plus the tarball path;
# on failure the error is only logged.
res.addCallbacks(self._try_chown,
                 lambda x: log.msg(_("/!\ archiving failed (%s)\n"
                                     "data: %s\narchive: %s")
                                   % (x, self.config['state_dir'], tarball)),
                 callbackArgs = [tarball])
306
308 try:
309 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
310 uid = os.getuid()
311 os.chown(tarball, uucp_uid, uucp_gid)
312 except OSError, e:
313 log.msg("/!\ chown error, check authorizations (%s)" % e)
314
315
316 try:
317 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
318 os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid)
319 except:
320 log.msg("/!\ chown error on deffered_logs")
321 os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
322
323
324
325
"""@return: list of the names of the loaded agents"""
return self.agents.keys()
329 xmlrpc_list_agents.signature = [['array']]
330
"""@return: the loaded agents grouped by display section, as a dictionary
C{{section: [(name, description), ...]}}"""
try:
    menu = {}
    for name, agent in self.agents.items():
        # Agents without a section are left out of the menu.
        if agent.section != None:
            if not menu.has_key(agent.section):
                menu[agent.section] = []
            menu[agent.section].append((name, agent.description))
    return menu
except Exception, e:
    # The error is only logged; this path falls through and returns None.
    log.msg(e)
343 xmlrpc_agents_menu.signature = [['struct']]
344
"""
@return: the statuses of the listed agents, as a dictionary
C{{name:status}}. Each status is itself a dictionary with the
keys C{'level'} and C{'message'}. Only the names of agents that
are actually loaded appear among the keys of the dictionary.
"""
result = {}
# An empty request list means "every loaded agent".
if len(agent_name_list) == 0:
    agent_name_list = self.agents.keys()
for agent_name in agent_name_list:
    # Unknown agent names are skipped silently.
    if self.agents.has_key(agent_name):
        result[agent_name] = self.agents[agent_name].check_status().to_dict()
return result
360 xmlrpc_status_for_agents.signature = [['string', 'struct']]
361
369
370
374
375
377 """Serves the web interface for current agent data"""
378
def __init__(self, config, parent, root_resource,
             live_agents=None,
             show_clients_page=True):
    """Publish the agents' web pages under the given root resource.

    config -- complete configuration mapping (no defaults are applied here).
    parent -- service this publisher is attached to as a child.
    root_resource -- web resource that receives the 'agents' child and
        the redirect installed on the empty path.
    live_agents -- passed through unchanged to ClientManager.
    show_clients_page -- when false, the empty path redirects straight to
        this host's page instead of the './agents/' index.
    """
    service.MultiService.__init__(self)
    self.config = config
    self.show_clients_page = show_clients_page
    self.manager = ClientManager(self.config, live_agents)

    self.setServiceParent(service.IServiceCollection(parent))

    agents_resource = ZephirServerResource(self.config, self.manager)
    root_resource.putChild('agents', agents_resource)

    # Choose where the empty path redirects to.
    if self.show_clients_page:
        target = './agents/'
    else:
        target = './agents/' + (self.config['host_ref'] + '/')
    root_resource.putChild('', util.Redirect(target))
396
397
398
399
400
401
402
403