in python/dpu_utils/tfmodels/sparsegnn.py [0:0]
def __init__(self, params: Dict[str, Any]) -> None:
    """Create all per-layer trainable weights for a sparse GGNN.

    Reads hyperparameters from ``params`` and populates ``self.__weights``
    (a ``GGNNWeights`` tuple of parallel per-layer lists) with:
    edge transforms, optional per-edge-type attention scalars, optional
    per-edge-type feature gates, optional edge biases, and one RNN cell
    per propagation layer.

    Args:
        params: hyperparameter dict. Required keys: ``n_edge_types``,
            ``hidden_size``, ``add_backwards_edges``, ``layer_timesteps``,
            ``use_propagation_attention``, ``use_edge_bias``. Optional:
            ``edge_features_size`` (maps edge type -> feature size) and
            ``message_aggregation`` ('sum' (default) or 'max').

    Raises:
        ValueError: if ``n_edge_types`` is not positive or
            ``message_aggregation`` is not a recognized mode.
    """
    self.params = params
    self.num_edge_types = self.params['n_edge_types']
    # Real exception rather than `assert`: asserts are stripped under `python -O`.
    if self.num_edge_types <= 0:
        raise ValueError('GNN should have at least one edge type')
    h_dim = self.params['hidden_size']
    edge_feature_sizes = self.params.get('edge_features_size', {})  # type: Dict[int, int]

    # Backwards edges double the number of distinct edge types: the backward
    # version of forward type t is stored at index num_edge_types + t.
    if self.params['add_backwards_edges']:
        effective_num_edge_types = 2 * self.num_edge_types
    else:
        effective_num_edge_types = self.num_edge_types

    # Dispatch table for how incoming messages are pooled per target node.
    aggregation_funcs = {
        'sum': tf.unsorted_segment_sum,
        'max': tf.unsorted_segment_max,
    }
    message_aggregation_type = self.params.get('message_aggregation', 'sum')
    if message_aggregation_type not in aggregation_funcs:
        raise ValueError('Unrecognized message_aggregation type %s' % message_aggregation_type)
    self.unsorted_segment_aggregation_func = aggregation_funcs[message_aggregation_type]

    # Generate per-layer values for edge weights, biases and gated units. If we tie them, they are just copies:
    self.__weights = GGNNWeights([], [], [], [], [], [])
    for layer_idx in range(len(self.params['layer_timesteps'])):
        with tf.variable_scope('gnn_layer_%i' % layer_idx):
            # One [h_dim, h_dim] transform per effective edge type. Created as a
            # single flat [E*h, h] matrix (so the glorot initializer sees the full
            # fan-in/fan-out), then reshaped into per-type slices.
            edge_weights = tf.get_variable(name='gnn_edge_weights',
                                           shape=[effective_num_edge_types * h_dim, h_dim],
                                           initializer=tf.glorot_normal_initializer())
            edge_weights = tf.reshape(edge_weights, [effective_num_edge_types, h_dim, h_dim])
            self.__weights.edge_weights.append(edge_weights)

            if self.params['use_propagation_attention']:
                # One scalar attention weight per effective edge type, starting neutral (1.0).
                self.__weights.edge_type_attention_weights.append(
                    tf.get_variable(name='edge_type_attention_weights',
                                    shape=[effective_num_edge_types],
                                    initializer=tf.ones_initializer()))

            # Per-edge-type gate parameters over edge features. The gate maps
            # 2 * feature_size inputs to a single scalar (weights init to 1, bias to 0).
            self.__weights.edge_feature_gate_weights.append({})
            self.__weights.edge_feature_gate_bias.append({})
            for edge_type, edge_feature_size in edge_feature_sizes.items():
                # The backward counterpart shares the feature size but gets its own
                # parameters, keyed (and named) by its shifted edge-type index.
                if self.params['add_backwards_edges']:
                    gated_edge_types = (edge_type, self.num_edge_types + edge_type)
                else:
                    gated_edge_types = (edge_type,)
                for gated_edge_type in gated_edge_types:
                    self.__weights.edge_feature_gate_weights[layer_idx][gated_edge_type] = \
                        tf.get_variable(name='gnn_edge_%i_feature_gate_weights' % (gated_edge_type,),
                                        shape=[2 * edge_feature_size, 1],
                                        initializer=tf.ones_initializer())
                    self.__weights.edge_feature_gate_bias[layer_idx][gated_edge_type] = \
                        tf.get_variable(name='gnn_edge_%i_feature_gate_bias' % (gated_edge_type,),
                                        shape=[1],
                                        initializer=tf.zeros_initializer())

            if self.params['use_edge_bias']:
                # One additive bias vector per effective edge type.
                self.__weights.edge_biases.append(tf.get_variable(name='gnn_edge_biases',
                                                                  shape=[effective_num_edge_types, h_dim],
                                                                  initializer=tf.zeros_initializer()))

            # Recurrent update cell that folds aggregated messages into node states.
            cell = self.__create_rnn_cell(h_dim)
            self.__weights.rnn_cells.append(cell)