|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.Threading;
- using System.Diagnostics;
- using System.IO;
- using System.Transactions;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System.Data.SQLite;
-
- namespace RDH.PharmacyPlatform.Sync.Core
- {
- public class Bootstrap
- {
#region Fields
#region Rabbitmq fields
private static ConnectionFactory _connectionFactory;
// Connection from this client to the MQ server.
private static IConnection _connection;
// Communication channel associated with the connection.
private static IModel _channel;
private static IBasicProperties _channelProperties;
// The message queue used for communication.
private static QueueDeclareOk _queue;
private static EventingBasicConsumer _cusumer;
#endregion
private static Config _config;
private static IMessageHandler _messageHandler;
private static ILogger _logger;
// Full path of the SQLite database file; assigned in the static constructor.
private static String _dbFileFullName;
// SQLite connection string built from _dbFileFullName.
private static String _dbConnectionString;
private static Thread _threadKeepMessageOut;
private static Thread _threadKeepMessageIn;
// Set by Init/Exit and polled as the loop condition of both worker threads.
// Declared volatile so a write made on one thread is observed by the
// worker loops running on other threads.
private static volatile Boolean _isInited;
// File names are combined with the application base directory before use.
private const String _ConfigFilePath = "sync.config";
private const String _DbFilePath = "sync.db";
#endregion
-
- #region Constructors
/// <summary>
/// Resolves the database file location under the application base directory
/// and derives the SQLite connection string from it.
/// </summary>
static Bootstrap()
{
    String baseDirectory = AppDomain.CurrentDomain.BaseDirectory;
    String databaseFile = Path.Combine(baseDirectory, _DbFilePath);

    _dbFileFullName = databaseFile;
    _dbConnectionString = $"Data Source={_dbFileFullName};Version=3;";
}
/// <summary>
/// Finalizer: performs a best-effort <see cref="Exit"/>.
/// </summary>
~Bootstrap()
{
    try
    {
        Exit();
    }
    catch (Exception ex)
    {
        // An exception escaping a finalizer terminates the process, so a
        // failed shutdown is only reported to the debug output here.
        Debug.WriteLine($"{ex.Message}\n{ex.StackTrace}");
    }
}
- #endregion
-
- #region Properties
/// <summary>
/// Gets the configuration currently held by the bootstrapper.
/// The setter is only accessible inside this assembly.
/// </summary>
public static Config Config
{
    get { return _config; }
    internal set { _config = value; }
}
#region Internal
/// <summary>Connection factory held by the bootstrapper.</summary>
internal static ConnectionFactory MqConnectionFacotry => _connectionFactory;

/// <summary>Channel held by the bootstrapper.</summary>
internal static IModel MqChannel => _channel;

/// <summary>Basic properties created from the channel during initialization.</summary>
internal static IBasicProperties ChannelProperties => _channelProperties;

/// <summary>The registered message handler, or null when none has been registered.</summary>
internal static IMessageHandler MessageHandler => _messageHandler;

/// <summary>Full path of the SQLite database file.</summary>
internal static String DbFileFullName => _dbFileFullName;

/// <summary>Connection string for the SQLite database.</summary>
internal static String DbConnectionString => _dbConnectionString;
#endregion
- #endregion
-
- #region Methods
- #region Public
/// <summary>
/// Initializes the bootstrapper from the configuration file located in the
/// application base directory.
/// </summary>
/// <returns>
/// An error message when the file is missing or cannot be deserialized;
/// otherwise the result of <see cref="Init(Config)"/>.
/// </returns>
public static String Init()
{
    String configFullName = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _ConfigFilePath);
    if (!File.Exists(configFullName))
    {
        return "配置文件不存在";
    }

    String configContent = File.ReadAllText(configFullName);
    Config config;
    try
    {
        config = JsonSerializer.DeserializeObject<Config>(configContent);
    }
    catch (Exception ex)
    {
        // This method reports problems through its return value, so a
        // malformed configuration is logged and returned as an error message
        // instead of surfacing as an unhandled exception.
        Log($"{ex.Message}\n{ex.StackTrace}");
        return "配置格式不正确:" + ex.Message;
    }
    return Init(config);
}
/// <summary>
/// Initializes the bootstrapper with the given configuration: ensures the
/// local database exists, connects to the MQ server, declares and consumes
/// the local node's queue, and starts the two background worker threads.
/// </summary>
/// <param name="config">Configuration to use; must not be null.</param>
/// <returns>
/// Null on success or when a configuration is already active; otherwise an
/// error message.
/// </returns>
public static String Init(Config config)
{
    InitDatabase();
    if (_config != null)
    {
        // A configuration is already active; nothing more to do.
        return null;
    }
    if (config == null)
    {
        return "参数不能为空";
    }
    _config = config;

    // Connection parameters are rebuilt on every attempt, so a retry after a
    // failed attempt uses the server settings of the configuration passed now
    // rather than those captured by an earlier call.
    _connectionFactory = new ConnectionFactory();
    _connectionFactory.UserName = config.ServerUserName;
    _connectionFactory.Password = config.ServerUserPassword;
    _connectionFactory.HostName = config.ServerIpHost;
    _connectionFactory.Port = config.ServerSyncPort;
    // Enable automatic connection recovery.
    _connectionFactory.AutomaticRecoveryEnabled = true;

    try
    {
        // Create the connection and channel, then wire up events and the consumer.
        _connection = Bootstrap.MqConnectionFacotry.CreateConnection();
        _channel = _connection.CreateModel();
        _channelProperties = _channel.CreateBasicProperties();
        _channelProperties.DeliveryMode = 2;
        _channelProperties.ContentType = "text/plain";
        _channel.BasicReturn += _channel_BasicReturn;
        _channel.BasicAcks += _channel_BasicAcks;
        _channel.BasicNacks += _channel_BasicNacks;
        _queue = _channel.QueueDeclare(Bootstrap.Config.LocalNodeId, true, true, false);
        _cusumer = new EventingBasicConsumer(_channel);
        // Subscribe before consuming so that no delivery can arrive while the
        // consumer has no Received handler attached.
        _cusumer.Received += _cusumer_Received;
        // autoAck:
        //   true:  the server treats a message as acknowledged once delivered
        //   false: the consumer must acknowledge each message explicitly
        _channel.BasicConsume(_queue.QueueName, false, _cusumer);
        _isInited = true;
    }
    catch (Exception ex)
    {
        _config = null;
        Log($"{ex.Message}\n{ex.StackTrace}");
        // Release whatever part of the connection/channel was created before
        // the failure so it is not left open.
        try
        {
            CloseConnection();
        }
        catch (Exception closeEx)
        {
            Log($"{closeEx.Message}\n{closeEx.StackTrace}");
        }
        return ex.Message;
    }

    _threadKeepMessageIn = new Thread(ThreadKeepMessageInHandler);
    _threadKeepMessageIn.IsBackground = true;
    _threadKeepMessageIn.Start();
    _threadKeepMessageOut = new Thread(ThreadKeepMessageOutHandler);
    _threadKeepMessageOut.IsBackground = true;
    _threadKeepMessageOut.Start();
    return null;
}
-
/// <summary>
/// Stops the bootstrapper: clears the initialized flag polled by the worker
/// threads, closes the MQ channel and connection, and clears the active
/// configuration.
/// </summary>
public static void Exit()
{
    _isInited = false;
    try
    {
        CloseConnection();
    }
    finally
    {
        // Init(Config) treats a non-null configuration as "already
        // initialized" and returns early; clearing it lets a later Init call
        // actually reconnect.
        _config = null;
    }
}
/// <summary>
/// Registers the message handler.
/// </summary>
/// <param name="handler">Handler to store; replaces any previously registered one.</param>
public static void RegisteMessageHandler(IMessageHandler handler) => _messageHandler = handler;
/// <summary>
/// Registers the logger.
/// </summary>
/// <param name="logger">Logger to store; replaces any previously registered one.</param>
public static void RegisteLogger(ILogger logger) => _logger = logger;
- #endregion
- #region Internal
/// <summary>
/// Initializes the local database: when the database file does not exist,
/// creates it together with the T_Message_Data_In, T_Message_Data_Out and
/// T_Nodes tables. If creation fails, the error is logged and the partially
/// created file is removed.
/// </summary>
internal static void InitDatabase()
{
    if (File.Exists(_dbFileFullName))
    {
        return;
    }

    String[] createTableStatements =
    {
        "CREATE TABLE T_Message_Data_In ("
        + " Create_Time DATETIME NOT NULL,"
        + " Source_Id VARCHAR(50) NOT NULL,"
        + " Message_Content VARCHAR(5000),"
        + " Message_Content_Type VARCHAR(50),"
        + " Filter_Type INTEGER(1) NOT NULL,"
        + " Target_Id VARCHAR(50) NOT NULL)",
        "CREATE TABLE T_Message_Data_Out ("
        + " Create_Time DATETIME NOT NULL,"
        + " Source_Id VARCHAR(50) NOT NULL,"
        + " Message_Content VARCHAR(5000),"
        + " Message_Content_Type VARCHAR(50),"
        + " Filter_Type INTEGER(1) NOT NULL,"
        + " Target_Id VARCHAR(50) NOT NULL)",
        "CREATE TABLE T_Nodes ("
        + " Node_Id VARCHAR(50) PRIMARY KEY,"
        + " Group_Name VARCHAR(50),"
        + " Create_Time DATETIME)",
    };

    Log("创建消息数据库");
    try
    {
        SQLiteConnection.CreateFile(_dbFileFullName);
        Log("创建消息表");
        using (SQLiteConnection conn = new SQLiteConnection(_dbConnectionString))
        {
            conn.Open();
            // The command is disposed together with the connection.
            using (SQLiteCommand cmd = conn.CreateCommand())
            {
                cmd.CommandType = System.Data.CommandType.Text;
                foreach (String statement in createTableStatements)
                {
                    cmd.CommandText = statement;
                    cmd.ExecuteNonQuery();
                }
            }
        }
    }
    catch (Exception ex)
    {
        Log($"{ex.Message}\n{ex.StackTrace}");
        // Remove the incomplete file so the next call retries creation from
        // scratch; a failed delete is logged rather than silently ignored.
        try
        {
            if (File.Exists(_dbFileFullName))
            {
                File.Delete(_dbFileFullName);
            }
        }
        catch (Exception deleteEx)
        {
            Log($"{deleteEx.Message}\n{deleteEx.StackTrace}");
        }
    }
}
/// <summary>
/// Writes a log entry through the registered logger; when no logger is
/// registered, writes a timestamped line including the managed thread id to
/// the debug output instead.
/// </summary>
/// <param name="content">Text to log.</param>
internal static void Log(String content)
{
    if (_logger == null)
    {
        String timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
        String threadId = Thread.CurrentThread.ManagedThreadId.ToString();
        String line = String.Format("{0} TID:{1} Log:{2}", timestamp, threadId, content);
        Debug.WriteLine(line);
        return;
    }

    _logger.Log(content);
}
- #endregion
- #region Private
/// <summary>
/// Consumer callback for a delivered message: decodes the UTF-8 body,
/// deserializes it to <c>MessageData</c>, stores it in the local inbound
/// table and then acknowledges the delivery.
/// </summary>
/// <remarks>
/// The method is synchronous (it contains no awaits), so it is not declared
/// async. When no message handler is registered, or when processing throws,
/// the delivery is not acknowledged by this method.
/// </remarks>
private static void _cusumer_Received(object sender, BasicDeliverEventArgs e)
{
    Log("_cusumer_Received");
    if (Bootstrap.MessageHandler == null)
    {
        Log("未注册MessageHandler");
        return;
    }

    try
    {
        String content = Encoding.UTF8.GetString(e.Body.ToArray());
        Log($"Content:{content}");
        MessageData data = JsonSerializer.DeserializeObject<MessageData>(content);
        // Persist the message locally as soon as it is received, and only
        // acknowledge it after the insert has completed.
        using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
        {
            MessageDataInDAL messageDataInDAL = new MessageDataInDAL();
            messageDataInDAL.InsertModel(data);
        }
        _channel.BasicAck(e.DeliveryTag, false);
    }
    catch (Exception ex)
    {
        Bootstrap.Log($"{ex.Message}\n{ex.StackTrace}");
    }
}
// Channel BasicAcks event handler; only records that the event fired.
private static void _channel_BasicAcks(object sender, BasicAckEventArgs e) => Log("_channel_BasicAcks");
// Channel BasicNacks event handler; only records that the event fired.
private static void _channel_BasicNacks(object sender, BasicNackEventArgs e) => Log("_channel_BasicNacks");
// Handles a sent message for which no target was found: the returned body is
// deserialized, stamped with the current time and handed back to
// MessageClient.SendMessage.
private static void _channel_BasicReturn(object sender, BasicReturnEventArgs e)
{
    Log("_channel_BasicReturn");

    String json = Encoding.UTF8.GetString(e.Body.ToArray());
    MessageData returned = JsonSerializer.DeserializeObject<MessageData>(json);
    returned.CreateTime = DateTime.Now;

    MessageClient.SendMessage(returned);
}
// Worker-thread body for sending locally stored outbound messages.
// While the bootstrapper is initialized it repeatedly peeks the outbound
// table, sends each message and deletes it once the send reports success
// (a null state). After draining the table, or after a failure, it waits one
// second before the next pass.
private static void ThreadKeepMessageOutHandler()
{
    MessageDataOutDAL messageDataOutDAL = new MessageDataOutDAL();
    while (_isInited)
    {
        try
        {
            // The session scope is opened inside the try so that a failure to
            // open it is logged and retried instead of ending the thread.
            using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
            {
                // Re-check the flag on every message so Exit() stops the
                // worker even while messages are still pending.
                while (_isInited)
                {
                    MessageData messageOut = messageDataOutDAL.Peek();
                    if (messageOut == null)
                    {
                        // Nothing left to send in this pass.
                        break;
                    }

                    Debug.WriteLine($"读取到待发送的消息, id:{Thread.CurrentThread.ManagedThreadId}");
                    String sendState = MessageClient.InternalSendMessage(messageOut);
                    if (sendState != null)
                    {
                        Log("发送消息失败:" + sendState);
                        // The message was not deleted, so the next Peek may
                        // return it again. Leave the inner loop so the
                        // one-second pause below acts as a back-off instead
                        // of retrying every millisecond.
                        break;
                    }

                    Debug.WriteLine($"发送成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
                    messageDataOutDAL.DeleteModel(messageOut);
                    Thread.Sleep(1);
                }
            }
        }
        catch (Exception ex)
        {
            Log($"{ex.Message}\n{ex.StackTrace}");
        }
        Thread.Sleep(1000);
    }
}
// Worker-thread body for dispatching locally stored inbound messages.
// While the bootstrapper is initialized it repeatedly peeks the inbound
// table and passes each message to the registered handler. A null handler
// result means success and the message is deleted; any other result causes
// the message to be replaced by a copy stamped with the current time.
private static void ThreadKeepMessageInHandler()
{
    Debug.WriteLine($"ThreadKeepMessageInHandler start, id:{Thread.CurrentThread.ManagedThreadId}");
    MessageDataInDAL messageDataInDAL = new MessageDataInDAL();
    while (_isInited)
    {
        // Snapshot the handler; without one there is nothing to dispatch to,
        // so stored messages are left untouched until a handler is registered.
        IMessageHandler handler = _messageHandler;
        if (handler != null)
        {
            try
            {
                using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
                {
                    // Re-check the flag on every message so Exit() stops the
                    // worker even while messages are still pending.
                    while (_isInited)
                    {
                        MessageData messageIn = messageDataInDAL.Peek();
                        if (messageIn == null)
                        {
                            // Nothing left to process in this pass.
                            break;
                        }

                        Debug.WriteLine($"读取到待处理的消息, id:{Thread.CurrentThread.ManagedThreadId}");
                        // Block on this dedicated thread; GetAwaiter().GetResult()
                        // rethrows the handler's own exception rather than an
                        // AggregateException, so the log shows the real cause.
                        String handleState = handler.ReceivedMessage(messageIn).GetAwaiter().GetResult();
                        Debug.WriteLine($"处理消息 end, id:{Thread.CurrentThread.ManagedThreadId}");
                        if (handleState == null)
                        {
                            Debug.WriteLine($"处理成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
                            messageDataInDAL.DeleteModel(messageIn);
                            Thread.Sleep(1);
                            continue;
                        }

                        // Handling failed: replace the stored row with a copy
                        // carrying a fresh creation time, in one transaction.
                        MessageData copy = new MessageData()
                        {
                            CreateTime = DateTime.Now,
                            MessageContent = messageIn.MessageContent,
                            MessageContentType = messageIn.MessageContentType,
                            FilterType = messageIn.FilterType,
                            SourceId = messageIn.SourceId,
                            TargetId = messageIn.TargetId,
                        };
                        conn.BeginTransaction();
                        messageDataInDAL.DeleteModel(messageIn);
                        messageDataInDAL.InsertModel(copy);
                        conn.CommitTransaction();
                        // Leave the inner loop so the one-second pause below
                        // acts as a back-off instead of re-dispatching a
                        // failing message every millisecond.
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                Log($"{ex.Message}\n{ex.StackTrace}");
            }
        }
        Thread.Sleep(1000);
    }
}
/// <summary>
/// Closes the MQ channel and then the connection, clearing both fields.
/// A failure while closing one of them is logged and does not prevent the
/// other from being closed.
/// </summary>
private static void CloseConnection()
{
    // Detach the references first so the fields are cleared even when
    // closing throws.
    IModel channel = _channel;
    _channel = null;
    IConnection connection = _connection;
    _connection = null;

    if (channel != null)
    {
        try
        {
            channel.Close();
        }
        catch (Exception ex)
        {
            Log($"{ex.Message}\n{ex.StackTrace}");
        }
    }
    if (connection != null)
    {
        try
        {
            connection.Close();
        }
        catch (Exception ex)
        {
            Log($"{ex.Message}\n{ex.StackTrace}");
        }
    }
}
- #endregion
- #endregion
- }
- }
|