Finding Co-Authors using Map/Reduce

I was trying to write a map/reduce job for Hadoop using Visual Studio 2012 in an HDP for Windows environment. Looking for a suitable practical scenario, I got some PubMed data from http://www.ncbi.nlm.nih.gov/pubmed and decided to find, for each individual author, their co-authors and the number of PubMed articles they published together. Something like below:

PubMedArticle 1              Authors {"A, B, X"}

PubMedArticle 2              Authors {"B, X, Y"}

PubMedArticle 3              Authors {"A, K"}

PubMedArticle 4              Authors {"M"}

The result will be

A      K (1), B (1), X (1)

B      A (1), X (2), Y (1)

K      A (1)

M      [No Co-Author (1)]

X      A (1), B (2), Y (1)

Y      B (1), X (1)
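The counting above can be sketched end to end in plain Python (an illustrative simulation of the map/combine/reduce steps using the example articles, not part of the Hadoop job itself):

```python
from collections import defaultdict

# The example articles from the table above.
articles = [
    ["A", "B", "X"],
    ["B", "X", "Y"],
    ["A", "K"],
    ["M"],
]

# Map phase: emit one key per ordered co-author pair; a lone author
# gets a special key (None stands in for "no co-author").
pairs = []
for authors in articles:
    if len(authors) == 1:
        pairs.append((authors[0], None))
    for a in authors:
        for b in authors:
            if a != b:
                pairs.append((a, b))

# Combine phase: count occurrences of each (author, co-author) pair.
counts = defaultdict(int)
for key in pairs:
    counts[key] += 1

# Reduce phase: group the formatted "CO (n)" values by author.
result = defaultdict(list)
for (author, co), n in sorted(counts.items(),
                              key=lambda kv: (kv[0][0], kv[0][1] or "")):
    result[author].append(
        f"[No Co-Author ({n})]" if co is None else f"{co} ({n})"
    )

for author in sorted(result):
    print(author, ", ".join(result[author]))
```

Running this reproduces the result table above, e.g. B maps to A (1), X (2), Y (1).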

Below are the steps I followed.

The PubMed data is available for download at the link below:
http://www.ncbi.nlm.nih.gov/Structure/flink/flink.cgi

Importing data into HDFS

Move the PubMed data to the HDFS file system. The path to store the file is input/pubmeds.
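Assuming the downloaded data was saved locally as pubmeds.txt (the local filename here is an assumption; adjust it to match your export), the upload can be done from the Hadoop command line roughly like this:

```shell
# Create the input directory in HDFS and upload the local export.
# On Hadoop 1.x (as shipped with HDP for Windows at the time),
# "fs -mkdir" creates parent directories without needing a -p flag.
hadoop fs -mkdir input
hadoop fs -put pubmeds.txt input/pubmeds
hadoop fs -ls input
```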

Writing map/reduce job using Visual Studio

Create a new console application in Visual Studio.

In solution explorer right click on the project and click on Manage NuGet Packages.

Search for hadoop in the search box. Look for the Microsoft .NET Map Reduce API For Hadoop package and install it. It provides the assemblies needed to write map/reduce jobs.
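Equivalently, the package can be installed from the Package Manager Console; the package id below is the one the .NET SDK for Hadoop was published under, but verify the current id and version in the gallery:

```shell
# Run in the Visual Studio Package Manager Console (PowerShell).
Install-Package Microsoft.Hadoop.MapReduce
```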

Code

Mapper

In the map phase we parse each input line and emit intermediate key/value pairs. Here the input data is in tab-delimited format, like below:

1319 "Ditzel J, Standl E" 1975 Aug The oxygen transport system of red blood cells during diabetic ketoacidosis and recovery. "Ditzel J, Standl E.The oxygen…

We need to extract the authors field and split out the individual authors. For each PubMed article we create every ordered pair of co-authors; the pairs become the keys, and for now the value is 1 for each.

public class AuthorRelationMap : MapperBase
{
    public override void Map(string inputLine, MapperContext context)
    {
        string[] colVals = inputLine.Split('\t');
        if (colVals.Length > 1)
        {
            string authorsValue = colVals[1].Replace("\"", "");
            string[] coAuthors = authorsValue.Split(',');
            if (coAuthors.Length == 1)
            {
                context.EmitKeyValue(coAuthors[0].ToUpper().Trim(), "1");
            }
            foreach (string author in coAuthors)
            {
                for (int i = 0; i < coAuthors.Length; i++)
                {
                    if (author != coAuthors[i])
                    {
                        context.EmitKeyValue(author.ToUpper().Trim() + ',' + coAuthors[i].ToUpper().Trim(), "1");
                    }
                }
            }
        }
    }
}

The output of the above code will be something like below

TANNO T,YOSHINAGA K  1

TANNO T,SATO T       1

YOSHINAGA K,TANNO T  1

YOSHINAGA K,SATO T   1

SATO T,TANNO T       1

SATO T,YOSHINAGA K   1

YANAGA K,MATSUMATA T 1

YANAGA K,HAYASHI H   1

YANAGA K,SHIMADA M   1

YANAGA K,URATA K     1

YANAGA K,SUEHIRO T   1

YANAGA K,SUGIMACHI K 1

MATSUMATA T,YANAGA K 1

MATSUMATA T,HAYASHI H      1

Combiner

In the combiner we aggregate the identical key/value pairs. We set the single author as the key, and the co-author together with the number of occurrences of the pair as the value.

public class CoAuthorCombiner : ReducerCombinerBase
{
    public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
    {
        string[] authors = key.Split(',');
        if (authors.Length > 1)
        {
            context.EmitKeyValue(authors[0], authors[1] + " (" + values.Count().ToString() + ")");
        }
        else if (authors[0].Length > 0)
        {
            context.EmitKeyValue(authors[0], "[No Co-Author (" + values.Count().ToString() + ")]");
        }
    }
}

The output of the above code will be something like below

AAGENAES O    BANGSTAD HJ (2)

AAGENAES O    BJøRNEKLETT A (1)

AAGENAES O    BRINCHMANN-HANSEN O (2)

AAGENAES O    DAHL-JøRGENSEN K (7)

AAGENAES O    FAUSA O (1)

AAGENAES O    FRøLICH W (1)

AAGENAES O    GANES T (1)

AAGENAES O    HAAKENS K (1)

AAGENAES O    HANSSEN KF (10)

AAGENAES O    JELLING I (1)

AAGENAES O    JOø GB (1)

Reducer

In the reducer we concatenate the values for each unique key.

public class CoAuthorReducer : ReducerCombinerBase
{
    public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
    {
        context.EmitKeyValue(key, values.Aggregate((x, y) => x + ", " + y));
    }
}

The output of the above code will be something like below

A AHMED K     MUNIANDY S (1), S ISMAIL I (1)
A M S  B P (1), H L K (1), M K J (1), P S (1), R N S (1), V A (1), V H P (1)
A SPINAS G    BERNAYS R (1), BRåNDLE M (1), SCHMID C (1), SEILER H (1), WIESLI P (1), ZAPF J (1), ZWIMPFER C (1)

AABY P [No Co-Author (1)]

AABY SVENDSEN P      DECKERT T (1), LAURITZEN T (1), MATHIESEN ER (1), RUBIN P (1), SANDAHL CHRISTIANSEN J (1)

AAGENAES O    AASETH J (1), BANGSTAD HJ (2), BJøRNEKLETT A (1), BRINCHMANN-HANSEN O (2), DAHL-JøRGENSEN K (7), FAUSA O (1), FRøLICH W (1), GANES T (1), HAAKENS K (1), HANSSEN KF (10), JELLING I (1), JOø GB (1), KIERULF P (2), LARSEN S (1), MOSAND R (1), ODEGAARD B (1), SANDVIK L (3), SKREDE G (1), SMELAND E (1), TORJESEN P (1), VAALER S (6), WISETH R (1), MOE H (1)

AAGREN M      FAKHOURY W (1), KOTCHIE RW (1), LEREUN C (1), LOCKHART I (1)

AAKHUS S      DAHL K (1), WIDERøE TE (1)

Job

Define the map/reduce job and configure the input and output paths.

public class FindCoAuthorsJob : HadoopJob<AuthorRelationMap, CoAuthorCombiner, CoAuthorReducer>
{
    public override HadoopJobConfiguration Configure(ExecutorContext context)
    {
        var config = new HadoopJobConfiguration();
        config.InputPath = "input/pubmeds";
        config.OutputFolder = "output/pubmeds";
        return config;
    }
}

Execute the Job

Write the code to execute the job in the Main method.

static void Main(string[] args)
{
    var hadoop = Hadoop.Connect();
    var result = hadoop.MapReduceJob.ExecuteJob<FindCoAuthorsJob>();
    Console.ReadKey();
}

Run the console program by pressing F5. If the job completes successfully, you can see the output by browsing the HDFS file system.
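The result can also be inspected from the Hadoop command line; the part-file name below is the usual default and may differ with the number of reducers:

```shell
# List the job output directory and print the first result file.
hadoop fs -ls output/pubmeds
hadoop fs -cat output/pubmeds/part-00000
```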

Below is the complete code

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using Microsoft.Hadoop.MapReduce;

namespace PubmedAuthorRelation
{
    class Program
    {
        //Mapper
        public class AuthorRelationMap : MapperBase
        {
            public override void Map(string inputLine, MapperContext context)
            {
                string[] colVals = inputLine.Split('\t');
                if (colVals.Length > 1)
                {
                    string authorsValue = colVals[1].Replace("\"", "");
                    string[] coAuthors = authorsValue.Split(',');
                    if (coAuthors.Length == 1)
                    {
                        context.EmitKeyValue(coAuthors[0].ToUpper().Trim(), "1");
                    }
                    foreach (string author in coAuthors)
                    {
                        for (int i = 0; i < coAuthors.Length; i++)
                        {
                            if (author != coAuthors[i])
                            {
                                context.EmitKeyValue(author.ToUpper().Trim() + ',' + coAuthors[i].ToUpper().Trim(), "1");
                            }
                        }
                    }
                }
            }
        }

        //Combiner
        public class CoAuthorCombiner : ReducerCombinerBase
        {
            public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
            {
                string[] authors = key.Split(',');
                if (authors.Length > 1)
                {
                    context.EmitKeyValue(authors[0], authors[1] + " (" + values.Count().ToString() + ")");
                }
                else if (authors[0].Length > 0)
                {
                    context.EmitKeyValue(authors[0], "[No Co-Author (" + values.Count().ToString() + ")]");
                }
            }
        }

        //Reducer
        public class CoAuthorReducer : ReducerCombinerBase
        {
            public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
            {
                context.EmitKeyValue(key, values.Aggregate((x, y) => x + ", " + y));
            }
        }

        //Job
        public class FindCoAuthorsJob : HadoopJob<AuthorRelationMap, CoAuthorCombiner, CoAuthorReducer>
        {
            public override HadoopJobConfiguration Configure(ExecutorContext context)
            {
                var config = new HadoopJobConfiguration();
                config.InputPath = "input/pubmeds";
                config.OutputFolder = "output/pubmeds";
                return config;
            }
        }

        static void Main(string[] args)
        {
            var hadoop = Hadoop.Connect();
            var result = hadoop.MapReduceJob.ExecuteJob<FindCoAuthorsJob>();
            Console.ReadKey();
        }
    }
}
