WebSocketService.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. <?php
  2. namespace App\Services;
  3. @error_reporting(E_ALL & ~E_NOTICE);
  4. use App\Module\Base;
  5. use App\Module\Chat;
  6. use App\Module\Users;
  7. use App\Tasks\ChromeExtendTask;
  8. use App\Tasks\PushTask;
  9. use Cache;
  10. use DB;
  11. use Hhxsv5\LaravelS\Swoole\Task\Task;
  12. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  13. use Swoole\Http\Request;
  14. use Swoole\WebSocket\Frame;
  15. use Swoole\WebSocket\Server;
  16. /**
  17. * @see https://wiki.swoole.com/#/start/start_ws_server
  18. */
  19. class WebSocketService implements WebSocketHandlerInterface
  20. {
  /**
   * WebSocketService constructor.
   *
   * Explicitly declared without parameters; it performs no initialisation.
   */
  public function __construct()
  {
      // Intentionally empty.
  }
  /**
   * Triggered when a WebSocket connection is established.
   *
   * Flow: validate the `token` query parameter, resolve it to a username, store
   * the connection in the `ws` table, push an `open` event and finally re-push
   * the newest message addressed to the user when it has not been acknowledged.
   * On a validation failure an `error` message is pushed, the connection is
   * closed and any `ws` row for the fd is removed.
   *
   * @param Server $server
   * @param Request $request
   */
  public function onOpen(Server $server, Request $request)
  {
      // Reset the global $_A container for this event.
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];

      // Validate parameters: the token must be present and must be a plain
      // string (a query such as `token[]=x` yields an array that md5() and
      // base64_decode() cannot process).
      $fd = $request->fd;
      if (!isset($request->get['token']) || !is_string($request->get['token'])) {
          $server->push($fd, Chat::formatMsgSend([
              'messageType' => 'error',
              'body' => [
                  'error' => '参数错误'
              ],
          ]));
          $server->close($fd);
          $this->deleteUser($fd);
          return;
      }

      // Validate the token.
      $token = $request->get['token'];
      // `channel` is optional: `??` avoids reading an undefined array key, and
      // any falsy or non-string value is normalised to '' as before.
      $channel = $request->get['channel'] ?? '';
      if (!is_string($channel) || !$channel) {
          $channel = '';
      }
      $cacheKey = "ws::token:" . md5($token);
      // The lookup result is cached for one second per distinct token.
      $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
          // Decoded token layout: id@username@encrypt@timestamp. The appended
          // "@@@@" guarantees that list() always receives four elements.
          list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
          // The token is accepted for 2592000 seconds (30 days) after its timestamp.
          if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
              if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
                  return $username;
              }
          }
          return null;
      });
      if (empty($username)) {
          Cache::forget($cacheKey);
          $server->push($fd, Chat::formatMsgSend([
              'messageType' => 'error',
              'channel' => $channel,
              'body' => [
                  'error' => '会员不存在',
              ],
          ]));
          $server->close($fd);
          $this->deleteUser($fd);
          return;
      }

      // Kick previous sessions offline (currently disabled).
      /*if ($channel != 'chromeExtend') {
          $userLists = $this->getUser('', $channel, $username);
          foreach ($userLists AS $user) {
              $server->push($user['fd'], Chat::formatMsgSend([
                  'messageType' => 'kick',
                  'channel' => $channel,
                  'body' => [
                      'ip' => Base::getIp(),
                      'time' => time(),
                      'newfd' => $fd,
                  ],
              ]));
              $this->deleteUser($user['fd']);
          }
      }*/

      // Store the connection and send the `open` event.
      $this->saveUser($fd, $channel, $username);
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'open',
          'channel' => $channel,
          'body' => [
              'fd' => $fd,
          ],
      ]));

      // Re-send the newest message addressed to this user when it has not been
      // acknowledged. NOTE(review): the strict `=== 0` only matches when the
      // database driver returns `roger` as an integer - confirm.
      $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
      if ($lastMsg && $lastMsg['roger'] === 0) {
          $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
          if (!Base::isError($dialog)) {
              $dialog = $dialog['data'];
              $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
              $body = Base::string2array($lastMsg['message']);
              $body['id'] = $lastMsg['id'];
              $body['resend'] = 1;
              $body['unread'] = $unread;
              $body['username'] = $lastMsg['username'];
              $body['userimg'] = Users::userimg($lastMsg['username']);
              $body['indate'] = $lastMsg['indate'];
              $server->push($fd, Chat::formatMsgSend([
                  'messageType' => 'user',
                  'contentId' => $lastMsg['id'],
                  'channel' => $channel,
                  'username' => $lastMsg['username'],
                  'target' => $lastMsg['receive'],
                  'body' => $body,
                  'time' => $lastMsg['indate'],
              ]));
          }
      }
  }
  /**
   * Triggered when a message frame is received.
   *
   * Decodes the frame with Chat::formatMsgReceive(), dispatches on its
   * `messageType` and, when the frame carries a `messageId`, delivers a `back`
   * acknowledgement containing `$back` to the sending connection.
   *
   * @param Server $server
   * @param Frame $frame
   */
  public function onMessage(Server $server, Frame $frame)
  {
      // Reset the global $_A container for this event.
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];
      //
      $data = Chat::formatMsgReceive($frame->data);
      $back = [
          'status' => 1,
          'message' => '',
      ];
      //
      switch ($data['messageType']) {
          /**
           * Refresh: update the timestamp of this connection.
           */
          case 'refresh':
              DB::table('ws')->where([
                  'fd' => $frame->fd,
                  'channel' => $data['channel'],
              ])->update(['update' => time()]);
              break;

          /**
           * Total number of unread messages.
           */
          case 'unread':
              $username = $this->senderUsername($frame->fd, $data['channel']);
              if ($username) {
                  $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
                  $num += intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
                  $back['message'] = $num;
              } else {
                  $back['message'] = 0;
              }
              break;

          /**
           * Mark the messages of a dialog as read.
           */
          case 'read':
              $username = $this->senderUsername($frame->fd, $data['channel']);
              $dialog = Chat::openDialog($username, $data['target']);
              if (!Base::isError($dialog)) {
                  $dialog = $dialog['data'];
                  $upArray = [];
                  if ($dialog['user1'] == $dialog['user2']) {
                      // Dialog with oneself: clear both counters.
                      $upArray['unread1'] = 0;
                      $upArray['unread2'] = 0;
                  } else {
                      $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
                  }
                  DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
              }
              $chromeExtendTask = new ChromeExtendTask($username);
              Task::deliver($chromeExtendTask);
              break;

          /**
           * Receipt: the client confirms it received a message.
           */
          case 'roger':
              if ($data['contentId'] > 0) {
                  $username = $this->senderUsername($frame->fd, $data['channel']);
                  DB::table('chat_msg')->where([
                      'id' => $data['contentId'],
                      'receive' => $username,
                  ])->update([
                      'roger' => 1,
                  ]);
              }
              break;

          /**
           * Send to a user (the message is stored).
           */
          case 'user':
              $username = $this->senderUsername($frame->fd, $data['channel']);
              $res = Chat::saveMessage($username, $data['target'], $data['body']);
              if (Base::isError($res)) {
                  $back = [
                      'status' => 0,
                      'message' => $res['msg'],
                  ];
              } else {
                  $resData = $res['data'];
                  $back['message'] = $resData['id'];
                  $data['contentId'] = $resData['id'];
                  $data['body']['id'] = $resData['id'];
                  $data['body']['unread'] = $resData['unread'];
                  //
                  $pushLists = [];
                  foreach ($this->getUserOfName($data['target']) as $item) {
                      $pushLists[] = [
                          'fd' => $item['fd'],
                          'msg' => $data
                      ];
                  }
                  $pushTask = new PushTask($pushLists);
                  Task::deliver($pushTask);
              }
              break;

          /**
           * Send to a user (the message is not stored).
           */
          case 'info':
              $pushLists = [];
              foreach ($this->getUserOfName($data['target']) as $item) {
                  $pushLists[] = [
                      'fd' => $item['fd'],
                      'msg' => $data
                  ];
              }
              $pushTask = new PushTask($pushLists);
              Task::deliver($pushTask);
              break;

          /**
           * Send to the whole team.
           */
          case 'team':
              if ($data['body']['type'] === 'taskA') {
                  $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
                  if ($taskId > 0) {
                      $userLists = $this->getTaskUsers($taskId);
                  } else {
                      $userLists = $this->getTeamUsers();
                  }
                  // Every recipient gets the frame re-labelled as a `user`
                  // message; only `target` differs per recipient.
                  $data['messageType'] = 'user';
                  $pushLists = [];
                  foreach ($userLists as $user) {
                      $data['target'] = $user['username'];
                      $pushLists[] = [
                          'fd' => $user['fd'],
                          'msg' => $data
                      ];
                  }
                  $pushTask = new PushTask($pushLists);
                  Task::deliver($pushTask);
              }
              break;

          /**
           * Knowledge-base (docs) collaboration: track who is viewing a document.
           */
          case 'docs':
              $back['message'] = [];
              $body = $data['body'];
              $type = $body['type'];
              $sid = intval($body['sid']);
              if ($sid <= 0) {
                  // Invalid document id: leave the switch so that the sender
                  // still receives the (empty) acknowledgement prepared above
                  // instead of never getting an answer.
                  break;
              }
              $array = Base::json2array(Cache::get("docs::" . $sid));
              if (!is_array($array)) {
                  // Guarantee an array for the unset()/ksort() calls below.
                  $array = [];
              }
              // Drop entries whose `indate` is more than 20 seconds old.
              foreach ($array as $uname => $vbody) {
                  if (intval($vbody['indate']) + 20 < time()) {
                      unset($array[$uname]);
                  }
              }
              // NOTE(review): `username` comes from the client-supplied body -
              // confirm it is validated upstream.
              if ($type == 'enter' || $type == 'refresh') {
                  $array[$body['username']] = $body;
              } elseif ($type == 'quit') {
                  unset($array[$body['username']]);
              }
              // TTL of 30; the unit depends on the cache API version in use.
              Cache::put("docs::" . $sid, Base::array2json($array), 30);
              ksort($array);
              $back['message'] = array_values($array);
              // Tell every listed user about membership changes.
              if ($type == 'enter' || $type == 'quit') {
                  $pushLists = [];
                  foreach ($back['message'] as $tuser) {
                      foreach ($this->getUserOfName($tuser['username']) as $item) {
                          $pushLists[] = [
                              'fd' => $item['fd'],
                              'msg' => [
                                  'messageType' => 'docs',
                                  'body' => [
                                      'type' => 'users',
                                      'sid' => $sid,
                                      'lists' => $back['message']
                                  ]
                              ]
                          ];
                      }
                  }
                  $pushTask = new PushTask($pushLists);
                  Task::deliver($pushTask);
              }
              break;
      }

      // Acknowledge the frame when the client asked for it via `messageId`.
      if ($data['messageId']) {
          $pushLists = [];
          $pushLists[] = [
              'fd' => $frame->fd,
              'msg' => [
                  'messageType' => 'back',
                  'messageId' => $data['messageId'],
                  'body' => $back,
              ]
          ];
          $pushTask = new PushTask($pushLists);
          Task::deliver($pushTask);
      }
  }

  /**
   * Look up the username stored in the `ws` table for a connection.
   *
   * @param $fd
   * @param $channel
   * @return mixed the `username` column of the first matching row
   */
  private function senderUsername($fd, $channel)
  {
      return DB::table('ws')->where([
          'fd' => $fd,
          'channel' => $channel,
      ])->value('username');
  }
  /**
   * Triggered when a connection is closed; removes the `ws` rows of that fd.
   *
   * @param Server $server
   * @param $fd
   * @param $reactorId
   */
  public function onClose(Server $server, $fd, $reactorId)
  {
      // Only the fd is needed; $server and $reactorId are unused here.
      $this->deleteUser($fd);
  }
  359. /** ****************************************************************************** */
  360. /** ****************************************************************************** */
  361. /** ****************************************************************************** */
  /**
   * Store a connection in the `ws` table.
   *
   * Inside one transaction the existing rows of the fd are deleted and a row
   * keyed by md5("fd@channel@username") is inserted or updated. The operation
   * is best-effort: a failure is logged and never propagated to the caller.
   *
   * @param $fd
   * @param $channel
   * @param $username
   */
  private function saveUser($fd, $channel, $username)
  {
      try {
          DB::transaction(function () use ($username, $channel, $fd) {
              $this->deleteUser($fd);
              DB::table('ws')->updateOrInsert([
                  'key' => md5($fd . '@' . $channel . '@' . $username)
              ], [
                  'fd' => $fd,
                  'username' => $username,
                  'channel' => $channel,
                  'update' => time()
              ]);
          });
      } catch (\Throwable $e) {
          // Keep the best-effort contract, but leave a trace instead of
          // discarding the failure silently.
          error_log('WebSocketService::saveUser failed for fd ' . $fd . ': ' . $e->getMessage());
      }
  }
  /**
   * Remove every `ws` row that belongs to the given fd.
   *
   * @param $fd
   */
  private function deleteUser($fd)
  {
      $connections = DB::table('ws')->where('fd', $fd);
      $connections->delete();
  }
  /**
   * Fetch connections from the `ws` table.
   *
   * Each non-empty argument becomes an equality condition. When all three are
   * empty no query is run and an empty array is returned.
   *
   * @param string $fd
   * @param string $channel
   * @param string $username
   * @return array
   */
  private function getUser($fd = '', $channel = '', $username = '')
  {
      // array_filter() without a callback drops the falsy entries, which is
      // the same truthiness test as one `if` per argument.
      $conditions = array_filter([
          'fd' => $fd,
          'channel' => $channel,
          'username' => $username,
      ]);
      if (!$conditions) {
          return [];
      }
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where($conditions)
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Fetch the connections of a fd, optionally restricted to a channel.
   *
   * @param $fd
   * @param string $channel
   * @return array
   */
  private function getUserOfFd($fd, $channel = '')
  {
      $connections = $this->getUser($fd, $channel);
      return $connections;
  }
  /**
   * Fetch the connections of a username, optionally restricted to a channel.
   *
   * @param $username
   * @param string $channel
   * @return array
   */
  private function getUserOfName($username, $channel = '')
  {
      $connections = $this->getUser('', $channel, $username);
      return $connections;
  }
  /**
   * Fetch the team connections: every `ws` row whose `update` timestamp lies
   * within the last 600 seconds.
   *
   * @return array|string
   */
  private function getTeamUsers()
  {
      $activeSince = time() - 600;
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where([
              ['update', '>', $activeSince],
          ])
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Fetch the connections of the users related to a task: its followers, its
   * creator, its owner and the members of its project.
   *
   * Only `ws` rows updated within the last 600 seconds are returned. An empty
   * array is returned when the task does not exist.
   *
   * @param $taskId
   * @return array
   */
  private function getTaskUsers($taskId)
  {
      $task = Base::DBC2A(
          DB::table('project_task')
              ->select(['follower', 'createuser', 'username', 'projectid'])
              ->where('id', $taskId)
              ->first()
      );
      if (empty($task)) {
          return [];
      }
      // Followers.
      $usernames = Base::string2array($task['follower']);
      // Creator.
      $usernames[] = $task['createuser'];
      // Owner.
      $usernames[] = $task['username'];
      // Members of the project the task belongs to.
      if ($task['projectid'] > 0) {
          $members = Base::DBC2A(
              DB::table('project_users')
                  ->select(['username'])
                  ->where(['projectid' => $task['projectid'], 'type' => '成员'])
                  ->get()
          );
          foreach ($members as $member) {
              $usernames[] = $member['username'];
          }
      }
      // De-duplicate the names and keep only recently active connections.
      $recipients = array_values(array_unique($usernames));
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where([
              ['update', '>', time() - 600],
          ])
          ->whereIn('username', $recipients)
          ->get();
      return Base::DBC2A($rows);
  }
  456. }