Statistics
| Branch: | Tag: | Revision:

chatsecureios / ChatSecure / Classes / Controllers / MessageQueueHandler.swift @ 9a71f7ed

History | View | Annotate | Download (27.3 KB)

1
//
2
//  MessageQueueHandler.swift
3
//  ChatSecure
4
//
5
//  Created by David Chiles on 5/4/16.
6
//  Copyright © 2016 Chris Ballinger. All rights reserved.
7
//
8

    
9
import Foundation
10
import YapTaskQueue
11
import XMPPFramework
12
import CocoaLumberjack
13

    
14
private class OutstandingActionInfo: Hashable, Equatable {
15
    let action:YapTaskQueueAction
16
    let timer:Timer?
17
    let completion:((_ success: Bool, _ retryTimeout: TimeInterval) -> Void)
18
    
19
    public init(action:YapTaskQueueAction,timer:Timer?,completion:@escaping ((_ success: Bool, _ retryTimeout: TimeInterval) -> Void)) {
20
        self.action = action
21
        self.timer = timer
22
        self.completion = completion
23
    }
24
    
25
    /// Needed so we can store the struct in a dictionary
26
    var hashValue: Int {
27
        get {
28
            return action.yapKey().hashValue
29
        }
30
    }
31
}
32

    
33
private func ==(lhs: OutstandingActionInfo, rhs: OutstandingActionInfo) -> Bool {
34
    return lhs.action.yapKey() == rhs.action.yapKey()
35
}
36

    
37
/// This is just small struct to store the necessary inormation about a message while we wait for delegate callbacks from the XMPPStream
38
private struct OutstandingMessageInfo {
39
    let messageKey:String
40
    let messageCollection:String
41
    let messageSecurity:OTRMessageTransportSecurity
42
    let timer:Timer?
43
    let completion:((_ success: Bool, _ retryTimeout: TimeInterval) -> Void)
44
}
45

    
46
/// Needed so we can store the struct in a dictionary
47
extension OutstandingMessageInfo: Hashable {
48
    var hashValue: Int {
49
        get {
50
            return "\(self.messageKey)\(self.messageCollection)".hashValue
51
        }
52
    }
53
}
54

    
55
extension OutstandingMessageInfo: Equatable {}
56
private func ==(lhs: OutstandingMessageInfo, rhs: OutstandingMessageInfo) -> Bool {
57
    return lhs.messageKey == rhs.messageKey && lhs.messageCollection == rhs.messageCollection
58
}
59

    
60
public class MessageQueueHandler:NSObject {
61
    
62
    public var accountRetryTimeout:TimeInterval = 30
63
    public var otrTimeout:TimeInterval = 7
64
    public var messageRetryTimeout:TimeInterval = 10
65
    public var maxFailureCount:UInt = 2
66
    
67
    let operationQueue = OperationQueue()
68
    let databaseConnection:YapDatabaseConnection
69
    fileprivate var outstandingMessages = [String:OutstandingMessageInfo]()
70
    fileprivate var outstandingBuddies = [String:OutstandingMessageInfo]()
71
    fileprivate var outstandingAccounts = [String:Set<OutstandingActionInfo>]()
72
    fileprivate let isolationQueue = DispatchQueue(label: "MessageQueueHandler-IsolationQueue", attributes: [])
73
    fileprivate var accountLoginNotificationObserver:NSObjectProtocol?
74
    fileprivate var messageStateDidChangeNotificationObserver:NSObjectProtocol?
75
    
76
    @objc public init(dbConnection:YapDatabaseConnection) {
77
        self.databaseConnection = dbConnection
78
        self.operationQueue.maxConcurrentOperationCount = 1
79
        super.init()
80
        self.accountLoginNotificationObserver = NotificationCenter.default.addObserver(forName: NSNotification.Name(rawValue: kOTRProtocolLoginSuccess), object: nil, queue: self.operationQueue, using: { [weak self] (notification) in
81
            self?.handleAccountLoginNotification(notification)
82
        })
83
        self.messageStateDidChangeNotificationObserver = NotificationCenter.default.addObserver(forName: NSNotification.Name.OTRMessageStateDidChange, object: nil, queue: self.operationQueue) {[weak self] (notification) in
84
            self?.handleMessageStateDidChangeNotification(notification)
85
        }
86
    }
87
    
88
    deinit {
89
        if let observer = self.accountLoginNotificationObserver {
90
            NotificationCenter.default.removeObserver(observer)
91
        }
92
        
93
        if let observer = self.messageStateDidChangeNotificationObserver {
94
            NotificationCenter.default.removeObserver(observer)
95
        }
96
        
97
    }
98
    
99
    //MARK: Access to outstanding messages and account
100
    
101
    fileprivate func waitingForAccount(_ accountString:String,action:OutstandingActionInfo) {
102
        
103
        self.isolationQueue.async {
104
            
105
            // Get the set out or create a new one
106
            var actionSet = self.outstandingAccounts[accountString]
107
            if actionSet == nil {
108
                actionSet = Set<OutstandingActionInfo>()
109
            }
110
            
111
            // Guarantee set is real
112
            guard var set = actionSet else {
113
                return
114
            }
115
            // Add new item
116
            set.insert(action)
117
            //Insert back into dictionary
118
            self.outstandingAccounts.updateValue(set, forKey: accountString)
119
        }
120
    }
121
    
122
    fileprivate func popWaitingAccount(_ accountString:String) -> Set<OutstandingActionInfo>? {
123
        var actionSet:Set<OutstandingActionInfo>? = nil
124
        self.isolationQueue.sync {
125
            actionSet = self.outstandingAccounts.removeValue(forKey: accountString)
126
        }
127
        
128
        return actionSet
129
    }
130
    
131
    fileprivate func waitingForBuddy(_ buddyKey:String,messageKey:String, messageCollection:String, messageSecurity:OTRMessageTransportSecurity, timer:Timer,completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
132
        
133
        let messageInfo = OutstandingMessageInfo(messageKey: messageKey, messageCollection: messageCollection,messageSecurity:messageSecurity, timer:nil, completion: completion)
134
        
135
        self.isolationQueue.async {
136
            self.outstandingBuddies.updateValue(messageInfo, forKey: buddyKey)
137
        }
138
    }
139
    
140
    fileprivate func popWaitingBuddy(_ buddyKey:String) -> OutstandingMessageInfo? {
141
        var messageInfo:OutstandingMessageInfo? = nil
142
        self.isolationQueue.sync {
143
            messageInfo = self.outstandingBuddies.removeValue(forKey: buddyKey)
144
        }
145
        return messageInfo
146
    }
147
    
148
    fileprivate func waitingForMessage(_ messageKey:String,messageCollection:String,messageSecurity:OTRMessageTransportSecurity,completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
149
        let messageInfo = OutstandingMessageInfo(messageKey: messageKey, messageCollection: messageCollection, messageSecurity:messageSecurity, timer:nil, completion: completion)
150
        let key = "\(messageKey)\(messageCollection)"
151
        
152
        self.isolationQueue.async {
153
            self.outstandingMessages.updateValue(messageInfo, forKey: key)
154
        }
155
    }
156
    
157
    /**
158
     * Remove a waiting message info from the outstaning message dictionary. After the message info is removed the completion block should be called.
159
     * This ensures that the outstandingMessages dictionary is accessed from the correct queue.
160
     *
161
     * - parameter messageKey: The yap database messsage key.
162
     * - parameter messageCollection: The yap database message key.
163
     * - returns: The OutstandingMessageInfo if one exists. Removed from the waiting dictioanry.
164
     */
165
    fileprivate func popWaitingMessage(_ messageKey:String,messageCollection:String) -> OutstandingMessageInfo? {
166
        var messageInfo:OutstandingMessageInfo? = nil
167
        let key = "\(messageKey)\(messageCollection)"
168
        self.isolationQueue.sync {
169
            messageInfo = self.outstandingMessages.removeValue(forKey: key)
170
        }
171
        
172
        return messageInfo
173
    }
174
    
175
    //MARK: Database Functions
176
    
177
    fileprivate func fetchMessage(_ key:String, collection:String, transaction:YapDatabaseReadTransaction) -> OTRMessageProtocol? {
178
        guard let message = transaction.object(forKey: key, inCollection: collection) as? OTRMessageProtocol else {
179
            return nil
180
        }
181
        return message
182
    }
183
    
184
    fileprivate func fetchSendingAction(_ messageKey:String, messageCollection:String, transaction:YapDatabaseReadTransaction) -> OTRYapMessageSendAction? {
185
        let key = OTRYapMessageSendAction.actionKey(forMessageKey: messageKey, messageCollection: messageCollection)
186
        guard let action = OTRYapMessageSendAction.fetchObject(withUniqueID: key, transaction: transaction) else {
187
            return nil
188
        }
189
        return action
190
    }
191
    
192
    //MARK: XMPPManager functions
193
    
194
    fileprivate func sendMessage(_ outstandingMessage:OutstandingMessageInfo) {
195
        self.operationQueue.addOperation { [weak self] in
196
            guard let strongSelf = self else { return }
197
            var msgAction:OTRYapMessageSendAction? = nil
198
            strongSelf.databaseConnection.read({ (transaction) in
199
                msgAction = strongSelf.fetchSendingAction(outstandingMessage.messageKey, messageCollection: outstandingMessage.messageCollection, transaction: transaction)
200
            })
201
            
202
            guard let action = msgAction else {
203
                outstandingMessage.completion(true, 0.0)
204
                return
205
            }
206
            
207
            strongSelf.sendMessage(action, completion: outstandingMessage.completion)
208
        }
209
    }
210
    
211
    fileprivate func sendDirectMessage(_ message: OTROutgoingMessage,
212
                                       buddy: OTRXMPPBuddy,
213
                                       account: OTRAccount,
214
                                       accountProtocol: OTRXMPPManager,
215
                                       messageSendingAction:OTRYapMessageSendAction,
216
                                       completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
217
        switch message.messageSecurity {
218
        case .plaintext:
219
            self.waitingForMessage(message.uniqueId, messageCollection: message.messageCollection, messageSecurity:message.messageSecurity, completion: completion)
220
            OTRProtocolManager.sharedInstance().send(message)
221
            break
222
        case .plaintextWithOTR:
223
            self.sendOTRMessage(message: message, buddyKey: buddy.uniqueId, buddyUsername: buddy.username, accountUsername: account.username, accountProtocolStrintg: account.protocolTypeString(), requiresActiveSession: false, completion: completion)
224
            break
225
        case .OTR:
226
            self.sendOTRMessage(message: message, buddyKey: buddy.uniqueId, buddyUsername: buddy.username, accountUsername: account.username, accountProtocolStrintg: account.protocolTypeString(), requiresActiveSession: true, completion: completion)
227
            break
228
        case .OMEMO:
229
            self.sendOMEMOMessage(message: message, accountProtocol: accountProtocol, completion: completion)
230
            break
231
        case .invalid:
232
            fatalError("Invalid message security. This should never happen... so let's crash!")
233
            break
234
        }
235
    }
236
    
237
    fileprivate func sendGroupMessage(_ message: OTRXMPPRoomMessage,
238
                                      thread: OTRThreadOwner,
239
                                      account: OTRAccount,
240
                                      accountProtocol: OTRXMPPManager,
241
                                      messageSendingAction:OTRYapMessageSendAction,
242
                                      completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
243
        let roomManager = accountProtocol.roomManager
244
        guard let roomJidString = message.roomJID,
245
            let roomJid = XMPPJID(string: roomJidString),
246
            let room = roomManager.room(for: roomJid) else {
247
            // Can't send a message to nowhere...
248
            DDLogError("Cannot send group message to nowhere \(message)")
249
            completion(true, 0.0)
250
            return
251
        }
252
        self.waitingForMessage(message.uniqueId, messageCollection: message.messageCollection, messageSecurity:message.messageSecurity, completion: completion)
253
        room.send(message)
254
        databaseConnection.readWrite { transaction in
255
            if let sentMessage = message.refetch(with: transaction) {
256
                sentMessage.state = .pendingSent
257
                sentMessage.save(with: transaction)
258
            }
259
        }
260
        completion(true, 0.0)
261
    }
262
    
263
    fileprivate func sendMessage(_ messageSendingAction:OTRYapMessageSendAction, completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
264
        
265
        let messageKey = messageSendingAction.messageKey
266
        let messageCollection = messageSendingAction.messageCollection
267
        var anyMessage: OTRMessageProtocol? = nil
268
        self.databaseConnection.read { (transaction) in
269
            anyMessage = self.fetchMessage(messageKey, collection: messageCollection, transaction: transaction)
270
        }
271
        
272
        guard let message = anyMessage else {
273
            // Somehow we have an action without a message. This is very strange. Do not like.
274
            // We tell the queue broker that we handle it successfully so it will be rmeoved and go on to the next action.
275
            completion(true, 0.0)
276
            return
277
        }
278
        
279
        var threadOwner: OTRThreadOwner? = nil
280
        var acc:OTRAccount? = nil
281
        self.databaseConnection.read({ (transaction) in
282
            threadOwner = message.threadOwner(with: transaction)
283
            acc = threadOwner?.account(with: transaction)
284
        })
285
        guard let thread = threadOwner, let account = acc else {
286
            completion(true, 0.0)
287
            return
288
        }
289
        
290
        //Get the XMPP procol manager associated with this message and therefore account
291
        guard let accountProtocol = OTRProtocolManager.sharedInstance().protocol(for: account) as? OTRXMPPManager else {
292
            completion(true, 0.0)
293
            return
294
        }
295
        
296
        /**
297
         * Message is considered successuflly sent if the stream responds with didSendMessage.
298
         * When XEP-0198 is enabled and when an ack is reveived in (x) seconds then it is later makered as failed. It is up to the user to resubmit
299
         * a msesage to be sent.
300
         */
301
        //Some way to store a message dictionary with the key and block
302
        
303
        //Ensure protocol is connected or if not and autologin then connnect
304
        if (accountProtocol.connectionStatus == .connected) {
305
            if let groupMessage = message as? OTRXMPPRoomMessage {
306
                sendGroupMessage(groupMessage, thread: thread, account: account, accountProtocol: accountProtocol, messageSendingAction: messageSendingAction, completion: completion)
307
            } else if let directMessage = message as? OTROutgoingMessage, let buddy = thread as? OTRXMPPBuddy {
308
                sendDirectMessage(directMessage, buddy: buddy, account: account, accountProtocol: accountProtocol, messageSendingAction: messageSendingAction, completion: completion)
309
            } else {
310
                // Unsupported message type
311
                completion(true, 0.0)
312
                return
313
            }
314
        } else if (account.autologin == true) {
315
            self.waitingForAccount(account.uniqueId, action: OutstandingActionInfo(action: messageSendingAction, timer: nil, completion: completion))
316
            accountProtocol.connectUserInitiated(false)
317
        } else {
318
            // The account might be connected then? even if not auto connecting we might just start up faster then the
319
            // can enter credentials. Try again in a bit myabe the account will be ready
320
            
321
            // Decided that this won't go into the retry failure because we're just waiting on the user to manually connect the account.
322
            // Not really a 'failure' but we should still try to push the messages through at some point.
323
            
324
            completion(false, self.accountRetryTimeout)
325
        }
326
    }
327
    
328
    fileprivate func addBuddyToRoster(_ addBuddyAction:OTRYapAddBuddyAction, completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
329
        
330
        var bud:OTRBuddy? = nil
331
        var acc:OTRAccount? = nil
332
        self.databaseConnection.read({ (transaction) in
333
            bud = OTRBuddy.fetchObject(withUniqueID: addBuddyAction.buddyKey, transaction: transaction)
334
            if let accountKey = bud?.accountUniqueId {
335
                acc = OTRAccount.fetchObject(withUniqueID: accountKey, transaction: transaction)
336
            }
337
            
338
        })
339
        guard let buddy = bud,let account = acc else {
340
            completion(true, 0.0)
341
            return
342
        }
343
        
344
        //Get the XMPP procol manager associated with this message and therefore account
345
        guard let accountProtocol = OTRProtocolManager.sharedInstance().protocol(for: account) as? OTRXMPPManager else {
346
            completion(true, 0.0)
347
            return
348
        }
349

    
350
        //Ensure protocol is connected or if not and autologin then connnect
351
        if (accountProtocol.connectionStatus == .connected) {
352
            // Add the buddy to our roster
353
            if let jid = XMPPJID(string: buddy.username) {
354
                accountProtocol.xmppRoster.addUser(jid, withNickname:buddy.displayName)
355
            }
356
            completion(true, 0.0)
357
        } else if (account.autologin == true) {
358
            self.waitingForAccount(account.uniqueId, action: OutstandingActionInfo(action: addBuddyAction, timer: nil, completion: completion))
359
            accountProtocol.connectUserInitiated(false)
360
        } else {
361
            // Retry later
362
            completion(false, self.accountRetryTimeout)
363
        }
364
    }
365

    
366
    fileprivate func removeBuddyFromRoster(_ removeBuddyAction:OTRYapRemoveBuddyAction, completion:@escaping (_ success: Bool, _ retryTimeout: TimeInterval) -> Void) {
367
        
368
        var acc:OTRAccount? = nil
369
        self.databaseConnection.read({ (transaction) in
370
            if let accountKey = removeBuddyAction.accountKey {
371
                acc = OTRAccount.fetchObject(withUniqueID: accountKey, transaction: transaction)
372
            }
373
        })
374
        guard let account = acc else {
375
            completion(true, 0.0)
376
            return
377
        }
378
        
379
        //Get the XMPP procol manager associated with this message and therefore account
380
        guard let accountProtocol = OTRProtocolManager.sharedInstance().protocol(for: account) as? OTRXMPPManager else {
381
            completion(true, 0.0)
382
            return
383
        }
384
        
385
        //Ensure protocol is connected or if not and autologin then connnect
386
        if accountProtocol.connectionStatus == .connected,
387
            let jidStr = removeBuddyAction.buddyJid {
388
            // Add the buddy to our roster
389
            if let jid = XMPPJID(string: jidStr) {
390
                accountProtocol.xmppRoster.removeUser(jid)
391
            }
392
            completion(true, 0.0)
393
        } else if (account.autologin == true) {
394
            self.waitingForAccount(account.uniqueId, action: OutstandingActionInfo(action: removeBuddyAction, timer: nil, completion: completion))
395
            accountProtocol.connectUserInitiated(false)
396
        } else {
397
            // Retry later
398
            completion(false, self.accountRetryTimeout)
399
        }
400
    }
401

    
402
    
403
    //Mark: Callback for Account
404
    
405
    fileprivate func handleAccountLoginNotification(_ notification:Notification) {
406
        guard let userInfo = notification.userInfo as? [String:AnyObject] else {
407
            return
408
        }
409
        if let accountKey = userInfo[kOTRNotificationAccountUniqueIdKey] as? String, let accountCollection = userInfo[kOTRNotificationAccountCollectionKey] as? String  {
410
            self.didConnectAccount(accountKey, accountCollection: accountCollection)
411
        }
412
    }
413
    
414
    fileprivate func didConnectAccount(_ accountKey:String, accountCollection:String) {
415
        
416
        guard let actionSet = self.popWaitingAccount(accountKey) else {
417
            return
418
        }
419
        
420
        for actionInfo in actionSet {
421
            self.operationQueue.addOperation { [weak self] in
422
                guard let strongSelf = self else { return }
423
                strongSelf.handleNextItem(actionInfo.action, completion: actionInfo.completion)
424
            }
425
        }
426
    }
427
    
428
    //Mark: Callback for OTRSession
429
    
430
    fileprivate func handleMessageStateDidChangeNotification(_ notification:Notification) {
431
        guard let buddy = notification.object as? OTRBuddy,
432
            let messageStateInt = (notification.userInfo?[OTRMessageStateKey] as? NSNumber)?.uintValue else {
433
                return
434
        }
435
        
436
        if  messageStateInt == OTREncryptionMessageState.encrypted.rawValue {
437
            // Buddy has gone encrypted
438
            // Check if we have an outstanding messages for this buddy
439
            guard let messageInfo = self.popWaitingBuddy(buddy.uniqueId) else {
440
                return
441
            }
442
            //Cancle outsanding timer
443
            messageInfo.timer?.invalidate()
444
            self.sendMessage(messageInfo)
445
        }
446
    }
447
    
448
    //Mark: OTR timeout
449
    @objc public func otrInitatiateTimeout(_ timer:Timer) {
450
        
451
        guard let buddyKey = timer.userInfo as? String else {
452
            return
453
        }
454
        
455
        self.operationQueue.addOperation { [weak self] in
456
            guard let strongSelf = self else {return}
457
            
458
            guard let messageInfo = strongSelf.popWaitingBuddy(buddyKey) else {
459
                return
460
            }
461
            
462
            let err = NSError.chatSecureError(EncryptionError.unableToCreateOTRSession, userInfo: nil)
463
            
464
            strongSelf.databaseConnection.readWrite({ (transaction) in
465
                if let message = (transaction.object(forKey: messageInfo.messageKey, inCollection: messageInfo.messageCollection)as? OTRBaseMessage)?.copy() as? OTRBaseMessage {
466
                    message.error = err
467
                    message.save(with: transaction)
468
                }
469
            })
470
            
471
            
472
            messageInfo.completion(true, 0.0)
473
        }
474
        
475
    }
476
    
477
    
478
    
479
}
480

    
481
//MARK: Callback from protocol
482
extension MessageQueueHandler: OTRXMPPMessageStatusModuleDelegate {
483
    
484
    public func didSendMessage(_ messageKey: String, messageCollection: String) {
485
        
486
        guard let messageInfo = self.popWaitingMessage(messageKey, messageCollection: messageCollection) else {
487
            return;
488
        }
489
        
490
        //Update date sent
491
        self.databaseConnection.asyncReadWrite { (transaction) in
492
            guard let object = transaction.object(forKey: messageKey, inCollection: messageCollection) as? NSCopying, let message = object.copy() as? OTROutgoingMessage else {
493
                return
494
            }
495
            message.dateSent = Date()
496
            message.save(with: transaction)
497
        }
498
        
499
        messageInfo.completion(true, 0.0)
500
    }
501
    
502
    public func didFailToSendMessage(_ messageKey:String, messageCollection:String, error:NSError?) {
503
        guard let messageInfo = self.popWaitingMessage(messageKey, messageCollection: messageCollection) else {
504
            return;
505
        }
506
        
507
        //Even though this action failed we need to keep the queue moving.
508
        messageInfo.completion(true, 0.0)
509
    }
510
}
511

    
512
//MARK: YapTaskQueueHandler Protocol
513
extension MessageQueueHandler: YapTaskQueueHandler {
514
    /** This method is called when an item is available to be exectued. Call completion once finished with the action item.
515
     
516
     */
517
    
518
    public func handleNextItem(_ action:YapTaskQueueAction, completion:@escaping (_ success:Bool, _ retryTimeout:TimeInterval)->Void) {
519
        switch action {
520
        case let sendMessageAction as OTRYapMessageSendAction:
521
                self.sendMessage(sendMessageAction, completion: completion)
522
        case let addBuddyAction as OTRYapAddBuddyAction:
523
                self.addBuddyToRoster(addBuddyAction, completion: completion)
524
        case let removeBuddyAction as OTRYapRemoveBuddyAction:
525
            self.removeBuddyFromRoster(removeBuddyAction, completion: completion)
526
        default: break
527
        }
528
    }
529
}
530

    
531
// Message sending logic
532
extension MessageQueueHandler {
533
    typealias MessageQueueHandlerCompletion = (_ success: Bool, _ retryTimeout: TimeInterval) -> Void
534
    
535
    func sendOTRMessage(message:OTROutgoingMessage, buddyKey:String, buddyUsername:String, accountUsername:String, accountProtocolStrintg:String, requiresActiveSession:Bool, completion:@escaping MessageQueueHandlerCompletion) {
536
        
537
        guard let text = message.text else {
538
            completion(true, 0.0)
539
            return
540
        }
541
        //We're connected now we need to check encryption requirements
542
        let otrKit = OTRProtocolManager.sharedInstance().encryptionManager.otrKit
543
        let otrKitSend = {
544
            self.waitingForMessage(message.uniqueId, messageCollection: OTROutgoingMessage.collection, messageSecurity:message.messageSecurity, completion: completion)
545
            otrKit.encodeMessage(text, tlvs: nil, username:buddyUsername , accountName: accountUsername, protocol: accountProtocolStrintg, tag: message)
546
        }
547
        
548
        if (requiresActiveSession && otrKit.messageState(forUsername: buddyUsername, accountName: accountUsername, protocol: accountProtocolStrintg) != .encrypted) {
549
            //We need to initiate an OTR session
550
            
551
            //Timeout at some point waiting for OTR session
552
            DispatchQueue.main.async {
553
                let timer = Timer.scheduledTimer(timeInterval: self.otrTimeout, target: self, selector: #selector(MessageQueueHandler.otrInitatiateTimeout(_:)), userInfo: buddyKey, repeats: false)
554
                self.waitingForBuddy(buddyKey, messageKey: message.uniqueId, messageCollection: OTROutgoingMessage.collection,messageSecurity:message.messageSecurity, timer:timer, completion: completion)
555
                otrKit.initiateEncryption(withUsername: buddyUsername, accountName: accountUsername, protocol: accountProtocolStrintg)
556
            }
557
        } else {
558
            // If we need to send it encrypted and we have a session or we don't need to encrypt send out message
559
            otrKitSend()
560
        }
561
    }
562
    
563
    func sendOMEMOMessage(message:OTROutgoingMessage, accountProtocol:OTRXMPPManager,completion:@escaping MessageQueueHandlerCompletion) {
564
        guard let text = message.text, text.count > 0 else {
565
            completion(true, 0.0)
566
            return
567
        }
568
        
569
        guard let signalCoordinator = accountProtocol.omemoSignalCoordinator else {
570
            self.databaseConnection.asyncReadWrite({ (transaction) in
571
                guard let message = OTROutgoingMessage.fetchObject(withUniqueID: message.uniqueId, transaction: transaction)?.copy() as? OTROutgoingMessage else {
572
                    return
573
                }
574
                message.error = NSError.chatSecureError(EncryptionError.omemoNotSuported, userInfo: nil)
575
                message.save(with: transaction)
576
            })
577
            completion(true, 0.0)
578
            return
579
        }
580
        self.waitingForMessage(message.uniqueId, messageCollection: OTROutgoingMessage.collection, messageSecurity:message.messageSecurity, completion: completion)
581
        
582
        
583
        
584
        signalCoordinator.encryptAndSendMessage(message, buddyYapKey: message.buddyUniqueId, messageId: message.messageId, completion: { [weak self] (success, error) in
585
            guard let strongSelf = self else {
586
                return
587
            }
588
            
589
            if (success == false) {
590
                //Something went wrong getting ready to send the message
591
                //Save error object to message
592
                strongSelf.databaseConnection.readWrite({ (transaction) in
593
                    guard let message = message.refetch(with: transaction) else {
594
                        return
595
                    }
596
                    message.error = error
597
                    message.save(with: transaction)
598
                })
599
                
600
                if let messageInfo = strongSelf.popWaitingMessage(message.uniqueId, messageCollection: type(of: message).collection) {
601
                    //Even though we were not succesfull in sending a message. The action needs to be removed from the queue so the next message can be handled.
602
                    messageInfo.completion(true, 0.0)
603
                }
604
            }
605
        })
606
    }
607
}