Skip to content

blockchainReputation

BlockchainHandler

Handles interaction with Oracle and Non-Validator Node of Blockchain Network

Source code in nebula/core/aggregation/blockchainReputation.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
class BlockchainHandler:
    """
    Handles interaction with Oracle and Non-Validator Node of Blockchain Network

    Creating an instance generates a fresh account, waits until the Oracle reports the
    blockchain as ready, requests funds, loads the reputation contract from the Oracle
    and registers the node at the reputation system
    """

    class _ClassProperty:
        """
        Minimal read-only descriptor for values that are computed from the class itself

        Stacking @classmethod on top of @property was deprecated in Python 3.11 and no
        longer works from Python 3.13 on, this descriptor keeps class level access working
        """

        def __init__(self, getter):
            # function receiving the owning class and returning the exposed value
            self._getter = getter

        def __get__(self, instance, owner=None):
            # resolve the class for access through the class as well as through an instance
            return self._getter(owner if owner is not None else type(instance))

    # static ip address of non-validator node with RPC-API
    __rpc_url = "http://172.25.0.104:8545"

    # static ip address of oracle with REST-API
    __oracle_url = "http://172.25.0.105:8081"

    # default REST header for interacting with oracle
    __rest_header = {"Content-type": "application/json", "Accept": "application/json"}

    def __init__(self, home_address):
        """
        Runs the complete start-up sequence against Oracle and reputation system
        Args:
            home_address: local NEBULA name of this node, used as name in the reputation system

        """
        print_with_frame("BLOCKCHAIN INITIALIZATION: START")

        # local NEBULA name, needed for reputation system
        self.__home_ip = home_address

        # randomly generated private key, needed to sign transaction
        self.__private_key = ""

        # public wallet address generated from the private key
        self.__acc_address = ""

        # variables for experiment, not required for aggregation
        self.__gas_used = 0
        self.__gas_price = 27.3
        self.round = 1

        # generate randomized private key, also fills __private_key and __acc_address
        self.__acc = self.__create_account()

        # configure web3 objects for using Proof-of-Authority
        self.__web3 = self.__initialize_web3()

        # call Oracle to sense if blockchain is ready
        print(f"{'-' * 25} CONNECT TO ORACLE {'-' * 25}", flush=True)
        self.__wait_for_blockchain()

        # request ETH funds for creating transactions, paying gas
        self.__request_funds_from_oracle()

        # check if funds were assigned by checking directly with blockchain
        self.verify_balance()

        # request contract address and header from Oracle
        self.__contract_obj = self.__get_contract_from_oracle()

        # register public wallet address at reputation system
        print(f"{'-' * 25} CONNECT TO REPUTATION SYSTEM {'-' * 25}", flush=True)
        self.__register()
        print("BLOCKCHAIN: Registered to reputation system", flush=True)

        # check if registration was successful
        self.verify_registration()
        print("BLOCKCHAIN: Verified registration", flush=True)

        print_with_frame("BLOCKCHAIN INITIALIZATION: FINISHED")

    @_ClassProperty
    def oracle_url(cls) -> str:
        """
        Base url of the Oracle's REST-API, readable on the class and on instances
        Returns: url as string

        """
        return cls.__oracle_url

    @_ClassProperty
    def rest_header(cls) -> Mapping[str, str]:
        """
        Default REST header used for requests to the Oracle, readable on the class and on instances
        Returns: Mapping of header names to header values

        """
        return cls.__rest_header

    def __create_account(self):
        """
        Generates randomized private key and derives public account from it,
        stores the hex private key and the checksum address on the handler
        Returns: the generated account object

        """
        print(f"{'-' * 25} REGISTER WORKING NODE {'-' * 25}", flush=True)

        # generate random private key, address, public address
        acc = Account.create()

        # initialize web3 utility object
        web3 = Web3()

        # convert private key to hex, used in raw transactions
        self.__private_key = web3.to_hex(acc.key)

        # convert address type, used in raw transactions
        self.__acc_address = web3.to_checksum_address(acc.address)

        print(f"WORKER NODE: Registered account: {self.__home_ip}", flush=True)
        print(f"WORKER NODE: Account address: {self.__acc_address}", flush=True)

        # return generated account
        return acc

    def __initialize_web3(self):
        """
        Initializes Web3 object and configures it for PoA protocol
        Returns: Web3 object

        """

        # initialize Web3 object with ip of non-validator node
        web3 = Web3(Web3.HTTPProvider(self.__rpc_url, request_kwargs={"timeout": 20}))

        # inject Proof-of-Authority settings to object
        web3.middleware_onion.inject(geth_poa_middleware, layer=0)

        # automatically sign transactions if available for execution
        web3.middleware_onion.add(construct_sign_and_send_raw_middleware(self.__acc))

        # inject local account as default
        web3.eth.default_account = self.__acc_address

        # return initialized object for executing transaction
        return web3

    @retry((Exception, requests.exceptions.HTTPError), tries=20, delay=4)
    def __wait_for_blockchain(self) -> None:
        """
        Request state of blockchain from Oracle, repeated through the retry decorator
        (tries=20, delay=4) until the status request succeeds
        Returns: None

        """

        # check with oracle if blockchain is ready for requests
        response = requests.get(
            url=f"{self.__oracle_url}/status",
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        print("ORACLE: Blockchain is ready", flush=True)

    @retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
    def __request_funds_from_oracle(self) -> None:
        """
        Requests funds from Oracle by sending public address
        Returns: None

        """

        # call oracle's faucet by Http post request
        response = requests.post(
            url=f"{self.__oracle_url}/faucet",
            json={"address": self.__acc_address},
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        print("ORACLE: Received 500 ETH", flush=True)

    def verify_balance(self) -> None:
        """
        Calls blockchain directly for requesting current balance,
        requests new funds from the Oracle if the balance is 1 ETH or less
        Returns: None

        """

        # directly call view method from non-validator node
        balance = self.__web3.eth.get_balance(self.__acc_address, "latest")

        # convert wei to ether
        balance_eth = self.__web3.from_wei(balance, "ether")
        print(
            f"BLOCKCHAIN: Successfully verified balance of {balance_eth} ETH",
            flush=True,
        )

        # if node ran out of funds, it requests ether from the oracle
        if balance_eth <= 1:
            self.__request_funds_from_oracle()

        return None

    @retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
    def __get_contract_from_oracle(self):
        """
        Requests header file and contract address, generates Web3 Contract object with it
        Returns: Web3 Contract object

        """

        response = requests.get(
            url=f"{self.__oracle_url}/contract",
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        # convert response to json to extract the abi and address
        json_response = response.json()

        print(
            f"ORACLE: Initialized chain code: {json_response.get('address')}",
            flush=True,
        )

        # return an initialized web3 contract object
        return self.__web3.eth.contract(abi=json_response.get("abi"), address=json_response.get("address"))

    @retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
    def report_gas_oracle(self) -> list:
        """
        Reports accumulated gas costs of all transactions made to the blockchain
        and resets the local gas counter afterwards
        Returns: List of the key/value pairs of the Oracle's json response

        """

        # method used for experiments, not needed for aggregation
        response = requests.post(
            url=f"{self.__oracle_url}/gas",
            json={"amount": self.__gas_used, "round": self.round},
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        # reset local gas accumulation
        self.__gas_used = 0

        # return list with gas usage for logging
        return list(response.json().items())

    @retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
    def report_reputation_oracle(self, records: list) -> None:
        """
        Reports reputations used for aggregation
        Args:
            records: List of records sent to the Oracle together with round and sender name

        Returns: None

        """

        # method used for experiments, not needed for aggregation
        response = requests.post(
            url=f"{self.__oracle_url}/reputation",
            json={"records": records, "round": self.round, "sender": self.__home_ip},
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        return None

    def __sign_and_deploy(self, trx_hash):
        """
        Signs a function call to the chain code with the private key and awaits the receipt
        Args:
            trx_hash: Transformed dictionary of all properties relevant for call to chain code

        Returns: transaction receipt confirming the successful write to the ledger

        """

        # transaction is signed with private key
        signed_transaction = self.__web3.eth.account.sign_transaction(trx_hash, private_key=self.__private_key)

        # confirmation that transaction was passed from non-validator node to validator nodes
        executed_transaction = self.__web3.eth.send_raw_transaction(signed_transaction.rawTransaction)

        # non-validator node awaited the successful validation by validation nodes and returns receipt
        transaction_receipt = self.__web3.eth.wait_for_transaction_receipt(executed_transaction)

        # accumulate used gas
        self.__gas_used += transaction_receipt.gasUsed

        return transaction_receipt

    def push_opinions(self, opinion_dict: dict):
        """
        Pushes all locally computed opinions of models to aggregate to the reputation system
        and reports them to the Oracle afterwards
        Args:
            opinion_dict: Dict of all names:opinions for writing to the reputation system

        Returns: Json of transaction receipt

        """

        records = list(opinion_dict.items())

        # only the transaction itself is retried, a failing Oracle report must not
        # cause an already confirmed transaction to be sent a second time
        @retry(Exception, tries=3, delay=4)
        def submit_opinions():
            # create raw transaction object to call rate_neighbours() from the reputation system
            unsigned_trx = self.__contract_obj.functions.rate_neighbours(records).build_transaction({
                "chainId": self.__web3.eth.chain_id,
                "from": self.__acc_address,
                "nonce": self.__web3.eth.get_transaction_count(
                    self.__web3.to_checksum_address(self.__acc_address), "pending"
                ),
                "gasPrice": self.__web3.to_wei(self.__gas_price, "gwei"),
            })

            # sign and execute the transaction
            return self.__sign_and_deploy(unsigned_trx)

        conf = submit_opinions()

        # report_reputation_oracle() carries its own retry decorator
        self.report_reputation_oracle(records)

        # return the receipt as json
        return self.__web3.to_json(conf)

    @retry(Exception, tries=3, delay=4)
    def get_reputations(self, ip_addresses: list) -> dict:
        """
        Requests globally aggregated opinions values from reputation system for computing aggregation weights
        Args:
            ip_addresses: Names of nodes of which the reputation values should be generated

        Returns: Dictionary of name:reputation from the reputation system

        """

        final_reputations = {}
        stats_to_print = []

        # call get_reputations() from reputation system
        raw_reputation = self.__contract_obj.functions.get_reputations(ip_addresses).call({"from": self.__acc_address})

        # loop list with tuples from reputation system, the index field is not used
        for (
            name,
            reputation,
            weighted_reputation,
            stddev_count,
            divisor,
            final_reputation,
            avg,
            median,
            stddev,
            _index,
            avg_deviation,
            avg_avg_deviation,
            malicious_opinions,
        ) in raw_reputation:
            # list elements with an empty name can be ignored
            if not name:
                continue

            # collect statistical values, numeric fields are divided by 10 before printing
            stats_to_print.append([
                name,
                reputation / 10,
                weighted_reputation / 10,
                stddev_count / 10,
                divisor / 10,
                final_reputation / 10,
                avg / 10,
                median / 10,
                stddev / 10,
                avg_deviation / 10,
                avg_avg_deviation / 10,
                malicious_opinions,
            ])

            # assign the final reputation to a dict for later aggregation
            final_reputations[name] = final_reputation / 10

        print_table(
            "REPUTATION SYSTEM STATE",
            stats_to_print,
            [
                "Name",
                "Reputation",
                "Weighted Rep. by local Node",
                "Stddev Count",
                "Divisor",
                "Final Reputation",
                "Mean",
                "Median",
                "Stddev",
                "Avg Deviation in Opinion",
                "Avg of all Avg Deviations in Opinions",
                "Malicious Opinions",
            ],
        )

        # the final reputations are not reported to the Oracle from here
        return final_reputations

    @retry(Exception, tries=3, delay=4)
    def __register(self) -> str:
        """
        Registers a node's name with its public address, signed with private key
        Returns: Json of transaction receipt

        """

        # build raw transaction object to call public method register() from reputation system
        unsigned_trx = self.__contract_obj.functions.register(self.__home_ip).build_transaction({
            "chainId": self.__web3.eth.chain_id,
            "from": self.__acc_address,
            "nonce": self.__web3.eth.get_transaction_count(
                self.__web3.to_checksum_address(self.__acc_address), "pending"
            ),
            "gasPrice": self.__web3.to_wei(self.__gas_price, "gwei"),
        })

        # sign and execute created transaction
        conf = self.__sign_and_deploy(unsigned_trx)

        # return the receipt as json
        return self.__web3.to_json(conf)

    @retry(Exception, tries=3, delay=4)
    def verify_registration(self) -> None:
        """
        Verifies the successful registration of the node itself,
        executes registration again if reputation system returns false
        Returns: None
        Raises: Exception if the registration was not confirmed, which triggers the retry decorator

        """

        # call view function of reputation system to check if registration was not abandoned by hard fork
        confirmation = self.__contract_obj.functions.confirm_registration().call({"from": self.__acc_address})

        # function returns boolean
        if not confirmation:
            # register again if not successful
            self.__register()

            # raise Exception to check again
            raise Exception("EXCEPTION: verify_registration() => Could not be confirmed")

        return None

    @retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
    def report_time_oracle(self, start: float) -> None:
        """
        Reports time used for aggregation and increases the round counter afterwards
        Args:
            start: Start timestamp in nanoseconds, the elapsed time is computed against time.time_ns()

        Returns: None

        """
        # method used for experiments, not needed for aggregation
        # report aggregation time in seconds and round to oracle
        response = requests.post(
            url=f"{self.__oracle_url}/time",
            json={"time": (time.time_ns() - start) / (10**9), "round": self.round},
            headers=self.__rest_header,
            timeout=20,
        )

        # raise Exception if status is not successful
        response.raise_for_status()

        # increase aggregation round counter after reporting time
        self.round += 1
        return None

__create_account()

Generates a randomized private key and derives the public account from it. Returns: the generated account object.

Source code in nebula/core/aggregation/blockchainReputation.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def __create_account(self):
    """
    Creates a new random account and stores its key material on the handler
    Returns: the account object created by Account.create()

    """
    print(f"{'-' * 25} REGISTER WORKING NODE {'-' * 25}", flush=True)

    # new account holding a random private key and the address belonging to it
    new_account = Account.create()

    # Web3 instance created without arguments, only used for its conversion helpers
    converter = Web3()

    # hex form of the private key and checksum form of the address, used in raw transactions
    self.__private_key = converter.to_hex(new_account.key)
    self.__acc_address = converter.to_checksum_address(new_account.address)

    print(f"WORKER NODE: Registered account: {self.__home_ip}", flush=True)
    print(f"WORKER NODE: Account address: {self.__acc_address}", flush=True)

    return new_account

__get_contract_from_oracle()

Requests header file and contract address, generates Web3 Contract object with it Returns: Web3 Contract object

Source code in nebula/core/aggregation/blockchainReputation.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
@retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
def __get_contract_from_oracle(self):
    """
    Fetches abi and address of the chain code from the Oracle and wraps them in a contract object
    Returns: Web3 Contract object

    """

    contract_endpoint = f"{self.__oracle_url}/contract"
    reply = requests.get(url=contract_endpoint, headers=self.__rest_header, timeout=20)

    # a non successful status code becomes an exception here
    reply.raise_for_status()

    # the json body carries the entries "abi" and "address"
    payload = reply.json()
    contract_address = payload.get("address")

    print(f"ORACLE: Initialized chain code: {contract_address}", flush=True)

    # build the web3 contract object from abi and address
    return self.__web3.eth.contract(abi=payload.get("abi"), address=contract_address)

__initialize_web3()

Initializes Web3 object and configures it for PoA protocol Returns: Web3 object

Source code in nebula/core/aggregation/blockchainReputation.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def __initialize_web3(self):
    """
    Builds the Web3 client for the non-validator node and prepares it for the PoA protocol
    Returns: Web3 object

    """

    # http provider pointing to the non-validator node, requests time out after 20
    provider = Web3.HTTPProvider(self.__rpc_url, request_kwargs={"timeout": 20})
    client = Web3(provider)

    # Proof-of-Authority middleware is placed at layer 0
    client.middleware_onion.inject(geth_poa_middleware, layer=0)

    # middleware built from the local account, signs and sends raw transactions
    signing_middleware = construct_sign_and_send_raw_middleware(self.__acc)
    client.middleware_onion.add(signing_middleware)

    # the local account address becomes the default sender
    client.eth.default_account = self.__acc_address

    return client

__register()

Registers a node's name with its public address, signed with private key Returns: Json of transaction receipt

Source code in nebula/core/aggregation/blockchainReputation.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
@retry(Exception, tries=3, delay=4)
def __register(self) -> str:
    """
    Sends the node's name to register() of the reputation system in a signed transaction
    Returns: Json of transaction receipt

    """

    # prepared call of the public method register() with the local name
    register_call = self.__contract_obj.functions.register(self.__home_ip)

    # transaction properties: chain, sender, next pending nonce and gas price in wei
    trx_properties = {
        "chainId": self.__web3.eth.chain_id,
        "from": self.__acc_address,
        "nonce": self.__web3.eth.get_transaction_count(
            self.__web3.to_checksum_address(self.__acc_address), "pending"
        ),
        "gasPrice": self.__web3.to_wei(self.__gas_price, "gwei"),
    }
    raw_trx = register_call.build_transaction(trx_properties)

    # sign, send and wait for the receipt
    receipt = self.__sign_and_deploy(raw_trx)

    # hand the receipt back as json
    return self.__web3.to_json(receipt)

__request_funds_from_oracle()

Requests funds from Oracle by sending public address Returns: None

Source code in nebula/core/aggregation/blockchainReputation.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
@retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
def __request_funds_from_oracle(self) -> None:
    """
    Posts the public address to the Oracle's faucet to receive funds
    Returns: None

    """

    faucet_endpoint = f"{self.__oracle_url}/faucet"
    wallet = {"address": self.__acc_address}

    # http post request to the faucet of the oracle
    reply = requests.post(url=faucet_endpoint, json=wallet, headers=self.__rest_header, timeout=20)

    # a non successful status code becomes an exception here
    reply.raise_for_status()

    print("ORACLE: Received 500 ETH", flush=True)
    return None

__sign_and_deploy(trx_hash)

Signs a function call to the chain code with the private key and awaits the receipt. Args: trx_hash: Transformed dictionary of all properties relevant for the call to the chain code.

Returns: transaction receipt confirming the successful write to the ledger

Source code in nebula/core/aggregation/blockchainReputation.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def __sign_and_deploy(self, trx_hash):
    """
    Signs a prepared transaction with the private key, sends it and waits for its receipt
    Args:
        trx_hash: Transformed dictionary of all properties relevant for call to chain code

    Returns: transaction receipt confirming the successful write to the ledger

    """

    eth = self.__web3.eth

    # sign with the private key of the local account
    signed = eth.account.sign_transaction(trx_hash, private_key=self.__private_key)

    # hand the raw signed transaction to the non-validator node
    sent = eth.send_raw_transaction(signed.rawTransaction)

    # block until the receipt of the transaction is available
    receipt = eth.wait_for_transaction_receipt(sent)

    # add the gas of this transaction to the local accumulation
    self.__gas_used = self.__gas_used + receipt.gasUsed

    return receipt

__wait_for_blockchain()

Requests the state of the blockchain from the Oracle, repeating the call with a delay until it succeeds. Returns: None

Source code in nebula/core/aggregation/blockchainReputation.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
@retry((Exception, requests.exceptions.HTTPError), tries=20, delay=4)
def __wait_for_blockchain(self) -> None:
    """
    Asks the Oracle for the status of the blockchain, repeated by the retry decorator on failure
    Returns: None

    """

    status_endpoint = f"{self.__oracle_url}/status"

    # ask the oracle whether the blockchain accepts requests
    reply = requests.get(url=status_endpoint, headers=self.__rest_header, timeout=20)

    # a non successful status code becomes an exception here
    reply.raise_for_status()

    print("ORACLE: Blockchain is ready", flush=True)
    return None

get_reputations(ip_addresses)

Requests globally aggregated opinions values from reputation system for computing aggregation weights Args: ip_addresses: Names of nodes of which the reputation values should be generated

Returns: Dictionary of name:reputation from the reputation system

Source code in nebula/core/aggregation/blockchainReputation.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
@retry(Exception, tries=3, delay=4)
def get_reputations(self, ip_addresses: list) -> dict:
    """
    Reads the aggregated reputation records of the given nodes from the reputation system,
    prints them as table and extracts the final reputation per node
    Args:
        ip_addresses: Names of nodes of which the reputation values should be generated

    Returns: Dictionary of name:reputation from the reputation system

    """

    table_rows = []
    reputation_by_name = {}

    # view call of get_reputations() with the local account as sender
    records = self.__contract_obj.functions.get_reputations(ip_addresses).call({"from": self.__acc_address})

    for record in records:
        # every record is a tuple of 13 fields, the index field is not used
        (
            name,
            reputation,
            weighted_reputation,
            stddev_count,
            divisor,
            final_reputation,
            avg,
            median,
            stddev,
            _index,
            avg_deviation,
            avg_avg_deviation,
            malicious_opinions,
        ) = record

        # only records carrying a name are of interest
        if name:
            # numeric fields are divided by 10 for the table
            scaled_values = [
                value / 10
                for value in (
                    reputation,
                    weighted_reputation,
                    stddev_count,
                    divisor,
                    final_reputation,
                    avg,
                    median,
                    stddev,
                    avg_deviation,
                    avg_avg_deviation,
                )
            ]
            table_rows.append([name, *scaled_values, malicious_opinions])

            # final reputation is kept for the later aggregation
            reputation_by_name[name] = final_reputation / 10

    column_titles = [
        "Name",
        "Reputation",
        "Weighted Rep. by local Node",
        "Stddev Count",
        "Divisor",
        "Final Reputation",
        "Mean",
        "Median",
        "Stddev",
        "Avg Deviation in Opinion",
        "Avg of all Avg Deviations in Opinions",
        "Malicious Opinions",
    ]
    print_table("REPUTATION SYSTEM STATE", table_rows, column_titles)

    # the final reputations are not reported to the Oracle from here
    return reputation_by_name

push_opinions(opinion_dict)

Pushes all locally computed opinions of models to aggregate to the reputation system Args: opinion_dict: Dict of all names:opinions for writing to the reputation system

Returns: Json of transaction receipt

Source code in nebula/core/aggregation/blockchainReputation.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
def push_opinions(self, opinion_dict: dict):
    """
    Pushes all locally computed opinions of models to aggregate to the reputation system
    and reports them to the Oracle afterwards
    Args:
        opinion_dict: Dict of all names:opinions for writing to the reputation system

    Returns: Json of transaction receipt

    """

    records = list(opinion_dict.items())

    # only the transaction itself is retried, a failing Oracle report must not
    # cause an already confirmed transaction to be sent a second time
    @retry(Exception, tries=3, delay=4)
    def submit_opinions():
        # create raw transaction object to call rate_neighbours() from the reputation system
        unsigned_trx = self.__contract_obj.functions.rate_neighbours(records).build_transaction({
            "chainId": self.__web3.eth.chain_id,
            "from": self.__acc_address,
            "nonce": self.__web3.eth.get_transaction_count(
                self.__web3.to_checksum_address(self.__acc_address), "pending"
            ),
            "gasPrice": self.__web3.to_wei(self.__gas_price, "gwei"),
        })

        # sign and execute the transaction
        return self.__sign_and_deploy(unsigned_trx)

    conf = submit_opinions()

    # report_reputation_oracle() carries its own retry decorator
    self.report_reputation_oracle(records)

    # return the receipt as json
    return self.__web3.to_json(conf)

report_gas_oracle()

Reports accumulated gas costs of all transactions made to the blockchain Returns: List of all accumulated gas costs per registered node

Source code in nebula/core/aggregation/blockchainReputation.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
@retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
def report_gas_oracle(self) -> list:
    """
    Sends the locally accumulated gas usage of the current round to the Oracle
    and resets the local accumulation afterwards
    Returns: List of the key/value pairs of the Oracle's json response

    """

    # method used for experiments, not needed for aggregation
    gas_endpoint = f"{self.__oracle_url}/gas"
    usage = {"amount": self.__gas_used, "round": self.round}

    reply = requests.post(url=gas_endpoint, json=usage, headers=self.__rest_header, timeout=20)

    # a non successful status code becomes an exception here
    reply.raise_for_status()

    # start the next accumulation at zero
    self.__gas_used = 0

    # json object of the response as list of pairs for logging
    gas_by_entry = reply.json()
    return list(gas_by_entry.items())

report_reputation_oracle(records)

Reports reputations used for aggregation Returns: None

Source code in nebula/core/aggregation/blockchainReputation.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
@retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
def report_reputation_oracle(self, records: list) -> None:
    """
    Sends reputation records together with the current round and the local name to the Oracle
    Args:
        records: List of records to report

    Returns: None

    """

    # method used for experiments, not needed for aggregation
    reputation_endpoint = f"{self.__oracle_url}/reputation"
    report = {"records": records, "round": self.round, "sender": self.__home_ip}

    reply = requests.post(url=reputation_endpoint, json=report, headers=self.__rest_header, timeout=20)

    # a non successful status code becomes an exception here
    reply.raise_for_status()

report_time_oracle(start)

Reports time used for aggregation Returns: None

Source code in nebula/core/aggregation/blockchainReputation.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
@retry((Exception, requests.exceptions.HTTPError), tries=3, delay=4)
def report_time_oracle(self, start: float) -> None:
    """
    Reports time used for aggregation and increases the round counter afterwards
    Args:
        start: Start timestamp in nanoseconds, the elapsed time is computed against time.time_ns()

    Returns: None

    """
    # method used for experiments, not needed for aggregation
    # report aggregation time in seconds and round to oracle
    # the url is read like in all other oracle requests instead of through the class level
    # property, which relies on stacking @classmethod and @property (not working on Python 3.13+)
    response = requests.post(
        url=f"{self.__oracle_url}/time",
        json={"time": (time.time_ns() - start) / (10**9), "round": self.round},
        headers=self.__rest_header,
        timeout=20,
    )

    # raise Exception if status is not successful
    response.raise_for_status()

    # increase aggregation round counter after reporting time
    self.round += 1
    return None

verify_balance()

Calls the blockchain directly to request the current balance. Returns: None.

Source code in nebula/core/aggregation/blockchainReputation.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def verify_balance(self) -> None:
    """
    Reads the node's current balance from the blockchain and tops it up if needed.

    The balance is requested directly from the non-validator node, logged in
    ether, and new funds are requested from the oracle once it is at or
    below 1 ETH.
    Returns: None

    """
    # view call, balance of the own account at the latest block (in wei)
    wei_balance = self.__web3.eth.get_balance(self.__acc_address, "latest")
    ether_balance = self.__web3.from_wei(wei_balance, "ether")

    print(
        f"BLOCKCHAIN: Successfully verified balance of {ether_balance} ETH",
        flush=True,
    )

    # 1 ETH or less left: ask the oracle for more funds
    needs_funding = ether_balance <= 1
    if needs_funding:
        self.__request_funds_from_oracle()

verify_registration()

Verifies the successful registration of the node itself and executes the registration again if the reputation system returns false. Returns: None.

Source code in nebula/core/aggregation/blockchainReputation.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
@retry(Exception, tries=3, delay=4)
def verify_registration(self) -> None:
    """
    Checks that the node is still registered in the reputation system.

    If the reputation system does not confirm the registration, the node
    registers again and raises, so that the retry decorator (tries=3,
    delay=4) repeats the check.
    Returns: None

    """
    # view function of the reputation system, tells whether the registration
    # survived (e.g. was not abandoned by a hard fork)
    registration_check = self.__contract_obj.functions.confirm_registration()
    is_registered = registration_check.call({"from": self.__acc_address})

    # confirmed, nothing left to do
    if is_registered:
        return None

    # not confirmed: register again ...
    self.__register()

    # ... and raise so the decorator runs the verification once more
    raise Exception("EXCEPTION: _verify_registration() => Could not be confirmed)")

BlockchainReputation

Bases: Aggregator

BAT-SandrinHunkeler (BlockchainReputation)

Weighted FedAvg by using relative reputation of each model's trainer Returns: aggregated model

Source code in nebula/core/aggregation/blockchainReputation.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class BlockchainReputation(Aggregator):
    """
    # BAT-SandrinHunkeler (BlockchainReputation)
    Weighted FedAvg by using relative reputation of each model's trainer

    Per aggregation round the node compares every buffered neighbour model with
    its own model, pushes the resulting opinions to the reputation system,
    requests the global reputations and uses them, normalized, as weights.
    Returns: aggregated model
    """

    # similarity functions selectable by name through `similarity_metric`
    ALGORITHM_MAP = {
        "Cossim": cosine_metric,
        "Pearson": pearson_correlation_metric,
        "Euclidean": euclidean_metric,
        "Minkowski": minkowski_metric,
        "Manhattan": manhattan_metric,
        "Jaccard": jaccard_metric,
        "CossimEuclid": cossim_euclidean,
    }

    def __init__(self, similarity_metric: str = "CossimEuclid", config=None, **kwargs):
        """
        Sets up the aggregator and its BlockchainHandler.

        Args:
            similarity_metric: key of ALGORITHM_MAP naming the similarity function
            config: participant configuration, read for the node address and the malicious flag
            **kwargs: forwarded to the parent Aggregator

        Raises:
            ValueError: if similarity_metric is not a key of ALGORITHM_MAP
        """
        # initialize parent class
        super().__init__(config, **kwargs)

        # reject an unknown metric right away instead of storing None and
        # failing in the middle of the first aggregation round
        if similarity_metric not in BlockchainReputation.ALGORITHM_MAP:
            supported = ", ".join(BlockchainReputation.ALGORITHM_MAP)
            raise ValueError(f"Unknown similarity metric '{similarity_metric}', expected one of: {supported}")

        self.config = config

        # extract local NEBULA name
        self.node_name = self.config.participant["network_args"]["addr"]

        # initialize BlockchainHandler for interacting with oracle and non-validator node
        self.__blockchain = BlockchainHandler(self.node_name)

        # check if node is malicious for debugging
        self.__malicious = self.config.participant["device_args"]["malicious"]

        self.__opinion_algo = BlockchainReputation.ALGORITHM_MAP[similarity_metric]
        self.__similarity_metric = similarity_metric

    def run_aggregation(
        self, model_buffer: OrderedDict[str, tuple[dict[str, torch.Tensor], int]]
    ) -> dict[str, torch.Tensor]:
        """
        Aggregates the buffered models, weighted by their trainers' global reputation.

        Args:
            model_buffer: maps each sender to a (model, weight) pair; must contain
                an entry for this node. The buffered weight is not used here.

        Returns: the reputation weighted sum of the models, or the unchanged local
            model if the reputations do not sum to a positive value
        """
        print_with_frame("BLOCKCHAIN AGGREGATION: START")

        # track aggregation time for experiments
        start = time.time_ns()

        # verify the registration process during initialization of the BlockchainHandler
        self.__blockchain.verify_registration()

        # verify if ether balance is still sufficient for aggregating, request more otherwise
        self.__blockchain.verify_balance()

        # create dict<sender, model>
        current_models = {sender: model for sender, (model, _weight) in model_buffer.items()}

        print(f"Node: {self.node_name}", flush=True)
        print(f"self.__malicious: {self.__malicious}", flush=True)

        # extract local model from models to aggregate
        local_model = model_buffer[self.node_name][0]

        # compute similarity between local model and all buffered neighbour models,
        # rounded to 5 decimals and clamped to the interval [0, 1]
        metric_values = {
            sender: max(
                min(
                    round(
                        self.__opinion_algo(local_model, model, similarity=True),
                        5,
                    ),
                    1,
                ),
                0,
            )
            for sender, model in current_models.items()
            if sender != self.node_name
        }

        # log similarity metric values
        print_table(
            "SIMILARITY METRIC",
            list(metric_values.items()),
            ["neighbour Node", f"{self.__similarity_metric} Similarity"],
        )

        # increase resolution of metric in upper half of interval,
        # the cubed metric is scaled to an integer opinion between 0 and 100
        opinion_values = {sender: round(metric**3 * 100) for sender, metric in metric_values.items()}

        # DEBUG
        # NOTE(review): replaces the computed opinions with random ones from round 5
        # on for nodes selected by one character of their address; presumably kept
        # for experiments with misbehaving nodes - confirm before relying on it
        if int(self.node_name[-7]) <= 1 and self.__blockchain.round >= 5:
            opinion_values = {sender: int(torch.randint(0, 101, (1,))[0]) for sender in metric_values}

        # push local opinions to reputation system
        self.__blockchain.push_opinions(opinion_values)

        # log pushed opinion values
        print_table(
            "REPORT LOCAL OPINION",
            list(opinion_values.items()),
            ["Node", f"Transformed {self.__similarity_metric} Similarity"],
        )

        # request global reputation values from reputation system (local node included)
        reputation_values = self.__blockchain.get_reputations(list(current_models))

        # log received global reputations
        print_table(
            "GLOBAL REPUTATION",
            list(reputation_values.items()),
            ["Node", "Global Reputation"],
        )

        # normalize all reputation values to sum() == 1; because each weight is
        # rounded to 3 decimals the sum can deviate slightly from 1
        sum_reputations = sum(reputation_values.values())
        if sum_reputations > 0:
            normalized_reputation_values = {
                name: round(reputation / sum_reputations, 3) for name, reputation in reputation_values.items()
            }
        else:
            normalized_reputation_values = reputation_values

        # log normalized aggregation weights
        print_table(
            "AGGREGATION WEIGHTS",
            list(normalized_reputation_values.items()),
            ["Node", "Aggregation Weight"],
        )

        if sum_reputations > 0:
            # start from an all-zero float model and add every weighted model to it
            final_model = {layer: torch.zeros_like(param).float() for layer, param in local_model.items()}
            for sender, aggregation_weight in normalized_reputation_values.items():
                sender_model = current_models[sender]
                for layer in final_model:
                    final_model[layer] += sender_model[layer].float() * aggregation_weight

        # cover rare case where no models were added or reputation is 0:
        # skip aggregation and return the local model
        else:
            final_model = local_model

        # report used gas to oracle and log cumulative gas used
        print_table(
            "TOTAL GAS USED",
            self.__blockchain.report_gas_oracle(),
            ["Node", "Cumulative Gas used"],
        )
        self.__blockchain.report_time_oracle(start)

        print_with_frame("BLOCKCHAIN AGGREGATION: FINISHED")

        # return newly aggregated model
        return final_model

print_table(title, values, headers)

Prints a title and all values ordered in a table, with the headers as column titles. Args: title: Title of the table; values: Rows of the table; headers: Column headers of the table.

Returns: None, prints output

Source code in nebula/core/aggregation/blockchainReputation.py
179
180
181
182
183
184
185
186
187
188
189
190
191
def print_table(title: str, values: list[tuple | list], headers: list[str]) -> None:
    """
    Prints an upper-cased title line followed by the sorted values as a grid table.
    Args:
        title: Title of the table
        values: Rows of table, printed in sorted order
        headers: Column headers of table

    Returns: None, prints output

    """
    dashes = "-" * 25
    title_line = f"\n{dashes} {title.upper()} {dashes}"
    print(title_line, flush=True)

    # rows are sorted before rendering so the output order is stable
    ordered_rows = sorted(values)
    rendered_table = tabulate(ordered_rows, headers=headers, tablefmt="grid")
    print(rendered_table, flush=True)

print_with_frame(message)

Prints a large frame with a title inside. Args: message: Title to put into the frame.

Returns: None

Source code in nebula/core/aggregation/blockchainReputation.py
194
195
196
197
198
199
200
201
202
203
204
205
206
def print_with_frame(message) -> None:
    """
    Prints a large frame with a title inside
    Args:
        message: Title to put into the frame, it is printed in upper case

    Returns: None

    """
    # size the frame from the text that is actually printed: upper-casing can
    # change the length of a string (e.g. "ß" becomes "SS")
    title = message.upper()
    border = f"{' ' * 20}+{'-' * (len(title) + 2)}+"

    print(border, flush=True)
    print(f"{'*' * 20}| {title} |{'*' * 20}", flush=True)
    print(border, flush=True)