WebSocketService.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. <?php
  2. namespace App\Services;
  3. @error_reporting(E_ALL & ~E_NOTICE);
  4. use App\Module\Base;
  5. use App\Module\Chat;
  6. use App\Module\Users;
  7. use App\Tasks\ChromeExtendTask;
  8. use App\Tasks\NotificationTask;
  9. use App\Tasks\PushTask;
  10. use Cache;
  11. use DB;
  12. use Hhxsv5\LaravelS\Swoole\Task\Task;
  13. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  14. use Swoole\Http\Request;
  15. use Swoole\WebSocket\Frame;
  16. use Swoole\WebSocket\Server;
  17. /**
  18. * @see https://wiki.swoole.com/#/start/start_ws_server
  19. */
  20. class WebSocketService implements WebSocketHandlerInterface
  21. {
  22. /**
  23. * 声明没有参数的构造函数
  24. * WebSocketService constructor.
  25. */
  26. public function __construct()
  27. {
  28. }
  29. /**
  30. * 连接建立时触发
  31. * @param Server $server
  32. * @param Request $request
  33. */
  34. public function onOpen(Server $server, Request $request)
  35. {
  36. global $_A;
  37. $_A = [
  38. '__static_langdata' => [],
  39. ];
  40. //判断参数
  41. $fd = $request->fd;
  42. if (!isset($request->get['token'])) {
  43. $server->push($fd, Chat::formatMsgSend([
  44. 'messageType' => 'error',
  45. 'body' => [
  46. 'error' => '参数错误'
  47. ],
  48. ]));
  49. $server->close($fd);
  50. $this->deleteUser($fd);
  51. return;
  52. }
  53. //判断token
  54. $token = $request->get['token'];
  55. $channel = $request->get['channel'] ?: '';
  56. $cacheKey = "ws::token:" . md5($token);
  57. $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
  58. list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
  59. if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
  60. if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
  61. return $username;
  62. }
  63. }
  64. return null;
  65. });
  66. if (empty($username)) {
  67. Cache::forget($cacheKey);
  68. $server->push($fd, Chat::formatMsgSend([
  69. 'messageType' => 'error',
  70. 'channel' => $channel,
  71. 'body' => [
  72. 'error' => '会员不存在',
  73. ],
  74. ]));
  75. $server->close($fd);
  76. $this->deleteUser($fd);
  77. return;
  78. }
  79. //踢下线
  80. if (in_array($channel, ['ios', 'android'])) {
  81. $userLists = $this->getUser('', $channel, $username);
  82. foreach ($userLists AS $user) {
  83. $server->push($user['fd'], Chat::formatMsgSend([
  84. 'messageType' => 'kick',
  85. 'channel' => $channel,
  86. 'body' => [
  87. 'ip' => Base::getIp(),
  88. 'time' => time(),
  89. 'newfd' => $fd,
  90. ],
  91. ]));
  92. $this->deleteUser($user['fd']);
  93. }
  94. }
  95. //保存用户、发送open事件
  96. Cache::forever("ws::immediatelyNotify-" . $username, "no");
  97. $this->saveUser($fd, $channel, $username);
  98. $server->push($fd, Chat::formatMsgSend([
  99. 'messageType' => 'open',
  100. 'channel' => $channel,
  101. 'body' => [
  102. 'fd' => $fd,
  103. ],
  104. ]));
  105. //发送最后一条未发送的信息
  106. $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
  107. if ($lastMsg && $lastMsg['roger'] === 0) {
  108. $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
  109. if (!Base::isError($dialog)) {
  110. $dialog = $dialog['data'];
  111. $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
  112. $body = Base::string2array($lastMsg['message']);
  113. $body['id'] = $lastMsg['id'];
  114. $body['resend'] = 1;
  115. $body['unread'] = $unread;
  116. $body['username'] = $lastMsg['username'];
  117. $body['userimg'] = Users::userimg($lastMsg['username']);
  118. $body['indate'] = $lastMsg['indate'];
  119. //
  120. $basic = Users::username2basic($lastMsg['username']);
  121. $body['userid'] = $basic ? $basic['userid'] : 0;
  122. $body['nickname'] = $basic ? $basic['nickname'] : ($body['nickname'] || '');
  123. $body['userimg'] = $basic ? $basic['userimg'] : ($body['userimg'] || '');
  124. //
  125. $server->push($fd, Chat::formatMsgSend([
  126. 'messageType' => 'user',
  127. 'contentId' => $lastMsg['id'],
  128. 'channel' => $channel,
  129. 'username' => $lastMsg['username'],
  130. 'target' => $lastMsg['receive'],
  131. 'body' => $body,
  132. 'time' => $lastMsg['indate'],
  133. ]));
  134. }
  135. }
  136. }
  137. /**
  138. * 收到消息时触发
  139. * @param Server $server
  140. * @param Frame $frame
  141. */
  142. public function onMessage(Server $server, Frame $frame)
  143. {
  144. global $_A;
  145. $_A = [
  146. '__static_langdata' => [],
  147. ];
  148. //
  149. $data = Chat::formatMsgReceive($frame->data);
  150. $back = [
  151. 'status' => 1,
  152. 'message' => '',
  153. ];
  154. //
  155. switch ($data['messageType']) {
  156. /**
  157. * APP激活进入前台
  158. */
  159. case 'appActivity':
  160. Cache::forever("ws::immediatelyNotify-" . $data['username'], "no");
  161. break;
  162. /**
  163. * 刷新
  164. */
  165. case 'refresh':
  166. DB::table('ws')->where([
  167. 'fd' => $frame->fd,
  168. 'channel' => $data['channel'],
  169. ])->update(['update' => time()]);
  170. break;
  171. /**
  172. * 总未读消息数
  173. */
  174. case 'unread':
  175. $username = DB::table('ws')->where([
  176. 'fd' => $frame->fd,
  177. 'channel' => $data['channel'],
  178. ])->value('username');
  179. if ($username) {
  180. $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
  181. $num+= intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
  182. $back['message'] = $num;
  183. } else {
  184. $back['message'] = 0;
  185. }
  186. break;
  187. /**
  188. * 已读会员消息
  189. */
  190. case 'read':
  191. $username = DB::table('ws')->where([
  192. 'fd' => $frame->fd,
  193. 'channel' => $data['channel'],
  194. ])->value('username');
  195. $dialog = Chat::openDialog($username, $data['target']);
  196. if (!Base::isError($dialog)) {
  197. $dialog = $dialog['data'];
  198. $upArray = [];
  199. if ($dialog['user1'] == $dialog['user2']) {
  200. $upArray['unread1'] = 0;
  201. $upArray['unread2'] = 0;
  202. } else {
  203. $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
  204. }
  205. DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
  206. }
  207. $chromeExtendTask = new ChromeExtendTask($username);
  208. Task::deliver($chromeExtendTask);
  209. break;
  210. /**
  211. * 收到信息回执
  212. */
  213. case 'roger':
  214. $contentIds = Base::explodeInt(',', $data['contentId']);
  215. if ($contentIds) {
  216. $username = DB::table('ws')->where([
  217. 'fd' => $frame->fd,
  218. 'channel' => $data['channel'],
  219. ])->value('username');
  220. if ($username) {
  221. DB::table('chat_msg')->where('receive', $username)->whereIn('id', $contentIds)->update([
  222. 'roger' => 1,
  223. ]);
  224. }
  225. }
  226. break;
  227. /**
  228. * 发给用户
  229. */
  230. case 'user':
  231. $username = DB::table('ws')->where([
  232. 'fd' => $frame->fd,
  233. 'channel' => $data['channel'],
  234. ])->value('username');
  235. $res = Chat::saveMessage($username, $data['target'], $data['body']);
  236. if (Base::isError($res)) {
  237. $back = [
  238. 'status' => 0,
  239. 'message' => $res['msg'],
  240. ];
  241. } else {
  242. $resData = $res['data'];
  243. $back['message'] = $resData['id'];
  244. $data['contentId'] = $resData['id'];
  245. $data['body']['id'] = $resData['id'];
  246. $data['body']['unread'] = $resData['unread'];
  247. //
  248. $basic = Users::username2basic($username);
  249. $data['body']['userid'] = $basic ? $basic['userid'] : 0;
  250. $data['body']['nickname'] = $basic ? $basic['nickname'] : ($data['body']['nickname'] || '');
  251. $data['body']['userimg'] = $basic ? $basic['userimg'] : ($data['body']['userimg'] || '');
  252. //
  253. $pushLists = [];
  254. foreach ($this->getUserOfName($data['target']) AS $item) {
  255. $pushLists[] = [
  256. 'fd' => $item['fd'],
  257. 'msg' => $data
  258. ];
  259. }
  260. $pushTask = new PushTask($pushLists);
  261. Task::deliver($pushTask);
  262. //
  263. $notificationTask = new NotificationTask($resData['id']);
  264. $notificationTask->delay(Cache::get("ws::immediatelyNotify-" . $data['target']) == "yes" ? 2 : 10);
  265. Task::deliver($notificationTask);
  266. }
  267. break;
  268. /**
  269. * 发给用户(不保存记录)
  270. */
  271. case 'info':
  272. $pushLists = [];
  273. foreach ($this->getUserOfName($data['target']) AS $item) {
  274. $pushLists[] = [
  275. 'fd' => $item['fd'],
  276. 'msg' => $data
  277. ];
  278. }
  279. $pushTask = new PushTask($pushLists);
  280. Task::deliver($pushTask);
  281. break;
  282. /**
  283. * 发给整个团队
  284. */
  285. case 'team':
  286. if ($data['body']['type'] === 'taskA') {
  287. $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
  288. if ($taskId > 0) {
  289. $userLists = Chat::getTaskUsers($taskId);
  290. } else {
  291. $userLists = $this->getTeamUsers();
  292. }
  293. //
  294. $pushLists = [];
  295. foreach ($userLists as $user) {
  296. $data['messageType'] = 'user';
  297. $data['target'] = $user['username'];
  298. $pushLists[] = [
  299. 'fd' => $user['fd'],
  300. 'msg' => $data
  301. ];
  302. }
  303. $pushTask = new PushTask($pushLists);
  304. Task::deliver($pushTask);
  305. }
  306. break;
  307. /**
  308. * 知识库协作
  309. */
  310. case 'docs':
  311. $back['message'] = [];
  312. $body = $data['body'];
  313. $type = $body['type'];
  314. $sid = intval($body['sid']);
  315. if ($sid <= 0) {
  316. return;
  317. }
  318. $array = Base::json2array(Cache::get("docs::" . $sid));
  319. if ($array) {
  320. foreach ($array as $uname => $vbody) {
  321. if (intval($vbody['indate']) + 20 < time()) {
  322. unset($array[$uname]);
  323. }
  324. }
  325. }
  326. if ($type == 'enter' || $type == 'refresh') {
  327. $array[$body['username']] = $body;
  328. } elseif ($type == 'quit') {
  329. unset($array[$body['username']]);
  330. }
  331. //
  332. Cache::put("docs::" . $sid, Base::array2json($array), 30);
  333. if ($array) {
  334. ksort($array);
  335. }
  336. $back['message'] = array_values($array);
  337. //
  338. if ($type == 'enter' || $type == 'quit') {
  339. $pushLists = [];
  340. foreach ($back['message'] AS $tuser) {
  341. foreach ($this->getUserOfName($tuser['username']) AS $item) {
  342. $pushLists[] = [
  343. 'fd' => $item['fd'],
  344. 'msg' => [
  345. 'messageType' => 'docs',
  346. 'body' => [
  347. 'type' => 'users',
  348. 'sid' => $sid,
  349. 'lists' => $back['message']
  350. ]
  351. ]
  352. ];
  353. }
  354. }
  355. $pushTask = new PushTask($pushLists);
  356. Task::deliver($pushTask);
  357. }
  358. break;
  359. }
  360. if ($data['messageId']) {
  361. $pushLists = [];
  362. $pushLists[] = [
  363. 'fd' => $frame->fd,
  364. 'msg' => [
  365. 'messageType' => 'back',
  366. 'messageId' => $data['messageId'],
  367. 'body' => $back,
  368. ]
  369. ];
  370. $pushTask = new PushTask($pushLists);
  371. Task::deliver($pushTask);
  372. }
  373. }
  374. /**
  375. * 关闭连接时触发
  376. * @param Server $server
  377. * @param $fd
  378. * @param $reactorId
  379. */
  380. public function onClose(Server $server, $fd, $reactorId)
  381. {
  382. $this->deleteUser($fd);
  383. }
  384. /** ****************************************************************************** */
  385. /** ****************************************************************************** */
  386. /** ****************************************************************************** */
  387. /**
  388. * 保存用户
  389. * @param $fd
  390. * @param $channel
  391. * @param $username
  392. */
  393. private function saveUser($fd, $channel, $username)
  394. {
  395. try {
  396. DB::transaction(function () use ($username, $channel, $fd) {
  397. $this->deleteUser($fd);
  398. DB::table('ws')->updateOrInsert([
  399. 'key' => md5($fd . '@' . $channel . '@' . $username)
  400. ], [
  401. 'fd' => $fd,
  402. 'username' => $username,
  403. 'channel' => $channel,
  404. 'update' => time()
  405. ]);
  406. });
  407. } catch (\Throwable $e) {
  408. }
  409. }
  410. /**
  411. * 清除用户
  412. * @param $fd
  413. */
  414. private function deleteUser($fd)
  415. {
  416. DB::table('ws')->where('fd', $fd)->delete();
  417. }
  418. /**
  419. * 获取用户
  420. * @param string $fd
  421. * @param string $channel
  422. * @param string $username
  423. * @return array
  424. */
  425. private function getUser($fd = '', $channel = '', $username = '')
  426. {
  427. $array = [];
  428. if ($fd) $array['fd'] = $fd;
  429. if ($channel) $array['channel'] = $channel;
  430. if ($username) $array['username'] = $username;
  431. if (empty($array)) {
  432. return [];
  433. }
  434. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where($array)->get());
  435. }
  436. private function getUserOfFd($fd, $channel = '') {
  437. return $this->getUser($fd, $channel);
  438. }
  439. private function getUserOfName($username, $channel = '') {
  440. return $this->getUser('', $channel, $username);
  441. }
  442. /**
  443. * 获取团队所有在线用户
  444. * @return array|string
  445. */
  446. private function getTeamUsers()
  447. {
  448. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where([
  449. ['update', '>', time() - 600],
  450. ])->get());
  451. }
  452. }