MythTV  master
MythExternControl.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Copyright (C) John Poet 2018
4  *
5  * This file is part of MythTV
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "MythExternControl.h"
22 #include "mythlogging.h"
23 
24 #include <QFile>
25 #include <QTextStream>
26 
27 #include <unistd.h>
28 #include <poll.h>
29 
30 #include <iostream>
31 
32 using namespace std;
33 
34 const QString VERSION = "0.6";
35 
36 #define LOC Desc()
37 
39  : m_buffer(this)
40  , m_commands(this)
41  , m_run(true)
42  , m_commands_running(true)
43  , m_buffer_running(true)
44  , m_fatal(false)
45  , m_streaming(false)
46  , m_xon(false)
47  , m_ready(false)
48 {
49  setObjectName("Control");
50 
51  m_buffer.Start();
52  m_commands.Start();
53 }
54 
56 {
57  Terminate();
58  m_commands.Join();
59  m_buffer.Join();
60 }
61 
62 Q_SLOT void MythExternControl::Opened(void)
63 {
64  std::lock_guard<std::mutex> lock(m_flow_mutex);
65 
66  m_ready = true;
67  m_flow_cond.notify_all();
68 }
69 
70 Q_SLOT void MythExternControl::Streaming(bool val)
71 {
72  m_streaming = val;
73  m_flow_cond.notify_all();
74 }
75 
77 {
78  emit Close();
79 }
80 
81 Q_SLOT void MythExternControl::Done(void)
82 {
84  {
85  m_run = false;
86  m_flow_cond.notify_all();
87  m_run_cond.notify_all();
88 
89  std::this_thread::sleep_for(std::chrono::microseconds(50));
90 
92  {
93  std::unique_lock<std::mutex> lk(m_flow_mutex);
94  m_flow_cond.wait_for(lk, std::chrono::milliseconds(1000));
95  }
96 
97  LOG(VB_RECORD, LOG_CRIT, LOC + "Terminated.");
98  }
99 }
100 
101 void MythExternControl::Error(const QString & msg)
102 {
103  LOG(VB_RECORD, LOG_CRIT, LOC + msg);
104 
105  std::unique_lock<std::mutex> lk(m_msg_mutex);
106  if (m_errmsg.isEmpty())
107  m_errmsg = msg;
108  else
109  m_errmsg += "; " + msg;
110 }
111 
112 void MythExternControl::Fatal(const QString & msg)
113 {
114  Error(msg);
115  m_fatal = true;
116  Terminate();
117 }
118 
119 Q_SLOT void MythExternControl::SendMessage(const QString & cmd,
120  const QString & serial,
121  const QString & msg)
122 {
123  std::unique_lock<std::mutex> lk(m_msg_mutex);
124  m_commands.SendStatus(cmd, serial, msg);
125 }
126 
127 Q_SLOT void MythExternControl::ErrorMessage(const QString & msg)
128 {
129  std::unique_lock<std::mutex> lk(m_msg_mutex);
130  if (m_errmsg.isEmpty())
131  m_errmsg = msg;
132  else
133  m_errmsg += "; " + msg;
134 }
135 
136 #undef LOC
137 #define LOC QString("%1").arg(m_parent->Desc())
138 
140  : m_thread()
141  , m_parent(parent)
142  , m_apiVersion(-1)
143 {
144 }
145 
147 {
148 }
149 
150 void Commands::Close(void)
151 {
152  std::lock_guard<std::mutex> lock(m_parent->m_flow_mutex);
153 
154  emit m_parent->Close();
155  m_parent->m_ready = false;
156  m_parent->m_flow_cond.notify_all();
157 }
158 
159 void Commands::StartStreaming(const QString & serial)
160 {
161  emit m_parent->StartStreaming(serial);
162 }
163 
164 void Commands::StopStreaming(const QString & serial, bool silent)
165 {
166  emit m_parent->StopStreaming(serial, silent);
167 }
168 
169 void Commands::LockTimeout(const QString & serial) const
170 {
171  emit m_parent->LockTimeout(serial);
172 }
173 
174 void Commands::HasTuner(const QString & serial) const
175 {
176  emit m_parent->HasTuner(serial);
177 }
178 
179 void Commands::HasPictureAttributes(const QString & serial) const
180 {
181  emit m_parent->HasPictureAttributes(serial);
182 }
183 
184 void Commands::SetBlockSize(const QString & serial, int blksz)
185 {
186  emit m_parent->SetBlockSize(serial, blksz);
187 }
188 
189 void Commands::TuneChannel(const QString & serial, const QString & channum)
190 {
191  emit m_parent->TuneChannel(serial, channum);
192 }
193 
194 void Commands::LoadChannels(const QString & serial)
195 {
196  emit m_parent->LoadChannels(serial);
197 }
198 
199 void Commands::FirstChannel(const QString & serial)
200 {
201  emit m_parent->FirstChannel(serial);
202 }
203 
204 void Commands::NextChannel(const QString & serial)
205 {
206  emit m_parent->NextChannel(serial);
207 }
208 
209 bool Commands::SendStatus(const QString & command, const QString & msg)
210 {
211  int len = write(2, msg.toUtf8().constData(), msg.size());
212  write(2, "\n", 1);
213 
214  if (len != msg.size())
215  {
216  LOG(VB_RECORD, LOG_ERR, LOC +
217  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
218  .arg(command).arg(len).arg(msg.size()).arg(msg));
219  return false;
220  }
221 
222  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
223  .arg(command).arg(msg));
224 
225  m_parent->ClearError();
226  return true;
227 }
228 
229 bool Commands::SendStatus(const QString & command, const QString & serial,
230  const QString & status)
231 {
232  QString msg = QString("%1:%2").arg(serial).arg(status);
233 
234  int len = write(2, msg.toUtf8().constData(), msg.size());
235  write(2, "\n", 1);
236 
237  if (len != msg.size())
238  {
239  LOG(VB_RECORD, LOG_ERR, LOC +
240  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
241  .arg(command).arg(len).arg(msg.size()).arg(msg));
242  return false;
243  }
244 
245  if (!command.isEmpty())
246  {
247  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
248  .arg(command).arg(msg));
249  }
250 #if 0
251  else
252  LOG(VB_RECORD, LOG_INFO, LOC + QString("%1").arg(msg));
253 #endif
254 
255  m_parent->ClearError();
256  return true;
257 }
258 
259 bool Commands::ProcessCommand(const QString & cmd)
260 {
261  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("Processing '%1'").arg(cmd));
262 
263  std::unique_lock<std::mutex> lk1(m_parent->m_msg_mutex);
264 
265  if (cmd.startsWith("APIVersion?"))
266  {
267  if (m_parent->m_fatal)
268  SendStatus(cmd, "ERR:" + m_parent->ErrorString());
269  else
270  SendStatus(cmd, "OK:2");
271  return true;
272  }
273 
274  QStringList tokens = cmd.split(':', QString::SkipEmptyParts);
275  if (tokens.size() < 2)
276  {
277  SendStatus(cmd, "0",
278  QString("0:ERR:Version 2 API expects serial_no:msg format. "
279  "Saw '%1' instead").arg(cmd));
280  return true;
281  }
282 
283  if (tokens[1].startsWith("APIVersion?"))
284  {
285  if (m_parent->m_fatal)
286  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
287  else
288  SendStatus(cmd, tokens[0], "OK:2");
289  }
290  else if (tokens[1].startsWith("APIVersion"))
291  {
292  if (tokens.size() > 1)
293  {
294  m_apiVersion = tokens[2].toInt();
295  SendStatus(cmd, tokens[0], QString("OK:%1").arg(m_apiVersion));
296  }
297  else
298  SendStatus(cmd, tokens[0], "ERR:Missing API Version number");
299  }
300  else if (tokens[1].startsWith("Version?"))
301  {
302  if (m_parent->m_fatal)
303  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
304  else
305  SendStatus(cmd, tokens[0], "OK:" + VERSION);
306  }
307  else if (tokens[1].startsWith("Description?"))
308  {
309  if (m_parent->m_fatal)
310  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
311  else if (m_parent->m_desc.trimmed().isEmpty())
312  SendStatus(cmd, tokens[0], "WARN:Not set");
313  else
314  SendStatus(cmd, tokens[0], "OK:" + m_parent->m_desc.trimmed());
315  }
316  else if (tokens[1].startsWith("HasLock?"))
317  {
318  if (m_parent->m_ready)
319  SendStatus(cmd, tokens[0], "OK:Yes");
320  else
321  SendStatus(cmd, tokens[0], "OK:No");
322  }
323  else if (tokens[1].startsWith("SignalStrengthPercent"))
324  {
325  if (m_parent->m_ready)
326  SendStatus(cmd, tokens[0], "OK:100");
327  else
328  SendStatus(cmd, tokens[0], "OK:20");
329  }
330  else if (tokens[1].startsWith("LockTimeout"))
331  {
332  LockTimeout(tokens[0]);
333  }
334  else if (tokens[1].startsWith("HasTuner"))
335  {
336  HasTuner(tokens[0]);
337  }
338  else if (tokens[1].startsWith("HasPictureAttributes"))
339  {
340  HasPictureAttributes(tokens[0]);
341  }
342  else if (tokens[1].startsWith("SendBytes"))
343  {
344  // Used when FlowControl is Polling
345  SendStatus(cmd, tokens[0], "ERR:Not supported");
346  }
347  else if (tokens[1].startsWith("XON"))
348  {
349  // Used when FlowControl is XON/XOFF
350  if (m_parent->m_streaming)
351  {
352  SendStatus(cmd, tokens[0], "OK");
353  m_parent->m_xon = true;
354  m_parent->m_flow_cond.notify_all();
355  }
356  else
357  SendStatus(cmd, tokens[0], "WARN:Not streaming");
358  }
359  else if (tokens[1].startsWith("XOFF"))
360  {
361  if (m_parent->m_streaming)
362  {
363  SendStatus(cmd, tokens[0], "OK");
364  // Used when FlowControl is XON/XOFF
365  m_parent->m_xon = false;
366  m_parent->m_flow_cond.notify_all();
367  }
368  else
369  SendStatus(cmd, tokens[0], "WARN:Not streaming");
370  }
371  else if (tokens[1].startsWith("TuneChannel"))
372  {
373  if (tokens.size() > 1)
374  TuneChannel(tokens[0], tokens[2]);
375  else
376  SendStatus(cmd, tokens[0], "ERR:Missing channum");
377  }
378  else if (tokens[1].startsWith("LoadChannels"))
379  {
380  LoadChannels(tokens[0]);
381  }
382  else if (tokens[1].startsWith("FirstChannel"))
383  {
384  FirstChannel(tokens[0]);
385  }
386  else if (tokens[1].startsWith("NextChannel"))
387  {
388  NextChannel(tokens[0]);
389  }
390  else if (tokens[1].startsWith("IsOpen?"))
391  {
392  std::unique_lock<std::mutex> lk2(m_parent->m_run_mutex);
393  if (m_parent->m_fatal)
394  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
395  else if (m_parent->m_ready)
396  SendStatus(cmd, tokens[0], "OK:Open");
397  else
398  SendStatus(cmd, tokens[0], "WARN:Not Open yet");
399  }
400  else if (tokens[1].startsWith("CloseRecorder"))
401  {
402  if (m_parent->m_streaming)
403  StopStreaming(tokens[0], true);
404  m_parent->Terminate();
405  SendStatus(cmd, tokens[0], "OK:Terminating");
406  }
407  else if (tokens[1].startsWith("FlowControl?"))
408  {
409  SendStatus(cmd, tokens[0], "OK:XON/XOFF");
410  }
411  else if (tokens[1].startsWith("BlockSize"))
412  {
413  if (tokens.size() > 1)
414  SetBlockSize(tokens[0], tokens[2].toUInt());
415  else
416  SendStatus(cmd, tokens[0], "ERR:Missing block size");
417  }
418  else if (tokens[1].startsWith("StartStreaming"))
419  {
420  StartStreaming(tokens[0]);
421  }
422  else if (tokens[1].startsWith("StopStreaming"))
423  {
424  /* This does not close the stream! When Myth is done with
425  * this 'recording' ExternalChannel::EnterPowerSavingMode()
426  * will be called, which invokes CloseRecorder() */
427  StopStreaming(tokens[0], false);
428  }
429  else
430  SendStatus(cmd, tokens[0],
431  QString("ERR:Unrecognized command '%1'").arg(tokens[1]));
432 
433  return true;
434 }
435 
436 void Commands::Run(void)
437 {
438  setObjectName("Commands");
439 
440  QString cmd;
441  int timeout = 250;
442 
443  int ret;
444  int poll_cnt = 1;
445  struct pollfd polls[2];
446  memset(polls, 0, sizeof(polls));
447 
448  polls[0].fd = 0;
449  polls[0].events = POLLIN | POLLPRI;
450  polls[0].revents = 0;
451 
452  QFile input;
453  input.open(stdin, QIODevice::ReadOnly);
454  QTextStream qtin(&input);
455 
456  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser ready.");
457 
458  while (m_parent->m_run)
459  {
460  ret = poll(polls, poll_cnt, timeout);
461 
462  if (polls[0].revents & POLLHUP)
463  {
464  LOG(VB_RECORD, LOG_ERR, LOC + "poll eof (POLLHUP)");
465  break;
466  }
467  else if (polls[0].revents & POLLNVAL)
468  {
469  m_parent->Fatal("poll error");
470  return;
471  }
472 
473  if (polls[0].revents & POLLIN)
474  {
475  if (ret > 0)
476  {
477  cmd = qtin.readLine();
478  if (!ProcessCommand(cmd))
479  m_parent->Fatal("Invalid command");
480  }
481  else if (ret < 0)
482  {
483  if ((EOVERFLOW == errno))
484  {
485  LOG(VB_RECORD, LOG_ERR, "command overflow");
486  break; // we have an error to handle
487  }
488 
489  if ((EAGAIN == errno) || (EINTR == errno))
490  {
491  LOG(VB_RECORD, LOG_ERR, LOC + "retry command read.");
492  continue; // errors that tell you to try again
493  }
494 
495  LOG(VB_RECORD, LOG_ERR, LOC + "unknown error reading command.");
496  }
497  }
498  }
499 
500  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser: shutting down");
501  m_parent->m_commands_running = false;
502  m_parent->m_flow_cond.notify_all();
503 }
504 
506  : m_parent(parent), m_thread()
507 {
508  m_heartbeat = std::chrono::system_clock::now();
509 }
510 
512 {
513 }
514 
515 bool Buffer::Fill(const QByteArray & buffer)
516 {
517  if (buffer.size() < 1)
518  return false;
519 
520  static int dropped = 0;
521  static int dropped_bytes = 0;
522 
523  m_parent->m_flow_mutex.lock();
524  if (m_data.size() < MAX_QUEUE)
525  {
526  block_t blk(reinterpret_cast<const uint8_t *>(buffer.constData()),
527  reinterpret_cast<const uint8_t *>(buffer.constData())
528  + buffer.size());
529 
530  m_data.push(blk);
531  dropped = 0;
532 
533  LOG(VB_GENERAL, LOG_DEBUG, LOC +
534  QString("Adding %1 bytes").arg(buffer.size()));
535  }
536  else
537  {
538  dropped_bytes += buffer.size();
539  LOG(VB_RECORD, LOG_WARNING, LOC +
540  QString("Packet queue overrun. Dropped %1 packets, %2 bytes.")
541  .arg(++dropped).arg(dropped_bytes));
542 
543  std::this_thread::sleep_for(std::chrono::microseconds(250));
544  }
545 
546  m_parent->m_flow_mutex.unlock();
547  m_parent->m_flow_cond.notify_all();
548 
549  m_heartbeat = std::chrono::system_clock::now();
550 
551  return true;
552 }
553 
554 void Buffer::Run(void)
555 {
556  setObjectName("Buffer");
557 
558  bool is_empty = false;
559  bool wait = false;
560  time_t send_time = time (nullptr) + (60 * 5);
561  uint64_t write_total = 0;
562  uint64_t written = 0;
563  uint64_t write_cnt = 0;
564  uint64_t empty_cnt = 0;
565  uint sz;
566 
567  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: Ready for data.");
568 
569  while (m_parent->m_run)
570  {
571  {
572  std::unique_lock<std::mutex> lk(m_parent->m_flow_mutex);
573  m_parent->m_flow_cond.wait_for(lk,
574  std::chrono::milliseconds
575  (wait ? 5000 : 25));
576  wait = false;
577  }
578 
579  if (send_time < static_cast<double>(time (nullptr)))
580  {
581  // Every 5 minutes, write out some statistics.
582  send_time = time (nullptr) + (60 * 5);
583  write_total += written;
584  if (m_parent->m_streaming)
585  LOG(VB_RECORD, LOG_NOTICE, LOC +
586  QString("Count: %1, Empty cnt %2, Written %3, Total %4")
587  .arg(write_cnt).arg(empty_cnt)
588  .arg(written).arg(write_total));
589  else
590  LOG(VB_GENERAL, LOG_NOTICE, LOC + "Not streaming.");
591 
592  write_cnt = empty_cnt = written = 0;
593  }
594 
595  if (m_parent->m_streaming)
596  {
597  if (m_parent->m_xon)
598  {
599  block_t pkt;
600  m_parent->m_flow_mutex.lock();
601  if (!m_data.empty())
602  {
603  pkt = m_data.front();
604  m_data.pop();
605  is_empty = m_data.empty();
606  }
607  m_parent->m_flow_mutex.unlock();
608 
609  if (!pkt.empty())
610  {
611  sz = write(1, pkt.data(), pkt.size());
612  written += sz;
613  ++write_cnt;
614 
615  if (sz != pkt.size())
616  {
617  LOG(VB_GENERAL, LOG_WARNING, LOC +
618  QString("Only wrote %1 of %2 bytes to mythbackend")
619  .arg(sz).arg(pkt.size()));
620  }
621  }
622 
623  if (is_empty)
624  {
625  wait = true;
626  ++empty_cnt;
627  }
628  }
629  else
630  wait = true;
631  }
632  else
633  {
634  // Clear packet queue
635  m_parent->m_flow_mutex.lock();
636  if (!m_data.empty())
637  {
638  stack_t dummy;
639  std::swap(m_data, dummy);
640  }
641  m_parent->m_flow_mutex.unlock();
642 
643  wait = true;
644  }
645  }
646 
647  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: shutting down");
648  m_parent->m_buffer_running = false;
649  m_parent->m_flow_cond.notify_all();
650 }
std::atomic< bool > m_run
std::atomic< bool > m_commands_running
def write(text, progress=True)
Definition: mythburn.py:279
static int swap(VideoFrame *frame, int datasize, int offset, int shift)
Definition: filter_vflip.c:24
MythExternControl * m_parent
void Join(void)
void TuneChannel(const QString &serial, const QString &channum)
VERBOSE_PREAMBLE Most true
Definition: verbosedefs.h:91
void ErrorMessage(const QString &msg)
std::condition_variable m_run_cond
void NextChannel(const QString &serial)
void Run(void)
Buffer(MythExternControl *parent)
void TuneChannel(const QString &serial, const QString &channum)
std::atomic< bool > m_buffer_running
void StopStreaming(const QString &serial, bool silent)
#define LOC
bool SendStatus(const QString &cmd, const QString &status)
void Error(const QString &msg)
unsigned int uint
Definition: compat.h:140
void LockTimeout(const QString &serial) const
bool Fill(const QByteArray &buffer)
void LoadChannels(const QString &serial)
void StopStreaming(const QString &serial, bool silent)
QString ErrorString(void) const
const QString VERSION
void FirstChannel(const QString &serial)
void HasTuner(const QString &serial) const
void Run(void)
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
std::condition_variable m_flow_cond
void SetBlockSize(const QString &serial, int blksz)
void Start(void)
std::vector< uint8_t > block_t
void FirstChannel(const QString &serial)
void HasPictureAttributes(const QString &serial) const
std::atomic< bool > m_ready
void SetBlockSize(const QString &serial, int blksz)
void StartStreaming(const QString &serial)
MythExternControl * m_parent
std::atomic< bool > m_streaming
void Close(void)
void Fatal(const QString &msg)
bool ProcessCommand(const QString &cmd)
void Join(void)
void Streaming(bool val)
void HasPictureAttributes(const QString &serial) const
std::queue< block_t > stack_t
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void HasTuner(const QString &serial) const
void Start(void)
void StartStreaming(const QString &serial)
stack_t m_data
void SendMessage(const QString &command, const QString &serial, const QString &msg)
std::chrono::time_point< std::chrono::system_clock > m_heartbeat
void NextChannel(const QString &serial)
void LoadChannels(const QString &serial)
void Close(void)
std::atomic< bool > m_xon
void LockTimeout(const QString &serial) const